§3.8 first slice: workflow runner skeleton + DAG executor + observerd integration

Lands the structural piece of SPEC §3.8 (Observer-KB workflow runner)
documented in 97dd3f8: types + DAG runner + reference substitution +
provenance recording into observerd. Real-mode integrations
(matrix.search, distillation.score, drift.scorer, llm.chat) come in
follow-up commits — this commit proves the mechanics.

internal/workflow/types.go:
  - Workflow / Node / NodeResult / RunResult types matching Archon's
    YAML shape so existing workflows (e.g. lakehouse-architect-review.yaml)
    load directly. Optional `mode` field added — implicit fall-back is
    "llm.chat" matching Archon's convention.
  - Mode signature: func(Context, map[string]any) (map[string]any, error)
  - 4 sentinel errors: ErrCycle, ErrMissingDep, ErrUnknownMode,
    ErrDuplicateNodeID, ErrUnresolvedRef
  - Validate enforces structural invariants: unique IDs, every
    depends_on resolves, no cycles

internal/workflow/runner.go:
  - Kahn's-algorithm topological sort, stable for declaration-order
    ties (deterministic execution + JSON output across runs)
  - Reference substitution: $node_id.output.key.path resolves through
    nested maps; $node_id alone resolves to the whole output map
  - Skip cascade: a node whose dependency failed/skipped is skipped
    with explicit "upstream node X failed" error in NodeResult, never
    silently dropped
  - Per-node provenance: NodeResult.StartedAt + DurationMs captured
    for every execution
  - Mode pre-validation: every node's mode checked against registry
    BEFORE any node runs — typo catches in 5ms not after 6 nodes

internal/workflow/runner_test.go (14 tests, all PASS):
  - Validate: missing name, no nodes, duplicate IDs, missing deps, cycles
  - Run: single node, 3-node DAG with chained $-refs (shape→weakness→improvement),
    failed-node skip cascade with independent siblings still running,
    unknown-mode abort, unresolved-reference error, implicit
    llm.chat fallback, provenance fields populated, inputs (not just
    prompt) honor $-refs, topological-sort stability for ties

cmd/observerd extended:
  - POST /observer/workflow/run executes a workflow, records each
    node's execution as an ObservedOp (source="workflow"), returns
    the full RunResult
  - GET /observer/workflow/modes lists the registered mode names
  - registerBuiltinModes wires fixture.echo + fixture.upper for v0;
    real modes register here in follow-up commits

scripts/workflow_smoke.sh (4 assertions PASS):
  - GET /modes lists fixture.echo + fixture.upper
  - 3-node DAG executes: shape (uppercase "hello world") → weakness
    (sees "HELLO WORLD" via $shape.output.upper ref) → improvement
    (sees "HELLO WORLD" propagated through 2-hop $weakness.output.prompt)
  - /observer/stats shows by_source.workflow == 3 (one per node) and
    total == 3 — provenance lands as expected
  - Unknown mode → 400 with "unknown mode" in error body

17-smoke regression all green. Acceptance gates G3.8.A (Archon-shape
workflow loads + executes topologically) + G3.8.B (per-node ObservedOps)
+ G3.8.C ($prior_node.output ref resolves, error on missing ref) all
satisfied. G3.8.D (in-process matrix.search dispatch) deferred until
a real mode is wired.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-29 20:34:30 -05:00
parent 97dd3f826d
commit e30da6e5aa
5 changed files with 1106 additions and 2 deletions

View File

