root 8278eb9a87 scrum2 cleanup: JSON-marshal in stringifyValue, drop dead detectCycle, name SourceWorkflow
5 small fixes from the §3.8 scrum2 review wave:

- workflow.stringifyValue now JSON-marshals maps/slices instead of
  fmt.Sprint %v (Opus+Kimi convergent: LLM modes were getting Go's
  map[k:v] syntax, which is unparseable as JSON context).
- workflow.detectCycle removed — duplicate of topoSort that discarded
  the useful node ID. Validate() now calls topoSort directly and
  returns its wrapped ErrCycle.
- observer.SourceWorkflow named constant — was an implicit string
  cast (observer.Source("workflow")) at the cmd/observerd handler.
- Unused context imports + dead silencer comments removed across
  workflow/modes.go and observerd/main.go.
- Unused store parameter dropped from registerBuiltinModes (reserved
  comment removed; can be re-added when a mode actually needs it).

`just verify` still passes. These are cleanups: apart from the stringifyValue output format (JSON instead of %v, above), there is no behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 23:16:07 -05:00

383 lines
11 KiB
Go

package workflow
import (
	"context"
	"encoding/json"
	"fmt"
	"regexp"
	"sort"
	"strings"
	"time"
)
// Runner executes Workflows against a catalog of named Modes.
//
// The catalog is a plain map with no locking: register every mode at
// startup with RegisterMode, then call Run as many times as needed.
// Registering a mode while a Run is in flight on another goroutine is
// a data race. Registering under an existing name replaces the earlier
// mode.
type Runner struct {
	modes map[string]Mode // mode name → implementation
}
// NewRunner creates a Runner whose mode catalog starts out empty.
// Populate it with RegisterMode before calling Run.
func NewRunner() *Runner {
	r := new(Runner)
	r.modes = map[string]Mode{}
	return r
}
// RegisterMode adds a capability under the given name. Re-registering
// the same name overwrites — useful for tests that want to replace a
// mode with a stub. In production, register-once-at-startup is the
// expected pattern.
//
// The catalog map is created on first use, so a zero-value Runner
// (declared without NewRunner) can register modes instead of panicking
// on a nil-map write. Not safe to call concurrently with Run or with
// itself: the catalog is an unguarded map.
func (r *Runner) RegisterMode(name string, mode Mode) {
	if r.modes == nil {
		r.modes = make(map[string]Mode)
	}
	r.modes[name] = mode
}
// Modes returns the currently-registered mode names in ascending
// lexical order. Useful for /v1/observer/workflow/modes-style
// discovery endpoints: sorting makes the listing stable across calls
// instead of following Go's randomized map iteration order. The slice
// is freshly allocated on every call (never nil), so callers may
// modify it.
func (r *Runner) Modes() []string {
	names := make([]string, 0, len(r.modes))
	for name := range r.modes {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}
// Run executes a workflow. Validates structure, resolves nodes
// topologically, executes each node with $-reference substitution,
// records per-node results in RunResult.
//
// Aborting errors (cycle, missing dep, unknown mode) return early
// with StatusAborted and a non-nil error — no nodes execute. Per-node
// mode errors are recorded in NodeResult.Error and execution continues
// with independent nodes; downstream nodes that depended on the
// failing one are SKIPPED with an explanatory error so the cascade is
// visible in the result rather than silent.
//
// ctx is passed to every mode via Context.Ctx, and it is also checked
// before each node starts. Once ctx is cancelled or past its deadline,
// no further modes are invoked; every remaining node is recorded as
// skipped with the cancellation reason, so the run ends StatusPartial.
// Cancellation is reported per node, not through the returned error.
func (r *Runner) Run(ctx context.Context, w Workflow) (RunResult, error) {
	if err := w.Validate(); err != nil {
		return RunResult{
			Workflow: w.Name, Status: StatusAborted,
			StartedAt: time.Now(),
		}, err
	}
	order, err := topoSort(w.Nodes)
	if err != nil {
		return RunResult{
			Workflow: w.Name, Status: StatusAborted,
			StartedAt: time.Now(),
		}, err
	}
	// Verify every node's mode is registered before starting — fail
	// loud if someone references a typo'd mode name. Catches the bug
	// before any node has run instead of halfway through the workflow.
	for _, node := range w.Nodes {
		modeName := effectiveMode(node)
		if _, ok := r.modes[modeName]; !ok {
			return RunResult{
				Workflow: w.Name, Status: StatusAborted,
				StartedAt: time.Now(),
			}, fmt.Errorf("%w: %q (node %q)", ErrUnknownMode, modeName, node.ID)
		}
	}
	t0 := time.Now()
	results := make(map[string]NodeResult, len(w.Nodes))
	resultsList := make([]NodeResult, 0, len(w.Nodes))
	failedNodes := make(map[string]bool) // node IDs whose result was Error
	skippedNodes := make(map[string]bool)
	for _, nodeID := range order {
		node := findNode(w.Nodes, nodeID)
		modeName := effectiveMode(node)
		// Skip if any dependency failed or was skipped — cascades
		// failure visibly so callers can see the chain.
		var skipReason string
		for _, dep := range node.DependsOn {
			if failedNodes[dep] {
				skipReason = fmt.Sprintf("upstream node %q failed", dep)
				break
			}
			if skippedNodes[dep] {
				skipReason = fmt.Sprintf("upstream node %q was skipped", dep)
				break
			}
		}
		// Don't start new work once the caller has given up: a mode
		// that ignores ctx would otherwise still run to completion for
		// every remaining node after cancellation.
		if skipReason == "" {
			if ctxErr := ctx.Err(); ctxErr != nil {
				skipReason = fmt.Sprintf("workflow cancelled before node ran: %v", ctxErr)
			}
		}
		if skipReason != "" {
			res := NodeResult{
				NodeID: node.ID, Mode: modeName,
				Error:     skipReason,
				StartedAt: time.Now(),
			}
			results[node.ID] = res
			resultsList = append(resultsList, res)
			skippedNodes[node.ID] = true
			continue
		}
		nodeStart := time.Now()
		mode := r.modes[modeName] // pre-validated above; safe lookup
		// Build the mode's input map with $-references resolved.
		input, refErr := buildInput(node, results)
		if refErr != nil {
			res := NodeResult{
				NodeID: node.ID, Mode: modeName,
				Error:      refErr.Error(),
				StartedAt:  nodeStart,
				DurationMs: time.Since(nodeStart).Milliseconds(),
			}
			results[node.ID] = res
			resultsList = append(resultsList, res)
			failedNodes[node.ID] = true
			continue
		}
		modeCtx := Context{
			Ctx:          ctx,
			WorkflowName: w.Name,
			NodeID:       node.ID,
			Provider:     w.Provider,
			Model:        w.Model,
		}
		output, err := mode(modeCtx, input)
		res := NodeResult{
			NodeID:     node.ID,
			Mode:       modeName,
			Output:     output,
			StartedAt:  nodeStart,
			DurationMs: time.Since(nodeStart).Milliseconds(),
		}
		if err != nil {
			res.Error = err.Error()
			failedNodes[node.ID] = true
		}
		results[node.ID] = res
		resultsList = append(resultsList, res)
	}
	status := StatusSucceeded
	if len(failedNodes) > 0 || len(skippedNodes) > 0 {
		status = StatusPartial
	}
	return RunResult{
		Workflow:   w.Name,
		Status:     status,
		Nodes:      resultsList,
		StartedAt:  t0,
		DurationMs: time.Since(t0).Milliseconds(),
	}, nil
}
// effectiveMode resolves which registered mode a node runs under: its
// explicit Mode when non-empty, otherwise "llm.chat" (the implicit
// Archon convention for nodes that only carry a prompt).
func effectiveMode(n Node) string {
	mode := n.Mode
	if mode == "" {
		mode = "llm.chat"
	}
	return mode
}
// findNode returns the first node in nodes whose ID equals id. It is a
// linear scan performed once per execution step; workflows are
// expected to be small (typically ≤10 nodes), so no index is built.
//
// If nothing matches, the zero Node is returned. Run calls this only
// with IDs that topoSort produced from the same slice, so that
// fallback is not expected to be reached there.
func findNode(nodes []Node, id string) Node {
	for i := range nodes {
		if candidate := nodes[i]; candidate.ID == id {
			return candidate
		}
	}
	return Node{}
}
// ─── Input building + reference substitution ────────────────────
// buildInput assembles the input map handed to a node's mode.
//
// Each entry of node.Inputs is deep-copied through resolveRefs, so
// $-references inside strings (at any nesting depth of map[string]any
// and []any values) are substituted from earlier node results. When
// node.Prompt is non-empty it is substituted the same way and stored
// under the "prompt" key, replacing any "prompt" entry that came from
// node.Inputs.
//
// Reference syntax: $node_id.output.key resolves to that key in the
// prior node's output map; $node_id.output (or a bare $node_id)
// resolves to the whole output map. Non-string values are rendered
// via stringifyValue when spliced into text. Any resolution error is
// returned together with a nil map.
func buildInput(node Node, results map[string]NodeResult) (map[string]any, error) {
	input := make(map[string]any, len(node.Inputs)+1)
	for key, raw := range node.Inputs {
		val, err := resolveRefs(raw, results)
		if err != nil {
			return nil, err
		}
		input[key] = val
	}
	if node.Prompt == "" {
		return input, nil
	}
	prompt, err := substituteStringRefs(node.Prompt, results)
	if err != nil {
		return nil, err
	}
	input["prompt"] = prompt
	return input, nil
}
// resolveRefs returns a copy of v with $-references substituted in
// every string it contains. Strings go through substituteStringRefs;
// map[string]any and []any values are rebuilt element by element
// (recursively), so the caller's containers are never mutated. Any
// other value — numbers, bools, nil, and container types other than
// those two — is returned unchanged. The first error encountered
// aborts the walk.
func resolveRefs(v any, results map[string]NodeResult) (any, error) {
	if s, ok := v.(string); ok {
		return substituteStringRefs(s, results)
	}
	if m, ok := v.(map[string]any); ok {
		resolved := make(map[string]any, len(m))
		for key, elem := range m {
			r, err := resolveRefs(elem, results)
			if err != nil {
				return nil, err
			}
			resolved[key] = r
		}
		return resolved, nil
	}
	if list, ok := v.([]any); ok {
		resolved := make([]any, len(list))
		for i := range list {
			r, err := resolveRefs(list[i], results)
			if err != nil {
				return nil, err
			}
			resolved[i] = r
		}
		return resolved, nil
	}
	return v, nil // scalars and unsupported containers pass through
}
// refRe matches a $-reference: `$node_id`, optionally followed by a
// dotted path such as `.output` or `.output.key.sub`. Captures:
// 1 = node_id, 2 = the dotted suffix including its leading dot (empty
// for a bare `$node_id`). The regex does not itself require the suffix
// to begin with ".output"; walkPath enforces that.
//
// NOTE(review): any `$` followed by an identifier matches (e.g. "$HOME"
// or "$USD" in prompt text), and substituteStringRefs reports such a
// match as an unresolved reference when no node by that name has run.
var refRe = regexp.MustCompile(`\$([a-zA-Z_][a-zA-Z0-9_]*)((?:\.[a-zA-Z_][a-zA-Z0-9_]*)*)`)
// substituteStringRefs replaces every $-reference in s with the value
// it resolves to. The value is rendered via stringifyValue (strings
// verbatim, other values as JSON), so the result is always a string.
//
// Resolution stops at the first failing reference. That reference and
// every later one are left as literal text, and the error (wrapping
// ErrUnresolvedRef) is returned alongside the partially substituted
// string. Substituted values are not re-scanned, so a resolved value
// that itself contains "$x" is inserted as-is.
func substituteStringRefs(s string, results map[string]NodeResult) (string, error) {
	var b strings.Builder
	var failure error
	last := 0 // end offset of the previous match in s
	for _, loc := range refRe.FindAllStringSubmatchIndex(s, -1) {
		start, end := loc[0], loc[1]
		b.WriteString(s[last:start])
		last = end
		ref := s[start:end]
		// After the first failure, remaining references are copied verbatim.
		if failure != nil {
			b.WriteString(ref)
			continue
		}
		// Group 1 is the node ID; group 2 is the (possibly empty)
		// dotted suffix such as ".output.key".
		nodeID := s[loc[2]:loc[3]]
		suffix := s[loc[4]:loc[5]]
		nodeRes, ok := results[nodeID]
		if !ok {
			failure = fmt.Errorf("%w: $%s (no such node, or node not yet run)", ErrUnresolvedRef, nodeID)
			b.WriteString(ref)
			continue
		}
		// path "output" or "output.X.Y" walks into nodeRes.Output
		val, err := walkPath(nodeRes.Output, strings.TrimPrefix(suffix, "."))
		if err != nil {
			failure = fmt.Errorf("%w: $%s — %v", ErrUnresolvedRef, nodeID+suffix, err)
			b.WriteString(ref)
			continue
		}
		b.WriteString(stringifyValue(val))
	}
	b.WriteString(s[last:])
	return b.String(), failure
}
// walkPath resolves a dotted path against a node's output map.
//
// An empty path (a bare $node reference) yields the whole output map.
// Otherwise the first segment must be "output" — the SPEC §3.8
// reference shape, which also keeps references from reaching other
// NodeResult fields — and each following segment indexes one level
// deeper into nested map[string]any values. Errors describe the first
// segment that could not be followed.
func walkPath(output map[string]any, path string) (any, error) {
	if path == "" {
		return output, nil // bare $node — entire NodeResult.Output
	}
	head, rest, hasRest := strings.Cut(path, ".")
	if head != "output" {
		return nil, fmt.Errorf("path must start with .output (got %q)", head)
	}
	var cur any = output
	if !hasRest {
		return cur, nil
	}
	for _, seg := range strings.Split(rest, ".") {
		level, isMap := cur.(map[string]any)
		if !isMap {
			return nil, fmt.Errorf("cannot traverse into %T at segment %q", cur, seg)
		}
		next, found := level[seg]
		if !found {
			return nil, fmt.Errorf("key %q not found in output", seg)
		}
		cur = next
	}
	return cur, nil
}
// stringifyValue renders a value as a string for prompt substitution.
// Strings pass through; an untyped nil → empty string; everything else
// is JSON-encoded, so maps/slices produce valid JSON rather than Go's
// %v syntax like `map[k:v]`. Per 2026-04-29 scrum2 (Opus + Kimi
// convergent): %v output was confusing for LLM modes that expected
// JSON-shaped context.
//
// HTML escaping is disabled. json.Marshal would rewrite <, > and & as
// \u003c, \u003e and \u0026, which is still valid JSON but garbles
// code, comparisons and markup for an LLM reading the prompt. The
// encoder's trailing newline is trimmed so the output has the same
// shape as Marshal's. Values JSON cannot encode (channels, funcs,
// NaN/±Inf floats, ...) fall back to fmt.Sprint instead of failing.
//
// A typed nil, such as a nil map, is not the untyped nil case and
// encodes as JSON "null".
func stringifyValue(v any) string {
	switch x := v.(type) {
	case string:
		return x
	case nil:
		return ""
	default:
		var sb strings.Builder
		enc := json.NewEncoder(&sb)
		enc.SetEscapeHTML(false)
		if err := enc.Encode(x); err == nil {
			return strings.TrimSuffix(sb.String(), "\n")
		}
		return fmt.Sprint(x)
	}
}
// ─── DAG resolution ──────────────────────────────────────────────
// topoSort returns node IDs in a topologically-sorted order such
// that every dependency precedes its dependent. It uses Kahn's
// algorithm, seeding the queue and breaking ties by the order of
// nodes, so the same workflow always sorts the same way.
//
// If some nodes can never become ready, it returns an error wrapping
// ErrCycle that names a node lying on a dependency cycle (Validate
// catches cycles first; this is defense in depth). The named node is
// chosen deterministically rather than from map iteration order.
func topoSort(nodes []Node) ([]string, error) {
	indeg := make(map[string]int, len(nodes))
	graph := make(map[string][]string, len(nodes)) // dep → dependents
	for _, n := range nodes {
		if _, ok := indeg[n.ID]; !ok {
			indeg[n.ID] = 0
		}
		for _, dep := range n.DependsOn {
			graph[dep] = append(graph[dep], n.ID)
			indeg[n.ID]++
		}
	}
	// Kahn's algorithm — preserve original order for ties so output
	// is deterministic across runs.
	queue := make([]string, 0, len(nodes))
	for _, n := range nodes {
		if indeg[n.ID] == 0 {
			queue = append(queue, n.ID)
		}
	}
	out := make([]string, 0, len(nodes))
	for len(queue) > 0 {
		cur := queue[0]
		queue = queue[1:]
		out = append(out, cur)
		for _, child := range graph[cur] {
			indeg[child]--
			if indeg[child] == 0 {
				queue = append(queue, child)
			}
		}
	}
	if len(out) != len(nodes) {
		if id, ok := cycleNodeID(nodes, indeg); ok {
			return nil, fmt.Errorf("%w: starting at node %q", ErrCycle, id)
		}
		return nil, ErrCycle
	}
	return out, nil
}

// cycleNodeID picks a node that lies on a dependency cycle, given the
// in-degrees left over after Kahn's algorithm stalled (a positive
// in-degree means the node never became ready). Starting from the
// first such node in declaration order, it repeatedly follows the
// first dependency that is also unfinished; in a finite graph the walk
// must revisit a node, and that node is on a cycle. Nodes that merely
// depend on a cycle are therefore never reported. Returns false when
// no node has a positive in-degree.
func cycleNodeID(nodes []Node, indeg map[string]int) (string, bool) {
	deps := make(map[string][]string, len(nodes))
	var start string
	found := false
	for _, n := range nodes {
		if indeg[n.ID] <= 0 {
			continue
		}
		deps[n.ID] = n.DependsOn
		if !found {
			start, found = n.ID, true
		}
	}
	if !found {
		return "", false
	}
	seen := make(map[string]bool, len(deps))
	cur := start
	for !seen[cur] {
		seen[cur] = true
		next, advanced := "", false
		for _, d := range deps[cur] {
			if indeg[d] > 0 {
				next, advanced = d, true
				break
			}
		}
		if !advanced {
			// Every unfinished dependency lies outside the workflow (a
			// missing dep, normally rejected by Validate); report the
			// node where the walk got stuck.
			return cur, true
		}
		cur = next
	}
	return cur, true
}