observerd — autonomous-iteration witness loop (SPEC §2 port)

Port of the load-bearing pieces of mcp-server/observer.ts (Rust
system, 852 lines TS) per SPEC §2's named target. Implements PRD
loop 3 ("Observer loop — watches each run, refines configs").

Routes (all under /v1/observer/* via gateway):
  GET  /observer/health   — liveness
  GET  /observer/stats    — total / successes / failures /
                             by_source / recent_scenario_ops
                             (matches Rust JSON shape exactly)
  POST /observer/event    — record one ObservedOp; auto-defaults
                             timestamp + source, validates required
                             fields (endpoint), persists to JSONL,
                             appends to ring buffer

Architecture:
  - internal/observer/types.go — ObservedOp model + Source taxonomy
    (mcp / scenario / langfuse / overseer_correction). Mirrors the
    Rust shape so JSON round-trips during cutover.
  - internal/observer/store.go — Store + Persistor. Ring buffer cap
    matches Rust's 2000; recent_scenarios cap matches Rust's 10.
    Same persist-then-apply order as pathwayd; same corruption-
    tolerant replay (skip malformed lines + warn).
  - cmd/observerd — :3219 HTTP service, fronted by gateway as
    /v1/observer/*.
  - lakehouse.toml + DefaultConfig — [observerd] block matches the
    pathwayd pattern (Bind + PersistPath; empty path = ephemeral).

Tests + smoke (all PASS):
  - 7 unit tests in store_test.go: validation, default fields,
    stats aggregation, recent-scenarios cap + ordering, ring-buffer
    rollover at cap, JSONL round-trip persistence, corruption-
    tolerant replay (1 valid + 1 corrupt + 1 valid → 2 applied)
  - scripts/observer_smoke.sh: 4 assertions through gateway —
    record 5 events (3 ok / 2 fail across 2 sources), stats
    aggregates correctly, empty-endpoint→400, kill+restart preserves
    via JSONL replay (5 ops, 3 ok, 2 err survive)

Deferred (named in package + cmd doc, not in this commit):
  - POST /observer/review (cloud-LLM hand-review fall-back). The
    heuristic-only path could land cheaply but the productized
    cloud path (qwen3-coder fall-back) is multi-day port.
  - Background loops: analyzeErrors, consolidatePlaybooks,
    tailOverseerCorrections (read overseer_corrections.jsonl into
    the ring buffer once per cycle).
  - escalateFailureClusterToLLMTeam (failure clustering trigger
    that posts to LLM Team's /api/run with code_review mode).

/relevance is NOT duplicated — already ported in 9588bd8 to
internal/matrix/relevance.go (component 3 of SPEC §3.4).

16-smoke regression all green (D1-D6, G1, G1P, G2, storaged_cap,
pathway, matrix, relevance, downgrade, playbook, observer).
13 binaries now: gateway, storaged, catalogd, ingestd, queryd,
vectord, embedd, pathwayd, matrixd, observerd, mcpd, fake_ollama
(plus catalogd-only test build).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-29 20:18:02 -05:00
parent 6392772f41
commit bc9ab93afe
8 changed files with 891 additions and 16 deletions

View File

@ -46,6 +46,7 @@ func main() {
"embedd_url": cfg.Gateway.EmbeddURL,
"pathwayd_url": cfg.Gateway.PathwaydURL,
"matrixd_url": cfg.Gateway.MatrixdURL,
"observerd_url": cfg.Gateway.ObserverdURL,
}
for k, v := range upstreams {
if v == "" {
@ -67,6 +68,7 @@ func main() {
embeddURL := mustParseUpstream("embedd_url", cfg.Gateway.EmbeddURL)
pathwaydURL := mustParseUpstream("pathwayd_url", cfg.Gateway.PathwaydURL)
matrixdURL := mustParseUpstream("matrixd_url", cfg.Gateway.MatrixdURL)
observerdURL := mustParseUpstream("observerd_url", cfg.Gateway.ObserverdURL)
storagedProxy := gateway.NewProxyHandler(storagedURL)
catalogdProxy := gateway.NewProxyHandler(catalogdURL)
@ -76,6 +78,7 @@ func main() {
embeddProxy := gateway.NewProxyHandler(embeddURL)
pathwaydProxy := gateway.NewProxyHandler(pathwaydURL)
matrixdProxy := gateway.NewProxyHandler(matrixdURL)
observerdProxy := gateway.NewProxyHandler(observerdURL)
if err := shared.Run("gateway", cfg.Gateway.Bind, func(r chi.Router) {
@ -98,6 +101,8 @@ func main() {
r.Handle("/v1/pathway/*", pathwaydProxy)
// Matrix indexer — /v1/matrix/* (multi-corpus retrieve+merge per SPEC §3.4)
r.Handle("/v1/matrix/*", matrixdProxy)
// Observer — /v1/observer/* (autonomous-iteration witness loop)
r.Handle("/v1/observer/*", observerdProxy)
}, cfg.Auth); err != nil {
slog.Error("server", "err", err)
os.Exit(1)

131
cmd/observerd/main.go Normal file
View File

@ -0,0 +1,131 @@
// observerd is the autonomous-iteration witness service. Port of
// the load-bearing pieces of mcp-server/observer.ts (Rust system).
//
// Routes (all under /observer):
// GET /observer/health — service liveness + ring size
// GET /observer/stats — aggregate counters + recent scenarios
// POST /observer/event — record one observed op
//
// Deferred to follow-up commits (see internal/observer doc):
// - POST /observer/review (cloud-LLM hand review fall-back)
// - background loops (analyzeErrors, consolidatePlaybooks,
// tailOverseerCorrections)
// - failure-cluster escalation to LLM Team
//
// /relevance was already ported to internal/matrix in 9588bd8 and is
// not duplicated here.
package main
import (
"encoding/json"
"errors"
"flag"
"log/slog"
"net/http"
"os"
"strings"
"github.com/go-chi/chi/v5"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/observer"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
)
const maxRequestBytes = 4 << 20 // 4 MiB cap on request bodies
// main boots observerd: load config, optionally wire JSONL
// persistence, replay the log, then serve the observer routes.
func main() {
	configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
	flag.Parse()

	cfg, err := shared.LoadConfig(*configPath)
	if err != nil {
		slog.Error("config", "err", err)
		os.Exit(1)
	}

	// Persistence is optional — an empty path means in-memory only
	// (the pathwayd pattern). Production points PersistPath at a
	// stable file such as /var/lib/lakehouse/observer/ops.jsonl.
	var persistor *observer.Persistor
	if path := cfg.Observerd.PersistPath; path != "" {
		p, perr := observer.NewPersistor(path)
		if perr != nil {
			slog.Error("observer persistor", "err", perr)
			os.Exit(1)
		}
		persistor = p
	}

	store := observer.NewStore(persistor)
	if persistor != nil {
		// Replay is corruption-tolerant; a partial load is reported
		// but does not block startup.
		if n, lerr := store.Load(); lerr != nil {
			slog.Warn("observer load", "err", lerr, "loaded", n)
		} else {
			slog.Info("observer loaded", "ops", n, "path", cfg.Observerd.PersistPath)
		}
	}

	h := &handlers{store: store}
	if err := shared.Run("observerd", cfg.Observerd.Bind, h.register, cfg.Auth); err != nil {
		slog.Error("server", "err", err)
		os.Exit(1)
	}
}
// handlers bundles the HTTP endpoints with the shared in-memory Store.
type handlers struct {
	store *observer.Store
}

// register mounts the observer routes on the service router.
// (The /observer/health liveness route named in the file header is
// not registered here — presumably provided by the shared.Run
// harness; TODO confirm against internal/shared.)
func (h *handlers) register(r chi.Router) {
	r.Get("/observer/stats", h.handleStats)
	r.Post("/observer/event", h.handleEvent)
}

// handleStats returns the aggregated ring-buffer view (observer.Stats)
// as a 200 JSON response.
func (h *handlers) handleStats(w http.ResponseWriter, _ *http.Request) {
	writeJSON(w, http.StatusOK, h.store.Stats())
}
// handleEvent decodes one ObservedOp from the body and records it.
// Validation failures map to 400, persistence/internal failures to
// 500; success returns {"accepted":true,"ring_size":N}.
func (h *handlers) handleEvent(w http.ResponseWriter, r *http.Request) {
	var op observer.ObservedOp
	if !decodeJSON(w, r, &op) {
		return
	}
	switch err := h.store.Record(op); {
	case err == nil:
		// fall through to the success response below
	case errors.Is(err, observer.ErrInvalidOp):
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	default:
		slog.Error("observer record", "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	writeJSON(w, http.StatusOK, map[string]any{
		"accepted":  true,
		"ring_size": h.store.Stats().Total,
	})
}
// decodeJSON decodes the request body into v while enforcing the
// maxRequestBytes cap. On failure it writes the error response itself
// (413 for oversized bodies, 400 for any other decode error) and
// returns false; callers just return when it does.
func decodeJSON(w http.ResponseWriter, r *http.Request, v any) bool {
	defer r.Body.Close()
	r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes)
	err := json.NewDecoder(r.Body).Decode(v)
	if err == nil {
		return true
	}
	// Detect the size-cap case both ways: the typed *http.MaxBytesError
	// plus the legacy message match as belt-and-braces for wrapped
	// readers that lose the error type.
	var maxErr *http.MaxBytesError
	if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
		http.Error(w, "body too large", http.StatusRequestEntityTooLarge)
	} else {
		http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest)
	}
	return false
}
func writeJSON(w http.ResponseWriter, code int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
if err := json.NewEncoder(w).Encode(v); err != nil {
slog.Warn("observer write json", "err", err)
}
}

249
internal/observer/store.go Normal file
View File

@ -0,0 +1,249 @@
package observer
// Store: in-memory ring buffer + optional JSONL persistor. Same
// shape as internal/pathway's persistor (afbb506) — opens the file
// per Append rather than holding an fd, which is fine at the
// observer's expected write rate (≤ a few hundred ops/min) and
// keeps the substrate restartable mid-stream.
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io/fs"
"log/slog"
"os"
"path/filepath"
"sync"
)
// DefaultRingCap is the in-memory ring buffer cap. Mirrors the Rust
// Phase 24 limit of 2000 (recordExternalOp shifts the head when
// length > 2000).
const DefaultRingCap = 2000

// DefaultRecentScenariosCap is how many recent source=scenario ops
// the Stats endpoint returns. Matches the TS hard-coded slice(-10).
const DefaultRecentScenariosCap = 10

// Store holds the ring buffer + the optional persistor. Thread-safe
// via a single RWMutex: Stats/Recent take the read lock, Record/Load
// the write lock. Persistor appends happen outside the lock (see
// Record).
type Store struct {
	mu        sync.RWMutex
	ring      []ObservedOp // append-order; oldest evicted once cap is exceeded
	cap       int          // ring bound; DefaultRingCap, overridden directly in tests
	persistor *Persistor   // nil = in-memory only
}

// NewStore returns an empty Store bounded at DefaultRingCap. Pass nil
// persistor for in-memory only (unit tests, ephemeral runs); pass a
// real Persistor to enable jsonl-append-on-record.
func NewStore(persistor *Persistor) *Store {
	return &Store{
		ring:      make([]ObservedOp, 0, DefaultRingCap),
		cap:       DefaultRingCap,
		persistor: persistor,
	}
}
// Record defaults missing fields (timestamp, source), validates,
// appends to the JSONL log when persistence is configured, and then
// pushes the op into the ring buffer. Persist-before-apply: a crash
// mid-record can lose the in-memory entry but never leaves the ring
// ahead of the log. Returns ErrInvalidOp on validation failure
// (nothing persisted, nothing appended).
func (s *Store) Record(op ObservedOp) error {
	op.EnsureTimestamp()
	op.DefaultSource()
	if err := op.Validate(); err != nil {
		return err
	}
	if s.persistor != nil {
		if appendErr := s.persistor.Append(op); appendErr != nil {
			// Persistence is best-effort — the ring buffer stays the
			// source of truth in flight (mirrors the Rust catch{} in
			// persistOp), so log the failure instead of rejecting.
			slog.Warn("observer: persist failed", "err", appendErr)
		}
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.ring = append(s.ring, op)
	if overflow := len(s.ring) - s.cap; overflow > 0 {
		// Evict the oldest entries in place so the backing array is
		// reused rather than reallocated per write.
		copy(s.ring, s.ring[overflow:])
		s.ring = s.ring[:s.cap]
	}
	return nil
}
// Recent returns a snapshot copy of the ring buffer. Entries are in
// append order, so the most recent op is last.
func (s *Store) Recent() []ObservedOp {
	s.mu.RLock()
	defer s.mu.RUnlock()
	snapshot := make([]ObservedOp, len(s.ring))
	copy(snapshot, s.ring)
	return snapshot
}
// Stats aggregates the ring buffer into the /stats response shape
// (total / successes / failures / by_source / recent_scenario_ops),
// mirroring the Rust response exactly. RecentScenarios carries the
// newest DefaultRecentScenariosCap source=scenario ops, oldest-first
// with the most recent last (slice(-10) semantics).
func (s *Store) Stats() Stats {
	s.mu.RLock()
	defer s.mu.RUnlock()
	stats := Stats{
		Total:    len(s.ring),
		BySource: make(map[string]int),
	}
	for _, op := range s.ring {
		if op.Success {
			stats.Successes++
		} else {
			stats.Failures++
		}
		// Ops replayed from older JSONL logs may predate the Source
		// field; count empty as "mcp" for back-compat.
		src := string(op.Source)
		if src == "" {
			src = string(SourceMCP)
		}
		stats.BySource[src]++
	}
	// Walk backwards collecting the newest N scenario ops, then
	// reverse once in place. (The previous prepend-per-hit form
	// allocated a fresh slice on every match.)
	scenarios := make([]ScenarioOpDigest, 0, DefaultRecentScenariosCap)
	for i := len(s.ring) - 1; i >= 0 && len(scenarios) < DefaultRecentScenariosCap; i-- {
		op := s.ring[i]
		if op.Source != SourceScenario {
			continue
		}
		scenarios = append(scenarios, ScenarioOpDigest{
			TS:      op.Timestamp,
			OK:      op.Success,
			Staffer: op.StafferID,
			Kind:    op.EventKind,
			Role:    op.Role,
		})
	}
	for l, r := 0, len(scenarios)-1; l < r; l, r = l+1, r-1 {
		scenarios[l], scenarios[r] = scenarios[r], scenarios[l]
	}
	stats.RecentScenarios = scenarios
	return stats
}
// Load discards the current ring state and replays the persistor's
// JSONL log into it — the same semantics as pathway.Store.Load.
// Corruption-tolerant: malformed lines are skipped with a warning
// inside Replay. Returns the number of ops successfully replayed;
// a nil persistor loads nothing.
func (s *Store) Load() (int, error) {
	if s.persistor == nil {
		return 0, nil
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.ring = s.ring[:0]
	apply := func(op ObservedOp) error {
		s.ring = append(s.ring, op)
		if len(s.ring) > s.cap {
			// Same one-slot eviction as Record: drop the oldest.
			copy(s.ring, s.ring[1:])
			s.ring = s.ring[:len(s.ring)-1]
		}
		return nil
	}
	return s.persistor.Replay(apply)
}
// ─── Persistor ──────────────────────────────────────────────────
// Persistor wraps a single JSONL file. Open-per-append — same
// pattern as internal/pathway. Each line is one ObservedOp.
type Persistor struct {
path string
}
// NewPersistor returns a Persistor for the given file path. Parent
// directory is created on demand. Empty path is invalid (caller
// passes nil to NewStore for the no-persist case).
func NewPersistor(path string) (*Persistor, error) {
if path == "" {
return nil, errors.New("observer: persistor path is empty")
}
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return nil, fmt.Errorf("observer: create dir: %w", err)
}
return &Persistor{path: path}, nil
}
// Path returns the file path the persistor writes to.
func (p *Persistor) Path() string { return p.path }
// Append writes one ObservedOp as a JSONL line.
//
// The line and its trailing newline go out in a single Write: the
// previous two-call form left a window where a crash between the
// writes produced an unterminated line that corrupted the NEXT append
// (the two records would fuse on one line and both be skipped at
// replay). Close is also checked now — on an append-only log a failed
// close can mean the data never reached the file.
func (p *Persistor) Append(op ObservedOp) error {
	line, err := json.Marshal(op)
	if err != nil {
		return fmt.Errorf("observer: marshal op: %w", err)
	}
	f, err := os.OpenFile(p.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return fmt.Errorf("observer: open log: %w", err)
	}
	if _, err := f.Write(append(line, '\n')); err != nil {
		f.Close() // best-effort; the write error is the one to report
		return fmt.Errorf("observer: write op: %w", err)
	}
	if err := f.Close(); err != nil {
		return fmt.Errorf("observer: close log: %w", err)
	}
	return nil
}
// Replay streams the log line-by-line, invoking apply for each
// decoded op, and returns how many ops were applied. A missing file
// yields (0, nil) — the legitimate cold-start state. Lines that fail
// to decode (or that apply rejects) are logged and skipped so one
// corrupt line never blocks a restart.
func (p *Persistor) Replay(apply func(ObservedOp) error) (int, error) {
	f, err := os.Open(p.path)
	if errors.Is(err, fs.ErrNotExist) {
		return 0, nil
	}
	if err != nil {
		return 0, fmt.Errorf("observer: open log: %w", err)
	}
	defer f.Close()
	sc := bufio.NewScanner(f)
	sc.Buffer(make([]byte, 0, 64*1024), 1<<20) // cap each line at 1 MiB
	var applied, skipped, lineNo int
	for sc.Scan() {
		lineNo++
		line := sc.Bytes()
		if len(line) == 0 {
			continue // tolerate blank lines
		}
		var op ObservedOp
		if unmarshalErr := json.Unmarshal(line, &op); unmarshalErr != nil {
			slog.Warn("observer: replay skipped malformed line",
				"path", p.path, "line", lineNo, "err", unmarshalErr.Error())
			skipped++
			continue
		}
		if applyErr := apply(op); applyErr != nil {
			slog.Warn("observer: replay apply failed",
				"path", p.path, "line", lineNo, "err", applyErr.Error())
			skipped++
			continue
		}
		applied++
	}
	if scanErr := sc.Err(); scanErr != nil {
		return applied, fmt.Errorf("observer: scan log: %w", scanErr)
	}
	if skipped > 0 {
		slog.Info("observer: replay completed with skips",
			"path", p.path, "applied", applied, "skipped", skipped)
	}
	return applied, nil
}

View File

@ -0,0 +1,193 @@
package observer
import (
"os"
"path/filepath"
"strings"
"testing"
"time"
)
// mkOp builds a fully-populated ObservedOp test fixture; success and
// source vary per call, everything else is fixed.
func mkOp(success bool, source Source) ObservedOp {
	return ObservedOp{
		Timestamp:     time.Now().UTC().Format(time.RFC3339),
		Endpoint:      "/v1/test",
		InputSummary:  "test op",
		Success:       success,
		DurationMs:    42,
		OutputSummary: "ok",
		Source:        source,
	}
}

// Validation: an op with an empty Endpoint must be rejected (even
// though EnsureTimestamp backfills the timestamp), and a complete op
// must be accepted.
func TestRecord_RequiresEndpointAndTimestamp(t *testing.T) {
	s := NewStore(nil)
	bad := ObservedOp{Endpoint: ""} // EnsureTimestamp will fill, but Endpoint empty stays
	if err := s.Record(bad); err == nil {
		t.Error("expected error on empty endpoint")
	}
	good := mkOp(true, SourceMCP)
	if err := s.Record(good); err != nil {
		t.Errorf("good op: %v", err)
	}
}

// Defaulting: Record must backfill Timestamp (EnsureTimestamp) and
// Source (DefaultSource → "mcp") when the caller omits them.
func TestRecord_DefaultsTimestampAndSource(t *testing.T) {
	s := NewStore(nil)
	op := ObservedOp{
		Endpoint:     "/x",
		InputSummary: "no ts no source",
		Success:      true,
	}
	if err := s.Record(op); err != nil {
		t.Fatal(err)
	}
	stored := s.Recent()[0]
	if stored.Timestamp == "" {
		t.Error("Timestamp should be defaulted")
	}
	if stored.Source != SourceMCP {
		t.Errorf("Source: want %q, got %q", SourceMCP, stored.Source)
	}
}

// Aggregation: 5 mcp successes + 3 scenario failures + 2 langfuse
// successes → overall counters, per-source counts, and one scenario
// digest per scenario op.
func TestStats_Aggregates(t *testing.T) {
	s := NewStore(nil)
	for i := 0; i < 5; i++ {
		_ = s.Record(mkOp(true, SourceMCP))
	}
	for i := 0; i < 3; i++ {
		_ = s.Record(mkOp(false, SourceScenario))
	}
	for i := 0; i < 2; i++ {
		_ = s.Record(mkOp(true, SourceLangfuse))
	}
	st := s.Stats()
	if st.Total != 10 {
		t.Errorf("total: want 10, got %d", st.Total)
	}
	if st.Successes != 7 {
		t.Errorf("successes: want 7, got %d", st.Successes)
	}
	if st.Failures != 3 {
		t.Errorf("failures: want 3, got %d", st.Failures)
	}
	if st.BySource["mcp"] != 5 || st.BySource["scenario"] != 3 || st.BySource["langfuse"] != 2 {
		t.Errorf("by_source mismatch: %+v", st.BySource)
	}
	if len(st.RecentScenarios) != 3 {
		t.Errorf("recent scenarios: want 3, got %d", len(st.RecentScenarios))
	}
}
// Digest cap + ordering: of 15 scenario ops, Stats must surface only
// the newest DefaultRecentScenariosCap (10), with the most recent op
// last. Ordering comes from ring insertion order, not timestamps.
func TestStats_RecentScenariosCappedAndOrdered(t *testing.T) {
	s := NewStore(nil)
	// Record 15 scenario ops; only the last 10 should appear.
	for i := 0; i < 15; i++ {
		op := mkOp(true, SourceScenario)
		op.StafferID = "staffer-" + string(rune('a'+i))
		_ = s.Record(op)
		// NOTE(review): this sleep cannot distinguish the RFC 3339
		// timestamps (second resolution) and ordering is decided by
		// ring position anyway — candidate for removal.
		time.Sleep(time.Millisecond)
	}
	st := s.Stats()
	if len(st.RecentScenarios) != DefaultRecentScenariosCap {
		t.Errorf("cap: want %d, got %d", DefaultRecentScenariosCap, len(st.RecentScenarios))
	}
	// Last entry should be the most recently added (staffer-o, the 15th).
	last := st.RecentScenarios[len(st.RecentScenarios)-1]
	if last.Staffer != "staffer-o" {
		t.Errorf("most recent: want staffer-o, got %q", last.Staffer)
	}
}

// Ring rollover: with the cap shrunk to 5, recording 12 ops must keep
// exactly the newest 5 ("h".."l"), oldest-first.
func TestRingBuffer_BoundedByDefaultCap(t *testing.T) {
	s := NewStore(nil)
	s.cap = 5 // shrink for testability (same-package field access)
	for i := 0; i < 12; i++ {
		op := mkOp(true, SourceMCP)
		op.InputSummary = string(rune('a' + i))
		_ = s.Record(op)
	}
	r := s.Recent()
	if len(r) != 5 {
		t.Errorf("ring size: want 5, got %d", len(r))
	}
	// Oldest 7 dropped; first remaining should have InputSummary "h" (8th).
	if r[0].InputSummary != "h" {
		t.Errorf("oldest after rollover: want 'h', got %q", r[0].InputSummary)
	}
}
// JSONL round-trip: 4 ops recorded through a persistor land as 4
// lines on disk, and a fresh Store.Load replays all 4 in order.
func TestPersistor_RoundTrip(t *testing.T) {
	dir := t.TempDir()
	path := filepath.Join(dir, "ops.jsonl")
	p, err := NewPersistor(path)
	if err != nil {
		t.Fatal(err)
	}
	s := NewStore(p)
	for i := 0; i < 4; i++ {
		op := mkOp(i%2 == 0, SourceMCP)
		op.InputSummary = string(rune('a' + i))
		if err := s.Record(op); err != nil {
			t.Fatal(err)
		}
	}
	// Sanity: file has 4 lines.
	bs, err := os.ReadFile(path)
	if err != nil {
		t.Fatal(err)
	}
	lines := strings.Split(strings.TrimSuffix(string(bs), "\n"), "\n")
	if len(lines) != 4 {
		t.Errorf("file lines: want 4, got %d", len(lines))
	}
	// Rehydrate into a fresh Store.
	s2 := NewStore(p)
	n, err := s2.Load()
	if err != nil {
		t.Fatal(err)
	}
	if n != 4 {
		t.Errorf("loaded: want 4, got %d", n)
	}
	r := s2.Recent()
	if len(r) != 4 {
		t.Errorf("rehydrated ring: want 4, got %d", len(r))
	}
	// Order preserved.
	for i, want := range []string{"a", "b", "c", "d"} {
		if r[i].InputSummary != want {
			t.Errorf("op %d: want %q, got %q", i, want, r[i].InputSummary)
		}
	}
}

// Corruption tolerance: replay over valid + corrupt + valid lines
// must apply exactly the 2 valid ops and skip the corrupt one.
func TestPersistor_CorruptionTolerant(t *testing.T) {
	dir := t.TempDir()
	path := filepath.Join(dir, "ops.jsonl")
	// Pre-seed with one valid + one corrupt + one valid line.
	valid1 := `{"timestamp":"2026-04-29T12:00:00Z","endpoint":"/x","input_summary":"a","success":true,"duration_ms":1,"output_summary":"ok","source":"mcp"}`
	corrupt := `{this is not json`
	valid2 := `{"timestamp":"2026-04-29T12:00:01Z","endpoint":"/y","input_summary":"b","success":false,"duration_ms":2,"output_summary":"err","source":"scenario"}`
	if err := os.WriteFile(path, []byte(valid1+"\n"+corrupt+"\n"+valid2+"\n"), 0o644); err != nil {
		t.Fatal(err)
	}
	p, err := NewPersistor(path)
	if err != nil {
		t.Fatal(err)
	}
	s := NewStore(p)
	n, err := s.Load()
	if err != nil {
		t.Fatal(err)
	}
	if n != 2 {
		t.Errorf("applied: want 2 (valid pair), got %d (corrupt should skip)", n)
	}
}

131
internal/observer/types.go Normal file
View File

@ -0,0 +1,131 @@
// Package observer is the Go port of mcp-server/observer.ts (Rust
// system, 852 lines TS) — the "third-party witness" loop that records
// every observed operation, surfaces failures, and feeds learnings
// back into the substrate.
//
// What this package owns (this commit):
// - ObservedOp data model + ring buffer + JSONL persistence
// - Stats aggregation (total / successes / failures / by_source)
// - Source taxonomy (mcp / scenario / langfuse / overseer_correction)
//
// What's deferred to follow-up commits:
// - /review endpoint with cloud-LLM hand-review (the heuristic
// plus qwen3-coder fall-back path)
// - tailOverseerCorrections (background loop reading
// overseer_corrections.jsonl)
// - analyzeErrors / consolidatePlaybooks periodic loops
// - escalateFailureClusterToLLMTeam (failure clustering trigger)
//
// /relevance was already ported in 9588bd8 (component 3 of SPEC §3.4)
// and lives in internal/matrix/relevance.go; the observer package
// doesn't re-implement it.
package observer
import (
"errors"
"time"
)
// Source is the provenance of an observed op. The zero value (empty
// string) is treated as SourceMCP wherever it is read (DefaultSource,
// Stats) for back-compat with Phase 24 callers.
type Source string

const (
	SourceMCP                Source = "mcp"                 // default: ops recorded via the MCP surface
	SourceScenario           Source = "scenario"            // staffing-scenario emitters; digested in Stats
	SourceLangfuse           Source = "langfuse"            // presumably Langfuse trace imports — confirm against the TS original
	SourceOverseerCorrection Source = "overseer_correction" // overseer corrections (tail loop deferred)
)
// ObservedOp is one entry in the observer's ring buffer (and JSONL
// log when persistence is configured). Mirrors the Rust ObservedOp
// shape exactly so the on-wire JSON round-trips between the two
// implementations during the Rust→Go cutover.
//
// Optional fields use omitempty so absent values don't bloat the
// JSONL file. Numeric zero values are intentionally treated as
// "not set" by the JSON layer; if a real zero needs to be
// persisted, a future schema-version bump can switch to pointers.
type ObservedOp struct {
	// Required fields (see Validate): when the op happened, and what
	// endpoint it hit.
	Timestamp string `json:"timestamp"` // ISO 8601; RFC 3339 when defaulted by EnsureTimestamp
	Endpoint  string `json:"endpoint"`

	// Core outcome of the op.
	InputSummary  string `json:"input_summary"`
	Success       bool   `json:"success"`
	DurationMs    int64  `json:"duration_ms"`
	OutputSummary string `json:"output_summary"`
	Error         string `json:"error,omitempty"`
	Source        Source `json:"source,omitempty"` // empty reads as "mcp" (DefaultSource)

	// Scenario metadata. StafferID/EventKind/Role surface in
	// Stats.RecentScenarios digests; the rest is carried for JSON
	// parity with the TS implementation.
	StafferID string `json:"staffer_id,omitempty"`
	SigHash   string `json:"sig_hash,omitempty"`
	EventKind string `json:"event_kind,omitempty"`
	Role      string `json:"role,omitempty"`
	City      string `json:"city,omitempty"`
	State     string `json:"state,omitempty"`
	Count     int    `json:"count,omitempty"`

	// Rescue / correction bookkeeping — not read by this package yet;
	// carried for round-trip parity during the cutover.
	RescueAttempted bool   `json:"rescue_attempted,omitempty"`
	RescueSucceeded bool   `json:"rescue_succeeded,omitempty"`
	TaskClass       string `json:"task_class,omitempty"`
	Correction      string `json:"correction,omitempty"`
	AppliedAtTurn   int    `json:"applied_at_turn,omitempty"`
}
// Stats is the aggregated view of the ring buffer — the GET /stats
// response body. RecentScenarios holds the most recent N
// source=scenario ops (N = DefaultRecentScenariosCap, 10),
// oldest-first with the newest last, so operators can see what the
// staffing scenarios are emitting at a glance.
type Stats struct {
	Total           int                `json:"total"`
	Successes       int                `json:"successes"`
	Failures        int                `json:"failures"`
	BySource        map[string]int     `json:"by_source"`
	RecentScenarios []ScenarioOpDigest `json:"recent_scenario_ops"`
}

// ScenarioOpDigest is the slim per-op shape returned in
// Stats.RecentScenarios — matches the TS digest exactly:
// {ts, ok, staffer, kind, role}.
type ScenarioOpDigest struct {
	TS      string `json:"ts"`
	OK      bool   `json:"ok"`
	Staffer string `json:"staffer"`
	Kind    string `json:"kind"`
	Role    string `json:"role"`
}
// Errors surfaced to HTTP handlers.
var (
	// ErrInvalidOp reports a missing required field; handlers map it
	// to HTTP 400.
	ErrInvalidOp = errors.New("observer: invalid op (timestamp + endpoint required)")
)

// Validate reports whether the op carries its required fields,
// Timestamp and Endpoint. Record calls it after the defaulting
// helpers, so on that path only Endpoint can still be empty.
func (op ObservedOp) Validate() error {
	if op.Timestamp == "" || op.Endpoint == "" {
		return ErrInvalidOp
	}
	return nil
}
// EnsureTimestamp fills Timestamp with the current UTC RFC 3339 time
// when the caller left it empty; a caller-supplied value is kept
// verbatim (HTTP handlers treat the body as authoritative).
func (op *ObservedOp) EnsureTimestamp() {
	if op.Timestamp != "" {
		return
	}
	op.Timestamp = time.Now().UTC().Format(time.RFC3339)
}
// DefaultSource backfills Source with SourceMCP when empty — the Go
// spelling of the Rust-system TS `op.source ?? "mcp"` pattern in
// recordExternalOp.
func (op *ObservedOp) DefaultSource() {
	if op.Source != "" {
		return
	}
	op.Source = SourceMCP
}

View File

@ -26,9 +26,10 @@ type Config struct {
Queryd QuerydConfig `toml:"queryd"`
Vectord VectordConfig `toml:"vectord"`
Embedd EmbeddConfig `toml:"embedd"`
Pathwayd PathwaydConfig `toml:"pathwayd"`
Matrixd MatrixdConfig `toml:"matrixd"`
S3 S3Config `toml:"s3"`
Pathwayd PathwaydConfig `toml:"pathwayd"`
Matrixd MatrixdConfig `toml:"matrixd"`
Observerd ObserverdConfig `toml:"observerd"`
S3 S3Config `toml:"s3"`
Log LogConfig `toml:"log"`
Auth AuthConfig `toml:"auth"`
}
@ -52,19 +53,20 @@ type IngestConfig struct {
// GatewayConfig adds the upstream URLs the reverse proxy fronts.
// Each route family (/v1/storage, /v1/catalog, /v1/ingest, /v1/sql,
// /v1/vectors, /v1/embed, /v1/pathway, /v1/matrix) has its own
// upstream so we can scale services independently or move them to
// different boxes without touching gateway code.
// /v1/vectors, /v1/embed, /v1/pathway, /v1/matrix, /v1/observer)
// has its own upstream so we can scale services independently or
// move them to different boxes without touching gateway code.
type GatewayConfig struct {
Bind string `toml:"bind"`
StoragedURL string `toml:"storaged_url"`
CatalogdURL string `toml:"catalogd_url"`
IngestdURL string `toml:"ingestd_url"`
QuerydURL string `toml:"queryd_url"`
VectordURL string `toml:"vectord_url"`
EmbeddURL string `toml:"embedd_url"`
PathwaydURL string `toml:"pathwayd_url"`
MatrixdURL string `toml:"matrixd_url"`
Bind string `toml:"bind"`
StoragedURL string `toml:"storaged_url"`
CatalogdURL string `toml:"catalogd_url"`
IngestdURL string `toml:"ingestd_url"`
QuerydURL string `toml:"queryd_url"`
VectordURL string `toml:"vectord_url"`
EmbeddURL string `toml:"embedd_url"`
PathwaydURL string `toml:"pathwayd_url"`
MatrixdURL string `toml:"matrixd_url"`
ObserverdURL string `toml:"observerd_url"`
}
// EmbeddConfig drives the embed service. ProviderURL points at the
@ -108,6 +110,16 @@ type MatrixdConfig struct {
VectordURL string `toml:"vectord_url"`
}
// ObserverdConfig drives the observer service (cmd/observerd).
// PersistPath: file path to the JSONL ops log; empty = in-memory
// only (test/dev). Production sets a stable path under
// /var/lib/lakehouse/observer/ops.jsonl so ops survive restart.
// Mirrors the PathwaydConfig pattern.
type ObserverdConfig struct {
Bind string `toml:"bind"`
PersistPath string `toml:"persist_path"`
}
// QuerydConfig adds queryd-specific knobs. queryd talks DuckDB
// directly to MinIO via DuckDB's httpfs extension (so no storaged
// URL needed), and reads the catalog over HTTP for view registration.
@ -185,7 +197,8 @@ func DefaultConfig() Config {
VectordURL: "http://127.0.0.1:3215",
EmbeddURL: "http://127.0.0.1:3216",
PathwaydURL: "http://127.0.0.1:3217",
MatrixdURL: "http://127.0.0.1:3218",
MatrixdURL: "http://127.0.0.1:3218",
ObserverdURL: "http://127.0.0.1:3219",
},
Storaged: ServiceConfig{Bind: "127.0.0.1:3211"},
Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"},
@ -215,6 +228,10 @@ func DefaultConfig() Config {
EmbeddURL: "http://127.0.0.1:3216",
VectordURL: "http://127.0.0.1:3215",
},
Observerd: ObserverdConfig{
Bind: "127.0.0.1:3219",
// PersistPath empty by default = in-memory only.
},
Queryd: QuerydConfig{
Bind: "127.0.0.1:3214",
CatalogdURL: "http://127.0.0.1:3212",

View File

@ -14,6 +14,7 @@ vectord_url = "http://127.0.0.1:3215"
embedd_url = "http://127.0.0.1:3216"
pathwayd_url = "http://127.0.0.1:3217"
matrixd_url = "http://127.0.0.1:3218"
observerd_url = "http://127.0.0.1:3219"
[storaged]
bind = "127.0.0.1:3211"
@ -63,6 +64,12 @@ bind = "127.0.0.1:3218"
embedd_url = "http://127.0.0.1:3216"
vectord_url = "http://127.0.0.1:3215"
[observerd]
bind = "127.0.0.1:3219"
# Empty = in-memory only (dev/test). Production sets a path under
# /var/lib/lakehouse/observer/ops.jsonl so ops survive restart.
persist_path = ""
[s3]
endpoint = "http://localhost:9000"
region = "us-east-1"

142
scripts/observer_smoke.sh Executable file
View File

@ -0,0 +1,142 @@
#!/usr/bin/env bash
# Observer smoke — autonomous-iteration witness service end-to-end.
# All assertions go through gateway :3110.
#
# Validates:
# - POST /observer/event records an op (success path + scenario source)
# - GET /observer/stats aggregates by source + counts successes/failures
# - Stats.recent_scenario_ops surfaces scenario digests
# - Validation: empty endpoint → 400
# - Persistence: kill+restart observerd preserves ops via JSONL replay
set -euo pipefail
cd "$(dirname "$0")/.."
export PATH="$PATH:/usr/local/go/bin"
echo "[observer-smoke] building observerd + gateway..."
go build -o bin/ ./cmd/observerd ./cmd/gateway
# Clear out stragglers from a previous run before binding ports.
pkill -f "bin/(observerd|gateway)" 2>/dev/null || true
sleep 0.3
PIDS=()
TMP="$(mktemp -d)"
PERSIST="$TMP/ops.jsonl"
CFG="$TMP/observer.toml"
# cleanup — kill every launched PID and drop the temp dir; wired to
# EXIT/INT/TERM so a failed assertion still tears everything down.
# NOTE(review): "${PIDS[@]}" on an empty array trips set -u on
# bash < 4.4 — fine on modern bash, worth confirming on target hosts.
cleanup() {
	echo "[observer-smoke] cleanup"
	for p in "${PIDS[@]}"; do [ -n "$p" ] && kill "$p" 2>/dev/null || true; done
	rm -rf "$TMP"
}
trap cleanup EXIT INT TERM
# Minimal config: gateway validates every upstream URL at boot, so all
# nine are listed even though only observerd is actually launched.
cat > "$CFG" <<EOF
[gateway]
bind = "127.0.0.1:3110"
storaged_url = "http://127.0.0.1:3211"
catalogd_url = "http://127.0.0.1:3212"
ingestd_url = "http://127.0.0.1:3213"
queryd_url = "http://127.0.0.1:3214"
vectord_url = "http://127.0.0.1:3215"
embedd_url = "http://127.0.0.1:3216"
pathwayd_url = "http://127.0.0.1:3217"
matrixd_url = "http://127.0.0.1:3218"
observerd_url = "http://127.0.0.1:3219"
[observerd]
bind = "127.0.0.1:3219"
persist_path = "$PERSIST"
EOF
# poll_health PORT — poll 127.0.0.1:PORT/health for up to 5 seconds;
# returns 0 as soon as the service answers, 1 on timeout.
poll_health() {
	local port="$1" deadline=$(($(date +%s) + 5))
	while [ "$(date +%s)" -lt "$deadline" ]; do
		if curl -sS --max-time 1 "http://127.0.0.1:$port/health" >/dev/null 2>&1; then return 0; fi
		sleep 0.05
	done
	return 1
}
# launch_observerd — start observerd against $CFG and wait for health.
# Records the PID in both OBSERVERD_PID (restart assertion) and PIDS
# (cleanup). Under set -e a non-zero return aborts the whole run.
launch_observerd() {
	./bin/observerd -config "$CFG" > /tmp/observerd.log 2>&1 &
	OBSERVERD_PID=$!
	PIDS+=($OBSERVERD_PID)
	poll_health 3219 || { echo "observerd failed"; tail /tmp/observerd.log; return 1; }
}
echo "[observer-smoke] launching observerd → gateway..."
launch_observerd
./bin/gateway -config "$CFG" > /tmp/gateway.log 2>&1 &
PIDS+=($!)
poll_health 3110 || { echo "gateway failed"; tail /tmp/gateway.log; exit 1; }
FAILED=0
# ── 1. Record 5 ops: 3 success + 2 fail across 2 sources ─────────
echo "[observer-smoke] record 5 ops:"
for i in 1 2 3; do
	curl -sS -o /dev/null -X POST http://127.0.0.1:3110/v1/observer/event \
		-H 'Content-Type: application/json' \
		-d "{\"endpoint\":\"/v1/test\",\"input_summary\":\"ok-$i\",\"success\":true,\"duration_ms\":10,\"output_summary\":\"ok\",\"source\":\"mcp\"}"
done
for i in 1 2; do
	curl -sS -o /dev/null -X POST http://127.0.0.1:3110/v1/observer/event \
		-H 'Content-Type: application/json' \
		-d "{\"endpoint\":\"/v1/test\",\"input_summary\":\"fail-$i\",\"success\":false,\"duration_ms\":10,\"output_summary\":\"err\",\"error\":\"boom\",\"source\":\"scenario\",\"staffer_id\":\"st-$i\",\"event_kind\":\"fill\",\"role\":\"Forklift\"}"
done
echo " ✓ 5 events posted"
# ── 2. Stats aggregation ─────────────────────────────────────────
# Expect total=5, 3 ok, 2 fail, by_source mcp=3/scenario=2, and the
# 2 scenario ops surfaced as digests.
echo "[observer-smoke] /observer/stats aggregates correctly:"
STATS="$(curl -sS http://127.0.0.1:3110/v1/observer/stats)"
TOT="$(echo "$STATS" | jq -r '.total')"
OK="$(echo "$STATS" | jq -r '.successes')"
ERR="$(echo "$STATS" | jq -r '.failures')"
MCP="$(echo "$STATS" | jq -r '.by_source.mcp')"
SCEN="$(echo "$STATS" | jq -r '.by_source.scenario')"
RECENT_LEN="$(echo "$STATS" | jq -r '.recent_scenario_ops | length')"
if [ "$TOT" = "5" ] && [ "$OK" = "3" ] && [ "$ERR" = "2" ] && [ "$MCP" = "3" ] && [ "$SCEN" = "2" ] && [ "$RECENT_LEN" = "2" ]; then
	echo " ✓ total=5 (3 ok + 2 fail) · by_source: mcp=3 scenario=2 · 2 scenario digests"
else
	echo " ✗ total=$TOT ok=$OK err=$ERR mcp=$MCP scen=$SCEN recent=$RECENT_LEN"
	echo " full: $STATS"
	FAILED=1
fi
# ── 3. Validation: empty endpoint → 400 ──────────────────────────
echo "[observer-smoke] empty endpoint → 400:"
HTTP="$(curl -sS -o /dev/null -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/observer/event \
	-H 'Content-Type: application/json' \
	-d '{"endpoint":"","input_summary":"x","success":true,"duration_ms":1,"output_summary":"x"}')"
if [ "$HTTP" = "400" ]; then
	echo " ✓ empty endpoint rejected"
else
	echo " ✗ got $HTTP"; FAILED=1
fi
# ── 4. Persistence: kill + restart preserves ops ─────────────────
# The relaunched observerd replays $PERSIST on boot; counters must
# match the pre-kill state exactly.
echo "[observer-smoke] kill + restart observerd → ops survive:"
kill $OBSERVERD_PID 2>/dev/null || true
wait $OBSERVERD_PID 2>/dev/null || true
sleep 0.3
launch_observerd
sleep 0.2
STATS2="$(curl -sS http://127.0.0.1:3110/v1/observer/stats)"
TOT2="$(echo "$STATS2" | jq -r '.total')"
OK2="$(echo "$STATS2" | jq -r '.successes')"
ERR2="$(echo "$STATS2" | jq -r '.failures')"
if [ "$TOT2" = "5" ] && [ "$OK2" = "3" ] && [ "$ERR2" = "2" ]; then
	echo " ✓ total=5 ok=3 err=2 preserved through restart"
else
	echo " ✗ post-restart total=$TOT2 ok=$OK2 err=$ERR2"; FAILED=1
fi
if [ "$FAILED" -eq 0 ]; then
	echo "[observer-smoke] Observer acceptance gate: PASSED"
	exit 0
else
	echo "[observer-smoke] Observer acceptance gate: FAILED"
	exit 1
fi