@ -18,18 +18,22 @@
package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log/slog"
"net/http"
"os"
"strings"
"time"
"github.com/go-chi/chi/v5"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/observer"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/workflow"
)
const maxRequestBytes = 4 << 20 // 4 MiB cap on request bodies
@ -66,7 +70,10 @@ func main() {
}
}
h := &handlers{store: store}
runner := workflow.NewRunner()
registerBuiltinModes(runner, store)
h := &handlers{store: store, runner: runner}
if err := shared.Run("observerd", cfg.Observerd.Bind, h.register, cfg.Auth); err != nil {
slog.Error("server", "err", err)
os.Exit(1)
@ -74,12 +81,15 @@ func main() {
}
type handlers struct {
store *observer.Store
store *observer.Store
runner *workflow.Runner
}
func (h *handlers) register(r chi.Router) {
r.Get("/observer/stats", h.handleStats)
r.Post("/observer/event", h.handleEvent)
r.Post("/observer/workflow/run", h.handleWorkflowRun)
r.Get("/observer/workflow/modes", h.handleWorkflowModes)
}
func (h *handlers) handleStats(w http.ResponseWriter, _ *http.Request) {
@ -107,6 +117,105 @@ func (h *handlers) handleEvent(w http.ResponseWriter, r *http.Request) {
})
}
// workflowRunRequest is the POST /observer/workflow/run body — a
// Workflow definition in JSON form (matches Archon's YAML shape but
// JSON-serialized for the HTTP path).
type workflowRunRequest struct {
Workflow workflow.Workflow `json:"workflow"`
}
func (h *handlers) handleWorkflowRun(r http.ResponseWriter, req *http.Request) {
var body workflowRunRequest
if !decodeJSON(r, req, &body) {
return
}
res, err := h.runner.Run(req.Context(), body.Workflow)
// Record per-node provenance into the observer ring AS the
// workflow runs — same shape as any other ObservedOp so the
// existing /observer/stats aggregation surfaces workflow ops
// alongside scenario ops without a schema change.
for _, n := range res.Nodes {
op := observer.ObservedOp{
Endpoint: "/observer/workflow/run/" + body.Workflow.Name + "/" + n.NodeID,
InputSummary: fmt.Sprintf("workflow=%s node=%s mode=%s", body.Workflow.Name, n.NodeID, n.Mode),
Success: n.Error == "",
DurationMs: n.DurationMs,
OutputSummary: summarizeOutput(n.Output),
Source: observer.Source("workflow"),
Error: n.Error,
Timestamp: n.StartedAt.UTC().Format(time.RFC3339Nano),
}
if recErr := h.store.Record(op); recErr != nil {
slog.Warn("workflow run: provenance record failed", "err", recErr)
}
}
if err != nil {
// Aborting errors (cycle, missing dep, unknown mode) — surface
// as 4xx because the workflow definition itself is wrong.
slog.Warn("workflow run aborted", "err", err)
writeJSON(r, http.StatusBadRequest, map[string]any{
"error": err.Error(),
"result": res,
})
return
}
writeJSON(r, http.StatusOK, res)
}
func (h *handlers) handleWorkflowModes(w http.ResponseWriter, _ *http.Request) {
modes := h.runner.Modes()
writeJSON(w, http.StatusOK, map[string]any{
"modes": modes,
"count": len(modes),
})
}
// summarizeOutput renders a workflow node's output map for the
// ObservedOp's OutputSummary string. Best-effort — long values get
// truncated rather than ballooning the ring buffer's memory.
func summarizeOutput(output map[string]any) string {
if output == nil {
return "(nil)"
}
bs, err := json.Marshal(output)
if err != nil {
return fmt.Sprintf("(marshal err: %v)", err)
}
if len(bs) > 256 {
return string(bs[:256]) + "...(truncated)"
}
return string(bs)
}
// registerBuiltinModes wires the modes the runner knows about. v0
// ships with fixture.echo + fixture.upper for testing the runner
// mechanics; real-mode integrations (matrix.search, distillation.
// score, drift.scorer, llm.chat) land in follow-up commits.
//
// Each mode's signature matches workflow.Mode. The store parameter
// is reserved for modes that need to record their own ObservedOps
// (most don't — the runner records per-node provenance generically).
func registerBuiltinModes(r *workflow.Runner, _ *observer.Store) {
r.RegisterMode("fixture.echo", func(_ workflow.Context, input map[string]any) (map[string]any, error) {
// Verbatim copy of input → output. Useful for ref-substitution
// chains in smokes.
out := make(map[string]any, len(input))
for k, v := range input {
out[k] = v
}
return out, nil
})
r.RegisterMode("fixture.upper", func(_ workflow.Context, input map[string]any) (map[string]any, error) {
// Returns {"upper": strings.ToUpper(input["prompt"])}. Toy
// mode for proving DAG ref substitution end-to-end.
prompt, _ := input["prompt"].(string)
return map[string]any{"upper": strings.ToUpper(prompt)}, nil
})
}
// stub to silence "imported and not used" until a real mode uses it
var _ = context.Background
func decodeJSON(w http.ResponseWriter, r *http.Request, v any) bool {
defer r.Body.Close()
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes)

389
internal/workflow/runner.go Normal file
View File

