diff --git a/cmd/observerd/main.go b/cmd/observerd/main.go index 98fe034..377a58a 100644 --- a/cmd/observerd/main.go +++ b/cmd/observerd/main.go @@ -18,18 +18,22 @@ package main import ( + "context" "encoding/json" "errors" "flag" + "fmt" "log/slog" "net/http" "os" "strings" + "time" "github.com/go-chi/chi/v5" "git.agentview.dev/profit/golangLAKEHOUSE/internal/observer" "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" + "git.agentview.dev/profit/golangLAKEHOUSE/internal/workflow" ) const maxRequestBytes = 4 << 20 // 4 MiB cap on request bodies @@ -66,7 +70,10 @@ func main() { } } - h := &handlers{store: store} + runner := workflow.NewRunner() + registerBuiltinModes(runner, store) + + h := &handlers{store: store, runner: runner} if err := shared.Run("observerd", cfg.Observerd.Bind, h.register, cfg.Auth); err != nil { slog.Error("server", "err", err) os.Exit(1) @@ -74,12 +81,15 @@ func main() { } type handlers struct { - store *observer.Store + store *observer.Store + runner *workflow.Runner } func (h *handlers) register(r chi.Router) { r.Get("/observer/stats", h.handleStats) r.Post("/observer/event", h.handleEvent) + r.Post("/observer/workflow/run", h.handleWorkflowRun) + r.Get("/observer/workflow/modes", h.handleWorkflowModes) } func (h *handlers) handleStats(w http.ResponseWriter, _ *http.Request) { @@ -107,6 +117,105 @@ func (h *handlers) handleEvent(w http.ResponseWriter, r *http.Request) { }) } +// workflowRunRequest is the POST /observer/workflow/run body — a +// Workflow definition in JSON form (matches Archon's YAML shape but +// JSON-serialized for the HTTP path). +type workflowRunRequest struct { + Workflow workflow.Workflow `json:"workflow"` +} + +func (h *handlers) handleWorkflowRun(r http.ResponseWriter, req *http.Request) { + var body workflowRunRequest + if !decodeJSON(r, req, &body) { + return + } + res, err := h.runner.Run(req.Context(), body.Workflow) + // Record per-node provenance into the observer ring AS the + // workflow runs — same shape as any other ObservedOp so the + // existing /observer/stats aggregation surfaces workflow ops + // alongside scenario ops without a schema change. + for _, n := range res.Nodes { + op := observer.ObservedOp{ + Endpoint: "/observer/workflow/run/" + body.Workflow.Name + "/" + n.NodeID, + InputSummary: fmt.Sprintf("workflow=%s node=%s mode=%s", body.Workflow.Name, n.NodeID, n.Mode), + Success: n.Error == "", + DurationMs: n.DurationMs, + OutputSummary: summarizeOutput(n.Output), + Source: observer.Source("workflow"), + Error: n.Error, + Timestamp: n.StartedAt.UTC().Format(time.RFC3339Nano), + } + if recErr := h.store.Record(op); recErr != nil { + slog.Warn("workflow run: provenance record failed", "err", recErr) + } + } + if err != nil { + // Aborting errors (cycle, missing dep, unknown mode) — surface + // as 4xx because the workflow definition itself is wrong. + slog.Warn("workflow run aborted", "err", err) + writeJSON(r, http.StatusBadRequest, map[string]any{ + "error": err.Error(), + "result": res, + }) + return + } + writeJSON(r, http.StatusOK, res) +} + +func (h *handlers) handleWorkflowModes(w http.ResponseWriter, _ *http.Request) { + modes := h.runner.Modes() + writeJSON(w, http.StatusOK, map[string]any{ + "modes": modes, + "count": len(modes), + }) +} + +// summarizeOutput renders a workflow node's output map for the +// ObservedOp's OutputSummary string. Best-effort — long values get +// truncated rather than ballooning the ring buffer's memory. +func summarizeOutput(output map[string]any) string { + if output == nil { + return "(nil)" + } + bs, err := json.Marshal(output) + if err != nil { + return fmt.Sprintf("(marshal err: %v)", err) + } + if len(bs) > 256 { + return string(bs[:256]) + "...(truncated)" + } + return string(bs) +} + +// registerBuiltinModes wires the modes the runner knows about. v0 +// ships with fixture.echo + fixture.upper for testing the runner +// mechanics; real-mode integrations (matrix.search, distillation. +// score, drift.scorer, llm.chat) land in follow-up commits. +// +// Each mode's signature matches workflow.Mode. The store parameter +// is reserved for modes that need to record their own ObservedOps +// (most don't — the runner records per-node provenance generically). +func registerBuiltinModes(r *workflow.Runner, _ *observer.Store) { + r.RegisterMode("fixture.echo", func(_ workflow.Context, input map[string]any) (map[string]any, error) { + // Verbatim copy of input → output. Useful for ref-substitution + // chains in smokes. + out := make(map[string]any, len(input)) + for k, v := range input { + out[k] = v + } + return out, nil + }) + r.RegisterMode("fixture.upper", func(_ workflow.Context, input map[string]any) (map[string]any, error) { + // Returns {"upper": strings.ToUpper(input["prompt"])}. Toy + // mode for proving DAG ref substitution end-to-end. + prompt, _ := input["prompt"].(string) + return map[string]any{"upper": strings.ToUpper(prompt)}, nil + }) +} + +// stub to silence "imported and not used" until a real mode uses it +var _ = context.Background + func decodeJSON(w http.ResponseWriter, r *http.Request, v any) bool { defer r.Body.Close() r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes) diff --git a/internal/workflow/runner.go b/internal/workflow/runner.go new file mode 100644 index 0000000..8c9d2eb --- /dev/null +++ b/internal/workflow/runner.go @@ -0,0 +1,389 @@ +package workflow + +import ( + "context" + "fmt" + "regexp" + "strings" + "time" +) + +// Runner executes Workflows. Modes are registered up-front; the +// catalog is immutable after Build (callers compose by registering +// at startup, then Run() the catalog repeatedly). +type Runner struct { + modes map[string]Mode +} + +// NewRunner returns an empty Runner. Use RegisterMode to populate. +func NewRunner() *Runner { + return &Runner{modes: make(map[string]Mode)} +} + +// RegisterMode adds a capability under the given name. Re-registering +// the same name overwrites — useful for tests that want to replace a +// mode with a stub. In production, register-once-at-startup is the +// expected pattern. +func (r *Runner) RegisterMode(name string, mode Mode) { + r.modes[name] = mode +} + +// Modes returns the currently-registered mode names. Useful for +// /v1/observer/workflow/modes-style discovery endpoints. +func (r *Runner) Modes() []string { + out := make([]string, 0, len(r.modes)) + for name := range r.modes { + out = append(out, name) + } + return out +} + +// Run executes a workflow. Validates structure, resolves nodes +// topologically, executes each node with $-reference substitution, +// records per-node results in RunResult. +// +// Aborting errors (cycle, missing dep, unknown mode) return early +// with StatusAborted — no nodes execute. Per-node mode errors are +// recorded in NodeResult.Error and execution continues with +// independent nodes; downstream nodes that depended on the failing +// one are SKIPPED with an explanatory error so the cascade is +// visible in the result rather than silent. +func (r *Runner) Run(ctx context.Context, w Workflow) (RunResult, error) { + if err := w.Validate(); err != nil { + return RunResult{ + Workflow: w.Name, Status: StatusAborted, + StartedAt: time.Now(), + }, err + } + + order, err := topoSort(w.Nodes) + if err != nil { + return RunResult{ + Workflow: w.Name, Status: StatusAborted, + StartedAt: time.Now(), + }, err + } + + // Verify every node's mode is registered before starting — fail + // loud if someone references a typo'd mode name. Catches the bug + // in 5ms instead of after 6 nodes have already run. + for _, node := range w.Nodes { + modeName := effectiveMode(node) + if _, ok := r.modes[modeName]; !ok { + return RunResult{ + Workflow: w.Name, Status: StatusAborted, + StartedAt: time.Now(), + }, fmt.Errorf("%w: %q (node %q)", ErrUnknownMode, modeName, node.ID) + } + } + + t0 := time.Now() + results := make(map[string]NodeResult, len(w.Nodes)) + resultsList := make([]NodeResult, 0, len(w.Nodes)) + failedNodes := make(map[string]bool) // node IDs whose result was Error + skippedNodes := make(map[string]bool) + + for _, nodeID := range order { + node := findNode(w.Nodes, nodeID) + modeName := effectiveMode(node) + + // Skip if any dependency failed or was skipped — cascades + // failure visibly so callers can see the chain. + var skipReason string + for _, dep := range node.DependsOn { + if failedNodes[dep] { + skipReason = fmt.Sprintf("upstream node %q failed", dep) + break + } + if skippedNodes[dep] { + skipReason = fmt.Sprintf("upstream node %q was skipped", dep) + break + } + } + if skipReason != "" { + res := NodeResult{ + NodeID: node.ID, Mode: modeName, + Error: skipReason, + StartedAt: time.Now(), + } + results[node.ID] = res + resultsList = append(resultsList, res) + skippedNodes[node.ID] = true + continue + } + + nodeStart := time.Now() + mode := r.modes[modeName] // pre-validated above; safe lookup + + // Build the mode's input map with $-references resolved. + input, refErr := buildInput(node, results) + if refErr != nil { + res := NodeResult{ + NodeID: node.ID, Mode: modeName, + Error: refErr.Error(), + StartedAt: nodeStart, + DurationMs: time.Since(nodeStart).Milliseconds(), + } + results[node.ID] = res + resultsList = append(resultsList, res) + failedNodes[node.ID] = true + continue + } + + modeCtx := Context{ + Ctx: ctx, + WorkflowName: w.Name, + NodeID: node.ID, + Provider: w.Provider, + Model: w.Model, + } + + output, err := mode(modeCtx, input) + res := NodeResult{ + NodeID: node.ID, + Mode: modeName, + Output: output, + StartedAt: nodeStart, + DurationMs: time.Since(nodeStart).Milliseconds(), + } + if err != nil { + res.Error = err.Error() + failedNodes[node.ID] = true + } + results[node.ID] = res + resultsList = append(resultsList, res) + } + + status := StatusSucceeded + if len(failedNodes) > 0 || len(skippedNodes) > 0 { + status = StatusPartial + } + return RunResult{ + Workflow: w.Name, + Status: status, + Nodes: resultsList, + StartedAt: t0, + DurationMs: time.Since(t0).Milliseconds(), + }, nil +} + +// effectiveMode returns the node's explicit mode if set, else +// "llm.chat" (the implicit Archon convention). +func effectiveMode(n Node) string { + if n.Mode != "" { + return n.Mode + } + return "llm.chat" +} + +// findNode is O(n) but called once per execution step on already- +// validated workflows; n is small (typical workflow ≤10 nodes). +func findNode(nodes []Node, id string) Node { + for _, n := range nodes { + if n.ID == id { + return n + } + } + return Node{} // never reached on a Validated workflow +} + +// ─── Input building + reference substitution ──────────────────── + +// buildInput composes the input map a mode receives. Builds from +// node.Inputs (deep-copy with $-refs substituted) plus injects the +// "prompt" key from node.Prompt with $-refs substituted. +// +// $-reference syntax: $node_id.output.key — resolves to that key +// in the prior node's output map. $node_id.output (no .key) +// resolves to the whole output map. JSON-stringified inline. +func buildInput(node Node, results map[string]NodeResult) (map[string]any, error) { + out := make(map[string]any, len(node.Inputs)+1) + for k, v := range node.Inputs { + resolved, err := resolveRefs(v, results) + if err != nil { + return nil, err + } + out[k] = resolved + } + if node.Prompt != "" { + resolvedPrompt, err := substituteStringRefs(node.Prompt, results) + if err != nil { + return nil, err + } + out["prompt"] = resolvedPrompt + } + return out, nil +} + +// resolveRefs walks any value (string, map, slice, scalar) and +// substitutes $-references in any string elements. +func resolveRefs(v any, results map[string]NodeResult) (any, error) { + switch x := v.(type) { + case string: + return substituteStringRefs(x, results) + case map[string]any: + out := make(map[string]any, len(x)) + for k, vv := range x { + r, err := resolveRefs(vv, results) + if err != nil { + return nil, err + } + out[k] = r + } + return out, nil + case []any: + out := make([]any, len(x)) + for i, vv := range x { + r, err := resolveRefs(vv, results) + if err != nil { + return nil, err + } + out[i] = r + } + return out, nil + default: + return v, nil // numbers, bools, nil — pass through + } +} + +// refRe matches $node_id or $node_id.output.key (where key is +// dotted-path). Captures: 1=node_id, 2=optional ".output[.key]" +// suffix. +var refRe = regexp.MustCompile(`\$([a-zA-Z_][a-zA-Z0-9_]*)((?:\.[a-zA-Z_][a-zA-Z0-9_]*)*)`) + +// substituteStringRefs replaces $node.output.key references in a +// string with the resolved value (JSON-stringified for non-string +// targets so the result is always a string). +func substituteStringRefs(s string, results map[string]NodeResult) (string, error) { + var firstErr error + out := refRe.ReplaceAllStringFunc(s, func(match string) string { + if firstErr != nil { + return match + } + // Re-parse the match because ReplaceAllStringFunc gives the + // whole match without submatches. + m := refRe.FindStringSubmatch(match) + nodeID := m[1] + path := strings.TrimPrefix(m[2], ".") + nodeRes, ok := results[nodeID] + if !ok { + firstErr = fmt.Errorf("%w: $%s (no such node, or node not yet run)", ErrUnresolvedRef, nodeID) + return match + } + // path "output" or "output.X.Y" walks into nodeRes.Output + val, err := walkPath(nodeRes.Output, path) + if err != nil { + firstErr = fmt.Errorf("%w: $%s — %v", ErrUnresolvedRef, nodeID+m[2], err) + return match + } + return stringifyValue(val) + }) + return out, firstErr +} + +// walkPath resolves a dotted path against a nested map. Empty path +// returns the whole map. The first segment must be "output" — a +// convention that matches the SPEC §3.8 reference shape and prevents +// accidental access to other NodeResult fields. +func walkPath(output map[string]any, path string) (any, error) { + if path == "" { + return output, nil // bare $node — entire NodeResult.Output + } + parts := strings.Split(path, ".") + if parts[0] != "output" { + return nil, fmt.Errorf("path must start with .output (got %q)", parts[0]) + } + parts = parts[1:] + var cur any = output + for _, p := range parts { + m, ok := cur.(map[string]any) + if !ok { + return nil, fmt.Errorf("cannot traverse into %T at segment %q", cur, p) + } + cur, ok = m[p] + if !ok { + return nil, fmt.Errorf("key %q not found in output", p) + } + } + return cur, nil +} + +// stringifyValue renders a value as a string. For JSON-shaped values +// (maps, slices, complex types), uses fmt.Sprintf %v which is +// adequate for prompt-substitution. JSON marshaling would be cleaner +// for complex types but adds a dep cycle for v0. +func stringifyValue(v any) string { + switch x := v.(type) { + case string: + return x + case nil: + return "" + default: + return fmt.Sprint(x) + } +} + +// ─── DAG resolution ────────────────────────────────────────────── + +// topoSort returns node IDs in a topologically-sorted order such +// that every dependency precedes its dependent. Cycles return an +// error (Validate catches them first; this is defense in depth). +func topoSort(nodes []Node) ([]string, error) { + indeg := make(map[string]int, len(nodes)) + graph := make(map[string][]string, len(nodes)) + for _, n := range nodes { + if _, ok := indeg[n.ID]; !ok { + indeg[n.ID] = 0 + } + for _, dep := range n.DependsOn { + graph[dep] = append(graph[dep], n.ID) + indeg[n.ID]++ + } + } + // Kahn's algorithm — preserve original order for ties so output + // is deterministic across runs. + queue := make([]string, 0, len(nodes)) + for _, n := range nodes { + if indeg[n.ID] == 0 { + queue = append(queue, n.ID) + } + } + out := make([]string, 0, len(nodes)) + for len(queue) > 0 { + cur := queue[0] + queue = queue[1:] + out = append(out, cur) + for _, child := range graph[cur] { + indeg[child]-- + if indeg[child] == 0 { + queue = append(queue, child) + } + } + } + if len(out) != len(nodes) { + // Find a node still with non-zero indeg — that's where the + // cycle is reachable from. + for id, deg := range indeg { + if deg > 0 { + return nil, fmt.Errorf("%w: starting at node %q", ErrCycle, id) + } + } + return nil, ErrCycle + } + return out, nil +} + +// detectCycle is the predicate-only variant called from Validate; +// returns the offending node ID + true if a cycle exists. +func detectCycle(nodes []Node) (string, bool) { + _, err := topoSort(nodes) + if err == nil { + return "", false + } + // Best-effort extract — topoSort wraps the cycle-starting ID in + // the error message; for v0 just signal "yes, somewhere." + for _, n := range nodes { + _ = n + } + return "(see runner error for details)", true +} diff --git a/internal/workflow/runner_test.go b/internal/workflow/runner_test.go new file mode 100644 index 0000000..da27c55 --- /dev/null +++ b/internal/workflow/runner_test.go @@ -0,0 +1,284 @@ +package workflow + +import ( + "context" + "errors" + "fmt" + "strings" + "testing" +) + +// fixtureEcho returns the input map verbatim. Useful for testing +// runner mechanics without external dependencies. +func fixtureEcho(_ Context, input map[string]any) (map[string]any, error) { + out := make(map[string]any, len(input)) + for k, v := range input { + out[k] = v + } + return out, nil +} + +// fixtureFail always errors. Useful for testing skip-on-failed-dep. +func fixtureFail(_ Context, _ map[string]any) (map[string]any, error) { + return nil, fmt.Errorf("fixture: intentional failure") +} + +// fixtureUpper returns {"upper": strings.ToUpper(input["prompt"])}. +func fixtureUpper(_ Context, input map[string]any) (map[string]any, error) { + prompt, _ := input["prompt"].(string) + return map[string]any{"upper": strings.ToUpper(prompt)}, nil +} + +func newTestRunner() *Runner { + r := NewRunner() + r.RegisterMode("fixture.echo", fixtureEcho) + r.RegisterMode("fixture.fail", fixtureFail) + r.RegisterMode("fixture.upper", fixtureUpper) + return r +} + +func TestValidate_RequiresName(t *testing.T) { + w := Workflow{Name: "", Nodes: []Node{{ID: "a", Mode: "fixture.echo"}}} + if err := w.Validate(); err == nil { + t.Error("empty name should fail validation") + } +} + +func TestValidate_RequiresNodes(t *testing.T) { + w := Workflow{Name: "x"} + if err := w.Validate(); err == nil { + t.Error("empty nodes should fail validation") + } +} + +func TestValidate_DuplicateNodeID(t *testing.T) { + w := Workflow{Name: "x", Nodes: []Node{ + {ID: "a", Mode: "fixture.echo"}, + {ID: "a", Mode: "fixture.echo"}, + }} + if err := w.Validate(); !errors.Is(err, ErrDuplicateNodeID) { + t.Errorf("want ErrDuplicateNodeID, got %v", err) + } +} + +func TestValidate_MissingDep(t *testing.T) { + w := Workflow{Name: "x", Nodes: []Node{ + {ID: "a", Mode: "fixture.echo", DependsOn: []string{"ghost"}}, + }} + if err := w.Validate(); !errors.Is(err, ErrMissingDep) { + t.Errorf("want ErrMissingDep, got %v", err) + } +} + +func TestValidate_DetectsCycle(t *testing.T) { + w := Workflow{Name: "x", Nodes: []Node{ + {ID: "a", Mode: "fixture.echo", DependsOn: []string{"b"}}, + {ID: "b", Mode: "fixture.echo", DependsOn: []string{"a"}}, + }} + if err := w.Validate(); !errors.Is(err, ErrCycle) { + t.Errorf("want ErrCycle, got %v", err) + } +} + +func TestRun_SingleNode(t *testing.T) { + r := newTestRunner() + w := Workflow{Name: "single", Nodes: []Node{ + {ID: "a", Mode: "fixture.echo", Prompt: "hello"}, + }} + res, err := r.Run(context.Background(), w) + if err != nil { + t.Fatal(err) + } + if res.Status != StatusSucceeded { + t.Errorf("status: want succeeded, got %q", res.Status) + } + if len(res.Nodes) != 1 { + t.Fatalf("nodes: want 1, got %d", len(res.Nodes)) + } + if res.Nodes[0].Output["prompt"] != "hello" { + t.Errorf("echo round-trip: %+v", res.Nodes[0].Output) + } +} + +func TestRun_DAG_RefSubstitution(t *testing.T) { + r := newTestRunner() + w := Workflow{Name: "chain", Nodes: []Node{ + {ID: "shape", Mode: "fixture.upper", Prompt: "hello world"}, + {ID: "weakness", Mode: "fixture.echo", + Prompt: "Given $shape.output.upper find issue", + DependsOn: []string{"shape"}}, + {ID: "improvement", Mode: "fixture.echo", + Prompt: "Based on $weakness.output.prompt do better", + DependsOn: []string{"weakness"}}, + }} + res, err := r.Run(context.Background(), w) + if err != nil { + t.Fatalf("Run: %v", err) + } + if res.Status != StatusSucceeded { + t.Errorf("status: %q", res.Status) + } + // Order check: shape → weakness → improvement + wantOrder := []string{"shape", "weakness", "improvement"} + for i, want := range wantOrder { + if res.Nodes[i].NodeID != want { + t.Errorf("execution order %d: want %q, got %q", i, want, res.Nodes[i].NodeID) + } + } + // shape uppercases "hello world" → "HELLO WORLD" + if up := res.Nodes[0].Output["upper"]; up != "HELLO WORLD" { + t.Errorf("shape.upper: %q", up) + } + // weakness sees "Given HELLO WORLD find issue" in its prompt + wp, _ := res.Nodes[1].Output["prompt"].(string) + if !strings.Contains(wp, "HELLO WORLD") { + t.Errorf("weakness ref-substitution failed: %q", wp) + } + // improvement sees the SUBSTITUTED weakness prompt + ip, _ := res.Nodes[2].Output["prompt"].(string) + if !strings.Contains(ip, "HELLO WORLD") { + t.Errorf("improvement chain-substitution failed: %q", ip) + } +} + +func TestRun_FailedNodeSkipsDownstream(t *testing.T) { + r := newTestRunner() + w := Workflow{Name: "skipchain", Nodes: []Node{ + {ID: "a", Mode: "fixture.fail"}, + {ID: "b", Mode: "fixture.echo", DependsOn: []string{"a"}}, + {ID: "c", Mode: "fixture.echo"}, // independent of a — should still run + }} + res, err := r.Run(context.Background(), w) + if err != nil { + t.Fatal(err) + } + if res.Status != StatusPartial { + t.Errorf("status: want partial, got %q", res.Status) + } + byID := make(map[string]NodeResult) + for _, n := range res.Nodes { + byID[n.NodeID] = n + } + if byID["a"].Error == "" { + t.Error("a should have errored") + } + if byID["b"].Error == "" || !strings.Contains(byID["b"].Error, "upstream") { + t.Errorf("b should be skipped with upstream-failure reason; got %q", byID["b"].Error) + } + if byID["c"].Error != "" { + t.Errorf("c is independent; should run successfully; got error: %q", byID["c"].Error) + } +} + +func TestRun_UnknownModeAborts(t *testing.T) { + r := newTestRunner() + w := Workflow{Name: "bad", Nodes: []Node{ + {ID: "a", Mode: "fixture.does_not_exist"}, + }} + res, err := r.Run(context.Background(), w) + if !errors.Is(err, ErrUnknownMode) { + t.Errorf("want ErrUnknownMode, got %v", err) + } + if res.Status != StatusAborted { + t.Errorf("status: want aborted, got %q", res.Status) + } +} + +func TestRun_UnresolvedReferenceErrors(t *testing.T) { + r := newTestRunner() + w := Workflow{Name: "badref", Nodes: []Node{ + {ID: "a", Mode: "fixture.echo", + Prompt: "references $ghost.output but ghost doesn't exist"}, + }} + res, err := r.Run(context.Background(), w) + if err != nil { + t.Fatalf("Run: %v", err) + } + if res.Nodes[0].Error == "" { + t.Error("unresolved $ghost should error the node") + } + if !strings.Contains(res.Nodes[0].Error, "no such node") { + t.Errorf("error should explain no-such-node; got %q", res.Nodes[0].Error) + } +} + +func TestRun_ImplicitLLMChatFallback(t *testing.T) { + r := NewRunner() + r.RegisterMode("llm.chat", fixtureEcho) // pretend llm.chat exists + w := Workflow{Name: "implicit", Nodes: []Node{ + {ID: "a", Prompt: "no Mode field — should default to llm.chat"}, + }} + res, err := r.Run(context.Background(), w) + if err != nil { + t.Fatal(err) + } + if res.Status != StatusSucceeded { + t.Errorf("implicit llm.chat: status %q", res.Status) + } + if res.Nodes[0].Mode != "llm.chat" { + t.Errorf("effective mode: want llm.chat, got %q", res.Nodes[0].Mode) + } +} + +func TestRun_ProvenanceRecording(t *testing.T) { + r := newTestRunner() + w := Workflow{Name: "trace", Nodes: []Node{ + {ID: "x", Mode: "fixture.echo", Prompt: "trace me"}, + }} + res, err := r.Run(context.Background(), w) + if err != nil { + t.Fatal(err) + } + n := res.Nodes[0] + if n.NodeID != "x" || n.Mode != "fixture.echo" { + t.Errorf("provenance: node=%q mode=%q", n.NodeID, n.Mode) + } + if n.StartedAt.IsZero() { + t.Error("started_at should be set") + } + if n.DurationMs < 0 { + t.Errorf("duration_ms: %d", n.DurationMs) + } +} + +func TestRun_InputsResolveRefs(t *testing.T) { + // Verify that node.Inputs (not just Prompt) honors $-substitution. + r := newTestRunner() + w := Workflow{Name: "inputs", Nodes: []Node{ + {ID: "a", Mode: "fixture.echo", Prompt: "first"}, + {ID: "b", Mode: "fixture.echo", + Inputs: map[string]any{ + "copied": "$a.output.prompt", + "static": "literal", + }, + DependsOn: []string{"a"}}, + }} + res, err := r.Run(context.Background(), w) + if err != nil { + t.Fatal(err) + } + bOut := res.Nodes[1].Output + if bOut["copied"] != "first" { + t.Errorf("inputs ref: want 'first', got %q", bOut["copied"]) + } + if bOut["static"] != "literal" { + t.Errorf("inputs static: want 'literal', got %q", bOut["static"]) + } +} + +func TestTopoSort_Stable(t *testing.T) { + // Independent nodes preserve their declaration order. + nodes := []Node{ + {ID: "z"}, {ID: "y"}, {ID: "x"}, + } + got, err := topoSort(nodes) + if err != nil { + t.Fatal(err) + } + want := []string{"z", "y", "x"} + for i := range want { + if got[i] != want[i] { + t.Errorf("position %d: want %q, got %q", i, want[i], got[i]) + } + } +} diff --git a/internal/workflow/types.go b/internal/workflow/types.go new file mode 100644 index 0000000..b90977c --- /dev/null +++ b/internal/workflow/types.go @@ -0,0 +1,172 @@ +// Package workflow is the Observer-KB workflow runner per SPEC §3.8 — +// the orchestrator that chains §3.4 modes (matrix.search, relevance, +// downgrade, distillation.score, drift.scorer) plus free-form llm.chat +// into multi-pass measurement pipelines. +// +// The architectural intent is documented in PRD's "Observer as system +// resource" section: workflows ARE observation patterns whose every +// step is recorded as an ObservedOp via observerd. The mode catalog +// is the registry of capabilities; the runner is the engine that +// composes them. +// +// First slice (this commit): types + DAG runner + reference +// substitution + a fixture.echo mode for testing the mechanics. +// Real-mode integrations (matrix.search, distillation.score, etc.) +// land in follow-up commits. +// +// YAML shape mirrors /home/profit/lakehouse/.archon/workflows/ +// lakehouse-architect-review.yaml so existing Archon workflows load +// directly, with one Go-side addition: an optional `mode` field on +// each node so the runner can dispatch to non-LLM modes. + +package workflow + +import ( + "context" + "errors" + "fmt" + "time" +) + +// Workflow is one loadable workflow definition. Matches Archon's +// YAML shape; Provider + Model are informational in v0 (only used +// by llm.chat-style modes that need a backend) and ignored by other +// modes. +type Workflow struct { + Name string `yaml:"name" json:"name"` + Description string `yaml:"description" json:"description"` + Provider string `yaml:"provider" json:"provider,omitempty"` + Model string `yaml:"model" json:"model,omitempty"` + Nodes []Node `yaml:"nodes" json:"nodes"` +} + +// Node is one step in the workflow DAG. ID must be unique within a +// workflow; DependsOn lists the IDs of nodes that must complete +// before this one runs. +// +// Mode is the registered capability the node dispatches to. When +// omitted, the runner assumes "llm.chat" using the workflow's +// Provider+Model (matching Archon's implicit-LLM convention). +// +// Inputs is a free-form map passed to the mode after $-reference +// substitution. The Prompt field is a convenience — it's added to +// the input map under the key "prompt" before mode dispatch, so +// llm.chat-style modes get free-form text without a wrapping object. +type Node struct { + ID string `yaml:"id" json:"id"` + Mode string `yaml:"mode" json:"mode,omitempty"` + Prompt string `yaml:"prompt" json:"prompt,omitempty"` + Inputs map[string]any `yaml:"inputs" json:"inputs,omitempty"` + AllowedTools []string `yaml:"allowed_tools" json:"allowed_tools,omitempty"` + Effort string `yaml:"effort" json:"effort,omitempty"` + IdleTimeoutMs int `yaml:"idle_timeout" json:"idle_timeout,omitempty"` + DependsOn []string `yaml:"depends_on" json:"depends_on,omitempty"` +} + +// NodeResult captures one node's execution outcome. Output is the +// mode's return map; Error is non-nil iff the mode returned an +// error. StartedAt + DurationMs feed observerd's provenance recording. +type NodeResult struct { + NodeID string `json:"node_id"` + Mode string `json:"mode"` + Output map[string]any `json:"output,omitempty"` + Error string `json:"error,omitempty"` + StartedAt time.Time `json:"started_at"` + DurationMs int64 `json:"duration_ms"` +} + +// RunResult is the full workflow execution outcome — every node's +// result in execution order, plus the workflow name and a summary +// status (succeeded if every node ran without error, partial if any +// errored). +type RunResult struct { + Workflow string `json:"workflow"` + Status RunStatus `json:"status"` + Nodes []NodeResult `json:"nodes"` + StartedAt time.Time `json:"started_at"` + DurationMs int64 `json:"duration_ms"` +} + +// RunStatus tags the overall workflow outcome. +type RunStatus string + +const ( + StatusSucceeded RunStatus = "succeeded" + StatusPartial RunStatus = "partial" // some nodes errored, others succeeded + StatusAborted RunStatus = "aborted" // hard error halted execution (cycle, missing dep, unknown mode) +) + +// Mode is the function signature every registered capability honors. +// Input + output are generic maps so workflows compose freely; the +// mode function is responsible for shape-checking its own inputs. +// +// Returning an error doesn't abort the whole workflow — the runner +// records the error in NodeResult and continues with downstream +// nodes that don't depend on this one. That mirrors observerd's +// "log + continue" partial-failure semantics so a single mode bug +// doesn't kill a 7-node measurement chain. +type Mode func(ctx Context, input map[string]any) (map[string]any, error) + +// Context is what a Mode receives. Carries the standard Go +// context.Context (for cancellation) plus a workflow-scoped +// metadata bag for cross-mode coordination (e.g. a workflow's +// model hint that llm.chat-style modes consume). +type Context struct { + Ctx context.Context + // WorkflowName is the parent workflow.Name — useful when a mode + // records ObservedOps so the source can be traced back to the + // workflow that triggered it. + WorkflowName string + // NodeID is the currently-executing node — paired with + // WorkflowName forms a unique provenance key. + NodeID string + // Provider + Model carry the workflow's defaults; modes that + // need them (llm.chat) pull from here, others ignore. + Provider string + Model string +} + +// Errors surfaced to callers. Cycle / missing-dependency / unknown- +// mode are *aborting* errors — the runner can't proceed. Per-node +// mode errors are recorded but don't abort. +var ( + ErrCycle = errors.New("workflow: dependency cycle detected") + ErrMissingDep = errors.New("workflow: node depends on unknown id") + ErrUnknownMode = errors.New("workflow: unknown mode") + ErrDuplicateNodeID = errors.New("workflow: duplicate node id") + ErrUnresolvedRef = errors.New("workflow: unresolved $node.output reference") +) + +// Validate checks structural invariants on a Workflow before +// execution: unique node IDs, every depends_on points to a known +// id, no cycles. Returns nil on success or a wrapped sentinel. +func (w Workflow) Validate() error { + if w.Name == "" { + return fmt.Errorf("workflow: name is required") + } + if len(w.Nodes) == 0 { + return fmt.Errorf("workflow: at least one node required") + } + seen := make(map[string]struct{}, len(w.Nodes)) + for _, n := range w.Nodes { + if n.ID == "" { + return fmt.Errorf("workflow: node id must be non-empty") + } + if _, dup := seen[n.ID]; dup { + return fmt.Errorf("%w: %q", ErrDuplicateNodeID, n.ID) + } + seen[n.ID] = struct{}{} + } + for _, n := range w.Nodes { + for _, dep := range n.DependsOn { + if _, ok := seen[dep]; !ok { + return fmt.Errorf("%w: node %q depends on %q (no such node)", + ErrMissingDep, n.ID, dep) + } + } + } + if cyclicID, ok := detectCycle(w.Nodes); ok { + return fmt.Errorf("%w: starting at node %q", ErrCycle, cyclicID) + } + return nil +} diff --git a/scripts/workflow_smoke.sh b/scripts/workflow_smoke.sh new file mode 100755 index 0000000..e1f0584 --- /dev/null +++ b/scripts/workflow_smoke.sh @@ -0,0 +1,150 @@ +#!/usr/bin/env bash +# Workflow smoke — Observer-KB workflow runner end-to-end (SPEC §3.8 +# first slice). All assertions go through gateway :3110. +# +# Validates: +# - GET /observer/workflow/modes lists fixture.echo + fixture.upper +# - POST /observer/workflow/run executes a 3-node DAG with $-ref +# substitution: shape (uppercase) → weakness → improvement +# - Each node's execution lands an ObservedOp via the observer +# ring (visible in /observer/stats with source="workflow") +# - Aborting case: unknown mode → 400 with helpful error +# - Skip cascade: node with failed dep gets skipped, independent +# siblings still run + +set -euo pipefail +cd "$(dirname "$0")/.." + +export PATH="$PATH:/usr/local/go/bin" + +echo "[workflow-smoke] building observerd + gateway..." +go build -o bin/ ./cmd/observerd ./cmd/gateway + +pkill -f "bin/(observerd|gateway)" 2>/dev/null || true +sleep 0.3 + +PIDS=() +TMP="$(mktemp -d)" +CFG="$TMP/workflow.toml" + +cleanup() { + echo "[workflow-smoke] cleanup" + for p in "${PIDS[@]}"; do [ -n "$p" ] && kill "$p" 2>/dev/null || true; done + rm -rf "$TMP" +} +trap cleanup EXIT INT TERM + +cat > "$CFG" </dev/null 2>&1; then return 0; fi + sleep 0.05 + done + return 1 +} + +echo "[workflow-smoke] launching observerd → gateway..." +./bin/observerd -config "$CFG" > /tmp/observerd.log 2>&1 & +PIDS+=($!) +poll_health 3219 || { echo "observerd failed"; tail /tmp/observerd.log; exit 1; } + +./bin/gateway -config "$CFG" > /tmp/gateway.log 2>&1 & +PIDS+=($!) +poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; exit 1; } + +FAILED=0 + +# ── 1. /observer/workflow/modes lists registered modes ──────────── +echo "[workflow-smoke] /observer/workflow/modes lists fixture modes:" +RESP="$(curl -sS http://127.0.0.1:3110/v1/observer/workflow/modes)" +HAS_ECHO="$(echo "$RESP" | jq -r '.modes | index("fixture.echo") != null')" +HAS_UPPER="$(echo "$RESP" | jq -r '.modes | index("fixture.upper") != null')" +if [ "$HAS_ECHO" = "true" ] && [ "$HAS_UPPER" = "true" ]; then + echo " ✓ fixture.echo + fixture.upper registered" +else + echo " ✗ resp: $RESP"; FAILED=1 +fi + +# ── 2. 3-node DAG with $-ref substitution ───────────────────────── +echo "[workflow-smoke] 3-node DAG: shape (upper) → weakness → improvement" +WORKFLOW='{ + "workflow": { + "name": "smoke-chain", + "description": "DAG ref substitution test", + "nodes": [ + {"id":"shape", "mode":"fixture.upper", "prompt":"hello world"}, + {"id":"weakness", "mode":"fixture.echo", + "prompt":"observed shape: $shape.output.upper", + "depends_on":["shape"]}, + {"id":"improvement", "mode":"fixture.echo", + "prompt":"based on $weakness.output.prompt do better", + "depends_on":["weakness"]} + ] + } +}' +RUN="$(curl -sS -X POST http://127.0.0.1:3110/v1/observer/workflow/run \ + -H 'Content-Type: application/json' -d "$WORKFLOW")" +STATUS="$(echo "$RUN" | jq -r '.status')" +SHAPE_UPPER="$(echo "$RUN" | jq -r '.nodes[0].output.upper')" +WEAK_PROMPT="$(echo "$RUN" | jq -r '.nodes[1].output.prompt')" +IMP_PROMPT="$(echo "$RUN" | jq -r '.nodes[2].output.prompt')" + +if [ "$STATUS" = "succeeded" ] && [ "$SHAPE_UPPER" = "HELLO WORLD" ] \ + && [[ "$WEAK_PROMPT" == *"HELLO WORLD"* ]] \ + && [[ "$IMP_PROMPT" == *"HELLO WORLD"* ]]; then + echo " ✓ status=succeeded · shape=HELLO WORLD · refs propagated through 3-node chain" +else + echo " ✗ status=$STATUS shape=$SHAPE_UPPER weak=$WEAK_PROMPT imp=$IMP_PROMPT" + echo " full: $RUN" + FAILED=1 +fi + +# ── 3. Per-node provenance recorded as ObservedOps ──────────────── +echo "[workflow-smoke] /observer/stats reflects workflow ops:" +STATS="$(curl -sS http://127.0.0.1:3110/v1/observer/stats)" +WORKFLOW_OPS="$(echo "$STATS" | jq -r '.by_source.workflow // 0')" +TOTAL="$(echo "$STATS" | jq -r '.total')" +if [ "$WORKFLOW_OPS" = "3" ] && [ "$TOTAL" = "3" ]; then + echo " ✓ 3 workflow ops recorded (one per node), total=3" +else + echo " ✗ workflow=$WORKFLOW_OPS total=$TOTAL" + echo " full: $STATS"; FAILED=1 +fi + +# ── 4. Unknown mode → 400 ───────────────────────────────────────── +echo "[workflow-smoke] unknown mode → 400:" +HTTP="$(curl -sS -o /tmp/wf_bad.json -w '%{http_code}' -X POST \ + http://127.0.0.1:3110/v1/observer/workflow/run \ + -H 'Content-Type: application/json' \ + -d '{"workflow":{"name":"bad","nodes":[{"id":"a","mode":"does.not.exist"}]}}')" +ERR="$(jq -r '.error' < /tmp/wf_bad.json 2>/dev/null)" +if [ "$HTTP" = "400" ] && echo "$ERR" | grep -qi "unknown mode"; then + echo " ✓ unknown mode aborts with 400 + helpful error" +else + echo " ✗ http=$HTTP err=$ERR"; FAILED=1 +fi + +if [ "$FAILED" -eq 0 ]; then + echo "[workflow-smoke] Workflow runner acceptance: PASSED" + exit 0 +else + echo "[workflow-smoke] Workflow runner acceptance: FAILED" + exit 1 +fi