LAKEHOUSE/internal/validator/session_log_test.go
root 1a3a82aedb validatord: coordinator session JSONL for offline analysis (B follow-up)
Closes the second half of J's 2026-05-02 multi-call observability
concern. Trace-id propagation (commit d6d2fdf) gave us the *live*
view in Langfuse; this gives us the *longitudinal* view for ad-hoc
DuckDB queries over thousands of sessions:

  "show me every session where the model produced a real candidate
   without ever needing a retry"
  "find sessions where validation rejected three times in a row"
  "first-shot success rate per model — did we feed it enough corpus?"

## What's in

internal/validator/session_log.go:
  - SessionRecord type (schema=session.iterate.v1)
  - SessionLogger writer — mutex-guarded append, best-effort posture,
    nil-safe (NewSessionLogger("") = nil = no-op on Append)
  - BuildSessionRecord helper — assembles a row from any
    iterate response/failure/infra-error combination, callable from
    other daemons that wrap iterate (cross-daemon shared schema)
  - 7 unit tests including concurrent-append safety + the three
    code paths (success / max_iter_exhausted / infra_error)

cmd/validatord/main.go:
  - handlers.sessionLog field + wiring from cfg.Validatord.SessionLogPath
  - Iterate handler: build + append a SessionRecord on every call
  - rosterCheckFor("fill") closure stamps grounded_in_roster — the
    load-bearing forensic property J flagged ("we can never
    hallucinate available staff members to contracts")

internal/shared/config.go + lakehouse.toml:
  - [validatord].session_log_path field; empty = disabled
  - Production: /var/lib/lakehouse/validator/sessions.jsonl

scripts/validatord_smoke.sh:
  - Adds a probe verifying validatord announces session log path on
    startup. Smoke is now 6/6 (was 5/5).

docs/SESSION_LOG.md:
  - Schema reference + 5 worked DuckDB query examples including the
    "alarm" query (sessions where grounded_in_roster=false on an
    accepted fill — should always be empty; if not, something is
    bypassing FillValidator).

## What this is NOT

This is NOT a duplicate of replay_runs.jsonl. They're siblings:
  - replay_runs.jsonl: replay tool's per-task retrieval+model output
  - sessions.jsonl: validatord's per-iterate full retry chain +
    grounded-in-roster verdict

A single coordinator session can produce rows in both streams; the
session_id (= Langfuse trace_id) is the join key.

## Layered observability now in place

  Live view:  Langfuse trace tree (X-Lakehouse-Trace-Id propagation)
              `iterate.attempt[N]` spans with prompt/raw/verdict
  Offline:    sessions.jsonl (this commit)
              DuckDB-queryable; longitudinal forensics
  Hard gate:  FillValidator + WorkerLookup (existing)
              phantom IDs structurally rejected, never reach
              session log's grounded_in_roster=true bucket

Per the architecture invariant in STATE_OF_PLAY's DO NOT RELITIGATE
section — these layers are wired; future work targets the data, not
the wiring.

## Verification

- internal/validator: 7 new tests (session_log_test.go) — all PASS
- cmd/validatord: 3 new integration tests covering the success,
  failure, and grounded=false paths — all PASS
- validatord_smoke.sh: 6/6 PASS through gateway :3110
- Full go test ./... green across 33 packages

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 05:22:09 -05:00

225 lines
6.4 KiB
Go

package validator
import (
"bufio"
"encoding/json"
"os"
"path/filepath"
"sync"
"testing"
)
// TestSessionLogger_AppendsSchemaTaggedRows verifies that a single Append
// writes exactly one JSONL row carrying the schema tag, the caller-supplied
// session_id, and the autopopulated timestamp and daemon fields.
func TestSessionLogger_AppendsSchemaTaggedRows(t *testing.T) {
	logPath := filepath.Join(t.TempDir(), "sessions.jsonl")

	lg := NewSessionLogger(logPath)
	if lg == nil {
		t.Fatal("expected non-nil logger for a non-empty path")
	}

	lg.Append(SessionRecord{
		SessionID:    "trace-1",
		Kind:         "fill",
		Model:        "qwen3.5:latest",
		Provider:     "ollama",
		Prompt:       "produce a fill artifact",
		Iterations:   1,
		FinalVerdict: "accepted",
	})

	rows := readJSONL(t, logPath)
	if len(rows) != 1 {
		t.Fatalf("expected 1 row, got %d", len(rows))
	}
	row := rows[0]
	if row["schema"] != SessionRecordSchema {
		t.Errorf("schema autopopulation: %v", row["schema"])
	}
	if row["session_id"] != "trace-1" {
		t.Errorf("session_id round-trip: %v", row["session_id"])
	}
	if row["timestamp"] == nil || row["timestamp"] == "" {
		t.Errorf("timestamp autopopulation: %v", row["timestamp"])
	}
	if row["daemon"] != "validatord" {
		t.Errorf("daemon autopopulation: %v", row["daemon"])
	}
}
// TestSessionLogger_NilIsNoOp documents the nil-safe contract: calling
// Append on a nil *SessionLogger must do nothing rather than panic.
func TestSessionLogger_NilIsNoOp(t *testing.T) {
	var lg *SessionLogger
	lg.Append(SessionRecord{SessionID: "trace-1"})
	// Reaching this line without a panic is the entire assertion.
}
// TestSessionLogger_EmptyPathReturnsNil pins the constructor convention:
// an empty path means logging is disabled and the logger is nil.
func TestSessionLogger_EmptyPathReturnsNil(t *testing.T) {
	if got := NewSessionLogger(""); got != nil {
		t.Error("empty path should disable logging (nil logger)")
	}
}
// TestSessionLogger_ConcurrentAppends locks the per-row atomicity: N
// goroutines each appending must yield N well-formed JSONL rows with no
// torn writes. The mutex lives at the logger level, not the OS level, so
// this also documents that two logger instances on the same file would
// NOT be safe (one logger per daemon — that's fine).
func TestSessionLogger_ConcurrentAppends(t *testing.T) {
	const N = 64

	logPath := filepath.Join(t.TempDir(), "sessions.jsonl")
	lg := NewSessionLogger(logPath)

	var wg sync.WaitGroup
	wg.Add(N)
	for i := 0; i < N; i++ {
		go func(n int) {
			defer wg.Done()
			lg.Append(SessionRecord{
				SessionID:    "trace-" + string(rune('a'+n%26)),
				Kind:         "fill",
				FinalVerdict: "accepted",
				Iterations:   n,
			})
		}(i)
	}
	wg.Wait()

	// readJSONL t.Fatal's on any decode error, so a torn write cannot
	// slip past this count check unnoticed.
	if rows := readJSONL(t, logPath); len(rows) != N {
		t.Fatalf("expected %d rows after concurrent appends, got %d", N, len(rows))
	}
}
// TestBuildSessionRecord_SuccessPathCapturesArtifact exercises the happy
// path: an accepted response must carry the artifact, the full attempt
// history, the roster verdict, the trace-derived session id, and the
// measured duration.
func TestBuildSessionRecord_SuccessPathCapturesArtifact(t *testing.T) {
	req := IterateRequest{
		TraceID:       "trace-x",
		Kind:          "fill",
		Prompt:        "produce fill",
		Provider:      "ollama",
		Model:         "qwen3.5:latest",
		MaxIterations: 3,
	}
	resp := &IterateResponse{
		Artifact: map[string]any{
			"fills": []any{map[string]any{"candidate_id": "W-1"}},
		},
		Validation: Report{},
		Iterations: 1,
		History: []IterateAttempt{{
			Iteration: 0,
			Raw:       `{"fills":[{"candidate_id":"W-1"}]}`,
			Status:    AttemptStatus{Kind: "accepted"},
			SpanID:    "span-0",
		}},
		TraceID: "trace-x",
	}
	// Roster check stub; named `v` internally to avoid shadowing the
	// *testing.T in scope.
	grounded := func(_ map[string]any) *bool {
		v := true
		return &v
	}

	rec := BuildSessionRecord(req, resp, nil, nil, grounded, 1234)

	if rec.FinalVerdict != "accepted" || rec.Iterations != 1 {
		t.Errorf("verdict/iterations: %+v", rec)
	}
	if rec.GroundedInRoster == nil || !*rec.GroundedInRoster {
		t.Errorf("grounded_in_roster: expected true, got %v", rec.GroundedInRoster)
	}
	if rec.Artifact == nil {
		t.Errorf("artifact missing on success path")
	}
	if len(rec.Attempts) != 1 || rec.Attempts[0].SpanID != "span-0" {
		t.Errorf("attempts shape: %+v", rec.Attempts)
	}
	if rec.SessionID != "trace-x" {
		t.Errorf("session_id: %s", rec.SessionID)
	}
	if rec.DurationMs != 1234 {
		t.Errorf("duration: %d", rec.DurationMs)
	}
}
// TestBuildSessionRecord_FailurePathOmitsArtifact covers the exhausted-
// retries path: no artifact, no roster verdict, full attempt chain, and a
// max_iter_exhausted final verdict.
func TestBuildSessionRecord_FailurePathOmitsArtifact(t *testing.T) {
	req := IterateRequest{
		TraceID:       "trace-y",
		Kind:          "fill",
		Provider:      "ollama",
		Model:         "qwen",
		MaxIterations: 3,
	}
	failure := &IterateFailure{
		Error:      "max iterations reached (3)",
		Iterations: 3,
		History: []IterateAttempt{
			{Iteration: 0, Status: AttemptStatus{Kind: "no_json"}},
			{Iteration: 1, Status: AttemptStatus{Kind: "validation_failed", Error: "phantom W-X"}},
			{Iteration: 2, Status: AttemptStatus{Kind: "validation_failed", Error: "phantom W-Y"}},
		},
		TraceID: "trace-y",
	}

	rec := BuildSessionRecord(req, nil, failure, nil, nil, 8765)

	if rec.FinalVerdict != "max_iter_exhausted" {
		t.Errorf("final_verdict: %s", rec.FinalVerdict)
	}
	if rec.Artifact != nil {
		t.Errorf("artifact must be omitted on failure path: %v", rec.Artifact)
	}
	if rec.GroundedInRoster != nil {
		t.Errorf("grounded_in_roster should be nil on failure: %v", *rec.GroundedInRoster)
	}
	if len(rec.Attempts) != 3 || rec.Attempts[2].VerdictKind != "validation_failed" {
		t.Errorf("attempts: %+v", rec.Attempts)
	}
}
// TestBuildSessionRecord_InfraErrorEmitsRow: an infrastructure failure
// (e.g. the chat hop crashed) must STILL produce a session row — otherwise
// debugging gets harder (no log → operator can't see it failed at all).
func TestBuildSessionRecord_InfraErrorEmitsRow(t *testing.T) {
	req := IterateRequest{TraceID: "trace-z", Kind: "fill"}

	rec := BuildSessionRecord(req, nil, nil, errExample{"chat refused"}, nil, 50)

	if rec.FinalVerdict != "infra_error" {
		t.Errorf("verdict: %s", rec.FinalVerdict)
	}
	switch {
	case len(rec.Attempts) != 1, rec.Attempts[0].VerdictKind != "infra_error":
		t.Errorf("attempts: %+v", rec.Attempts)
	case rec.Attempts[0].Error == "":
		t.Errorf("error must be carried")
	}
}
// ─── Helpers ───────────────────────────────────────────────────

// errExample is a minimal error implementation whose message is exactly
// the wrapped string; it keeps the infra-error test free of fmt/errors.
type errExample struct{ msg string }

// Error returns the wrapped message verbatim.
func (e errExample) Error() string { return e.msg }
// readJSONL reads path line by line and decodes every non-empty line as a
// JSON object, failing the test on any open, parse, or scan error. Blank
// lines are skipped; the scanner buffer allows lines up to 16 MiB.
func readJSONL(t *testing.T, path string) []map[string]any {
	t.Helper()

	file, err := os.Open(path)
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	defer file.Close()

	var rows []map[string]any
	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, 1<<16), 1<<24)
	for scanner.Scan() {
		raw := scanner.Bytes()
		if len(raw) == 0 {
			continue
		}
		row := map[string]any{}
		if err := json.Unmarshal(raw, &row); err != nil {
			t.Fatalf("parse line %q: %v", raw, err)
		}
		rows = append(rows, row)
	}
	if err := scanner.Err(); err != nil {
		t.Fatalf("scan: %v", err)
	}
	return rows
}