package validator

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"
	"unicode/utf8"
)

// IterateRequest is the input to Iterate. Mirrors Rust's
// IterateRequest in `crates/gateway/src/v1/iterate.rs` so JSONL
// captured from one runtime parses on the other.
type IterateRequest struct {
	Kind          string         `json:"kind"`
	Prompt        string         `json:"prompt"`
	Provider      string         `json:"provider"`
	Model         string         `json:"model"`
	System        string         `json:"system,omitempty"`
	Context       map[string]any `json:"context,omitempty"`
	MaxIterations int            `json:"max_iterations,omitempty"`
	Temperature   *float64       `json:"temperature,omitempty"`
	MaxTokens     int            `json:"max_tokens,omitempty"`

	// TraceID lets external schedulers force a parent trace id (so the
	// iterate session lands as a child of an upstream Langfuse trace).
	// Empty = the caller didn't provide one; validatord may still
	// generate one from request context.
	TraceID string `json:"trace_id,omitempty"`
}

// IterateAttempt is one row in the history. Raw is capped at 2000
// bytes on the wire (cut on a UTF-8 rune boundary) to keep responses
// bounded. SpanID points at the Langfuse span emitted for this attempt
// (if a Tracer was wired in IterateConfig); empty when tracing was
// disabled. Operators reading `data/_kb/replay_runs.jsonl` or
// staffing-co-pilot session logs can jump from a row to its full trace
// tree via this id.
type IterateAttempt struct {
	// Iteration is the zero-based index of the attempt.
	Iteration int           `json:"iteration"`
	Raw       string        `json:"raw"`
	Status    AttemptStatus `json:"status"`
	SpanID    string        `json:"span_id,omitempty"`
}

// AttemptStatus is the per-attempt verdict. Tagged JSON so consumers
// can switch on `kind` without trying to parse the optional error.
type AttemptStatus struct {
	// Kind is one of "no_json", "validation_failed" or "accepted".
	Kind string `json:"kind"`

	// Error holds the validator's error text; Iterate sets it only for
	// "validation_failed" attempts.
	Error string `json:"error,omitempty"`
}

// IterateResponse is the success payload (200 + Report + accepted artifact).
// TraceID echoes back the parent trace id so callers can pivot to
// Langfuse and see the full session (chat hops + validator verdicts).
type IterateResponse struct {
	Artifact   map[string]any   `json:"artifact"`
	Validation Report           `json:"validation"`
	Iterations int              `json:"iterations"`
	History    []IterateAttempt `json:"history"`
	TraceID    string           `json:"trace_id,omitempty"`
}

// IterateFailure is the max-iter-exhausted payload (422 + history).
type IterateFailure struct {
	Error      string           `json:"error"`
	Iterations int              `json:"iterations"`
	History    []IterateAttempt `json:"history"`
	TraceID    string           `json:"trace_id,omitempty"`
}

// ChatCaller is the seam Iterate uses to invoke an LLM. Tests inject
// scripted callers; production wires this to the chatd /v1/chat HTTP
// endpoint. Implementations must return the model's textual content
// (no choices wrapper, no message envelope). traceID is the Langfuse
// parent trace id — HTTP transports forward it as the
// `X-Lakehouse-Trace-Id` header so chatd's middleware attaches its
// spans to the same trace.
type ChatCaller func(ctx context.Context, system, user, provider, model string, temperature *float64, maxTokens int, traceID string) (string, error)

// AttemptSpan describes one iteration's chat-call + validator-verdict.
// The Tracer (see IterateConfig) consumes these to emit Langfuse
// spans — one span per attempt, plus inputs/outputs so an operator
// scrolling Langfuse sees the full prompt → raw → verdict chain
// without having to join multiple JSONL files.
//
// Iterate fills every field except ParentID, which it leaves empty.
type AttemptSpan struct {
	TraceID   string
	ParentID  string // span id of the surrounding /v1/iterate http request, if any
	Iteration int
	Model     string
	Provider  string
	Prompt    string
	Raw       string
	Verdict   AttemptStatus
	StartTime time.Time
	EndTime   time.Time
}

// Tracer is the hook Iterate calls after each attempt. Returns the
// span id so it lands in IterateAttempt.SpanID. nil = tracing
// disabled (tests, or production with Langfuse unconfigured).
//
// Production wiring lives in cmd/validatord/main.go and routes to
// internal/langfuse.Client — kept as a plain function type here so the
// validator package doesn't depend on langfuse for unit tests.
type Tracer func(span AttemptSpan) string

