Port of the load-bearing pieces of mcp-server/observer.ts (Rust
system, 852 lines TS) per SPEC §2's named target. Implements PRD
loop 3 ("Observer loop — watches each run, refines configs").
Routes (all under /v1/observer/* via gateway):
GET /observer/health — liveness
GET /observer/stats — total / successes / failures /
by_source / recent_scenario_ops
(matches Rust JSON shape exactly)
POST /observer/event — record one ObservedOp; auto-defaults
timestamp + source, validates required
fields (endpoint), persists to JSONL,
appends to ring buffer
Architecture:
- internal/observer/types.go — ObservedOp model + Source taxonomy
(mcp / scenario / langfuse / overseer_correction). Mirrors the
Rust shape so JSON round-trips during cutover.
- internal/observer/store.go — Store + Persistor. Ring buffer cap
matches Rust's 2000; recent_scenarios cap matches Rust's 10.
Same persist-then-apply order as pathwayd; same corruption-
tolerant replay (skip malformed lines + warn).
- cmd/observerd — :3219 HTTP service, fronted by gateway as
/v1/observer/*.
- lakehouse.toml + DefaultConfig — [observerd] block matches the
pathwayd pattern (Bind + PersistPath; empty path = ephemeral).
Tests + smoke (all PASS):
- 7 unit tests in store_test.go: validation, default fields,
stats aggregation, recent-scenarios cap + ordering, ring-buffer
rollover at cap, JSONL round-trip persistence, corruption-
tolerant replay (1 valid + 1 corrupt + 1 valid → 2 applied)
- scripts/observer_smoke.sh: 4 assertions through gateway —
record 5 events (3 ok / 2 fail across 2 sources), stats
aggregates correctly, empty-endpoint→400, kill+restart preserves
via JSONL replay (5 ops, 3 ok, 2 err survive)
Deferred (named in package + cmd doc, not in this commit):
- POST /observer/review (cloud-LLM hand-review fallback). The
heuristic-only path could land cheaply, but the productized
cloud path (qwen3-coder fallback) is a multi-day port.
- Background loops: analyzeErrors, consolidatePlaybooks,
tailOverseerCorrections (read overseer_corrections.jsonl into
the ring buffer once per cycle).
- escalateFailureClusterToLLMTeam (failure clustering trigger
that posts to LLM Team's /api/run with code_review mode).
/relevance is NOT duplicated — already ported in 9588bd8 to
internal/matrix/relevance.go (component 3 of SPEC §3.4).
16-smoke regression all green (D1-D6, G1, G1P, G2, storaged_cap,
pathway, matrix, relevance, downgrade, playbook, observer).
13 binaries now: gateway, storaged, catalogd, ingestd, queryd,
vectord, embedd, pathwayd, matrixd, observerd, mcpd, fake_ollama
(plus catalogd-only test build).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
194 lines
5.0 KiB
Go
package observer
|
|
|
|
import (
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func mkOp(success bool, source Source) ObservedOp {
|
|
return ObservedOp{
|
|
Timestamp: time.Now().UTC().Format(time.RFC3339),
|
|
Endpoint: "/v1/test",
|
|
InputSummary: "test op",
|
|
Success: success,
|
|
DurationMs: 42,
|
|
OutputSummary: "ok",
|
|
Source: source,
|
|
}
|
|
}
|
|
|
|
func TestRecord_RequiresEndpointAndTimestamp(t *testing.T) {
|
|
s := NewStore(nil)
|
|
bad := ObservedOp{Endpoint: ""} // EnsureTimestamp will fill, but Endpoint empty stays
|
|
if err := s.Record(bad); err == nil {
|
|
t.Error("expected error on empty endpoint")
|
|
}
|
|
|
|
good := mkOp(true, SourceMCP)
|
|
if err := s.Record(good); err != nil {
|
|
t.Errorf("good op: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestRecord_DefaultsTimestampAndSource(t *testing.T) {
|
|
s := NewStore(nil)
|
|
op := ObservedOp{
|
|
Endpoint: "/x",
|
|
InputSummary: "no ts no source",
|
|
Success: true,
|
|
}
|
|
if err := s.Record(op); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
stored := s.Recent()[0]
|
|
if stored.Timestamp == "" {
|
|
t.Error("Timestamp should be defaulted")
|
|
}
|
|
if stored.Source != SourceMCP {
|
|
t.Errorf("Source: want %q, got %q", SourceMCP, stored.Source)
|
|
}
|
|
}
|
|
|
|
func TestStats_Aggregates(t *testing.T) {
|
|
s := NewStore(nil)
|
|
for i := 0; i < 5; i++ {
|
|
_ = s.Record(mkOp(true, SourceMCP))
|
|
}
|
|
for i := 0; i < 3; i++ {
|
|
_ = s.Record(mkOp(false, SourceScenario))
|
|
}
|
|
for i := 0; i < 2; i++ {
|
|
_ = s.Record(mkOp(true, SourceLangfuse))
|
|
}
|
|
|
|
st := s.Stats()
|
|
if st.Total != 10 {
|
|
t.Errorf("total: want 10, got %d", st.Total)
|
|
}
|
|
if st.Successes != 7 {
|
|
t.Errorf("successes: want 7, got %d", st.Successes)
|
|
}
|
|
if st.Failures != 3 {
|
|
t.Errorf("failures: want 3, got %d", st.Failures)
|
|
}
|
|
if st.BySource["mcp"] != 5 || st.BySource["scenario"] != 3 || st.BySource["langfuse"] != 2 {
|
|
t.Errorf("by_source mismatch: %+v", st.BySource)
|
|
}
|
|
if len(st.RecentScenarios) != 3 {
|
|
t.Errorf("recent scenarios: want 3, got %d", len(st.RecentScenarios))
|
|
}
|
|
}
|
|
|
|
func TestStats_RecentScenariosCappedAndOrdered(t *testing.T) {
|
|
s := NewStore(nil)
|
|
// Record 15 scenario ops; only the last 10 should appear.
|
|
for i := 0; i < 15; i++ {
|
|
op := mkOp(true, SourceScenario)
|
|
op.StafferID = "staffer-" + string(rune('a'+i))
|
|
_ = s.Record(op)
|
|
time.Sleep(time.Millisecond) // ensure timestamps order-distinguishable
|
|
}
|
|
st := s.Stats()
|
|
if len(st.RecentScenarios) != DefaultRecentScenariosCap {
|
|
t.Errorf("cap: want %d, got %d", DefaultRecentScenariosCap, len(st.RecentScenarios))
|
|
}
|
|
// Last entry should be the most recently added (staffer-o, the 15th).
|
|
last := st.RecentScenarios[len(st.RecentScenarios)-1]
|
|
if last.Staffer != "staffer-o" {
|
|
t.Errorf("most recent: want staffer-o, got %q", last.Staffer)
|
|
}
|
|
}
|
|
|
|
func TestRingBuffer_BoundedByDefaultCap(t *testing.T) {
|
|
s := NewStore(nil)
|
|
s.cap = 5 // shrink for testability
|
|
for i := 0; i < 12; i++ {
|
|
op := mkOp(true, SourceMCP)
|
|
op.InputSummary = string(rune('a' + i))
|
|
_ = s.Record(op)
|
|
}
|
|
r := s.Recent()
|
|
if len(r) != 5 {
|
|
t.Errorf("ring size: want 5, got %d", len(r))
|
|
}
|
|
// Oldest 7 dropped; first remaining should have InputSummary "h" (8th).
|
|
if r[0].InputSummary != "h" {
|
|
t.Errorf("oldest after rollover: want 'h', got %q", r[0].InputSummary)
|
|
}
|
|
}
|
|
|
|
func TestPersistor_RoundTrip(t *testing.T) {
|
|
dir := t.TempDir()
|
|
path := filepath.Join(dir, "ops.jsonl")
|
|
p, err := NewPersistor(path)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
s := NewStore(p)
|
|
|
|
for i := 0; i < 4; i++ {
|
|
op := mkOp(i%2 == 0, SourceMCP)
|
|
op.InputSummary = string(rune('a' + i))
|
|
if err := s.Record(op); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
// Sanity: file has 4 lines.
|
|
bs, err := os.ReadFile(path)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
lines := strings.Split(strings.TrimSuffix(string(bs), "\n"), "\n")
|
|
if len(lines) != 4 {
|
|
t.Errorf("file lines: want 4, got %d", len(lines))
|
|
}
|
|
|
|
// Rehydrate into a fresh Store.
|
|
s2 := NewStore(p)
|
|
n, err := s2.Load()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if n != 4 {
|
|
t.Errorf("loaded: want 4, got %d", n)
|
|
}
|
|
r := s2.Recent()
|
|
if len(r) != 4 {
|
|
t.Errorf("rehydrated ring: want 4, got %d", len(r))
|
|
}
|
|
// Order preserved.
|
|
for i, want := range []string{"a", "b", "c", "d"} {
|
|
if r[i].InputSummary != want {
|
|
t.Errorf("op %d: want %q, got %q", i, want, r[i].InputSummary)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestPersistor_CorruptionTolerant(t *testing.T) {
|
|
dir := t.TempDir()
|
|
path := filepath.Join(dir, "ops.jsonl")
|
|
// Pre-seed with one valid + one corrupt + one valid line.
|
|
valid1 := `{"timestamp":"2026-04-29T12:00:00Z","endpoint":"/x","input_summary":"a","success":true,"duration_ms":1,"output_summary":"ok","source":"mcp"}`
|
|
corrupt := `{this is not json`
|
|
valid2 := `{"timestamp":"2026-04-29T12:00:01Z","endpoint":"/y","input_summary":"b","success":false,"duration_ms":2,"output_summary":"err","source":"scenario"}`
|
|
if err := os.WriteFile(path, []byte(valid1+"\n"+corrupt+"\n"+valid2+"\n"), 0o644); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
p, err := NewPersistor(path)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
s := NewStore(p)
|
|
n, err := s.Load()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if n != 2 {
|
|
t.Errorf("applied: want 2 (valid pair), got %d (corrupt should skip)", n)
|
|
}
|
|
}
|