package workflow import ( "context" "encoding/json" "fmt" "regexp" "strings" "time" ) // Runner executes Workflows. Modes are registered up-front; the // catalog is immutable after Build (callers compose by registering // at startup, then Run() the catalog repeatedly). type Runner struct { modes map[string]Mode } // NewRunner returns an empty Runner. Use RegisterMode to populate. func NewRunner() *Runner { return &Runner{modes: make(map[string]Mode)} } // RegisterMode adds a capability under the given name. Re-registering // the same name overwrites — useful for tests that want to replace a // mode with a stub. In production, register-once-at-startup is the // expected pattern. func (r *Runner) RegisterMode(name string, mode Mode) { r.modes[name] = mode } // Modes returns the currently-registered mode names. Useful for // /v1/observer/workflow/modes-style discovery endpoints. func (r *Runner) Modes() []string { out := make([]string, 0, len(r.modes)) for name := range r.modes { out = append(out, name) } return out } // Run executes a workflow. Validates structure, resolves nodes // topologically, executes each node with $-reference substitution, // records per-node results in RunResult. // // Aborting errors (cycle, missing dep, unknown mode) return early // with StatusAborted — no nodes execute. Per-node mode errors are // recorded in NodeResult.Error and execution continues with // independent nodes; downstream nodes that depended on the failing // one are SKIPPED with an explanatory error so the cascade is // visible in the result rather than silent. func (r *Runner) Run(ctx context.Context, w Workflow) (RunResult, error) { if err := w.Validate(); err != nil { return RunResult{ Workflow: w.Name, Status: StatusAborted, StartedAt: time.Now(), }, err } order, err := topoSort(w.Nodes) if err != nil { return RunResult{ Workflow: w.Name, Status: StatusAborted, StartedAt: time.Now(), }, err } // Verify every node's mode is registered before starting — fail // loud if someone references a typo'd mode name. Catches the bug // in 5ms instead of after 6 nodes have already run. for _, node := range w.Nodes { modeName := effectiveMode(node) if _, ok := r.modes[modeName]; !ok { return RunResult{ Workflow: w.Name, Status: StatusAborted, StartedAt: time.Now(), }, fmt.Errorf("%w: %q (node %q)", ErrUnknownMode, modeName, node.ID) } } t0 := time.Now() results := make(map[string]NodeResult, len(w.Nodes)) resultsList := make([]NodeResult, 0, len(w.Nodes)) failedNodes := make(map[string]bool) // node IDs whose result was Error skippedNodes := make(map[string]bool) for _, nodeID := range order { node := findNode(w.Nodes, nodeID) modeName := effectiveMode(node) // Skip if any dependency failed or was skipped — cascades // failure visibly so callers can see the chain. var skipReason string for _, dep := range node.DependsOn { if failedNodes[dep] { skipReason = fmt.Sprintf("upstream node %q failed", dep) break } if skippedNodes[dep] { skipReason = fmt.Sprintf("upstream node %q was skipped", dep) break } } if skipReason != "" { res := NodeResult{ NodeID: node.ID, Mode: modeName, Error: skipReason, StartedAt: time.Now(), } results[node.ID] = res resultsList = append(resultsList, res) skippedNodes[node.ID] = true continue } nodeStart := time.Now() mode := r.modes[modeName] // pre-validated above; safe lookup // Build the mode's input map with $-references resolved. input, refErr := buildInput(node, results) if refErr != nil { res := NodeResult{ NodeID: node.ID, Mode: modeName, Error: refErr.Error(), StartedAt: nodeStart, DurationMs: time.Since(nodeStart).Milliseconds(), } results[node.ID] = res resultsList = append(resultsList, res) failedNodes[node.ID] = true continue } modeCtx := Context{ Ctx: ctx, WorkflowName: w.Name, NodeID: node.ID, Provider: w.Provider, Model: w.Model, } output, err := mode(modeCtx, input) res := NodeResult{ NodeID: node.ID, Mode: modeName, Output: output, StartedAt: nodeStart, DurationMs: time.Since(nodeStart).Milliseconds(), } if err != nil { res.Error = err.Error() failedNodes[node.ID] = true } results[node.ID] = res resultsList = append(resultsList, res) } status := StatusSucceeded if len(failedNodes) > 0 || len(skippedNodes) > 0 { status = StatusPartial } return RunResult{ Workflow: w.Name, Status: status, Nodes: resultsList, StartedAt: t0, DurationMs: time.Since(t0).Milliseconds(), }, nil } // effectiveMode returns the node's explicit mode if set, else // "llm.chat" (the implicit Archon convention). func effectiveMode(n Node) string { if n.Mode != "" { return n.Mode } return "llm.chat" } // findNode is O(n) but called once per execution step on already- // validated workflows; n is small (typical workflow ≤10 nodes). func findNode(nodes []Node, id string) Node { for _, n := range nodes { if n.ID == id { return n } } return Node{} // never reached on a Validated workflow } // ─── Input building + reference substitution ──────────────────── // buildInput composes the input map a mode receives. Builds from // node.Inputs (deep-copy with $-refs substituted) plus injects the // "prompt" key from node.Prompt with $-refs substituted. // // $-reference syntax: $node_id.output.key — resolves to that key // in the prior node's output map. $node_id.output (no .key) // resolves to the whole output map. JSON-stringified inline. func buildInput(node Node, results map[string]NodeResult) (map[string]any, error) { out := make(map[string]any, len(node.Inputs)+1) for k, v := range node.Inputs { resolved, err := resolveRefs(v, results) if err != nil { return nil, err } out[k] = resolved } if node.Prompt != "" { resolvedPrompt, err := substituteStringRefs(node.Prompt, results) if err != nil { return nil, err } out["prompt"] = resolvedPrompt } return out, nil } // resolveRefs walks any value (string, map, slice, scalar) and // substitutes $-references in any string elements. func resolveRefs(v any, results map[string]NodeResult) (any, error) { switch x := v.(type) { case string: return substituteStringRefs(x, results) case map[string]any: out := make(map[string]any, len(x)) for k, vv := range x { r, err := resolveRefs(vv, results) if err != nil { return nil, err } out[k] = r } return out, nil case []any: out := make([]any, len(x)) for i, vv := range x { r, err := resolveRefs(vv, results) if err != nil { return nil, err } out[i] = r } return out, nil default: return v, nil // numbers, bools, nil — pass through } } // refRe matches $node_id or $node_id.output.key (where key is // dotted-path). Captures: 1=node_id, 2=optional ".output[.key]" // suffix. var refRe = regexp.MustCompile(`\$([a-zA-Z_][a-zA-Z0-9_]*)((?:\.[a-zA-Z_][a-zA-Z0-9_]*)*)`) // substituteStringRefs replaces $node.output.key references in a // string with the resolved value (JSON-stringified for non-string // targets so the result is always a string). func substituteStringRefs(s string, results map[string]NodeResult) (string, error) { var firstErr error out := refRe.ReplaceAllStringFunc(s, func(match string) string { if firstErr != nil { return match } // Re-parse the match because ReplaceAllStringFunc gives the // whole match without submatches. m := refRe.FindStringSubmatch(match) nodeID := m[1] path := strings.TrimPrefix(m[2], ".") nodeRes, ok := results[nodeID] if !ok { firstErr = fmt.Errorf("%w: $%s (no such node, or node not yet run)", ErrUnresolvedRef, nodeID) return match } // path "output" or "output.X.Y" walks into nodeRes.Output val, err := walkPath(nodeRes.Output, path) if err != nil { firstErr = fmt.Errorf("%w: $%s — %v", ErrUnresolvedRef, nodeID+m[2], err) return match } return stringifyValue(val) }) return out, firstErr } // walkPath resolves a dotted path against a nested map. Empty path // returns the whole map. The first segment must be "output" — a // convention that matches the SPEC §3.8 reference shape and prevents // accidental access to other NodeResult fields. func walkPath(output map[string]any, path string) (any, error) { if path == "" { return output, nil // bare $node — entire NodeResult.Output } parts := strings.Split(path, ".") if parts[0] != "output" { return nil, fmt.Errorf("path must start with .output (got %q)", parts[0]) } parts = parts[1:] var cur any = output for _, p := range parts { m, ok := cur.(map[string]any) if !ok { return nil, fmt.Errorf("cannot traverse into %T at segment %q", cur, p) } cur, ok = m[p] if !ok { return nil, fmt.Errorf("key %q not found in output", p) } } return cur, nil } // stringifyValue renders a value as a string for prompt substitution. // Strings pass through; nil → empty string; everything else is JSON- // marshaled (so maps/slices produce valid JSON, not Go's %v syntax // like `map[k:v]`). JSON failure falls back to fmt.Sprint so the // substitution can never panic on weird inputs. Per 2026-04-29 // scrum2 (Opus + Kimi convergent): %v output was confusing for LLM // modes that expected JSON-shaped context. func stringifyValue(v any) string { switch x := v.(type) { case string: return x case nil: return "" default: if bs, err := json.Marshal(x); err == nil { return string(bs) } return fmt.Sprint(x) } } // ─── DAG resolution ────────────────────────────────────────────── // topoSort returns node IDs in a topologically-sorted order such // that every dependency precedes its dependent. Cycles return an // error (Validate catches them first; this is defense in depth). func topoSort(nodes []Node) ([]string, error) { indeg := make(map[string]int, len(nodes)) graph := make(map[string][]string, len(nodes)) for _, n := range nodes { if _, ok := indeg[n.ID]; !ok { indeg[n.ID] = 0 } for _, dep := range n.DependsOn { graph[dep] = append(graph[dep], n.ID) indeg[n.ID]++ } } // Kahn's algorithm — preserve original order for ties so output // is deterministic across runs. queue := make([]string, 0, len(nodes)) for _, n := range nodes { if indeg[n.ID] == 0 { queue = append(queue, n.ID) } } out := make([]string, 0, len(nodes)) for len(queue) > 0 { cur := queue[0] queue = queue[1:] out = append(out, cur) for _, child := range graph[cur] { indeg[child]-- if indeg[child] == 0 { queue = append(queue, child) } } } if len(out) != len(nodes) { // Find a node still with non-zero indeg — that's where the // cycle is reachable from. for id, deg := range indeg { if deg > 0 { return nil, fmt.Errorf("%w: starting at node %q", ErrCycle, id) } } return nil, ErrCycle } return out, nil }