root 6847bbc180 validatord: honor X-Lakehouse-Trace-Id even when Langfuse is off
Surfaced by the 2026-05-02 cross-runtime test: when a caller
forwarded X-Lakehouse-Trace-Id but the langfuse middleware was a
passthrough (no Langfuse env), the header was never read — Go minted
a fallback id, breaking cross-daemon parent-trace linkage.

The middleware only honored the header when its lf client was
non-nil. With LANGFUSE_URL unset on the persistent stack, every
inbound iterate request lost the parent linkage.

Fix: validatord's iterate handler reads the header DIRECTLY (matches
Rust's iterate.rs pattern) before falling through to the ctx value
+ fallback id. Now Go behavior matches Rust regardless of Langfuse
configuration.

Resolution order is:
  1. req.TraceID (caller put it in the JSON body)
  2. X-Lakehouse-Trace-Id header (read directly here)
  3. context value from langfuse middleware (when configured)
  4. fallback to a locally-minted time-ordered hex id

Verified end-to-end:
  curl -X POST -H 'X-Lakehouse-Trace-Id: go-cmp-fixed' /v1/iterate
  → response.trace_id = "go-cmp-fixed" ✓
  → sessions.jsonl row session_id = "go-cmp-fixed" ✓

Pre-fix (this commit's parent ran from /tmp/val-fresh3 binary):
  same call → trace_id minted as 18abbb5a008061b7-008061e9
  (header silently ignored)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 06:16:25 -05:00

486 lines
16 KiB
Go

// validatord is the staffing-validator service daemon. Hosts:
//
// POST /validate — dispatch a single artifact to FillValidator,
// EmailValidator, or PlaybookValidator
// POST /iterate — generate→validate→correct loop (Phase 43 PRD).
// Calls chatd for the LLM hop and runs the
// validator in-process for the gate.
// GET /health — readiness (always 200; roster status reported
// in /validate responses)
//
// Per docs/SPEC.md and architecture_comparison.md "Go primary path":
// this closes the last bounded item — the now-Go-side validators get
// a network surface so any caller (TS code path, other daemons, agents)
// can validate artifacts via gateway /v1/validate or /v1/iterate.
//
// The roster (worker existence + city/state/role/blacklist) loads
// from a JSONL file at startup. Empty path = no roster, worker-existence
// checks fail Consistency. Production points this at a roster that's
// regenerated from workers_500k.parquet on a schedule.
package main
import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"log/slog"
	"math/rand"
	"net/http"
	"os"
	"time"
	"unicode/utf8"

	"github.com/go-chi/chi/v5"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/langfuse"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/validator"
)
const maxRequestBytes = 4 << 20 // 4 MiB
// main loads the TOML config and the JSONL roster, wires up the
// handler set (chat client, iterate defaults, optional Langfuse
// client, optional session log), and runs the shared HTTP server.
// Any startup failure logs and exits nonzero.
func main() {
	configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
	flag.Parse()

	conf, err := shared.LoadConfig(*configPath)
	if err != nil {
		slog.Error("config", "err", err)
		os.Exit(1)
	}

	roster, err := validator.LoadJSONLRoster(conf.Validatord.RosterPath)
	if err != nil {
		slog.Error("roster load", "path", conf.Validatord.RosterPath, "err", err)
		os.Exit(1)
	}
	slog.Info("validatord roster",
		"path", conf.Validatord.RosterPath,
		"records", roster.Len(),
	)

	// Chat-hop timeout: configured seconds, defaulting to four
	// minutes when unset or nonsensical.
	timeout := time.Duration(conf.Validatord.ChatTimeoutSecs) * time.Second
	if timeout <= 0 {
		timeout = 4 * time.Minute
	}

	h := &handlers{
		lookup:     roster,
		chatdURL:   conf.Validatord.ChatdURL,
		chatClient: &http.Client{Timeout: timeout},
		iterCfg: validator.IterateConfig{
			DefaultMaxIterations: conf.Validatord.DefaultMaxIterations,
			DefaultMaxTokens:     conf.Validatord.DefaultMaxTokens,
		},
		// Same env loader the middleware uses — holding our own
		// reference lets the iterate handler emit child spans of the
		// per-request trace. nil when the Langfuse env isn't set; the
		// iterate handler skips span emission gracefully then.
		lf:         shared.LoadLangfuseFromEnv(),
		sessionLog: validator.NewSessionLogger(conf.Validatord.SessionLogPath),
	}
	if h.sessionLog != nil {
		slog.Info("validatord session log", "path", conf.Validatord.SessionLogPath)
	}

	if err := shared.Run("validatord", conf.Validatord.Bind, h.register, conf.Auth); err != nil {
		slog.Error("server", "err", err)
		os.Exit(1)
	}
}
// handlers carries the shared state every validatord HTTP handler reads.
type handlers struct {
	// lookup answers worker-existence queries against the JSONL
	// roster loaded at startup in main.
	lookup validator.WorkerLookup
	// chatdURL is the base URL of the chatd daemon; the iterate
	// loop's chat caller POSTs to chatdURL+"/chat".
	chatdURL string
	// chatClient is the HTTP client for chatd calls; its timeout is
	// set from config (default 240s) in main.
	chatClient *http.Client
	// iterCfg carries the default iteration and token limits passed
	// to validator.Iterate.
	iterCfg validator.IterateConfig
	// lf is the Langfuse client (nil when unconfigured — best-effort
	// posture matching the rest of the stack). Per-attempt iteration
	// spans get emitted via lf when set. The HTTP middleware that
	// creates the parent trace lives in shared.Run; this client lets
	// validatord's handler emit application-level child spans.
	lf *langfuse.Client
	// sessionLog appends one SessionRecord per /v1/iterate session to
	// `coordinator_sessions.jsonl` for offline DuckDB analysis. nil
	// when the session log is unconfigured (best-effort posture).
	sessionLog *validator.SessionLogger
}
// register mounts the two POST surfaces on the shared router:
// /iterate (the generate→validate→correct loop) and /validate
// (single-artifact dispatch). Registration order is immaterial.
func (h *handlers) register(r chi.Router) {
	r.Post("/iterate", h.handleIterate)
	r.Post("/validate", h.handleValidate)
}
// validateRequest is the request body for POST /validate. Mirrors
// Rust's ValidateRequest in `crates/gateway/src/v1/validate.rs`.
type validateRequest struct {
	// Kind selects the validator: "fill" | "email" | "playbook".
	Kind string `json:"kind"` // "fill" | "email" | "playbook"
	// Artifact is the object under validation (required).
	Artifact map[string]any `json:"artifact"`
	// Context is optional contract metadata; mergeContext folds it
	// into artifact._context before dispatch.
	Context map[string]any `json:"context,omitempty"`
}
// handleValidate serves POST /validate: decode the request, dispatch
// to the right validator via runValidator, and answer 400 (bad
// request/unknown kind), 422 (validation failure), or 200 (report).
func (h *handlers) handleValidate(w http.ResponseWriter, r *http.Request) {
	r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes)
	defer r.Body.Close()

	var body validateRequest
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
		return
	}

	// Guard clauses: both fields are mandatory.
	if body.Kind == "" {
		http.Error(w, "kind is required", http.StatusBadRequest)
		return
	}
	if body.Artifact == nil {
		http.Error(w, "artifact is required", http.StatusBadRequest)
		return
	}

	report, vErr, kindErr := h.runValidator(body.Kind, body.Artifact, body.Context)
	if kindErr != nil {
		http.Error(w, kindErr.Error(), http.StatusBadRequest)
		return
	}
	if vErr != nil {
		writeJSON(w, http.StatusUnprocessableEntity, vErr)
		return
	}
	writeJSON(w, http.StatusOK, report)
}
// runValidator dispatches by kind. Returns (Report, ValidationError, kindErr).
// kindErr is non-nil only for unknown kind strings (400).
func (h *handlers) runValidator(kind string, artifact, ctx map[string]any) (*validator.Report, *validator.ValidationError, error) {
	// Fold caller-supplied context into artifact._context so every
	// validator reads contract metadata from one place.
	merged := mergeContext(artifact, ctx)
	a, kindErr := buildArtifact(kind, merged)
	if kindErr != nil {
		return nil, nil, kindErr
	}
	v, vErr := pickValidator(kind, h.lookup)
	if vErr != nil {
		return nil, nil, vErr
	}
	report, err := v.Validate(a)
	if err != nil {
		var ve *validator.ValidationError
		if errors.As(err, &ve) {
			return nil, ve, nil
		}
		// Validators only ever return ValidationError; an "any other
		// error" path means the validator violated its own contract.
		// Surface it as a schema-kind ValidationError rather than
		// silently coercing or dropping it. NOTE(review): this lands
		// on handleValidate's 422 branch — the previous comment here
		// claimed it surfaced as 500, which the code never did;
		// confirm 422 is the intended status for this path.
		return nil, &validator.ValidationError{
			Kind: validator.ErrSchema,
			Reason: "internal validator error: " + err.Error(),
		}, nil
	}
	return &report, nil, nil
}
// buildArtifact maps the kind string onto the matching arm of the
// Artifact union. Unknown kinds produce a 400-friendly error.
func buildArtifact(kind string, body map[string]any) (validator.Artifact, error) {
	var a validator.Artifact
	switch kind {
	case "fill":
		a.FillProposal = body
	case "email":
		a.EmailDraft = body
	case "playbook":
		a.Playbook = body
	default:
		return validator.Artifact{}, fmt.Errorf("unknown kind %q — expected fill | email | playbook", kind)
	}
	return a, nil
}
// pickValidator selects the validator implementation for a kind;
// fill and email validators need the roster lookup, playbook does not.
func pickValidator(kind string, lookup validator.WorkerLookup) (validator.Validator, error) {
	var v validator.Validator
	switch kind {
	case "fill":
		v = validator.NewFillValidator(lookup)
	case "email":
		v = validator.NewEmailValidator(lookup)
	case "playbook":
		v = validator.PlaybookValidator{}
	default:
		return nil, fmt.Errorf("unknown kind %q", kind)
	}
	return v, nil
}
// mergeContext folds `context` into `artifact._context` so validators
// pull contract metadata uniformly. Neither input map is mutated; a
// nil ctx returns artifact untouched. On key collision the caller's
// own artifact._context entry wins (the caller knows their contract).
func mergeContext(artifact, ctx map[string]any) map[string]any {
	if ctx == nil {
		return artifact
	}
	// Shallow-copy the artifact so the caller's map stays pristine.
	clone := make(map[string]any, len(artifact)+1)
	for key, val := range artifact {
		clone[key] = val
	}
	callerCtx, _ := clone["_context"].(map[string]any)
	combined := make(map[string]any, len(ctx)+len(callerCtx))
	for key, val := range ctx {
		combined[key] = val
	}
	for key, val := range callerCtx {
		combined[key] = val // caller-supplied _context wins
	}
	clone["_context"] = combined
	return clone
}
// handleIterate serves POST /iterate — the generate→validate→correct
// loop. It decodes the request, resolves the parent trace id, runs
// validator.Iterate with a chatd-backed chat caller, appends one
// session record, and answers 200 (success response), 422 (Iterate
// returned a failure value), or 502 (Iterate returned an error).
func (h *handlers) handleIterate(w http.ResponseWriter, r *http.Request) {
	r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes)
	defer r.Body.Close()
	var req validator.IterateRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
		return
	}
	// All four fields are mandatory for the loop to run.
	if req.Kind == "" || req.Prompt == "" || req.Provider == "" || req.Model == "" {
		http.Error(w, "kind, prompt, provider, and model are required", http.StatusBadRequest)
		return
	}
	// Resolve the parent trace id, in priority order:
	//   1. req.TraceID (caller put it in the JSON body)
	//   2. X-Lakehouse-Trace-Id header (read DIRECTLY here so the path
	//      works even when Langfuse is off and the middleware is a
	//      passthrough — matches Rust's iterate handler shape)
	//   3. context value set by the langfuse middleware (when Langfuse
	//      is configured)
	//   4. fallback to a locally-minted time-ordered hex id
	//
	// Surfaced 2026-05-02 cross-runtime test: Go was minting fallback
	// even when caller forwarded the header, because middleware
	// passthrough mode never read it. Reading the header here closes
	// the gap and makes Go behavior match Rust regardless of
	// Langfuse configuration.
	if req.TraceID == "" {
		req.TraceID = r.Header.Get(shared.TraceIDHeader)
	}
	if req.TraceID == "" {
		req.TraceID = shared.TraceIDFromCtx(r.Context())
	}
	if req.TraceID == "" {
		req.TraceID = newFallbackTraceID()
	}
	chat := h.chatCaller()
	// validate adapts runValidator to the loop's callback shape: an
	// unknown kind is coerced into a schema-kind ValidationError so
	// the loop handles it like any other rejection.
	validate := func(kind string, artifact map[string]any) (validator.Report, error) {
		report, vErr, kindErr := h.runValidator(kind, artifact, req.Context)
		if kindErr != nil {
			return validator.Report{}, &validator.ValidationError{
				Kind: validator.ErrSchema,
				Reason: kindErr.Error(),
			}
		}
		if vErr != nil {
			return validator.Report{}, vErr
		}
		return *report, nil
	}
	// Wire per-attempt span emission when Langfuse is configured —
	// each chat→validate cycle becomes a child span of the iterate
	// trace, with prompt/raw/verdict captured for inspection.
	cfg := h.iterCfg
	if h.lf != nil {
		cfg.Tracer = h.iterTracer(r.Context())
	}
	t0 := time.Now()
	resp, fail, err := validator.Iterate(r.Context(), req, cfg, chat, validate)
	durationMs := time.Since(t0).Milliseconds()
	// Emit one session row regardless of success/failure/infra-error,
	// BEFORE writing any response, so even 502s leave a row behind.
	// Best-effort: a failing session log never blocks the response.
	// NOTE(review): h.sessionLog may be nil when unconfigured —
	// presumably SessionLogger.Append is nil-receiver-safe; confirm
	// in the validator package.
	rosterCheck := h.rosterCheckFor(req.Kind)
	rec := validator.BuildSessionRecord(req, resp, fail, err, rosterCheck, durationMs)
	h.sessionLog.Append(rec)
	if err != nil {
		// Infra error from the loop (e.g. the chat hop failed) — 502.
		http.Error(w, err.Error(), http.StatusBadGateway)
		return
	}
	if fail != nil {
		writeJSON(w, http.StatusUnprocessableEntity, fail)
		return
	}
	writeJSON(w, http.StatusOK, resp)
}
// rosterCheckFor returns a closure that verifies an accepted artifact's
// candidate IDs all exist in the roster. Used by BuildSessionRecord to
// stamp `grounded_in_roster` on each session row — answers the "did
// the model produce real workers, not phantoms?" question at offline-
// query time.
//
// Only the `fill` kind has worker IDs to verify. Other kinds return
// nil (omits the field from the session record).
//
// Note: FillValidator already enforces this constraint on every
// successful iteration (phantom IDs raise ValidationError::Consistency).
// Stamping the bool here surfaces the verified-grounded property as
// queryable data, not relying on validator semantics being recalled
// during analysis. A future scrum may want to query "show me sessions
// where the model guessed real worker IDs without ever needing a
// retry" — that's a `iterations=1 AND grounded_in_roster=true` query.
func (h *handlers) rosterCheckFor(kind string) func(map[string]any) *bool {
	if kind != "fill" {
		return nil
	}
	return func(artifact map[string]any) *bool {
		// verdict hands back a pointer to a fresh copy of its argument.
		verdict := func(ok bool) *bool { return &ok }
		fills, ok := artifact["fills"].([]any)
		if !ok {
			return verdict(false)
		}
		for _, entry := range fills {
			fill, isMap := entry.(map[string]any)
			if !isMap {
				return verdict(false)
			}
			id, _ := fill["candidate_id"].(string)
			if id == "" {
				return verdict(false)
			}
			if _, found := h.lookup.Find(id); !found {
				return verdict(false)
			}
		}
		return verdict(true)
	}
}
// chatCaller wires the iteration loop to chatd via HTTP. Builds the
// chat.Request shape, posts to ${chatdURL}/chat, returns the content
// string (no choices wrapper — chatd's response is already flat).
//
// When traceID is non-empty, forwards it as the `X-Lakehouse-Trace-Id`
// header so chatd's middleware attaches its trace to the same
// Langfuse parent. Closes the multi-call observability gap: a
// /v1/iterate session shows in Langfuse as one trace with N child
// chat spans, instead of N+1 disconnected traces.
func (h *handlers) chatCaller() validator.ChatCaller {
	return func(ctx context.Context, system, user, _, model string, temp *float64, maxTokens int, traceID string) (string, error) {
		// OpenAI-style message list: optional system turn, then user.
		messages := make([]map[string]string, 0, 2)
		if system != "" {
			messages = append(messages, map[string]string{"role": "system", "content": system})
		}
		messages = append(messages, map[string]string{"role": "user", "content": user})
		body := map[string]any{
			"model":      model,
			"messages":   messages,
			"max_tokens": maxTokens,
		}
		if temp != nil {
			body["temperature"] = *temp
		}
		buf, err := json.Marshal(body)
		if err != nil {
			return "", fmt.Errorf("marshal chat req: %w", err)
		}
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.chatdURL+"/chat", bytes.NewReader(buf))
		if err != nil {
			return "", fmt.Errorf("build chat req: %w", err)
		}
		req.Header.Set("Content-Type", "application/json")
		if traceID != "" {
			req.Header.Set(shared.TraceIDHeader, traceID)
		}
		resp, err := h.chatClient.Do(req)
		if err != nil {
			return "", fmt.Errorf("chat hop: %w", err)
		}
		defer resp.Body.Close()
		raw, readErr := io.ReadAll(resp.Body)
		if resp.StatusCode >= 400 {
			// Error statuses report whatever body we managed to read;
			// a read failure here only shortens the diagnostic.
			return "", fmt.Errorf("chat %d: %s", resp.StatusCode, trim(string(raw), 300))
		}
		// Previously the read error was discarded, so a connection
		// dropped mid-body surfaced as a confusing JSON parse error.
		// Report it as what it actually is.
		if readErr != nil {
			return "", fmt.Errorf("read chat resp: %w", readErr)
		}
		var parsed struct {
			Content string `json:"content"`
		}
		if err := json.Unmarshal(raw, &parsed); err != nil {
			return "", fmt.Errorf("parse chat resp: %w", err)
		}
		return parsed.Content, nil
	}
}
// iterTracer adapts validator.Tracer → langfuse.Client.Span. Each
// iteration attempt becomes one Langfuse span on the parent trace
// with input={prompt, model, provider}, output={raw, verdict}.
// Operators reading Langfuse can scroll the iterate session and see
// the full retry chain — which prompt produced which raw, which
// validator verdict landed, which retry the model recovered on.
//
// Returns the span id so it lands in IterateAttempt.SpanID — the
// /v1/iterate response carries that id back to the caller for log
// correlation.
//
// Errors are silent (best-effort posture per package langfuse). A
// dropped span never blocks the iterate loop.
func (h *handlers) iterTracer(ctx context.Context) validator.Tracer {
	return func(attempt validator.AttemptSpan) string {
		// Non-accepted verdicts get WARNING so retries stand out when
		// scanning the trace.
		severity := "DEFAULT"
		if attempt.Verdict.Kind != "accepted" {
			severity = "WARNING"
		}
		span := langfuse.SpanInput{
			TraceID: attempt.TraceID,
			Name:    fmt.Sprintf("iterate.attempt[%d]", attempt.Iteration),
			Input: map[string]any{
				"iteration": attempt.Iteration,
				"model":     attempt.Model,
				"provider":  attempt.Provider,
				"prompt":    trim(attempt.Prompt, 4000),
			},
			Output: map[string]any{
				"verdict": attempt.Verdict.Kind,
				"error":   attempt.Verdict.Error,
				"raw":     trim(attempt.Raw, 4000),
			},
			Metadata: map[string]any{
				"verdict_error": attempt.Verdict.Error,
			},
			StartTime: attempt.StartTime,
			EndTime:   attempt.EndTime,
			Level:     severity,
		}
		return h.lf.Span(ctx, span)
	}
}
// newFallbackTraceID generates a time-ordered hex id used when no
// upstream trace id was forwarded AND the langfuse middleware didn't
// mint one (Langfuse unconfigured). Same shape Langfuse accepts so
// future Langfuse-enabled deployments don't break correlation.
//
// The nanosecond timestamp keeps ids time-ordered; the suffix
// disambiguates ids minted within the same nanosecond. The previous
// implementation derived the "random" suffix from a second
// time.Now() reading, so it wasn't random at all and two ids minted
// in the same nanosecond could collide — math/rand's auto-seeded
// source fixes that without pulling in a uuid dependency.
func newFallbackTraceID() string {
	ns := time.Now().UTC().UnixNano()
	suffix := rand.Uint32()
	return fmt.Sprintf("%016x-%08x", ns, suffix)
}
// writeJSON serializes body onto w as JSON with the given status.
// Encoding errors are logged and otherwise swallowed: once the
// status line has been written there is nothing better to send.
func writeJSON(w http.ResponseWriter, status int, body any) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	enc := json.NewEncoder(w)
	if encodeErr := enc.Encode(body); encodeErr != nil {
		slog.Error("encode", "err", encodeErr)
	}
}
// trim caps s at n bytes for log and span payloads. When the cut
// would land inside a multi-byte UTF-8 sequence, it backs up to the
// preceding rune boundary so the result is always valid UTF-8 (and
// never longer than n bytes). The previous byte-slice version could
// emit a dangling partial rune into error messages and Langfuse
// span fields. Returns s unchanged when it already fits.
func trim(s string, n int) string {
	if len(s) <= n {
		return s
	}
	// Step back past continuation bytes (0b10xxxxxx) to a rune start.
	for n > 0 && !utf8.RuneStart(s[n]) {
		n--
	}
	return s[:n]
}