// IterateConfig threads daemon-level settings into the orchestrator.
// A zero (or negative, for the integer fields) value means "unset" and
// makes Iterate fall back to the package defaults; a temperature of
// exactly 0 can therefore only be requested per call through
// IterateRequest.Temperature.
type IterateConfig struct {
	DefaultMaxIterations int
	DefaultMaxTokens     int
	DefaultTemperature   float64

	// Tracer (optional) emits one span per iteration attempt for
	// per-call visibility. nil = no tracing.
	Tracer Tracer
}

// Package-level fallbacks used when neither the request nor the
// IterateConfig supplies a value.
const (
	defaultMaxIterations = 3
	defaultMaxTokens     = 4096
	defaultTemperature   = 0.2
)

// Iterate runs the generate→validate→correct loop. Returns
// IterateResponse on success (with full history) or IterateFailure
// on max-iter exhaustion. Infrastructure errors (chat hop fails)
// surface as Go errors so the HTTP layer can return 502.
//
// Exactly one of the three results is non-nil. Request values take
// precedence over cfg, which takes precedence over the package
// defaults. chat is called once per attempt with the current prompt;
// validate receives req.Kind and the extracted artifact, and a nil
// error from it means the artifact is accepted. After a failed attempt
// the next prompt is req.Prompt plus a corrective suffix (corrections
// do not accumulate across attempts).
func Iterate(ctx context.Context, req IterateRequest, cfg IterateConfig, chat ChatCaller, validate func(string, map[string]any) (Report, error)) (*IterateResponse, *IterateFailure, error) {
	const (
		// maxRawBytes bounds IterateAttempt.Raw in the returned history.
		maxRawBytes = 2000
		// maxPrealloc bounds the up-front history allocation. maxIter
		// can come straight from the request, and sizing make() from an
		// unbounded caller value panics or exhausts memory before a
		// single attempt has run. append grows the slice if needed.
		maxPrealloc = 16
	)

	maxIter := req.MaxIterations
	if maxIter <= 0 {
		maxIter = cfg.DefaultMaxIterations
	}
	if maxIter <= 0 {
		maxIter = defaultMaxIterations
	}

	maxTokens := req.MaxTokens
	if maxTokens <= 0 {
		maxTokens = cfg.DefaultMaxTokens
	}
	if maxTokens <= 0 {
		maxTokens = defaultMaxTokens
	}

	temp := req.Temperature
	if temp == nil {
		t := cfg.DefaultTemperature
		if t == 0 {
			t = defaultTemperature
		}
		temp = &t
	}

	prealloc := maxIter
	if prealloc > maxPrealloc {
		prealloc = maxPrealloc
	}
	// Non-nil even when empty so the JSON field encodes as [] not null.
	history := make([]IterateAttempt, 0, prealloc)

	currentPrompt := req.Prompt
	traceID := req.TraceID

	// recordAttempt: append to history and emit a Langfuse span if a
	// Tracer is wired and a trace id is known. Centralized so every
	// code path (no_json, accepted, validation_failed) hits the same
	// observability path — no easy way to forget tracing on a branch.
	// It must run before currentPrompt is rewritten so the span carries
	// the prompt that produced this attempt.
	recordAttempt := func(iteration int, raw string, verdict AttemptStatus, started time.Time) {
		spanID := ""
		if cfg.Tracer != nil && traceID != "" {
			spanID = cfg.Tracer(AttemptSpan{
				TraceID:   traceID,
				Iteration: iteration,
				Model:     req.Model,
				Provider:  req.Provider,
				Prompt:    currentPrompt,
				Raw:       raw,
				Verdict:   verdict,
				StartTime: started,
				EndTime:   time.Now(),
			})
		}
		history = append(history, IterateAttempt{
			Iteration: iteration,
			Raw:       trim(raw, maxRawBytes),
			Status:    verdict,
			SpanID:    spanID,
		})
	}

	for i := 0; i < maxIter; i++ {
		started := time.Now()
		raw, err := chat(ctx, req.System, currentPrompt, req.Provider, req.Model, temp, maxTokens, traceID)
		if err != nil {
			return nil, nil, fmt.Errorf("/v1/chat hop failed at iter %d: %w", i, err)
		}

		artifact := ExtractJSON(raw)
		if artifact == nil {
			recordAttempt(i, raw, AttemptStatus{Kind: "no_json"}, started)
			currentPrompt = req.Prompt + "\n\nYour previous attempt did not contain a JSON object. Reply with ONLY a valid JSON object matching the requested artifact shape."
			continue
		}

		report, vErr := validate(req.Kind, artifact)
		if vErr == nil {
			recordAttempt(i, raw, AttemptStatus{Kind: "accepted"}, started)
			return &IterateResponse{
				Artifact:   artifact,
				Validation: report,
				Iterations: i + 1,
				History:    history,
				TraceID:    traceID,
			}, nil, nil
		}

		// Validation failed — append error to prompt for next iter.
		// The model sees concrete failure mode + retries with corrective
		// context. Same "validator IS the observer" shape as Phase 43.
		errSummary := vErr.Error()
		recordAttempt(i, raw, AttemptStatus{Kind: "validation_failed", Error: errSummary}, started)
		currentPrompt = req.Prompt + "\n\nPrior attempt failed validation:\n" + errSummary + "\n\nFix the specific issue above and respond with a corrected JSON object."
	}

	return nil, &IterateFailure{
		Error:      fmt.Sprintf("max iterations reached (%d) without passing validation", maxIter),
		Iterations: maxIter,
		History:    history,
		TraceID:    traceID,
	}, nil
}

