root 1a3a82aedb validatord: coordinator session JSONL for offline analysis (B follow-up)
Closes the second half of J's 2026-05-02 multi-call observability
concern. Trace-id propagation (commit d6d2fdf) gave us the *live*
view in Langfuse; this gives us the *longitudinal* view for ad-hoc
DuckDB queries over thousands of sessions:

  "show me every session where the model produced a real candidate
   without ever needing a retry"
  "find sessions where validation rejected three times in a row"
  "first-shot success rate per model — did we feed it enough corpus?"

## What's in

internal/validator/session_log.go:
  - SessionRecord type (schema=session.iterate.v1)
  - SessionLogger writer — mutex-guarded append, best-effort posture,
    nil-safe (NewSessionLogger("") = nil = no-op on Append)
  - BuildSessionRecord helper — assembles a row from any
    iterate response/failure/infra-error combination, callable from
    other daemons that wrap iterate (cross-daemon shared schema)
  - 7 unit tests including concurrent-append safety + the three
    code paths (success / max_iter_exhausted / infra_error)

cmd/validatord/main.go:
  - handlers.sessionLog field + wiring from cfg.Validatord.SessionLogPath
  - Iterate handler: build + append a SessionRecord on every call
  - rosterCheckFor("fill") closure stamps grounded_in_roster — the
    load-bearing forensic property J flagged ("we can never
    hallucinate available staff members to contracts")

internal/shared/config.go + lakehouse.toml:
  - [validatord].session_log_path field; empty = disabled
  - Production: /var/lib/lakehouse/validator/sessions.jsonl

scripts/validatord_smoke.sh:
  - Adds a probe verifying validatord announces session log path on
    startup. Smoke is now 6/6 (was 5/5).

docs/SESSION_LOG.md:
  - Schema reference + 5 worked DuckDB query examples including the
    "alarm" query (sessions where grounded_in_roster=false on an
    accepted fill — should always be empty; if not, something is
    bypassing FillValidator).

## What this is NOT

This is NOT a duplicate of replay_runs.jsonl. They're siblings:
  - replay_runs.jsonl: replay tool's per-task retrieval+model output
  - sessions.jsonl: validatord's per-iterate full retry chain +
    grounded-in-roster verdict

A single coordinator session can produce rows in both streams; the
session_id (= Langfuse trace_id) is the join key.

## Layered observability now in place

  Live view:  Langfuse trace tree (X-Lakehouse-Trace-Id propagation)
              `iterate.attempt[N]` spans with prompt/raw/verdict
  Offline:    sessions.jsonl (this commit)
              DuckDB-queryable; longitudinal forensics
  Hard gate:  FillValidator + WorkerLookup (existing)
              phantom IDs structurally rejected, never reach
              session log's grounded_in_roster=true bucket

Per the architecture invariant in STATE_OF_PLAY's DO NOT RELITIGATE
section — these layers are wired; future work targets the data, not
the wiring.

## Verification

- internal/validator: 7 new tests (session_log_test.go) — all PASS
- cmd/validatord: 3 new integration tests covering the success,
  failure, and grounded=false paths — all PASS
- validatord_smoke.sh: 6/6 PASS through gateway :3110
- Full go test ./... green across 33 packages

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 05:22:09 -05:00

428 lines
14 KiB
Go

package main
import (
"bufio"
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"time"
"github.com/go-chi/chi/v5"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/validator"
)
// readSessionLog parses the JSONL session log into row maps. Used by
// the iterate-handler tests to verify per-session entries.
func readSessionLog(t *testing.T, path string) []map[string]any {
t.Helper()
f, err := os.Open(path)
if err != nil {
t.Fatalf("open session log: %v", err)
}
defer f.Close()
var out []map[string]any
sc := bufio.NewScanner(f)
sc.Buffer(make([]byte, 0, 1<<16), 1<<24)
for sc.Scan() {
line := sc.Bytes()
if len(line) == 0 {
continue
}
var row map[string]any
if err := json.Unmarshal(line, &row); err != nil {
t.Fatalf("parse session log line: %v", err)
}
out = append(out, row)
}
return out
}
// newTestRouter builds the validatord router with an explicit lookup
// + a fake chatd URL. Tests that exercise /iterate need a live mock
// chatd (constructed inline per-test).
// newTestRouter wires a fresh validatord router around the given worker
// lookup and chatd base URL. The session log is disabled; tests that
// need it build handlers directly via newTestHandlers.
func newTestRouter(lookup validator.WorkerLookup, chatdURL string) http.Handler {
	router := chi.NewRouter()
	newTestHandlers(lookup, chatdURL, nil).register(router)
	return router
}
// newTestHandlers exposes the underlying handlers struct so tests can
// inspect post-call state (e.g. session log contents). sessionLog is
// optional — pass nil to disable the session log.
// newTestHandlers builds a handlers value that tests can inspect after
// a call (e.g. session log contents). A nil sessionLog disables
// session logging entirely.
func newTestHandlers(lookup validator.WorkerLookup, chatdURL string, sessionLog *validator.SessionLogger) *handlers {
	iterCfg := validator.IterateConfig{
		DefaultMaxIterations: 3,
		DefaultMaxTokens:     4096,
	}
	return &handlers{
		lookup:     lookup,
		chatdURL:   chatdURL,
		chatClient: &http.Client{Timeout: 5 * time.Second},
		iterCfg:    iterCfg,
		sessionLog: sessionLog,
	}
}
// ─── /validate ─────────────────────────────────────────────────
// An unrecognized artifact kind must be rejected up front with 400.
func TestValidate_RejectsUnknownKind(t *testing.T) {
	router := newTestRouter(validator.NewInMemoryWorkerLookup(nil), "")
	payload := bytes.NewReader([]byte(`{"kind":"unknown","artifact":{}}`))
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, httptest.NewRequest("POST", "/validate", payload))
	if rec.Code != http.StatusBadRequest {
		t.Fatalf("expected 400 for unknown kind, got %d (body=%s)", rec.Code, rec.Body.String())
	}
}
// A request with no artifact body is a 400, not a validation run.
func TestValidate_RejectsMissingArtifact(t *testing.T) {
	router := newTestRouter(validator.NewInMemoryWorkerLookup(nil), "")
	payload := bytes.NewReader([]byte(`{"kind":"playbook"}`))
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, httptest.NewRequest("POST", "/validate", payload))
	if rec.Code != http.StatusBadRequest {
		t.Fatalf("expected 400 for missing artifact, got %d", rec.Code)
	}
}
// A well-formed playbook artifact validates with 200 and a decodable
// Report carrying a non-negative elapsed time.
func TestValidate_PlaybookHappyPath(t *testing.T) {
	router := newTestRouter(validator.NewInMemoryWorkerLookup(nil), "")
	payload := []byte(`{
		"kind": "playbook",
		"artifact": {
			"operation": "fill: Welder x2 in Toledo, OH",
			"endorsed_names": ["W-1","W-2"],
			"target_count": 2,
			"fingerprint": "abc123"
		}
	}`)
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, httptest.NewRequest("POST", "/validate", bytes.NewReader(payload)))
	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d (body=%s)", rec.Code, rec.Body.String())
	}
	var report validator.Report
	if err := json.Unmarshal(rec.Body.Bytes(), &report); err != nil {
		t.Fatalf("decode response: %v", err)
	}
	if report.ElapsedMs < 0 {
		t.Errorf("elapsed_ms negative: %d", report.ElapsedMs)
	}
}
// A playbook artifact that parses but violates the schema (wrong
// operation prefix) comes back 422 with a schema-kinded ValidationError.
func TestValidate_PlaybookSchemaErrorReturns422(t *testing.T) {
	router := newTestRouter(validator.NewInMemoryWorkerLookup(nil), "")
	payload := []byte(`{
		"kind": "playbook",
		"artifact": {
			"operation": "wrong_prefix: foo",
			"endorsed_names": ["a"],
			"fingerprint": "x"
		}
	}`)
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, httptest.NewRequest("POST", "/validate", bytes.NewReader(payload)))
	if rec.Code != http.StatusUnprocessableEntity {
		t.Fatalf("expected 422, got %d (body=%s)", rec.Code, rec.Body.String())
	}
	var ve validator.ValidationError
	if err := json.Unmarshal(rec.Body.Bytes(), &ve); err != nil {
		t.Fatalf("decode: %v", err)
	}
	if ve.Kind != validator.ErrSchema {
		t.Errorf("kind = %v, want schema", ve.Kind)
	}
}
// A fill artifact naming a candidate absent from the roster must fail
// consistency via the WorkerLookup and return 422.
func TestValidate_FillRoutesThroughLookup(t *testing.T) {
	city := "Toledo"
	roster := []validator.WorkerRecord{
		{CandidateID: "W-1", Name: "Ada", Status: "active", City: &city},
	}
	router := newTestRouter(validator.NewInMemoryWorkerLookup(roster), "")
	// Candidate that doesn't exist in lookup → consistency failure.
	payload := []byte(`{
		"kind": "fill",
		"artifact": {
			"fills": [{"candidate_id":"W-PHANTOM","name":"Nobody"}]
		},
		"context": {"target_count": 1, "city": "Toledo", "client_id": "C-1"}
	}`)
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, httptest.NewRequest("POST", "/validate", bytes.NewReader(payload)))
	if rec.Code != http.StatusUnprocessableEntity {
		t.Fatalf("expected 422 for phantom candidate, got %d (body=%s)", rec.Code, rec.Body.String())
	}
}
// The request-level `context` block must be merged into the artifact
// context so FillValidator's completeness check sees target_count.
// Without the merge, target_count defaults to 0 and any non-empty
// fills list would fail Completeness.
func TestValidate_ContextMergedIntoArtifactContext(t *testing.T) {
	city, role := "Toledo", "Welder"
	roster := []validator.WorkerRecord{
		{CandidateID: "W-1", Name: "Ada", Status: "active", City: &city, Role: &role},
	}
	router := newTestRouter(validator.NewInMemoryWorkerLookup(roster), "")
	payload := []byte(`{
		"kind": "fill",
		"artifact": {"fills":[{"candidate_id":"W-1","name":"Ada"}]},
		"context": {"target_count": 1, "city": "Toledo", "role": "Welder", "client_id": "C-1"}
	}`)
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, httptest.NewRequest("POST", "/validate", bytes.NewReader(payload)))
	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200 with context merged, got %d (body=%s)", rec.Code, rec.Body.String())
	}
}
// ─── /iterate ──────────────────────────────────────────────────
// fakeChatd returns a stand-in chatd HTTP server that emits the given
// content string for every /chat call. Caller closes the server.
// fakeChatd starts a stand-in chatd HTTP server whose /chat endpoint
// always answers with the given content string. The caller owns the
// returned server and must Close it.
func fakeChatd(t *testing.T, content string) *httptest.Server {
	t.Helper()
	reply := map[string]any{
		"model":      "test-model",
		"content":    content,
		"provider":   "test",
		"latency_ms": 1,
	}
	mux := chi.NewRouter()
	mux.Post("/chat", func(w http.ResponseWriter, _ *http.Request) {
		_ = json.NewEncoder(w).Encode(reply)
	})
	return httptest.NewServer(mux)
}
// /iterate requires provider and model; omitting them is a 400.
func TestIterate_RejectsMissingFields(t *testing.T) {
	router := newTestRouter(validator.NewInMemoryWorkerLookup(nil), "")
	payload := bytes.NewReader([]byte(`{"kind":"playbook","prompt":"x"}`)) // missing provider+model
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, httptest.NewRequest("POST", "/iterate", payload))
	if rec.Code != http.StatusBadRequest {
		t.Fatalf("expected 400, got %d", rec.Code)
	}
}
// When the mock chatd emits a valid playbook artifact on the first
// call, /iterate accepts it in a single iteration and echoes it back.
func TestIterate_HappyPath_ReturnsAcceptedArtifact(t *testing.T) {
	server := fakeChatd(t, `{"operation":"fill: Welder x1 in Toledo, OH","endorsed_names":["W-1"],"target_count":1,"fingerprint":"abc"}`)
	defer server.Close()
	router := newTestRouter(validator.NewInMemoryWorkerLookup(nil), server.URL)
	payload, _ := json.Marshal(map[string]any{
		"kind":     "playbook",
		"prompt":   "produce a playbook artifact",
		"provider": "ollama",
		"model":    "qwen3.5:latest",
	})
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, httptest.NewRequest("POST", "/iterate", bytes.NewReader(payload)))
	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d (body=%s)", rec.Code, rec.Body.String())
	}
	var resp validator.IterateResponse
	if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil {
		t.Fatalf("decode: %v", err)
	}
	if resp.Iterations != 1 {
		t.Errorf("iterations = %d, want 1", resp.Iterations)
	}
	if resp.Artifact["operation"] != "fill: Welder x1 in Toledo, OH" {
		t.Errorf("artifact.operation: %v", resp.Artifact["operation"])
	}
}
// A chatd that never produces JSON exhausts max_iterations; /iterate
// answers 422 with an IterateFailure whose history records every
// no_json attempt.
func TestIterate_MaxIterReturns422WithHistory(t *testing.T) {
	// Always returns a no-JSON response, so iterate exhausts retries.
	server := fakeChatd(t, "no json here, just prose")
	defer server.Close()
	router := newTestRouter(validator.NewInMemoryWorkerLookup(nil), server.URL)
	payload, _ := json.Marshal(map[string]any{
		"kind":           "playbook",
		"prompt":         "produce X",
		"provider":       "ollama",
		"model":          "x",
		"max_iterations": 2,
	})
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, httptest.NewRequest("POST", "/iterate", bytes.NewReader(payload)))
	if rec.Code != http.StatusUnprocessableEntity {
		t.Fatalf("expected 422, got %d (body=%s)", rec.Code, rec.Body.String())
	}
	var fail validator.IterateFailure
	if err := json.Unmarshal(rec.Body.Bytes(), &fail); err != nil {
		t.Fatalf("decode: %v", err)
	}
	if fail.Iterations != 2 {
		t.Errorf("iterations = %d, want 2", fail.Iterations)
	}
	for _, attempt := range fail.History {
		if attempt.Status.Kind != "no_json" {
			t.Errorf("expected all attempts to be no_json, got %v", attempt.Status.Kind)
		}
	}
}
// TestIterate_WritesSessionRowOnSuccess locks the offline-analysis
// contract: every successful /iterate call appends one SessionRecord
// to the configured JSONL with grounded_in_roster=true (because the
// model emitted real candidate IDs that exist in the roster).
// Locks the offline-analysis contract: every successful /iterate call
// appends exactly one SessionRecord to the configured JSONL with
// grounded_in_roster=true (the model emitted real candidate IDs that
// exist in the roster).
func TestIterate_WritesSessionRowOnSuccess(t *testing.T) {
	city, role := "Toledo", "Welder"
	roster := []validator.WorkerRecord{
		{CandidateID: "W-1", Name: "Ada", Status: "active", City: &city, Role: &role},
	}
	server := fakeChatd(t, `{"fills":[{"candidate_id":"W-1","name":"Ada"}]}`)
	defer server.Close()
	logPath := filepath.Join(t.TempDir(), "sessions.jsonl")
	h := newTestHandlers(validator.NewInMemoryWorkerLookup(roster), server.URL, validator.NewSessionLogger(logPath))
	router := chi.NewRouter()
	h.register(router)
	payload, _ := json.Marshal(map[string]any{
		"kind":     "fill",
		"prompt":   "produce fill",
		"provider": "ollama",
		"model":    "qwen3.5:latest",
		"context":  map[string]any{"target_count": 1, "city": "Toledo", "role": "Welder", "client_id": "C-1"},
	})
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, httptest.NewRequest("POST", "/iterate", bytes.NewReader(payload)))
	if rec.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d (body=%s)", rec.Code, rec.Body.String())
	}
	rows := readSessionLog(t, logPath)
	if len(rows) != 1 {
		t.Fatalf("expected 1 session row, got %d", len(rows))
	}
	row := rows[0]
	if row["schema"] != validator.SessionRecordSchema {
		t.Errorf("schema: %v", row["schema"])
	}
	if row["final_verdict"] != "accepted" {
		t.Errorf("final_verdict: %v", row["final_verdict"])
	}
	if row["grounded_in_roster"] != true {
		t.Errorf("grounded_in_roster: expected true (W-1 is in roster), got %v", row["grounded_in_roster"])
	}
	if row["kind"] != "fill" {
		t.Errorf("kind: %v", row["kind"])
	}
}
// TestIterate_GroundedFalseWhenArtifactReferencesPhantom is the
// load-bearing forensic property J flagged 2026-05-02: phantom
// worker IDs in the final accepted artifact must surface as
// grounded_in_roster=false in the session log. This shouldn't happen
// in practice (FillValidator catches phantoms before they reach this
// row), but the explicit check guards against a future bug where the
// validator weakens or a different validator path skips the roster
// check. Defense-in-depth at the observability layer.
func TestIterate_GroundedFalseWhenArtifactReferencesPhantom(t *testing.T) {
// Empty roster — every candidate ID is "phantom". Plus we use
// the playbook kind so the validator doesn't reject the artifact
// upfront (PlaybookValidator doesn't consult the roster), letting
// the artifact reach the session log. We then build a custom
// rosterCheckFor "fill" closure and exercise it directly.
lookup := validator.NewInMemoryWorkerLookup(nil)
h := newTestHandlers(lookup, "", nil)
check := h.rosterCheckFor("fill")
if check == nil {
t.Fatal("expected non-nil checker for fill kind")
}
got := check(map[string]any{
"fills": []any{
map[string]any{"candidate_id": "W-PHANTOM-NEVER-EXISTS"},
},
})
if got == nil || *got {
t.Errorf("expected grounded=false for phantom candidate, got %v", got)
}
// Other kinds: no roster concept → checker returns nil.
if h.rosterCheckFor("email") != nil {
t.Error("email kind should not have a roster checker")
}
if h.rosterCheckFor("playbook") != nil {
t.Error("playbook kind should not have a roster checker")
}
}
// TestIterate_WritesSessionRowOnFailure: max-iter exhaustion still
// produces a row (omits artifact, captures the retry chain).
// Max-iteration exhaustion still produces exactly one session row: the
// artifact key is omitted and final_verdict records the exhaustion.
func TestIterate_WritesSessionRowOnFailure(t *testing.T) {
	server := fakeChatd(t, "no json here, just prose")
	defer server.Close()
	logPath := filepath.Join(t.TempDir(), "sessions.jsonl")
	h := newTestHandlers(validator.NewInMemoryWorkerLookup(nil), server.URL,
		validator.NewSessionLogger(logPath))
	router := chi.NewRouter()
	h.register(router)
	payload, _ := json.Marshal(map[string]any{
		"kind":           "playbook",
		"prompt":         "produce X",
		"provider":       "ollama",
		"model":          "x",
		"max_iterations": 2,
	})
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, httptest.NewRequest("POST", "/iterate", bytes.NewReader(payload)))
	if rec.Code != http.StatusUnprocessableEntity {
		t.Fatalf("expected 422, got %d", rec.Code)
	}
	rows := readSessionLog(t, logPath)
	if len(rows) != 1 {
		t.Fatalf("expected 1 session row, got %d", len(rows))
	}
	row := rows[0]
	if row["final_verdict"] != "max_iter_exhausted" {
		t.Errorf("final_verdict: %v", row["final_verdict"])
	}
	if _, hasArtifact := row["artifact"]; hasArtifact {
		t.Errorf("artifact should be omitted on failure: %v", row["artifact"])
	}
}
// An unreachable chatd is an infrastructure error, surfaced as 502.
func TestIterate_ChatdDownReturns502(t *testing.T) {
	router := newTestRouter(validator.NewInMemoryWorkerLookup(nil), "http://127.0.0.1:1") // unroutable
	payload, _ := json.Marshal(map[string]any{
		"kind":     "playbook",
		"prompt":   "X",
		"provider": "ollama",
		"model":    "x",
	})
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, httptest.NewRequest("POST", "/iterate", bytes.NewReader(payload)))
	if rec.Code != http.StatusBadGateway {
		t.Fatalf("expected 502, got %d (body=%s)", rec.Code, rec.Body.String())
	}
}