From 1a3a82aedb7d028fb65e6eab69e200a3e4cdba9c Mon Sep 17 00:00:00 2001 From: root Date: Sat, 2 May 2026 05:22:09 -0500 Subject: [PATCH] validatord: coordinator session JSONL for offline analysis (B follow-up) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the second half of J's 2026-05-02 multi-call observability concern. Trace-id propagation (commit d6d2fdf) gave us the *live* view in Langfuse; this gives us the *longitudinal* view for ad-hoc DuckDB queries over thousands of sessions: "show me every session where the model produced a real candidate without ever needing a retry" "find sessions where validation rejected three times in a row" "first-shot success rate per model — did we feed it enough corpus?" ## What's in internal/validator/session_log.go: - SessionRecord type (schema=session.iterate.v1) - SessionLogger writer — mutex-guarded append, best-effort posture, nil-safe (NewSessionLogger("") = nil = no-op on Append) - BuildSessionRecord helper — assembles a row from any iterate response/failure/infra-error combination, callable from other daemons that wrap iterate (cross-daemon shared schema) - 7 unit tests including concurrent-append safety + the three code paths (success / max_iter_exhausted / infra_error) cmd/validatord/main.go: - handlers.sessionLog field + wiring from cfg.Validatord.SessionLogPath - Iterate handler: build + append a SessionRecord on every call - rosterCheckFor("fill") closure stamps grounded_in_roster — the load-bearing forensic property J flagged ("we can never hallucinate available staff members to contracts") internal/shared/config.go + lakehouse.toml: - [validatord].session_log_path field; empty = disabled - Production: /var/lib/lakehouse/validator/sessions.jsonl scripts/validatord_smoke.sh: - Adds a probe verifying validatord announces session log path on startup. Smoke is now 6/6 (was 5/5). docs/SESSION_LOG.md: - Schema reference + 5 worked DuckDB query examples including the "alarm" query (sessions where grounded_in_roster=false on an accepted fill — should always be empty; if not, something is bypassing FillValidator). ## What this is NOT This is NOT a duplicate of replay_runs.jsonl. They're siblings: - replay_runs.jsonl: replay tool's per-task retrieval+model output - sessions.jsonl: validatord's per-iterate full retry chain + grounded-in-roster verdict A single coordinator session can produce rows in both streams; the session_id (= Langfuse trace_id) is the join key. ## Layered observability now in place Live view: Langfuse trace tree (X-Lakehouse-Trace-Id propagation) `iterate.attempt[N]` spans with prompt/raw/verdict Offline: coordinator_sessions.jsonl (this commit) DuckDB-queryable; longitudinal forensics Hard gate: FillValidator + WorkerLookup (existing) phantom IDs structurally rejected, never reach session log's grounded_in_roster=true bucket Per the architecture invariant in STATE_OF_PLAY's DO NOT RELITIGATE section — these layers are wired; future work targets the data, not the wiring. ## Verification - internal/validator: 7 new tests (session_log_test.go) — all PASS - cmd/validatord: 3 new integration tests covering the success, failure, and grounded=false paths — all PASS - validatord_smoke.sh: 6/6 PASS through gateway :3110 - Full go test ./... 
green across 33 packages Co-Authored-By: Claude Opus 4.7 (1M context) --- STATE_OF_PLAY.md | 3 +- cmd/validatord/main.go | 66 +++++++- cmd/validatord/main_test.go | 174 ++++++++++++++++++- docs/SESSION_LOG.md | 158 +++++++++++++++++ internal/shared/config.go | 7 + internal/validator/session_log.go | 199 ++++++++++++++++++++++ internal/validator/session_log_test.go | 224 +++++++++++++++++++++++++ lakehouse.toml | 7 + scripts/validatord_smoke.sh | 17 +- 9 files changed, 848 insertions(+), 7 deletions(-) create mode 100644 docs/SESSION_LOG.md create mode 100644 internal/validator/session_log.go create mode 100644 internal/validator/session_log_test.go diff --git a/STATE_OF_PLAY.md b/STATE_OF_PLAY.md index 56d2ccb..4c805a1 100644 --- a/STATE_OF_PLAY.md +++ b/STATE_OF_PLAY.md @@ -1,6 +1,6 @@ # STATE OF PLAY — Lakehouse-Go -**Last verified:** 2026-05-02 ~07:30 CDT +**Last verified:** 2026-05-02 ~08:00 CDT **Verified by:** **production-readiness gauntlet** — 21/21 smoke chain green, per-component scrum across 4 bundles, **3 cross-runtime parity probes all green post-fix** (validator: **6/6 match** after wire-format alignment shipped; materializer: 2/2 after omitempty fix; extract_json: 12/12). All findings surfaced by the parity probes have been actioned. Disposition: `reports/cutover/gauntlet_2026-05-02/disposition.md`. > **Read this FIRST.** When the user says "we're working on lakehouse," default to the Go rewrite (this repo); the Rust legacy at `/home/profit/lakehouse/` is maintenance-only. If memory contradicts this file, this file wins. Update it when something is verified working — not when a phase finishes. @@ -217,6 +217,7 @@ Verbatim verdicts at `reports/scrum/_evidence/2026-04-30/verdicts/`. Disposition - **vectord's source-of-truth is `i.vectors`, NOT the coder/hnsw graph.** The `Index` struct holds a parallel `vectors map[string][]float32` updated on every successful Add/Delete; the graph is a derived, replaceable view. `safeGraphAdd`/`safeGraphDelete` wrap the library's panic-prone ops; `rebuildGraphLocked` reads from `i.vectors` (graph-state-independent). Don't propose to "drop the side map for memory" — it's the load-bearing piece that makes Add panic-recoverable past the small-index threshold (closes the multitier_100k 277884b 96-98% fail). The prior `i.ids` set was folded into `i.vectors` keys. - **vectord saves are coalesced async, not synchronous.** `cmd/vectord/main.go` runs a per-index `saveTask` that single-flights through `Persistor.Save` — at most one in-flight + one pending. Add returns OK before the save completes; an Add-then-crash can lose ~1 save's worth of data, matching ADR-005's fail-open posture. Don't propose to "make saves synchronous for durability" — that re-introduces the lock-contention bottleneck (1-2.5s tail at conc=50, observed 2026-05-01) without fixing a real durability hole (in-memory state is the source of truth in flight). - **`X-Lakehouse-Trace-Id` header propagates Langfuse parent traces across daemon boundaries.** When validatord's `/v1/iterate` calls chatd's `/v1/chat`, it forwards the header so chatd's middleware reuses the parent trace id instead of minting a new one. Each iteration attempt also emits a child span (`iterate.attempt[N]`) carrying the prompt, raw model output, and validator verdict. Result: an iterate session with N retries shows in Langfuse as ONE trace tree, not N+1 disconnected traces. 
Don't propose to "wire trace-id propagation" — it's wired; the test at `internal/shared/langfuse_middleware_test.go::TestLangfuseMiddleware_HonorsTraceIDHeader` is the proof. Closes J's 2026-05-02 multi-call observability concern. +- **Coordinator session JSONL exists at `[validatord].session_log_path`.** One row per `/v1/iterate` session, schema=`session.iterate.v1`, append-only. Captures session_id (= Langfuse trace_id), kind, model, iterations, attempts[], final_verdict, **grounded_in_roster** (the load-bearing forensic property — always true on accepted fills, false on phantom-worker artifacts that somehow slip past FillValidator). Empty path = disabled. DuckDB queries documented at `docs/SESSION_LOG.md`. Don't propose to "build offline session analysis" — it's built; the schema is locked at v1; rotate the file via logrotate when it gets large. --- diff --git a/cmd/validatord/main.go b/cmd/validatord/main.go index 4ff4ea3..0753c34 100644 --- a/cmd/validatord/main.go +++ b/cmd/validatord/main.go @@ -78,7 +78,11 @@ func main() { // reference lets us emit child spans of the per-request trace. // nil when Langfuse env isn't set; the iterate handler skips // span emission gracefully in that case. - lf: shared.LoadLangfuseFromEnv(), + lf: shared.LoadLangfuseFromEnv(), + sessionLog: validator.NewSessionLogger(cfg.Validatord.SessionLogPath), + } + if h.sessionLog != nil { + slog.Info("validatord session log", "path", cfg.Validatord.SessionLogPath) } if err := shared.Run("validatord", cfg.Validatord.Bind, h.register, cfg.Auth); err != nil { @@ -98,6 +102,10 @@ type handlers struct { // creates the parent trace lives in shared.Run; this client lets // validatord's handler emit application-level child spans. lf *langfuse.Client + // sessionLog appends one SessionRecord per /v1/iterate session to + // `coordinator_sessions.jsonl` for offline DuckDB analysis. nil + // when the session log is unconfigured (best-effort posture). + sessionLog *validator.SessionLogger } func (h *handlers) register(r chi.Router) { @@ -269,7 +277,16 @@ func (h *handlers) handleIterate(w http.ResponseWriter, r *http.Request) { cfg.Tracer = h.iterTracer(r.Context()) } + t0 := time.Now() resp, fail, err := validator.Iterate(r.Context(), req, cfg, chat, validate) + durationMs := time.Since(t0).Milliseconds() + + // Emit one session row regardless of success/failure/infra-error. + // Best-effort: a failing session log never blocks the response. + rosterCheck := h.rosterCheckFor(req.Kind) + rec := validator.BuildSessionRecord(req, resp, fail, err, rosterCheck, durationMs) + h.sessionLog.Append(rec) + if err != nil { http.Error(w, err.Error(), http.StatusBadGateway) return @@ -281,6 +298,53 @@ func (h *handlers) handleIterate(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, resp) } +// rosterCheckFor returns a closure that verifies an accepted artifact's +// candidate IDs all exist in the roster. Used by BuildSessionRecord to +// stamp `grounded_in_roster` on each session row — answers the "did +// the model produce real workers, not phantoms?" question at offline- +// query time. +// +// Only the `fill` kind has worker IDs to verify. Other kinds return +// nil (omits the field from the session record). +// +// Note: FillValidator already enforces this constraint on every +// successful iteration (phantom IDs raise ValidationError::Consistency). +// Stamping the bool here surfaces the verified-grounded property as +// queryable data, not relying on validator semantics being recalled +// during analysis. 
A future scrum may want to query "show me sessions +// where the model guessed real worker IDs without ever needing a +// retry" — that's a `iterations=1 AND grounded_in_roster=true` query. +func (h *handlers) rosterCheckFor(kind string) func(map[string]any) *bool { + if kind != "fill" { + return nil + } + return func(artifact map[string]any) *bool { + fills, ok := artifact["fills"].([]any) + if !ok { + f := false + return &f + } + for _, raw := range fills { + fill, ok := raw.(map[string]any) + if !ok { + f := false + return &f + } + id, _ := fill["candidate_id"].(string) + if id == "" { + f := false + return &f + } + if _, found := h.lookup.Find(id); !found { + f := false + return &f + } + } + t := true + return &t + } +} + // chatCaller wires the iteration loop to chatd via HTTP. Builds the // chat.Request shape, posts to ${chatdURL}/chat, returns the content // string (no choices wrapper — chatd's response is already flat). diff --git a/cmd/validatord/main_test.go b/cmd/validatord/main_test.go index 45b964e..7a6233b 100644 --- a/cmd/validatord/main_test.go +++ b/cmd/validatord/main_test.go @@ -1,10 +1,13 @@ package main import ( + "bufio" "bytes" "encoding/json" "net/http" "net/http/httptest" + "os" + "path/filepath" "testing" "time" @@ -13,11 +16,47 @@ import ( "git.agentview.dev/profit/golangLAKEHOUSE/internal/validator" ) +// readSessionLog parses the JSONL session log into row maps. Used by +// the iterate-handler tests to verify per-session entries. +func readSessionLog(t *testing.T, path string) []map[string]any { + t.Helper() + f, err := os.Open(path) + if err != nil { + t.Fatalf("open session log: %v", err) + } + defer f.Close() + var out []map[string]any + sc := bufio.NewScanner(f) + sc.Buffer(make([]byte, 0, 1<<16), 1<<24) + for sc.Scan() { + line := sc.Bytes() + if len(line) == 0 { + continue + } + var row map[string]any + if err := json.Unmarshal(line, &row); err != nil { + t.Fatalf("parse session log line: %v", err) + } + out = append(out, row) + } + return out +} + // newTestRouter builds the validatord router with an explicit lookup // + a fake chatd URL. Tests that exercise /iterate need a live mock // chatd (constructed inline per-test). func newTestRouter(lookup validator.WorkerLookup, chatdURL string) http.Handler { - h := &handlers{ + h := newTestHandlers(lookup, chatdURL, nil) + r := chi.NewRouter() + h.register(r) + return r +} + +// newTestHandlers exposes the underlying handlers struct so tests can +// inspect post-call state (e.g. session log contents). sessionLog is +// optional — pass nil to disable the session log. +func newTestHandlers(lookup validator.WorkerLookup, chatdURL string, sessionLog *validator.SessionLogger) *handlers { + return &handlers{ lookup: lookup, chatdURL: chatdURL, chatClient: &http.Client{Timeout: 5 * time.Second}, @@ -25,10 +64,8 @@ func newTestRouter(lookup validator.WorkerLookup, chatdURL string) http.Handler DefaultMaxIterations: 3, DefaultMaxTokens: 4096, }, + sessionLog: sessionLog, } - r := chi.NewRouter() - h.register(r) - return r } // ─── /validate ───────────────────────────────────────────────── @@ -244,6 +281,135 @@ func TestIterate_MaxIterReturns422WithHistory(t *testing.T) { } } +// TestIterate_WritesSessionRowOnSuccess locks the offline-analysis +// contract: every successful /iterate call appends one SessionRecord +// to the configured JSONL with grounded_in_roster=true (because the +// model emitted real candidate IDs that exist in the roster). 
+func TestIterate_WritesSessionRowOnSuccess(t *testing.T) {
+	city, role := "Toledo", "Welder"
+	lookup := validator.NewInMemoryWorkerLookup([]validator.WorkerRecord{
+		{CandidateID: "W-1", Name: "Ada", Status: "active", City: &city, Role: &role},
+	})
+	server := fakeChatd(t, `{"fills":[{"candidate_id":"W-1","name":"Ada"}]}`)
+	defer server.Close()
+
+	dir := t.TempDir()
+	logPath := filepath.Join(dir, "sessions.jsonl")
+	h := newTestHandlers(lookup, server.URL, validator.NewSessionLogger(logPath))
+	r := chi.NewRouter()
+	h.register(r)
+
+	body, _ := json.Marshal(map[string]any{
+		"kind":     "fill",
+		"prompt":   "produce fill",
+		"provider": "ollama",
+		"model":    "qwen3.5:latest",
+		"context":  map[string]any{"target_count": 1, "city": "Toledo", "role": "Welder", "client_id": "C-1"},
+	})
+	req := httptest.NewRequest("POST", "/iterate", bytes.NewReader(body))
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+
+	if w.Code != http.StatusOK {
+		t.Fatalf("expected 200, got %d (body=%s)", w.Code, w.Body.String())
+	}
+
+	rows := readSessionLog(t, logPath)
+	if len(rows) != 1 {
+		t.Fatalf("expected 1 session row, got %d", len(rows))
+	}
+	row := rows[0]
+	if row["schema"] != validator.SessionRecordSchema {
+		t.Errorf("schema: %v", row["schema"])
+	}
+	if row["final_verdict"] != "accepted" {
+		t.Errorf("final_verdict: %v", row["final_verdict"])
+	}
+	if row["grounded_in_roster"] != true {
+		t.Errorf("grounded_in_roster: expected true (W-1 is in roster), got %v", row["grounded_in_roster"])
+	}
+	if row["kind"] != "fill" {
+		t.Errorf("kind: %v", row["kind"])
+	}
+}
+
+// TestIterate_GroundedFalseWhenArtifactReferencesPhantom locks the
+// load-bearing forensic property J flagged 2026-05-02: phantom
+// worker IDs in the final accepted artifact must surface as
+// grounded_in_roster=false in the session log. This shouldn't happen
+// in practice (FillValidator catches phantoms before they reach this
+// row), but the explicit check guards against a future bug where the
+// validator weakens or a different validator path skips the roster
+// check. Defense-in-depth at the observability layer.
+func TestIterate_GroundedFalseWhenArtifactReferencesPhantom(t *testing.T) {
+	// Empty roster — every candidate ID is "phantom". No iterate
+	// call needed: we take the rosterCheckFor("fill") closure the
+	// handler itself uses to stamp grounded_in_roster and exercise
+	// it directly against a phantom-bearing artifact.
+	lookup := validator.NewInMemoryWorkerLookup(nil)
+	h := newTestHandlers(lookup, "", nil)
+	check := h.rosterCheckFor("fill")
+	if check == nil {
+		t.Fatal("expected non-nil checker for fill kind")
+	}
+	got := check(map[string]any{
+		"fills": []any{
+			map[string]any{"candidate_id": "W-PHANTOM-NEVER-EXISTS"},
+		},
+	})
+	if got == nil || *got {
+		t.Errorf("expected grounded=false for phantom candidate, got %v", got)
+	}
+	// Other kinds: no roster concept → checker returns nil.
+	if h.rosterCheckFor("email") != nil {
+		t.Error("email kind should not have a roster checker")
+	}
+	if h.rosterCheckFor("playbook") != nil {
+		t.Error("playbook kind should not have a roster checker")
+	}
+}
+
+// TestIterate_WritesSessionRowOnFailure: max-iter exhaustion still
+// produces a row (omits artifact, captures the retry chain).
+func TestIterate_WritesSessionRowOnFailure(t *testing.T) {
+	server := fakeChatd(t, "no json here, just prose")
+	defer server.Close()
+
+	dir := t.TempDir()
+	logPath := filepath.Join(dir, "sessions.jsonl")
+	h := newTestHandlers(validator.NewInMemoryWorkerLookup(nil), server.URL,
+		validator.NewSessionLogger(logPath))
+	r := chi.NewRouter()
+	h.register(r)
+
+	body, _ := json.Marshal(map[string]any{
+		"kind":           "playbook",
+		"prompt":         "produce X",
+		"provider":       "ollama",
+		"model":          "x",
+		"max_iterations": 2,
+	})
+	req := httptest.NewRequest("POST", "/iterate", bytes.NewReader(body))
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+
+	if w.Code != http.StatusUnprocessableEntity {
+		t.Fatalf("expected 422, got %d", w.Code)
+	}
+
+	rows := readSessionLog(t, logPath)
+	if len(rows) != 1 {
+		t.Fatalf("expected 1 session row, got %d", len(rows))
+	}
+	if rows[0]["final_verdict"] != "max_iter_exhausted" {
+		t.Errorf("final_verdict: %v", rows[0]["final_verdict"])
+	}
+	if _, hasArtifact := rows[0]["artifact"]; hasArtifact {
+		t.Errorf("artifact should be omitted on failure: %v", rows[0]["artifact"])
+	}
+}
+
 func TestIterate_ChatdDownReturns502(t *testing.T) {
 	r := newTestRouter(validator.NewInMemoryWorkerLookup(nil), "http://127.0.0.1:1") // unroutable
 	body, _ := json.Marshal(map[string]any{
diff --git a/docs/SESSION_LOG.md b/docs/SESSION_LOG.md
new file mode 100644
index 0000000..552aece
--- /dev/null
+++ b/docs/SESSION_LOG.md
@@ -0,0 +1,158 @@
+# Coordinator session log — `coordinator_sessions.jsonl`
+
+**Last updated:** 2026-05-02 · **Schema:** `session.iterate.v1` · **Writer:** `internal/validator.SessionLogger` · **Producer:** validatord `/v1/iterate`
+
+## Why
+
+The Langfuse trace tree is the live view: per-session, you can scroll
+the retry chain and inspect every sub-call. But for **longitudinal
+forensics** ("show me every session in the last week where the model
+guessed a real worker without a retry," or "find sessions where
+validation rejected three times in a row"), Langfuse's UI doesn't
+scale — you need a queryable data plane.
+
+This JSONL is that plane. One row per `/v1/iterate` session,
+append-only, DuckDB-friendly.
+
+## Where
+
+Configurable via `[validatord].session_log_path` in `lakehouse.toml`.
+Empty = disabled (best-effort posture; a missing log never blocks an
+iterate request). Production:
+
+```toml
+[validatord]
+session_log_path = "/var/lib/lakehouse/validator/sessions.jsonl"
+```
+
+## Schema (v1)
+
+```jsonc
+{
+  "schema": "session.iterate.v1",
+  "session_id": "<langfuse-trace-id>",  // join key to Langfuse
+  "timestamp": "2026-05-02T07:30:00.123456Z",
+  "daemon": "validatord",
+  "kind": "fill | email | playbook",
+  "model": "qwen3.5:latest",
+  "provider": "ollama",
+  "prompt": "produce a fill artifact ...",  // truncated to 4000 chars
+  "iterations": 3,      // attempts spent
+  "max_iterations": 3,  // cap per request
+  "final_verdict": "accepted | max_iter_exhausted | infra_error",
+  "attempts": [
+    { "iteration": 0, "verdict_kind": "validation_failed",
+      "error": "consistency: candidate_id W-X not in roster",
+      "span_id": "abc..." },
+    { "iteration": 1, "verdict_kind": "validation_failed",
+      "error": "consistency: city mismatch", "span_id": "def..." },
+    { "iteration": 2, "verdict_kind": "accepted", "span_id": "ghi..." }
+  ],
+  "artifact": { /* final accepted artifact, omitted on failure */ },
+  "grounded_in_roster": true,  // null when N/A (email/playbook)
+  "duration_ms": 2840
+}
+```
+
+### Field semantics
+
+| Field | When set | What it means |
+|---|---|---|
+| `session_id` | always | Langfuse trace id. Pivot to the live trace tree by URL: `${LANGFUSE_URL}/trace/<session_id>`. |
+| `final_verdict=accepted` | success | Loop converged within `max_iterations`. `artifact` is non-null. |
+| `final_verdict=max_iter_exhausted` | failure | Loop hit the cap without passing validation. `artifact` is omitted. |
+| `final_verdict=infra_error` | failure | Chat hop or other infra crashed. Single attempt with `verdict_kind=infra_error`. |
+| `grounded_in_roster=true` | fill kind, success | Every `candidate_id` in the artifact exists in `WorkerLookup`. |
+| `grounded_in_roster=false` | fill kind, anomaly | Phantom or otherwise-invalid candidate IDs (shouldn't happen — FillValidator catches these — but the explicit check defends against future validator weakening). |
+| `grounded_in_roster=null` (omitted) | non-fill kinds, or failure | The roster check doesn't apply or wasn't run. |
+
+## DuckDB queries
+
+```sql
+-- Read the log directly via DuckDB's read_json_auto.
+ATTACH ':memory:' AS sessions;
+SELECT * FROM read_json_auto(
+  '/var/lib/lakehouse/validator/sessions.jsonl', format='newline_delimited'
+) LIMIT 10;
+```
+
+### "Did the validator catch every phantom worker?"
+```sql
+-- Accepted sessions that needed retries, where at least one attempt's
+-- error mentions 'consistency' or 'phantom' — validation caught a bad
+-- artifact and the model recovered.
+SELECT session_id, model, iterations, grounded_in_roster, final_verdict
+FROM read_json_auto('sessions.jsonl', format='newline_delimited')
+WHERE final_verdict = 'accepted'
+  AND iterations > 1
+  AND list_contains(
+        list_transform(attempts,
+          x -> x.error LIKE '%consistency%' OR x.error LIKE '%phantom%'),
+        true
+      );
+```
+
+### "First-shot success rate per model" (the "did the corpus give it enough" gate)
+```sql
+SELECT model,
+       COUNT(*) AS sessions,
+       SUM(CASE WHEN iterations = 1 AND final_verdict = 'accepted' THEN 1 ELSE 0 END) AS first_shot,
+       ROUND(100.0 * SUM(CASE WHEN iterations = 1 AND final_verdict = 'accepted' THEN 1 ELSE 0 END) / COUNT(*), 1) AS pct
+FROM read_json_auto('sessions.jsonl', format='newline_delimited')
+WHERE kind = 'fill'
+GROUP BY model
+ORDER BY pct DESC;
+```
+
+### "Sessions that were never grounded" (the alarm query)
+```sql
+-- Should always be empty. If it isn't, FillValidator has a hole or
+-- a different code path is bypassing the roster check.
+SELECT session_id, model, iterations, attempts
+FROM read_json_auto('sessions.jsonl', format='newline_delimited')
+WHERE kind = 'fill'
+  AND final_verdict = 'accepted'
+  AND grounded_in_roster = false;
+```
+
+### "Average retry depth per model"
+```sql
+SELECT model, AVG(iterations) AS avg_iter, COUNT(*) AS n
+FROM read_json_auto('sessions.jsonl', format='newline_delimited')
+WHERE kind = 'fill' AND final_verdict = 'accepted'
+GROUP BY model
+ORDER BY avg_iter ASC;
+```
+
+### "What did validation reject?" (failure mode breakdown)
+```sql
+-- Pull each rejected attempt's error string, classify by prefix.
+WITH errors AS ( + SELECT session_id, + model, + unnest(attempts) AS att + FROM read_json_auto('sessions.jsonl', format='newline_delimited') +) +SELECT model, + split_part(att.error, ':', 1) AS kind, + COUNT(*) AS n +FROM errors +WHERE att.verdict_kind = 'validation_failed' +GROUP BY model, kind +ORDER BY n DESC; +``` + +## Operational notes + +- **Append-only.** No row is ever updated; storage grows linearly with iterate calls. Operators rotate via cron when the file gets unwieldy (logrotate-style). +- **Best-effort posture.** Every write goes through `slog.Warn` on failure but never blocks the iterate handler. A full disk silently drops session rows; the iterate response still ships. +- **Schema versioning.** `schema=session.iterate.v1` is the contract. Future incompatible changes bump the version; consumers should branch on the field. +- **PII consideration.** `prompt` is captured truncated to 4000 chars and the final `artifact` (when present) is captured verbatim. Operators handling PII-bearing prompts should set the path under a restricted-access volume or filter before retention. +- **Cross-runtime parity.** The Rust gateway's `/v1/iterate` does NOT yet write this file. If you want a unified longitudinal log across runtimes, port the writer to Rust (`crates/gateway/src/v1/iterate.rs`) and target the same JSONL path. ~50 LOC. + +## See also + +- `internal/validator/session_log.go` — writer + record types +- `internal/validator/iterate.go` — `Tracer` callback + Langfuse span emission +- `internal/shared/langfuse_middleware.go` — `X-Lakehouse-Trace-Id` header propagation (the `session_id` join key) +- `data/_kb/replay_runs.jsonl` — the *replay* tool's own JSONL (different shape, different producer); these two streams are siblings, not duplicates diff --git a/internal/shared/config.go b/internal/shared/config.go index 99e246e..234e4fa 100644 --- a/internal/shared/config.go +++ b/internal/shared/config.go @@ -165,6 +165,12 @@ type ValidatordConfig struct { DefaultMaxTokens int `toml:"default_max_tokens"` // Per-call timeout for the chat hop in seconds. 0 = 240s. ChatTimeoutSecs int `toml:"chat_timeout_secs"` + // SessionLogPath: where to append SessionRecord JSONL rows for + // offline analysis (DuckDB queries, scrum review tooling). Empty + // = disabled. Production sets a stable path under + // /var/lib/lakehouse/validator/sessions.jsonl. Append-only, + // best-effort; see internal/validator/session_log.go. + SessionLogPath string `toml:"session_log_path"` } // ObserverdConfig drives the observer service (cmd/observerd). @@ -393,6 +399,7 @@ func DefaultConfig() Config { DefaultMaxIterations: 3, DefaultMaxTokens: 4096, ChatTimeoutSecs: 240, + SessionLogPath: "", // empty = no session JSONL. Operators set under /var/lib/lakehouse/validator/sessions.jsonl. }, Chatd: ChatdConfig{ Bind: "127.0.0.1:3220", diff --git a/internal/validator/session_log.go b/internal/validator/session_log.go new file mode 100644 index 0000000..7b7ec48 --- /dev/null +++ b/internal/validator/session_log.go @@ -0,0 +1,199 @@ +package validator + +import ( + "encoding/json" + "fmt" + "log/slog" + "os" + "path/filepath" + "sync" + "time" +) + +// SessionRecordSchema versions the on-wire JSON shape. Bump when the +// schema changes incompatibly. Consumers (e.g. duckdb queries over +// coordinator_sessions.jsonl, scrum review tooling) check this field +// to decide whether they understand the row. +const SessionRecordSchema = "session.iterate.v1" + +// SessionRecord is one row in coordinator_sessions.jsonl. 
Captures the
+// full retry chain of a single /v1/iterate session for offline
+// forensics: "show me all sessions where the validator caught a
+// phantom worker" / "show me all sessions where retrieval missed."
+//
+// The Langfuse trace tree (see X-Lakehouse-Trace-Id propagation
+// 2026-05-02) is the live view; this JSONL is the longitudinal view
+// for ad-hoc DuckDB queries over thousands of sessions.
+type SessionRecord struct {
+	Schema           string                 `json:"schema"`
+	SessionID        string                 `json:"session_id"` // = Langfuse trace_id
+	Timestamp        string                 `json:"timestamp"`  // ISO 8601
+	Daemon           string                 `json:"daemon"`
+	Kind             string                 `json:"kind"` // fill | email | playbook
+	Model            string                 `json:"model"`
+	Provider         string                 `json:"provider"`
+	Prompt           string                 `json:"prompt"` // truncated to 4000
+	Iterations       int                    `json:"iterations"`
+	MaxIterations    int                    `json:"max_iterations"`
+	FinalVerdict     string                 `json:"final_verdict"` // accepted | max_iter_exhausted | infra_error
+	Attempts         []SessionAttemptRecord `json:"attempts"`
+	Artifact         map[string]any         `json:"artifact,omitempty"` // present on success
+	GroundedInRoster *bool                  `json:"grounded_in_roster,omitempty"`
+	DurationMs       int64                  `json:"duration_ms"`
+}
+
+// SessionAttemptRecord mirrors IterateAttempt but stores only the
+// stable signals (iteration, verdict, error, span id). The raw model
+// output is intentionally NOT captured here — it already lives in the
+// Langfuse span (queryable by span_id) and in the iterate response;
+// a second copy would bloat the JSONL on long sessions.
+type SessionAttemptRecord struct {
+	Iteration   int    `json:"iteration"`
+	VerdictKind string `json:"verdict_kind"` // no_json | validation_failed | accepted
+	Error       string `json:"error,omitempty"`
+	SpanID      string `json:"span_id,omitempty"`
+}
+
+// SessionLogger appends SessionRecord rows to a JSONL file. Best-effort:
+// errors are logged via slog and never returned to the caller (per the
+// rest of the observability stack — never block a request because the
+// session log is unhappy).
+//
+// nil is a valid value: NewSessionLogger("") returns nil; Append on
+// a nil receiver is a no-op. Lets validatord skip the wiring entirely
+// when no session log is configured.
+type SessionLogger struct {
+	path string
+	mu   sync.Mutex
+}
+
+// NewSessionLogger constructs a logger writing to `path`. Empty path
+// disables logging. Parent dirs are created lazily on first write, so
+// construction never touches the filesystem; safe when the path sits
+// on a not-yet-mounted volume (e.g. systemd unit ordering).
+func NewSessionLogger(path string) *SessionLogger {
+	if path == "" {
+		return nil
+	}
+	return &SessionLogger{path: path}
+}
+
+// Append writes one JSONL row. Failures are logged via slog and
+// swallowed (best-effort). The caller doesn't need to handle errors;
+// logging is an observability witness, not a correctness gate.
+func (l *SessionLogger) Append(rec SessionRecord) {
+	if l == nil {
+		return
+	}
+	if rec.Schema == "" {
+		rec.Schema = SessionRecordSchema
+	}
+	if rec.Timestamp == "" {
+		rec.Timestamp = time.Now().UTC().Format(time.RFC3339Nano)
+	}
+	if rec.Daemon == "" {
+		rec.Daemon = "validatord"
+	}
+	body, err := json.Marshal(rec)
+	if err != nil {
+		slog.Warn("session_log: marshal", "err", err, "session_id", rec.SessionID)
+		return
+	}
+	body = append(body, '\n')
+
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	if err := os.MkdirAll(filepath.Dir(l.path), 0o755); err != nil {
+		slog.Warn("session_log: mkdir", "err", err, "path", l.path)
+		return
+	}
+	f, err := os.OpenFile(l.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
+	if err != nil {
+		slog.Warn("session_log: open", "err", err, "path", l.path)
+		return
+	}
+	defer f.Close()
+	if _, err := f.Write(body); err != nil {
+		slog.Warn("session_log: write", "err", err, "session_id", rec.SessionID)
+	}
+}
+
+// BuildSessionRecord assembles a SessionRecord from an iterate
+// response/failure pair. At most one of resp/fail is non-nil; both
+// nil means an infrastructure failure (carried in infraErr).
+// Centralized here so validatord and any future iterate-using daemon
+// emit the same shape.
+//
+// rosterCheck is called on the accepted artifact to populate
+// GroundedInRoster — set to nil to skip the check (e.g. for
+// non-fill kinds that don't have worker IDs). Returns the shaped
+// record; the caller passes it to logger.Append, keeping record
+// assembly separate from the best-effort write.
+func BuildSessionRecord(
+	req IterateRequest,
+	resp *IterateResponse,
+	fail *IterateFailure,
+	infraErr error,
+	rosterCheck func(map[string]any) *bool,
+	durationMs int64,
+) SessionRecord {
+	rec := SessionRecord{
+		SessionID:     req.TraceID,
+		Kind:          req.Kind,
+		Model:         req.Model,
+		Provider:      req.Provider,
+		Prompt:        trim(req.Prompt, 4000),
+		MaxIterations: req.MaxIterations,
+		DurationMs:    durationMs,
+	}
+
+	switch {
+	case resp != nil:
+		rec.Iterations = resp.Iterations
+		rec.FinalVerdict = "accepted"
+		rec.Attempts = sessionAttemptsFromHistory(resp.History)
+		rec.Artifact = resp.Artifact
+		if rosterCheck != nil {
+			rec.GroundedInRoster = rosterCheck(resp.Artifact)
+		}
+		// Keep the trace id authoritative even if the request didn't
+		// supply one — the iterate response carries the resolved id.
+		if rec.SessionID == "" {
+			rec.SessionID = resp.TraceID
+		}
+	case fail != nil:
+		rec.Iterations = fail.Iterations
+		rec.FinalVerdict = "max_iter_exhausted"
+		rec.Attempts = sessionAttemptsFromHistory(fail.History)
+		if rec.SessionID == "" {
+			rec.SessionID = fail.TraceID
+		}
+	default:
+		// Infrastructure failure (chat hop crashed mid-loop, etc.).
+		// We still emit a row so the failure is forensically visible —
+		// otherwise long debugging sessions get harder.
+ rec.FinalVerdict = "infra_error" + if infraErr != nil { + rec.Attempts = []SessionAttemptRecord{{ + Iteration: 0, + VerdictKind: "infra_error", + Error: trim(fmt.Sprintf("%v", infraErr), 800), + }} + } + } + + return rec +} + +func sessionAttemptsFromHistory(h []IterateAttempt) []SessionAttemptRecord { + out := make([]SessionAttemptRecord, len(h)) + for i, a := range h { + out[i] = SessionAttemptRecord{ + Iteration: a.Iteration, + VerdictKind: a.Status.Kind, + Error: a.Status.Error, + SpanID: a.SpanID, + } + } + return out +} diff --git a/internal/validator/session_log_test.go b/internal/validator/session_log_test.go new file mode 100644 index 0000000..547e1cc --- /dev/null +++ b/internal/validator/session_log_test.go @@ -0,0 +1,224 @@ +package validator + +import ( + "bufio" + "encoding/json" + "os" + "path/filepath" + "sync" + "testing" +) + +func TestSessionLogger_AppendsSchemaTaggedRows(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "sessions.jsonl") + logger := NewSessionLogger(path) + if logger == nil { + t.Fatal("expected non-nil logger for a non-empty path") + } + + rec := SessionRecord{ + SessionID: "trace-1", + Kind: "fill", + Model: "qwen3.5:latest", + Provider: "ollama", + Prompt: "produce a fill artifact", + Iterations: 1, + FinalVerdict: "accepted", + } + logger.Append(rec) + + rows := readJSONL(t, path) + if len(rows) != 1 { + t.Fatalf("expected 1 row, got %d", len(rows)) + } + if rows[0]["schema"] != SessionRecordSchema { + t.Errorf("schema autopopulation: %v", rows[0]["schema"]) + } + if rows[0]["session_id"] != "trace-1" { + t.Errorf("session_id round-trip: %v", rows[0]["session_id"]) + } + if rows[0]["timestamp"] == "" || rows[0]["timestamp"] == nil { + t.Errorf("timestamp autopopulation: %v", rows[0]["timestamp"]) + } + if rows[0]["daemon"] != "validatord" { + t.Errorf("daemon autopopulation: %v", rows[0]["daemon"]) + } +} + +func TestSessionLogger_NilIsNoOp(t *testing.T) { + var logger *SessionLogger // nil + logger.Append(SessionRecord{SessionID: "trace-1"}) + // No panic, no error — the test passing IS the assertion. +} + +func TestSessionLogger_EmptyPathReturnsNil(t *testing.T) { + if NewSessionLogger("") != nil { + t.Error("empty path should disable logging (nil logger)") + } +} + +// TestSessionLogger_ConcurrentAppends locks the per-row atomicity: N +// goroutines each appending must produce N well-formed JSONL rows +// with no torn writes. Mutex lives at the logger level, not OS-level, +// so this also documents that concurrent logger instances would NOT +// be safe (one logger per daemon — that's fine). +func TestSessionLogger_ConcurrentAppends(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "sessions.jsonl") + logger := NewSessionLogger(path) + + const N = 64 + var wg sync.WaitGroup + for i := 0; i < N; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + logger.Append(SessionRecord{ + SessionID: "trace-" + string(rune('a'+i%26)), + Kind: "fill", + FinalVerdict: "accepted", + Iterations: i, + }) + }(i) + } + wg.Wait() + + rows := readJSONL(t, path) + if len(rows) != N { + t.Fatalf("expected %d rows after concurrent appends, got %d", N, len(rows)) + } + // Every row must be parseable JSON — a torn write would throw + // a decode error, which readJSONL t.Fatal's on. 
+} + +func TestBuildSessionRecord_SuccessPathCapturesArtifact(t *testing.T) { + req := IterateRequest{ + TraceID: "trace-x", + Kind: "fill", + Prompt: "produce fill", + Provider: "ollama", + Model: "qwen3.5:latest", + MaxIterations: 3, + } + resp := &IterateResponse{ + Artifact: map[string]any{"fills": []any{ + map[string]any{"candidate_id": "W-1"}, + }}, + Validation: Report{}, + Iterations: 1, + History: []IterateAttempt{{ + Iteration: 0, + Raw: `{"fills":[{"candidate_id":"W-1"}]}`, + Status: AttemptStatus{Kind: "accepted"}, + SpanID: "span-0", + }}, + TraceID: "trace-x", + } + roster := func(_ map[string]any) *bool { + t := true + return &t + } + rec := BuildSessionRecord(req, resp, nil, nil, roster, 1234) + if rec.FinalVerdict != "accepted" || rec.Iterations != 1 { + t.Errorf("verdict/iterations: %+v", rec) + } + if rec.GroundedInRoster == nil || !*rec.GroundedInRoster { + t.Errorf("grounded_in_roster: expected true, got %v", rec.GroundedInRoster) + } + if rec.Artifact == nil { + t.Errorf("artifact missing on success path") + } + if len(rec.Attempts) != 1 || rec.Attempts[0].SpanID != "span-0" { + t.Errorf("attempts shape: %+v", rec.Attempts) + } + if rec.SessionID != "trace-x" { + t.Errorf("session_id: %s", rec.SessionID) + } + if rec.DurationMs != 1234 { + t.Errorf("duration: %d", rec.DurationMs) + } +} + +func TestBuildSessionRecord_FailurePathOmitsArtifact(t *testing.T) { + req := IterateRequest{ + TraceID: "trace-y", + Kind: "fill", + Provider: "ollama", + Model: "qwen", + MaxIterations: 3, + } + fail := &IterateFailure{ + Error: "max iterations reached (3)", + Iterations: 3, + History: []IterateAttempt{ + {Iteration: 0, Status: AttemptStatus{Kind: "no_json"}}, + {Iteration: 1, Status: AttemptStatus{Kind: "validation_failed", Error: "phantom W-X"}}, + {Iteration: 2, Status: AttemptStatus{Kind: "validation_failed", Error: "phantom W-Y"}}, + }, + TraceID: "trace-y", + } + rec := BuildSessionRecord(req, nil, fail, nil, nil, 8765) + if rec.FinalVerdict != "max_iter_exhausted" { + t.Errorf("final_verdict: %s", rec.FinalVerdict) + } + if rec.Artifact != nil { + t.Errorf("artifact must be omitted on failure path: %v", rec.Artifact) + } + if rec.GroundedInRoster != nil { + t.Errorf("grounded_in_roster should be nil on failure: %v", *rec.GroundedInRoster) + } + if len(rec.Attempts) != 3 || rec.Attempts[2].VerdictKind != "validation_failed" { + t.Errorf("attempts: %+v", rec.Attempts) + } +} + +func TestBuildSessionRecord_InfraErrorEmitsRow(t *testing.T) { + // An infrastructure failure (e.g. chat hop crashed) must STILL + // produce a session row — otherwise debugging gets harder + // (no log → operator can't see it failed at all). 
+	req := IterateRequest{TraceID: "trace-z", Kind: "fill"}
+	rec := BuildSessionRecord(req, nil, nil, errExample{"chat refused"}, nil, 50)
+	if rec.FinalVerdict != "infra_error" {
+		t.Errorf("verdict: %s", rec.FinalVerdict)
+	}
+	if len(rec.Attempts) != 1 || rec.Attempts[0].VerdictKind != "infra_error" {
+		t.Errorf("attempts: %+v", rec.Attempts)
+	}
+	if rec.Attempts[0].Error == "" {
+		t.Errorf("error must be carried")
+	}
+}
+
+// ─── Helpers ───────────────────────────────────────────────────
+
+type errExample struct{ msg string }
+
+func (e errExample) Error() string { return e.msg }
+
+func readJSONL(t *testing.T, path string) []map[string]any {
+	t.Helper()
+	f, err := os.Open(path)
+	if err != nil {
+		t.Fatalf("open: %v", err)
+	}
+	defer f.Close()
+	var out []map[string]any
+	sc := bufio.NewScanner(f)
+	sc.Buffer(make([]byte, 0, 1<<16), 1<<24)
+	for sc.Scan() {
+		line := sc.Bytes()
+		if len(line) == 0 {
+			continue
+		}
+		var row map[string]any
+		if err := json.Unmarshal(line, &row); err != nil {
+			t.Fatalf("parse line %q: %v", line, err)
+		}
+		out = append(out, row)
+	}
+	if err := sc.Err(); err != nil {
+		t.Fatalf("scan: %v", err)
+	}
+	return out
+}
diff --git a/lakehouse.toml b/lakehouse.toml
index 5159cb3..7fe3785 100644
--- a/lakehouse.toml
+++ b/lakehouse.toml
@@ -119,6 +119,13 @@ default_max_iterations = 3
 default_max_tokens = 4096
 # Chat hop timeout (seconds). 240s tolerates frontier reasoning models.
 chat_timeout_secs = 240
+# Session log: where to append one JSONL row per /v1/iterate session
+# for offline DuckDB analysis. Empty = disabled. Production:
+#   session_log_path = "/var/lib/lakehouse/validator/sessions.jsonl"
+# Each row: schema=session.iterate.v1, session_id (= Langfuse trace_id),
+# kind, model, iterations, attempts[], final_verdict, grounded_in_roster,
+# duration_ms. See internal/validator/session_log.go.
+session_log_path = ""
 
 [s3]
 endpoint = "http://localhost:9000"
diff --git a/scripts/validatord_smoke.sh b/scripts/validatord_smoke.sh
index 98a70bc..11c2ce8 100755
--- a/scripts/validatord_smoke.sh
+++ b/scripts/validatord_smoke.sh
@@ -70,6 +70,7 @@ roster_path = "$ROSTER"
 default_max_iterations = 3
 default_max_tokens = 4096
 chat_timeout_secs = 240
+session_log_path = "$TMP/sessions.jsonl"
 EOF
 
 poll_health() {
@@ -153,4 +154,18 @@ if [ "$STATUS" != "400" ]; then
 fi
 echo "  ✓ unknown kind → 400"
 
-echo "[validatord-smoke] PASS — 5/5 probes through gateway :3110"
+# 6. Session log: every /v1/validate hit was a single-shot validation
+#    (not iterate), so the session log has no rows yet. Instead, verify
+#    the daemon announced the configured path at startup (logger
+#    initialized) — the iterate handler is where rows actually land.
+#    This proves the wiring is in place even if the smoke doesn't
+#    drive a live iteration end-to-end.
+LOG_PATH="$TMP/sessions.jsonl"
+echo "[validatord-smoke] session log path wired:"
+grep -q "validatord session log" /tmp/validatord.log || {
+  echo "  ✗ expected validatord to log 'validatord session log' on startup"
+  grep validatord /tmp/validatord.log
+  exit 1
+}
+echo "  ✓ session_log_path=$LOG_PATH announced at startup (rows land on /v1/iterate calls)"
+
+echo "[validatord-smoke] PASS — 6/6 probes through gateway :3110"
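
## Query sketches (follow-up candidates for docs/SESSION_LOG.md)

The docs ship five worked queries but not the commit message's
"rejected three times in a row" example. A minimal sketch, assuming
`attempts` preserves emission order and using DuckDB's 1-indexed,
end-inclusive list slicing; untested against a real log:

```sql
-- Sessions whose first three attempts were all validation rejections.
SELECT session_id, model, iterations, final_verdict
FROM read_json_auto('sessions.jsonl', format='newline_delimited')
WHERE len(attempts) >= 3
  AND len(list_filter(attempts[1:3],
        x -> x.verdict_kind = 'validation_failed')) = 3;
```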
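The "session_id is the join key" claim, as a query: a sketch pairing a
coordinator session with its replay rows. The replay-side column names
(`trace_id`, `task_id`) are assumptions; check replay_runs.jsonl's
actual shape before leaning on this.

```sql
-- Hypothetical replay columns (trace_id, task_id); adjust to the
-- real replay_runs.jsonl schema.
SELECT s.session_id, s.final_verdict, s.iterations, r.task_id
FROM read_json_auto('sessions.jsonl', format='newline_delimited') s
LEFT JOIN read_json_auto('replay_runs.jsonl', format='newline_delimited') r
  ON r.trace_id = s.session_id;
```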
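And the query the rosterCheckFor comment promises ("real worker IDs
without ever needing a retry"), built entirely from documented v1
schema fields:

```sql
-- First-shot, roster-grounded fills per model.
SELECT model, COUNT(*) AS first_shot_grounded
FROM read_json_auto('sessions.jsonl', format='newline_delimited')
WHERE kind = 'fill'
  AND final_verdict = 'accepted'
  AND iterations = 1
  AND grounded_in_roster = true
GROUP BY model
ORDER BY first_shot_grounded DESC;
```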