root e30da6e5aa §3.8 first slice: workflow runner skeleton + DAG executor + observerd integration
Lands the structural piece of SPEC §3.8 (Observer-KB workflow runner)
documented in 97dd3f8: types + DAG runner + reference substitution +
provenance recording into observerd. Real-mode integrations
(matrix.search, distillation.score, drift.scorer, llm.chat) come in
follow-up commits — this commit proves the mechanics.

internal/workflow/types.go:
  - Workflow / Node / NodeResult / RunResult types matching Archon's
    YAML shape so existing workflows (e.g. lakehouse-architect-review.yaml)
    load directly. Optional `mode` field added — implicit fall-back is
    "llm.chat" matching Archon's convention.
  - Mode signature: func(Context, map[string]any) (map[string]any, error)
  - 5 sentinel errors: ErrCycle, ErrMissingDep, ErrUnknownMode,
    ErrDuplicateNodeID, ErrUnresolvedRef
  - Validate enforces structural invariants: unique IDs, every
    depends_on resolves, no cycles

internal/workflow/runner.go:
  - Kahn's-algorithm topological sort, stable for declaration-order
    ties (deterministic execution + JSON output across runs)
  - Reference substitution: $node_id.output.key.path resolves through
    nested maps; $node_id alone resolves to the whole output map
  - Skip cascade: a node whose dependency failed/skipped is skipped
    with explicit "upstream node X failed" error in NodeResult, never
    silently dropped
  - Per-node provenance: NodeResult.StartedAt + DurationMs captured
    for every execution
  - Mode pre-validation: every node's mode is checked against the registry
    BEFORE any node runs — a typo is caught in 5ms, not after 6 nodes have run

internal/workflow/runner_test.go (14 tests, all PASS):
  - Validate: missing name, no nodes, duplicate IDs, missing deps, cycles
  - Run: single node, 3-node DAG with chained $-refs (shape→weakness→improvement),
    failed-node skip cascade with independent siblings still running,
    unknown-mode abort, unresolved-reference error, implicit
    llm.chat fallback, provenance fields populated, inputs (not just
    prompt) honor $-refs, topological-sort stability for ties

cmd/observerd extended:
  - POST /observer/workflow/run executes a workflow, records each
    node's execution as an ObservedOp (source="workflow"), returns
    the full RunResult
  - GET /observer/workflow/modes lists the registered mode names
  - registerBuiltinModes wires fixture.echo + fixture.upper for v0;
    real modes register here in follow-up commits

scripts/workflow_smoke.sh (4 assertions PASS):
  - GET /modes lists fixture.echo + fixture.upper
  - 3-node DAG executes: shape (uppercase "hello world") → weakness
    (sees "HELLO WORLD" via $shape.output.upper ref) → improvement
    (sees "HELLO WORLD" propagated through 2-hop $weakness.output.prompt)
  - /observer/stats shows by_source.workflow == 3 (one per node) and
    total == 3 — provenance lands as expected
  - Unknown mode → 400 with "unknown mode" in error body

17-smoke regression all green. Acceptance gates G3.8.A (Archon-shape
workflow loads + executes topologically) + G3.8.B (per-node ObservedOps)
+ G3.8.C ($prior_node.output ref resolves, error on missing ref) all
satisfied. G3.8.D (in-process matrix.search dispatch) deferred until
a real mode is wired.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 20:34:30 -05:00

285 lines
8.1 KiB
Go

