Closes the second half of J's 2026-05-02 multi-call observability
concern. Trace-id propagation (commit d6d2fdf) gave us the *live*
view in Langfuse; this gives us the *longitudinal* view for ad-hoc
DuckDB queries over thousands of sessions:
  "show me every session where the model produced a real candidate
   without ever needing a retry"
  "find sessions where validation rejected three times in a row"
  "first-shot success rate per model: did we feed it enough corpus?"
## What's in
internal/validator/session_log.go:
- SessionRecord type (schema=session.iterate.v1)
- SessionLogger writer: mutex-guarded append, best-effort posture,
  nil-safe (NewSessionLogger("") = nil = no-op on Append)
- BuildSessionRecord helper: assembles a row from any
  iterate response/failure/infra-error combination, callable from
  other daemons that wrap iterate (cross-daemon shared schema)
- 7 unit tests including concurrent-append safety + the three
  code paths (success / max_iter_exhausted / infra_error)
cmd/validatord/main.go:
- handlers.sessionLog field + wiring from cfg.Validatord.SessionLogPath
- Iterate handler: build + append a SessionRecord on every call
- rosterCheckFor("fill") closure stamps grounded_in_roster, the
  load-bearing forensic property J flagged ("we can never
  hallucinate available staff members to contracts")
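
For readers unfamiliar with the closure shape, here is a rough sketch of a checker matching the `func(map[string]any) *bool` parameter that BuildSessionRecord accepts. It is not the code in cmd/validatord/main.go: the name `rosterCheckSketch`, the extra roster argument, the `worker_ids` artifact key, and the in-memory roster set are all assumptions made for illustration.

```go
package main

import "fmt"

// rosterCheckSketch (hypothetical) returns a checker for the given kind, or
// nil when the kind carries no worker IDs. The "worker_ids" key and the
// in-memory roster set are assumed, not taken from the real artifact shape.
func rosterCheckSketch(kind string, roster map[string]bool) func(map[string]any) *bool {
	if kind != "fill" {
		return nil // a nil checker leaves grounded_in_roster unset in the record
	}
	return func(artifact map[string]any) *bool {
		grounded := true
		ids, ok := artifact["worker_ids"].([]any)
		if !ok || len(ids) == 0 {
			grounded = false // nothing verifiable is treated as not grounded
			return &grounded
		}
		for _, raw := range ids {
			id, isString := raw.(string)
			if !isString || !roster[id] {
				grounded = false
				break
			}
		}
		return &grounded
	}
}

func main() {
	roster := map[string]bool{"w-1": true, "w-2": true}
	check := rosterCheckSketch("fill", roster)
	ok := check(map[string]any{"worker_ids": []any{"w-1", "w-2"}})
	bad := check(map[string]any{"worker_ids": []any{"w-1", "w-999"}})
	fmt.Println(*ok, *bad)
}
```

The `*bool` return gives three states in the JSONL row: true, false, and absent (check skipped), which is what the omitempty tag on grounded_in_roster relies on.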
internal/shared/config.go + lakehouse.toml:
- [validatord].session_log_path field; empty = disabled
- Production: /var/lib/lakehouse/validator/sessions.jsonl
scripts/validatord_smoke.sh:
- Adds a probe verifying that validatord announces the session log
  path on startup. Smoke is now 6/6 (was 5/5).
docs/SESSION_LOG.md:
- Schema reference + 5 worked DuckDB query examples including the
  "alarm" query (sessions where grounded_in_roster=false on an
  accepted fill; this should always be empty, and if it is not,
  something is bypassing FillValidator).
## What this is NOT
This is NOT a duplicate of replay_runs.jsonl. They're siblings:
- replay_runs.jsonl: replay tool's per-task retrieval+model output
- sessions.jsonl: validatord's per-iterate full retry chain +
  grounded-in-roster verdict
A single coordinator session can produce rows in both streams; the
session_id (= Langfuse trace_id) is the join key.
## Layered observability now in place
Live view:  Langfuse trace tree (X-Lakehouse-Trace-Id propagation)
            `iterate.attempt[N]` spans with prompt/raw/verdict
Offline:    coordinator_sessions.jsonl (this commit)
            DuckDB-queryable; longitudinal forensics
Hard gate:  FillValidator + WorkerLookup (existing)
            phantom IDs structurally rejected, never reach
            session log's grounded_in_roster=true bucket
Per the architecture invariant in STATE_OF_PLAY's DO NOT RELITIGATE
section, these layers are wired; future work targets the data, not
the wiring.
## Verification
- internal/validator: 7 new tests (session_log_test.go) — all PASS
- cmd/validatord: 3 new integration tests covering the success,
failure, and grounded=false paths — all PASS
- validatord_smoke.sh: 6/6 PASS through gateway :3110
- Full go test ./... green across 33 packages
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
200 lines
6.8 KiB
Go
package validator

import (
	"encoding/json"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"sync"
	"time"
)

// SessionRecordSchema versions the on-wire JSON shape. Bump when the
// schema changes incompatibly. Consumers (e.g. duckdb queries over
// coordinator_sessions.jsonl, scrum review tooling) check this field
// to decide whether they understand the row.
const SessionRecordSchema = "session.iterate.v1"

// SessionRecord is one row in coordinator_sessions.jsonl. Captures the
// full retry chain of a single /v1/iterate session for offline
// forensics: "show me all sessions where the validator caught a
// phantom worker" / "show me all sessions where retrieval missed."
//
// The Langfuse trace tree (see X-Lakehouse-Trace-Id propagation
// 2026-05-02) is the live view; this JSONL is the longitudinal view
// for ad-hoc DuckDB queries over thousands of sessions.
type SessionRecord struct {
	Schema           string                 `json:"schema"`
	SessionID        string                 `json:"session_id"` // = Langfuse trace_id
	Timestamp        string                 `json:"timestamp"`  // ISO 8601
	Daemon           string                 `json:"daemon"`
	Kind             string                 `json:"kind"` // fill | email | playbook
	Model            string                 `json:"model"`
	Provider         string                 `json:"provider"`
	Prompt           string                 `json:"prompt"` // truncated to 4000
	Iterations       int                    `json:"iterations"`
	MaxIterations    int                    `json:"max_iterations"`
	FinalVerdict     string                 `json:"final_verdict"` // accepted | max_iter_exhausted | infra_error
	Attempts         []SessionAttemptRecord `json:"attempts"`
	Artifact         map[string]any         `json:"artifact,omitempty"` // present on success
	GroundedInRoster *bool                  `json:"grounded_in_roster,omitempty"`
	DurationMs       int64                  `json:"duration_ms"`
}

// SessionAttemptRecord mirrors IterateAttempt but stores only the
// stable signals (iteration, verdict, error, span id). The raw model
// output is intentionally NOT captured here: it lives in the
// Langfuse span (queryable by span_id) and the iterate response. Two
// copies would let the JSONL grow unbounded on long sessions.
type SessionAttemptRecord struct {
	Iteration   int    `json:"iteration"`
	VerdictKind string `json:"verdict_kind"` // no_json | validation_failed | accepted
	Error       string `json:"error,omitempty"`
	SpanID      string `json:"span_id,omitempty"`
}

// SessionLogger appends SessionRecord rows to a JSONL file. Best-effort:
// errors are logged via slog and never returned to the caller (per the
// rest of the observability stack: never block a request because the
// session log is unhappy).
//
// nil is a valid value: NewSessionLogger("") returns nil; Append on
// a nil receiver is a no-op. Lets validatord skip the wiring entirely
// when no session log is configured.
type SessionLogger struct {
	path string
	mu   sync.Mutex
}

// NewSessionLogger constructs a logger writing to `path`. Empty path
// disables logging. Creates parent dirs lazily on first write so the
// constructor doesn't panic when the path is on a not-yet-mounted
// volume (e.g. systemd unit ordering).
func NewSessionLogger(path string) *SessionLogger {
	if path == "" {
		return nil
	}
	return &SessionLogger{path: path}
}

// Append writes one JSONL row. It has no return value: failures are
// logged via slog and otherwise swallowed, so the caller has no errors
// to handle. Logging is an observability witness, not a correctness gate.
//
// Empty Schema, Timestamp, and Daemon fields are filled with defaults
// before the row is marshaled.
func (l *SessionLogger) Append(rec SessionRecord) {
	if l == nil {
		return
	}
	if rec.Schema == "" {
		rec.Schema = SessionRecordSchema
	}
	if rec.Timestamp == "" {
		rec.Timestamp = time.Now().UTC().Format(time.RFC3339Nano)
	}
	if rec.Daemon == "" {
		rec.Daemon = "validatord"
	}
	body, err := json.Marshal(rec)
	if err != nil {
		slog.Warn("session_log: marshal", "err", err, "session_id", rec.SessionID)
		return
	}
	body = append(body, '\n')

	// Marshal happens outside the lock; only the file I/O is serialized.
	l.mu.Lock()
	defer l.mu.Unlock()

	if err := os.MkdirAll(filepath.Dir(l.path), 0o755); err != nil {
		slog.Warn("session_log: mkdir", "err", err, "path", l.path)
		return
	}
	f, err := os.OpenFile(l.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		slog.Warn("session_log: open", "err", err, "path", l.path)
		return
	}
	defer f.Close()
	if _, err := f.Write(body); err != nil {
		slog.Warn("session_log: write", "err", err, "session_id", rec.SessionID)
	}
}

// BuildSessionRecord assembles a SessionRecord from an iterate
// response/failure pair. At most one of resp/fail should be non-nil;
// when both are nil the row is recorded as an infrastructure failure.
// If both are set, resp wins because the switch checks it first.
// Centralized here so validatord and any future iterate-using daemon
// emit the same shape.
//
// rosterCheck is called per accepted artifact to populate
// GroundedInRoster; pass nil to skip the check (e.g. for
// non-fill kinds that don't have worker IDs). Returns the shaped
// record; pass to logger.Append on the caller side so failed
// daemons don't block on their own observability layer.
func BuildSessionRecord(
	req IterateRequest,
	resp *IterateResponse,
	fail *IterateFailure,
	infraErr error,
	rosterCheck func(map[string]any) *bool,
	durationMs int64,
) SessionRecord {
	rec := SessionRecord{
		SessionID:     req.TraceID,
		Kind:          req.Kind,
		Model:         req.Model,
		Provider:      req.Provider,
		Prompt:        trim(req.Prompt, 4000),
		MaxIterations: req.MaxIterations,
		DurationMs:    durationMs,
	}

	switch {
	case resp != nil:
		rec.Iterations = resp.Iterations
		rec.FinalVerdict = "accepted"
		rec.Attempts = sessionAttemptsFromHistory(resp.History)
		rec.Artifact = resp.Artifact
		if rosterCheck != nil {
			rec.GroundedInRoster = rosterCheck(resp.Artifact)
		}
		// Keep the trace id authoritative even if the request didn't
		// supply one: the iterate response carries the resolved id.
		if rec.SessionID == "" {
			rec.SessionID = resp.TraceID
		}
	case fail != nil:
		rec.Iterations = fail.Iterations
		rec.FinalVerdict = "max_iter_exhausted"
		rec.Attempts = sessionAttemptsFromHistory(fail.History)
		if rec.SessionID == "" {
			rec.SessionID = fail.TraceID
		}
	default:
		// Infrastructure failure (chat hop crashed mid-loop, etc.).
		// We still emit a row so the failure is forensically visible;
		// otherwise infra errors would be missing from the longitudinal
		// log. Iterations stays 0 in this branch.
		rec.FinalVerdict = "infra_error"
		if infraErr != nil {
			rec.Attempts = []SessionAttemptRecord{{
				Iteration:   0,
				VerdictKind: "infra_error",
				Error:       trim(fmt.Sprintf("%v", infraErr), 800),
			}}
		}
	}

	return rec
}

// sessionAttemptsFromHistory copies the stable per-attempt signals out of
// the iterate history, dropping the raw model output.
func sessionAttemptsFromHistory(h []IterateAttempt) []SessionAttemptRecord {
	out := make([]SessionAttemptRecord, len(h))
	for i, a := range h {
		out[i] = SessionAttemptRecord{
			Iteration:   a.Iteration,
			VerdictKind: a.Status.Kind,
			Error:       a.Status.Error,
			SpanID:      a.SpanID,
		}
	}
	return out
}