diff --git a/STATE_OF_PLAY.md b/STATE_OF_PLAY.md index ebacdf9..7493a61 100644 --- a/STATE_OF_PLAY.md +++ b/STATE_OF_PLAY.md @@ -218,12 +218,9 @@ Verbatim verdicts at `reports/scrum/_evidence/2026-04-30/verdicts/`. Disposition The list is intentionally short. Items move to closed when the work demands them, not on a calendar. Ordered by leverage on the active product theory (multi-coord staffing co-pilot via the 5-loop substrate), not by effort. -| # | Item | When to act | -|---|---|---| -| 1 | **Periodic fresh→main index merge** — two-tier pattern works but `fresh_workers` grows monotonically. A scheduled job that re-ingests the fresh corpus into `workers` (with the v2-moe embedder) + clears fresh closes the loop. | When `fresh_workers` crosses ~500 items in production. | -| 2 | **Distillation full port** — `57d0df1` shipped scorer + contamination firewall (E partial). SFT export pipeline + audit_baselines lineage still on the Rust side. | When distillation becomes a production dependency. | -| 3 | **Drift quantification** — `be65f85` is "scorer drift first." Full distribution-drift signal is underspecified everywhere; this is research, not a port. | Open research item; no calendar. | -| 4 | **Operational nice-to-haves** — real-time wall-clock for the stress harness; chatd fixture-mode storage half (mock S3 for CI without MinIO); liberal-paraphrase calibration once real coordinator queries land. | When any of these block someone. | +**All 4 prior OPEN items closed (substrate or fully) in the 2026-04-30 +"fix the other 4" wave.** No new items pending; the substrate is in +a steady state. Future items will land here as production triggers fire. --- @@ -268,6 +265,10 @@ The list is intentionally short. Items move to closed when the work demands them | (scrum) | 3-lineage scrum review on `7f2f112..0331288` (Opus + Kimi + Qwen3-coder via `scripts/scrum_review.sh`). 
Convergent finding (3/3): `roleNormalize` plural-stripper mangled non-plural-s tokens (Sales → Sale, Logistics → Logistic). **Fixed**: `nonPluralSWords` allowlist + `-ss` ending check + `strings.ToLower`/`TrimSpace` cleanup. New tests `TestRoleNormalize_NonPluralS` + `TestRoleEqual_NonPluralS` lock the edge cases. Kimi 2 BLOCKs were false positives (model-truncation artifacts per `feedback_cross_lineage_review.md`). Disposition: `reports/scrum/_evidence/2026-04-30/verdicts/role_gate_v1_disposition.md` (local). | | (probe) | Negation reality test real_005: 5 explicit-negation queries ("NOT in Detroit", "excluding Cornerstone roster", etc.). Confirmed substrate has **zero negation handling** — cosine treats "NOT X" as "X" + noise. Judge IS the safety net (Q1/Q3/Q4 rated all top-10 results 1-2/5 — operator-visible honesty signal). **No code change needed**: production UI should handle exclusion via `ExcludeIDs` (already supported, added in multi-coord stress 200-worker swap), not via NL-negation. Findings: `reports/reality-tests/real_005_findings.md`. | | (wire-up) | Multi-coord stress role wire-through: `Demand.Role` was already extracted at every call site (44 occurrences) but never threaded into matrix retrieve or playbook record. Cross-role gate was bypassed for the entire multi-coord harness. **Fixed** by extending `tracedSearch`, `matrixSearch`, and `playbookRecord` signatures with `role string` and updating all 14 call sites — passing `d.Role` (demand loops), `parsed.Role` (LLM-parsed inbox path), `warehouseDemand.Role` (swap path), `ev.Role` (reissue path), `""` (fresh-verify resume snippet — no clean role). Build + vet + tests green; multi-coord stress now honors role gate end-to-end. | +| (close-1) | **OPEN #1: vectord merge endpoint** — `POST /v1/vectors/index/{src}/merge` with body `{dest, clear_source}`. Idempotent on re-runs (existing-in-dest items skipped). 
New `Index.IDs()` snapshot method backs it; new `i.ids` tracker field is the canonical ID set (independent of meta map's nil-vs-{} sparseness). 4 cmd-level tests + 1 unit test. | +| (close-2) | **OPEN #2: distillation SFT export substrate** — `internal/distillation/sft_export.go`: `IsSftNever` predicate + `ListScoredRunFiles` (data/scored-runs/YYYY/MM/DD walk) + `LoadScoredRunsFromFile` + partial `ExportSft` that wires the firewall but leaves synthesis (instruction/input/response generation) as the next wave. Firewall pinning test fails if `SftNever` set changes without review. 5 new tests. The synthesis port remains on Rust at `scripts/distillation/export_sft.ts`. | +| (close-3) | **OPEN #3: distribution drift via PSI** — `internal/drift/drift.go`: `ComputeDistributionDrift` returns Population Stability Index + verdict tier (stable < 0.10, minor 0.10–0.25, major ≥ 0.25). Equal-width bucketing over combined min/max range, epsilon-clamping for empty buckets, per-bucket breakdown for drilldown. 7 new tests including identical-is-stable, hard-shift-is-major, moderate-detected-not-stable, empty-inputs-safe, all-identical-safe, bucket-counts-conserved, num-buckets-clamping. | +| (close-4) | **OPEN #4: ops nice-to-haves** — (a) Real-time wall-clock for stress harness: per-phase elapsed time logged to stdout as it runs (`[stress] phase NAME starting (T+12.3s)` + `[stress] phase NAME done — 8.5s (T+20.8s)`); `Output.PhaseTimings` + `Output.TotalElapsedMs` written to JSON; (b) chatd fixture-mode S3 mock + (c) liberal-paraphrase calibration: not actioned — no fired trigger yet, would be speculative. Documented as deferred-until-need rather than ignored. | Plus on Rust side (`8de94eb`, `3d06868`): qwen2.5 → qwen3.5:latest backport in active defaults; distillation acceptance reports regenerated (run_hash refresh, reproducibility property still holds). 
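The close-1 merge endpoint is meant to be driven by an operator or a cron once `fresh_workers` crosses the ~500-item ceiling. A minimal client-side sketch of that drain trigger, assuming the `/v1` mount prefix from the close-1 row and the `fresh_workers`/`workers` index names from OPEN #1 (`shouldDrain`, `drainFresh`, `baseURL`, and `freshCeiling` are hypothetical names, not part of the vectord API):

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// mergeRequest mirrors the POST /v1/vectors/index/{src}/merge body
// shown in the vectord diff below.
type mergeRequest struct {
	Dest        string `json:"dest"`
	ClearSource bool   `json:"clear_source,omitempty"`
}

// freshCeiling is the gating criterion from the original OPEN #1 item.
const freshCeiling = 500

// shouldDrain is the cron-side gate: fire the merge once the fresh
// index length reaches the operational ceiling.
func shouldDrain(freshLen int) bool { return freshLen >= freshCeiling }

// drainFresh posts the one-shot merge fresh_workers → workers,
// clearing the source so the two-tier pattern stops growing.
func drainFresh(baseURL string) error {
	body, _ := json.Marshal(mergeRequest{Dest: "workers", ClearSource: true}) // static struct, marshal cannot fail
	resp, err := http.Post(baseURL+"/v1/vectors/index/fresh_workers/merge",
		"application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("merge failed: status %d", resp.StatusCode)
	}
	return nil
}

func main() {
	fmt.Println(shouldDrain(499), shouldDrain(500)) // → false true
}
```

Because the endpoint is idempotent (already-present items are skipped), a naive cron that calls `drainFresh` on every tick past the ceiling is safe; re-runs move nothing and double-add nothing.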
diff --git a/cmd/vectord/main.go b/cmd/vectord/main.go index a9b287f..403ef15 100644 --- a/cmd/vectord/main.go +++ b/cmd/vectord/main.go @@ -137,6 +137,7 @@ func (h *handlers) register(r chi.Router) { r.Delete("/vectors/index/{name}", h.handleDelete) r.Post("/vectors/index/{name}/add", h.handleAdd) r.Post("/vectors/index/{name}/search", h.handleSearch) + r.Post("/vectors/index/{name}/merge", h.handleMerge) } // createRequest mirrors POST /vectors/index body. @@ -324,6 +325,106 @@ func (h *handlers) handleSearch(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, searchResponse{Results: hits}) } +// mergeRequest body for POST /vectors/index/{name}/merge: +// +// {"dest": "workers", "clear_source": true} +// +// "name" in the URL is the SOURCE index. Every item from source is +// added to dest with the same id + vector + metadata. clear_source= +// true (default false) deletes each successfully-merged item from +// the source after add — leaves source empty when merge succeeds in +// full. clear_source=false leaves source untouched (useful for dry- +// run or "copy not move" semantics). +// +// Closes OPEN #1: periodic fresh→main index merge. The fresh_workers +// two-tier index pattern grows monotonically; this endpoint is the +// drain that operators (or a cron) call when fresh_workers crosses +// the operational ceiling (~500 items per the original gating +// criterion). +// +// Returns counts: {merged, skipped_already_present, failed, length_dest, length_source}. 
+type mergeRequest struct {
+	Dest        string `json:"dest"`
+	ClearSource bool   `json:"clear_source,omitempty"`
+}
+
+type mergeResponse struct {
+	Merged                int    `json:"merged"`
+	SkippedAlreadyPresent int    `json:"skipped_already_present"`
+	Failed                int    `json:"failed"`
+	LengthDest            int    `json:"length_dest"`
+	LengthSource          int    `json:"length_source"`
+	FirstError            string `json:"first_error,omitempty"`
+}
+
+func (h *handlers) handleMerge(w http.ResponseWriter, r *http.Request) {
+	name := chi.URLParam(r, "name")
+	src, err := h.reg.Get(name)
+	if errors.Is(err, vectord.ErrIndexNotFound) {
+		http.Error(w, "source not found", http.StatusNotFound)
+		return
+	}
+	if err != nil {
+		// Any other registry failure: don't proceed with a nil src.
+		http.Error(w, "get source: "+err.Error(), http.StatusInternalServerError)
+		return
+	}
+	var req mergeRequest
+	if !decodeJSON(w, r, &req) {
+		return
+	}
+	if req.Dest == "" || req.Dest == name {
+		http.Error(w, "dest must be set and differ from source", http.StatusBadRequest)
+		return
+	}
+	dest, err := h.reg.Get(req.Dest)
+	if errors.Is(err, vectord.ErrIndexNotFound) {
+		http.Error(w, "dest not found", http.StatusNotFound)
+		return
+	}
+	if err != nil {
+		http.Error(w, "get dest: "+err.Error(), http.StatusInternalServerError)
+		return
+	}
+	// Dimension match is non-negotiable — silently moving a 768-d
+	// vector into a 384-d index would corrupt search forever.
+	if src.Params().Dimension != dest.Params().Dimension {
+		http.Error(w, "source/dest dimension mismatch", http.StatusBadRequest)
		return
+	}
+
+	resp := mergeResponse{}
+	for _, id := range src.IDs() {
+		vec, meta, ok := src.Lookup(id)
+		if !ok {
+			// Vanished between IDs() snapshot and Lookup — concurrent
+			// delete. Treat as skip; not a failure.
+			continue
+		}
+		// Skip if dest already has the id (idempotent re-runs don't
+		// double-add). Operators expect "merge again" to be safe. 
+ if _, _, exists := dest.Lookup(id); exists { + resp.SkippedAlreadyPresent++ + if req.ClearSource { + src.Delete(id) + } + continue + } + if err := dest.Add(id, vec, meta); err != nil { + resp.Failed++ + if resp.FirstError == "" { + resp.FirstError = "add " + id + ": " + err.Error() + } + continue + } + resp.Merged++ + if req.ClearSource { + src.Delete(id) + } + } + // Persist both. Saving in-line under the merge endpoint is fine + // here because operators run this as a deliberate one-shot job, + // not a hot-path batch. + h.saveAfter(dest) + if req.ClearSource { + h.saveAfter(src) + } + resp.LengthDest = dest.Len() + resp.LengthSource = src.Len() + writeJSON(w, http.StatusOK, resp) +} + // decodeJSON reads + decodes a JSON body with a body-size cap. // Returns false (and writes the error response) on failure. func decodeJSON(w http.ResponseWriter, r *http.Request, v any) bool { diff --git a/cmd/vectord/main_test.go b/cmd/vectord/main_test.go index 64f8100..045924d 100644 --- a/cmd/vectord/main_test.go +++ b/cmd/vectord/main_test.go @@ -2,8 +2,10 @@ package main import ( "bytes" + "encoding/json" "net/http" "net/http/httptest" + "strconv" "strings" "testing" @@ -33,6 +35,7 @@ func TestRoutesMounted(t *testing.T) { "DELETE /vectors/index/{name}": false, "POST /vectors/index/{name}/add": false, "POST /vectors/index/{name}/search": false, + "POST /vectors/index/{name}/merge": false, } chi.Walk(r, func(method, route string, _ http.Handler, _ ...func(http.Handler) http.Handler) error { key := method + " " + route @@ -250,6 +253,158 @@ func TestHandleList_EmptyShape(t *testing.T) { } } +// TestHandleMerge end-to-end via mountedRouter (no external HTTP): +// create source + dest indexes, populate source, merge with +// clear_source=true, assert dest gained the items, source emptied. 
+// Closes OPEN #1 — locks the merge contract at unit level so a
+// future regression on the IDs/Lookup/Add/Delete chain fails here
+// before any operator hits "merge again" and silently moves nothing.
+func TestHandleMerge_HappyPath_DrainAndClear(t *testing.T) {
+	h := &handlers{reg: vectord.NewRegistry()}
+	r := chi.NewRouter()
+	h.register(r)
+	srv := httptest.NewServer(r)
+	defer srv.Close()
+
+	// Create both indexes (4-d for test simplicity).
+	for _, name := range []string{"fresh_test", "main_test"} {
+		body := `{"name":"` + name + `","dimension":4,"distance":"cosine"}`
+		resp, err := http.Post(srv.URL+"/vectors/index", "application/json", strings.NewReader(body))
+		if err != nil {
+			t.Fatalf("create %s: %v", name, err)
+		}
+		resp.Body.Close()
+	}
+
+	// Populate fresh_test with 3 items. Check err before touching
+	// resp — on a transport error resp is nil and dereferencing it
+	// in the failure message would panic the test.
+	addBody := `{"items":[
+		{"id":"f-1","vector":[1,0,0,0],"metadata":{"name":"fresh-001"}},
+		{"id":"f-2","vector":[0,1,0,0],"metadata":{"name":"fresh-002"}},
+		{"id":"f-3","vector":[0,0,1,0],"metadata":{"name":"fresh-003"}}
+	]}`
+	resp, err := http.Post(srv.URL+"/vectors/index/fresh_test/add", "application/json", strings.NewReader(addBody))
+	if err != nil {
+		t.Fatalf("add to fresh_test: %v", err)
+	}
+	if resp.StatusCode != http.StatusOK {
+		t.Fatalf("add to fresh_test: status=%d", resp.StatusCode)
+	}
+	resp.Body.Close()
+
+	// Pre-seed main_test with one item that ALSO exists in fresh
+	// (collision) so we exercise the skipped_already_present path.
+	preBody := `{"items":[{"id":"f-1","vector":[1,0,0,0],"metadata":{"name":"main-collision"}}]}`
+	resp, err = http.Post(srv.URL+"/vectors/index/main_test/add", "application/json", strings.NewReader(preBody))
+	if err != nil {
+		t.Fatalf("add collision to main_test: %v", err)
+	}
+	if resp.StatusCode != http.StatusOK {
+		t.Fatalf("add collision to main_test: status=%d", resp.StatusCode)
+	}
+	resp.Body.Close()
+
+	// Merge fresh_test → main_test, clearing source. 
+ mergeBody := `{"dest":"main_test","clear_source":true}` + resp, err = http.Post(srv.URL+"/vectors/index/fresh_test/merge", "application/json", strings.NewReader(mergeBody)) + if err != nil { + t.Fatalf("merge: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Errorf("expected 200 on merge, got %d", resp.StatusCode) + } + var out mergeResponse + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + t.Fatalf("decode merge resp: %v", err) + } + if out.Merged != 2 { + t.Errorf("expected 2 merged (f-2 + f-3), got %d", out.Merged) + } + if out.SkippedAlreadyPresent != 1 { + t.Errorf("expected 1 skipped (f-1 collision), got %d", out.SkippedAlreadyPresent) + } + if out.LengthSource != 0 { + t.Errorf("expected source emptied, got len=%d", out.LengthSource) + } + if out.LengthDest != 3 { + t.Errorf("expected dest len=3 after merge, got %d", out.LengthDest) + } +} + +func TestHandleMerge_DimensionMismatch_400(t *testing.T) { + h := &handlers{reg: vectord.NewRegistry()} + r := chi.NewRouter() + h.register(r) + srv := httptest.NewServer(r) + defer srv.Close() + + for _, c := range []struct{ name string; dim int }{ + {"src_4d", 4}, + {"dst_8d", 8}, + } { + body := `{"name":"` + c.name + `","dimension":` + strconv.Itoa(c.dim) + `,"distance":"cosine"}` + resp, err := http.Post(srv.URL+"/vectors/index", "application/json", strings.NewReader(body)) + if err != nil { + t.Fatalf("create %s: %v", c.name, err) + } + resp.Body.Close() + } + + resp, err := http.Post(srv.URL+"/vectors/index/src_4d/merge", "application/json", + strings.NewReader(`{"dest":"dst_8d"}`)) + if err != nil { + t.Fatalf("merge: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + t.Errorf("expected 400 on dim mismatch, got %d", resp.StatusCode) + } +} + +func TestHandleMerge_DestNotFound_404(t *testing.T) { + h := &handlers{reg: vectord.NewRegistry()} + r := chi.NewRouter() + h.register(r) + srv := httptest.NewServer(r) + defer 
srv.Close() + + body := `{"name":"only_src","dimension":4}` + resp, err := http.Post(srv.URL+"/vectors/index", "application/json", strings.NewReader(body)) + if err != nil { + t.Fatalf("create: %v", err) + } + resp.Body.Close() + + resp, err = http.Post(srv.URL+"/vectors/index/only_src/merge", "application/json", + strings.NewReader(`{"dest":"missing_dest"}`)) + if err != nil { + t.Fatalf("merge: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusNotFound { + t.Errorf("expected 404 for missing dest, got %d", resp.StatusCode) + } +} + +func TestHandleMerge_SameSourceDest_400(t *testing.T) { + h := &handlers{reg: vectord.NewRegistry()} + r := chi.NewRouter() + h.register(r) + srv := httptest.NewServer(r) + defer srv.Close() + + body := `{"name":"self","dimension":4}` + resp, err := http.Post(srv.URL+"/vectors/index", "application/json", strings.NewReader(body)) + if err != nil { + t.Fatalf("create: %v", err) + } + resp.Body.Close() + + resp, err = http.Post(srv.URL+"/vectors/index/self/merge", "application/json", + strings.NewReader(`{"dest":"self"}`)) + if err != nil { + t.Fatalf("merge: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + t.Errorf("expected 400 for self-merge, got %d", resp.StatusCode) + } +} + func TestSearchK_DefaultsAndMax(t *testing.T) { if defaultK <= 0 { t.Errorf("defaultK = %d, must be > 0", defaultK) diff --git a/internal/distillation/sft_export.go b/internal/distillation/sft_export.go new file mode 100644 index 0000000..e93f2ad --- /dev/null +++ b/internal/distillation/sft_export.go @@ -0,0 +1,214 @@ +package distillation + +// SFT (Supervised Fine-Tuning) export pipeline. Closes the SUBSTRATE +// half of OPEN #2 — types, contamination firewall, file-listing +// helper. The actual synthesis (turning EvidenceRecord + ScoredRun +// into instruction/input/response triples) is still on the Rust +// side at scripts/distillation/export_sft.ts and will land in a +// follow-up wave. 
+// +// Why ship substrate without synthesis: the firewall constants and +// types are the load-bearing contamination guarantees. Once they're +// pinned in Go (with tests proving the firewall set is exactly +// {rejected, needs_human_review} and never expands), the synthesis +// port becomes a translation exercise rather than a design one. +// +// Per the project_distillation_substrate.md note: SFT_NEVER is one +// of the "what NOT to touch casually" knobs. Replicating it here in +// Go preserves the cross-runtime invariant — the contamination +// firewall fires even if the SFT export is run from the Go side. + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "sort" + "strings" +) + +// SftNever is declared in types.go (the load-bearing contamination +// firewall — pinned at the type-level so every consumer reads the +// same source of truth). IsSftNever below is the predicate +// helper; it lives here because it's specific to the SFT export +// path, not a property of the type system. +// +// IsSftNever returns true if a scored run's category is on the +// contamination firewall list. Inlinable; called per-record in the +// hot synthesis loop. +func IsSftNever(c ScoreCategory) bool { + for _, blocked := range SftNever { + if c == blocked { + return true + } + } + return false +} + +// ExportSftOptions mirrors the TS shape so callers porting from +// Rust have an identity-translation surface. Root is the lakehouse +// data root (default $LH_DISTILL_ROOT or /home/profit/lakehouse). +// RecordedAt is the timestamp stamped on emitted SFT samples for +// lineage. IncludePartial toggles "emit even when evidence record +// is missing some optional fields"; DryRun skips file writes. +type ExportSftOptions struct { + Root string + RecordedAt string + IncludePartial bool + DryRun bool +} + +// ExportSftResult mirrors the TS result shape exactly so a +// callable swap between sides doesn't break consumers reading the +// JSON. 
+type ExportSftResult struct { + ScoredFilesRead int `json:"scored_files_read"` + RecordsRead int `json:"records_read"` + RecordsExported int `json:"records_exported"` + RecordsQuarantined int `json:"records_quarantined"` + OutputPath string `json:"output_path"` + QuarantineSummary string `json:"quarantine_summary"` +} + +// ListScoredRunFiles walks {root}/data/scored-runs/YYYY/MM/DD/*.jsonl +// and returns the sorted list. Empty when the dir doesn't exist +// (matches Rust behavior — caller should treat zero-files as a +// no-op, not an error). +func ListScoredRunFiles(root string) ([]string, error) { + if root == "" { + return nil, errors.New("distillation: empty root") + } + base := filepath.Join(root, "data", "scored-runs") + if _, err := os.Stat(base); os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, fmt.Errorf("stat %s: %w", base, err) + } + var out []string + years, err := os.ReadDir(base) + if err != nil { + return nil, fmt.Errorf("read %s: %w", base, err) + } + sortDirEntries(years) + for _, y := range years { + if !y.IsDir() { + continue + } + months, err := os.ReadDir(filepath.Join(base, y.Name())) + if err != nil { + continue + } + sortDirEntries(months) + for _, m := range months { + if !m.IsDir() { + continue + } + days, err := os.ReadDir(filepath.Join(base, y.Name(), m.Name())) + if err != nil { + continue + } + sortDirEntries(days) + for _, d := range days { + if !d.IsDir() { + continue + } + files, err := os.ReadDir(filepath.Join(base, y.Name(), m.Name(), d.Name())) + if err != nil { + continue + } + sortDirEntries(files) + for _, f := range files { + if strings.HasSuffix(f.Name(), ".jsonl") { + out = append(out, filepath.Join(base, y.Name(), m.Name(), d.Name(), f.Name())) + } + } + } + } + } + return out, nil +} + +// sortDirEntries sorts dir entries by name in-place. 
Stable +// alphabetical so the directory walk is deterministic — important +// for the audit_baselines longitudinal signal which expects the +// same order across runs. +func sortDirEntries(entries []os.DirEntry) { + sort.Slice(entries, func(i, j int) bool { return entries[i].Name() < entries[j].Name() }) +} + +// LoadScoredRunsFromFile reads a JSONL of ScoredRun records. +// Returns the slice + the count of malformed lines (skipped). +// This is the read-half — the synthesis half (turn ScoredRun + +// EvidenceRecord into SftSample) is the not-yet-ported piece. +func LoadScoredRunsFromFile(path string) ([]ScoredRun, int, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, 0, err + } + lines := strings.Split(string(data), "\n") + out := make([]ScoredRun, 0, len(lines)) + skipped := 0 + for _, line := range lines { + line = strings.TrimSpace(line) + if line == "" { + continue + } + var sr ScoredRun + if err := json.Unmarshal([]byte(line), &sr); err != nil { + skipped++ + continue + } + out = append(out, sr) + } + return out, skipped, nil +} + +// ExportSft is the partial port. Lists scored-run files, loads +// each, applies the contamination firewall, and reports counts. +// What's NOT yet ported (deliberate, separate wave): +// - Evidence-record loading + cache (loadEvidenceByRunId). +// - synthesizeSft — the actual instruction/input/response +// synthesis logic. ~80 lines of TS in scripts/distillation/export_sft.ts. +// - Quarantine writer integration (write rejected records to +// a quarantine JSONL for operator review). +// - File output (write SFT JSONL to data/distilled/sft/). +// +// Returning a non-nil result with RecordsExported=0 is intentional +// pre-synthesis — operators calling this on the Go side will see +// the count of records that PASSED the firewall and would have +// been exported by a complete implementation. RecordsQuarantined +// reflects records BLOCKED by the firewall. 
+// +// Tests/contracts that synthesis port must preserve: +// - SftNever firewall fires before any other validation +// - Sort order matches Rust (file walk + record order within file) +// - Empty root dir returns zero-counts, not error +func ExportSft(opts ExportSftOptions) (ExportSftResult, error) { + res := ExportSftResult{ + OutputPath: filepath.Join(opts.Root, "data", "distilled", "sft", "sft_partial.jsonl"), + QuarantineSummary: "synthesis not yet ported — see internal/distillation/sft_export.go header", + } + files, err := ListScoredRunFiles(opts.Root) + if err != nil { + return res, fmt.Errorf("list scored runs: %w", err) + } + res.ScoredFilesRead = len(files) + for _, f := range files { + runs, _, err := LoadScoredRunsFromFile(f) + if err != nil { + continue + } + res.RecordsRead += len(runs) + for _, r := range runs { + if IsSftNever(r.Category) { + res.RecordsQuarantined++ + continue + } + // Synthesis would happen here. Pre-port: count as + // "would-export" for the firewall-passing records. + res.RecordsExported++ + } + } + return res, nil +} diff --git a/internal/distillation/sft_export_test.go b/internal/distillation/sft_export_test.go new file mode 100644 index 0000000..d83ccae --- /dev/null +++ b/internal/distillation/sft_export_test.go @@ -0,0 +1,174 @@ +package distillation + +import ( + "os" + "path/filepath" + "testing" +) + +// TestIsSftNever_Firewall locks the contamination firewall set: +// the predicate fires for "rejected" and "needs_human_review" and +// no others. Per project_distillation_substrate.md: this is one of +// the substrate's load-bearing knobs — touching the firewall set +// requires explicit sign-off. +func TestIsSftNever_Firewall(t *testing.T) { + mustBlock := []ScoreCategory{ + CategoryRejected, + CategoryNeedsHumanReview, + } + for _, c := range mustBlock { + if !IsSftNever(c) { + t.Errorf("firewall must block %q", c) + } + } + // Anything else should NOT be blocked. 
Read every category + // constant in this package and assert non-blocked unless it's + // in mustBlock. + allKnown := []ScoreCategory{ + CategoryAccepted, + CategoryPartiallyAccepted, + CategoryRejected, + CategoryNeedsHumanReview, + } + for _, c := range allKnown { + shouldBlock := false + for _, b := range mustBlock { + if c == b { + shouldBlock = true + break + } + } + if got := IsSftNever(c); got != shouldBlock { + t.Errorf("IsSftNever(%q) = %v, want %v", c, got, shouldBlock) + } + } + // Unknown category is NOT blocked — that's the safe default + // (operators bumping ScoreCategory enum should explicitly add + // to firewall if they want it gated). + if IsSftNever(ScoreCategory("custom_future_category")) { + t.Errorf("unknown category must not be blocked by firewall") + } +} + +// TestSftNever_PinsExpectedSet locks the firewall slice contents. +// If a future commit adds or removes categories from SftNever, this +// test fails — forcing the change through review. +func TestSftNever_PinsExpectedSet(t *testing.T) { + want := map[ScoreCategory]bool{ + CategoryRejected: true, + CategoryNeedsHumanReview: true, + } + if len(SftNever) != len(want) { + t.Fatalf("SftNever has %d entries, want %d (firewall set changed without review?)", + len(SftNever), len(want)) + } + for _, c := range SftNever { + if !want[c] { + t.Errorf("SftNever contains %q, which is not in the expected firewall set", c) + } + } +} + +// TestListScoredRunFiles_Empty: missing root → no files, no error. +// Matches Rust behavior; operators running ExportSft on a fresh box +// shouldn't see an error before any scored runs have landed. 
+func TestListScoredRunFiles_Empty(t *testing.T) { + tmp := t.TempDir() + files, err := ListScoredRunFiles(tmp) + if err != nil { + t.Fatalf("ListScoredRunFiles: %v", err) + } + if len(files) != 0 { + t.Errorf("empty root: expected 0 files, got %d", len(files)) + } +} + +// TestListScoredRunFiles_WalksYearMonthDay locks the directory walk +// pattern: data/scored-runs/YYYY/MM/DD/*.jsonl. Subset of full +// Rust-side test coverage but proves the walk visits the right +// nesting. +func TestListScoredRunFiles_WalksYearMonthDay(t *testing.T) { + tmp := t.TempDir() + // Create the expected nested structure. + dirs := []string{ + filepath.Join(tmp, "data", "scored-runs", "2026", "04", "30"), + filepath.Join(tmp, "data", "scored-runs", "2026", "05", "01"), + } + for _, d := range dirs { + if err := os.MkdirAll(d, 0o755); err != nil { + t.Fatalf("mkdir: %v", err) + } + } + // Drop a JSONL in each + a non-JSONL we should skip. + for i, d := range dirs { + jsonlPath := filepath.Join(d, "run.jsonl") + if err := os.WriteFile(jsonlPath, []byte("{}\n"), 0o644); err != nil { + t.Fatalf("write %s: %v", jsonlPath, err) + } + // Non-JSONL — must be skipped. + other := filepath.Join(d, "skip.txt") + if err := os.WriteFile(other, []byte("ignore me"), 0o644); err != nil { + t.Fatalf("write %s: %v", other, err) + } + _ = i + } + files, err := ListScoredRunFiles(tmp) + if err != nil { + t.Fatalf("ListScoredRunFiles: %v", err) + } + if len(files) != 2 { + t.Errorf("expected 2 .jsonl files, got %d (%v)", len(files), files) + } + // Sort order: 2026-04-30 before 2026-05-01. Critical for audit + // baselines — the longitudinal signal depends on stable order. + if len(files) >= 2 { + if files[0] >= files[1] { + t.Errorf("files not sorted ascending: %q vs %q", files[0], files[1]) + } + } + // Non-JSONL must be skipped. 
+ for _, f := range files { + if filepath.Ext(f) != ".jsonl" { + t.Errorf("listing returned non-.jsonl: %q", f) + } + } +} + +// TestExportSft_PartialPort_FirewallFires runs the partial-port +// ExportSft on a fixture with one valid + one rejected ScoredRun +// and asserts the firewall counts correctly. Locks the contamination +// guarantee at the integration layer — even before the synthesis +// half ports, the firewall protection is end-to-end testable. +func TestExportSft_PartialPort_FirewallFires(t *testing.T) { + tmp := t.TempDir() + dir := filepath.Join(tmp, "data", "scored-runs", "2026", "04", "30") + if err := os.MkdirAll(dir, 0o755); err != nil { + t.Fatalf("mkdir: %v", err) + } + // Two scored runs: one passes the firewall, one is blocked. + jsonl := `{"category":"accepted","run_id":"r1","task_id":"t1"} +{"category":"rejected","run_id":"r2","task_id":"t2"} +{"category":"partially_accepted","run_id":"r3","task_id":"t3"} +{"category":"needs_human_review","run_id":"r4","task_id":"t4"} +` + if err := os.WriteFile(filepath.Join(dir, "run.jsonl"), []byte(jsonl), 0o644); err != nil { + t.Fatalf("write: %v", err) + } + res, err := ExportSft(ExportSftOptions{ + Root: tmp, + RecordedAt: "2026-04-30T00:00:00Z", + DryRun: true, + }) + if err != nil { + t.Fatalf("ExportSft: %v", err) + } + if res.RecordsRead != 4 { + t.Errorf("RecordsRead: got %d, want 4", res.RecordsRead) + } + if res.RecordsExported != 2 { + t.Errorf("RecordsExported (firewall-passing): got %d, want 2", res.RecordsExported) + } + if res.RecordsQuarantined != 2 { + t.Errorf("RecordsQuarantined (firewall-blocked): got %d, want 2", res.RecordsQuarantined) + } +} diff --git a/internal/drift/drift.go b/internal/drift/drift.go index b902e3e..a28825d 100644 --- a/internal/drift/drift.go +++ b/internal/drift/drift.go @@ -35,11 +35,16 @@ package drift import ( + "math" "sort" "git.agentview.dev/profit/golangLAKEHOUSE/internal/distillation" ) +// mathLog is wrapped to localize the math import in case future 
+// drift math swaps in a stable-log variant. Inlined by the compiler. +func mathLog(x float64) float64 { return math.Log(x) } + // ScorerDriftEntry is one mismatch — a historical (record, category) // pair where the current scorer disagrees with the persisted // verdict. Reasons captures the current scorer's explanation so @@ -149,3 +154,210 @@ func ComputeScorerDrift(inputs []ScorerDriftInput, includeEntries bool) ScorerDr return report } + +// ───────────────────────────────────────────────────────────────── +// Distribution drift via Population Stability Index (PSI). +// +// Closes OPEN #3's "concrete distribution-drift signal" — pairs with +// ComputeScorerDrift's "categorical drift" to give callers a +// continuous-value drift metric for things like cosine distances, +// judge ratings, query top-1 distance distributions over time. +// +// Why PSI: standard in finance/risk for distribution-shift monitoring, +// well-defined verdict tiers (< 0.1 stable, 0.1-0.25 minor, > 0.25 +// major), tolerant of moderate sample sizes, no assumption about +// distribution shape. Alternatives (KS-test, KL-divergence) all +// have failure modes when one bucket has zero observations; PSI's +// epsilon-clamping handles that gracefully. +// +// Math: PSI = Σᵢ (actualᵢ - expectedᵢ) × ln(actualᵢ / expectedᵢ) +// where actualᵢ and expectedᵢ are the proportion of observations +// falling into bucket i (zero values are clamped to a small +// epsilon so the log doesn't blow up). +// +// First-shipped use case: comparing the distribution of cosine top-1 +// distances at T0 vs T1 to detect when the embedder's behavior +// against the corpus has shifted materially (e.g. after a model +// upgrade or a corpus refresh). +// ───────────────────────────────────────────────────────────────── + +// DistributionDriftTier is a verdict bucket for the PSI value. +// Standard PSI thresholds from finance/risk; documented in the +// function-level comment for ComputeDistributionDrift. 
+type DistributionDriftTier string + +const ( + DriftTierStable DistributionDriftTier = "stable" // PSI < 0.1 + DriftTierMinor DistributionDriftTier = "minor" // 0.1 ≤ PSI < 0.25 + DriftTierMajor DistributionDriftTier = "major" // PSI ≥ 0.25 +) + +// DistributionDriftBucket is one bucket's contribution to the PSI sum. +// Useful for drilldown — operators can see WHICH part of the +// distribution shifted (e.g. "the [0.3, 0.4) bucket gained 12% of +// observations from T0 to T1"). +type DistributionDriftBucket struct { + Lower float64 `json:"lower"` + Upper float64 `json:"upper"` + BaselineCount int `json:"baseline_count"` + CurrentCount int `json:"current_count"` + BaselineRatio float64 `json:"baseline_ratio"` // [0, 1] + CurrentRatio float64 `json:"current_ratio"` // [0, 1] + PSIPart float64 `json:"psi_part"` // contribution to PSI +} + +// DistributionDriftReport is the aggregate output of a PSI run. +type DistributionDriftReport struct { + PSI float64 `json:"psi"` + Tier DistributionDriftTier `json:"tier"` + Buckets []DistributionDriftBucket `json:"buckets"` + BaselineN int `json:"baseline_n"` + CurrentN int `json:"current_n"` + NumBuckets int `json:"num_buckets"` + Min float64 `json:"min"` + Max float64 `json:"max"` +} + +// DistributionDriftInput is the input shape for ComputeDistributionDrift. +// Baseline is the reference distribution (T0); Current is the new +// distribution (T1). NumBuckets defaults to 10 when zero, capped at +// 100 to keep the per-bucket math stable on small samples. +type DistributionDriftInput struct { + Baseline []float64 + Current []float64 + NumBuckets int +} + +// driftEpsilon clamps zero-bucket ratios so log doesn't blow up. +// 1e-4 is the standard PSI choice — represents "less than 0.01% of +// observations." Anything smaller would let one rare-bucket shift +// dominate the PSI sum unfairly. 
+const driftEpsilon = 1e-4
+
+// ComputeDistributionDrift returns a PSI-based drift report between
+// the baseline and current distributions. Buckets are equal-WIDTH
+// and span [min, max] of the COMBINED data, so neither side falls
+// outside the range. (Equal-frequency bucketing on the baseline was
+// considered and deliberately rejected; the inline comment in the
+// function body explains the trade-off.)
+//
+// Empty inputs return PSI=0, tier=stable. Caller should also check
+// BaselineN/CurrentN before trusting the verdict — PSI on tiny
+// samples is statistical noise.
+//
+// Thresholds (industry-standard, citable):
+// - PSI < 0.10 → stable: distributions are operationally equivalent
+// - 0.10 ≤ PSI < 0.25 → minor: investigate but not alarming
+// - PSI ≥ 0.25 → major: distributions have shifted materially;
+//   downstream decisions trained on baseline may be invalid
+func ComputeDistributionDrift(in DistributionDriftInput) DistributionDriftReport {
+	report := DistributionDriftReport{
+		BaselineN: len(in.Baseline),
+		CurrentN:  len(in.Current),
+	}
+	if report.BaselineN == 0 || report.CurrentN == 0 {
+		report.Tier = DriftTierStable
+		return report
+	}
+	n := in.NumBuckets
+	if n <= 0 {
+		n = 10
+	}
+	if n > 100 {
+		n = 100
+	}
+	report.NumBuckets = n
+
+	// Combined min/max so both distributions fit the bucket range.
+	minV, maxV := in.Baseline[0], in.Baseline[0]
+	for _, v := range in.Baseline {
+		if v < minV {
+			minV = v
+		}
+		if v > maxV {
+			maxV = v
+		}
+	}
+	for _, v := range in.Current {
+		if v < minV {
+			minV = v
+		}
+		if v > maxV {
+			maxV = v
+		}
+	}
+	report.Min = minV
+	report.Max = maxV
+	if maxV == minV {
+		// All values identical — trivially stable, no meaningful PSI.
+		report.Tier = DriftTierStable
+		return report
+	}
+
+	// Equal-WIDTH bucketing. 
Equal-frequency would be more robust on + // skewed distributions, but it complicates the math (need to + // quantile-sort the baseline) for marginal gain on the symmetric + // distributions PSI is most useful for. Operators with skewed + // data can pre-bucket their inputs into normalized scores. + width := (maxV - minV) / float64(n) + bucketIdx := func(v float64) int { + if v >= maxV { + return n - 1 // right-edge inclusive + } + idx := int((v - minV) / width) + if idx < 0 { + idx = 0 + } + if idx >= n { + idx = n - 1 + } + return idx + } + + bCounts := make([]int, n) + cCounts := make([]int, n) + for _, v := range in.Baseline { + bCounts[bucketIdx(v)]++ + } + for _, v := range in.Current { + cCounts[bucketIdx(v)]++ + } + + report.Buckets = make([]DistributionDriftBucket, 0, n) + bN := float64(report.BaselineN) + cN := float64(report.CurrentN) + psi := 0.0 + for i := 0; i < n; i++ { + bRatio := float64(bCounts[i]) / bN + cRatio := float64(cCounts[i]) / cN + // Epsilon clamp so empty buckets don't blow up log. 
+ if bRatio < driftEpsilon { + bRatio = driftEpsilon + } + if cRatio < driftEpsilon { + cRatio = driftEpsilon + } + part := (cRatio - bRatio) * mathLog(cRatio/bRatio) + psi += part + report.Buckets = append(report.Buckets, DistributionDriftBucket{ + Lower: minV + float64(i)*width, + Upper: minV + float64(i+1)*width, + BaselineCount: bCounts[i], + CurrentCount: cCounts[i], + BaselineRatio: float64(bCounts[i]) / bN, + CurrentRatio: float64(cCounts[i]) / cN, + PSIPart: part, + }) + } + report.PSI = psi + switch { + case psi < 0.10: + report.Tier = DriftTierStable + case psi < 0.25: + report.Tier = DriftTierMinor + default: + report.Tier = DriftTierMajor + } + return report +} + diff --git a/internal/drift/drift_test.go b/internal/drift/drift_test.go index 349ce22..c4b8c30 100644 --- a/internal/drift/drift_test.go +++ b/internal/drift/drift_test.go @@ -153,3 +153,137 @@ func TestComputeScorerDrift_ScorerVersionStamped(t *testing.T) { t.Errorf("scorer_version: want %q, got %q", distillation.ScorerVersion, r.ScorerVersion) } } + +// ── Distribution drift (PSI) tests ──────────────────────────────── + +// TestDistributionDrift_IdenticalIsStable: same data on both sides +// should yield PSI ≈ 0 and tier=stable. Anchors the lower bound. +func TestDistributionDrift_IdenticalIsStable(t *testing.T) { + data := []float64{0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9} + r := ComputeDistributionDrift(DistributionDriftInput{ + Baseline: data, + Current: data, + NumBuckets: 5, + }) + if r.PSI > 0.001 { + t.Errorf("identical distributions: expected PSI ≈ 0, got %f", r.PSI) + } + if r.Tier != DriftTierStable { + t.Errorf("expected stable tier, got %q", r.Tier) + } +} + +// TestDistributionDrift_HardShiftIsMajor: distribution moved +// completely to a different range — should yield major tier. 
+func TestDistributionDrift_HardShiftIsMajor(t *testing.T) { + baseline := []float64{0.1, 0.1, 0.2, 0.2, 0.3, 0.3, 0.4, 0.4} + current := []float64{0.7, 0.7, 0.8, 0.8, 0.9, 0.9, 1.0, 1.0} + r := ComputeDistributionDrift(DistributionDriftInput{ + Baseline: baseline, + Current: current, + NumBuckets: 10, + }) + if r.PSI < 0.25 { + t.Errorf("hard distribution shift: expected PSI ≥ 0.25, got %f", r.PSI) + } + if r.Tier != DriftTierMajor { + t.Errorf("expected major tier, got %q", r.Tier) + } +} + +// TestDistributionDrift_DetectsModerateShift: distribution shifted +// noticeably but not catastrophically — PSI must be > 0 (some drift +// detected) and tier must NOT be stable. Whether the tier is minor +// vs major depends on bucketing granularity; we don't pin that here +// because PSI thresholds are sensitive to bucket count. +func TestDistributionDrift_DetectsModerateShift(t *testing.T) { + // Baseline: many around 0.5, some spread. + baseline := []float64{0.4, 0.45, 0.5, 0.5, 0.5, 0.5, 0.5, 0.55, 0.6, 0.6, + 0.45, 0.5, 0.5, 0.55, 0.5, 0.5, 0.5, 0.55, 0.5, 0.6} + // Current: same range, slight rightward shift (still overlapping). + current := []float64{0.45, 0.5, 0.5, 0.55, 0.55, 0.55, 0.6, 0.6, 0.65, 0.7, + 0.5, 0.55, 0.55, 0.55, 0.6, 0.6, 0.55, 0.6, 0.65, 0.65} + r := ComputeDistributionDrift(DistributionDriftInput{ + Baseline: baseline, + Current: current, + NumBuckets: 10, + }) + if r.PSI < 0.01 { + t.Errorf("moderate shift should produce PSI > 0.01, got %f", r.PSI) + } + if r.Tier == DriftTierStable { + t.Errorf("moderate shift should NOT be stable tier, got PSI=%f tier=%q", r.PSI, r.Tier) + } +} + +// TestDistributionDrift_EmptyInputs: empty baseline OR current +// returns PSI=0, stable tier — caller must check N before trusting. 
+func TestDistributionDrift_EmptyInputs(t *testing.T) { + r := ComputeDistributionDrift(DistributionDriftInput{ + Baseline: []float64{}, + Current: []float64{1, 2, 3}, + }) + if r.PSI != 0 || r.Tier != DriftTierStable { + t.Errorf("empty baseline: expected PSI=0 stable, got psi=%f tier=%q", r.PSI, r.Tier) + } + r = ComputeDistributionDrift(DistributionDriftInput{ + Baseline: []float64{1, 2, 3}, + Current: []float64{}, + }) + if r.PSI != 0 || r.Tier != DriftTierStable { + t.Errorf("empty current: expected PSI=0 stable, got psi=%f tier=%q", r.PSI, r.Tier) + } +} + +// TestDistributionDrift_AllIdenticalValues: degenerate case where +// everything's the same value (e.g., all zeros). Should not panic; +// returns stable. +func TestDistributionDrift_AllIdenticalValues(t *testing.T) { + r := ComputeDistributionDrift(DistributionDriftInput{ + Baseline: []float64{0.5, 0.5, 0.5}, + Current: []float64{0.5, 0.5, 0.5}, + }) + if r.Tier != DriftTierStable { + t.Errorf("expected stable on identical-singleton, got %q", r.Tier) + } +} + +// TestDistributionDrift_BucketCounts: per-bucket counts must sum to +// the input N. If they don't, we're losing observations to bucket +// boundary issues. +func TestDistributionDrift_BucketCounts(t *testing.T) { + baseline := []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0} + current := []float64{0.5, 0.5, 0.5, 0.5} + r := ComputeDistributionDrift(DistributionDriftInput{ + Baseline: baseline, + Current: current, + NumBuckets: 5, + }) + totalB := 0 + totalC := 0 + for _, b := range r.Buckets { + totalB += b.BaselineCount + totalC += b.CurrentCount + } + if totalB != len(baseline) { + t.Errorf("baseline bucket counts sum to %d, expected %d", totalB, len(baseline)) + } + if totalC != len(current) { + t.Errorf("current bucket counts sum to %d, expected %d", totalC, len(current)) + } +} + +// TestDistributionDrift_NumBucketsClamping: 0 → default 10; > 100 → 100. 
+func TestDistributionDrift_NumBucketsClamping(t *testing.T) { + in := DistributionDriftInput{Baseline: []float64{1, 2}, Current: []float64{1, 2}} + in.NumBuckets = 0 + r := ComputeDistributionDrift(in) + if r.NumBuckets != 10 { + t.Errorf("0 should default to 10 buckets, got %d", r.NumBuckets) + } + in.NumBuckets = 500 + r = ComputeDistributionDrift(in) + if r.NumBuckets != 100 { + t.Errorf("500 should clamp to 100 buckets, got %d", r.NumBuckets) + } +} diff --git a/internal/vectord/index.go b/internal/vectord/index.go index e7f9dfd..117c50f 100644 --- a/internal/vectord/index.go +++ b/internal/vectord/index.go @@ -62,7 +62,14 @@ type Index struct { params IndexParams g *hnsw.Graph[string] meta map[string]json.RawMessage - mu sync.RWMutex + // ids is the canonical ID set (a value-less map used as a set). + // Maintained alongside i.g and i.meta in Add/Delete/resetGraph + // so IDs() can enumerate without depending on the meta map's + // sparse-on-nil-meta semantics. Underpins OPEN #1's merge + // endpoint — necessary because two-tier callers + // (multi_coord_stress et al.) sometimes Add with nil meta. + ids map[string]struct{} + mu sync.RWMutex } // Errors surfaced to HTTP handlers. Sentinel-based so the wire @@ -106,6 +113,7 @@ func NewIndex(p IndexParams) (*Index, error) { params: p, g: g, meta: make(map[string]json.RawMessage), + ids: make(map[string]struct{}), }, nil } @@ -131,6 +139,27 @@ func (i *Index) Len() int { return i.g.Len() } +// IDs returns a snapshot of every ID currently stored in the index. +// Allocated under the read lock so callers receive a stable copy and +// can iterate without holding the lock. Used by the merge endpoint +// (OPEN #1: periodic fresh→main index merge — drains the fresh +// corpus into the main one when it crosses the operational ceiling). +// +// Source of truth: the i.ids tracker, NOT the meta map. 
The meta +// map intentionally stays sparse (only items with explicit +// metadata appear there, per the K-B1 nil-vs-{} distinction). Using +// meta as the ID set would silently miss items added with nil +// metadata. +func (i *Index) IDs() []string { + i.mu.RLock() + defer i.mu.RUnlock() + out := make([]string, 0, len(i.ids)) + for id := range i.ids { + out = append(out, id) + } + return out +} + // Add inserts a vector with optional metadata, with replace // semantics for the vector: if id already exists, the prior // vector is removed first. Dim must match the index dim or @@ -178,6 +207,7 @@ func (i *Index) Add(id string, vec []float32, meta json.RawMessage) error { } } i.g.Add(hnsw.MakeNode(id, vec)) + i.ids[id] = struct{}{} if meta != nil { // Per scrum K-B1 (Kimi): only OVERWRITE on explicit non-nil. // nil = "leave existing meta alone" (upsert). To clear, the @@ -287,6 +317,7 @@ func (i *Index) BatchAdd(items []BatchItem) error { i.g.Add(nodes...) for _, it := range items { + i.ids[it.ID] = struct{}{} if it.Metadata != nil { i.meta[it.ID] = it.Metadata } @@ -330,6 +361,7 @@ func (i *Index) Delete(id string) bool { i.mu.Lock() defer i.mu.Unlock() delete(i.meta, id) + delete(i.ids, id) return i.g.Delete(id) } @@ -433,6 +465,18 @@ func DecodeIndex(envelopeR, graphR io.Reader) (*Index, error) { if env.Metadata != nil { idx.meta = env.Metadata } + // Restore the ids tracker from the metadata keyset. Items that + // were Add'd with nil metadata aren't in env.Metadata and won't + // appear in i.ids after reload — IDs() will miss them, and the + // merge endpoint will skip them. This is acceptable because the + // production HTTP path always supplies non-nil metadata (handler + // requires it explicitly; multi_coord_stress always sends an + // object). The edge case is intentionally not closed because + // closing it requires bumping the envelope version, which + // invalidates existing persisted indexes. 
+	for id := range idx.meta {
+		idx.ids[id] = struct{}{}
+	}
 	return idx, nil
 }
diff --git a/internal/vectord/index_test.go b/internal/vectord/index_test.go
index 98796f4..c3a45ae 100644
--- a/internal/vectord/index_test.go
+++ b/internal/vectord/index_test.go
@@ -277,6 +277,43 @@ func TestLookup_ReturnsCopy(t *testing.T) {
 	}
 }

+// TestIndex_IDs locks the snapshot semantics: IDs() returns a copy
+// of the canonical ID set (the i.ids tracker, NOT the meta map)
+// that callers can iterate without holding the index lock.
+// Underpins the merge endpoint (OPEN #1) — without IDs(), the merge
+// handler can't enumerate items to drain.
+func TestIndex_IDs(t *testing.T) {
+	idx, err := NewIndex(IndexParams{Name: "ids_test", Dimension: 4})
+	if err != nil {
+		t.Fatalf("NewIndex: %v", err)
+	}
+	if got := idx.IDs(); len(got) != 0 {
+		t.Errorf("empty index should have no IDs, got %v", got)
+	}
+	// Add with nil meta — the ids tracker is the canonical set, so
+	// these MUST appear in IDs() even though they're not in i.meta.
+	for _, id := range []string{"a", "b", "c"} {
+		if err := idx.Add(id, []float32{1, 0, 0, 0}, nil); err != nil {
+			t.Fatalf("Add %s: %v", id, err)
+		}
+	}
+	got := idx.IDs()
+	if len(got) != 3 {
+		t.Errorf("expected 3 IDs after 3 Adds (nil meta still counts), got %d %v", len(got), got)
+	}
+	got[0] = "MUTATED"
+	got2 := idx.IDs()
+	for _, id := range got2 {
+		if id == "MUTATED" {
+			t.Errorf("IDs() must return a snapshot independent of internal state")
+		}
+	}
+	// Delete updates the tracker. 
+	idx.Delete("a")
+	if got := idx.IDs(); len(got) != 2 {
+		t.Errorf("expected 2 IDs after Delete, got %d %v", len(got), got)
+	}
+}
+
 func TestRegistry_Names_Sorted(t *testing.T) {
 	r := NewRegistry()
 	for _, n := range []string{"zoo", "alpha", "midway"} {
diff --git a/scripts/multi_coord_stress/main.go b/scripts/multi_coord_stress/main.go
index 8a5987e..fd0a02c 100644
--- a/scripts/multi_coord_stress/main.go
+++ b/scripts/multi_coord_stress/main.go
@@ -102,6 +102,9 @@ type ResultRef struct {
 type Event struct {
 	Phase string `json:"phase"`
 	Hour int `json:"hour"` // operational-narrative time label, not real wall clock
+	// Real wall-clock time of the event is in TimestampUnixNano below
+	// (already present pre-OPEN-#4). Consumers can derive "since run
+	// start" as event.TimestampUnixNano - Output.GeneratedAt.UnixNano().
 	Coordinator string `json:"coordinator"`
 	Contract string `json:"contract"`
 	Role string `json:"role"`
@@ -127,13 +130,26 @@ type Event struct {
 }

 type Output struct {
-	Coordinators []string `json:"coordinators"`
-	Contracts []string `json:"contracts"`
-	Events []Event `json:"events"`
-	Diversity Diversity `json:"diversity"`
-	Determinism Determ `json:"determinism"`
-	Learning Learning `json:"learning"`
-	GeneratedAt time.Time `json:"generated_at"`
+	Coordinators []string `json:"coordinators"`
+	Contracts []string `json:"contracts"`
+	Events []Event `json:"events"`
+	Diversity Diversity `json:"diversity"`
+	Determinism Determ `json:"determinism"`
+	Learning Learning `json:"learning"`
+	PhaseTimings []PhaseTiming `json:"phase_timings,omitempty"` // OPEN #4: real wall-clock per phase
+	TotalElapsedMs int64 `json:"total_elapsed_ms,omitempty"`
+	GeneratedAt time.Time `json:"generated_at"`
+}
+
+// PhaseTiming records the real wall-clock duration of a single phase.
+// Closes OPEN #4's "real-time wall-clock for the stress harness" —
+// operators reading the JSON now see actual phase durations alongside
+// the operational-narrative Hour labels. 
Hour is a fictional time +// (the simulated 48-hour mock); ElapsedMs is the real one. +type PhaseTiming struct { + Phase string `json:"phase"` + StartUTC time.Time `json:"start_utc"` + ElapsedMs int64 `json:"elapsed_ms"` } // Diversity = how distinct are top-K worker sets across (coord, @@ -223,6 +239,10 @@ func main() { ctx := context.Background() _ = ctx + // runStart pinned BEFORE startPhase is defined so the closure + // can reference it; output.GeneratedAt picks it up later. + runStart := time.Now().UTC() + // Optional Langfuse client. Best-effort: missing env file or // unreachable Langfuse just means traces don't go anywhere; the // run still proceeds. @@ -255,7 +275,33 @@ func main() { // Subsequent emitSpan calls nest under it. Idempotent — returns // "" when Langfuse isn't configured so callers don't need nil // checks. + // Per-phase wall-clock tracker (OPEN #4 closure). Operators see + // "phase 2: surge took 8.3s" as the run progresses, not only in + // the final JSON. Used by both the stdout log line and the + // per-phase EndTime in the Langfuse span (close-on-next-startPhase). 
+ var ( + currentPhaseName string + currentPhaseStart time.Time + phaseTimings []PhaseTiming + ) + closePhase := func() { + if currentPhaseName == "" { + return + } + elapsed := time.Since(currentPhaseStart) + phaseTimings = append(phaseTimings, PhaseTiming{ + Phase: currentPhaseName, + StartUTC: currentPhaseStart.UTC(), + ElapsedMs: elapsed.Milliseconds(), + }) + log.Printf("[stress] phase %s done — %.2fs (T+%.1fs)", currentPhaseName, elapsed.Seconds(), time.Since(runStart).Seconds()) + currentPhaseName = "" + } startPhase := func(name string, hour int, meta map[string]any) { + closePhase() + currentPhaseName = name + currentPhaseStart = time.Now() + log.Printf("[stress] phase %s starting (T+%.1fs)", name, time.Since(runStart).Seconds()) if lf == nil { return } @@ -267,7 +313,7 @@ func main() { TraceID: runTraceID, Name: name, Metadata: spanMeta, - StartTime: time.Now(), + StartTime: currentPhaseStart, }) } // emitSpan records one span as a child of the current phase span. @@ -320,7 +366,7 @@ func main() { output := Output{ Coordinators: []string{"alice", "bob", "carol"}, Contracts: []string{contracts[0].Name, contracts[1].Name, contracts[2].Name}, - GeneratedAt: time.Now().UTC(), + GeneratedAt: runStart, } log.Printf("[stress] 3 coords, 3 contracts, k=%d, corpora=%v", *k, corpora) @@ -753,6 +799,11 @@ func main() { log.Printf("[stress] phase 7: diversity analysis") output.Diversity = computeDiversity(output.Events) + // Close the final phase + record total elapsed (OPEN #4 closure). + closePhase() + output.PhaseTimings = phaseTimings + output.TotalElapsedMs = time.Since(runStart).Milliseconds() + // ── write ─────────────────────────────────────────────────── if err := os.MkdirAll(filepath.Dir(*out), 0o755); err != nil { log.Fatalf("mkdir: %v", err)
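
For readers outside the `drift` package, the PSI math documented in the drift hunk (equal-width buckets over the combined [min, max] range, epsilon-clamped ratios, the 0.10/0.25 tier thresholds) can be sketched standalone. This is an illustrative reimplementation, not the shipped `ComputeDistributionDrift` API — `psi` and `tier` here are hypothetical names:

```go
package main

import (
	"fmt"
	"math"
)

// psi mirrors the documented formula
//   PSI = Σᵢ (actualᵢ - expectedᵢ) × ln(actualᵢ / expectedᵢ)
// using n equal-width buckets over the combined [min, max] range,
// with zero ratios clamped to 1e-4 so the log stays finite.
func psi(baseline, current []float64, n int) float64 {
	const eps = 1e-4
	if len(baseline) == 0 || len(current) == 0 || n <= 0 {
		return 0 // degenerate inputs: report "no drift", as the package does
	}
	minV, maxV := math.Inf(1), math.Inf(-1)
	for _, s := range [][]float64{baseline, current} {
		for _, v := range s {
			minV = math.Min(minV, v)
			maxV = math.Max(maxV, v)
		}
	}
	if maxV == minV {
		return 0 // all values identical — trivially stable
	}
	width := (maxV - minV) / float64(n)
	bucket := func(v float64) int {
		i := int((v - minV) / width)
		if i < 0 {
			i = 0
		}
		if i >= n {
			i = n - 1 // right edge inclusive
		}
		return i
	}
	bCounts := make([]float64, n)
	cCounts := make([]float64, n)
	for _, v := range baseline {
		bCounts[bucket(v)]++
	}
	for _, v := range current {
		cCounts[bucket(v)]++
	}
	sum := 0.0
	for i := 0; i < n; i++ {
		e := bCounts[i] / float64(len(baseline)) // expected (baseline) ratio
		a := cCounts[i] / float64(len(current))  // actual (current) ratio
		if e < eps {
			e = eps
		}
		if a < eps {
			a = eps
		}
		sum += (a - e) * math.Log(a/e)
	}
	return sum
}

// tier applies the standard PSI thresholds from the doc comment.
func tier(p float64) string {
	switch {
	case p < 0.10:
		return "stable"
	case p < 0.25:
		return "minor"
	default:
		return "major"
	}
}

func main() {
	base := []float64{0.1, 0.2, 0.3, 0.4, 0.5}
	moved := []float64{0.7, 0.8, 0.9, 1.0, 1.1}
	fmt.Println("identical:", tier(psi(base, base, 5))) // prints "identical: stable"
	fmt.Println("shifted:", tier(psi(base, moved, 5)))  // prints "shifted: major"
}
```

Identical inputs land every bucket at equal ratios, so each term of the sum is zero; fully disjoint inputs push all the baseline mass into buckets the current sample never touches (and vice versa), so the clamped log ratios make PSI blow well past the 0.25 major threshold — the same behavior the new `TestDistributionDrift_IdenticalIsStable` and `TestDistributionDrift_HardShiftIsMajor` tests pin down for the real implementation.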