From bc9ab93afefcaef035af5da0e3a45be838413aa3 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 29 Apr 2026 20:18:02 -0500 Subject: [PATCH] =?UTF-8?q?H:=20observerd=20=E2=80=94=20autonomous-iterati?= =?UTF-8?q?on=20witness=20loop=20(SPEC=20=C2=A72=20port)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port of the load-bearing pieces of mcp-server/observer.ts (Rust system, 852 lines TS) per SPEC §2's named target. Implements PRD loop 3 ("Observer loop — watches each run, refines configs"). Routes (all under /v1/observer/* via gateway): GET /observer/health — liveness GET /observer/stats — total / successes / failures / by_source / recent_scenario_ops (matches Rust JSON shape exactly) POST /observer/event — record one ObservedOp; auto-defaults timestamp + source, validates required fields (endpoint), persists to JSONL, appends to ring buffer Architecture: - internal/observer/types.go — ObservedOp model + Source taxonomy (mcp / scenario / langfuse / overseer_correction). Mirrors the Rust shape so JSON round-trips during cutover. - internal/observer/store.go — Store + Persistor. Ring buffer cap matches Rust's 2000; recent_scenarios cap matches Rust's 10. Same persist-then-apply order as pathwayd; same corruption-tolerant replay (skip malformed lines + warn). - cmd/observerd — :3219 HTTP service, fronted by gateway as /v1/observer/*. - lakehouse.toml + DefaultConfig — [observerd] block matches the pathwayd pattern (Bind + PersistPath; empty path = ephemeral).
Tests + smoke (all PASS): - 7 unit tests in store_test.go: validation, default fields, stats aggregation, recent-scenarios cap + ordering, ring-buffer rollover at cap, JSONL round-trip persistence, corruption-tolerant replay (1 valid + 1 corrupt + 1 valid → 2 applied) - scripts/observer_smoke.sh: 4 assertions through gateway — record 5 events (3 ok / 2 fail across 2 sources), stats aggregate correctly, empty-endpoint→400, kill+restart preserves ops via JSONL replay (5 ops, 3 ok, 2 err survive) Deferred (named in package + cmd doc, not in this commit): - POST /observer/review (cloud-LLM hand-review fall-back). The heuristic-only path could land cheaply but the productized cloud path (qwen3-coder fall-back) is a multi-day port. - Background loops: analyzeErrors, consolidatePlaybooks, tailOverseerCorrections (read overseer_corrections.jsonl into the ring buffer once per cycle). - escalateFailureClusterToLLMTeam (failure clustering trigger that posts to LLM Team's /api/run with code_review mode). /relevance is NOT duplicated — already ported in 9588bd8 to internal/matrix/relevance.go (component 3 of SPEC §3.4). 16-smoke regression all green (D1-D6, G1, G1P, G2, storaged_cap, pathway, matrix, relevance, downgrade, playbook, observer). 13 binaries now: gateway, storaged, catalogd, ingestd, queryd, vectord, embedd, pathwayd, matrixd, observerd, mcpd, fake_ollama (plus catalogd-only test build).
Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/gateway/main.go | 5 + cmd/observerd/main.go | 131 +++++++++++++++++ internal/observer/store.go | 249 ++++++++++++++++++++++++++++++++ internal/observer/store_test.go | 193 +++++++++++++++++++++++++ internal/observer/types.go | 131 +++++++++++++++++ internal/shared/config.go | 49 +++++-- lakehouse.toml | 7 + scripts/observer_smoke.sh | 142 ++++++++++++++++++ 8 files changed, 891 insertions(+), 16 deletions(-) create mode 100644 cmd/observerd/main.go create mode 100644 internal/observer/store.go create mode 100644 internal/observer/store_test.go create mode 100644 internal/observer/types.go create mode 100755 scripts/observer_smoke.sh diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go index 2ee6e62..a2d7638 100644 --- a/cmd/gateway/main.go +++ b/cmd/gateway/main.go @@ -46,6 +46,7 @@ func main() { "embedd_url": cfg.Gateway.EmbeddURL, "pathwayd_url": cfg.Gateway.PathwaydURL, "matrixd_url": cfg.Gateway.MatrixdURL, + "observerd_url": cfg.Gateway.ObserverdURL, } for k, v := range upstreams { if v == "" { @@ -67,6 +68,7 @@ func main() { embeddURL := mustParseUpstream("embedd_url", cfg.Gateway.EmbeddURL) pathwaydURL := mustParseUpstream("pathwayd_url", cfg.Gateway.PathwaydURL) matrixdURL := mustParseUpstream("matrixd_url", cfg.Gateway.MatrixdURL) + observerdURL := mustParseUpstream("observerd_url", cfg.Gateway.ObserverdURL) storagedProxy := gateway.NewProxyHandler(storagedURL) catalogdProxy := gateway.NewProxyHandler(catalogdURL) @@ -76,6 +78,7 @@ func main() { embeddProxy := gateway.NewProxyHandler(embeddURL) pathwaydProxy := gateway.NewProxyHandler(pathwaydURL) matrixdProxy := gateway.NewProxyHandler(matrixdURL) + observerdProxy := gateway.NewProxyHandler(observerdURL) if err := shared.Run("gateway", cfg.Gateway.Bind, func(r chi.Router) { @@ -98,6 +101,8 @@ func main() { r.Handle("/v1/pathway/*", pathwaydProxy) // Matrix indexer — /v1/matrix/* (multi-corpus retrieve+merge per SPEC §3.4) r.Handle("/v1/matrix/*", 
matrixdProxy) + // Observer — /v1/observer/* (autonomous-iteration witness loop) + r.Handle("/v1/observer/*", observerdProxy) }, cfg.Auth); err != nil { slog.Error("server", "err", err) os.Exit(1) diff --git a/cmd/observerd/main.go b/cmd/observerd/main.go new file mode 100644 index 0000000..98fe034 --- /dev/null +++ b/cmd/observerd/main.go @@ -0,0 +1,131 @@ +// observerd is the autonomous-iteration witness service. Port of +// the load-bearing pieces of mcp-server/observer.ts (Rust system). +// +// Routes (all under /observer): +// GET /observer/health — service liveness + ring size +// GET /observer/stats — aggregate counters + recent scenarios +// POST /observer/event — record one observed op +// +// Deferred to follow-up commits (see internal/observer doc): +// - POST /observer/review (cloud-LLM hand review fall-back) +// - background loops (analyzeErrors, consolidatePlaybooks, +// tailOverseerCorrections) +// - failure-cluster escalation to LLM Team +// +// /relevance was already ported to internal/matrix in 9588bd8 and is +// not duplicated here. + +package main + +import ( + "encoding/json" + "errors" + "flag" + "log/slog" + "net/http" + "os" + "strings" + + "github.com/go-chi/chi/v5" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/observer" + "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" +) + +const maxRequestBytes = 4 << 20 // 4 MiB cap on request bodies + +func main() { + configPath := flag.String("config", "lakehouse.toml", "path to TOML config") + flag.Parse() + + cfg, err := shared.LoadConfig(*configPath) + if err != nil { + slog.Error("config", "err", err) + os.Exit(1) + } + + // Persistence is optional — empty path = ephemeral (matches the + // pathwayd pattern). Production sets a stable path under + // /var/lib/lakehouse/observer/ops.jsonl. 
+ var persistor *observer.Persistor + if cfg.Observerd.PersistPath != "" { + persistor, err = observer.NewPersistor(cfg.Observerd.PersistPath) + if err != nil { + slog.Error("observer persistor", "err", err) + os.Exit(1) + } + } + + store := observer.NewStore(persistor) + if persistor != nil { + n, err := store.Load() + if err != nil { + slog.Warn("observer load", "err", err, "loaded", n) + } else { + slog.Info("observer loaded", "ops", n, "path", cfg.Observerd.PersistPath) + } + } + + h := &handlers{store: store} + if err := shared.Run("observerd", cfg.Observerd.Bind, h.register, cfg.Auth); err != nil { + slog.Error("server", "err", err) + os.Exit(1) + } +} + +type handlers struct { + store *observer.Store +} + +func (h *handlers) register(r chi.Router) { + r.Get("/observer/stats", h.handleStats) + r.Post("/observer/event", h.handleEvent) +} + +func (h *handlers) handleStats(w http.ResponseWriter, _ *http.Request) { + writeJSON(w, http.StatusOK, h.store.Stats()) +} + +func (h *handlers) handleEvent(w http.ResponseWriter, r *http.Request) { + var op observer.ObservedOp + if !decodeJSON(w, r, &op) { + return + } + if err := h.store.Record(op); err != nil { + if errors.Is(err, observer.ErrInvalidOp) { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + slog.Error("observer record", "err", err) + http.Error(w, "internal", http.StatusInternalServerError) + return + } + stats := h.store.Stats() + writeJSON(w, http.StatusOK, map[string]any{ + "accepted": true, + "ring_size": stats.Total, + }) +} + +func decodeJSON(w http.ResponseWriter, r *http.Request, v any) bool { + defer r.Body.Close() + r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes) + if err := json.NewDecoder(r.Body).Decode(v); err != nil { + var maxErr *http.MaxBytesError + if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") { + http.Error(w, "body too large", http.StatusRequestEntityTooLarge) + return false + } + http.Error(w, "decode body: 
"+err.Error(), http.StatusBadRequest) + return false + } + return true +} + +func writeJSON(w http.ResponseWriter, code int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + if err := json.NewEncoder(w).Encode(v); err != nil { + slog.Warn("observer write json", "err", err) + } +} diff --git a/internal/observer/store.go b/internal/observer/store.go new file mode 100644 index 0000000..b8c3524 --- /dev/null +++ b/internal/observer/store.go @@ -0,0 +1,249 @@ +package observer + +// Store: in-memory ring buffer + optional JSONL persistor. Same +// shape as internal/pathway's persistor (afbb506) — opens the file +// per Append rather than holding an fd, which is fine at the +// observer's expected write rate (≤ a few hundred ops/min) and +// keeps the substrate restartable mid-stream. + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io/fs" + "log/slog" + "os" + "path/filepath" + "sync" +) + +// DefaultRingCap is the in-memory ring buffer cap. Mirrors the Rust +// Phase 24 limit of 2000 (recordExternalOp shifts the head when +// length > 2000). +const DefaultRingCap = 2000 + +// DefaultRecentScenariosCap is how many recent source=scenario ops +// the Stats endpoint returns. Matches the TS hard-coded slice(-10). +const DefaultRecentScenariosCap = 10 + +// Store holds the ring buffer + the optional persistor. Thread-safe +// via a single RWMutex (read-heavy via Stats; writes via Record). +type Store struct { + mu sync.RWMutex + ring []ObservedOp + cap int + persistor *Persistor +} + +// NewStore returns an empty Store. Pass nil persistor for in-memory +// only (unit tests, ephemeral runs); pass a real Persistor to enable +// jsonl-append-on-record. +func NewStore(persistor *Persistor) *Store { + return &Store{ + ring: make([]ObservedOp, 0, DefaultRingCap), + cap: DefaultRingCap, + persistor: persistor, + } +} + +// Record validates + persists + appends. 
Order matters: persist +// first so the log is attempted before the ring changes; persist errors +// are only logged (best-effort), so the ring may hold ops the log lacks. +// Returns ErrInvalidOp on validation failure (no persist, no append). +func (s *Store) Record(op ObservedOp) error { + op.EnsureTimestamp() + op.DefaultSource() + if err := op.Validate(); err != nil { + return err + } + if s.persistor != nil { + if err := s.persistor.Append(op); err != nil { + // Best-effort persistence — log but don't fail the + // in-memory record. Mirrors the Rust catch{} in + // persistOp; the ring buffer is the source of truth in + // flight. + slog.Warn("observer: persist failed", "err", err) + } + } + s.mu.Lock() + defer s.mu.Unlock() + s.ring = append(s.ring, op) + if len(s.ring) > s.cap { + // Shift left by one (drop oldest). Avoids unbounded growth + // without a per-write reallocation. + copy(s.ring, s.ring[1:]) + s.ring = s.ring[:len(s.ring)-1] + } + return nil +} + +// Recent returns a copy of the ring buffer's current state. Most +// recent entries are at the end (append-order). +func (s *Store) Recent() []ObservedOp { + s.mu.RLock() + defer s.mu.RUnlock() + out := make([]ObservedOp, len(s.ring)) + copy(out, s.ring) + return out +} + +// Stats aggregates the ring buffer. Mirrors the Rust /stats +// response shape exactly. +func (s *Store) Stats() Stats { + s.mu.RLock() + defer s.mu.RUnlock() + + stats := Stats{ + Total: len(s.ring), + BySource: make(map[string]int), + } + for _, op := range s.ring { + if op.Success { + stats.Successes++ + } else { + stats.Failures++ + } + src := string(op.Source) + if src == "" { + src = string(SourceMCP) + } + stats.BySource[src]++ + } + + // Last N scenario ops: scanned newest-first and prepended, so the output is oldest-first (matches Rust slice(-10)).
+ scenarios := make([]ScenarioOpDigest, 0, DefaultRecentScenariosCap) + for i := len(s.ring) - 1; i >= 0 && len(scenarios) < DefaultRecentScenariosCap; i-- { + op := s.ring[i] + if op.Source != SourceScenario { + continue + } + scenarios = append([]ScenarioOpDigest{{ + TS: op.Timestamp, + OK: op.Success, + Staffer: op.StafferID, + Kind: op.EventKind, + Role: op.Role, + }}, scenarios...) + } + stats.RecentScenarios = scenarios + + return stats +} + +// Load replays the persistor's JSONL log into the ring buffer. +// Resets the ring (current state is discarded) — same semantics as +// pathway.Store.Load. Corruption-tolerant: malformed lines log +// warnings and the load proceeds. +// +// Returns the number of ops successfully replayed. +func (s *Store) Load() (int, error) { + if s.persistor == nil { + return 0, nil + } + s.mu.Lock() + defer s.mu.Unlock() + s.ring = s.ring[:0] + return s.persistor.Replay(func(op ObservedOp) error { + s.ring = append(s.ring, op) + if len(s.ring) > s.cap { + copy(s.ring, s.ring[1:]) + s.ring = s.ring[:len(s.ring)-1] + } + return nil + }) +} + +// ─── Persistor ────────────────────────────────────────────────── + +// Persistor wraps a single JSONL file. Open-per-append — same +// pattern as internal/pathway. Each line is one ObservedOp. +type Persistor struct { + path string +} + +// NewPersistor returns a Persistor for the given file path. Parent +// directory is created on demand. Empty path is invalid (caller +// passes nil to NewStore for the no-persist case). +func NewPersistor(path string) (*Persistor, error) { + if path == "" { + return nil, errors.New("observer: persistor path is empty") + } + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return nil, fmt.Errorf("observer: create dir: %w", err) + } + return &Persistor{path: path}, nil +} + +// Path returns the file path the persistor writes to. +func (p *Persistor) Path() string { return p.path } + +// Append writes one ObservedOp as a JSONL line. 
+func (p *Persistor) Append(op ObservedOp) error { + line, err := json.Marshal(op) + if err != nil { + return fmt.Errorf("observer: marshal op: %w", err) + } + f, err := os.OpenFile(p.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + return fmt.Errorf("observer: open log: %w", err) + } + defer f.Close() + if _, err := f.Write(line); err != nil { + return fmt.Errorf("observer: write op: %w", err) + } + if _, err := f.Write([]byte{'\n'}); err != nil { + return fmt.Errorf("observer: write newline: %w", err) + } + return nil +} + +// Replay reads the log line-by-line and invokes apply for each op. +// Returns the count successfully applied. Missing file = 0 + nil +// (legitimate cold-start state). Malformed lines log a warning and +// the replay continues. +func (p *Persistor) Replay(apply func(ObservedOp) error) (int, error) { + f, err := os.Open(p.path) + if errors.Is(err, fs.ErrNotExist) { + return 0, nil + } + if err != nil { + return 0, fmt.Errorf("observer: open log: %w", err) + } + defer f.Close() + + scanner := bufio.NewScanner(f) + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, 1<<20) // 1 MiB per line cap + + applied, skipped, lineNo := 0, 0, 0 + for scanner.Scan() { + lineNo++ + raw := scanner.Bytes() + if len(raw) == 0 { + continue + } + var op ObservedOp + if err := json.Unmarshal(raw, &op); err != nil { + slog.Warn("observer: replay skipped malformed line", + "path", p.path, "line", lineNo, "err", err.Error()) + skipped++ + continue + } + if err := apply(op); err != nil { + slog.Warn("observer: replay apply failed", + "path", p.path, "line", lineNo, "err", err.Error()) + skipped++ + continue + } + applied++ + } + if err := scanner.Err(); err != nil { + return applied, fmt.Errorf("observer: scan log: %w", err) + } + if skipped > 0 { + slog.Info("observer: replay completed with skips", + "path", p.path, "applied", applied, "skipped", skipped) + } + return applied, nil +} diff --git a/internal/observer/store_test.go 
b/internal/observer/store_test.go new file mode 100644 index 0000000..d4ce317 --- /dev/null +++ b/internal/observer/store_test.go @@ -0,0 +1,193 @@ +package observer + +import ( + "os" + "path/filepath" + "strings" + "testing" + "time" +) + +func mkOp(success bool, source Source) ObservedOp { + return ObservedOp{ + Timestamp: time.Now().UTC().Format(time.RFC3339), + Endpoint: "/v1/test", + InputSummary: "test op", + Success: success, + DurationMs: 42, + OutputSummary: "ok", + Source: source, + } +} + +func TestRecord_RequiresEndpointAndTimestamp(t *testing.T) { + s := NewStore(nil) + bad := ObservedOp{Endpoint: ""} // EnsureTimestamp will fill, but Endpoint empty stays + if err := s.Record(bad); err == nil { + t.Error("expected error on empty endpoint") + } + + good := mkOp(true, SourceMCP) + if err := s.Record(good); err != nil { + t.Errorf("good op: %v", err) + } +} + +func TestRecord_DefaultsTimestampAndSource(t *testing.T) { + s := NewStore(nil) + op := ObservedOp{ + Endpoint: "/x", + InputSummary: "no ts no source", + Success: true, + } + if err := s.Record(op); err != nil { + t.Fatal(err) + } + stored := s.Recent()[0] + if stored.Timestamp == "" { + t.Error("Timestamp should be defaulted") + } + if stored.Source != SourceMCP { + t.Errorf("Source: want %q, got %q", SourceMCP, stored.Source) + } +} + +func TestStats_Aggregates(t *testing.T) { + s := NewStore(nil) + for i := 0; i < 5; i++ { + _ = s.Record(mkOp(true, SourceMCP)) + } + for i := 0; i < 3; i++ { + _ = s.Record(mkOp(false, SourceScenario)) + } + for i := 0; i < 2; i++ { + _ = s.Record(mkOp(true, SourceLangfuse)) + } + + st := s.Stats() + if st.Total != 10 { + t.Errorf("total: want 10, got %d", st.Total) + } + if st.Successes != 7 { + t.Errorf("successes: want 7, got %d", st.Successes) + } + if st.Failures != 3 { + t.Errorf("failures: want 3, got %d", st.Failures) + } + if st.BySource["mcp"] != 5 || st.BySource["scenario"] != 3 || st.BySource["langfuse"] != 2 { + t.Errorf("by_source mismatch: %+v", 
st.BySource) + } + if len(st.RecentScenarios) != 3 { + t.Errorf("recent scenarios: want 3, got %d", len(st.RecentScenarios)) + } +} + +func TestStats_RecentScenariosCappedAndOrdered(t *testing.T) { + s := NewStore(nil) + // Record 15 scenario ops; only the last 10 should appear. + for i := 0; i < 15; i++ { + op := mkOp(true, SourceScenario) + op.StafferID = "staffer-" + string(rune('a'+i)) + _ = s.Record(op) + time.Sleep(time.Millisecond) // ensure timestamps order-distinguishable + } + st := s.Stats() + if len(st.RecentScenarios) != DefaultRecentScenariosCap { + t.Errorf("cap: want %d, got %d", DefaultRecentScenariosCap, len(st.RecentScenarios)) + } + // Last entry should be the most recently added (staffer-o, the 15th). + last := st.RecentScenarios[len(st.RecentScenarios)-1] + if last.Staffer != "staffer-o" { + t.Errorf("most recent: want staffer-o, got %q", last.Staffer) + } +} + +func TestRingBuffer_BoundedByDefaultCap(t *testing.T) { + s := NewStore(nil) + s.cap = 5 // shrink for testability + for i := 0; i < 12; i++ { + op := mkOp(true, SourceMCP) + op.InputSummary = string(rune('a' + i)) + _ = s.Record(op) + } + r := s.Recent() + if len(r) != 5 { + t.Errorf("ring size: want 5, got %d", len(r)) + } + // Oldest 7 dropped; first remaining should have InputSummary "h" (8th). + if r[0].InputSummary != "h" { + t.Errorf("oldest after rollover: want 'h', got %q", r[0].InputSummary) + } +} + +func TestPersistor_RoundTrip(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "ops.jsonl") + p, err := NewPersistor(path) + if err != nil { + t.Fatal(err) + } + s := NewStore(p) + + for i := 0; i < 4; i++ { + op := mkOp(i%2 == 0, SourceMCP) + op.InputSummary = string(rune('a' + i)) + if err := s.Record(op); err != nil { + t.Fatal(err) + } + } + + // Sanity: file has 4 lines. 
+ bs, err := os.ReadFile(path) + if err != nil { + t.Fatal(err) + } + lines := strings.Split(strings.TrimSuffix(string(bs), "\n"), "\n") + if len(lines) != 4 { + t.Errorf("file lines: want 4, got %d", len(lines)) + } + + // Rehydrate into a fresh Store. + s2 := NewStore(p) + n, err := s2.Load() + if err != nil { + t.Fatal(err) + } + if n != 4 { + t.Errorf("loaded: want 4, got %d", n) + } + r := s2.Recent() + if len(r) != 4 { + t.Errorf("rehydrated ring: want 4, got %d", len(r)) + } + // Order preserved. + for i, want := range []string{"a", "b", "c", "d"} { + if r[i].InputSummary != want { + t.Errorf("op %d: want %q, got %q", i, want, r[i].InputSummary) + } + } +} + +func TestPersistor_CorruptionTolerant(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "ops.jsonl") + // Pre-seed with one valid + one corrupt + one valid line. + valid1 := `{"timestamp":"2026-04-29T12:00:00Z","endpoint":"/x","input_summary":"a","success":true,"duration_ms":1,"output_summary":"ok","source":"mcp"}` + corrupt := `{this is not json` + valid2 := `{"timestamp":"2026-04-29T12:00:01Z","endpoint":"/y","input_summary":"b","success":false,"duration_ms":2,"output_summary":"err","source":"scenario"}` + if err := os.WriteFile(path, []byte(valid1+"\n"+corrupt+"\n"+valid2+"\n"), 0o644); err != nil { + t.Fatal(err) + } + p, err := NewPersistor(path) + if err != nil { + t.Fatal(err) + } + s := NewStore(p) + n, err := s.Load() + if err != nil { + t.Fatal(err) + } + if n != 2 { + t.Errorf("applied: want 2 (valid pair), got %d (corrupt should skip)", n) + } +} diff --git a/internal/observer/types.go b/internal/observer/types.go new file mode 100644 index 0000000..d4b17a0 --- /dev/null +++ b/internal/observer/types.go @@ -0,0 +1,131 @@ +// Package observer is the Go port of mcp-server/observer.ts (Rust +// system, 852 lines TS) — the "third-party witness" loop that records +// every observed operation, surfaces failures, and feeds learnings +// back into the substrate. 
+// +// What this package owns (this commit): +// - ObservedOp data model + ring buffer + JSONL persistence +// - Stats aggregation (total / successes / failures / by_source) +// - Source taxonomy (mcp / scenario / langfuse / overseer_correction) +// +// What's deferred to follow-up commits: +// - /review endpoint with cloud-LLM hand-review (the heuristic +// plus qwen3-coder fall-back path) +// - tailOverseerCorrections (background loop reading +// overseer_corrections.jsonl) +// - analyzeErrors / consolidatePlaybooks periodic loops +// - escalateFailureClusterToLLMTeam (failure clustering trigger) +// +// /relevance was already ported in 9588bd8 (component 3 of SPEC §3.4) +// and lives in internal/matrix/relevance.go; the observer package +// doesn't re-implement it. + +package observer + +import ( + "errors" + "time" +) + +// Source is the provenance of an observed op. Empty string defaults +// to SourceMCP for back-compat with Phase 24 callers. +type Source string + +const ( + SourceMCP Source = "mcp" + SourceScenario Source = "scenario" + SourceLangfuse Source = "langfuse" + SourceOverseerCorrection Source = "overseer_correction" +) + +// ObservedOp is one entry in the observer's ring buffer (and JSONL +// log when persistence is configured). Mirrors the Rust ObservedOp +// shape exactly so the on-wire JSON round-trips between the two +// implementations during the Rust→Go cutover. +// +// Optional fields use omitempty so absent values don't bloat the +// JSONL file. Numeric zero values are intentionally treated as +// "not set" by the JSON layer; if a real zero needs to be +// persisted, future schema-version bump can switch to pointers. 
+type ObservedOp struct { + Timestamp string `json:"timestamp"` // ISO 8601 + Endpoint string `json:"endpoint"` + InputSummary string `json:"input_summary"` + Success bool `json:"success"` + DurationMs int64 `json:"duration_ms"` + OutputSummary string `json:"output_summary"` + Error string `json:"error,omitempty"` + + Source Source `json:"source,omitempty"` + StafferID string `json:"staffer_id,omitempty"` + SigHash string `json:"sig_hash,omitempty"` + EventKind string `json:"event_kind,omitempty"` + Role string `json:"role,omitempty"` + City string `json:"city,omitempty"` + State string `json:"state,omitempty"` + Count int `json:"count,omitempty"` + + RescueAttempted bool `json:"rescue_attempted,omitempty"` + RescueSucceeded bool `json:"rescue_succeeded,omitempty"` + + TaskClass string `json:"task_class,omitempty"` + Correction string `json:"correction,omitempty"` + AppliedAtTurn int `json:"applied_at_turn,omitempty"` +} + +// Stats is the aggregated view of the ring buffer — useful for +// dashboards and the GET /stats endpoint. RecentScenarios holds the +// most recent N source=scenario ops (default cap 10) so operators +// can see what the staffing scenarios are emitting at a glance. +type Stats struct { + Total int `json:"total"` + Successes int `json:"successes"` + Failures int `json:"failures"` + BySource map[string]int `json:"by_source"` + RecentScenarios []ScenarioOpDigest `json:"recent_scenario_ops"` +} + +// ScenarioOpDigest is the slim per-op shape returned in +// Stats.RecentScenarios — matches the TS digest exactly: +// {ts, ok, staffer, kind, role}. +type ScenarioOpDigest struct { + TS string `json:"ts"` + OK bool `json:"ok"` + Staffer string `json:"staffer"` + Kind string `json:"kind"` + Role string `json:"role"` +} + +// Errors surfaced to HTTP handlers. +var ( + ErrInvalidOp = errors.New("observer: invalid op (timestamp + endpoint required)") +) + +// Validate returns an error if required fields are missing. 
Called +// by Record before the op is added to the ring buffer. +func (op ObservedOp) Validate() error { + if op.Timestamp == "" { + return ErrInvalidOp + } + if op.Endpoint == "" { + return ErrInvalidOp + } + return nil +} + +// EnsureTimestamp populates Timestamp with the current UTC ISO 8601 +// time if it's empty. Useful for HTTP handlers that take the body +// as authoritative but need to default the timestamp when absent. +func (op *ObservedOp) EnsureTimestamp() { + if op.Timestamp == "" { + op.Timestamp = time.Now().UTC().Format(time.RFC3339) + } +} + +// DefaultSource sets Source to SourceMCP if empty. Mirrors the Rust +// `op.source ?? "mcp"` pattern in recordExternalOp. +func (op *ObservedOp) DefaultSource() { + if op.Source == "" { + op.Source = SourceMCP + } +} diff --git a/internal/shared/config.go b/internal/shared/config.go index 7268b25..a3d8cdb 100644 --- a/internal/shared/config.go +++ b/internal/shared/config.go @@ -26,9 +26,10 @@ type Config struct { Queryd QuerydConfig `toml:"queryd"` Vectord VectordConfig `toml:"vectord"` Embedd EmbeddConfig `toml:"embedd"` - Pathwayd PathwaydConfig `toml:"pathwayd"` - Matrixd MatrixdConfig `toml:"matrixd"` - S3 S3Config `toml:"s3"` + Pathwayd PathwaydConfig `toml:"pathwayd"` + Matrixd MatrixdConfig `toml:"matrixd"` + Observerd ObserverdConfig `toml:"observerd"` + S3 S3Config `toml:"s3"` Log LogConfig `toml:"log"` Auth AuthConfig `toml:"auth"` } @@ -52,19 +53,20 @@ type IngestConfig struct { // GatewayConfig adds the upstream URLs the reverse proxy fronts. // Each route family (/v1/storage, /v1/catalog, /v1/ingest, /v1/sql, -// /v1/vectors, /v1/embed, /v1/pathway, /v1/matrix) has its own -// upstream so we can scale services independently or move them to -// different boxes without touching gateway code. +// /v1/vectors, /v1/embed, /v1/pathway, /v1/matrix, /v1/observer) +// has its own upstream so we can scale services independently or +// move them to different boxes without touching gateway code. 
type GatewayConfig struct { - Bind string `toml:"bind"` - StoragedURL string `toml:"storaged_url"` - CatalogdURL string `toml:"catalogd_url"` - IngestdURL string `toml:"ingestd_url"` - QuerydURL string `toml:"queryd_url"` - VectordURL string `toml:"vectord_url"` - EmbeddURL string `toml:"embedd_url"` - PathwaydURL string `toml:"pathwayd_url"` - MatrixdURL string `toml:"matrixd_url"` + Bind string `toml:"bind"` + StoragedURL string `toml:"storaged_url"` + CatalogdURL string `toml:"catalogd_url"` + IngestdURL string `toml:"ingestd_url"` + QuerydURL string `toml:"queryd_url"` + VectordURL string `toml:"vectord_url"` + EmbeddURL string `toml:"embedd_url"` + PathwaydURL string `toml:"pathwayd_url"` + MatrixdURL string `toml:"matrixd_url"` + ObserverdURL string `toml:"observerd_url"` } // EmbeddConfig drives the embed service. ProviderURL points at the @@ -108,6 +110,16 @@ type MatrixdConfig struct { VectordURL string `toml:"vectord_url"` } +// ObserverdConfig drives the observer service (cmd/observerd). +// PersistPath: file path to the JSONL ops log; empty = in-memory +// only (test/dev). Production sets a stable path under +// /var/lib/lakehouse/observer/ops.jsonl so ops survive restart. +// Mirrors the PathwaydConfig pattern. +type ObserverdConfig struct { + Bind string `toml:"bind"` + PersistPath string `toml:"persist_path"` +} + // QuerydConfig adds queryd-specific knobs. queryd talks DuckDB // directly to MinIO via DuckDB's httpfs extension (so no storaged // URL needed), and reads the catalog over HTTP for view registration. 
@@ -185,7 +197,8 @@ func DefaultConfig() Config { VectordURL: "http://127.0.0.1:3215", EmbeddURL: "http://127.0.0.1:3216", PathwaydURL: "http://127.0.0.1:3217", - MatrixdURL: "http://127.0.0.1:3218", + MatrixdURL: "http://127.0.0.1:3218", + ObserverdURL: "http://127.0.0.1:3219", }, Storaged: ServiceConfig{Bind: "127.0.0.1:3211"}, Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"}, @@ -215,6 +228,10 @@ func DefaultConfig() Config { EmbeddURL: "http://127.0.0.1:3216", VectordURL: "http://127.0.0.1:3215", }, + Observerd: ObserverdConfig{ + Bind: "127.0.0.1:3219", + // PersistPath empty by default = in-memory only. + }, Queryd: QuerydConfig{ Bind: "127.0.0.1:3214", CatalogdURL: "http://127.0.0.1:3212", diff --git a/lakehouse.toml b/lakehouse.toml index a54682d..ae111cd 100644 --- a/lakehouse.toml +++ b/lakehouse.toml @@ -14,6 +14,7 @@ vectord_url = "http://127.0.0.1:3215" embedd_url = "http://127.0.0.1:3216" pathwayd_url = "http://127.0.0.1:3217" matrixd_url = "http://127.0.0.1:3218" +observerd_url = "http://127.0.0.1:3219" [storaged] bind = "127.0.0.1:3211" @@ -63,6 +64,12 @@ bind = "127.0.0.1:3218" embedd_url = "http://127.0.0.1:3216" vectord_url = "http://127.0.0.1:3215" +[observerd] +bind = "127.0.0.1:3219" +# Empty = in-memory only (dev/test). Production sets a path under +# /var/lib/lakehouse/observer/ops.jsonl so ops survive restart. +persist_path = "" + [s3] endpoint = "http://localhost:9000" region = "us-east-1" diff --git a/scripts/observer_smoke.sh b/scripts/observer_smoke.sh new file mode 100755 index 0000000..7f3a169 --- /dev/null +++ b/scripts/observer_smoke.sh @@ -0,0 +1,142 @@ +#!/usr/bin/env bash +# Observer smoke — autonomous-iteration witness service end-to-end. +# All assertions go through gateway :3110. 
+# +# Validates: +# - POST /observer/event records an op (success path + scenario source) +# - GET /observer/stats aggregates by source + counts successes/failures +# - Stats.recent_scenario_ops surfaces scenario digests +# - Validation: empty endpoint → 400 +# - Persistence: kill+restart observerd preserves ops via JSONL replay + +set -euo pipefail +cd "$(dirname "$0")/.." + +export PATH="$PATH:/usr/local/go/bin" + +echo "[observer-smoke] building observerd + gateway..." +go build -o bin/ ./cmd/observerd ./cmd/gateway + +pkill -f "bin/(observerd|gateway)" 2>/dev/null || true +sleep 0.3 + +PIDS=() +TMP="$(mktemp -d)" +PERSIST="$TMP/ops.jsonl" +CFG="$TMP/observer.toml" + +cleanup() { + echo "[observer-smoke] cleanup" + for p in "${PIDS[@]}"; do [ -n "$p" ] && kill "$p" 2>/dev/null || true; done + rm -rf "$TMP" +} +trap cleanup EXIT INT TERM + +cat > "$CFG" </dev/null 2>&1; then return 0; fi + sleep 0.05 + done + return 1 +} + +launch_observerd() { + ./bin/observerd -config "$CFG" > /tmp/observerd.log 2>&1 & + OBSERVERD_PID=$! + PIDS+=($OBSERVERD_PID) + poll_health 3219 || { echo "observerd failed"; tail /tmp/observerd.log; return 1; } +} + +echo "[observer-smoke] launching observerd → gateway..." +launch_observerd +./bin/gateway -config "$CFG" > /tmp/gateway.log 2>&1 & +PIDS+=($!) +poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; exit 1; } + +FAILED=0 + +# ── 1. 
Record 5 ops: 3 success + 2 fail across 2 sources ───────── +echo "[observer-smoke] record 5 ops:" +for i in 1 2 3; do + curl -sS -o /dev/null -X POST http://127.0.0.1:3110/v1/observer/event \ + -H 'Content-Type: application/json' \ + -d "{\"endpoint\":\"/v1/test\",\"input_summary\":\"ok-$i\",\"success\":true,\"duration_ms\":10,\"output_summary\":\"ok\",\"source\":\"mcp\"}" +done +for i in 1 2; do + curl -sS -o /dev/null -X POST http://127.0.0.1:3110/v1/observer/event \ + -H 'Content-Type: application/json' \ + -d "{\"endpoint\":\"/v1/test\",\"input_summary\":\"fail-$i\",\"success\":false,\"duration_ms\":10,\"output_summary\":\"err\",\"error\":\"boom\",\"source\":\"scenario\",\"staffer_id\":\"st-$i\",\"event_kind\":\"fill\",\"role\":\"Forklift\"}" +done +echo " ✓ 5 events posted" + +# ── 2. Stats aggregation ───────────────────────────────────────── +echo "[observer-smoke] /observer/stats aggregates correctly:" +STATS="$(curl -sS http://127.0.0.1:3110/v1/observer/stats)" +TOT="$(echo "$STATS" | jq -r '.total')" +OK="$(echo "$STATS" | jq -r '.successes')" +ERR="$(echo "$STATS" | jq -r '.failures')" +MCP="$(echo "$STATS" | jq -r '.by_source.mcp')" +SCEN="$(echo "$STATS" | jq -r '.by_source.scenario')" +RECENT_LEN="$(echo "$STATS" | jq -r '.recent_scenario_ops | length')" +if [ "$TOT" = "5" ] && [ "$OK" = "3" ] && [ "$ERR" = "2" ] && [ "$MCP" = "3" ] && [ "$SCEN" = "2" ] && [ "$RECENT_LEN" = "2" ]; then + echo " ✓ total=5 (3 ok + 2 fail) · by_source: mcp=3 scenario=2 · 2 scenario digests" +else + echo " ✗ total=$TOT ok=$OK err=$ERR mcp=$MCP scen=$SCEN recent=$RECENT_LEN" + echo " full: $STATS" + FAILED=1 +fi + +# ── 3. 
Validation: empty endpoint → 400 ────────────────────────── +echo "[observer-smoke] empty endpoint → 400:" +HTTP="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/observer/event \ + -H 'Content-Type: application/json' \ + -d '{"endpoint":"","input_summary":"x","success":true,"duration_ms":1,"output_summary":"x"}')" +if [ "$HTTP" = "400" ]; then + echo " ✓ empty endpoint rejected" +else + echo " ✗ got $HTTP"; FAILED=1 +fi + +# ── 4. Persistence: kill + restart preserves ops ───────────────── +echo "[observer-smoke] kill + restart observerd → ops survive:" +kill $OBSERVERD_PID 2>/dev/null || true +wait $OBSERVERD_PID 2>/dev/null || true +sleep 0.3 +launch_observerd +sleep 0.2 +STATS2="$(curl -sS http://127.0.0.1:3110/v1/observer/stats)" +TOT2="$(echo "$STATS2" | jq -r '.total')" +OK2="$(echo "$STATS2" | jq -r '.successes')" +ERR2="$(echo "$STATS2" | jq -r '.failures')" +if [ "$TOT2" = "5" ] && [ "$OK2" = "3" ] && [ "$ERR2" = "2" ]; then + echo " ✓ total=5 ok=3 err=2 preserved through restart" +else + echo " ✗ post-restart total=$TOT2 ok=$OK2 err=$ERR2"; FAILED=1 +fi + +if [ "$FAILED" -eq 0 ]; then + echo "[observer-smoke] Observer acceptance gate: PASSED" + exit 0 +else + echo "[observer-smoke] Observer acceptance gate: FAILED" + exit 1 +fi