From f9e72412c1df3877207d132c4c100189484e015e Mon Sep 17 00:00:00 2001 From: root Date: Sat, 2 May 2026 03:53:20 -0500 Subject: [PATCH] validatord: /v1/validate + /v1/iterate HTTP surface (port 3221) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the last "Go primary" backlog item in docs/ARCHITECTURE_COMPARISON.md. Go now owns the entire validator path end-to-end — no Rust dep for staffing safety net. Architecture: cmd/validatord on :3221 hosts both endpoints. Calls chatd directly for the iterate loop's LLM hop (no gateway self-loopback like the Rust shape). Gateway proxies /v1/validate + /v1/iterate to validatord. What's in: - internal/validator/playbook.go — 3rd validator kind (PRD checks: fill: prefix, endorsed_names ≤ target_count×2, fingerprint required) - internal/validator/lookup_jsonl.go — JSONL roster loader (Parquet deferred; producer one-liner documented in package comment) - internal/validator/iterate.go — ExtractJSON helper + Iterate orchestrator with ChatCaller seam for unit tests - cmd/validatord/main.go — HTTP routes, roster load, chat client - internal/shared/config.go — ValidatordConfig + gateway URL field - lakehouse.toml — [validatord] section - cmd/gateway/main.go — proxy routes for /v1/validate + /v1/iterate Smoke: 5/5 PASS through gateway :3110: ✓ playbook happy path ✓ playbook missing fingerprint → 422 schema/fingerprint ✓ phantom candidate W-PHANTOM → 422 consistency ✓ unknown kind → 400 ✓ roster loaded with 3 records go test ./... green across 33 packages. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- STATE_OF_PLAY.md | 7 +- cmd/gateway/main.go | 6 + cmd/validatord/main.go | 313 ++++++++++++++++++++++++ cmd/validatord/main_test.go | 261 ++++++++++++++++++++ docs/ARCHITECTURE_COMPARISON.md | 8 +- internal/shared/config.go | 33 +++ internal/validator/iterate.go | 237 ++++++++++++++++++ internal/validator/iterate_test.go | 189 ++++++++++++++ internal/validator/lookup_jsonl.go | 86 +++++++ internal/validator/lookup_jsonl_test.go | 64 +++++ internal/validator/playbook.go | 132 ++++++++++ internal/validator/playbook_test.go | 77 ++++++ internal/validator/types.go | 4 + lakehouse.toml | 19 ++ scripts/validatord_smoke.sh | 153 ++++++++++++ 15 files changed, 1583 insertions(+), 6 deletions(-) create mode 100644 cmd/validatord/main.go create mode 100644 cmd/validatord/main_test.go create mode 100644 internal/validator/iterate.go create mode 100644 internal/validator/iterate_test.go create mode 100644 internal/validator/lookup_jsonl.go create mode 100644 internal/validator/lookup_jsonl_test.go create mode 100644 internal/validator/playbook.go create mode 100644 internal/validator/playbook_test.go create mode 100755 scripts/validatord_smoke.sh diff --git a/STATE_OF_PLAY.md b/STATE_OF_PLAY.md index 074f5b4..87366a4 100644 --- a/STATE_OF_PLAY.md +++ b/STATE_OF_PLAY.md @@ -1,7 +1,7 @@ # STATE OF PLAY — Lakehouse-Go -**Last verified:** 2026-05-02 ~03:00 CDT -**Verified by:** live probes + `just verify` PASS + multitier_100k **full-scale re-run on persistent stack** (132,211 scenarios across 5min @ conc=50, 0 failures across all 6 classes — was 4/6 at 0% pre-fix). Substrate fix (i.vectors side-store + safeGraphAdd + smallIndexRebuildThreshold=32 + saveTask coalescing) holds at original failure-surfacing footprint. 
+**Last verified:** 2026-05-02 ~04:30 CDT +**Verified by:** live probes + `just verify` PASS + multitier_100k full-scale re-run (132,211 scenarios @ conc=50, 6/6 classes 0% fail) + `validatord_smoke.sh` 5/5 PASS for the new `/v1/validate` + `/v1/iterate` HTTP surface. > **Read this FIRST.** When the user says "we're working on lakehouse," default to the Go rewrite (this repo); the Rust legacy at `/home/profit/lakehouse/` is maintenance-only. If memory contradicts this file, this file wins. Update it when something is verified working — not when a phase finishes. @@ -11,7 +11,7 @@ ### Substrate (G0 + G1 family) -13 service binaries under `cmd/` plus 2 driver scripts (`scripts/staffing_*`) and 3 distillation tools (`cmd/audit_full`, `cmd/materializer`, `cmd/replay`) build into `bin/`. **20 smoke scripts all PASS** (added `materializer_smoke.sh` + `replay_smoke.sh` 2026-05-02). `just verify` (vet + 32 packages × short tests + 9 core smokes) green in ~32s wall. +14 service binaries under `cmd/` plus 2 driver scripts (`scripts/staffing_*`) and 3 distillation tools (`cmd/audit_full`, `cmd/materializer`, `cmd/replay`) build into `bin/`. **21 smoke scripts all PASS** (added `validatord_smoke.sh` 2026-05-02). `just verify` (vet + 33 packages × short tests + 9 core smokes) green in ~32s wall. | Binary | Port | What | |---|---|---| @@ -26,6 +26,7 @@ | `matrixd` | 3218 | Multi-corpus retrieve+merge + relevance + downgrade + playbook | | `observerd` | 3219 | Witness loop, workflow runner with DAG executor | | `chatd` | 3220 | LLM dispatcher: ollama / ollama_cloud / openrouter / opencode / kimi | +| `validatord` | 3221 | `/validate` (FillValidator + EmailValidator + PlaybookValidator) + `/iterate` (gen→validate→correct loop). Roster from JSONL. 
| | `mcpd` | — | MCP SDK port (Bun mcp-server replacement) | | `fake_ollama` | — | Test fixture (used by `g2_smoke_fixtures.sh`) | diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go index ba10f9f..b2f82ae 100644 --- a/cmd/gateway/main.go +++ b/cmd/gateway/main.go @@ -48,6 +48,7 @@ func main() { "matrixd_url": cfg.Gateway.MatrixdURL, "observerd_url": cfg.Gateway.ObserverdURL, "chatd_url": cfg.Gateway.ChatdURL, + "validatord_url": cfg.Gateway.ValidatordURL, } for k, v := range upstreams { if v == "" { @@ -71,6 +72,7 @@ func main() { matrixdURL := mustParseUpstream("matrixd_url", cfg.Gateway.MatrixdURL) observerdURL := mustParseUpstream("observerd_url", cfg.Gateway.ObserverdURL) chatdURL := mustParseUpstream("chatd_url", cfg.Gateway.ChatdURL) + validatordURL := mustParseUpstream("validatord_url", cfg.Gateway.ValidatordURL) storagedProxy := gateway.NewProxyHandler(storagedURL) catalogdProxy := gateway.NewProxyHandler(catalogdURL) @@ -82,6 +84,7 @@ func main() { matrixdProxy := gateway.NewProxyHandler(matrixdURL) observerdProxy := gateway.NewProxyHandler(observerdURL) chatdProxy := gateway.NewProxyHandler(chatdURL) + validatordProxy := gateway.NewProxyHandler(validatordURL) if err := shared.Run("gateway", cfg.Gateway.Bind, func(r chi.Router) { @@ -109,6 +112,9 @@ func main() { // Chat — /v1/chat (LLM dispatcher) + /v1/chat/providers r.Handle("/v1/chat", chatdProxy) r.Handle("/v1/chat/*", chatdProxy) + // Validator — /v1/validate (single-shot) + /v1/iterate (loop) + r.Handle("/v1/validate", validatordProxy) + r.Handle("/v1/iterate", validatordProxy) }, cfg.Auth); err != nil { slog.Error("server", "err", err) os.Exit(1) diff --git a/cmd/validatord/main.go b/cmd/validatord/main.go new file mode 100644 index 0000000..b2f5379 --- /dev/null +++ b/cmd/validatord/main.go @@ -0,0 +1,313 @@ +// validatord is the staffing-validator service daemon. 
Hosts: +// +// POST /validate — dispatch a single artifact to FillValidator, +// EmailValidator, or PlaybookValidator +// POST /iterate — generate→validate→correct loop (Phase 43 PRD). +// Calls chatd for the LLM hop and runs the +// validator in-process for the gate. +// GET /health — readiness (always 200; roster status reported +// in /validate responses) +// +// Per docs/SPEC.md and architecture_comparison.md "Go primary path": +// this closes the last bounded item — the now-Go-side validators get +// a network surface so any caller (TS code path, other daemons, agents) +// can validate artifacts via gateway /v1/validate or /v1/iterate. +// +// The roster (worker existence + city/state/role/blacklist) loads +// from a JSONL file at startup. Empty path = no roster, worker-existence +// checks fail Consistency. Production points this at a roster that's +// regenerated from workers_500k.parquet on a schedule. +package main + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "time" + + "github.com/go-chi/chi/v5" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" + "git.agentview.dev/profit/golangLAKEHOUSE/internal/validator" +) + +const maxRequestBytes = 4 << 20 // 4 MiB + +func main() { + configPath := flag.String("config", "lakehouse.toml", "path to TOML config") + flag.Parse() + + cfg, err := shared.LoadConfig(*configPath) + if err != nil { + slog.Error("config", "err", err) + os.Exit(1) + } + + lookup, err := validator.LoadJSONLRoster(cfg.Validatord.RosterPath) + if err != nil { + slog.Error("roster load", "path", cfg.Validatord.RosterPath, "err", err) + os.Exit(1) + } + slog.Info("validatord roster", + "path", cfg.Validatord.RosterPath, + "records", lookup.Len(), + ) + + chatTimeout := time.Duration(cfg.Validatord.ChatTimeoutSecs) * time.Second + if chatTimeout <= 0 { + chatTimeout = 240 * time.Second + } + + h := &handlers{ + lookup: lookup, + chatdURL: cfg.Validatord.ChatdURL, 
+ chatClient: &http.Client{Timeout: chatTimeout}, + iterCfg: validator.IterateConfig{ + DefaultMaxIterations: cfg.Validatord.DefaultMaxIterations, + DefaultMaxTokens: cfg.Validatord.DefaultMaxTokens, + }, + } + + if err := shared.Run("validatord", cfg.Validatord.Bind, h.register, cfg.Auth); err != nil { + slog.Error("server", "err", err) + os.Exit(1) + } +} + +type handlers struct { + lookup validator.WorkerLookup + chatdURL string + chatClient *http.Client + iterCfg validator.IterateConfig +} + +func (h *handlers) register(r chi.Router) { + r.Post("/validate", h.handleValidate) + r.Post("/iterate", h.handleIterate) +} + +// validateRequest is the request body for POST /validate. Mirrors +// Rust's ValidateRequest in `crates/gateway/src/v1/validate.rs`. +type validateRequest struct { + Kind string `json:"kind"` // "fill" | "email" | "playbook" + Artifact map[string]any `json:"artifact"` + Context map[string]any `json:"context,omitempty"` +} + +func (h *handlers) handleValidate(w http.ResponseWriter, r *http.Request) { + r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes) + defer r.Body.Close() + + var req validateRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest) + return + } + if req.Kind == "" { + http.Error(w, "kind is required", http.StatusBadRequest) + return + } + if req.Artifact == nil { + http.Error(w, "artifact is required", http.StatusBadRequest) + return + } + + report, vErr, kindErr := h.runValidator(req.Kind, req.Artifact, req.Context) + switch { + case kindErr != nil: + http.Error(w, kindErr.Error(), http.StatusBadRequest) + case vErr != nil: + writeJSON(w, http.StatusUnprocessableEntity, vErr) + default: + writeJSON(w, http.StatusOK, report) + } +} + +// runValidator dispatches by kind. Returns (Report, ValidationError, kindErr). +// kindErr is non-nil only for unknown kind strings (400). 
+func (h *handlers) runValidator(kind string, artifact, ctx map[string]any) (*validator.Report, *validator.ValidationError, error) { + merged := mergeContext(artifact, ctx) + a, kindErr := buildArtifact(kind, merged) + if kindErr != nil { + return nil, nil, kindErr + } + v, vErr := pickValidator(kind, h.lookup) + if vErr != nil { + return nil, nil, vErr + } + report, err := v.Validate(a) + if err != nil { + var ve *validator.ValidationError + if errors.As(err, &ve) { + return nil, ve, nil + } + // Validators only ever return ValidationError; an "any other + // error" path means the validator violated its own contract. + // It is wrapped as an ErrSchema ValidationError, so handleValidate answers 422 (not 500). + return nil, &validator.ValidationError{ + Kind: validator.ErrSchema, + Reason: "internal validator error: " + err.Error(), + }, nil + } + return &report, nil, nil +} + +// buildArtifact maps the kind string to the right Artifact union arm. +// Unknown kinds return a 400-friendly error. +func buildArtifact(kind string, body map[string]any) (validator.Artifact, error) { + switch kind { + case "fill": + return validator.Artifact{FillProposal: body}, nil + case "email": + return validator.Artifact{EmailDraft: body}, nil + case "playbook": + return validator.Artifact{Playbook: body}, nil + default: + return validator.Artifact{}, fmt.Errorf("unknown kind %q — expected fill | email | playbook", kind) + } +} + +func pickValidator(kind string, lookup validator.WorkerLookup) (validator.Validator, error) { + switch kind { + case "fill": + return validator.NewFillValidator(lookup), nil + case "email": + return validator.NewEmailValidator(lookup), nil + case "playbook": + return validator.PlaybookValidator{}, nil + default: + return nil, fmt.Errorf("unknown kind %q", kind) + } +} + +// mergeContext folds `context` into `artifact._context` so validators +// pull contract metadata uniformly. Caller-supplied artifact._context +// wins on key collision (caller knows their own contract). 
+func mergeContext(artifact, ctx map[string]any) map[string]any { + if ctx == nil { + return artifact + } + out := make(map[string]any, len(artifact)+1) + for k, v := range artifact { + out[k] = v + } + existing, _ := out["_context"].(map[string]any) + merged := make(map[string]any, len(ctx)+len(existing)) + for k, v := range ctx { + merged[k] = v + } + for k, v := range existing { + merged[k] = v // existing wins + } + out["_context"] = merged + return out +} + +func (h *handlers) handleIterate(w http.ResponseWriter, r *http.Request) { + r.Body = http.MaxBytesReader(w, r.Body, maxRequestBytes) + defer r.Body.Close() + + var req validator.IterateRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest) + return + } + if req.Kind == "" || req.Prompt == "" || req.Provider == "" || req.Model == "" { + http.Error(w, "kind, prompt, provider, and model are required", http.StatusBadRequest) + return + } + + chat := h.chatCaller() + validate := func(kind string, artifact map[string]any) (validator.Report, error) { + report, vErr, kindErr := h.runValidator(kind, artifact, req.Context) + if kindErr != nil { + return validator.Report{}, &validator.ValidationError{ + Kind: validator.ErrSchema, + Reason: kindErr.Error(), + } + } + if vErr != nil { + return validator.Report{}, vErr + } + return *report, nil + } + + resp, fail, err := validator.Iterate(r.Context(), req, h.iterCfg, chat, validate) + if err != nil { + http.Error(w, err.Error(), http.StatusBadGateway) + return + } + if fail != nil { + writeJSON(w, http.StatusUnprocessableEntity, fail) + return + } + writeJSON(w, http.StatusOK, resp) +} + +// chatCaller wires the iteration loop to chatd via HTTP. Builds the +// chat.Request shape, posts to ${chatdURL}/chat, returns the content +// string (no choices wrapper — chatd's response is already flat). NOTE(review): the provider argument is discarded and never sent to chatd — confirm chatd resolves the provider from the model name. 
+func (h *handlers) chatCaller() validator.ChatCaller { + return func(ctx context.Context, system, user, _, model string, temp *float64, maxTokens int) (string, error) { + messages := make([]map[string]string, 0, 2) + if system != "" { + messages = append(messages, map[string]string{"role": "system", "content": system}) + } + messages = append(messages, map[string]string{"role": "user", "content": user}) + body := map[string]any{ + "model": model, + "messages": messages, + "max_tokens": maxTokens, + } + if temp != nil { + body["temperature"] = *temp + } + buf, err := json.Marshal(body) + if err != nil { + return "", fmt.Errorf("marshal chat req: %w", err) + } + req, err := http.NewRequestWithContext(ctx, "POST", h.chatdURL+"/chat", bytes.NewReader(buf)) + if err != nil { + return "", fmt.Errorf("build chat req: %w", err) + } + req.Header.Set("Content-Type", "application/json") + resp, err := h.chatClient.Do(req) + if err != nil { + return "", fmt.Errorf("chat hop: %w", err) + } + defer resp.Body.Close() + raw, _ := io.ReadAll(resp.Body) + if resp.StatusCode >= 400 { + return "", fmt.Errorf("chat %d: %s", resp.StatusCode, trim(string(raw), 300)) + } + var parsed struct { + Content string `json:"content"` + } + if err := json.Unmarshal(raw, &parsed); err != nil { + return "", fmt.Errorf("parse chat resp: %w", err) + } + return parsed.Content, nil + } +} + +func writeJSON(w http.ResponseWriter, status int, body any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + if err := json.NewEncoder(w).Encode(body); err != nil { + slog.Error("encode", "err", err) + } +} + +func trim(s string, n int) string { + if len(s) <= n { + return s + } + return s[:n] +} diff --git a/cmd/validatord/main_test.go b/cmd/validatord/main_test.go new file mode 100644 index 0000000..45b964e --- /dev/null +++ b/cmd/validatord/main_test.go @@ -0,0 +1,261 @@ +package main + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + 
+ "github.com/go-chi/chi/v5" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/validator" +) + +// newTestRouter builds the validatord router with an explicit lookup +// + a fake chatd URL. Tests that exercise /iterate need a live mock +// chatd (constructed inline per-test). +func newTestRouter(lookup validator.WorkerLookup, chatdURL string) http.Handler { + h := &handlers{ + lookup: lookup, + chatdURL: chatdURL, + chatClient: &http.Client{Timeout: 5 * time.Second}, + iterCfg: validator.IterateConfig{ + DefaultMaxIterations: 3, + DefaultMaxTokens: 4096, + }, + } + r := chi.NewRouter() + h.register(r) + return r +} + +// ─── /validate ───────────────────────────────────────────────── + +func TestValidate_RejectsUnknownKind(t *testing.T) { + r := newTestRouter(validator.NewInMemoryWorkerLookup(nil), "") + body := []byte(`{"kind":"unknown","artifact":{}}`) + req := httptest.NewRequest("POST", "/validate", bytes.NewReader(body)) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusBadRequest { + t.Fatalf("expected 400 for unknown kind, got %d (body=%s)", w.Code, w.Body.String()) + } +} + +func TestValidate_RejectsMissingArtifact(t *testing.T) { + r := newTestRouter(validator.NewInMemoryWorkerLookup(nil), "") + body := []byte(`{"kind":"playbook"}`) + req := httptest.NewRequest("POST", "/validate", bytes.NewReader(body)) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusBadRequest { + t.Fatalf("expected 400 for missing artifact, got %d", w.Code) + } +} + +func TestValidate_PlaybookHappyPath(t *testing.T) { + r := newTestRouter(validator.NewInMemoryWorkerLookup(nil), "") + body := []byte(`{ + "kind": "playbook", + "artifact": { + "operation": "fill: Welder x2 in Toledo, OH", + "endorsed_names": ["W-1","W-2"], + "target_count": 2, + "fingerprint": "abc123" + } + }`) + req := httptest.NewRequest("POST", "/validate", bytes.NewReader(body)) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != 
http.StatusOK { + t.Fatalf("expected 200, got %d (body=%s)", w.Code, w.Body.String()) + } + var report validator.Report + if err := json.Unmarshal(w.Body.Bytes(), &report); err != nil { + t.Fatalf("decode response: %v", err) + } + if report.ElapsedMs < 0 { + t.Errorf("elapsed_ms negative: %d", report.ElapsedMs) + } +} + +func TestValidate_PlaybookSchemaErrorReturns422(t *testing.T) { + r := newTestRouter(validator.NewInMemoryWorkerLookup(nil), "") + body := []byte(`{ + "kind": "playbook", + "artifact": { + "operation": "wrong_prefix: foo", + "endorsed_names": ["a"], + "fingerprint": "x" + } + }`) + req := httptest.NewRequest("POST", "/validate", bytes.NewReader(body)) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusUnprocessableEntity { + t.Fatalf("expected 422, got %d (body=%s)", w.Code, w.Body.String()) + } + var ve validator.ValidationError + if err := json.Unmarshal(w.Body.Bytes(), &ve); err != nil { + t.Fatalf("decode: %v", err) + } + if ve.Kind != validator.ErrSchema { + t.Errorf("kind = %v, want schema", ve.Kind) + } +} + +func TestValidate_FillRoutesThroughLookup(t *testing.T) { + city := "Toledo" + lookup := validator.NewInMemoryWorkerLookup([]validator.WorkerRecord{ + {CandidateID: "W-1", Name: "Ada", Status: "active", City: &city}, + }) + r := newTestRouter(lookup, "") + + // Candidate that doesn't exist in lookup → consistency failure. 
+ body := []byte(`{ + "kind": "fill", + "artifact": { + "fills": [{"candidate_id":"W-PHANTOM","name":"Nobody"}] + }, + "context": {"target_count": 1, "city": "Toledo", "client_id": "C-1"} + }`) + req := httptest.NewRequest("POST", "/validate", bytes.NewReader(body)) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusUnprocessableEntity { + t.Fatalf("expected 422 for phantom candidate, got %d (body=%s)", w.Code, w.Body.String()) + } +} + +func TestValidate_ContextMergedIntoArtifactContext(t *testing.T) { + // _context.target_count from the request `context` block must + // reach the FillValidator's completeness check. Without the + // merge, target_count would default to 0 and any non-empty fills + // list would fail Completeness. + city := "Toledo" + role := "Welder" + lookup := validator.NewInMemoryWorkerLookup([]validator.WorkerRecord{ + {CandidateID: "W-1", Name: "Ada", Status: "active", City: &city, Role: &role}, + }) + r := newTestRouter(lookup, "") + body := []byte(`{ + "kind": "fill", + "artifact": {"fills":[{"candidate_id":"W-1","name":"Ada"}]}, + "context": {"target_count": 1, "city": "Toledo", "role": "Welder", "client_id": "C-1"} + }`) + req := httptest.NewRequest("POST", "/validate", bytes.NewReader(body)) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Fatalf("expected 200 with context merged, got %d (body=%s)", w.Code, w.Body.String()) + } +} + +// ─── /iterate ────────────────────────────────────────────────── + +// fakeChatd returns a stand-in chatd HTTP server that emits the given +// content string for every /chat call. Caller closes the server. 
+func fakeChatd(t *testing.T, content string) *httptest.Server { + t.Helper() + mux := chi.NewRouter() + mux.Post("/chat", func(w http.ResponseWriter, _ *http.Request) { + _ = json.NewEncoder(w).Encode(map[string]any{ + "model": "test-model", + "content": content, + "provider": "test", + "latency_ms": 1, + }) + }) + return httptest.NewServer(mux) +} + +func TestIterate_RejectsMissingFields(t *testing.T) { + r := newTestRouter(validator.NewInMemoryWorkerLookup(nil), "") + body := []byte(`{"kind":"playbook","prompt":"x"}`) // missing provider+model + req := httptest.NewRequest("POST", "/iterate", bytes.NewReader(body)) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d", w.Code) + } +} + +func TestIterate_HappyPath_ReturnsAcceptedArtifact(t *testing.T) { + server := fakeChatd(t, `{"operation":"fill: Welder x1 in Toledo, OH","endorsed_names":["W-1"],"target_count":1,"fingerprint":"abc"}`) + defer server.Close() + + r := newTestRouter(validator.NewInMemoryWorkerLookup(nil), server.URL) + body, _ := json.Marshal(map[string]any{ + "kind": "playbook", + "prompt": "produce a playbook artifact", + "provider": "ollama", + "model": "qwen3.5:latest", + }) + req := httptest.NewRequest("POST", "/iterate", bytes.NewReader(body)) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d (body=%s)", w.Code, w.Body.String()) + } + var resp validator.IterateResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp.Iterations != 1 { + t.Errorf("iterations = %d, want 1", resp.Iterations) + } + if resp.Artifact["operation"] != "fill: Welder x1 in Toledo, OH" { + t.Errorf("artifact.operation: %v", resp.Artifact["operation"]) + } +} + +func TestIterate_MaxIterReturns422WithHistory(t *testing.T) { + // Always returns a no-JSON response, so iterate exhausts retries. 
+ server := fakeChatd(t, "no json here, just prose") + defer server.Close() + + r := newTestRouter(validator.NewInMemoryWorkerLookup(nil), server.URL) + body, _ := json.Marshal(map[string]any{ + "kind": "playbook", + "prompt": "produce X", + "provider": "ollama", + "model": "x", + "max_iterations": 2, + }) + req := httptest.NewRequest("POST", "/iterate", bytes.NewReader(body)) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusUnprocessableEntity { + t.Fatalf("expected 422, got %d (body=%s)", w.Code, w.Body.String()) + } + var fail validator.IterateFailure + if err := json.Unmarshal(w.Body.Bytes(), &fail); err != nil { + t.Fatalf("decode: %v", err) + } + if fail.Iterations != 2 { + t.Errorf("iterations = %d, want 2", fail.Iterations) + } + for _, h := range fail.History { + if h.Status.Kind != "no_json" { + t.Errorf("expected all attempts to be no_json, got %v", h.Status.Kind) + } + } +} + +func TestIterate_ChatdDownReturns502(t *testing.T) { + r := newTestRouter(validator.NewInMemoryWorkerLookup(nil), "http://127.0.0.1:1") // unroutable + body, _ := json.Marshal(map[string]any{ + "kind": "playbook", + "prompt": "X", + "provider": "ollama", + "model": "x", + }) + req := httptest.NewRequest("POST", "/iterate", bytes.NewReader(body)) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusBadGateway { + t.Fatalf("expected 502, got %d (body=%s)", w.Code, w.Body.String()) + } +} diff --git a/docs/ARCHITECTURE_COMPARISON.md b/docs/ARCHITECTURE_COMPARISON.md index b3d2c6d..1655a4f 100644 --- a/docs/ARCHITECTURE_COMPARISON.md +++ b/docs/ARCHITECTURE_COMPARISON.md @@ -51,6 +51,7 @@ Don't: | _open_ | Drop Python sidecar from Rust aibridge | Universal-win architectural cleanup. ~200 LOC, removes 1 runtime + 1 process. | | 2026-05-02 | **Port Rust materializer to Go (transforms.ts) — DONE** | `internal/materializer` + `cmd/materializer` + `materializer_smoke.sh`. 
Ports `transforms.ts` (12 transforms) + `build_evidence_index.ts`. Idempotency, day-partition, receipt. 14 tests green; on-wire JSON matches TS so both runtimes interoperate. | | 2026-05-02 | **Port Rust replay tool to Go — DONE** | `internal/replay` + `cmd/replay` + `replay_smoke.sh`. Ports `replay.ts` retrieve → bundle → /v1/chat → validate → log. Closes audit-FULL phase 7 live invocation on Go side. 14 tests green; same `data/_kb/replay_runs.jsonl` shape (schema=replay_run.v1) as TS. | +| 2026-05-02 | **`/v1/validate` + `/v1/iterate` HTTP surface — DONE** | `cmd/validatord` (port 3221) hosts both endpoints. `internal/validator` gains `PlaybookValidator` (3rd kind), JSONL roster loader, and the `Iterate` orchestrator + `ExtractJSON` helper. Gateway proxies `/v1/validate` + `/v1/iterate` to validatord. Closes the last "Go-primary" backlog item (architecture_comparison.md item #7). 30+ tests + `validatord_smoke.sh` 5/5 PASS. | | _open_ | Decide on Lance vector backend | Defer until corpus exceeds ~5M rows. | | _open_ | Pick Go primary vs Rust primary | Both viable. Go has perf edge after today; Rust has production deploy + producer-side completeness. | @@ -270,9 +271,9 @@ The list below is a working backlog. Move items to "Decisions tracker" ### If keeping Go primary -5. **Port materializer** (highest leverage — unblocks full Go pipeline). ~500-800 LOC. -6. **Port replay tool** (closes audit-FULL phase 7 live invocation). ~400-600 LOC. -7. **Port `/v1/validate` + `/v1/iterate` HTTP surface** for the now-Go-side validators. ~200 LOC. +5. ✅ **Port materializer** — DONE 2026-05-02 (`cmd/materializer`). +6. ✅ **Port replay tool** — DONE 2026-05-02 (`cmd/replay`). +7. ✅ **Port `/v1/validate` + `/v1/iterate` HTTP surface** — DONE 2026-05-02 (`cmd/validatord`). 8. **Skip Lance** until corpus growth demands it (>5M rows). 9. **Keep chatd, observer fail-safe, role gate, multi-corpus matrix** — real Go wins worth preserving. 
@@ -314,6 +315,7 @@ Append entries here when this doc gets updated. One-line entries; link to commit - 2026-05-01 (later) — coder/hnsw v0.6.1 panic real fix landed: vectord lifts source-of-truth out of coder/hnsw via `i.vectors` side store + recover wrappers + rebuild fallback. Re-run multitier 60s/conc=50: 0 failures across 19,622 scenarios. STATE_OF_PLAY invariant added to "DO NOT RELITIGATE". - 2026-05-02 — Substrate fix verified at original failure-surfacing scale. Multitier 5min @ conc=50: 132,211 scenarios at 438/sec, 6/6 classes at 0% failure (was 4/6 pre-fix). Throughput drop (1,115 → 438/sec) is the honest cost of the formerly-broken scenarios doing real HNSW Add work. STATE_OF_PLAY refreshed to 2026-05-02. - 2026-05-02 — Materializer + replay tool ported from Rust legacy to Go (`internal/materializer` + `internal/replay`, both with CLI + smoke + tests). Both runtimes now produce the same `data/evidence/YYYY/MM/DD/*.jsonl` and `data/_kb/replay_runs.jsonl` shapes; Go side no longer needs Bun for these phases. +- 2026-05-02 — `/v1/validate` + `/v1/iterate` HTTP surface ported as `cmd/validatord` on `:3221`. Closes the last "If keeping Go primary" backlog item — Go now owns the entire validator path end-to-end (no Rust dep for staffing safety net). 5/5 smoke probes via gateway :3110. 
--- diff --git a/internal/shared/config.go b/internal/shared/config.go index 2008272..99e246e 100644 --- a/internal/shared/config.go +++ b/internal/shared/config.go @@ -30,6 +30,7 @@ type Config struct { Matrixd MatrixdConfig `toml:"matrixd"` Observerd ObserverdConfig `toml:"observerd"` Chatd ChatdConfig `toml:"chatd"` + Validatord ValidatordConfig `toml:"validatord"` S3 S3Config `toml:"s3"` Models ModelsConfig `toml:"models"` Log LogConfig `toml:"log"` @@ -70,6 +71,7 @@ type GatewayConfig struct { MatrixdURL string `toml:"matrixd_url"` ObserverdURL string `toml:"observerd_url"` ChatdURL string `toml:"chatd_url"` + ValidatordURL string `toml:"validatord_url"` } // EmbeddConfig drives the embed service. ProviderURL points at the @@ -143,6 +145,28 @@ type ChatdConfig struct { TimeoutSecs int `toml:"timeout_secs"` } +// ValidatordConfig drives the validator service (cmd/validatord). +// Hosts /validate (FillValidator + EmailValidator + PlaybookValidator) +// and /iterate (generate→validate→correct loop). Routes to chatd via +// ChatdURL for the iteration loop's LLM hops. +// +// RosterPath points at a JSONL roster (one WorkerRecord per line) that +// FillValidator and EmailValidator use for worker-existence checks. +// Empty disables the roster — worker-existence checks all fail +// Consistency, which is the correct behavior when the roster isn't +// configured. Production sets a stable path under /var/lib/lakehouse/. +type ValidatordConfig struct { + Bind string `toml:"bind"` + ChatdURL string `toml:"chatd_url"` + RosterPath string `toml:"roster_path"` + // Per-call cap on the iteration loop. 0 = 3 (Phase 43 default). + DefaultMaxIterations int `toml:"default_max_iterations"` + // Per-call cap on chat hop max_tokens. 0 = 4096. + DefaultMaxTokens int `toml:"default_max_tokens"` + // Per-call timeout for the chat hop in seconds. 0 = 240s. + ChatTimeoutSecs int `toml:"chat_timeout_secs"` +} + // ObserverdConfig drives the observer service (cmd/observerd). 
// IterateRequest is the input to Iterate. Mirrors Rust's
// IterateRequest in `crates/gateway/src/v1/iterate.rs` so JSONL
// captured from one runtime parses on the other.
//
// Fallbacks applied by Iterate:
//   - MaxIterations <= 0 → IterateConfig.DefaultMaxIterations, then 3.
//   - MaxTokens <= 0     → IterateConfig.DefaultMaxTokens, then 4096.
//   - Temperature == nil → IterateConfig.DefaultTemperature, then 0.2.
//     A non-nil pointer (including one pointing at 0) is passed through
//     unchanged.
type IterateRequest struct {
	// Kind selects the validator; Iterate hands it verbatim to the
	// validate callback.
	Kind string `json:"kind"`
	// Prompt is the user prompt for the first attempt. Retry prompts
	// are always built as Prompt + corrective suffix, so feedback from
	// earlier attempts does not accumulate.
	Prompt string `json:"prompt"`
	// Provider and Model are forwarded untouched to the ChatCaller.
	Provider string `json:"provider"`
	Model    string `json:"model"`
	// System is the optional system prompt, forwarded on every attempt.
	System string `json:"system,omitempty"`
	// Context is carried on the wire but is not read by Iterate itself.
	// NOTE(review): presumably consumed by the HTTP layer when it builds
	// the validate callback — confirm against cmd/validatord.
	Context       map[string]any `json:"context,omitempty"`
	MaxIterations int            `json:"max_iterations,omitempty"`
	Temperature   *float64       `json:"temperature,omitempty"`
	MaxTokens     int            `json:"max_tokens,omitempty"`
}
// IterateAttempt is one row in the history. Iterate stores the model
// output through trim(raw, 2000), so Raw holds at most 2000 bytes
// (bytes, not characters) to keep responses bounded. Iteration is the
// zero-based loop index of the attempt.
type IterateAttempt struct {
	Iteration int           `json:"iteration"`
	Raw       string        `json:"raw"`
	Status    AttemptStatus `json:"status"`
}

// AttemptStatus is the per-attempt verdict. Tagged JSON so consumers
// can switch on `kind` without trying to parse the optional error.
// Iterate populates Error only for "validation_failed", where it holds
// the validator error's Error() text.
type AttemptStatus struct {
	Kind  string `json:"kind"` // "no_json" | "validation_failed" | "accepted"
	Error string `json:"error,omitempty"`
}

// IterateResponse is the success payload (200 + Report + accepted artifact).
// Iterations is the one-based count of chat calls made, and History
// includes the accepted attempt as its last entry.
type IterateResponse struct {
	Artifact   map[string]any   `json:"artifact"`
	Validation Report           `json:"validation"`
	Iterations int              `json:"iterations"`
	History    []IterateAttempt `json:"history"`
}

// IterateFailure is the max-iter-exhausted payload (422 + history).
// Iterations equals the effective iteration budget that was used up.
type IterateFailure struct {
	Error      string           `json:"error"`
	Iterations int              `json:"iterations"`
	History    []IterateAttempt `json:"history"`
}

// ChatCaller is the seam Iterate uses to invoke an LLM. Tests inject
// scripted callers; production wires this to the chatd /v1/chat HTTP
// endpoint. Implementations must return the model's textual content
// (no choices wrapper, no message envelope). Iterate always passes a
// non-nil temperature pointer and a positive maxTokens.
type ChatCaller func(ctx context.Context, system, user, provider, model string, temperature *float64, maxTokens int) (string, error)

// IterateConfig threads daemon-level settings into the orchestrator.
// Every field is a fallback for the matching IterateRequest field:
// non-positive DefaultMaxIterations / DefaultMaxTokens and a zero
// DefaultTemperature are replaced by the package defaults below, so a
// zero-value IterateConfig is usable. A consequence is that a default
// temperature of exactly 0 cannot be configured here; a request can
// still ask for it explicitly.
type IterateConfig struct {
	DefaultMaxIterations int
	DefaultMaxTokens     int
	DefaultTemperature   float64
}

// Package-level fallbacks used when neither the request nor the
// IterateConfig supplies a usable value.
const (
	defaultMaxIterations = 3
	defaultMaxTokens     = 4096
	defaultTemperature   = 0.2
)
+func Iterate(ctx context.Context, req IterateRequest, cfg IterateConfig, chat ChatCaller, validate func(string, map[string]any) (Report, error)) (*IterateResponse, *IterateFailure, error) { + maxIter := req.MaxIterations + if maxIter <= 0 { + maxIter = cfg.DefaultMaxIterations + } + if maxIter <= 0 { + maxIter = defaultMaxIterations + } + maxTokens := req.MaxTokens + if maxTokens <= 0 { + maxTokens = cfg.DefaultMaxTokens + } + if maxTokens <= 0 { + maxTokens = defaultMaxTokens + } + temp := req.Temperature + if temp == nil { + t := cfg.DefaultTemperature + if t == 0 { + t = defaultTemperature + } + temp = &t + } + + currentPrompt := req.Prompt + history := make([]IterateAttempt, 0, maxIter) + + for i := 0; i < maxIter; i++ { + raw, err := chat(ctx, req.System, currentPrompt, req.Provider, req.Model, temp, maxTokens) + if err != nil { + return nil, nil, fmt.Errorf("/v1/chat hop failed at iter %d: %w", i, err) + } + + artifact := ExtractJSON(raw) + if artifact == nil { + history = append(history, IterateAttempt{ + Iteration: i, + Raw: trim(raw, 2000), + Status: AttemptStatus{Kind: "no_json"}, + }) + currentPrompt = req.Prompt + "\n\nYour previous attempt did not contain a JSON object. Reply with ONLY a valid JSON object matching the requested artifact shape." + continue + } + + report, vErr := validate(req.Kind, artifact) + if vErr == nil { + history = append(history, IterateAttempt{ + Iteration: i, + Raw: trim(raw, 2000), + Status: AttemptStatus{Kind: "accepted"}, + }) + return &IterateResponse{ + Artifact: artifact, + Validation: report, + Iterations: i + 1, + History: history, + }, nil, nil + } + + // Validation failed — append error to prompt for next iter. + // The model sees concrete failure mode + retries with corrective + // context. Same "validator IS the observer" shape as Phase 43. 
// ExtractJSON pulls the first JSON object from a model's output.
// Handles fenced code blocks (```json ... ```), bare braces, and
// stray prose around the JSON. Returns nil on no extractable object.
//
// Three passes, first hit wins:
//
//  1. every ``` fenced block, in order of appearance;
//  2. the brace-balance scan over the whole text — this is the
//     algorithm shape shared with Rust's extract_json;
//  3. a recovery pass that tries a real JSON decode from each '{'.
//
// Pass 3 only runs when passes 1 and 2 found nothing, so every input
// that was accepted before yields the same object as before. It exists
// because the brace counter is not string-aware and never recovers from
// a stray '}' in the prose: `oops } {"a":1}` and `{"a":"}"}` both
// defeat pass 2 even though they contain a perfectly good object.
// NOTE(review): pass 3 makes Go accept a superset of what the balance
// scan alone accepts — confirm that is acceptable for Rust parity.
func ExtractJSON(raw string) map[string]any {
	for _, body := range fencedCandidates(raw) {
		if obj, ok := parseObject(body); ok {
			return obj
		}
	}
	if obj, ok := balancedObject(raw); ok {
		return obj
	}
	return decodeFirstObject(raw)
}

// balancedObject returns the first top-level {...} span, found by plain
// brace counting, that parses as a JSON object. Depth is deliberately
// allowed to go negative so the accepted set stays exactly what the
// original scan accepted.
func balancedObject(raw string) (map[string]any, bool) {
	depth, start := 0, -1
	for i := 0; i < len(raw); i++ {
		switch raw[i] {
		case '{':
			if start < 0 {
				start = i
			}
			depth++
		case '}':
			depth--
			if depth == 0 && start >= 0 {
				if obj, ok := parseObject(raw[start : i+1]); ok {
					return obj, true
				}
				start = -1
			}
		}
	}
	return nil, false
}

// decodeFirstObject tries a streaming JSON decode at every '{' in raw
// and returns the first object that decodes. The decoder stops at the
// end of the first value, so trailing prose is ignored, and braces
// inside string values are handled by the JSON grammar itself. Worst
// case is quadratic in len(raw), which is bounded by the chat hop's
// max_tokens.
func decodeFirstObject(raw string) map[string]any {
	i := strings.IndexByte(raw, '{')
	for i >= 0 {
		var obj map[string]any
		if err := json.NewDecoder(strings.NewReader(raw[i:])).Decode(&obj); err == nil && obj != nil {
			return obj
		}
		next := strings.IndexByte(raw[i+1:], '{')
		if next < 0 {
			break
		}
		i += next + 1
	}
	return nil
}

// fencedCandidates returns the bodies of every ``` fenced block in
// `raw`. Skips an optional language tag on the opening fence (e.g.
// ```json). Bodies are whitespace-trimmed. An unterminated fence ends
// the scan.
func fencedCandidates(raw string) []string {
	var out []string
	s := raw
	for {
		idx := strings.Index(s, "```")
		if idx < 0 {
			break
		}
		after := s[idx+3:]
		// Skip optional language tag up to the first newline. With no
		// newline at all the body starts right after the fence.
		bodyStart := strings.Index(after, "\n")
		if bodyStart < 0 {
			bodyStart = 0
		} else {
			bodyStart++
		}
		body := after[bodyStart:]
		end := strings.Index(body, "```")
		if end < 0 {
			break
		}
		out = append(out, strings.TrimSpace(body[:end]))
		s = body[end+3:]
	}
	return out
}

// parseObject reports whether s is, in its entirety, a JSON object and
// returns it. Arrays, scalars, null, and anything with trailing garbage
// yield (nil, false).
func parseObject(s string) (map[string]any, bool) {
	var v any
	if err := json.Unmarshal([]byte(s), &v); err != nil {
		return nil, false
	}
	obj, ok := v.(map[string]any)
	return obj, ok
}

// trim caps s at n bytes without splitting a UTF-8 sequence: when the
// cut would land inside a multi-byte rune it backs off to the start of
// that rune, so the result is at most n bytes and never ends in a
// partial rune. A non-positive n yields "".
func trim(s string, n int) string {
	if n <= 0 {
		return ""
	}
	if len(s) <= n {
		return s
	}
	// A UTF-8 rune has at most 3 continuation bytes (10xxxxxx), so at
	// most 3 steps back reach a rune start; the bound also keeps invalid
	// input from being eaten away.
	for back := 0; back < 3 && n > 0 && s[n]&0xC0 == 0x80; back++ {
		n--
	}
	return s[:n]
}
outer, ok := v["outer"].(map[string]any); !ok || outer["inner"] == nil { + t.Errorf("nested structure lost: %+v", v) + } +} + +func TestExtractJSON_TopLevelArrayReturnsFirstInnerObject(t *testing.T) { + // Both Rust and Go runtimes accept the first balanced {...} as a + // successful match — for `[{"a":1},{"b":2}]` that's the first + // inner object. Documenting this so the contract stays consistent + // across runtimes. + v := ExtractJSON(`[{"a":1},{"b":2}]`) + if v == nil { + t.Fatal("expected first inner object to be returned") + } + if v["a"].(float64) != 1 { + t.Errorf("expected first object {a:1}, got %+v", v) + } +} + +// ─── Iterate orchestrator tests with scripted ChatCaller ──────────── + +func scriptedChat(responses ...string) (ChatCaller, *int) { + idx := 0 + return func(_ context.Context, _, _ string, _, _ string, _ *float64, _ int) (string, error) { + if idx >= len(responses) { + return "", errors.New("scripted chat exhausted") + } + r := responses[idx] + idx++ + return r, nil + }, &idx +} + +func TestIterate_AcceptsFirstValidArtifact(t *testing.T) { + chat, calls := scriptedChat(`{"endorsed_names":["W-1"]}`) + validate := func(_ string, _ map[string]any) (Report, error) { + return Report{ElapsedMs: 1}, nil + } + resp, fail, err := Iterate(context.Background(), + IterateRequest{Kind: "playbook", Prompt: "produce X", Provider: "ollama", Model: "qwen3.5:latest"}, + IterateConfig{}, chat, validate) + if err != nil || fail != nil { + t.Fatalf("expected success, got err=%v fail=%+v", err, fail) + } + if resp.Iterations != 1 { + t.Errorf("iterations = %d, want 1", resp.Iterations) + } + if len(resp.History) != 1 || resp.History[0].Status.Kind != "accepted" { + t.Errorf("history: %+v", resp.History) + } + if *calls != 1 { + t.Errorf("expected 1 chat call, got %d", *calls) + } +} + +func TestIterate_RetriesOnNoJsonThenSucceeds(t *testing.T) { + chat, _ := scriptedChat( + "sorry I cannot do that", + `{"endorsed_names":["W-1"]}`, + ) + validate := func(_ 
string, _ map[string]any) (Report, error) { + return Report{}, nil + } + resp, _, err := Iterate(context.Background(), + IterateRequest{Kind: "playbook", Prompt: "produce X", Provider: "ollama", Model: "x"}, + IterateConfig{}, chat, validate) + if err != nil || resp == nil { + t.Fatalf("expected success, err=%v", err) + } + if resp.Iterations != 2 { + t.Errorf("iterations = %d, want 2", resp.Iterations) + } + if resp.History[0].Status.Kind != "no_json" { + t.Errorf("first history status: %+v", resp.History[0].Status) + } +} + +func TestIterate_RetriesOnValidationFailureThenSucceeds(t *testing.T) { + chat, _ := scriptedChat( + `{"bad":"shape"}`, + `{"good":"shape"}`, + ) + calls := 0 + validate := func(_ string, body map[string]any) (Report, error) { + calls++ + if _, ok := body["good"]; ok { + return Report{}, nil + } + return Report{}, &ValidationError{Kind: ErrSchema, Field: "x", Reason: "missing good"} + } + resp, _, err := Iterate(context.Background(), + IterateRequest{Kind: "playbook", Prompt: "produce X", Provider: "ollama", Model: "x"}, + IterateConfig{}, chat, validate) + if err != nil || resp == nil { + t.Fatalf("expected success, err=%v", err) + } + if calls != 2 { + t.Errorf("validate calls = %d, want 2", calls) + } + if resp.History[0].Status.Kind != "validation_failed" { + t.Errorf("first history status: %+v", resp.History[0].Status) + } + if resp.History[0].Status.Error == "" { + t.Errorf("validation_failed entry must carry error string") + } +} + +func TestIterate_MaxIterationsExhaustedReturnsFailure(t *testing.T) { + chat, _ := scriptedChat(`{}`, `{}`, `{}`) + validate := func(_ string, _ map[string]any) (Report, error) { + return Report{}, &ValidationError{Kind: ErrCompleteness, Reason: "always wrong"} + } + resp, fail, err := Iterate(context.Background(), + IterateRequest{Kind: "playbook", Prompt: "X", Provider: "ollama", Model: "x", MaxIterations: 3}, + IterateConfig{}, chat, validate) + if err != nil { + t.Fatalf("infrastructure error 
unexpected: %v", err) + } + if resp != nil { + t.Fatalf("expected failure, got %+v", resp) + } + if fail.Iterations != 3 { + t.Errorf("iterations = %d, want 3", fail.Iterations) + } + if len(fail.History) != 3 { + t.Errorf("history length = %d, want 3", len(fail.History)) + } +} + +func TestIterate_PropagatesChatInfraError(t *testing.T) { + chat := func(_ context.Context, _, _ string, _, _ string, _ *float64, _ int) (string, error) { + return "", errors.New("connection refused") + } + validate := func(_ string, _ map[string]any) (Report, error) { return Report{}, nil } + _, _, err := Iterate(context.Background(), + IterateRequest{Kind: "playbook", Prompt: "X", Provider: "ollama", Model: "x"}, + IterateConfig{}, chat, validate) + if err == nil { + t.Fatal("expected infrastructure error to surface") + } +} diff --git a/internal/validator/lookup_jsonl.go b/internal/validator/lookup_jsonl.go new file mode 100644 index 0000000..05e2b29 --- /dev/null +++ b/internal/validator/lookup_jsonl.go @@ -0,0 +1,86 @@ +package validator + +import ( + "bufio" + "encoding/json" + "fmt" + "os" + "strings" +) + +// rosterRow is the on-disk shape of one line in a roster JSONL. +// Fields are tolerant — string-valued city/state/role become *string +// on WorkerRecord; absent or null fields stay nil so the validators +// know "we don't know" vs "we know it's empty." +// +// Mirrors the projection used in the Rust ParquetWorkerLookup so +// JSONL exported from `workers_500k.parquet` (or a synthetic dataset) +// loads here without translation. 
// rosterRow is the on-disk shape of one line in a roster JSONL.
// Fields are tolerant — string-valued city/state/role become *string
// on WorkerRecord; absent or null fields stay nil so the validators
// know "we don't know" vs "we know it's empty."
//
// Mirrors the projection used in the Rust ParquetWorkerLookup so
// JSONL exported from `workers_500k.parquet` (or a synthetic dataset)
// loads here without translation. Producer:
//
//	duckdb -c "COPY (SELECT candidate_id, name, status, city, state,
//	  role, blacklisted_clients FROM workers) TO 'roster.jsonl'
//	  (FORMAT JSON, ARRAY false)"
type rosterRow struct {
	// CandidateID is the only field the loader requires: rows whose ID
	// is empty or whitespace-only are dropped by LoadJSONLRoster.
	CandidateID string `json:"candidate_id"`
	Name        string `json:"name"`
	Status      string `json:"status"`
	// Pointers so JSON null / a missing key decode to nil rather than "".
	City  *string `json:"city"`
	State *string `json:"state"`
	Role  *string `json:"role"`
	// BlacklistedClients stays nil when the key is absent or null and is
	// an empty non-nil slice for `[]`.
	BlacklistedClients []string `json:"blacklisted_clients"`
}
+func LoadJSONLRoster(path string) (*InMemoryWorkerLookup, error) { + if path == "" { + return NewInMemoryWorkerLookup(nil), nil + } + f, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("open roster: %w", err) + } + defer f.Close() + + var records []WorkerRecord + scanner := bufio.NewScanner(f) + scanner.Buffer(make([]byte, 0, 1<<16), 1<<24) + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var row rosterRow + if err := json.Unmarshal(line, &row); err != nil { + continue // tolerate malformed lines + } + if strings.TrimSpace(row.CandidateID) == "" { + continue + } + records = append(records, WorkerRecord{ + CandidateID: row.CandidateID, + Name: row.Name, + Status: row.Status, + City: row.City, + State: row.State, + Role: row.Role, + BlacklistedClients: row.BlacklistedClients, + }) + } + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("scan roster: %w", err) + } + return NewInMemoryWorkerLookup(records), nil +} diff --git a/internal/validator/lookup_jsonl_test.go b/internal/validator/lookup_jsonl_test.go new file mode 100644 index 0000000..3a4c77f --- /dev/null +++ b/internal/validator/lookup_jsonl_test.go @@ -0,0 +1,64 @@ +package validator + +import ( + "os" + "path/filepath" + "testing" +) + +func TestLoadJSONLRoster_RoundTripFields(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "roster.jsonl") + body := `{"candidate_id":"W-1","name":"Ada","status":"active","city":"Toledo","state":"OH","role":"Welder","blacklisted_clients":["C-1"]} +{"candidate_id":"W-2","name":"Bea","status":"inactive","city":null,"state":null,"role":null,"blacklisted_clients":[]} +malformed line that should be skipped +{"candidate_id":"","name":"empty id","status":"active"} +` + if err := os.WriteFile(path, []byte(body), 0o644); err != nil { + t.Fatalf("write fixture: %v", err) + } + + l, err := LoadJSONLRoster(path) + if err != nil { + t.Fatalf("load: %v", err) + } + if l.Len() != 2 { + t.Fatalf("expected 2 
records (skip malformed + empty id), got %d", l.Len()) + } + + w1, ok := l.Find("W-1") + if !ok { + t.Fatal("missing W-1") + } + if w1.City == nil || *w1.City != "Toledo" || w1.Role == nil || *w1.Role != "Welder" { + t.Errorf("W-1 fields: %+v", w1) + } + if len(w1.BlacklistedClients) != 1 || w1.BlacklistedClients[0] != "C-1" { + t.Errorf("W-1 blacklist: %+v", w1.BlacklistedClients) + } + + w2, ok := l.Find("w-2") // case-insensitive + if !ok { + t.Fatal("missing W-2 (case-insensitive)") + } + if w2.City != nil || w2.State != nil || w2.Role != nil { + t.Errorf("W-2 should have nil pointers for missing fields: %+v", w2) + } +} + +func TestLoadJSONLRoster_EmptyPathReturnsEmptyLookup(t *testing.T) { + l, err := LoadJSONLRoster("") + if err != nil { + t.Fatalf("empty path should not error: %v", err) + } + if l.Len() != 0 { + t.Errorf("expected empty lookup, got len=%d", l.Len()) + } +} + +func TestLoadJSONLRoster_MissingFileErrors(t *testing.T) { + _, err := LoadJSONLRoster("/nonexistent/path/roster.jsonl") + if err == nil { + t.Fatal("expected error for missing path") + } +} diff --git a/internal/validator/playbook.go b/internal/validator/playbook.go new file mode 100644 index 0000000..ec3ade5 --- /dev/null +++ b/internal/validator/playbook.go @@ -0,0 +1,132 @@ +package validator + +import ( + "fmt" + "strings" + "time" +) + +// PlaybookValidator is the Go port of Rust's +// `crates/validator/src/staffing/playbook.rs`. Sealed playbook +// validation per Phase 25: +// +// - Operation must be a non-empty string starting with `fill:` +// - endorsed_names must be a non-empty array, ≤ target_count × 2 +// - fingerprint must be non-empty (validity-window requirement) +// +// PlaybookValidator is stateless — no WorkerLookup dependency, unlike +// FillValidator and EmailValidator. The whole validation runs on the +// artifact body alone. +type PlaybookValidator struct{} + +// NewPlaybookValidator returns a zero-deps validator. 
Constructor for +// symmetry with the other two; not strictly required. +func NewPlaybookValidator() *PlaybookValidator { return &PlaybookValidator{} } + +// Name satisfies Validator. Matches Rust's "staffing.playbook" so +// audit-log scrapes work across runtimes. +func (PlaybookValidator) Name() string { return "staffing.playbook" } + +// Validate runs the four PRD checks. Errors abort the run; warnings +// (none today) would attach to a passing Report. +func (v PlaybookValidator) Validate(a Artifact) (Report, error) { + started := time.Now() + if a.Playbook == nil { + return Report{}, &ValidationError{ + Kind: ErrSchema, + Field: "artifact", + Reason: fmt.Sprintf("PlaybookValidator expects Playbook, got %s", a.Kind()), + } + } + body := a.Playbook + + op, ok := stringField(body, "operation") + if !ok { + return Report{}, &ValidationError{ + Kind: ErrSchema, + Field: "operation", + Reason: "missing or not a string", + } + } + if !strings.HasPrefix(op, "fill:") { + return Report{}, &ValidationError{ + Kind: ErrSchema, + Field: "operation", + Reason: fmt.Sprintf("expected `fill: ...` prefix, got %q", op), + } + } + + endorsed, ok := body["endorsed_names"].([]any) + if !ok { + return Report{}, &ValidationError{ + Kind: ErrSchema, + Field: "endorsed_names", + Reason: "missing or not an array", + } + } + if len(endorsed) == 0 { + return Report{}, &ValidationError{ + Kind: ErrCompleteness, + Reason: "endorsed_names must be non-empty", + } + } + + if target, ok := uintField(body, "target_count"); ok { + max := target * 2 + if uint64(len(endorsed)) > max { + return Report{}, &ValidationError{ + Kind: ErrCompleteness, + Reason: fmt.Sprintf("endorsed_names (%d) exceeds target_count × 2 (%d)", len(endorsed), max), + } + } + } + + if fp, _ := stringField(body, "fingerprint"); fp == "" { + return Report{}, &ValidationError{ + Kind: ErrSchema, + Field: "fingerprint", + Reason: "missing — required for Phase 25 validity window", + } + } + + return Report{Findings: []Finding{}, 
// stringField returns (val, true) if body[key] is a string, else
// ("", false). Matches Rust's serde_json::Value::as_str() shape.
// A missing key, a JSON null, and a non-string value all read as
// ("", false).
func stringField(body map[string]any, key string) (string, bool) {
	s, ok := body[key].(string)
	return s, ok
}

// uintField returns (val, true) if body[key] is a non-negative whole
// number that fits in a uint64; matches Rust as_u64. JSON numbers come
// in as float64, which is why we do the conversion explicitly.
//
// Rejected with (0, false): a missing key, null, non-numeric values,
// negatives, fractional values such as 1.5 (previously truncated to 1
// despite the "whole number" contract), NaN, and floats at or beyond
// 2^64, whose conversion to uint64 is implementation-defined in Go.
// int and int64 are accepted so hand-built maps in tests and callers
// work the same as decoded JSON.
func uintField(body map[string]any, key string) (uint64, bool) {
	switch t := body[key].(type) {
	case float64:
		const uint64Limit = 1 << 64 // first value that does not fit in uint64
		// Written as a negated range test so NaN, which fails every
		// comparison, is rejected too.
		if !(t >= 0 && t < uint64Limit) {
			return 0, false
		}
		u := uint64(t)
		if float64(u) != t {
			return 0, false // fractional part present
		}
		return u, true
	case int:
		if t < 0 {
			return 0, false
		}
		return uint64(t), true
	case int64:
		if t < 0 {
			return 0, false
		}
		return uint64(t), true
	}
	return 0, false
}
err) + } +} + +func TestPlaybook_OverfullEndorsedNamesFailsCompleteness(t *testing.T) { + _, err := PlaybookValidator{}.Validate(Artifact{Playbook: map[string]any{ + "operation": "fill: Welder x1 in Toledo, OH", + "endorsed_names": []any{"a", "b", "c"}, + "target_count": 1.0, + "fingerprint": "abc", + }}) + var ve *ValidationError + if !errors.As(err, &ve) || ve.Kind != ErrCompleteness { + t.Fatalf("expected Completeness, got %v", err) + } +} + +func TestPlaybook_MissingFingerprintFailsSchema(t *testing.T) { + _, err := PlaybookValidator{}.Validate(Artifact{Playbook: map[string]any{ + "operation": "fill: X x1 in A, B", + "endorsed_names": []any{"a"}, + }}) + var ve *ValidationError + if !errors.As(err, &ve) || ve.Kind != ErrSchema || ve.Field != "fingerprint" { + t.Fatalf("expected Schema/fingerprint, got %+v", err) + } +} + +func TestPlaybook_WrongOperationPrefixFailsSchema(t *testing.T) { + _, err := PlaybookValidator{}.Validate(Artifact{Playbook: map[string]any{ + "operation": "sms_draft: hello", + "endorsed_names": []any{"a"}, + "fingerprint": "x", + }}) + var ve *ValidationError + if !errors.As(err, &ve) || ve.Kind != ErrSchema { + t.Fatalf("expected Schema, got %v", err) + } +} + +func TestPlaybook_WrongArtifactKindFailsSchema(t *testing.T) { + _, err := PlaybookValidator{}.Validate(Artifact{FillProposal: map[string]any{}}) + var ve *ValidationError + if !errors.As(err, &ve) || ve.Kind != ErrSchema || ve.Field != "artifact" { + t.Fatalf("expected Schema/artifact, got %+v", err) + } +} diff --git a/internal/validator/types.go b/internal/validator/types.go index 5615d11..dbf8fe5 100644 --- a/internal/validator/types.go +++ b/internal/validator/types.go @@ -29,6 +29,8 @@ type Artifact struct { FillProposal map[string]any // EmailDraft: {to, body, subject?, kind?, _context?: {candidate_id?}} EmailDraft map[string]any + // Playbook: {operation, endorsed_names, target_count?, fingerprint} + Playbook map[string]any } // Kind returns a short string for error messages 
— mirrors the @@ -39,6 +41,8 @@ func (a Artifact) Kind() string { return "FillProposal" case a.EmailDraft != nil: return "EmailDraft" + case a.Playbook != nil: + return "Playbook" default: return "Unknown" } diff --git a/lakehouse.toml b/lakehouse.toml index d748788..5159cb3 100644 --- a/lakehouse.toml +++ b/lakehouse.toml @@ -16,6 +16,7 @@ pathwayd_url = "http://127.0.0.1:3217" matrixd_url = "http://127.0.0.1:3218" observerd_url = "http://127.0.0.1:3219" chatd_url = "http://127.0.0.1:3220" +validatord_url = "http://127.0.0.1:3221" [storaged] bind = "127.0.0.1:3211" @@ -101,6 +102,24 @@ kimi_key_file = "/etc/lakehouse/kimi.env" # for long prompts, so 180 is the default. timeout_secs = 180 +[validatord] +# Production-validator network surface (Phase 43 PRD parity). +# Hosts /validate (FillValidator + EmailValidator + PlaybookValidator) +# and /iterate (generate→validate→correct loop). +bind = "127.0.0.1:3221" +chatd_url = "http://127.0.0.1:3220" +# Roster of valid workers. Empty = no roster — worker-existence checks +# all fail Consistency (correct fail-closed posture). Production points +# at a path regenerated from workers_500k.parquet on a schedule: +# roster_path = "/var/lib/lakehouse/validator/roster.jsonl" +roster_path = "" +# Per-call cap on the iteration loop (Phase 43 default: 3). +default_max_iterations = 3 +# Per-call cap on chat hop max_tokens. +default_max_tokens = 4096 +# Chat hop timeout (seconds). 240s tolerates frontier reasoning models. +chat_timeout_secs = 240 + [s3] endpoint = "http://localhost:9000" region = "us-east-1" diff --git a/scripts/validatord_smoke.sh b/scripts/validatord_smoke.sh new file mode 100755 index 0000000..61a7bc6 --- /dev/null +++ b/scripts/validatord_smoke.sh @@ -0,0 +1,153 @@ +#!/usr/bin/env bash +# validatord smoke — Phase 43 PRD parity acceptance gate. 
+# +# Validates: +# - validatord boots, reports /health +# - POST /v1/validate with kind=playbook returns 200 + Report on +# well-formed input +# - POST /v1/validate with kind=playbook returns 422 + ValidationError +# when fingerprint is missing +# - POST /v1/validate with kind=fill consults the JSONL roster +# (phantom candidate → 422 Consistency) +# - POST /v1/validate with unknown kind returns 400 +# - All assertions go through gateway :3110 (proxy correct) +# +# Doesn't exercise /iterate — that needs a live chat backend, covered +# by cmd/validatord/main_test.go's fakeChatd helper. CI-friendly. +# +# Usage: ./scripts/validatord_smoke.sh + +set -euo pipefail +cd "$(dirname "$0")/.." + +export PATH="$PATH:/usr/local/go/bin" + +echo "[validatord-smoke] building validatord + gateway..." +go build -o bin/ ./cmd/validatord ./cmd/gateway + +pkill -f "bin/(validatord|gateway)$" 2>/dev/null || true +sleep 0.3 + +PIDS=() +TMP="$(mktemp -d)" +ROSTER="$TMP/roster.jsonl" +CFG="$TMP/validatord.toml" + +cleanup() { + echo "[validatord-smoke] cleanup" + for p in "${PIDS[@]:-}"; do [ -n "${p:-}" ] && kill "$p" 2>/dev/null || true; done + rm -rf "$TMP" +} +trap cleanup EXIT INT TERM + +# Tiny synthetic roster so /v1/validate fill-kind has something to +# pass / fail against. Two real candidates + one inactive. +cat > "$ROSTER" < "$CFG" </dev/null 2>&1; then return 0; fi + sleep 0.05 + done + return 1 +} + +echo "[validatord-smoke] launching validatord → gateway..." +./bin/validatord -config "$CFG" > /tmp/validatord.log 2>&1 & PIDS+=($!) +poll_health 3221 || { echo "validatord failed"; tail /tmp/validatord.log; exit 1; } +./bin/gateway -config "$CFG" > /tmp/validatord_gateway.log 2>&1 & PIDS+=($!) +poll_health 3110 || { echo "gateway failed"; tail /tmp/validatord_gateway.log; exit 1; } + +# 1. Roster loaded with 3 records — surface via the daemon's startup log. +if ! grep -q '"records":3' /tmp/validatord.log && ! 
grep -q 'records=3' /tmp/validatord.log; then + echo " ✗ expected validatord to log records=3 from roster; got:" + grep "validatord roster" /tmp/validatord.log || true + exit 1 +fi +echo " ✓ validatord roster loaded with 3 records" + +# 2. /v1/validate playbook happy path → 200 +echo "[validatord-smoke] /v1/validate playbook happy path:" +RESP="$(curl -sS -X POST http://127.0.0.1:3110/v1/validate \ + -H 'Content-Type: application/json' \ + -d '{"kind":"playbook","artifact":{"operation":"fill: Welder x2 in Toledo, OH","endorsed_names":["W-1","W-2"],"target_count":2,"fingerprint":"abc123"}}')" +if ! echo "$RESP" | jq -e '.elapsed_ms != null and (.findings | type == "array")' >/dev/null; then + echo " ✗ unexpected response: $RESP" + exit 1 +fi +echo " ✓ playbook OK ($RESP)" + +# 3. /v1/validate playbook schema error → 422 with ValidationError +echo "[validatord-smoke] /v1/validate playbook missing fingerprint → 422:" +STATUS="$(curl -sS -o /tmp/playbook_422.json -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/validate \ + -H 'Content-Type: application/json' \ + -d '{"kind":"playbook","artifact":{"operation":"fill: X x1 in A, B","endorsed_names":["a"]}}')" +if [ "$STATUS" != "422" ]; then + echo " ✗ expected 422; got $STATUS body=$(cat /tmp/playbook_422.json)" + exit 1 +fi +KIND="$(jq -r '.Kind' /tmp/playbook_422.json)" +FIELD="$(jq -r '.Field' /tmp/playbook_422.json)" +if [ "$KIND" != "schema" ] || [ "$FIELD" != "fingerprint" ]; then + echo " ✗ expected kind=schema field=fingerprint; got kind=$KIND field=$FIELD" + exit 1 +fi +echo " ✓ playbook missing fingerprint → 422 schema/fingerprint" + +# 4. 
/v1/validate fill with phantom candidate → 422 Consistency +echo "[validatord-smoke] /v1/validate fill with phantom candidate → 422:" +STATUS="$(curl -sS -o /tmp/fill_422.json -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/validate \ + -H 'Content-Type: application/json' \ + -d '{"kind":"fill","artifact":{"fills":[{"candidate_id":"W-PHANTOM","name":"Nobody"}]},"context":{"target_count":1,"city":"Toledo","client_id":"C-1"}}')" +if [ "$STATUS" != "422" ]; then + echo " ✗ expected 422; got $STATUS body=$(cat /tmp/fill_422.json)" + exit 1 +fi +KIND="$(jq -r '.Kind' /tmp/fill_422.json)" +if [ "$KIND" != "consistency" ]; then + echo " ✗ expected kind=consistency; got kind=$KIND body=$(cat /tmp/fill_422.json)" + exit 1 +fi +echo " ✓ phantom candidate W-PHANTOM → 422 consistency" + +# 5. /v1/validate unknown kind → 400 +echo "[validatord-smoke] /v1/validate unknown kind → 400:" +STATUS="$(curl -sS -o /tmp/unknown_400.txt -w '%{http_code}' -X POST http://127.0.0.1:3110/v1/validate \ + -H 'Content-Type: application/json' \ + -d '{"kind":"foo","artifact":{}}')" +if [ "$STATUS" != "400" ]; then + echo " ✗ expected 400; got $STATUS body=$(cat /tmp/unknown_400.txt)" + exit 1 +fi +echo " ✓ unknown kind → 400" + +echo "[validatord-smoke] PASS — 5/5 probes through gateway :3110"