5 small fixes from the §3.8 scrum2 review wave:
- workflow.stringifyValue now JSON-marshals maps/slices instead of
fmt.Sprint %v (Opus+Kimi convergent: LLM modes were getting Go's
map[k:v] syntax, which is unparseable as JSON context).
- workflow.detectCycle removed — duplicate of topoSort that discarded
the useful node ID. Validate() now calls topoSort directly and
returns its wrapped ErrCycle.
- observer.SourceWorkflow named constant — was an implicit string
cast (observer.Source("workflow")) at the cmd/observerd handler.
- Unused context imports + dead silencer comments removed across
workflow/modes.go and observerd/main.go.
- Unused store parameter dropped from registerBuiltinModes (reserved
comment removed; can be re-added when a mode actually needs it).
`just verify` still passes. Apart from the stringifyValue output change above (JSON instead of %v), these are pure cleanup with no behavior change.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
383 lines
11 KiB
Go
383 lines
11 KiB
Go
package workflow
|
|
|
|
import (
	"context"
	"encoding/json"
	"fmt"
	"regexp"
	"sort"
	"strings"
	"time"
)
|
|
|
|
// Runner executes Workflows. Modes are registered up-front; the
|
|
// catalog is immutable after Build (callers compose by registering
|
|
// at startup, then Run() the catalog repeatedly).
|
|
type Runner struct {
|
|
modes map[string]Mode
|
|
}
|
|
|
|
// NewRunner returns an empty Runner. Use RegisterMode to populate.
|
|
func NewRunner() *Runner {
|
|
return &Runner{modes: make(map[string]Mode)}
|
|
}
|
|
|
|
// RegisterMode adds a capability under the given name. Re-registering
|
|
// the same name overwrites — useful for tests that want to replace a
|
|
// mode with a stub. In production, register-once-at-startup is the
|
|
// expected pattern.
|
|
func (r *Runner) RegisterMode(name string, mode Mode) {
|
|
r.modes[name] = mode
|
|
}
|
|
|
|
// Modes returns the currently-registered mode names. Useful for
|
|
// /v1/observer/workflow/modes-style discovery endpoints.
|
|
func (r *Runner) Modes() []string {
|
|
out := make([]string, 0, len(r.modes))
|
|
for name := range r.modes {
|
|
out = append(out, name)
|
|
}
|
|
return out
|
|
}
|
|
|
|
// Run executes a workflow. Validates structure, resolves nodes
|
|
// topologically, executes each node with $-reference substitution,
|
|
// records per-node results in RunResult.
|
|
//
|
|
// Aborting errors (cycle, missing dep, unknown mode) return early
|
|
// with StatusAborted — no nodes execute. Per-node mode errors are
|
|
// recorded in NodeResult.Error and execution continues with
|
|
// independent nodes; downstream nodes that depended on the failing
|
|
// one are SKIPPED with an explanatory error so the cascade is
|
|
// visible in the result rather than silent.
|
|
func (r *Runner) Run(ctx context.Context, w Workflow) (RunResult, error) {
|
|
if err := w.Validate(); err != nil {
|
|
return RunResult{
|
|
Workflow: w.Name, Status: StatusAborted,
|
|
StartedAt: time.Now(),
|
|
}, err
|
|
}
|
|
|
|
order, err := topoSort(w.Nodes)
|
|
if err != nil {
|
|
return RunResult{
|
|
Workflow: w.Name, Status: StatusAborted,
|
|
StartedAt: time.Now(),
|
|
}, err
|
|
}
|
|
|
|
// Verify every node's mode is registered before starting — fail
|
|
// loud if someone references a typo'd mode name. Catches the bug
|
|
// in 5ms instead of after 6 nodes have already run.
|
|
for _, node := range w.Nodes {
|
|
modeName := effectiveMode(node)
|
|
if _, ok := r.modes[modeName]; !ok {
|
|
return RunResult{
|
|
Workflow: w.Name, Status: StatusAborted,
|
|
StartedAt: time.Now(),
|
|
}, fmt.Errorf("%w: %q (node %q)", ErrUnknownMode, modeName, node.ID)
|
|
}
|
|
}
|
|
|
|
t0 := time.Now()
|
|
results := make(map[string]NodeResult, len(w.Nodes))
|
|
resultsList := make([]NodeResult, 0, len(w.Nodes))
|
|
failedNodes := make(map[string]bool) // node IDs whose result was Error
|
|
skippedNodes := make(map[string]bool)
|
|
|
|
for _, nodeID := range order {
|
|
node := findNode(w.Nodes, nodeID)
|
|
modeName := effectiveMode(node)
|
|
|
|
// Skip if any dependency failed or was skipped — cascades
|
|
// failure visibly so callers can see the chain.
|
|
var skipReason string
|
|
for _, dep := range node.DependsOn {
|
|
if failedNodes[dep] {
|
|
skipReason = fmt.Sprintf("upstream node %q failed", dep)
|
|
break
|
|
}
|
|
if skippedNodes[dep] {
|
|
skipReason = fmt.Sprintf("upstream node %q was skipped", dep)
|
|
break
|
|
}
|
|
}
|
|
if skipReason != "" {
|
|
res := NodeResult{
|
|
NodeID: node.ID, Mode: modeName,
|
|
Error: skipReason,
|
|
StartedAt: time.Now(),
|
|
}
|
|
results[node.ID] = res
|
|
resultsList = append(resultsList, res)
|
|
skippedNodes[node.ID] = true
|
|
continue
|
|
}
|
|
|
|
nodeStart := time.Now()
|
|
mode := r.modes[modeName] // pre-validated above; safe lookup
|
|
|
|
// Build the mode's input map with $-references resolved.
|
|
input, refErr := buildInput(node, results)
|
|
if refErr != nil {
|
|
res := NodeResult{
|
|
NodeID: node.ID, Mode: modeName,
|
|
Error: refErr.Error(),
|
|
StartedAt: nodeStart,
|
|
DurationMs: time.Since(nodeStart).Milliseconds(),
|
|
}
|
|
results[node.ID] = res
|
|
resultsList = append(resultsList, res)
|
|
failedNodes[node.ID] = true
|
|
continue
|
|
}
|
|
|
|
modeCtx := Context{
|
|
Ctx: ctx,
|
|
WorkflowName: w.Name,
|
|
NodeID: node.ID,
|
|
Provider: w.Provider,
|
|
Model: w.Model,
|
|
}
|
|
|
|
output, err := mode(modeCtx, input)
|
|
res := NodeResult{
|
|
NodeID: node.ID,
|
|
Mode: modeName,
|
|
Output: output,
|
|
StartedAt: nodeStart,
|
|
DurationMs: time.Since(nodeStart).Milliseconds(),
|
|
}
|
|
if err != nil {
|
|
res.Error = err.Error()
|
|
failedNodes[node.ID] = true
|
|
}
|
|
results[node.ID] = res
|
|
resultsList = append(resultsList, res)
|
|
}
|
|
|
|
status := StatusSucceeded
|
|
if len(failedNodes) > 0 || len(skippedNodes) > 0 {
|
|
status = StatusPartial
|
|
}
|
|
return RunResult{
|
|
Workflow: w.Name,
|
|
Status: status,
|
|
Nodes: resultsList,
|
|
StartedAt: t0,
|
|
DurationMs: time.Since(t0).Milliseconds(),
|
|
}, nil
|
|
}
|
|
|
|
// effectiveMode returns the node's explicit mode if set, else
|
|
// "llm.chat" (the implicit Archon convention).
|
|
func effectiveMode(n Node) string {
|
|
if n.Mode != "" {
|
|
return n.Mode
|
|
}
|
|
return "llm.chat"
|
|
}
|
|
|
|
// findNode is O(n) but called once per execution step on already-
|
|
// validated workflows; n is small (typical workflow ≤10 nodes).
|
|
func findNode(nodes []Node, id string) Node {
|
|
for _, n := range nodes {
|
|
if n.ID == id {
|
|
return n
|
|
}
|
|
}
|
|
return Node{} // never reached on a Validated workflow
|
|
}
|
|
|
|
// ─── Input building + reference substitution ────────────────────
|
|
|
|
// buildInput composes the input map a mode receives. Builds from
|
|
// node.Inputs (deep-copy with $-refs substituted) plus injects the
|
|
// "prompt" key from node.Prompt with $-refs substituted.
|
|
//
|
|
// $-reference syntax: $node_id.output.key — resolves to that key
|
|
// in the prior node's output map. $node_id.output (no .key)
|
|
// resolves to the whole output map. JSON-stringified inline.
|
|
func buildInput(node Node, results map[string]NodeResult) (map[string]any, error) {
|
|
out := make(map[string]any, len(node.Inputs)+1)
|
|
for k, v := range node.Inputs {
|
|
resolved, err := resolveRefs(v, results)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out[k] = resolved
|
|
}
|
|
if node.Prompt != "" {
|
|
resolvedPrompt, err := substituteStringRefs(node.Prompt, results)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out["prompt"] = resolvedPrompt
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// resolveRefs walks any value (string, map, slice, scalar) and
|
|
// substitutes $-references in any string elements.
|
|
func resolveRefs(v any, results map[string]NodeResult) (any, error) {
|
|
switch x := v.(type) {
|
|
case string:
|
|
return substituteStringRefs(x, results)
|
|
case map[string]any:
|
|
out := make(map[string]any, len(x))
|
|
for k, vv := range x {
|
|
r, err := resolveRefs(vv, results)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out[k] = r
|
|
}
|
|
return out, nil
|
|
case []any:
|
|
out := make([]any, len(x))
|
|
for i, vv := range x {
|
|
r, err := resolveRefs(vv, results)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out[i] = r
|
|
}
|
|
return out, nil
|
|
default:
|
|
return v, nil // numbers, bools, nil — pass through
|
|
}
|
|
}
|
|
|
|
// refRe matches a $-reference. That is "$", then an identifier-shaped
// node ID, then any number of ".identifier" segments.
//
// Capture groups:
//
//	1 = node ID
//	2 = dotted suffix, possibly empty ("", ".output", ".output.key.sub")
//
// The pattern itself does not require the suffix to start with ".output";
// walkPath rejects any other first segment. Segments must start with a
// letter or underscore, so numeric indexes like ".0" are not matched, and
// a trailing "." (sentence punctuation) stays outside the match. There is
// no escape syntax: any "$identifier" in a prompt, such as "$HOME", is
// treated as a reference.
var refRe = regexp.MustCompile(`\$([a-zA-Z_][a-zA-Z0-9_]*)((?:\.[a-zA-Z_][a-zA-Z0-9_]*)*)`)
|
|
|
|
// substituteStringRefs replaces $node.output.key references in a
|
|
// string with the resolved value (JSON-stringified for non-string
|
|
// targets so the result is always a string).
|
|
func substituteStringRefs(s string, results map[string]NodeResult) (string, error) {
|
|
var firstErr error
|
|
out := refRe.ReplaceAllStringFunc(s, func(match string) string {
|
|
if firstErr != nil {
|
|
return match
|
|
}
|
|
// Re-parse the match because ReplaceAllStringFunc gives the
|
|
// whole match without submatches.
|
|
m := refRe.FindStringSubmatch(match)
|
|
nodeID := m[1]
|
|
path := strings.TrimPrefix(m[2], ".")
|
|
nodeRes, ok := results[nodeID]
|
|
if !ok {
|
|
firstErr = fmt.Errorf("%w: $%s (no such node, or node not yet run)", ErrUnresolvedRef, nodeID)
|
|
return match
|
|
}
|
|
// path "output" or "output.X.Y" walks into nodeRes.Output
|
|
val, err := walkPath(nodeRes.Output, path)
|
|
if err != nil {
|
|
firstErr = fmt.Errorf("%w: $%s — %v", ErrUnresolvedRef, nodeID+m[2], err)
|
|
return match
|
|
}
|
|
return stringifyValue(val)
|
|
})
|
|
return out, firstErr
|
|
}
|
|
|
|
// walkPath resolves a dotted path (given without its leading dot) against
// a node's output map.
//
// An empty path, or exactly "output", yields the whole map. Otherwise the
// first segment must be "output". This convention matches the SPEC §3.8
// reference shape and prevents accidental access to other NodeResult
// fields. Each later segment indexes into a nested map[string]any. An
// error is returned when a segment lands on a non-map value or names a
// missing key.
func walkPath(output map[string]any, path string) (any, error) {
	if path == "" {
		return output, nil // bare $node — entire NodeResult.Output
	}

	head, rest, more := strings.Cut(path, ".")
	if head != "output" {
		return nil, fmt.Errorf("path must start with .output (got %q)", head)
	}

	var cur any = output
	if !more {
		return cur, nil // $node.output — also the whole output map
	}

	for _, seg := range strings.Split(rest, ".") {
		obj, isMap := cur.(map[string]any)
		if !isMap {
			return nil, fmt.Errorf("cannot traverse into %T at segment %q", cur, seg)
		}
		next, found := obj[seg]
		if !found {
			return nil, fmt.Errorf("key %q not found in output", seg)
		}
		cur = next
	}
	return cur, nil
}
|
|
|
|
// stringifyValue renders a value as a string for prompt substitution.
// Strings pass through, and nil becomes the empty string. Everything else
// is JSON-encoded, so maps and slices produce valid JSON rather than Go's
// %v syntax such as `map[k:v]`. If JSON encoding fails (channels,
// functions, NaN, ...), it falls back to fmt.Sprint so substitution can
// never panic on odd inputs.
//
// HTML escaping is disabled. json.Marshal would otherwise rewrite <, >
// and & as \u003c, \u003e and \u0026, which garbles the context an LLM
// mode reads. Per 2026-04-29 scrum2 (Opus + Kimi convergent): %v output
// was confusing for LLM modes that expected JSON-shaped context.
func stringifyValue(v any) string {
	switch x := v.(type) {
	case string:
		return x
	case nil:
		return ""
	default:
		var sb strings.Builder
		enc := json.NewEncoder(&sb)
		enc.SetEscapeHTML(false)
		if err := enc.Encode(x); err == nil {
			// Encode terminates its output with "\n". Encoded JSON never
			// contains a raw newline, so trimming exactly one is safe.
			return strings.TrimSuffix(sb.String(), "\n")
		}
		return fmt.Sprint(x)
	}
}
|
|
|
|
// ─── DAG resolution ──────────────────────────────────────────────
|
|
|
|
// topoSort returns node IDs in a topologically-sorted order such
|
|
// that every dependency precedes its dependent. Cycles return an
|
|
// error (Validate catches them first; this is defense in depth).
|
|
func topoSort(nodes []Node) ([]string, error) {
|
|
indeg := make(map[string]int, len(nodes))
|
|
graph := make(map[string][]string, len(nodes))
|
|
for _, n := range nodes {
|
|
if _, ok := indeg[n.ID]; !ok {
|
|
indeg[n.ID] = 0
|
|
}
|
|
for _, dep := range n.DependsOn {
|
|
graph[dep] = append(graph[dep], n.ID)
|
|
indeg[n.ID]++
|
|
}
|
|
}
|
|
// Kahn's algorithm — preserve original order for ties so output
|
|
// is deterministic across runs.
|
|
queue := make([]string, 0, len(nodes))
|
|
for _, n := range nodes {
|
|
if indeg[n.ID] == 0 {
|
|
queue = append(queue, n.ID)
|
|
}
|
|
}
|
|
out := make([]string, 0, len(nodes))
|
|
for len(queue) > 0 {
|
|
cur := queue[0]
|
|
queue = queue[1:]
|
|
out = append(out, cur)
|
|
for _, child := range graph[cur] {
|
|
indeg[child]--
|
|
if indeg[child] == 0 {
|
|
queue = append(queue, child)
|
|
}
|
|
}
|
|
}
|
|
if len(out) != len(nodes) {
|
|
// Find a node still with non-zero indeg — that's where the
|
|
// cycle is reachable from.
|
|
for id, deg := range indeg {
|
|
if deg > 0 {
|
|
return nil, fmt.Errorf("%w: starting at node %q", ErrCycle, id)
|
|
}
|
|
}
|
|
return nil, ErrCycle
|
|
}
|
|
return out, nil
|
|
}
|
|
|