Closes the second half of J's 2026-05-02 multi-call observability
concern. Trace-id propagation (commit d6d2fdf) gave us the *live*
view in Langfuse; this gives us the *longitudinal* view for ad-hoc
DuckDB queries over thousands of sessions:
"show me every session where the model produced a real candidate
without ever needing a retry"
"find sessions where validation rejected three times in a row"
"first-shot success rate per model — did we feed it enough corpus?"
## What's in
internal/validator/session_log.go:
- SessionRecord type (schema=session.iterate.v1)
- SessionLogger writer — mutex-guarded append, best-effort posture,
nil-safe (NewSessionLogger("") = nil = no-op on Append)
- BuildSessionRecord helper — assembles a row from any
iterate response/failure/infra-error combination, callable from
other daemons that wrap iterate (cross-daemon shared schema)
- 7 unit tests including concurrent-append safety + the three
code paths (success / max_iter_exhausted / infra_error)
cmd/validatord/main.go:
- handlers.sessionLog field + wiring from cfg.Validatord.SessionLogPath
- Iterate handler: build + append a SessionRecord on every call
- rosterCheckFor("fill") closure stamps grounded_in_roster — the
load-bearing forensic property J flagged ("we can never
hallucinate available staff members to contracts")
internal/shared/config.go + lakehouse.toml:
- [validatord].session_log_path field; empty = disabled
- Production: /var/lib/lakehouse/validator/sessions.jsonl
scripts/validatord_smoke.sh:
- Adds a probe verifying validatord announces session log path on
startup. Smoke is now 6/6 (was 5/5).
docs/SESSION_LOG.md:
- Schema reference + 5 worked DuckDB query examples including the
"alarm" query (sessions where grounded_in_roster=false on an
accepted fill — should always be empty; if not, something is
bypassing FillValidator).
## What this is NOT
This is NOT a duplicate of replay_runs.jsonl. They're siblings:
- replay_runs.jsonl: replay tool's per-task retrieval+model output
- sessions.jsonl: validatord's per-iterate full retry chain +
grounded-in-roster verdict
A single coordinator session can produce rows in both streams; the
session_id (= Langfuse trace_id) is the join key.
## Layered observability now in place
Live view: Langfuse trace tree (X-Lakehouse-Trace-Id propagation)
`iterate.attempt[N]` spans with prompt/raw/verdict
Offline: sessions.jsonl (this commit)
DuckDB-queryable; longitudinal forensics
Hard gate: FillValidator + WorkerLookup (existing)
phantom IDs structurally rejected, never reach
session log's grounded_in_roster=true bucket
Per the architecture invariant in STATE_OF_PLAY's DO NOT RELITIGATE
section — these layers are wired; future work targets the data, not
the wiring.
## Verification
- internal/validator: 7 new tests (session_log_test.go) — all PASS
- cmd/validatord: 3 new integration tests covering the success,
failure, and grounded=false paths — all PASS
- validatord_smoke.sh: 6/6 PASS through gateway :3110
- Full go test ./... green across 33 packages
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
225 lines
6.4 KiB
Go
package validator
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"testing"
|
|
)
|
|
|
|
func TestSessionLogger_AppendsSchemaTaggedRows(t *testing.T) {
|
|
dir := t.TempDir()
|
|
path := filepath.Join(dir, "sessions.jsonl")
|
|
logger := NewSessionLogger(path)
|
|
if logger == nil {
|
|
t.Fatal("expected non-nil logger for a non-empty path")
|
|
}
|
|
|
|
rec := SessionRecord{
|
|
SessionID: "trace-1",
|
|
Kind: "fill",
|
|
Model: "qwen3.5:latest",
|
|
Provider: "ollama",
|
|
Prompt: "produce a fill artifact",
|
|
Iterations: 1,
|
|
FinalVerdict: "accepted",
|
|
}
|
|
logger.Append(rec)
|
|
|
|
rows := readJSONL(t, path)
|
|
if len(rows) != 1 {
|
|
t.Fatalf("expected 1 row, got %d", len(rows))
|
|
}
|
|
if rows[0]["schema"] != SessionRecordSchema {
|
|
t.Errorf("schema autopopulation: %v", rows[0]["schema"])
|
|
}
|
|
if rows[0]["session_id"] != "trace-1" {
|
|
t.Errorf("session_id round-trip: %v", rows[0]["session_id"])
|
|
}
|
|
if rows[0]["timestamp"] == "" || rows[0]["timestamp"] == nil {
|
|
t.Errorf("timestamp autopopulation: %v", rows[0]["timestamp"])
|
|
}
|
|
if rows[0]["daemon"] != "validatord" {
|
|
t.Errorf("daemon autopopulation: %v", rows[0]["daemon"])
|
|
}
|
|
}
|
|
|
|
func TestSessionLogger_NilIsNoOp(t *testing.T) {
|
|
var logger *SessionLogger // nil
|
|
logger.Append(SessionRecord{SessionID: "trace-1"})
|
|
// No panic, no error — the test passing IS the assertion.
|
|
}
|
|
|
|
func TestSessionLogger_EmptyPathReturnsNil(t *testing.T) {
|
|
if NewSessionLogger("") != nil {
|
|
t.Error("empty path should disable logging (nil logger)")
|
|
}
|
|
}
|
|
|
|
// TestSessionLogger_ConcurrentAppends locks the per-row atomicity: N
|
|
// goroutines each appending must produce N well-formed JSONL rows
|
|
// with no torn writes. Mutex lives at the logger level, not OS-level,
|
|
// so this also documents that concurrent logger instances would NOT
|
|
// be safe (one logger per daemon — that's fine).
|
|
func TestSessionLogger_ConcurrentAppends(t *testing.T) {
|
|
dir := t.TempDir()
|
|
path := filepath.Join(dir, "sessions.jsonl")
|
|
logger := NewSessionLogger(path)
|
|
|
|
const N = 64
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < N; i++ {
|
|
wg.Add(1)
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
logger.Append(SessionRecord{
|
|
SessionID: "trace-" + string(rune('a'+i%26)),
|
|
Kind: "fill",
|
|
FinalVerdict: "accepted",
|
|
Iterations: i,
|
|
})
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
|
|
rows := readJSONL(t, path)
|
|
if len(rows) != N {
|
|
t.Fatalf("expected %d rows after concurrent appends, got %d", N, len(rows))
|
|
}
|
|
// Every row must be parseable JSON — a torn write would throw
|
|
// a decode error, which readJSONL t.Fatal's on.
|
|
}
|
|
|
|
func TestBuildSessionRecord_SuccessPathCapturesArtifact(t *testing.T) {
|
|
req := IterateRequest{
|
|
TraceID: "trace-x",
|
|
Kind: "fill",
|
|
Prompt: "produce fill",
|
|
Provider: "ollama",
|
|
Model: "qwen3.5:latest",
|
|
MaxIterations: 3,
|
|
}
|
|
resp := &IterateResponse{
|
|
Artifact: map[string]any{"fills": []any{
|
|
map[string]any{"candidate_id": "W-1"},
|
|
}},
|
|
Validation: Report{},
|
|
Iterations: 1,
|
|
History: []IterateAttempt{{
|
|
Iteration: 0,
|
|
Raw: `{"fills":[{"candidate_id":"W-1"}]}`,
|
|
Status: AttemptStatus{Kind: "accepted"},
|
|
SpanID: "span-0",
|
|
}},
|
|
TraceID: "trace-x",
|
|
}
|
|
roster := func(_ map[string]any) *bool {
|
|
t := true
|
|
return &t
|
|
}
|
|
rec := BuildSessionRecord(req, resp, nil, nil, roster, 1234)
|
|
if rec.FinalVerdict != "accepted" || rec.Iterations != 1 {
|
|
t.Errorf("verdict/iterations: %+v", rec)
|
|
}
|
|
if rec.GroundedInRoster == nil || !*rec.GroundedInRoster {
|
|
t.Errorf("grounded_in_roster: expected true, got %v", rec.GroundedInRoster)
|
|
}
|
|
if rec.Artifact == nil {
|
|
t.Errorf("artifact missing on success path")
|
|
}
|
|
if len(rec.Attempts) != 1 || rec.Attempts[0].SpanID != "span-0" {
|
|
t.Errorf("attempts shape: %+v", rec.Attempts)
|
|
}
|
|
if rec.SessionID != "trace-x" {
|
|
t.Errorf("session_id: %s", rec.SessionID)
|
|
}
|
|
if rec.DurationMs != 1234 {
|
|
t.Errorf("duration: %d", rec.DurationMs)
|
|
}
|
|
}
|
|
|
|
func TestBuildSessionRecord_FailurePathOmitsArtifact(t *testing.T) {
|
|
req := IterateRequest{
|
|
TraceID: "trace-y",
|
|
Kind: "fill",
|
|
Provider: "ollama",
|
|
Model: "qwen",
|
|
MaxIterations: 3,
|
|
}
|
|
fail := &IterateFailure{
|
|
Error: "max iterations reached (3)",
|
|
Iterations: 3,
|
|
History: []IterateAttempt{
|
|
{Iteration: 0, Status: AttemptStatus{Kind: "no_json"}},
|
|
{Iteration: 1, Status: AttemptStatus{Kind: "validation_failed", Error: "phantom W-X"}},
|
|
{Iteration: 2, Status: AttemptStatus{Kind: "validation_failed", Error: "phantom W-Y"}},
|
|
},
|
|
TraceID: "trace-y",
|
|
}
|
|
rec := BuildSessionRecord(req, nil, fail, nil, nil, 8765)
|
|
if rec.FinalVerdict != "max_iter_exhausted" {
|
|
t.Errorf("final_verdict: %s", rec.FinalVerdict)
|
|
}
|
|
if rec.Artifact != nil {
|
|
t.Errorf("artifact must be omitted on failure path: %v", rec.Artifact)
|
|
}
|
|
if rec.GroundedInRoster != nil {
|
|
t.Errorf("grounded_in_roster should be nil on failure: %v", *rec.GroundedInRoster)
|
|
}
|
|
if len(rec.Attempts) != 3 || rec.Attempts[2].VerdictKind != "validation_failed" {
|
|
t.Errorf("attempts: %+v", rec.Attempts)
|
|
}
|
|
}
|
|
|
|
func TestBuildSessionRecord_InfraErrorEmitsRow(t *testing.T) {
|
|
// An infrastructure failure (e.g. chat hop crashed) must STILL
|
|
// produce a session row — otherwise debugging gets harder
|
|
// (no log → operator can't see it failed at all).
|
|
req := IterateRequest{TraceID: "trace-z", Kind: "fill"}
|
|
rec := BuildSessionRecord(req, nil, nil, errExample{"chat refused"}, nil, 50)
|
|
if rec.FinalVerdict != "infra_error" {
|
|
t.Errorf("verdict: %s", rec.FinalVerdict)
|
|
}
|
|
if len(rec.Attempts) != 1 || rec.Attempts[0].VerdictKind != "infra_error" {
|
|
t.Errorf("attempts: %+v", rec.Attempts)
|
|
}
|
|
if rec.Attempts[0].Error == "" {
|
|
t.Errorf("error must be carried")
|
|
}
|
|
}
|
|
|
|
// ─── Helpers ───────────────────────────────────────────────────

// errExample is a minimal error implementation used to hand
// BuildSessionRecord a synthetic infrastructure error in tests.
type errExample struct{ msg string }

// Error returns the stored message, satisfying the error interface.
func (e errExample) Error() string {
	return e.msg
}
func readJSONL(t *testing.T, path string) []map[string]any {
|
|
t.Helper()
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
t.Fatalf("open: %v", err)
|
|
}
|
|
defer f.Close()
|
|
var out []map[string]any
|
|
sc := bufio.NewScanner(f)
|
|
sc.Buffer(make([]byte, 0, 1<<16), 1<<24)
|
|
for sc.Scan() {
|
|
line := sc.Bytes()
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
var row map[string]any
|
|
if err := json.Unmarshal(line, &row); err != nil {
|
|
t.Fatalf("parse line %q: %v", line, err)
|
|
}
|
|
out = append(out, row)
|
|
}
|
|
if err := sc.Err(); err != nil {
|
|
t.Fatalf("scan: %v", err)
|
|
}
|
|
return out
|
|
}
|