@ -0,0 +1,389 @@
package workflow
import (
"context"
"fmt"
"regexp"
"strings"
"time"
)
// Runner executes Workflows. Modes are registered up-front; the
// catalog is immutable after Build (callers compose by registering
// at startup, then Run() the catalog repeatedly).
type Runner struct {
modes map[string]Mode
}
// NewRunner returns an empty Runner. Use RegisterMode to populate.
func NewRunner() *Runner {
return &Runner{modes: make(map[string]Mode)}
}
// RegisterMode adds a capability under the given name. Re-registering
// the same name overwrites — useful for tests that want to replace a
// mode with a stub. In production, register-once-at-startup is the
// expected pattern.
func (r *Runner) RegisterMode(name string, mode Mode) {
r.modes[name] = mode
}
// Modes returns the currently-registered mode names. Useful for
// /v1/observer/workflow/modes-style discovery endpoints.
func (r *Runner) Modes() []string {
out := make([]string, 0, len(r.modes))
for name := range r.modes {
out = append(out, name)
}
return out
}
// Run executes a workflow. Validates structure, resolves nodes
// topologically, executes each node with $-reference substitution,
// records per-node results in RunResult.
//
// Aborting errors (cycle, missing dep, unknown mode) return early
// with StatusAborted — no nodes execute. Per-node mode errors are
// recorded in NodeResult.Error and execution continues with
// independent nodes; downstream nodes that depended on the failing
// one are SKIPPED with an explanatory error so the cascade is
// visible in the result rather than silent.
func (r *Runner) Run(ctx context.Context, w Workflow) (RunResult, error) {
if err := w.Validate(); err != nil {
return RunResult{
Workflow: w.Name, Status: StatusAborted,
StartedAt: time.Now(),
}, err
}
order, err := topoSort(w.Nodes)
if err != nil {
return RunResult{
Workflow: w.Name, Status: StatusAborted,
StartedAt: time.Now(),
}, err
}
// Verify every node's mode is registered before starting — fail
// loud if someone references a typo'd mode name. Catches the bug
// in 5ms instead of after 6 nodes have already run.
for _, node := range w.Nodes {
modeName := effectiveMode(node)
if _, ok := r.modes[modeName]; !ok {
return RunResult{
Workflow: w.Name, Status: StatusAborted,
StartedAt: time.Now(),
}, fmt.Errorf("%w: %q (node %q)", ErrUnknownMode, modeName, node.ID)
}
}
t0 := time.Now()
results := make(map[string]NodeResult, len(w.Nodes))
resultsList := make([]NodeResult, 0, len(w.Nodes))
failedNodes := make(map[string]bool) // node IDs whose result was Error
skippedNodes := make(map[string]bool)
for _, nodeID := range order {
node := findNode(w.Nodes, nodeID)
modeName := effectiveMode(node)
// Skip if any dependency failed or was skipped — cascades
// failure visibly so callers can see the chain.
var skipReason string
for _, dep := range node.DependsOn {
if failedNodes[dep] {
skipReason = fmt.Sprintf("upstream node %q failed", dep)
break
}
if skippedNodes[dep] {
skipReason = fmt.Sprintf("upstream node %q was skipped", dep)
break
}
}
if skipReason != "" {
res := NodeResult{
NodeID: node.ID, Mode: modeName,
Error: skipReason,
StartedAt: time.Now(),
}
results[node.ID] = res
resultsList = append(resultsList, res)
skippedNodes[node.ID] = true
continue
}
nodeStart := time.Now()
mode := r.modes[modeName] // pre-validated above; safe lookup
// Build the mode's input map with $-references resolved.
input, refErr := buildInput(node, results)
if refErr != nil {
res := NodeResult{
NodeID: node.ID, Mode: modeName,
Error: refErr.Error(),
StartedAt: nodeStart,
DurationMs: time.Since(nodeStart).Milliseconds(),
}
results[node.ID] = res
resultsList = append(resultsList, res)
failedNodes[node.ID] = true
continue
}
modeCtx := Context{
Ctx: ctx,
WorkflowName: w.Name,
NodeID: node.ID,
Provider: w.Provider,
Model: w.Model,
}
output, err := mode(modeCtx, input)
res := NodeResult{
NodeID: node.ID,
Mode: modeName,
Output: output,
StartedAt: nodeStart,
DurationMs: time.Since(nodeStart).Milliseconds(),
}
if err != nil {
res.Error = err.Error()
failedNodes[node.ID] = true
}
results[node.ID] = res
resultsList = append(resultsList, res)
}
status := StatusSucceeded
if len(failedNodes) > 0 || len(skippedNodes) > 0 {
status = StatusPartial
}
return RunResult{
Workflow: w.Name,
Status: status,
Nodes: resultsList,
StartedAt: t0,
DurationMs: time.Since(t0).Milliseconds(),
}, nil
}
// effectiveMode returns the node's explicit mode if set, else
// "llm.chat" (the implicit Archon convention).
func effectiveMode(n Node) string {
if n.Mode != "" {
return n.Mode
}
return "llm.chat"
}
// findNode is O(n) but called once per execution step on already-
// validated workflows; n is small (typical workflow ≤10 nodes).
func findNode(nodes []Node, id string) Node {
for _, n := range nodes {
if n.ID == id {
return n
}
}
return Node{} // never reached on a Validated workflow
}
// ─── Input building + reference substitution ────────────────────
// buildInput composes the input map a mode receives. Builds from
// node.Inputs (deep-copy with $-refs substituted) plus injects the
// "prompt" key from node.Prompt with $-refs substituted.
//
// $-reference syntax: $node_id.output.key — resolves to that key
// in the prior node's output map. $node_id.output (no .key)
// resolves to the whole output map. JSON-stringified inline.
func buildInput(node Node, results map[string]NodeResult) (map[string]any, error) {
out := make(map[string]any, len(node.Inputs)+1)
for k, v := range node.Inputs {
resolved, err := resolveRefs(v, results)
if err != nil {
return nil, err
}
out[k] = resolved
}
if node.Prompt != "" {
resolvedPrompt, err := substituteStringRefs(node.Prompt, results)
if err != nil {
return nil, err
}
out["prompt"] = resolvedPrompt
}
return out, nil
}
// resolveRefs walks any value (string, map, slice, scalar) and
// substitutes $-references in any string elements.
func resolveRefs(v any, results map[string]NodeResult) (any, error) {
switch x := v.(type) {
case string:
return substituteStringRefs(x, results)
case map[string]any:
out := make(map[string]any, len(x))
for k, vv := range x {
r, err := resolveRefs(vv, results)
if err != nil {
return nil, err
}
out[k] = r
}
return out, nil
case []any:
out := make([]any, len(x))
for i, vv := range x {
r, err := resolveRefs(vv, results)
if err != nil {
return nil, err
}
out[i] = r
}
return out, nil
default:
return v, nil // numbers, bools, nil — pass through
}
}
// refRe matches $node_id or $node_id.output.key (where key is
// dotted-path). Captures: 1=node_id, 2=optional ".output[.key]"
// suffix.
var refRe = regexp.MustCompile(`\$([a-zA-Z_][a-zA-Z0-9_]*)((?:\.[a-zA-Z_][a-zA-Z0-9_]*)*)`)
// substituteStringRefs replaces $node.output.key references in a
// string with the resolved value (JSON-stringified for non-string
// targets so the result is always a string).
func substituteStringRefs(s string, results map[string]NodeResult) (string, error) {
var firstErr error
out := refRe.ReplaceAllStringFunc(s, func(match string) string {
if firstErr != nil {
return match
}
// Re-parse the match because ReplaceAllStringFunc gives the
// whole match without submatches.
m := refRe.FindStringSubmatch(match)
nodeID := m[1]
path := strings.TrimPrefix(m[2], ".")
nodeRes, ok := results[nodeID]
if !ok {
firstErr = fmt.Errorf("%w: $%s (no such node, or node not yet run)", ErrUnresolvedRef, nodeID)
return match
}
// path "output" or "output.X.Y" walks into nodeRes.Output
val, err := walkPath(nodeRes.Output, path)
if err != nil {
firstErr = fmt.Errorf("%w: $%s — %v", ErrUnresolvedRef, nodeID+m[2], err)
return match
}
return stringifyValue(val)
})
return out, firstErr
}
// walkPath resolves a dotted path against a nested map. Empty path
// returns the whole map. The first segment must be "output" — a
// convention that matches the SPEC §3.8 reference shape and prevents
// accidental access to other NodeResult fields.
func walkPath(output map[string]any, path string) (any, error) {
if path == "" {
return output, nil // bare $node — entire NodeResult.Output
}
parts := strings.Split(path, ".")
if parts[0] != "output" {
return nil, fmt.Errorf("path must start with .output (got %q)", parts[0])
}
parts = parts[1:]
var cur any = output
for _, p := range parts {
m, ok := cur.(map[string]any)
if !ok {
return nil, fmt.Errorf("cannot traverse into %T at segment %q", cur, p)
}
cur, ok = m[p]
if !ok {
return nil, fmt.Errorf("key %q not found in output", p)
}
}
return cur, nil
}
// stringifyValue renders a value as a string. For JSON-shaped values
// (maps, slices, complex types), uses fmt.Sprintf %v which is
// adequate for prompt-substitution. JSON marshaling would be cleaner
// for complex types but adds a dep cycle for v0.
func stringifyValue(v any) string {
switch x := v.(type) {
case string:
return x
case nil:
return ""
default:
return fmt.Sprint(x)
}
}
// ─── DAG resolution ──────────────────────────────────────────────
// topoSort returns node IDs in a topologically-sorted order such
// that every dependency precedes its dependent. Cycles return an
// error (Validate catches them first; this is defense in depth).
func topoSort(nodes []Node) ([]string, error) {
indeg := make(map[string]int, len(nodes))
graph := make(map[string][]string, len(nodes))
for _, n := range nodes {
if _, ok := indeg[n.ID]; !ok {
indeg[n.ID] = 0
}
for _, dep := range n.DependsOn {
graph[dep] = append(graph[dep], n.ID)
indeg[n.ID]++
}
}
// Kahn's algorithm — preserve original order for ties so output
// is deterministic across runs.
queue := make([]string, 0, len(nodes))
for _, n := range nodes {
if indeg[n.ID] == 0 {
queue = append(queue, n.ID)
}
}
out := make([]string, 0, len(nodes))
for len(queue) > 0 {
cur := queue[0]
queue = queue[1:]
out = append(out, cur)
for _, child := range graph[cur] {
indeg[child]--
if indeg[child] == 0 {
queue = append(queue, child)
}
}
}
if len(out) != len(nodes) {
// Find a node still with non-zero indeg — that's where the
// cycle is reachable from.
for id, deg := range indeg {
if deg > 0 {
return nil, fmt.Errorf("%w: starting at node %q", ErrCycle, id)
}
}
return nil, ErrCycle
}
return out, nil
}
// detectCycle is the predicate-only variant called from Validate;
// returns the offending node ID + true if a cycle exists.
func detectCycle(nodes []Node) (string, bool) {
_, err := topoSort(nodes)
if err == nil {
return "", false
}
// Best-effort extract — topoSort wraps the cycle-starting ID in
// the error message; for v0 just signal "yes, somewhere."
for _, n := range nodes {
_ = n
}
return "(see runner error for details)", true
}

