root 1a3a82aedb validatord: coordinator session JSONL for offline analysis (B follow-up)
Closes the second half of J's 2026-05-02 multi-call observability
concern. Trace-id propagation (commit d6d2fdf) gave us the *live*
view in Langfuse; this gives us the *longitudinal* view for ad-hoc
DuckDB queries over thousands of sessions:

  "show me every session where the model produced a real candidate
   without ever needing a retry"
  "find sessions where validation rejected three times in a row"
  "first-shot success rate per model — did we feed it enough corpus?"

## What's in

internal/validator/session_log.go:
  - SessionRecord type (schema=session.iterate.v1)
  - SessionLogger writer — mutex-guarded append, best-effort posture,
    nil-safe (NewSessionLogger("") = nil = no-op on Append)
  - BuildSessionRecord helper — assembles a row from any
    iterate response/failure/infra-error combination, callable from
    other daemons that wrap iterate (cross-daemon shared schema)
  - 7 unit tests including concurrent-append safety + the three
    code paths (success / max_iter_exhausted / infra_error)
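For orientation, an accepted fill session would serialize to a row shaped roughly like this (all values invented; field names follow the SessionRecord struct in session_log.go):

```json
{
  "schema": "session.iterate.v1",
  "session_id": "trace-abc123",
  "timestamp": "2026-05-02T10:15:00Z",
  "daemon": "validatord",
  "kind": "fill",
  "model": "example-model",
  "provider": "example-provider",
  "prompt": "…",
  "iterations": 2,
  "max_iterations": 4,
  "final_verdict": "accepted",
  "attempts": [
    {"iteration": 1, "verdict_kind": "validation_failed", "error": "unknown worker id"},
    {"iteration": 2, "verdict_kind": "accepted"}
  ],
  "artifact": {"worker_id": "w-42"},
  "grounded_in_roster": true,
  "duration_ms": 1830
}
```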

cmd/validatord/main.go:
  - handlers.sessionLog field + wiring from cfg.Validatord.SessionLogPath
  - Iterate handler: build + append a SessionRecord on every call
  - rosterCheckFor("fill") closure stamps grounded_in_roster — the
    load-bearing forensic property J flagged ("we can never
    hallucinate available staff members to contracts")
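Because NewSessionLogger("") returns nil and Append is a no-op on a nil receiver, the handler wiring needs no enabled/disabled branching. A minimal self-contained sketch of that pattern (miniLogger is an invented stand-in, not the real type):

```go
package main

import "fmt"

// miniLogger mimics SessionLogger's nil-receiver posture:
// an empty path yields a nil logger, and method calls on the
// nil logger are safe no-ops.
type miniLogger struct{ path string }

func newMiniLogger(path string) *miniLogger {
	if path == "" {
		return nil // disabled: callers keep the nil and call Append anyway
	}
	return &miniLogger{path: path}
}

func (l *miniLogger) Append(row string) {
	if l == nil {
		return // no-op when logging is disabled
	}
	fmt.Println("append:", row)
}

func main() {
	off := newMiniLogger("") // disabled
	off.Append("r1")         // safe: nil receiver, prints nothing
	on := newMiniLogger("/tmp/sessions.jsonl")
	on.Append("r2")
}
```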

internal/shared/config.go + lakehouse.toml:
  - [validatord].session_log_path field; empty = disabled
  - Production: /var/lib/lakehouse/validator/sessions.jsonl
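A hedged sketch of the corresponding lakehouse.toml fragment (key name from the bullet above; exact table layout assumed):

```toml
[validatord]
# Empty string (or omitting the key) disables session logging.
session_log_path = "/var/lib/lakehouse/validator/sessions.jsonl"
```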

scripts/validatord_smoke.sh:
  - Adds a probe verifying validatord announces the session log path
    on startup. Smoke is now 6/6 (was 5/5).

docs/SESSION_LOG.md:
  - Schema reference + 5 worked DuckDB query examples including the
    "alarm" query (sessions where grounded_in_roster=false on an
    accepted fill — should always be empty; if not, something is
    bypassing FillValidator).
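The alarm condition can also be checked without DuckDB. A hedged Go sketch that scans a sessions JSONL stream for accepted fills with grounded_in_roster=false (field names follow the SessionRecord schema; the input here is synthetic):

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// row holds only the fields the alarm query needs.
type row struct {
	SessionID        string `json:"session_id"`
	Kind             string `json:"kind"`
	FinalVerdict     string `json:"final_verdict"`
	GroundedInRoster *bool  `json:"grounded_in_roster"`
}

// alarms returns session IDs of accepted fills that were NOT
// grounded in the roster. This list should always be empty.
func alarms(jsonl string) []string {
	var out []string
	sc := bufio.NewScanner(strings.NewReader(jsonl))
	for sc.Scan() {
		var r row
		if json.Unmarshal(sc.Bytes(), &r) != nil {
			continue // best-effort: skip malformed rows
		}
		if r.Kind == "fill" && r.FinalVerdict == "accepted" &&
			r.GroundedInRoster != nil && !*r.GroundedInRoster {
			out = append(out, r.SessionID)
		}
	}
	return out
}

func main() {
	sample := `{"session_id":"t1","kind":"fill","final_verdict":"accepted","grounded_in_roster":true}
{"session_id":"t2","kind":"fill","final_verdict":"accepted","grounded_in_roster":false}`
	fmt.Println(alarms(sample)) // only t2 trips the alarm
}
```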

## What this is NOT

This is NOT a duplicate of replay_runs.jsonl. They're siblings:
  - replay_runs.jsonl: replay tool's per-task retrieval+model output
  - sessions.jsonl: validatord's per-iterate full retry chain +
    grounded-in-roster verdict

A single coordinator session can produce rows in both streams; the
session_id (= Langfuse trace_id) is the join key.
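The join itself is mechanical once both streams carry session_id. A self-contained Go sketch (row shapes invented for illustration; the real streams are the two JSONL files):

```go
package main

import "fmt"

// Invented stand-ins for one parsed row from each stream.
type replayRow struct{ SessionID, TaskOutput string }
type sessionRow struct{ SessionID, FinalVerdict string }

// join pairs replay output with the session verdict, keyed on the
// shared session_id (= Langfuse trace_id).
func join(replays []replayRow, sessions []sessionRow) map[string][2]string {
	verdict := make(map[string]string)
	for _, s := range sessions {
		verdict[s.SessionID] = s.FinalVerdict
	}
	out := make(map[string][2]string)
	for _, r := range replays {
		if v, ok := verdict[r.SessionID]; ok {
			out[r.SessionID] = [2]string{r.TaskOutput, v}
		}
	}
	return out
}

func main() {
	out := join(
		[]replayRow{{"t1", "draft"}},
		[]sessionRow{{"t1", "accepted"}},
	)
	fmt.Println(out["t1"][1]) // verdict joined onto the replay row
}
```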

## Layered observability now in place

  Live view:  Langfuse trace tree (X-Lakehouse-Trace-Id propagation)
              `iterate.attempt[N]` spans with prompt/raw/verdict
  Offline:    coordinator_sessions.jsonl (this commit)
              DuckDB-queryable; longitudinal forensics
  Hard gate:  FillValidator + WorkerLookup (existing)
              phantom IDs structurally rejected, never reach
              session log's grounded_in_roster=true bucket

Per the architecture invariant in STATE_OF_PLAY's DO NOT RELITIGATE
section — these layers are wired; future work targets the data, not
the wiring.

## Verification

- internal/validator: 7 new tests (session_log_test.go) — all PASS
- cmd/validatord: 3 new integration tests covering the success,
  failure, and grounded=false paths — all PASS
- validatord_smoke.sh: 6/6 PASS through gateway :3110
- Full go test ./... green across 33 packages

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 05:22:09 -05:00


package validator

import (
	"encoding/json"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"sync"
	"time"
)

// SessionRecordSchema versions the on-wire JSON shape. Bump when the
// schema changes incompatibly. Consumers (e.g. duckdb queries over
// coordinator_sessions.jsonl, scrum review tooling) check this field
// to decide whether they understand the row.
const SessionRecordSchema = "session.iterate.v1"
// SessionRecord is one row in coordinator_sessions.jsonl. Captures the
// full retry chain of a single /v1/iterate session for offline
// forensics: "show me all sessions where the validator caught a
// phantom worker" / "show me all sessions where retrieval missed."
//
// The Langfuse trace tree (see X-Lakehouse-Trace-Id propagation
// 2026-05-02) is the live view; this JSONL is the longitudinal view
// for ad-hoc DuckDB queries over thousands of sessions.
type SessionRecord struct {
	Schema           string                 `json:"schema"`
	SessionID        string                 `json:"session_id"` // = Langfuse trace_id
	Timestamp        string                 `json:"timestamp"`  // ISO 8601
	Daemon           string                 `json:"daemon"`
	Kind             string                 `json:"kind"` // fill | email | playbook
	Model            string                 `json:"model"`
	Provider         string                 `json:"provider"`
	Prompt           string                 `json:"prompt"` // truncated to 4000
	Iterations       int                    `json:"iterations"`
	MaxIterations    int                    `json:"max_iterations"`
	FinalVerdict     string                 `json:"final_verdict"` // accepted | max_iter_exhausted | infra_error
	Attempts         []SessionAttemptRecord `json:"attempts"`
	Artifact         map[string]any         `json:"artifact,omitempty"` // present on success
	GroundedInRoster *bool                  `json:"grounded_in_roster,omitempty"`
	DurationMs       int64                  `json:"duration_ms"`
}

// SessionAttemptRecord mirrors IterateAttempt but stores only the
// stable signals (iteration, verdict, error, span id). The raw model
// output is intentionally NOT captured here — it lives in the
// Langfuse span (queryable by span_id) and the iterate response. Two
// copies would let the JSONL grow unbounded on long sessions.
type SessionAttemptRecord struct {
	Iteration   int    `json:"iteration"`
	VerdictKind string `json:"verdict_kind"` // no_json | validation_failed | accepted
	Error       string `json:"error,omitempty"`
	SpanID      string `json:"span_id,omitempty"`
}
// SessionLogger appends SessionRecord rows to a JSONL file. Best-effort:
// errors are logged via slog and never returned to the caller (per the
// rest of the observability stack — never block a request because the
// session log is unhappy).
//
// nil is a valid value: NewSessionLogger("") returns nil; Append on
// a nil receiver is a no-op. Lets validatord skip the wiring entirely
// when no session log is configured.
type SessionLogger struct {
	path string
	mu   sync.Mutex
}

// NewSessionLogger constructs a logger writing to `path`. Empty path
// disables logging. Creates parent dirs lazily on first write so
// construction succeeds even when the path is on a not-yet-mounted
// volume (e.g. systemd unit ordering).
func NewSessionLogger(path string) *SessionLogger {
	if path == "" {
		return nil
	}
	return &SessionLogger{path: path}
}
// Append writes one JSONL row. Best-effort: failures are logged via
// slog and swallowed, so the caller never handles an error. Logging
// is an observability witness, not a correctness gate.
func (l *SessionLogger) Append(rec SessionRecord) {
	if l == nil {
		return
	}
	if rec.Schema == "" {
		rec.Schema = SessionRecordSchema
	}
	if rec.Timestamp == "" {
		rec.Timestamp = time.Now().UTC().Format(time.RFC3339Nano)
	}
	if rec.Daemon == "" {
		rec.Daemon = "validatord"
	}
	body, err := json.Marshal(rec)
	if err != nil {
		slog.Warn("session_log: marshal", "err", err, "session_id", rec.SessionID)
		return
	}
	body = append(body, '\n')
	l.mu.Lock()
	defer l.mu.Unlock()
	if err := os.MkdirAll(filepath.Dir(l.path), 0o755); err != nil {
		slog.Warn("session_log: mkdir", "err", err, "path", l.path)
		return
	}
	f, err := os.OpenFile(l.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		slog.Warn("session_log: open", "err", err, "path", l.path)
		return
	}
	defer f.Close()
	if _, err := f.Write(body); err != nil {
		slog.Warn("session_log: write", "err", err, "session_id", rec.SessionID)
	}
}
// BuildSessionRecord assembles a SessionRecord from an iterate
// response/failure pair. Exactly one of resp/fail must be non-nil
// (or both nil for an infrastructure failure). Centralized here so
// validatord and any future iterate-using daemon emit the same shape.
//
// rosterCheck is called per accepted artifact to populate
// GroundedInRoster — set to nil to skip the check (e.g. for
// non-fill kinds that don't have worker IDs). Returns the shaped
// record; pass to logger.Append on the caller side so failed
// daemons don't block on their own observability layer.
func BuildSessionRecord(
	req IterateRequest,
	resp *IterateResponse,
	fail *IterateFailure,
	infraErr error,
	rosterCheck func(map[string]any) *bool,
	durationMs int64,
) SessionRecord {
	rec := SessionRecord{
		SessionID:     req.TraceID,
		Kind:          req.Kind,
		Model:         req.Model,
		Provider:      req.Provider,
		Prompt:        trim(req.Prompt, 4000),
		MaxIterations: req.MaxIterations,
		DurationMs:    durationMs,
	}
	switch {
	case resp != nil:
		rec.Iterations = resp.Iterations
		rec.FinalVerdict = "accepted"
		rec.Attempts = sessionAttemptsFromHistory(resp.History)
		rec.Artifact = resp.Artifact
		if rosterCheck != nil {
			rec.GroundedInRoster = rosterCheck(resp.Artifact)
		}
		// Keep the trace id authoritative even if the request didn't
		// supply one — the iterate response carries the resolved id.
		if rec.SessionID == "" {
			rec.SessionID = resp.TraceID
		}
	case fail != nil:
		rec.Iterations = fail.Iterations
		rec.FinalVerdict = "max_iter_exhausted"
		rec.Attempts = sessionAttemptsFromHistory(fail.History)
		if rec.SessionID == "" {
			rec.SessionID = fail.TraceID
		}
	default:
		// Infrastructure failure (chat hop crashed mid-loop, etc.).
		// We still emit a row so the failure is forensically visible —
		// otherwise long debugging sessions get harder.
		rec.FinalVerdict = "infra_error"
		if infraErr != nil {
			rec.Attempts = []SessionAttemptRecord{{
				Iteration:   0,
				VerdictKind: "infra_error",
				Error:       trim(fmt.Sprintf("%v", infraErr), 800),
			}}
		}
	}
	return rec
}

func sessionAttemptsFromHistory(h []IterateAttempt) []SessionAttemptRecord {
	out := make([]SessionAttemptRecord, len(h))
	for i, a := range h {
		out[i] = SessionAttemptRecord{
			Iteration:   a.Iteration,
			VerdictKind: a.Status.Kind,
			Error:       a.Status.Error,
			SpanID:      a.SpanID,
		}
	}
	return out
}