package workflow
import (
"context"
"errors"
"fmt"
"strings"
"testing"
)
// fixtureEcho is a test mode that hands back a shallow copy of its input,
// key for key. It lets tests exercise runner mechanics without any
// external dependencies.
func fixtureEcho(_ Context, input map[string]any) (map[string]any, error) {
	echoed := make(map[string]any, len(input))
	for key := range input {
		echoed[key] = input[key]
	}
	return echoed, nil
}
// fixtureFail is a test mode that never succeeds: it returns a nil output
// together with a fixed error. It drives the skip-on-failed-dependency path.
func fixtureFail(_ Context, _ map[string]any) (map[string]any, error) {
	failure := fmt.Errorf("fixture: intentional failure")
	return nil, failure
}
// fixtureUpper is a test mode that upper-cases the "prompt" input and
// returns it under the "upper" key. A missing or non-string prompt is
// treated as the empty string.
func fixtureUpper(_ Context, input map[string]any) (map[string]any, error) {
	var text string
	if s, ok := input["prompt"].(string); ok {
		text = s
	}
	result := map[string]any{"upper": strings.ToUpper(text)}
	return result, nil
}
// newTestRunner builds a Runner with the three fixture modes registered
// under "fixture.echo", "fixture.fail" and "fixture.upper", in that order.
func newTestRunner() *Runner {
	fixtures := []struct {
		name string
		fn   func(Context, map[string]any) (map[string]any, error)
	}{
		{"fixture.echo", fixtureEcho},
		{"fixture.fail", fixtureFail},
		{"fixture.upper", fixtureUpper},
	}

	runner := NewRunner()
	for _, f := range fixtures {
		runner.RegisterMode(f.name, f.fn)
	}
	return runner
}
// TestValidate_RequiresName checks that Validate rejects a workflow whose
// Name is empty, even though it declares a node.
func TestValidate_RequiresName(t *testing.T) {
	unnamed := Workflow{
		Name:  "",
		Nodes: []Node{{ID: "a", Mode: "fixture.echo"}},
	}
	err := unnamed.Validate()
	if err != nil {
		return
	}
	t.Error("empty name should fail validation")
}
// TestValidate_RequiresNodes checks that Validate rejects a named workflow
// that declares no nodes at all.
func TestValidate_RequiresNodes(t *testing.T) {
	var nodeless Workflow
	nodeless.Name = "x"

	if nodeless.Validate() == nil {
		t.Error("empty nodes should fail validation")
	}
}
func TestValidate_DuplicateNodeID(t *testing.T) {
w := Workflow{Name: "x", Nodes: []Node{
{ID: "a", Mode: "fixture.echo"},
{ID: "a", Mode: "fixture.echo"},
}}
if err := w.Validate(); !errors.Is(err, ErrDuplicateNodeID) {
t.Errorf("want ErrDuplicateNodeID, got %v", err)
}
}
func TestValidate_MissingDep(t *testing.T) {
w := Workflow{Name: "x", Nodes: []Node{
{ID: "a", Mode: "fixture.echo", DependsOn: []string{"ghost"}},
}}
if err := w.Validate(); !errors.Is(err, ErrMissingDep) {
t.Errorf("want ErrMissingDep, got %v", err)
}
}
func TestValidate_DetectsCycle(t *testing.T) {
w := Workflow{Name: "x", Nodes: []Node{
{ID: "a", Mode: "fixture.echo", DependsOn: []string{"b"}},
{ID: "b", Mode: "fixture.echo", DependsOn: []string{"a"}},
}}
if err := w.Validate(); !errors.Is(err, ErrCycle) {
t.Errorf("want ErrCycle, got %v", err)
}
}
func TestRun_SingleNode(t *testing.T) {
r := newTestRunner()
w := Workflow{Name: "single", Nodes: []Node{
{ID: "a", Mode: "fixture.echo", Prompt: "hello"},
}}
res, err := r.Run(context.Background(), w)
if err != nil {
t.Fatal(err)
}
if res.Status != StatusSucceeded {
t.Errorf("status: want succeeded, got %q", res.Status)
}
if len(res.Nodes) != 1 {
t.Fatalf("nodes: want 1, got %d", len(res.Nodes))
}
if res.Nodes[0].Output["prompt"] != "hello" {
t.Errorf("echo round-trip: %+v", res.Nodes[0].Output)
}
}
// TestRun_DAG_RefSubstitution runs a three-node chain
// (shape → weakness → improvement) and checks that the nodes are reported in
// dependency order and that $node.output.key references inside a prompt are
// replaced with the upstream node's output, including across two hops.
func TestRun_DAG_RefSubstitution(t *testing.T) {
	r := newTestRunner()
	w := Workflow{Name: "chain", Nodes: []Node{
		{ID: "shape", Mode: "fixture.upper", Prompt: "hello world"},
		{ID: "weakness", Mode: "fixture.echo",
			Prompt:    "Given $shape.output.upper find issue",
			DependsOn: []string{"shape"}},
		{ID: "improvement", Mode: "fixture.echo",
			Prompt:    "Based on $weakness.output.prompt do better",
			DependsOn: []string{"weakness"}},
	}}
	res, err := r.Run(context.Background(), w)
	if err != nil {
		t.Fatalf("Run: %v", err)
	}
	if res.Status != StatusSucceeded {
		t.Errorf("status: %q", res.Status)
	}

	// Order check: shape → weakness → improvement.
	wantOrder := []string{"shape", "weakness", "improvement"}
	// Every assertion below indexes res.Nodes, so a short result must fail
	// the test here rather than panic with an index-out-of-range.
	if len(res.Nodes) != len(wantOrder) {
		t.Fatalf("nodes: want %d, got %d", len(wantOrder), len(res.Nodes))
	}
	for i, want := range wantOrder {
		if res.Nodes[i].NodeID != want {
			t.Errorf("execution order %d: want %q, got %q", i, want, res.Nodes[i].NodeID)
		}
	}

	// shape uppercases "hello world" → "HELLO WORLD".
	// %#v rather than %q: the value is an `any` and may be nil when the key
	// is missing.
	if up := res.Nodes[0].Output["upper"]; up != "HELLO WORLD" {
		t.Errorf("shape.upper: %#v", up)
	}

	// weakness sees "Given HELLO WORLD find issue" in its prompt.
	// A non-string output leaves wp empty, which fails the Contains check.
	wp, _ := res.Nodes[1].Output["prompt"].(string)
	if !strings.Contains(wp, "HELLO WORLD") {
		t.Errorf("weakness ref-substitution failed: %q", wp)
	}

	// improvement sees the SUBSTITUTED weakness prompt.
	ip, _ := res.Nodes[2].Output["prompt"].(string)
	if !strings.Contains(ip, "HELLO WORLD") {
		t.Errorf("improvement chain-substitution failed: %q", ip)
	}
}
// TestRun_FailedNodeSkipsDownstream checks the skip cascade: node a fails,
// its dependent b must be reported with an "upstream" error, and the
// independent node c must still run without error. The run as a whole is
// expected to return a nil error with StatusPartial.
func TestRun_FailedNodeSkipsDownstream(t *testing.T) {
	r := newTestRunner()
	w := Workflow{Name: "skipchain", Nodes: []Node{
		{ID: "a", Mode: "fixture.fail"},
		{ID: "b", Mode: "fixture.echo", DependsOn: []string{"a"}},
		{ID: "c", Mode: "fixture.echo"}, // independent of a — should still run
	}}
	res, err := r.Run(context.Background(), w)
	if err != nil {
		t.Fatal(err)
	}
	if res.Status != StatusPartial {
		t.Errorf("status: want partial, got %q", res.Status)
	}

	byID := make(map[string]NodeResult, len(res.Nodes))
	for _, n := range res.Nodes {
		byID[n.NodeID] = n
	}
	// A missing map key yields a zero NodeResult whose Error is "", which
	// would make the check on c pass vacuously. Require every declared node
	// to be present so a silently dropped node fails the test.
	for _, id := range []string{"a", "b", "c"} {
		if _, ok := byID[id]; !ok {
			t.Fatalf("node %q missing from results", id)
		}
	}

	if byID["a"].Error == "" {
		t.Error("a should have errored")
	}
	if byID["b"].Error == "" || !strings.Contains(byID["b"].Error, "upstream") {
		t.Errorf("b should be skipped with upstream-failure reason; got %q", byID["b"].Error)
	}
	if byID["c"].Error != "" {
		t.Errorf("c is independent; should run successfully; got error: %q", byID["c"].Error)
	}
}
func TestRun_UnknownModeAborts(t *testing.T) {
r := newTestRunner()
w := Workflow{Name: "bad", Nodes: []Node{
{ID: "a", Mode: "fixture.does_not_exist"},
}}
res, err := r.Run(context.Background(), w)
if !errors.Is(err, ErrUnknownMode) {
t.Errorf("want ErrUnknownMode, got %v", err)
}
if res.Status != StatusAborted {
t.Errorf("status: want aborted, got %q", res.Status)
}
}
// TestRun_UnresolvedReferenceErrors checks that a prompt referencing a node
// that does not exist ($ghost) is reported as an error on the node itself,
// with a message containing "no such node", while Run returns a nil error.
func TestRun_UnresolvedReferenceErrors(t *testing.T) {
	r := newTestRunner()
	w := Workflow{Name: "badref", Nodes: []Node{
		{ID: "a", Mode: "fixture.echo",
			Prompt: "references $ghost.output but ghost doesn't exist"},
	}}
	res, err := r.Run(context.Background(), w)
	if err != nil {
		t.Fatalf("Run: %v", err)
	}
	// Guard before indexing so a missing node result is a test failure,
	// not an index-out-of-range panic.
	if len(res.Nodes) != 1 {
		t.Fatalf("nodes: want 1, got %d", len(res.Nodes))
	}
	if res.Nodes[0].Error == "" {
		t.Error("unresolved $ghost should error the node")
	}
	if !strings.Contains(res.Nodes[0].Error, "no such node") {
		t.Errorf("error should explain no-such-node; got %q", res.Nodes[0].Error)
	}
}
// TestRun_ImplicitLLMChatFallback checks that a node declared without a Mode
// is executed under the "llm.chat" mode and that the node result reports
// "llm.chat" as its effective mode. The echo fixture stands in for llm.chat.
func TestRun_ImplicitLLMChatFallback(t *testing.T) {
	r := NewRunner()
	r.RegisterMode("llm.chat", fixtureEcho) // pretend llm.chat exists
	w := Workflow{Name: "implicit", Nodes: []Node{
		{ID: "a", Prompt: "no Mode field — should default to llm.chat"},
	}}
	res, err := r.Run(context.Background(), w)
	if err != nil {
		t.Fatal(err)
	}
	if res.Status != StatusSucceeded {
		t.Errorf("implicit llm.chat: status %q", res.Status)
	}
	// Guard before indexing so a missing node result is a test failure,
	// not an index-out-of-range panic.
	if len(res.Nodes) != 1 {
		t.Fatalf("nodes: want 1, got %d", len(res.Nodes))
	}
	if res.Nodes[0].Mode != "llm.chat" {
		t.Errorf("effective mode: want llm.chat, got %q", res.Nodes[0].Mode)
	}
}
// TestRun_ProvenanceRecording checks the per-node provenance fields on a
// single-node run: NodeID and Mode match the declaration, StartedAt is
// non-zero, and DurationMs is not negative.
func TestRun_ProvenanceRecording(t *testing.T) {
	r := newTestRunner()
	w := Workflow{Name: "trace", Nodes: []Node{
		{ID: "x", Mode: "fixture.echo", Prompt: "trace me"},
	}}
	res, err := r.Run(context.Background(), w)
	if err != nil {
		t.Fatal(err)
	}
	// Guard before indexing so a missing node result is a test failure,
	// not an index-out-of-range panic.
	if len(res.Nodes) != 1 {
		t.Fatalf("nodes: want 1, got %d", len(res.Nodes))
	}
	n := res.Nodes[0]
	if n.NodeID != "x" || n.Mode != "fixture.echo" {
		t.Errorf("provenance: node=%q mode=%q", n.NodeID, n.Mode)
	}
	if n.StartedAt.IsZero() {
		t.Error("started_at should be set")
	}
	// Zero is allowed: the echo fixture can finish in under a millisecond.
	if n.DurationMs < 0 {
		t.Errorf("duration_ms: %d", n.DurationMs)
	}
}
// TestRun_InputsResolveRefs verifies that node.Inputs (not just Prompt)
// honors $-substitution: a value of "$a.output.prompt" is replaced with node
// a's echoed prompt, while a plain string value passes through unchanged.
func TestRun_InputsResolveRefs(t *testing.T) {
	r := newTestRunner()
	w := Workflow{Name: "inputs", Nodes: []Node{
		{ID: "a", Mode: "fixture.echo", Prompt: "first"},
		{ID: "b", Mode: "fixture.echo",
			Inputs: map[string]any{
				"copied": "$a.output.prompt",
				"static": "literal",
			},
			DependsOn: []string{"a"}},
	}}
	res, err := r.Run(context.Background(), w)
	if err != nil {
		t.Fatal(err)
	}
	// Node b is read at index 1; fail cleanly if the result is short
	// instead of panicking with an index-out-of-range.
	if len(res.Nodes) != 2 {
		t.Fatalf("nodes: want 2, got %d", len(res.Nodes))
	}
	bOut := res.Nodes[1].Output
	// %#v rather than %q: the values are `any` and are nil when the key is
	// missing from the output.
	if bOut["copied"] != "first" {
		t.Errorf("inputs ref: want 'first', got %#v", bOut["copied"])
	}
	if bOut["static"] != "literal" {
		t.Errorf("inputs static: want 'literal', got %#v", bOut["static"])
	}
}
// TestTopoSort_Stable checks that topoSort keeps independent nodes (no
// depends_on between them) in their declaration order: z, y, x.
func TestTopoSort_Stable(t *testing.T) {
	nodes := []Node{
		{ID: "z"}, {ID: "y"}, {ID: "x"},
	}
	got, err := topoSort(nodes)
	if err != nil {
		t.Fatal(err)
	}
	want := []string{"z", "y", "x"}
	// The loop below indexes got by want's positions. A shorter result would
	// panic and a longer one would go unnoticed, so pin the length first.
	if len(got) != len(want) {
		t.Fatalf("sorted length: want %d, got %d", len(want), len(got))
	}
	for i := range want {
		if got[i] != want[i] {
			t.Errorf("position %d: want %q, got %q", i, want[i], got[i])
		}
	}
}