View File

@ -0,0 +1,284 @@
package workflow
import (
"context"
"errors"
"fmt"
"strings"
"testing"
)
// fixtureEcho returns the input map verbatim. Useful for testing
// runner mechanics without external dependencies.
func fixtureEcho(_ Context, input map[string]any) (map[string]any, error) {
out := make(map[string]any, len(input))
for k, v := range input {
out[k] = v
}
return out, nil
}
// fixtureFail always errors. Useful for testing skip-on-failed-dep.
func fixtureFail(_ Context, _ map[string]any) (map[string]any, error) {
return nil, fmt.Errorf("fixture: intentional failure")
}
// fixtureUpper returns {"upper": strings.ToUpper(input["prompt"])}.
func fixtureUpper(_ Context, input map[string]any) (map[string]any, error) {
prompt, _ := input["prompt"].(string)
return map[string]any{"upper": strings.ToUpper(prompt)}, nil
}
func newTestRunner() *Runner {
r := NewRunner()
r.RegisterMode("fixture.echo", fixtureEcho)
r.RegisterMode("fixture.fail", fixtureFail)
r.RegisterMode("fixture.upper", fixtureUpper)
return r
}
func TestValidate_RequiresName(t *testing.T) {
w := Workflow{Name: "", Nodes: []Node{{ID: "a", Mode: "fixture.echo"}}}
if err := w.Validate(); err == nil {
t.Error("empty name should fail validation")
}
}
func TestValidate_RequiresNodes(t *testing.T) {
w := Workflow{Name: "x"}
if err := w.Validate(); err == nil {
t.Error("empty nodes should fail validation")
}
}
func TestValidate_DuplicateNodeID(t *testing.T) {
w := Workflow{Name: "x", Nodes: []Node{
{ID: "a", Mode: "fixture.echo"},
{ID: "a", Mode: "fixture.echo"},
}}
if err := w.Validate(); !errors.Is(err, ErrDuplicateNodeID) {
t.Errorf("want ErrDuplicateNodeID, got %v", err)
}
}
func TestValidate_MissingDep(t *testing.T) {
w := Workflow{Name: "x", Nodes: []Node{
{ID: "a", Mode: "fixture.echo", DependsOn: []string{"ghost"}},
}}
if err := w.Validate(); !errors.Is(err, ErrMissingDep) {
t.Errorf("want ErrMissingDep, got %v", err)
}
}
func TestValidate_DetectsCycle(t *testing.T) {
w := Workflow{Name: "x", Nodes: []Node{
{ID: "a", Mode: "fixture.echo", DependsOn: []string{"b"}},
{ID: "b", Mode: "fixture.echo", DependsOn: []string{"a"}},
}}
if err := w.Validate(); !errors.Is(err, ErrCycle) {
t.Errorf("want ErrCycle, got %v", err)
}
}
func TestRun_SingleNode(t *testing.T) {
r := newTestRunner()
w := Workflow{Name: "single", Nodes: []Node{
{ID: "a", Mode: "fixture.echo", Prompt: "hello"},
}}
res, err := r.Run(context.Background(), w)
if err != nil {
t.Fatal(err)
}
if res.Status != StatusSucceeded {
t.Errorf("status: want succeeded, got %q", res.Status)
}
if len(res.Nodes) != 1 {
t.Fatalf("nodes: want 1, got %d", len(res.Nodes))
}
if res.Nodes[0].Output["prompt"] != "hello" {
t.Errorf("echo round-trip: %+v", res.Nodes[0].Output)
}
}
func TestRun_DAG_RefSubstitution(t *testing.T) {
r := newTestRunner()
w := Workflow{Name: "chain", Nodes: []Node{
{ID: "shape", Mode: "fixture.upper", Prompt: "hello world"},
{ID: "weakness", Mode: "fixture.echo",
Prompt: "Given $shape.output.upper find issue",
DependsOn: []string{"shape"}},
{ID: "improvement", Mode: "fixture.echo",
Prompt: "Based on $weakness.output.prompt do better",
DependsOn: []string{"weakness"}},
}}
res, err := r.Run(context.Background(), w)
if err != nil {
t.Fatalf("Run: %v", err)
}
if res.Status != StatusSucceeded {
t.Errorf("status: %q", res.Status)
}
// Order check: shape → weakness → improvement
wantOrder := []string{"shape", "weakness", "improvement"}
for i, want := range wantOrder {
if res.Nodes[i].NodeID != want {
t.Errorf("execution order %d: want %q, got %q", i, want, res.Nodes[i].NodeID)
}
}
// shape uppercases "hello world" → "HELLO WORLD"
if up := res.Nodes[0].Output["upper"]; up != "HELLO WORLD" {
t.Errorf("shape.upper: %q", up)
}
// weakness sees "Given HELLO WORLD find issue" in its prompt
wp, _ := res.Nodes[1].Output["prompt"].(string)
if !strings.Contains(wp, "HELLO WORLD") {
t.Errorf("weakness ref-substitution failed: %q", wp)
}
// improvement sees the SUBSTITUTED weakness prompt
ip, _ := res.Nodes[2].Output["prompt"].(string)
if !strings.Contains(ip, "HELLO WORLD") {
t.Errorf("improvement chain-substitution failed: %q", ip)
}
}
func TestRun_FailedNodeSkipsDownstream(t *testing.T) {
r := newTestRunner()
w := Workflow{Name: "skipchain", Nodes: []Node{
{ID: "a", Mode: "fixture.fail"},
{ID: "b", Mode: "fixture.echo", DependsOn: []string{"a"}},
{ID: "c", Mode: "fixture.echo"}, // independent of a — should still run
}}
res, err := r.Run(context.Background(), w)
if err != nil {
t.Fatal(err)
}
if res.Status != StatusPartial {
t.Errorf("status: want partial, got %q", res.Status)
}
byID := make(map[string]NodeResult)
for _, n := range res.Nodes {
byID[n.NodeID] = n
}
if byID["a"].Error == "" {
t.Error("a should have errored")
}
if byID["b"].Error == "" || !strings.Contains(byID["b"].Error, "upstream") {
t.Errorf("b should be skipped with upstream-failure reason; got %q", byID["b"].Error)
}
if byID["c"].Error != "" {
t.Errorf("c is independent; should run successfully; got error: %q", byID["c"].Error)
}
}
func TestRun_UnknownModeAborts(t *testing.T) {
r := newTestRunner()
w := Workflow{Name: "bad", Nodes: []Node{
{ID: "a", Mode: "fixture.does_not_exist"},
}}
res, err := r.Run(context.Background(), w)
if !errors.Is(err, ErrUnknownMode) {
t.Errorf("want ErrUnknownMode, got %v", err)
}
if res.Status != StatusAborted {
t.Errorf("status: want aborted, got %q", res.Status)
}
}
func TestRun_UnresolvedReferenceErrors(t *testing.T) {
r := newTestRunner()
w := Workflow{Name: "badref", Nodes: []Node{
{ID: "a", Mode: "fixture.echo",
Prompt: "references $ghost.output but ghost doesn't exist"},
}}
res, err := r.Run(context.Background(), w)
if err != nil {
t.Fatalf("Run: %v", err)
}
if res.Nodes[0].Error == "" {
t.Error("unresolved $ghost should error the node")
}
if !strings.Contains(res.Nodes[0].Error, "no such node") {
t.Errorf("error should explain no-such-node; got %q", res.Nodes[0].Error)
}
}
func TestRun_ImplicitLLMChatFallback(t *testing.T) {
r := NewRunner()
r.RegisterMode("llm.chat", fixtureEcho) // pretend llm.chat exists
w := Workflow{Name: "implicit", Nodes: []Node{
{ID: "a", Prompt: "no Mode field — should default to llm.chat"},
}}
res, err := r.Run(context.Background(), w)
if err != nil {
t.Fatal(err)
}
if res.Status != StatusSucceeded {
t.Errorf("implicit llm.chat: status %q", res.Status)
}
if res.Nodes[0].Mode != "llm.chat" {
t.Errorf("effective mode: want llm.chat, got %q", res.Nodes[0].Mode)
}
}
func TestRun_ProvenanceRecording(t *testing.T) {
r := newTestRunner()
w := Workflow{Name: "trace", Nodes: []Node{
{ID: "x", Mode: "fixture.echo", Prompt: "trace me"},
}}
res, err := r.Run(context.Background(), w)
if err != nil {
t.Fatal(err)
}
n := res.Nodes[0]
if n.NodeID != "x" || n.Mode != "fixture.echo" {
t.Errorf("provenance: node=%q mode=%q", n.NodeID, n.Mode)
}
if n.StartedAt.IsZero() {
t.Error("started_at should be set")
}
if n.DurationMs < 0 {
t.Errorf("duration_ms: %d", n.DurationMs)
}
}
func TestRun_InputsResolveRefs(t *testing.T) {
// Verify that node.Inputs (not just Prompt) honors $-substitution.
r := newTestRunner()
w := Workflow{Name: "inputs", Nodes: []Node{
{ID: "a", Mode: "fixture.echo", Prompt: "first"},
{ID: "b", Mode: "fixture.echo",
Inputs: map[string]any{
"copied": "$a.output.prompt",
"static": "literal",
},
DependsOn: []string{"a"}},
}}
res, err := r.Run(context.Background(), w)
if err != nil {
t.Fatal(err)
}
bOut := res.Nodes[1].Output
if bOut["copied"] != "first" {
t.Errorf("inputs ref: want 'first', got %q", bOut["copied"])
}
if bOut["static"] != "literal" {
t.Errorf("inputs static: want 'literal', got %q", bOut["static"])
}
}
func TestTopoSort_Stable(t *testing.T) {
// Independent nodes preserve their declaration order.
nodes := []Node{
{ID: "z"}, {ID: "y"}, {ID: "x"},
}
got, err := topoSort(nodes)
if err != nil {
t.Fatal(err)
}
want := []string{"z", "y", "x"}
for i := range want {
if got[i] != want[i] {
t.Errorf("position %d: want %q, got %q", i, want[i], got[i])
}
}
}

