Port of the load-bearing pieces of mcp-server/observer.ts (852 lines
of TS in the Rust system) to Go, per the target named in SPEC §2.
Implements PRD loop 3 ("Observer loop — watches each run, refines configs").
Routes (all under /v1/observer/* via gateway):
GET /observer/health — liveness
GET /observer/stats — total / successes / failures /
by_source / recent_scenario_ops
(matches Rust JSON shape exactly)
POST /observer/event — record one ObservedOp; auto-defaults
timestamp + source, validates required
fields (endpoint), persists to JSONL,
appends to ring buffer
Architecture:
- internal/observer/types.go — ObservedOp model + Source taxonomy
(mcp / scenario / langfuse / overseer_correction). Mirrors the
Rust shape so JSON round-trips during cutover.
- internal/observer/store.go — Store + Persistor. Ring buffer cap
matches Rust's 2000; recent_scenarios cap matches Rust's 10.
Same persist-then-apply order as pathwayd; same corruption-
tolerant replay (skip malformed lines + warn).
- cmd/observerd — :3219 HTTP service, fronted by gateway as
/v1/observer/*.
- lakehouse.toml + DefaultConfig — [observerd] block matches the
pathwayd pattern (Bind + PersistPath; empty path = ephemeral).
Tests + smoke (all PASS):
- 7 unit tests in store_test.go: validation, default fields,
stats aggregation, recent-scenarios cap + ordering, ring-buffer
rollover at cap, JSONL round-trip persistence, corruption-
tolerant replay (1 valid + 1 corrupt + 1 valid → 2 applied)
- scripts/observer_smoke.sh: 4 assertions through gateway —
record 5 events (3 ok / 2 fail across 2 sources), stats
aggregates correctly, empty-endpoint→400, kill+restart preserves
via JSONL replay (5 ops, 3 ok, 2 err survive)
Deferred (named in package + cmd doc, not in this commit):
- POST /observer/review (cloud-LLM hand-review fall-back). The
heuristic-only path could land cheaply, but the productized
cloud path (qwen3-coder fall-back) is a multi-day port.
- Background loops: analyzeErrors, consolidatePlaybooks,
tailOverseerCorrections (read overseer_corrections.jsonl into
the ring buffer once per cycle).
- escalateFailureClusterToLLMTeam (failure clustering trigger
that posts to LLM Team's /api/run with code_review mode).
/relevance is NOT duplicated — already ported in 9588bd8 to
internal/matrix/relevance.go (component 3 of SPEC §3.4).
16-smoke regression all green (D1-D6, G1, G1P, G2, storaged_cap,
pathway, matrix, relevance, downgrade, playbook, observer).
13 binaries now: gateway, storaged, catalogd, ingestd, queryd,
vectord, embedd, pathwayd, matrixd, observerd, mcpd, fake_ollama
(plus catalogd-only test build).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
250 lines
6.9 KiB
Go
package observer
|
|
|
|
// Store: in-memory ring buffer + optional JSONL persistor. Same
// shape as internal/pathway's persistor (afbb506) — opens the file
// per Append rather than holding an fd, which is fine at the
// observer's expected write rate (≤ a few hundred ops/min) and
// keeps the substrate restartable mid-stream.
|
|
|
|
import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"log/slog"
	"os"
	"path/filepath"
	"sync"
)
|
|
|
|
const (
	// DefaultRingCap bounds the in-memory ring buffer. It mirrors the
	// Rust Phase 24 limit: recordExternalOp drops the head once the
	// buffer holds more than 2000 ops.
	DefaultRingCap = 2000

	// DefaultRecentScenariosCap is how many of the newest source=scenario
	// ops the Stats endpoint reports, matching the TS hard-coded
	// slice(-10).
	DefaultRecentScenariosCap = 10
)
|
|
|
|
// Store holds the ring buffer + the optional persistor. Thread-safe
|
|
// via a single RWMutex (read-heavy via Stats; writes via Record).
|
|
type Store struct {
|
|
mu sync.RWMutex
|
|
ring []ObservedOp
|
|
cap int
|
|
persistor *Persistor
|
|
}
|
|
|
|
// NewStore returns an empty Store. Pass nil persistor for in-memory
|
|
// only (unit tests, ephemeral runs); pass a real Persistor to enable
|
|
// jsonl-append-on-record.
|
|
func NewStore(persistor *Persistor) *Store {
|
|
return &Store{
|
|
ring: make([]ObservedOp, 0, DefaultRingCap),
|
|
cap: DefaultRingCap,
|
|
persistor: persistor,
|
|
}
|
|
}
|
|
|
|
// Record validates + persists + appends. Order matters: persist
|
|
// first so a crash mid-record doesn't leave the ring ahead of the
|
|
// log. Returns ErrInvalidOp on validation failure (no persist, no
|
|
// append).
|
|
func (s *Store) Record(op ObservedOp) error {
|
|
op.EnsureTimestamp()
|
|
op.DefaultSource()
|
|
if err := op.Validate(); err != nil {
|
|
return err
|
|
}
|
|
if s.persistor != nil {
|
|
if err := s.persistor.Append(op); err != nil {
|
|
// Best-effort persistence — log but don't fail the
|
|
// in-memory record. Mirrors the Rust catch{} in
|
|
// persistOp; the ring buffer is the source of truth in
|
|
// flight.
|
|
slog.Warn("observer: persist failed", "err", err)
|
|
}
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.ring = append(s.ring, op)
|
|
if len(s.ring) > s.cap {
|
|
// Shift left by one (drop oldest). Avoids unbounded growth
|
|
// without a per-write reallocation.
|
|
copy(s.ring, s.ring[1:])
|
|
s.ring = s.ring[:len(s.ring)-1]
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Recent returns a copy of the ring buffer's current state. Most
|
|
// recent entries are at the end (append-order).
|
|
func (s *Store) Recent() []ObservedOp {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
out := make([]ObservedOp, len(s.ring))
|
|
copy(out, s.ring)
|
|
return out
|
|
}
|
|
|
|
// Stats aggregates the ring buffer. Mirrors the Rust /stats
|
|
// response shape exactly.
|
|
func (s *Store) Stats() Stats {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
stats := Stats{
|
|
Total: len(s.ring),
|
|
BySource: make(map[string]int),
|
|
}
|
|
for _, op := range s.ring {
|
|
if op.Success {
|
|
stats.Successes++
|
|
} else {
|
|
stats.Failures++
|
|
}
|
|
src := string(op.Source)
|
|
if src == "" {
|
|
src = string(SourceMCP)
|
|
}
|
|
stats.BySource[src]++
|
|
}
|
|
|
|
// Last N scenario ops (most-recent-first → match Rust slice(-10)).
|
|
scenarios := make([]ScenarioOpDigest, 0, DefaultRecentScenariosCap)
|
|
for i := len(s.ring) - 1; i >= 0 && len(scenarios) < DefaultRecentScenariosCap; i-- {
|
|
op := s.ring[i]
|
|
if op.Source != SourceScenario {
|
|
continue
|
|
}
|
|
scenarios = append([]ScenarioOpDigest{{
|
|
TS: op.Timestamp,
|
|
OK: op.Success,
|
|
Staffer: op.StafferID,
|
|
Kind: op.EventKind,
|
|
Role: op.Role,
|
|
}}, scenarios...)
|
|
}
|
|
stats.RecentScenarios = scenarios
|
|
|
|
return stats
|
|
}
|
|
|
|
// Load replays the persistor's JSONL log into the ring buffer.
|
|
// Resets the ring (current state is discarded) — same semantics as
|
|
// pathway.Store.Load. Corruption-tolerant: malformed lines log
|
|
// warnings and the load proceeds.
|
|
//
|
|
// Returns the number of ops successfully replayed.
|
|
func (s *Store) Load() (int, error) {
|
|
if s.persistor == nil {
|
|
return 0, nil
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.ring = s.ring[:0]
|
|
return s.persistor.Replay(func(op ObservedOp) error {
|
|
s.ring = append(s.ring, op)
|
|
if len(s.ring) > s.cap {
|
|
copy(s.ring, s.ring[1:])
|
|
s.ring = s.ring[:len(s.ring)-1]
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// ─── Persistor ──────────────────────────────────────────────────
|
|
|
|
// Persistor appends ObservedOps to a single JSONL file, one JSON object
// per line. It stores only the path and reopens the file on every Append
// instead of holding a descriptor, the same pattern as internal/pathway.
type Persistor struct {
	path string // location of the JSONL log
}
|
|
|
|
// NewPersistor returns a Persistor for the given file path. Parent
|
|
// directory is created on demand. Empty path is invalid (caller
|
|
// passes nil to NewStore for the no-persist case).
|
|
func NewPersistor(path string) (*Persistor, error) {
|
|
if path == "" {
|
|
return nil, errors.New("observer: persistor path is empty")
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
|
|
return nil, fmt.Errorf("observer: create dir: %w", err)
|
|
}
|
|
return &Persistor{path: path}, nil
|
|
}
|
|
|
|
// Path returns the file path the persistor writes to.
|
|
func (p *Persistor) Path() string { return p.path }
|
|
|
|
// Append writes one ObservedOp as a JSONL line.
|
|
func (p *Persistor) Append(op ObservedOp) error {
|
|
line, err := json.Marshal(op)
|
|
if err != nil {
|
|
return fmt.Errorf("observer: marshal op: %w", err)
|
|
}
|
|
f, err := os.OpenFile(p.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
|
|
if err != nil {
|
|
return fmt.Errorf("observer: open log: %w", err)
|
|
}
|
|
defer f.Close()
|
|
if _, err := f.Write(line); err != nil {
|
|
return fmt.Errorf("observer: write op: %w", err)
|
|
}
|
|
if _, err := f.Write([]byte{'\n'}); err != nil {
|
|
return fmt.Errorf("observer: write newline: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Replay reads the log line-by-line and invokes apply for each op.
|
|
// Returns the count successfully applied. Missing file = 0 + nil
|
|
// (legitimate cold-start state). Malformed lines log a warning and
|
|
// the replay continues.
|
|
func (p *Persistor) Replay(apply func(ObservedOp) error) (int, error) {
|
|
f, err := os.Open(p.path)
|
|
if errors.Is(err, fs.ErrNotExist) {
|
|
return 0, nil
|
|
}
|
|
if err != nil {
|
|
return 0, fmt.Errorf("observer: open log: %w", err)
|
|
}
|
|
defer f.Close()
|
|
|
|
scanner := bufio.NewScanner(f)
|
|
buf := make([]byte, 0, 64*1024)
|
|
scanner.Buffer(buf, 1<<20) // 1 MiB per line cap
|
|
|
|
applied, skipped, lineNo := 0, 0, 0
|
|
for scanner.Scan() {
|
|
lineNo++
|
|
raw := scanner.Bytes()
|
|
if len(raw) == 0 {
|
|
continue
|
|
}
|
|
var op ObservedOp
|
|
if err := json.Unmarshal(raw, &op); err != nil {
|
|
slog.Warn("observer: replay skipped malformed line",
|
|
"path", p.path, "line", lineNo, "err", err.Error())
|
|
skipped++
|
|
continue
|
|
}
|
|
if err := apply(op); err != nil {
|
|
slog.Warn("observer: replay apply failed",
|
|
"path", p.path, "line", lineNo, "err", err.Error())
|
|
skipped++
|
|
continue
|
|
}
|
|
applied++
|
|
}
|
|
if err := scanner.Err(); err != nil {
|
|
return applied, fmt.Errorf("observer: scan log: %w", err)
|
|
}
|
|
if skipped > 0 {
|
|
slog.Info("observer: replay completed with skips",
|
|
"path", p.path, "applied", applied, "skipped", skipped)
|
|
}
|
|
return applied, nil
|
|
}
|