validatord: coordinator session JSONL for offline analysis (B follow-up)

Closes the second half of J's 2026-05-02 multi-call observability
concern. Trace-id propagation (commit d6d2fdf) gave us the *live*
view in Langfuse; this gives us the *longitudinal* view for ad-hoc
DuckDB queries over thousands of sessions:

  "show me every session where the model produced a real candidate
   without ever needing a retry"
  "find sessions where validation rejected three times in a row"
  "first-shot success rate per model — did we feed it enough corpus?"

## What's in

internal/validator/session_log.go:
  - SessionRecord type (schema=session.iterate.v1)
  - SessionLogger writer — mutex-guarded append, best-effort posture,
    nil-safe (NewSessionLogger("") = nil = no-op on Append)
  - BuildSessionRecord helper — assembles a row from any
    iterate response/failure/infra-error combination, callable from
    other daemons that wrap iterate (cross-daemon shared schema)
  - 7 unit tests including concurrent-append safety + the three
    code paths (success / max_iter_exhausted / infra_error)

cmd/validatord/main.go:
  - handlers.sessionLog field + wiring from cfg.Validatord.SessionLogPath
  - Iterate handler: build + append a SessionRecord on every call
  - rosterCheckFor("fill") closure stamps grounded_in_roster — the
    load-bearing forensic property J flagged ("we can never
    hallucinate available staff members to contracts")

internal/shared/config.go + lakehouse.toml:
  - [validatord].session_log_path field; empty = disabled
  - Production: /var/lib/lakehouse/validator/sessions.jsonl

scripts/validatord_smoke.sh:
  - Adds a probe verifying validatord announces the session log path
    on startup. Smoke is now 6/6 (was 5/5).

docs/SESSION_LOG.md:
  - Schema reference + 5 worked DuckDB query examples including the
    "alarm" query (sessions where grounded_in_roster=false on an
    accepted fill — should always be empty; if not, something is
    bypassing FillValidator).

## What this is NOT

This is NOT a duplicate of replay_runs.jsonl. They're siblings:
  - replay_runs.jsonl: replay tool's per-task retrieval+model output
  - sessions.jsonl: validatord's per-iterate full retry chain +
    grounded-in-roster verdict

A single coordinator session can produce rows in both streams; the
session_id (= Langfuse trace_id) is the join key.
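
A minimal join sketch (hedged: replay_runs.jsonl has its own shape,
not shown here, so the `session_id` column name on the replay side is
an assumption to adjust):

```sql
-- Stitch the two sibling streams on the shared trace id.
SELECT s.session_id, s.final_verdict, s.grounded_in_roster, r.*
FROM read_json_auto('sessions.jsonl', format='newline_delimited') s
JOIN read_json_auto('replay_runs.jsonl', format='newline_delimited') r
  ON r.session_id = s.session_id;
```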

## Layered observability now in place

  Live view:  Langfuse trace tree (X-Lakehouse-Trace-Id propagation)
              `iterate.attempt[N]` spans with prompt/raw/verdict
  Offline:    coordinator_sessions.jsonl (this commit)
              DuckDB-queryable; longitudinal forensics
  Hard gate:  FillValidator + WorkerLookup (existing)
              phantom IDs structurally rejected, never reach
              session log's grounded_in_roster=true bucket
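
Hopping from the offline layer back to the live one is a string
concat away; a sketch using the `${LANGFUSE_URL}/trace/<session_id>`
pivot documented in docs/SESSION_LOG.md (the base URL is a literal
placeholder here):

```sql
-- Live-trace URLs for every infra_error session.
SELECT '${LANGFUSE_URL}/trace/' || session_id AS trace_url
FROM read_json_auto('/var/lib/lakehouse/validator/sessions.jsonl',
                    format='newline_delimited')
WHERE final_verdict = 'infra_error';
```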

Per the architecture invariant in STATE_OF_PLAY's DO NOT RELITIGATE
section — these layers are wired; future work targets the data, not
the wiring.

## Verification

- internal/validator: 7 new tests (session_log_test.go) — all PASS
- cmd/validatord: 3 new integration tests covering the success,
  failure, and grounded=false paths — all PASS
- validatord_smoke.sh: 6/6 PASS through gateway :3110
- Full go test ./... green across 33 packages

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

commit 1a3a82aedb (parent d6d2fdf81f)
Author: root · 2026-05-02 05:22:09 -05:00
9 changed files with 848 additions and 7 deletions

STATE_OF_PLAY.md:

@@ -1,6 +1,6 @@
 # STATE OF PLAY — Lakehouse-Go
-**Last verified:** 2026-05-02 ~07:30 CDT
+**Last verified:** 2026-05-02 ~08:00 CDT
 **Verified by:** **production-readiness gauntlet** — 21/21 smoke chain green, per-component scrum across 4 bundles, **3 cross-runtime parity probes all green post-fix** (validator: **6/6 match** after wire-format alignment shipped; materializer: 2/2 after omitempty fix; extract_json: 12/12). All findings surfaced by the parity probes have been actioned. Disposition: `reports/cutover/gauntlet_2026-05-02/disposition.md`.
 > **Read this FIRST.** When the user says "we're working on lakehouse," default to the Go rewrite (this repo); the Rust legacy at `/home/profit/lakehouse/` is maintenance-only. If memory contradicts this file, this file wins. Update it when something is verified working — not when a phase finishes.
@@ -217,6 +217,7 @@ Verbatim verdicts at `reports/scrum/_evidence/2026-04-30/verdicts/`. Disposition
 - **vectord's source-of-truth is `i.vectors`, NOT the coder/hnsw graph.** The `Index` struct holds a parallel `vectors map[string][]float32` updated on every successful Add/Delete; the graph is a derived, replaceable view. `safeGraphAdd`/`safeGraphDelete` wrap the library's panic-prone ops; `rebuildGraphLocked` reads from `i.vectors` (graph-state-independent). Don't propose to "drop the side map for memory" — it's the load-bearing piece that makes Add panic-recoverable past the small-index threshold (closes the multitier_100k 277884b 96-98% fail). The prior `i.ids` set was folded into `i.vectors` keys.
 - **vectord saves are coalesced async, not synchronous.** `cmd/vectord/main.go` runs a per-index `saveTask` that single-flights through `Persistor.Save` — at most one in-flight + one pending. Add returns OK before the save completes; an Add-then-crash can lose ~1 save's worth of data, matching ADR-005's fail-open posture. Don't propose to "make saves synchronous for durability" — that re-introduces the lock-contention bottleneck (1-2.5s tail at conc=50, observed 2026-05-01) without fixing a real durability hole (in-memory state is the source of truth in flight).
 - **`X-Lakehouse-Trace-Id` header propagates Langfuse parent traces across daemon boundaries.** When validatord's `/v1/iterate` calls chatd's `/v1/chat`, it forwards the header so chatd's middleware reuses the parent trace id instead of minting a new one. Each iteration attempt also emits a child span (`iterate.attempt[N]`) carrying the prompt, raw model output, and validator verdict. Result: an iterate session with N retries shows in Langfuse as ONE trace tree, not N+1 disconnected traces. Don't propose to "wire trace-id propagation" — it's wired; the test at `internal/shared/langfuse_middleware_test.go::TestLangfuseMiddleware_HonorsTraceIDHeader` is the proof. Closes J's 2026-05-02 multi-call observability concern.
+- **Coordinator session JSONL exists at `[validatord].session_log_path`.** One row per `/v1/iterate` session, schema=`session.iterate.v1`, append-only. Captures session_id (= Langfuse trace_id), kind, model, iterations, attempts[], final_verdict, **grounded_in_roster** (the load-bearing forensic property — always true on accepted fills, false on phantom-worker artifacts that somehow slip past FillValidator). Empty path = disabled. DuckDB queries documented at `docs/SESSION_LOG.md`. Don't propose to "build offline session analysis" — it's built; the schema is locked at v1; rotate the file via logrotate when it gets large.
 ---

cmd/validatord/main.go:

@@ -78,7 +78,11 @@ func main() {
 		// reference lets us emit child spans of the per-request trace.
 		// nil when Langfuse env isn't set; the iterate handler skips
 		// span emission gracefully in that case.
 		lf: shared.LoadLangfuseFromEnv(),
+		sessionLog: validator.NewSessionLogger(cfg.Validatord.SessionLogPath),
+	}
+	if h.sessionLog != nil {
+		slog.Info("validatord session log", "path", cfg.Validatord.SessionLogPath)
 	}
 	if err := shared.Run("validatord", cfg.Validatord.Bind, h.register, cfg.Auth); err != nil {
@@ -98,6 +102,10 @@ type handlers struct {
 	// creates the parent trace lives in shared.Run; this client lets
 	// validatord's handler emit application-level child spans.
 	lf *langfuse.Client
+	// sessionLog appends one SessionRecord per /v1/iterate session to
+	// `coordinator_sessions.jsonl` for offline DuckDB analysis. nil
+	// when the session log is unconfigured (best-effort posture).
+	sessionLog *validator.SessionLogger
 }

 func (h *handlers) register(r chi.Router) {
@@ -269,7 +277,16 @@ func (h *handlers) handleIterate(w http.ResponseWriter, r *http.Request) {
 		cfg.Tracer = h.iterTracer(r.Context())
 	}
+	t0 := time.Now()
 	resp, fail, err := validator.Iterate(r.Context(), req, cfg, chat, validate)
+	durationMs := time.Since(t0).Milliseconds()
+	// Emit one session row regardless of success/failure/infra-error.
+	// Best-effort: a failing session log never blocks the response.
+	rosterCheck := h.rosterCheckFor(req.Kind)
+	rec := validator.BuildSessionRecord(req, resp, fail, err, rosterCheck, durationMs)
+	h.sessionLog.Append(rec)
 	if err != nil {
 		http.Error(w, err.Error(), http.StatusBadGateway)
 		return
@@ -281,6 +298,53 @@ func (h *handlers) handleIterate(w http.ResponseWriter, r *http.Request) {
 	writeJSON(w, http.StatusOK, resp)
 }
+// rosterCheckFor returns a closure that verifies an accepted artifact's
+// candidate IDs all exist in the roster. Used by BuildSessionRecord to
+// stamp `grounded_in_roster` on each session row — answers the "did
+// the model produce real workers, not phantoms?" question at offline-
+// query time.
+//
+// Only the `fill` kind has worker IDs to verify. Other kinds return
+// nil (which omits the field from the session record).
+//
+// Note: FillValidator already enforces this constraint on every
+// successful iteration (phantom IDs raise ValidationError::Consistency).
+// Stamping the bool here surfaces the verified-grounded property as
+// queryable data, rather than relying on validator semantics being
+// recalled during analysis. A future scrum may want to query "show me
+// sessions where the model guessed real worker IDs without ever
+// needing a retry" — that's an `iterations=1 AND grounded_in_roster=true`
+// query.
+func (h *handlers) rosterCheckFor(kind string) func(map[string]any) *bool {
+	if kind != "fill" {
+		return nil
+	}
+	return func(artifact map[string]any) *bool {
+		fills, ok := artifact["fills"].([]any)
+		if !ok {
+			f := false
+			return &f
+		}
+		for _, raw := range fills {
+			fill, ok := raw.(map[string]any)
+			if !ok {
+				f := false
+				return &f
+			}
+			id, _ := fill["candidate_id"].(string)
+			if id == "" {
+				f := false
+				return &f
+			}
+			if _, found := h.lookup.Find(id); !found {
+				f := false
+				return &f
+			}
+		}
+		t := true
+		return &t
+	}
+}

 // chatCaller wires the iteration loop to chatd via HTTP. Builds the
 // chat.Request shape, posts to ${chatdURL}/chat, returns the content
 // string (no choices wrapper — chatd's response is already flat).

cmd/validatord/main_test.go:

@@ -1,10 +1,13 @@
 package main

 import (
+	"bufio"
 	"bytes"
 	"encoding/json"
 	"net/http"
 	"net/http/httptest"
+	"os"
+	"path/filepath"
 	"testing"
 	"time"
@@ -13,11 +16,47 @@ import (
 	"git.agentview.dev/profit/golangLAKEHOUSE/internal/validator"
 )
+
+// readSessionLog parses the JSONL session log into row maps. Used by
+// the iterate-handler tests to verify per-session entries.
+func readSessionLog(t *testing.T, path string) []map[string]any {
+	t.Helper()
+	f, err := os.Open(path)
+	if err != nil {
+		t.Fatalf("open session log: %v", err)
+	}
+	defer f.Close()
+	var out []map[string]any
+	sc := bufio.NewScanner(f)
+	sc.Buffer(make([]byte, 0, 1<<16), 1<<24)
+	for sc.Scan() {
+		line := sc.Bytes()
+		if len(line) == 0 {
+			continue
+		}
+		var row map[string]any
+		if err := json.Unmarshal(line, &row); err != nil {
+			t.Fatalf("parse session log line: %v", err)
+		}
+		out = append(out, row)
+	}
+	return out
+}
+
 // newTestRouter builds the validatord router with an explicit lookup
 // + a fake chatd URL. Tests that exercise /iterate need a live mock
 // chatd (constructed inline per-test).
 func newTestRouter(lookup validator.WorkerLookup, chatdURL string) http.Handler {
-	h := &handlers{
+	h := newTestHandlers(lookup, chatdURL, nil)
+	r := chi.NewRouter()
+	h.register(r)
+	return r
+}
+
+// newTestHandlers exposes the underlying handlers struct so tests can
+// inspect post-call state (e.g. session log contents). sessionLog is
+// optional — pass nil to disable the session log.
+func newTestHandlers(lookup validator.WorkerLookup, chatdURL string, sessionLog *validator.SessionLogger) *handlers {
+	return &handlers{
 		lookup: lookup,
 		chatdURL: chatdURL,
 		chatClient: &http.Client{Timeout: 5 * time.Second},
@@ -25,10 +64,8 @@ func newTestRouter(lookup validator.WorkerLookup, chatdURL string) http.Handler
 			DefaultMaxIterations: 3,
 			DefaultMaxTokens: 4096,
 		},
+		sessionLog: sessionLog,
 	}
-	r := chi.NewRouter()
-	h.register(r)
-	return r
 }
 // ─── /validate ─────────────────────────────────────────────────
@@ -244,6 +281,135 @@ func TestIterate_MaxIterReturns422WithHistory(t *testing.T) {
 	}
 }
+
+// TestIterate_WritesSessionRowOnSuccess locks the offline-analysis
+// contract: every successful /iterate call appends one SessionRecord
+// to the configured JSONL with grounded_in_roster=true (because the
+// model emitted real candidate IDs that exist in the roster).
+func TestIterate_WritesSessionRowOnSuccess(t *testing.T) {
+	city, role := "Toledo", "Welder"
+	lookup := validator.NewInMemoryWorkerLookup([]validator.WorkerRecord{
+		{CandidateID: "W-1", Name: "Ada", Status: "active", City: &city, Role: &role},
+	})
+	server := fakeChatd(t, `{"fills":[{"candidate_id":"W-1","name":"Ada"}]}`)
+	defer server.Close()
+	dir := t.TempDir()
+	logPath := filepath.Join(dir, "sessions.jsonl")
+	h := newTestHandlers(lookup, server.URL, validator.NewSessionLogger(logPath))
+	r := chi.NewRouter()
+	h.register(r)
+	body, _ := json.Marshal(map[string]any{
+		"kind": "fill",
+		"prompt": "produce fill",
+		"provider": "ollama",
+		"model": "qwen3.5:latest",
+		"context": map[string]any{"target_count": 1, "city": "Toledo", "role": "Welder", "client_id": "C-1"},
+	})
+	req := httptest.NewRequest("POST", "/iterate", bytes.NewReader(body))
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+	if w.Code != http.StatusOK {
+		t.Fatalf("expected 200, got %d (body=%s)", w.Code, w.Body.String())
+	}
+	rows := readSessionLog(t, logPath)
+	if len(rows) != 1 {
+		t.Fatalf("expected 1 session row, got %d", len(rows))
+	}
+	row := rows[0]
+	if row["schema"] != validator.SessionRecordSchema {
+		t.Errorf("schema: %v", row["schema"])
+	}
+	if row["final_verdict"] != "accepted" {
+		t.Errorf("final_verdict: %v", row["final_verdict"])
+	}
+	if row["grounded_in_roster"] != true {
+		t.Errorf("grounded_in_roster: expected true (W-1 is in roster), got %v", row["grounded_in_roster"])
+	}
+	if row["kind"] != "fill" {
+		t.Errorf("kind: %v", row["kind"])
+	}
+}
+
+// TestIterate_GroundedFalseWhenArtifactReferencesPhantom is the
+// load-bearing forensic property J flagged 2026-05-02: phantom
+// worker IDs in the final accepted artifact must surface as
+// grounded_in_roster=false in the session log. This shouldn't happen
+// in practice (FillValidator catches phantoms before they reach this
+// row), but the explicit check guards against a future bug where the
+// validator weakens or a different validator path skips the roster
+// check. Defense-in-depth at the observability layer.
+func TestIterate_GroundedFalseWhenArtifactReferencesPhantom(t *testing.T) {
+	// Empty roster — every candidate ID is "phantom". FillValidator
+	// would reject such an artifact upfront, so the grounded=false
+	// row can't be driven end-to-end through /iterate here; instead
+	// we build the rosterCheckFor("fill") closure and exercise it
+	// directly.
+	lookup := validator.NewInMemoryWorkerLookup(nil)
+	h := newTestHandlers(lookup, "", nil)
+	check := h.rosterCheckFor("fill")
+	if check == nil {
+		t.Fatal("expected non-nil checker for fill kind")
+	}
+	got := check(map[string]any{
+		"fills": []any{
+			map[string]any{"candidate_id": "W-PHANTOM-NEVER-EXISTS"},
+		},
+	})
+	if got == nil || *got {
+		t.Errorf("expected grounded=false for phantom candidate, got %v", got)
+	}
+	// Other kinds: no roster concept → checker returns nil.
+	if h.rosterCheckFor("email") != nil {
+		t.Error("email kind should not have a roster checker")
+	}
+	if h.rosterCheckFor("playbook") != nil {
+		t.Error("playbook kind should not have a roster checker")
+	}
+}
+
+// TestIterate_WritesSessionRowOnFailure: max-iter exhaustion still
+// produces a row (omits artifact, captures the retry chain).
+func TestIterate_WritesSessionRowOnFailure(t *testing.T) {
+	server := fakeChatd(t, "no json here, just prose")
+	defer server.Close()
+	dir := t.TempDir()
+	logPath := filepath.Join(dir, "sessions.jsonl")
+	h := newTestHandlers(validator.NewInMemoryWorkerLookup(nil), server.URL,
+		validator.NewSessionLogger(logPath))
+	r := chi.NewRouter()
+	h.register(r)
+	body, _ := json.Marshal(map[string]any{
+		"kind": "playbook",
+		"prompt": "produce X",
+		"provider": "ollama",
+		"model": "x",
+		"max_iterations": 2,
+	})
+	req := httptest.NewRequest("POST", "/iterate", bytes.NewReader(body))
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+	if w.Code != http.StatusUnprocessableEntity {
+		t.Fatalf("expected 422, got %d", w.Code)
+	}
+	rows := readSessionLog(t, logPath)
+	if len(rows) != 1 {
+		t.Fatalf("expected 1 session row, got %d", len(rows))
+	}
+	if rows[0]["final_verdict"] != "max_iter_exhausted" {
+		t.Errorf("final_verdict: %v", rows[0]["final_verdict"])
+	}
+	if _, hasArtifact := rows[0]["artifact"]; hasArtifact {
+		t.Errorf("artifact should be omitted on failure: %v", rows[0]["artifact"])
+	}
+}
+
 func TestIterate_ChatdDownReturns502(t *testing.T) {
 	r := newTestRouter(validator.NewInMemoryWorkerLookup(nil), "http://127.0.0.1:1") // unroutable
 	body, _ := json.Marshal(map[string]any{

docs/SESSION_LOG.md (new file, 158 lines):

@@ -0,0 +1,158 @@
# Coordinator session log — `coordinator_sessions.jsonl`
**Last updated:** 2026-05-02 · **Schema:** `session.iterate.v1` · **Writer:** `internal/validator.SessionLogger` · **Producer:** validatord `/v1/iterate`
## Why
The Langfuse trace tree is the live view: per-session, you can scroll
the retry chain and inspect every sub-call. But for **longitudinal
forensics** ("show me every session in the last week where the model
guessed a real worker without a retry," or "find sessions where
validation rejected three times in a row"), Langfuse's UI doesn't
scale — you need a queryable data plane.
This JSONL is that plane. One row per `/v1/iterate` session,
append-only, DuckDB-friendly.
## Where
Configurable via `[validatord].session_log_path` in `lakehouse.toml`.
Empty = disabled (best-effort posture; a missing log never blocks an
iterate request). Production:
```toml
[validatord]
session_log_path = "/var/lib/lakehouse/validator/sessions.jsonl"
```
## Schema (v1)
```jsonc
{
"schema": "session.iterate.v1",
"session_id": "<Langfuse trace_id>", // join key to Langfuse
"timestamp": "2026-05-02T07:30:00.123456Z",
"daemon": "validatord",
"kind": "fill | email | playbook",
"model": "qwen3.5:latest",
"provider": "ollama",
"prompt": "produce a fill artifact ...", // truncated to 4000 chars
"iterations": 3, // attempts spent
"max_iterations": 3, // cap per request
"final_verdict": "accepted | max_iter_exhausted | infra_error",
"attempts": [
{ "iteration": 0, "verdict_kind": "validation_failed",
"error": "consistency: candidate_id W-X not in roster",
"span_id": "abc..." },
{ "iteration": 1, "verdict_kind": "validation_failed",
"error": "consistency: city mismatch", "span_id": "def..." },
{ "iteration": 2, "verdict_kind": "accepted", "span_id": "ghi..." }
],
"artifact": { /* final accepted artifact, omitted on failure */ },
"grounded_in_roster": true, // null when N/A (email/playbook)
"duration_ms": 2840
}
```
### Field semantics
| Field | When set | What it means |
|---|---|---|
| `session_id` | always | Langfuse trace id. Pivot to live trace tree by URL: `${LANGFUSE_URL}/trace/<session_id>`. |
| `final_verdict=accepted` | success | Loop converged within `max_iterations`. `artifact` is non-null. |
| `final_verdict=max_iter_exhausted` | failure | Loop hit the cap without passing validation. `artifact` is omitted. |
| `final_verdict=infra_error` | failure | Chat hop or other infra crashed. Single attempt with `verdict_kind=infra_error`. |
| `grounded_in_roster=true` | fill kind, success | Every `candidate_id` in the artifact exists in `WorkerLookup`. |
| `grounded_in_roster=false` | fill kind, anomaly | Phantom or otherwise-invalid candidate IDs (shouldn't happen — FillValidator catches these — but the explicit check defends against future validator weakening). |
| `grounded_in_roster=null` (omitted) | non-fill kinds, or failure | The roster check doesn't apply or wasn't run. |
## DuckDB queries
```sql
-- Read the log directly via DuckDB's read_json_auto.
ATTACH ':memory:' AS sessions;
SELECT * FROM read_json_auto(
'/var/lib/lakehouse/validator/sessions.jsonl', format='newline_delimited'
) LIMIT 10;
```
### "Did the validator catch every phantom worker?"
```sql
-- Accepted sessions that needed retries and whose rejected attempts
-- mention 'consistency' or 'phantom': the validator caught a phantom
-- (or similar) and the model recovered on a later attempt.
SELECT session_id, model, iterations, grounded_in_roster, final_verdict
FROM read_json_auto('sessions.jsonl', format='newline_delimited')
WHERE final_verdict = 'accepted'
AND iterations > 1
AND list_contains(
list_transform(attempts,
x -> x.error LIKE '%consistency%' OR x.error LIKE '%phantom%'),
true
);
```
### "First-shot success rate per model" (the "did the corpus give it enough" gate)
```sql
SELECT model,
COUNT(*) AS sessions,
SUM(CASE WHEN iterations = 1 AND final_verdict = 'accepted' THEN 1 ELSE 0 END) AS first_shot,
ROUND(100.0 * SUM(CASE WHEN iterations = 1 AND final_verdict = 'accepted' THEN 1 ELSE 0 END) / COUNT(*), 1) AS pct
FROM read_json_auto('sessions.jsonl', format='newline_delimited')
WHERE kind = 'fill'
GROUP BY model
ORDER BY pct DESC;
```
### "Sessions that were never grounded" (the alarm query)
```sql
-- Should always be empty. If it isn't, FillValidator has a hole or
-- a different code path is bypassing the roster check.
SELECT session_id, model, iterations, attempts
FROM read_json_auto('sessions.jsonl', format='newline_delimited')
WHERE kind = 'fill'
AND final_verdict = 'accepted'
AND grounded_in_roster = false;
```
### "Average retry depth per model"
```sql
SELECT model, AVG(iterations) AS avg_iter, COUNT(*) AS n
FROM read_json_auto('sessions.jsonl', format='newline_delimited')
WHERE kind = 'fill' AND final_verdict = 'accepted'
GROUP BY model
ORDER BY avg_iter ASC;
```
### "What did validation reject?" (failure mode breakdown)
```sql
-- Pull each rejected attempt's error string, classify by prefix.
WITH errors AS (
SELECT session_id,
model,
unnest(attempts) AS att
FROM read_json_auto('sessions.jsonl', format='newline_delimited')
)
SELECT model,
split_part(att.error, ':', 1) AS kind,
COUNT(*) AS n
FROM errors
WHERE att.verdict_kind = 'validation_failed'
GROUP BY model, kind
ORDER BY n DESC;
```
## Operational notes
- **Append-only.** No row is ever updated; storage grows linearly with iterate calls. Operators rotate via cron when the file gets unwieldy (logrotate-style).
- **Best-effort posture.** Write failures are logged via `slog.Warn` but never block the iterate handler. A full disk drops session rows (warned in the daemon log, invisible to callers); the iterate response still ships.
- **Schema versioning.** `schema=session.iterate.v1` is the contract. Future incompatible changes bump the version; consumers should branch on the field.
- **PII consideration.** `prompt` is captured truncated to 4000 chars; the final `artifact` (when present) is captured verbatim. Operators handling PII-bearing prompts should set the path under a restricted-access volume or filter before retention.
- **Cross-runtime parity.** The Rust gateway's `/v1/iterate` does NOT yet write this file. If you want a unified longitudinal log across runtimes, port the writer to Rust (`crates/gateway/src/v1/iterate.rs`) and target the same JSONL path. ~50 LOC.
## See also
- `internal/validator/session_log.go` — writer + record types
- `internal/validator/iterate.go``Tracer` callback + Langfuse span emission
- `internal/shared/langfuse_middleware.go``X-Lakehouse-Trace-Id` header propagation (the `session_id` join key)
- `data/_kb/replay_runs.jsonl` — the *replay* tool's own JSONL (different shape, different producer); these two streams are siblings, not duplicates

internal/shared/config.go:

@@ -165,6 +165,12 @@ type ValidatordConfig struct {
 	DefaultMaxTokens int `toml:"default_max_tokens"`
 	// Per-call timeout for the chat hop in seconds. 0 = 240s.
 	ChatTimeoutSecs int `toml:"chat_timeout_secs"`
+	// SessionLogPath: where to append SessionRecord JSONL rows for
+	// offline analysis (DuckDB queries, scrum review tooling). Empty
+	// = disabled. Production sets a stable path under
+	// /var/lib/lakehouse/validator/sessions.jsonl. Append-only,
+	// best-effort; see internal/validator/session_log.go.
+	SessionLogPath string `toml:"session_log_path"`
 }

 // ObserverdConfig drives the observer service (cmd/observerd).
@@ -393,6 +399,7 @@ func DefaultConfig() Config {
 		DefaultMaxIterations: 3,
 		DefaultMaxTokens: 4096,
 		ChatTimeoutSecs: 240,
+		SessionLogPath: "", // empty = no session JSONL. Operators set under /var/lib/lakehouse/validator/sessions.jsonl.
 	},
 	Chatd: ChatdConfig{
 		Bind: "127.0.0.1:3220",

internal/validator/session_log.go (new file, 199 lines):

@@ -0,0 +1,199 @@
package validator

import (
	"encoding/json"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"sync"
	"time"
)

// SessionRecordSchema versions the on-wire JSON shape. Bump when the
// schema changes incompatibly. Consumers (e.g. duckdb queries over
// coordinator_sessions.jsonl, scrum review tooling) check this field
// to decide whether they understand the row.
const SessionRecordSchema = "session.iterate.v1"

// SessionRecord is one row in coordinator_sessions.jsonl. Captures the
// full retry chain of a single /v1/iterate session for offline
// forensics: "show me all sessions where the validator caught a
// phantom worker" / "show me all sessions where retrieval missed."
//
// The Langfuse trace tree (see X-Lakehouse-Trace-Id propagation
// 2026-05-02) is the live view; this JSONL is the longitudinal view
// for ad-hoc DuckDB queries over thousands of sessions.
type SessionRecord struct {
	Schema           string                 `json:"schema"`
	SessionID        string                 `json:"session_id"` // = Langfuse trace_id
	Timestamp        string                 `json:"timestamp"`  // ISO 8601
	Daemon           string                 `json:"daemon"`
	Kind             string                 `json:"kind"` // fill | email | playbook
	Model            string                 `json:"model"`
	Provider         string                 `json:"provider"`
	Prompt           string                 `json:"prompt"` // truncated to 4000
	Iterations       int                    `json:"iterations"`
	MaxIterations    int                    `json:"max_iterations"`
	FinalVerdict     string                 `json:"final_verdict"` // accepted | max_iter_exhausted | infra_error
	Attempts         []SessionAttemptRecord `json:"attempts"`
	Artifact         map[string]any         `json:"artifact,omitempty"` // present on success
	GroundedInRoster *bool                  `json:"grounded_in_roster,omitempty"`
	DurationMs       int64                  `json:"duration_ms"`
}

// SessionAttemptRecord mirrors IterateAttempt but stores only the
// stable signals (iteration, verdict, error, span id). The raw model
// output is intentionally NOT captured here — it lives in the
// Langfuse span (queryable by span_id) and the iterate response. Two
// copies would let the JSONL grow unbounded on long sessions.
type SessionAttemptRecord struct {
	Iteration   int    `json:"iteration"`
	VerdictKind string `json:"verdict_kind"` // no_json | validation_failed | accepted
	Error       string `json:"error,omitempty"`
	SpanID      string `json:"span_id,omitempty"`
}

// SessionLogger appends SessionRecord rows to a JSONL file. Best-effort:
// errors are logged via slog and never returned to the caller (per the
// rest of the observability stack — never block a request because the
// session log is unhappy).
//
// nil is a valid value: NewSessionLogger("") returns nil; Append on
// a nil receiver is a no-op. Lets validatord skip the wiring entirely
// when no session log is configured.
type SessionLogger struct {
	path string
	mu   sync.Mutex
}

// NewSessionLogger constructs a logger writing to `path`. Empty path
// disables logging. Creates parent dirs lazily on first write so the
// constructor doesn't panic when the path is on a not-yet-mounted
// volume (e.g. systemd unit ordering).
func NewSessionLogger(path string) *SessionLogger {
	if path == "" {
		return nil
	}
	return &SessionLogger{path: path}
}

// Append writes one JSONL row. Failures are logged and then swallowed
// (best-effort); the caller doesn't need to handle errors — logging is
// an observability witness, not a correctness gate.
func (l *SessionLogger) Append(rec SessionRecord) {
	if l == nil {
		return
	}
	if rec.Schema == "" {
		rec.Schema = SessionRecordSchema
	}
	if rec.Timestamp == "" {
		rec.Timestamp = time.Now().UTC().Format(time.RFC3339Nano)
	}
	if rec.Daemon == "" {
		rec.Daemon = "validatord"
	}
	body, err := json.Marshal(rec)
	if err != nil {
		slog.Warn("session_log: marshal", "err", err, "session_id", rec.SessionID)
		return
	}
	body = append(body, '\n')

	l.mu.Lock()
	defer l.mu.Unlock()
	if err := os.MkdirAll(filepath.Dir(l.path), 0o755); err != nil {
		slog.Warn("session_log: mkdir", "err", err, "path", l.path)
		return
	}
	f, err := os.OpenFile(l.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		slog.Warn("session_log: open", "err", err, "path", l.path)
		return
	}
	defer f.Close()
	if _, err := f.Write(body); err != nil {
		slog.Warn("session_log: write", "err", err, "session_id", rec.SessionID)
	}
}

// BuildSessionRecord assembles a SessionRecord from an iterate
// response/failure pair. At most one of resp/fail is non-nil; both
// nil signals an infrastructure failure (see infraErr). Centralized
// here so validatord and any future iterate-wrapping daemon emit the
// same shape.
//
// rosterCheck is called on the accepted artifact to populate
// GroundedInRoster — set to nil to skip the check (e.g. for
// non-fill kinds that don't have worker IDs). Returns the shaped
// record; the caller passes it to logger.Append so no daemon ever
// blocks on its own observability layer.
func BuildSessionRecord(
	req IterateRequest,
	resp *IterateResponse,
	fail *IterateFailure,
	infraErr error,
	rosterCheck func(map[string]any) *bool,
	durationMs int64,
) SessionRecord {
	rec := SessionRecord{
		SessionID:     req.TraceID,
		Kind:          req.Kind,
		Model:         req.Model,
		Provider:      req.Provider,
		Prompt:        trim(req.Prompt, 4000),
		MaxIterations: req.MaxIterations,
		DurationMs:    durationMs,
	}
	switch {
	case resp != nil:
		rec.Iterations = resp.Iterations
		rec.FinalVerdict = "accepted"
		rec.Attempts = sessionAttemptsFromHistory(resp.History)
		rec.Artifact = resp.Artifact
		if rosterCheck != nil {
			rec.GroundedInRoster = rosterCheck(resp.Artifact)
		}
		// Keep the trace id authoritative even if the request didn't
		// supply one — the iterate response carries the resolved id.
		if rec.SessionID == "" {
			rec.SessionID = resp.TraceID
		}
	case fail != nil:
		rec.Iterations = fail.Iterations
		rec.FinalVerdict = "max_iter_exhausted"
		rec.Attempts = sessionAttemptsFromHistory(fail.History)
		if rec.SessionID == "" {
			rec.SessionID = fail.TraceID
		}
	default:
		// Infrastructure failure (chat hop crashed mid-loop, etc.).
		// We still emit a row so the failure is forensically visible —
		// otherwise long debugging sessions get harder.
		rec.FinalVerdict = "infra_error"
		if infraErr != nil {
			rec.Attempts = []SessionAttemptRecord{{
				Iteration:   0,
				VerdictKind: "infra_error",
				Error:       trim(fmt.Sprintf("%v", infraErr), 800),
			}}
		}
	}
	return rec
}

func sessionAttemptsFromHistory(h []IterateAttempt) []SessionAttemptRecord {
	out := make([]SessionAttemptRecord, len(h))
	for i, a := range h {
		out[i] = SessionAttemptRecord{
			Iteration:   a.Iteration,
			VerdictKind: a.Status.Kind,
			Error:       a.Status.Error,
			SpanID:      a.SpanID,
		}
	}
	return out
}

internal/validator/session_log_test.go (new file, 224 lines):

@@ -0,0 +1,224 @@
package validator

import (
	"bufio"
	"encoding/json"
	"os"
	"path/filepath"
	"sync"
	"testing"
)

func TestSessionLogger_AppendsSchemaTaggedRows(t *testing.T) {
	dir := t.TempDir()
	path := filepath.Join(dir, "sessions.jsonl")
	logger := NewSessionLogger(path)
	if logger == nil {
		t.Fatal("expected non-nil logger for a non-empty path")
	}
	rec := SessionRecord{
		SessionID:    "trace-1",
		Kind:         "fill",
		Model:        "qwen3.5:latest",
		Provider:     "ollama",
		Prompt:       "produce a fill artifact",
		Iterations:   1,
		FinalVerdict: "accepted",
	}
	logger.Append(rec)

	rows := readJSONL(t, path)
	if len(rows) != 1 {
		t.Fatalf("expected 1 row, got %d", len(rows))
	}
	if rows[0]["schema"] != SessionRecordSchema {
		t.Errorf("schema autopopulation: %v", rows[0]["schema"])
	}
	if rows[0]["session_id"] != "trace-1" {
		t.Errorf("session_id round-trip: %v", rows[0]["session_id"])
	}
	if rows[0]["timestamp"] == "" || rows[0]["timestamp"] == nil {
		t.Errorf("timestamp autopopulation: %v", rows[0]["timestamp"])
	}
	if rows[0]["daemon"] != "validatord" {
		t.Errorf("daemon autopopulation: %v", rows[0]["daemon"])
	}
}

func TestSessionLogger_NilIsNoOp(t *testing.T) {
	var logger *SessionLogger // nil
	logger.Append(SessionRecord{SessionID: "trace-1"})
	// No panic, no error — the test passing IS the assertion.
}

func TestSessionLogger_EmptyPathReturnsNil(t *testing.T) {
	if NewSessionLogger("") != nil {
		t.Error("empty path should disable logging (nil logger)")
	}
}

// TestSessionLogger_ConcurrentAppends locks the per-row atomicity: N
// goroutines each appending must produce N well-formed JSONL rows
// with no torn writes. Mutex lives at the logger level, not OS-level,
// so this also documents that concurrent logger instances would NOT
// be safe (one logger per daemon — that's fine).
func TestSessionLogger_ConcurrentAppends(t *testing.T) {
	dir := t.TempDir()
	path := filepath.Join(dir, "sessions.jsonl")
	logger := NewSessionLogger(path)

	const N = 64
	var wg sync.WaitGroup
	for i := 0; i < N; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			logger.Append(SessionRecord{
				SessionID:    "trace-" + string(rune('a'+i%26)),
				Kind:         "fill",
				FinalVerdict: "accepted",
				Iterations:   i,
			})
		}(i)
	}
	wg.Wait()

	rows := readJSONL(t, path)
	if len(rows) != N {
		t.Fatalf("expected %d rows after concurrent appends, got %d", N, len(rows))
	}
	// Every row must be parseable JSON — a torn write would throw
	// a decode error, which readJSONL t.Fatal's on.
}

func TestBuildSessionRecord_SuccessPathCapturesArtifact(t *testing.T) {
	req := IterateRequest{
		TraceID:       "trace-x",
		Kind:          "fill",
		Prompt:        "produce fill",
		Provider:      "ollama",
		Model:         "qwen3.5:latest",
		MaxIterations: 3,
	}
	resp := &IterateResponse{
		Artifact: map[string]any{"fills": []any{
			map[string]any{"candidate_id": "W-1"},
		}},
		Validation: Report{},
		Iterations: 1,
		History: []IterateAttempt{{
			Iteration: 0,
			Raw:       `{"fills":[{"candidate_id":"W-1"}]}`,
			Status:    AttemptStatus{Kind: "accepted"},
			SpanID:    "span-0",
		}},
		TraceID: "trace-x",
	}
	roster := func(_ map[string]any) *bool {
		t := true
		return &t
	}

	rec := BuildSessionRecord(req, resp, nil, nil, roster, 1234)
	if rec.FinalVerdict != "accepted" || rec.Iterations != 1 {
		t.Errorf("verdict/iterations: %+v", rec)
	}
	if rec.GroundedInRoster == nil || !*rec.GroundedInRoster {
		t.Errorf("grounded_in_roster: expected true, got %v", rec.GroundedInRoster)
	}
	if rec.Artifact == nil {
		t.Errorf("artifact missing on success path")
	}
	if len(rec.Attempts) != 1 || rec.Attempts[0].SpanID != "span-0" {
		t.Errorf("attempts shape: %+v", rec.Attempts)
	}
	if rec.SessionID != "trace-x" {
		t.Errorf("session_id: %s", rec.SessionID)
	}
	if rec.DurationMs != 1234 {
		t.Errorf("duration: %d", rec.DurationMs)
	}
}

func TestBuildSessionRecord_FailurePathOmitsArtifact(t *testing.T) {
	req := IterateRequest{
		TraceID:       "trace-y",
		Kind:          "fill",
		Provider:      "ollama",
		Model:         "qwen",
		MaxIterations: 3,
	}
	fail := &IterateFailure{
		Error:      "max iterations reached (3)",
		Iterations: 3,
		History: []IterateAttempt{
			{Iteration: 0, Status: AttemptStatus{Kind: "no_json"}},
			{Iteration: 1, Status: AttemptStatus{Kind: "validation_failed", Error: "phantom W-X"}},
			{Iteration: 2, Status: AttemptStatus{Kind: "validation_failed", Error: "phantom W-Y"}},
		},
		TraceID: "trace-y",
	}

	rec := BuildSessionRecord(req, nil, fail, nil, nil, 8765)
	if rec.FinalVerdict != "max_iter_exhausted" {
		t.Errorf("final_verdict: %s", rec.FinalVerdict)
	}
	if rec.Artifact != nil {
		t.Errorf("artifact must be omitted on failure path: %v", rec.Artifact)
	}
	if rec.GroundedInRoster != nil {
		t.Errorf("grounded_in_roster should be nil on failure: %v", *rec.GroundedInRoster)
	}
	if len(rec.Attempts) != 3 || rec.Attempts[2].VerdictKind != "validation_failed" {
		t.Errorf("attempts: %+v", rec.Attempts)
	}
}

func TestBuildSessionRecord_InfraErrorEmitsRow(t *testing.T) {
	// An infrastructure failure (e.g. chat hop crashed) must STILL
	// produce a session row — otherwise debugging gets harder
	// (no log → operator can't see it failed at all).
	req := IterateRequest{TraceID: "trace-z", Kind: "fill"}
	rec := BuildSessionRecord(req, nil, nil, errExample{"chat refused"}, nil, 50)
	if rec.FinalVerdict != "infra_error" {
		t.Errorf("verdict: %s", rec.FinalVerdict)
	}
	if len(rec.Attempts) != 1 || rec.Attempts[0].VerdictKind != "infra_error" {
		t.Errorf("attempts: %+v", rec.Attempts)
	}
	if rec.Attempts[0].Error == "" {
		t.Errorf("error must be carried")
	}
}

// ─── Helpers ───────────────────────────────────────────────────

type errExample struct{ msg string }

func (e errExample) Error() string { return e.msg }

func readJSONL(t *testing.T, path string) []map[string]any {
	t.Helper()
	f, err := os.Open(path)
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	defer f.Close()
	var out []map[string]any
	sc := bufio.NewScanner(f)
	sc.Buffer(make([]byte, 0, 1<<16), 1<<24)
	for sc.Scan() {
		line := sc.Bytes()
		if len(line) == 0 {
			continue
		}
		var row map[string]any
		if err := json.Unmarshal(line, &row); err != nil {
			t.Fatalf("parse line %q: %v", line, err)
		}
		out = append(out, row)
	}
	if err := sc.Err(); err != nil {
		t.Fatalf("scan: %v", err)
	}
	return out
}

lakehouse.toml:

@@ -119,6 +119,13 @@ default_max_iterations = 3
 default_max_tokens = 4096
 # Chat hop timeout (seconds). 240s tolerates frontier reasoning models.
 chat_timeout_secs = 240
+# Session log: where to append one JSONL row per /v1/iterate session
+# for offline DuckDB analysis. Empty = disabled. Production:
+#   session_log_path = "/var/lib/lakehouse/validator/sessions.jsonl"
+# Each row: schema=session.iterate.v1, session_id (= Langfuse trace_id),
+# kind, model, iterations, attempts[], final_verdict, grounded_in_roster,
+# duration_ms. See internal/validator/session_log.go.
+session_log_path = ""

 [s3]
 endpoint = "http://localhost:9000"

scripts/validatord_smoke.sh:

@@ -70,6 +70,7 @@ roster_path = "$ROSTER"
 default_max_iterations = 3
 default_max_tokens = 4096
 chat_timeout_secs = 240
+session_log_path = "$TMP/sessions.jsonl"
 EOF

 poll_health() {
@@ -153,4 +154,18 @@ if [ "$STATUS" != "400" ]; then
 fi
 echo " ✓ unknown kind → 400"

-echo "[validatord-smoke] PASS — 5/5 probes through gateway :3110"
+# 6. Session log: every /v1/validate hit was a single-shot validation
+# (not iterate), so the session log isn't populated yet. Instead,
+# verify the logger was initialized: validatord announces the session
+# log path at startup, and the iterate handler is where rows actually
+# land. This proves the wiring is in place even if the smoke doesn't
+# drive a live iteration end-to-end.
+LOG_PATH="$TMP/sessions.jsonl"
+echo "[validatord-smoke] session log path wired:"
+grep -q "validatord session log" /tmp/validatord.log || {
+  echo " ✗ expected validatord to log 'validatord session log' on startup"
+  grep validatord /tmp/validatord.log
+  exit 1
+}
+echo " ✓ session_log_path=$LOG_PATH announced at startup (rows land on /v1/iterate calls)"
+echo "[validatord-smoke] PASS — 6/6 probes through gateway :3110"