root bc9ab93afe H: observerd — autonomous-iteration witness loop (SPEC §2 port)
Port of the load-bearing pieces of mcp-server/observer.ts (Rust
system, 852 lines TS) per SPEC §2's named target. Implements PRD
loop 3 ("Observer loop — watches each run, refines configs").

Routes (all under /v1/observer/* via gateway):
  GET  /observer/health   — liveness
  GET  /observer/stats    — total / successes / failures /
                             by_source / recent_scenario_ops
                             (matches Rust JSON shape exactly)
  POST /observer/event    — record one ObservedOp; auto-defaults
                             timestamp + source, validates required
                             fields (endpoint), persists to JSONL,
                             appends to ring buffer

Architecture:
  - internal/observer/types.go — ObservedOp model + Source taxonomy
    (mcp / scenario / langfuse / overseer_correction). Mirrors the
    Rust shape so JSON round-trips during cutover.
  - internal/observer/store.go — Store + Persistor. Ring buffer cap
    matches Rust's 2000; recent_scenarios cap matches Rust's 10.
    Same persist-then-apply order as pathwayd; same corruption-
    tolerant replay (skip malformed lines + warn).
  - cmd/observerd — :3219 HTTP service, fronted by gateway as
    /v1/observer/*.
  - lakehouse.toml + DefaultConfig — [observerd] block matches the
    pathwayd pattern (Bind + PersistPath; empty path = ephemeral).

Tests + smoke (all PASS):
  - 7 unit tests in store_test.go: validation, default fields,
    stats aggregation, recent-scenarios cap + ordering, ring-buffer
    rollover at cap, JSONL round-trip persistence, corruption-
    tolerant replay (1 valid + 1 corrupt + 1 valid → 2 applied)
  - scripts/observer_smoke.sh: 4 assertions through gateway —
    record 5 events (3 ok / 2 fail across 2 sources), stats
    aggregates correctly, empty-endpoint→400, kill+restart preserves
    via JSONL replay (5 ops, 3 ok, 2 err survive)

Deferred (named in package + cmd doc, not in this commit):
  - POST /observer/review (cloud-LLM hand-review fall-back). The
    heuristic-only path could land cheaply but the productized
    cloud path (qwen3-coder fall-back) is multi-day port.
  - Background loops: analyzeErrors, consolidatePlaybooks,
    tailOverseerCorrections (read overseer_corrections.jsonl into
    the ring buffer once per cycle).
  - escalateFailureClusterToLLMTeam (failure clustering trigger
    that posts to LLM Team's /api/run with code_review mode).

/relevance is NOT duplicated — already ported in 9588bd8 to
internal/matrix/relevance.go (component 3 of SPEC §3.4).

16-smoke regression all green (D1-D6, G1, G1P, G2, storaged_cap,
pathway, matrix, relevance, downgrade, playbook, observer).
13 binaries now: gateway, storaged, catalogd, ingestd, queryd,
vectord, embedd, pathwayd, matrixd, observerd, mcpd, fake_ollama
(plus catalogd-only test build).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 20:18:02 -05:00

250 lines
6.9 KiB
Go

package observer
// Store: in-memory ring buffer + optional JSONL persistor. Same
// shape as internal/pathway's persistor (afbb506) — opens the file
// per Append rather than holding an fd, which is fine at the
// observer's expected write rate (≤ a few hundred ops/min) and
// keeps the substrate restartable mid-stream.
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io/fs"
"log/slog"
"os"
"path/filepath"
"sync"
)
// Caps mirror the upstream observer so stats and ring behavior match
// during cutover.
const (
	// DefaultRingCap is the in-memory ring buffer cap. Mirrors the Rust
	// Phase 24 limit of 2000 (recordExternalOp shifts the head when
	// length > 2000).
	DefaultRingCap = 2000

	// DefaultRecentScenariosCap is how many recent source=scenario ops
	// the Stats endpoint returns. Matches the TS hard-coded slice(-10).
	DefaultRecentScenariosCap = 10
)
// Store holds the ring buffer + the optional persistor. Thread-safe
// via a single RWMutex (read-heavy via Stats; writes via Record).
type Store struct {
	mu        sync.RWMutex // guards ring; cap and persistor are set once in NewStore
	ring      []ObservedOp // append-order; oldest entry dropped once len exceeds cap
	cap       int          // ring size limit (DefaultRingCap)
	persistor *Persistor   // nil = in-memory only (no JSONL log)
}
// NewStore returns an empty Store. Pass nil persistor for in-memory
// only (unit tests, ephemeral runs); pass a real Persistor to enable
// jsonl-append-on-record.
func NewStore(persistor *Persistor) *Store {
	s := &Store{
		cap:       DefaultRingCap,
		persistor: persistor,
	}
	// Pre-size to the cap so the ring never reallocates in steady state.
	s.ring = make([]ObservedOp, 0, s.cap)
	return s
}
// Record validates + persists + appends. Order matters: persist
// first so a crash mid-record doesn't leave the ring ahead of the
// log. Returns ErrInvalidOp on validation failure (no persist, no
// append).
func (s *Store) Record(op ObservedOp) error {
	op.EnsureTimestamp()
	op.DefaultSource()
	if err := op.Validate(); err != nil {
		return err
	}

	// Persist before touching the ring so in-memory state never runs
	// ahead of the on-disk log across a crash.
	if p := s.persistor; p != nil {
		if err := p.Append(op); err != nil {
			// Best-effort persistence — log but don't fail the
			// in-memory record. Mirrors the Rust catch{} in
			// persistOp; the ring buffer is the source of truth in
			// flight.
			slog.Warn("observer: persist failed", "err", err)
		}
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.ring = append(s.ring, op)
	// Evict the oldest entry once past cap. A one-slot left shift
	// bounds memory without a per-write reallocation.
	if n := len(s.ring); n > s.cap {
		copy(s.ring, s.ring[1:])
		s.ring = s.ring[:n-1]
	}
	return nil
}
// Recent returns a copy of the ring buffer's current state. Most
// recent entries are at the end (append-order).
func (s *Store) Recent() []ObservedOp {
	s.mu.RLock()
	defer s.mu.RUnlock()
	// Copy so callers can't mutate (or race on) the live ring.
	return append([]ObservedOp(nil), s.ring...)
}
// Stats aggregates the ring buffer. Mirrors the Rust /stats
// response shape exactly.
func (s *Store) Stats() Stats {
	s.mu.RLock()
	defer s.mu.RUnlock()
	stats := Stats{
		Total:    len(s.ring),
		BySource: make(map[string]int),
	}
	for _, op := range s.ring {
		if op.Success {
			stats.Successes++
		} else {
			stats.Failures++
		}
		src := string(op.Source)
		if src == "" {
			// Defensive: Record defaults the source, but ops replayed
			// from an older log may predate that.
			src = string(SourceMCP)
		}
		stats.BySource[src]++
	}
	// Last N scenario ops in chronological order — same result as the
	// Rust slice(-10). Walk backward collecting newest-first, then
	// reverse in place; the old prepend-per-hit pattern reallocated
	// the slice on every match and defeated the pre-sized capacity.
	scenarios := make([]ScenarioOpDigest, 0, DefaultRecentScenariosCap)
	for i := len(s.ring) - 1; i >= 0 && len(scenarios) < DefaultRecentScenariosCap; i-- {
		op := s.ring[i]
		if op.Source != SourceScenario {
			continue
		}
		scenarios = append(scenarios, ScenarioOpDigest{
			TS:      op.Timestamp,
			OK:      op.Success,
			Staffer: op.StafferID,
			Kind:    op.EventKind,
			Role:    op.Role,
		})
	}
	for i, j := 0, len(scenarios)-1; i < j; i, j = i+1, j-1 {
		scenarios[i], scenarios[j] = scenarios[j], scenarios[i]
	}
	stats.RecentScenarios = scenarios
	return stats
}
// Load replays the persistor's JSONL log into the ring buffer.
// Resets the ring (current state is discarded) — same semantics as
// pathway.Store.Load. Corruption-tolerant: malformed lines log
// warnings and the load proceeds.
//
// Returns the number of ops successfully replayed.
func (s *Store) Load() (int, error) {
	if s.persistor == nil {
		return 0, nil
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	// The log is the source of truth on load; drop in-flight state
	// but keep the ring's backing capacity.
	s.ring = s.ring[:0]
	return s.persistor.Replay(func(op ObservedOp) error {
		s.ring = append(s.ring, op)
		// Same one-slot eviction as Record so a long log still
		// replays into a bounded ring.
		if n := len(s.ring); n > s.cap {
			copy(s.ring, s.ring[1:])
			s.ring = s.ring[:n-1]
		}
		return nil
	})
}
// ─── Persistor ──────────────────────────────────────────────────
// Persistor wraps a single JSONL file. Open-per-append — same
// pattern as internal/pathway. Each line is one ObservedOp.
type Persistor struct {
	path string // JSONL log file; never empty (enforced by NewPersistor)
}
// NewPersistor returns a Persistor for the given file path. Parent
// directory is created on demand. Empty path is invalid (caller
// passes nil to NewStore for the no-persist case).
func NewPersistor(path string) (*Persistor, error) {
	if path == "" {
		return nil, errors.New("observer: persistor path is empty")
	}
	dir := filepath.Dir(path)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return nil, fmt.Errorf("observer: create dir: %w", err)
	}
	return &Persistor{path: path}, nil
}
// Path returns the file path the persistor writes to.
func (p *Persistor) Path() string {
	return p.path
}
// Append writes one ObservedOp as a JSONL line.
//
// The file is opened per call (O_APPEND|O_CREATE) — see the package
// comment for why that's acceptable at the observer's write rate.
func (p *Persistor) Append(op ObservedOp) error {
	line, err := json.Marshal(op)
	if err != nil {
		return fmt.Errorf("observer: marshal op: %w", err)
	}
	f, err := os.OpenFile(p.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return fmt.Errorf("observer: open log: %w", err)
	}
	// Single write of line+newline: one syscall, and with O_APPEND the
	// line can't be split around a concurrent appender's write.
	if _, err := f.Write(append(line, '\n')); err != nil {
		f.Close()
		return fmt.Errorf("observer: write op: %w", err)
	}
	// Don't hide the close error behind a defer: on an append-only
	// durability log, delayed write failures surface at Close, and the
	// caller (Record) wants to log them.
	if err := f.Close(); err != nil {
		return fmt.Errorf("observer: close log: %w", err)
	}
	return nil
}
// Replay reads the log line-by-line and invokes apply for each op.
// Returns the count successfully applied. Missing file = 0 + nil
// (legitimate cold-start state). Malformed lines log a warning and
// the replay continues.
func (p *Persistor) Replay(apply func(ObservedOp) error) (int, error) {
	f, err := os.Open(p.path)
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			// Cold start: no log yet is not an error.
			return 0, nil
		}
		return 0, fmt.Errorf("observer: open log: %w", err)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	sc.Buffer(make([]byte, 0, 64*1024), 1<<20) // 1 MiB per line cap

	var applied, skipped, lineNo int
	for sc.Scan() {
		lineNo++
		data := sc.Bytes()
		if len(data) == 0 {
			continue
		}
		var op ObservedOp
		if uerr := json.Unmarshal(data, &op); uerr != nil {
			slog.Warn("observer: replay skipped malformed line",
				"path", p.path, "line", lineNo, "err", uerr.Error())
			skipped++
			continue
		}
		if aerr := apply(op); aerr != nil {
			slog.Warn("observer: replay apply failed",
				"path", p.path, "line", lineNo, "err", aerr.Error())
			skipped++
			continue
		}
		applied++
	}
	if err := sc.Err(); err != nil {
		return applied, fmt.Errorf("observer: scan log: %w", err)
	}
	if skipped > 0 {
		slog.Info("observer: replay completed with skips",
			"path", p.path, "applied", applied, "skipped", skipped)
	}
	return applied, nil
}