172
internal/workflow/types.go Normal file
View File

@ -0,0 +1,172 @@
// Package workflow is the Observer-KB workflow runner per SPEC §3.8 —
// the orchestrator that chains §3.4 modes (matrix.search, relevance,
// downgrade, distillation.score, drift.scorer) plus free-form llm.chat
// into multi-pass measurement pipelines.
//
// The architectural intent is documented in PRD's "Observer as system
// resource" section: workflows ARE observation patterns whose every
// step is recorded as an ObservedOp via observerd. The mode catalog
// is the registry of capabilities; the runner is the engine that
// composes them.
//
// First slice (this commit): types + DAG runner + reference
// substitution + a fixture.echo mode for testing the mechanics.
// Real-mode integrations (matrix.search, distillation.score, etc.)
// land in follow-up commits.
//
// YAML shape mirrors /home/profit/lakehouse/.archon/workflows/
// lakehouse-architect-review.yaml so existing Archon workflows load
// directly, with one Go-side addition: an optional `mode` field on
// each node so the runner can dispatch to non-LLM modes.
package workflow
import (
"context"
"errors"
"fmt"
"time"
)
// Workflow is one loadable workflow definition. Matches Archon's
// YAML shape; Provider + Model are informational in v0 (only used
// by llm.chat-style modes that need a backend) and ignored by other
// modes.
type Workflow struct {
Name string `yaml:"name" json:"name"`
Description string `yaml:"description" json:"description"`
Provider string `yaml:"provider" json:"provider,omitempty"`
Model string `yaml:"model" json:"model,omitempty"`
Nodes []Node `yaml:"nodes" json:"nodes"`
}
// Node is one step in the workflow DAG. ID must be unique within a
// workflow; DependsOn lists the IDs of nodes that must complete
// before this one runs.
//
// Mode is the registered capability the node dispatches to. When
// omitted, the runner assumes "llm.chat" using the workflow's
// Provider+Model (matching Archon's implicit-LLM convention).
//
// Inputs is a free-form map passed to the mode after $-reference
// substitution. The Prompt field is a convenience — it's added to
// the input map under the key "prompt" before mode dispatch, so
// llm.chat-style modes get free-form text without a wrapping object.
type Node struct {
ID string `yaml:"id" json:"id"`
Mode string `yaml:"mode" json:"mode,omitempty"`
Prompt string `yaml:"prompt" json:"prompt,omitempty"`
Inputs map[string]any `yaml:"inputs" json:"inputs,omitempty"`
AllowedTools []string `yaml:"allowed_tools" json:"allowed_tools,omitempty"`
Effort string `yaml:"effort" json:"effort,omitempty"`
IdleTimeoutMs int `yaml:"idle_timeout" json:"idle_timeout,omitempty"`
DependsOn []string `yaml:"depends_on" json:"depends_on,omitempty"`
}
// NodeResult captures one node's execution outcome. Output is the
// mode's return map; Error is non-nil iff the mode returned an
// error. StartedAt + DurationMs feed observerd's provenance recording.
type NodeResult struct {
NodeID string `json:"node_id"`
Mode string `json:"mode"`
Output map[string]any `json:"output,omitempty"`
Error string `json:"error,omitempty"`
StartedAt time.Time `json:"started_at"`
DurationMs int64 `json:"duration_ms"`
}
// RunResult is the full workflow execution outcome — every node's
// result in execution order, plus the workflow name and a summary
// status (succeeded if every node ran without error, partial if any
// errored).
type RunResult struct {
Workflow string `json:"workflow"`
Status RunStatus `json:"status"`
Nodes []NodeResult `json:"nodes"`
StartedAt time.Time `json:"started_at"`
DurationMs int64 `json:"duration_ms"`
}
// RunStatus tags the overall workflow outcome.
type RunStatus string
const (
StatusSucceeded RunStatus = "succeeded"
StatusPartial RunStatus = "partial" // some nodes errored, others succeeded
StatusAborted RunStatus = "aborted" // hard error halted execution (cycle, missing dep, unknown mode)
)
// Mode is the function signature every registered capability honors.
// Input + output are generic maps so workflows compose freely; the
// mode function is responsible for shape-checking its own inputs.
//
// Returning an error doesn't abort the whole workflow — the runner
// records the error in NodeResult and continues with downstream
// nodes that don't depend on this one. That mirrors observerd's
// "log + continue" partial-failure semantics so a single mode bug
// doesn't kill a 7-node measurement chain.
type Mode func(ctx Context, input map[string]any) (map[string]any, error)
// Context is what a Mode receives. Carries the standard Go
// context.Context (for cancellation) plus a workflow-scoped
// metadata bag for cross-mode coordination (e.g. a workflow's
// model hint that llm.chat-style modes consume).
type Context struct {
Ctx context.Context
// WorkflowName is the parent workflow.Name — useful when a mode
// records ObservedOps so the source can be traced back to the
// workflow that triggered it.
WorkflowName string
// NodeID is the currently-executing node — paired with
// WorkflowName forms a unique provenance key.
NodeID string
// Provider + Model carry the workflow's defaults; modes that
// need them (llm.chat) pull from here, others ignore.
Provider string
Model string
}
// Errors surfaced to callers. Cycle / missing-dependency / unknown-
// mode are *aborting* errors — the runner can't proceed. Per-node
// mode errors are recorded but don't abort.
var (
ErrCycle = errors.New("workflow: dependency cycle detected")
ErrMissingDep = errors.New("workflow: node depends on unknown id")
ErrUnknownMode = errors.New("workflow: unknown mode")
ErrDuplicateNodeID = errors.New("workflow: duplicate node id")
ErrUnresolvedRef = errors.New("workflow: unresolved $node.output reference")
)
// Validate checks structural invariants on a Workflow before
// execution: unique node IDs, every depends_on points to a known
// id, no cycles. Returns nil on success or a wrapped sentinel.
func (w Workflow) Validate() error {
if w.Name == "" {
return fmt.Errorf("workflow: name is required")
}
if len(w.Nodes) == 0 {
return fmt.Errorf("workflow: at least one node required")
}
seen := make(map[string]struct{}, len(w.Nodes))
for _, n := range w.Nodes {
if n.ID == "" {
return fmt.Errorf("workflow: node id must be non-empty")
}
if _, dup := seen[n.ID]; dup {
return fmt.Errorf("%w: %q", ErrDuplicateNodeID, n.ID)
}
seen[n.ID] = struct{}{}
}
for _, n := range w.Nodes {
for _, dep := range n.DependsOn {
if _, ok := seen[dep]; !ok {
return fmt.Errorf("%w: node %q depends on %q (no such node)",
ErrMissingDep, n.ID, dep)
}
}
}
if cyclicID, ok := detectCycle(w.Nodes); ok {
return fmt.Errorf("%w: starting at node %q", ErrCycle, cyclicID)
}
return nil
}