// ExtractJSON pulls the first JSON object from a model's output.
// Handles fenced code blocks (```json ... ```), bare braces, and
// stray prose around the JSON. Returns nil on no extractable object.
//
// Fenced blocks are tried first, in order; the fallback scans for
// outermost balanced {...} spans and returns the first one that parses
// as a JSON object. The scan counts every brace byte, including braces
// inside JSON string values, so an object whose strings contain
// unbalanced braces is only found when it is fenced.
//
// Same algorithm shape as Rust's extract_json so a model producing
// output that one runtime accepts will be accepted by the other.
// NOTE(review): unmatched '}' in leading prose is now ignored instead
// of poisoning the depth counter; mirror this in the Rust
// implementation if strict parity is required.
func ExtractJSON(raw string) map[string]any {
	// Try fenced first.
	for _, candidate := range fencedCandidates(raw) {
		if obj, ok := parseObject(candidate); ok {
			return obj
		}
	}

	// Fall back to outermost {...} balance.
	depth, start := 0, 0
	for i := 0; i < len(raw); i++ {
		switch raw[i] {
		case '{':
			if depth == 0 {
				start = i
			}
			depth++
		case '}':
			if depth == 0 {
				// Closer with no opener (e.g. prose mentioning "}"):
				// skip it so a later balanced object can still reach
				// depth zero and be parsed.
				continue
			}
			depth--
			if depth == 0 {
				if obj, ok := parseObject(raw[start : i+1]); ok {
					return obj
				}
			}
		}
	}
	return nil
}

// fencedCandidates returns the whitespace-trimmed bodies of every ```
// fenced block in `raw`, in order of appearance. When a block contains
// a newline, everything up to and including the first newline is
// treated as the optional language tag (e.g. ```json) and dropped. An
// opening fence without a closing fence ends the scan.
//
// The closing fence is located before the tag line is stripped, so an
// inline block such as "```{...}``` trailing text\n" yields its body
// rather than skipping past its own closing fence.
// NOTE(review): confirm the Rust extract_json treats inline fences the
// same way.
func fencedCandidates(raw string) []string {
	const fence = "```"

	var out []string
	rest := raw
	for {
		_, opened, ok := strings.Cut(rest, fence)
		if !ok {
			break
		}
		body, tail, ok := strings.Cut(opened, fence)
		if !ok {
			break
		}
		// Skip optional language tag up to the first newline, looking
		// only inside this block.
		if nl := strings.IndexByte(body, '\n'); nl >= 0 {
			body = body[nl+1:]
		}
		out = append(out, strings.TrimSpace(body))
		rest = tail
	}
	return out
}

// parseObject decodes s as JSON and reports whether the top-level
// value is an object. Arrays, scalars, null and malformed input all
// yield (nil, false).
func parseObject(s string) (map[string]any, bool) {
	var v any
	if err := json.Unmarshal([]byte(s), &v); err != nil {
		return nil, false
	}
	obj, ok := v.(map[string]any)
	return obj, ok
}

// trim caps s at n bytes. When the cut would land inside a multi-byte
// UTF-8 sequence it backs up to the start of that rune, so the result
// may be up to three bytes shorter than n but never ends in a partial
// rune (which encoding/json would otherwise replace with U+FFFD).
// n <= 0 yields the empty string.
func trim(s string, n int) string {
	if n <= 0 {
		return ""
	}
	if len(s) <= n {
		return s
	}
	cut := n
	// A valid rune has at most UTFMax-1 continuation bytes, so never
	// back up further than that; this bounds the loop on invalid input.
	for cut > 0 && n-cut < utf8.UTFMax-1 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	if !utf8.RuneStart(s[cut]) {
		// Input is not valid UTF-8 here; fall back to the plain byte cut.
		cut = n
	}
	return s[:cut]
}