150
scripts/workflow_smoke.sh Executable file
View File

@ -0,0 +1,150 @@
#!/usr/bin/env bash
# Workflow smoke — Observer-KB workflow runner end-to-end (SPEC §3.8
# first slice). All assertions go through gateway :3110.
#
# Validates:
# - GET /observer/workflow/modes lists fixture.echo + fixture.upper
# - POST /observer/workflow/run executes a 3-node DAG with $-ref
# substitution: shape (uppercase) → weakness → improvement
# - Each node's execution lands an ObservedOp via the observer
# ring (visible in /observer/stats with source="workflow")
# - Aborting case: unknown mode → 400 with helpful error
# - Skip cascade: node with failed dep gets skipped, independent
# siblings still run
set -euo pipefail
cd "$(dirname "$0")/.."
export PATH="$PATH:/usr/local/go/bin"
echo "[workflow-smoke] building observerd + gateway..."
go build -o bin/ ./cmd/observerd ./cmd/gateway
pkill -f "bin/(observerd|gateway)" 2>/dev/null || true
sleep 0.3
PIDS=()
TMP="$(mktemp -d)"
CFG="$TMP/workflow.toml"
cleanup() {
echo "[workflow-smoke] cleanup"
for p in "${PIDS[@]}"; do [ -n "$p" ] && kill "$p" 2>/dev/null || true; done
rm -rf "$TMP"
}
trap cleanup EXIT INT TERM
cat > "$CFG" <<EOF
[gateway]
bind = "127.0.0.1:3110"
storaged_url = "http://127.0.0.1:3211"
catalogd_url = "http://127.0.0.1:3212"
ingestd_url = "http://127.0.0.1:3213"
queryd_url = "http://127.0.0.1:3214"
vectord_url = "http://127.0.0.1:3215"
embedd_url = "http://127.0.0.1:3216"
pathwayd_url = "http://127.0.0.1:3217"
matrixd_url = "http://127.0.0.1:3218"
observerd_url = "http://127.0.0.1:3219"
[observerd]
bind = "127.0.0.1:3219"
EOF
poll_health() {
local port="$1" deadline=$(($(date +%s) + 5))
while [ "$(date +%s)" -lt "$deadline" ]; do
if curl -sS --max-time 1 "http://127.0.0.1:$port/health" >/dev/null 2>&1; then return 0; fi
sleep 0.05
done
return 1
}
echo "[workflow-smoke] launching observerd → gateway..."
./bin/observerd -config "$CFG" > /tmp/observerd.log 2>&1 &
PIDS+=($!)
poll_health 3219 || { echo "observerd failed"; tail /tmp/observerd.log; exit 1; }
./bin/gateway -config "$CFG" > /tmp/gateway.log 2>&1 &
PIDS+=($!)
poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; exit 1; }
FAILED=0
# ── 1. /observer/workflow/modes lists registered modes ────────────
echo "[workflow-smoke] /observer/workflow/modes lists fixture modes:"
RESP="$(curl -sS http://127.0.0.1:3110/v1/observer/workflow/modes)"
HAS_ECHO="$(echo "$RESP" | jq -r '.modes | index("fixture.echo") != null')"
HAS_UPPER="$(echo "$RESP" | jq -r '.modes | index("fixture.upper") != null')"
if [ "$HAS_ECHO" = "true" ] && [ "$HAS_UPPER" = "true" ]; then
echo " ✓ fixture.echo + fixture.upper registered"
else
echo " ✗ resp: $RESP"; FAILED=1
fi
# ── 2. 3-node DAG with $-ref substitution ─────────────────────────
echo "[workflow-smoke] 3-node DAG: shape (upper) → weakness → improvement"
WORKFLOW='{
"workflow": {
"name": "smoke-chain",
"description": "DAG ref substitution test",
"nodes": [
{"id":"shape", "mode":"fixture.upper", "prompt":"hello world"},
{"id":"weakness", "mode":"fixture.echo",
"prompt":"observed shape: $shape.output.upper",
"depends_on":["shape"]},
{"id":"improvement", "mode":"fixture.echo",
"prompt":"based on $weakness.output.prompt do better",
"depends_on":["weakness"]}
]
}
}'
RUN="$(curl -sS -X POST http://127.0.0.1:3110/v1/observer/workflow/run \
-H 'Content-Type: application/json' -d "$WORKFLOW")"
STATUS="$(echo "$RUN" | jq -r '.status')"
SHAPE_UPPER="$(echo "$RUN" | jq -r '.nodes[0].output.upper')"
WEAK_PROMPT="$(echo "$RUN" | jq -r '.nodes[1].output.prompt')"
IMP_PROMPT="$(echo "$RUN" | jq -r '.nodes[2].output.prompt')"
if [ "$STATUS" = "succeeded" ] && [ "$SHAPE_UPPER" = "HELLO WORLD" ] \
&& [[ "$WEAK_PROMPT" == *"HELLO WORLD"* ]] \
&& [[ "$IMP_PROMPT" == *"HELLO WORLD"* ]]; then
echo " ✓ status=succeeded · shape=HELLO WORLD · refs propagated through 3-node chain"
else
echo " ✗ status=$STATUS shape=$SHAPE_UPPER weak=$WEAK_PROMPT imp=$IMP_PROMPT"
echo " full: $RUN"
FAILED=1
fi
# ── 3. Per-node provenance recorded as ObservedOps ────────────────
echo "[workflow-smoke] /observer/stats reflects workflow ops:"
STATS="$(curl -sS http://127.0.0.1:3110/v1/observer/stats)"
WORKFLOW_OPS="$(echo "$STATS" | jq -r '.by_source.workflow // 0')"
TOTAL="$(echo "$STATS" | jq -r '.total')"
if [ "$WORKFLOW_OPS" = "3" ] && [ "$TOTAL" = "3" ]; then
echo " ✓ 3 workflow ops recorded (one per node), total=3"
else
echo " ✗ workflow=$WORKFLOW_OPS total=$TOTAL"
echo " full: $STATS"; FAILED=1
fi
# ── 4. Unknown mode → 400 ─────────────────────────────────────────
echo "[workflow-smoke] unknown mode → 400:"
HTTP="$(curl -sS -o /tmp/wf_bad.json -w '%{http_code}' -X POST \
http://127.0.0.1:3110/v1/observer/workflow/run \
-H 'Content-Type: application/json' \
-d '{"workflow":{"name":"bad","nodes":[{"id":"a","mode":"does.not.exist"}]}}')"
ERR="$(jq -r '.error' < /tmp/wf_bad.json 2>/dev/null)"
if [ "$HTTP" = "400" ] && echo "$ERR" | grep -qi "unknown mode"; then
echo " ✓ unknown mode aborts with 400 + helpful error"
else
echo " ✗ http=$HTTP err=$ERR"; FAILED=1
fi
if [ "$FAILED" -eq 0 ]; then
echo "[workflow-smoke] Workflow runner acceptance: PASSED"
exit 0
else
echo "[workflow-smoke] Workflow runner acceptance: FAILED"
exit 1
fi