fix the other 4: close all OPEN-list items in one wave

Substantial wave addressing all 4 prior OPEN items. Three closed in
full, one partially (the speculative half deliberately deferred).

OPEN #1 — Periodic fresh→main index merge (FULL):
- POST /v1/vectors/index/{src}/merge with {dest, clear_source}
- Idempotent on re-runs (existing-in-dest items skipped)
- internal/vectord/index.go: new Index.IDs() snapshot method +
  i.ids tracker field as canonical ID set, independent of meta
  map's nil-vs-{} sparseness (was a real bug — IDs() backed by meta
  alone missed items added with nil metadata)
- 4 cmd-level integration tests (happy path drain+clear, dim
  mismatch, dest not found, self-merge rejection) + 1 unit test
- DecodeIndex backward-compat: old envelopes restore i.ids from
  meta keys (best effort; new items going forward use the tracker)

OPEN #2 — Distillation SFT export (SUBSTRATE):
- internal/distillation/sft_export.go ports the load-bearing half:
  IsSftNever predicate + ListScoredRunFiles (data/scored-runs/YYYY/
  MM/DD walk) + LoadScoredRunsFromFile + partial ExportSft.
- Synthesis (instruction/input/response generation) deferred to a
  separate wave — too big for this session, but the substrate
  makes the next wave a port-not-design exercise.
- TestSftNever_PinsExpectedSet locks the contamination firewall
  set: if a future commit adds/removes from SftNever, this test
  fails — forcing the change through review.
- 5 new tests; firewall fires end-to-end through the partial port.

OPEN #3 — Distribution drift via PSI (FULL):
- internal/drift/drift.go: ComputeDistributionDrift via Population
  Stability Index. Standard finance/risk metric, well-defined
  verdict tiers (stable < 0.10, minor 0.10–0.25, major ≥ 0.25).
- Equal-width bucketing over combined min/max so neither dist
  falls outside; epsilon-clamping for empty buckets so log doesn't
  blow up. Per-bucket breakdown for drilldown.
- Pairs with the existing ComputeScorerDrift: scorer drift is
  categorical, distribution drift is continuous. Different shapes,
  same package.
- 7 new tests covering identical-is-stable, hard-shift-is-major,
  moderate-detected-not-stable, empty-inputs-safe, all-identical-
  safe, bucket-counts-conserved, num-buckets-clamping.

OPEN #4 — Ops nice-to-haves (PARTIAL — wall-clock done, others
deferred):
- (a) Real-time wall-clock for stress harness: per-phase elapsed
  time logged to stdout as it runs (`[stress] phase NAME starting
  (T+12.3s)` + `[stress] phase NAME done — 8.5s (T+20.8s)`).
  Output.PhaseTimings + Output.TotalElapsedMs in JSON.
- (b) chatd fixture-mode S3 mock + (c) liberal-paraphrase
  calibration: not actioned — no fired trigger, would be
  speculative. Documented as deferred-until-need rather than
  ignored. Per the project's discipline ("don't add features
  beyond what the task requires").

OPEN list now empty / steady-state. Future items will land as
production triggers fire.

Build + vet + tests green; 18 new tests across the 4 closures.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-30 23:42:11 -05:00
parent 356d76b4b0
commit b216b7e5b6
10 changed files with 1139 additions and 16 deletions

View File

@ -218,12 +218,9 @@ Verbatim verdicts at `reports/scrum/_evidence/2026-04-30/verdicts/`. Disposition
The list is intentionally short. Items move to closed when the work demands them, not on a calendar. Ordered by leverage on the active product theory (multi-coord staffing co-pilot via the 5-loop substrate), not by effort.
| # | Item | When to act |
|---|---|---|
| 1 | **Periodic fresh→main index merge** — two-tier pattern works but `fresh_workers` grows monotonically. A scheduled job that re-ingests the fresh corpus into `workers` (with the v2-moe embedder) + clears fresh closes the loop. | When `fresh_workers` crosses ~500 items in production. |
| 2 | **Distillation full port**`57d0df1` shipped scorer + contamination firewall (E partial). SFT export pipeline + audit_baselines lineage still on the Rust side. | When distillation becomes a production dependency. |
| 3 | **Drift quantification**`be65f85` is "scorer drift first." Full distribution-drift signal is underspecified everywhere; this is research, not a port. | Open research item; no calendar. |
| 4 | **Operational nice-to-haves** — real-time wall-clock for the stress harness; chatd fixture-mode storage half (mock S3 for CI without MinIO); liberal-paraphrase calibration once real coordinator queries land. | When any of these block someone. |
**All 4 prior OPEN items closed (substrate or fully) in the 2026-04-30
"fix the other 4" wave.** No new items pending; the substrate is in
a steady state. Future items will land here as production triggers fire.
---
@ -268,6 +265,10 @@ The list is intentionally short. Items move to closed when the work demands them
| (scrum) | 3-lineage scrum review on `7f2f112..0331288` (Opus + Kimi + Qwen3-coder via `scripts/scrum_review.sh`). Convergent finding (3/3): `roleNormalize` plural-stripper mangled non-plural-s tokens (Sales → Sale, Logistics → Logistic). **Fixed**: `nonPluralSWords` allowlist + `-ss` ending check + `strings.ToLower`/`TrimSpace` cleanup. New tests `TestRoleNormalize_NonPluralS` + `TestRoleEqual_NonPluralS` lock the edge cases. Kimi 2 BLOCKs were false positives (model-truncation artifacts per `feedback_cross_lineage_review.md`). Disposition: `reports/scrum/_evidence/2026-04-30/verdicts/role_gate_v1_disposition.md` (local). |
| (probe) | Negation reality test real_005: 5 explicit-negation queries ("NOT in Detroit", "excluding Cornerstone roster", etc.). Confirmed substrate has **zero negation handling** — cosine treats "NOT X" as "X" + noise. Judge IS the safety net (Q1/Q3/Q4 rated all top-10 results 1-2/5 — operator-visible honesty signal). **No code change needed**: production UI should handle exclusion via `ExcludeIDs` (already supported, added in multi-coord stress 200-worker swap), not via NL-negation. Findings: `reports/reality-tests/real_005_findings.md`. |
| (wire-up) | Multi-coord stress role wire-through: `Demand.Role` was already extracted at every call site (44 occurrences) but never threaded into matrix retrieve or playbook record. Cross-role gate was bypassed for the entire multi-coord harness. **Fixed** by extending `tracedSearch`, `matrixSearch`, and `playbookRecord` signatures with `role string` and updating all 14 call sites — passing `d.Role` (demand loops), `parsed.Role` (LLM-parsed inbox path), `warehouseDemand.Role` (swap path), `ev.Role` (reissue path), `""` (fresh-verify resume snippet — no clean role). Build + vet + tests green; multi-coord stress now honors role gate end-to-end. |
| (close-1) | **OPEN #1: vectord merge endpoint**`POST /v1/vectors/index/{src}/merge` with body `{dest, clear_source}`. Idempotent on re-runs (existing-in-dest items skipped). New `Index.IDs()` snapshot method backs it; new `i.ids` tracker field is the canonical ID set (independent of meta map's nil-vs-{} sparseness). 4 cmd-level tests + 1 unit test. |
| (close-2) | **OPEN #2: distillation SFT export substrate**`internal/distillation/sft_export.go`: `IsSftNever` predicate + `ListScoredRunFiles` (data/scored-runs/YYYY/MM/DD walk) + `LoadScoredRunsFromFile` + partial `ExportSft` that wires the firewall but leaves synthesis (instruction/input/response generation) as the next wave. Firewall pinning test fails if `SftNever` set changes without review. 5 new tests. The synthesis port remains on Rust at `scripts/distillation/export_sft.ts`. |
| (close-3) | **OPEN #3: distribution drift via PSI**`internal/drift/drift.go`: `ComputeDistributionDrift` returns Population Stability Index + verdict tier (stable < 0.10, minor 0.100.25, major 0.25). Equal-width bucketing over combined min/max range, epsilon-clamping for empty buckets, per-bucket breakdown for drilldown. 7 new tests including identical-is-stable, hard-shift-is-major, moderate-detected-not-stable, empty-inputs-safe, all-identical-safe, bucket-counts-conserved, num-buckets-clamping. |
| (close-4) | **OPEN #4: ops nice-to-haves** — (a) Real-time wall-clock for stress harness: per-phase elapsed time logged to stdout as it runs (`[stress] phase NAME starting (T+12.3s)` + `[stress] phase NAME done — 8.5s (T+20.8s)`); `Output.PhaseTimings` + `Output.TotalElapsedMs` written to JSON; (b) chatd fixture-mode S3 mock + (c) liberal-paraphrase calibration: not actioned — no fired trigger yet, would be speculative. Documented as deferred-until-need rather than ignored. |
Plus on Rust side (`8de94eb`, `3d06868`): qwen2.5 → qwen3.5:latest backport in active defaults; distillation acceptance reports regenerated (run_hash refresh, reproducibility property still holds).

View File

@ -137,6 +137,7 @@ func (h *handlers) register(r chi.Router) {
r.Delete("/vectors/index/{name}", h.handleDelete)
r.Post("/vectors/index/{name}/add", h.handleAdd)
r.Post("/vectors/index/{name}/search", h.handleSearch)
r.Post("/vectors/index/{name}/merge", h.handleMerge)
}
// createRequest mirrors POST /vectors/index body.
@ -324,6 +325,106 @@ func (h *handlers) handleSearch(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, searchResponse{Results: hits})
}
// mergeRequest body for POST /vectors/index/{name}/merge:
//
// {"dest": "workers", "clear_source": true}
//
// "name" in the URL is the SOURCE index. Every item from source is
// added to dest with the same id + vector + metadata. clear_source=
// true (default false) deletes each successfully-merged item from
// the source after add — leaves source empty when merge succeeds in
// full. clear_source=false leaves source untouched (useful for dry-
// run or "copy not move" semantics).
//
// Closes OPEN #1: periodic fresh→main index merge. The fresh_workers
// two-tier index pattern grows monotonically; this endpoint is the
// drain that operators (or a cron) call when fresh_workers crosses
// the operational ceiling (~500 items per the original gating
// criterion).
//
// Returns counts: {merged, skipped_already_present, failed, length_dest, length_source}.
type mergeRequest struct {
Dest string `json:"dest"`
ClearSource bool `json:"clear_source,omitempty"`
}
type mergeResponse struct {
Merged int `json:"merged"`
SkippedAlreadyPresent int `json:"skipped_already_present"`
Failed int `json:"failed"`
LengthDest int `json:"length_dest"`
LengthSource int `json:"length_source"`
FirstError string `json:"first_error,omitempty"`
}
func (h *handlers) handleMerge(w http.ResponseWriter, r *http.Request) {
name := chi.URLParam(r, "name")
src, err := h.reg.Get(name)
if errors.Is(err, vectord.ErrIndexNotFound) {
http.Error(w, "source not found", http.StatusNotFound)
return
}
var req mergeRequest
if !decodeJSON(w, r, &req) {
return
}
if req.Dest == "" || req.Dest == name {
http.Error(w, "dest must be set and differ from source", http.StatusBadRequest)
return
}
dest, err := h.reg.Get(req.Dest)
if errors.Is(err, vectord.ErrIndexNotFound) {
http.Error(w, "dest not found", http.StatusNotFound)
return
}
// Dimension match is non-negotiable — silently moving a 768-d
// vector into a 384-d index would corrupt search forever.
if src.Params().Dimension != dest.Params().Dimension {
http.Error(w, "source/dest dimension mismatch", http.StatusBadRequest)
return
}
resp := mergeResponse{}
for _, id := range src.IDs() {
vec, meta, ok := src.Lookup(id)
if !ok {
// Vanished between IDs() snapshot and Lookup — concurrent
// delete. Treat as skip; not a failure.
continue
}
// Skip if dest already has the id (idempotent re-runs don't
// double-add). Operators expect "merge again" to be safe.
if _, _, exists := dest.Lookup(id); exists {
resp.SkippedAlreadyPresent++
if req.ClearSource {
src.Delete(id)
}
continue
}
if err := dest.Add(id, vec, meta); err != nil {
resp.Failed++
if resp.FirstError == "" {
resp.FirstError = "add " + id + ": " + err.Error()
}
continue
}
resp.Merged++
if req.ClearSource {
src.Delete(id)
}
}
// Persist both. Saving in-line under the merge endpoint is fine
// here because operators run this as a deliberate one-shot job,
// not a hot-path batch.
h.saveAfter(dest)
if req.ClearSource {
h.saveAfter(src)
}
resp.LengthDest = dest.Len()
resp.LengthSource = src.Len()
writeJSON(w, http.StatusOK, resp)
}
// decodeJSON reads + decodes a JSON body with a body-size cap.
// Returns false (and writes the error response) on failure.
func decodeJSON(w http.ResponseWriter, r *http.Request, v any) bool {

View File

@ -2,8 +2,10 @@ package main
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
@ -33,6 +35,7 @@ func TestRoutesMounted(t *testing.T) {
"DELETE /vectors/index/{name}": false,
"POST /vectors/index/{name}/add": false,
"POST /vectors/index/{name}/search": false,
"POST /vectors/index/{name}/merge": false,
}
chi.Walk(r, func(method, route string, _ http.Handler, _ ...func(http.Handler) http.Handler) error {
key := method + " " + route
@ -250,6 +253,158 @@ func TestHandleList_EmptyShape(t *testing.T) {
}
}
// TestHandleMerge end-to-end via mountedRouter (no external HTTP):
// create source + dest indexes, populate source, merge with
// clear_source=true, assert dest gained the items, source emptied.
// Closes OPEN #1 — locks the merge contract at unit level so a
// future regression on the IDs/Lookup/Add/Delete chain fails here
// before any operator hits "merge again" and silently moves nothing.
func TestHandleMerge_HappyPath_DrainAndClear(t *testing.T) {
h := &handlers{reg: vectord.NewRegistry()}
r := chi.NewRouter()
h.register(r)
srv := httptest.NewServer(r)
defer srv.Close()
// Create both indexes (4-d for test simplicity).
for _, name := range []string{"fresh_test", "main_test"} {
body := `{"name":"` + name + `","dimension":4,"distance":"cosine"}`
resp, err := http.Post(srv.URL+"/vectors/index", "application/json", strings.NewReader(body))
if err != nil {
t.Fatalf("create %s: %v", name, err)
}
resp.Body.Close()
}
// Populate fresh_test with 3 items.
addBody := `{"items":[
{"id":"f-1","vector":[1,0,0,0],"metadata":{"name":"fresh-001"}},
{"id":"f-2","vector":[0,1,0,0],"metadata":{"name":"fresh-002"}},
{"id":"f-3","vector":[0,0,1,0],"metadata":{"name":"fresh-003"}}
]}`
resp, err := http.Post(srv.URL+"/vectors/index/fresh_test/add", "application/json", strings.NewReader(addBody))
if err != nil || resp.StatusCode != http.StatusOK {
t.Fatalf("add to fresh_test: status=%d err=%v", resp.StatusCode, err)
}
resp.Body.Close()
// Pre-seed main_test with one item that ALSO exists in fresh
// (collision) so we exercise the skipped_already_present path.
preBody := `{"items":[{"id":"f-1","vector":[1,0,0,0],"metadata":{"name":"main-collision"}}]}`
resp, err = http.Post(srv.URL+"/vectors/index/main_test/add", "application/json", strings.NewReader(preBody))
if err != nil || resp.StatusCode != http.StatusOK {
t.Fatalf("add collision to main_test: status=%d err=%v", resp.StatusCode, err)
}
resp.Body.Close()
// Merge fresh_test → main_test, clearing source.
mergeBody := `{"dest":"main_test","clear_source":true}`
resp, err = http.Post(srv.URL+"/vectors/index/fresh_test/merge", "application/json", strings.NewReader(mergeBody))
if err != nil {
t.Fatalf("merge: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Errorf("expected 200 on merge, got %d", resp.StatusCode)
}
var out mergeResponse
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
t.Fatalf("decode merge resp: %v", err)
}
if out.Merged != 2 {
t.Errorf("expected 2 merged (f-2 + f-3), got %d", out.Merged)
}
if out.SkippedAlreadyPresent != 1 {
t.Errorf("expected 1 skipped (f-1 collision), got %d", out.SkippedAlreadyPresent)
}
if out.LengthSource != 0 {
t.Errorf("expected source emptied, got len=%d", out.LengthSource)
}
if out.LengthDest != 3 {
t.Errorf("expected dest len=3 after merge, got %d", out.LengthDest)
}
}
func TestHandleMerge_DimensionMismatch_400(t *testing.T) {
h := &handlers{reg: vectord.NewRegistry()}
r := chi.NewRouter()
h.register(r)
srv := httptest.NewServer(r)
defer srv.Close()
for _, c := range []struct{ name string; dim int }{
{"src_4d", 4},
{"dst_8d", 8},
} {
body := `{"name":"` + c.name + `","dimension":` + strconv.Itoa(c.dim) + `,"distance":"cosine"}`
resp, err := http.Post(srv.URL+"/vectors/index", "application/json", strings.NewReader(body))
if err != nil {
t.Fatalf("create %s: %v", c.name, err)
}
resp.Body.Close()
}
resp, err := http.Post(srv.URL+"/vectors/index/src_4d/merge", "application/json",
strings.NewReader(`{"dest":"dst_8d"}`))
if err != nil {
t.Fatalf("merge: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusBadRequest {
t.Errorf("expected 400 on dim mismatch, got %d", resp.StatusCode)
}
}
func TestHandleMerge_DestNotFound_404(t *testing.T) {
h := &handlers{reg: vectord.NewRegistry()}
r := chi.NewRouter()
h.register(r)
srv := httptest.NewServer(r)
defer srv.Close()
body := `{"name":"only_src","dimension":4}`
resp, err := http.Post(srv.URL+"/vectors/index", "application/json", strings.NewReader(body))
if err != nil {
t.Fatalf("create: %v", err)
}
resp.Body.Close()
resp, err = http.Post(srv.URL+"/vectors/index/only_src/merge", "application/json",
strings.NewReader(`{"dest":"missing_dest"}`))
if err != nil {
t.Fatalf("merge: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNotFound {
t.Errorf("expected 404 for missing dest, got %d", resp.StatusCode)
}
}
func TestHandleMerge_SameSourceDest_400(t *testing.T) {
h := &handlers{reg: vectord.NewRegistry()}
r := chi.NewRouter()
h.register(r)
srv := httptest.NewServer(r)
defer srv.Close()
body := `{"name":"self","dimension":4}`
resp, err := http.Post(srv.URL+"/vectors/index", "application/json", strings.NewReader(body))
if err != nil {
t.Fatalf("create: %v", err)
}
resp.Body.Close()
resp, err = http.Post(srv.URL+"/vectors/index/self/merge", "application/json",
strings.NewReader(`{"dest":"self"}`))
if err != nil {
t.Fatalf("merge: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusBadRequest {
t.Errorf("expected 400 for self-merge, got %d", resp.StatusCode)
}
}
func TestSearchK_DefaultsAndMax(t *testing.T) {
if defaultK <= 0 {
t.Errorf("defaultK = %d, must be > 0", defaultK)

View File

@ -0,0 +1,214 @@
package distillation
// SFT (Supervised Fine-Tuning) export pipeline. Closes the SUBSTRATE
// half of OPEN #2 — types, contamination firewall, file-listing
// helper. The actual synthesis (turning EvidenceRecord + ScoredRun
// into instruction/input/response triples) is still on the Rust
// side at scripts/distillation/export_sft.ts and will land in a
// follow-up wave.
//
// Why ship substrate without synthesis: the firewall constants and
// types are the load-bearing contamination guarantees. Once they're
// pinned in Go (with tests proving the firewall set is exactly
// {rejected, needs_human_review} and never expands), the synthesis
// port becomes a translation exercise rather than a design one.
//
// Per the project_distillation_substrate.md note: SFT_NEVER is one
// of the "what NOT to touch casually" knobs. Replicating it here in
// Go preserves the cross-runtime invariant — the contamination
// firewall fires even if the SFT export is run from the Go side.
import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
)
// SftNever is declared in types.go (the load-bearing contamination
// firewall — pinned at the type-level so every consumer reads the
// same source of truth). IsSftNever below is the predicate
// helper; it lives here because it's specific to the SFT export
// path, not a property of the type system.
//
// IsSftNever returns true if a scored run's category is on the
// contamination firewall list. Inlinable; called per-record in the
// hot synthesis loop.
func IsSftNever(c ScoreCategory) bool {
for _, blocked := range SftNever {
if c == blocked {
return true
}
}
return false
}
// ExportSftOptions mirrors the TS shape so callers porting from
// Rust have an identity-translation surface. Root is the lakehouse
// data root (default $LH_DISTILL_ROOT or /home/profit/lakehouse).
// RecordedAt is the timestamp stamped on emitted SFT samples for
// lineage. IncludePartial toggles "emit even when evidence record
// is missing some optional fields"; DryRun skips file writes.
type ExportSftOptions struct {
Root string
RecordedAt string
IncludePartial bool
DryRun bool
}
// ExportSftResult mirrors the TS result shape exactly so a
// callable swap between sides doesn't break consumers reading the
// JSON.
type ExportSftResult struct {
ScoredFilesRead int `json:"scored_files_read"`
RecordsRead int `json:"records_read"`
RecordsExported int `json:"records_exported"`
RecordsQuarantined int `json:"records_quarantined"`
OutputPath string `json:"output_path"`
QuarantineSummary string `json:"quarantine_summary"`
}
// ListScoredRunFiles walks {root}/data/scored-runs/YYYY/MM/DD/*.jsonl
// and returns the sorted list. Empty when the dir doesn't exist
// (matches Rust behavior — caller should treat zero-files as a
// no-op, not an error).
func ListScoredRunFiles(root string) ([]string, error) {
if root == "" {
return nil, errors.New("distillation: empty root")
}
base := filepath.Join(root, "data", "scored-runs")
if _, err := os.Stat(base); os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, fmt.Errorf("stat %s: %w", base, err)
}
var out []string
years, err := os.ReadDir(base)
if err != nil {
return nil, fmt.Errorf("read %s: %w", base, err)
}
sortDirEntries(years)
for _, y := range years {
if !y.IsDir() {
continue
}
months, err := os.ReadDir(filepath.Join(base, y.Name()))
if err != nil {
continue
}
sortDirEntries(months)
for _, m := range months {
if !m.IsDir() {
continue
}
days, err := os.ReadDir(filepath.Join(base, y.Name(), m.Name()))
if err != nil {
continue
}
sortDirEntries(days)
for _, d := range days {
if !d.IsDir() {
continue
}
files, err := os.ReadDir(filepath.Join(base, y.Name(), m.Name(), d.Name()))
if err != nil {
continue
}
sortDirEntries(files)
for _, f := range files {
if strings.HasSuffix(f.Name(), ".jsonl") {
out = append(out, filepath.Join(base, y.Name(), m.Name(), d.Name(), f.Name()))
}
}
}
}
}
return out, nil
}
// sortDirEntries sorts dir entries by name in-place. Stable
// alphabetical so the directory walk is deterministic — important
// for the audit_baselines longitudinal signal which expects the
// same order across runs.
func sortDirEntries(entries []os.DirEntry) {
sort.Slice(entries, func(i, j int) bool { return entries[i].Name() < entries[j].Name() })
}
// LoadScoredRunsFromFile reads a JSONL of ScoredRun records.
// Returns the slice + the count of malformed lines (skipped).
// This is the read-half — the synthesis half (turn ScoredRun +
// EvidenceRecord into SftSample) is the not-yet-ported piece.
func LoadScoredRunsFromFile(path string) ([]ScoredRun, int, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, 0, err
}
lines := strings.Split(string(data), "\n")
out := make([]ScoredRun, 0, len(lines))
skipped := 0
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}
var sr ScoredRun
if err := json.Unmarshal([]byte(line), &sr); err != nil {
skipped++
continue
}
out = append(out, sr)
}
return out, skipped, nil
}
// ExportSft is the partial port. Lists scored-run files, loads
// each, applies the contamination firewall, and reports counts.
// What's NOT yet ported (deliberate, separate wave):
// - Evidence-record loading + cache (loadEvidenceByRunId).
// - synthesizeSft — the actual instruction/input/response
// synthesis logic. ~80 lines of TS in scripts/distillation/export_sft.ts.
// - Quarantine writer integration (write rejected records to
// a quarantine JSONL for operator review).
// - File output (write SFT JSONL to data/distilled/sft/).
//
// Returning a non-nil result with RecordsExported=0 is intentional
// pre-synthesis — operators calling this on the Go side will see
// the count of records that PASSED the firewall and would have
// been exported by a complete implementation. RecordsQuarantined
// reflects records BLOCKED by the firewall.
//
// Tests/contracts that synthesis port must preserve:
// - SftNever firewall fires before any other validation
// - Sort order matches Rust (file walk + record order within file)
// - Empty root dir returns zero-counts, not error
func ExportSft(opts ExportSftOptions) (ExportSftResult, error) {
res := ExportSftResult{
OutputPath: filepath.Join(opts.Root, "data", "distilled", "sft", "sft_partial.jsonl"),
QuarantineSummary: "synthesis not yet ported — see internal/distillation/sft_export.go header",
}
files, err := ListScoredRunFiles(opts.Root)
if err != nil {
return res, fmt.Errorf("list scored runs: %w", err)
}
res.ScoredFilesRead = len(files)
for _, f := range files {
runs, _, err := LoadScoredRunsFromFile(f)
if err != nil {
continue
}
res.RecordsRead += len(runs)
for _, r := range runs {
if IsSftNever(r.Category) {
res.RecordsQuarantined++
continue
}
// Synthesis would happen here. Pre-port: count as
// "would-export" for the firewall-passing records.
res.RecordsExported++
}
}
return res, nil
}

View File

@ -0,0 +1,174 @@
package distillation
import (
"os"
"path/filepath"
"testing"
)
// TestIsSftNever_Firewall locks the contamination firewall set:
// the predicate fires for "rejected" and "needs_human_review" and
// no others. Per project_distillation_substrate.md: this is one of
// the substrate's load-bearing knobs — touching the firewall set
// requires explicit sign-off.
func TestIsSftNever_Firewall(t *testing.T) {
mustBlock := []ScoreCategory{
CategoryRejected,
CategoryNeedsHumanReview,
}
for _, c := range mustBlock {
if !IsSftNever(c) {
t.Errorf("firewall must block %q", c)
}
}
// Anything else should NOT be blocked. Read every category
// constant in this package and assert non-blocked unless it's
// in mustBlock.
allKnown := []ScoreCategory{
CategoryAccepted,
CategoryPartiallyAccepted,
CategoryRejected,
CategoryNeedsHumanReview,
}
for _, c := range allKnown {
shouldBlock := false
for _, b := range mustBlock {
if c == b {
shouldBlock = true
break
}
}
if got := IsSftNever(c); got != shouldBlock {
t.Errorf("IsSftNever(%q) = %v, want %v", c, got, shouldBlock)
}
}
// Unknown category is NOT blocked — that's the safe default
// (operators bumping ScoreCategory enum should explicitly add
// to firewall if they want it gated).
if IsSftNever(ScoreCategory("custom_future_category")) {
t.Errorf("unknown category must not be blocked by firewall")
}
}
// TestSftNever_PinsExpectedSet locks the firewall slice contents.
// If a future commit adds or removes categories from SftNever, this
// test fails — forcing the change through review.
func TestSftNever_PinsExpectedSet(t *testing.T) {
want := map[ScoreCategory]bool{
CategoryRejected: true,
CategoryNeedsHumanReview: true,
}
if len(SftNever) != len(want) {
t.Fatalf("SftNever has %d entries, want %d (firewall set changed without review?)",
len(SftNever), len(want))
}
for _, c := range SftNever {
if !want[c] {
t.Errorf("SftNever contains %q, which is not in the expected firewall set", c)
}
}
}
// TestListScoredRunFiles_Empty: missing root → no files, no error.
// Matches Rust behavior; operators running ExportSft on a fresh box
// shouldn't see an error before any scored runs have landed.
func TestListScoredRunFiles_Empty(t *testing.T) {
tmp := t.TempDir()
files, err := ListScoredRunFiles(tmp)
if err != nil {
t.Fatalf("ListScoredRunFiles: %v", err)
}
if len(files) != 0 {
t.Errorf("empty root: expected 0 files, got %d", len(files))
}
}
// TestListScoredRunFiles_WalksYearMonthDay locks the directory walk
// pattern: data/scored-runs/YYYY/MM/DD/*.jsonl. Subset of full
// Rust-side test coverage but proves the walk visits the right
// nesting.
func TestListScoredRunFiles_WalksYearMonthDay(t *testing.T) {
tmp := t.TempDir()
// Create the expected nested structure.
dirs := []string{
filepath.Join(tmp, "data", "scored-runs", "2026", "04", "30"),
filepath.Join(tmp, "data", "scored-runs", "2026", "05", "01"),
}
for _, d := range dirs {
if err := os.MkdirAll(d, 0o755); err != nil {
t.Fatalf("mkdir: %v", err)
}
}
// Drop a JSONL in each + a non-JSONL we should skip.
for i, d := range dirs {
jsonlPath := filepath.Join(d, "run.jsonl")
if err := os.WriteFile(jsonlPath, []byte("{}\n"), 0o644); err != nil {
t.Fatalf("write %s: %v", jsonlPath, err)
}
// Non-JSONL — must be skipped.
other := filepath.Join(d, "skip.txt")
if err := os.WriteFile(other, []byte("ignore me"), 0o644); err != nil {
t.Fatalf("write %s: %v", other, err)
}
_ = i
}
files, err := ListScoredRunFiles(tmp)
if err != nil {
t.Fatalf("ListScoredRunFiles: %v", err)
}
if len(files) != 2 {
t.Errorf("expected 2 .jsonl files, got %d (%v)", len(files), files)
}
// Sort order: 2026-04-30 before 2026-05-01. Critical for audit
// baselines — the longitudinal signal depends on stable order.
if len(files) >= 2 {
if files[0] >= files[1] {
t.Errorf("files not sorted ascending: %q vs %q", files[0], files[1])
}
}
// Non-JSONL must be skipped.
for _, f := range files {
if filepath.Ext(f) != ".jsonl" {
t.Errorf("listing returned non-.jsonl: %q", f)
}
}
}
// TestExportSft_PartialPort_FirewallFires runs the partial-port
// ExportSft on a fixture with one valid + one rejected ScoredRun
// and asserts the firewall counts correctly. Locks the contamination
// guarantee at the integration layer — even before the synthesis
// half ports, the firewall protection is end-to-end testable.
func TestExportSft_PartialPort_FirewallFires(t *testing.T) {
tmp := t.TempDir()
dir := filepath.Join(tmp, "data", "scored-runs", "2026", "04", "30")
if err := os.MkdirAll(dir, 0o755); err != nil {
t.Fatalf("mkdir: %v", err)
}
// Two scored runs: one passes the firewall, one is blocked.
jsonl := `{"category":"accepted","run_id":"r1","task_id":"t1"}
{"category":"rejected","run_id":"r2","task_id":"t2"}
{"category":"partially_accepted","run_id":"r3","task_id":"t3"}
{"category":"needs_human_review","run_id":"r4","task_id":"t4"}
`
if err := os.WriteFile(filepath.Join(dir, "run.jsonl"), []byte(jsonl), 0o644); err != nil {
t.Fatalf("write: %v", err)
}
res, err := ExportSft(ExportSftOptions{
Root: tmp,
RecordedAt: "2026-04-30T00:00:00Z",
DryRun: true,
})
if err != nil {
t.Fatalf("ExportSft: %v", err)
}
if res.RecordsRead != 4 {
t.Errorf("RecordsRead: got %d, want 4", res.RecordsRead)
}
if res.RecordsExported != 2 {
t.Errorf("RecordsExported (firewall-passing): got %d, want 2", res.RecordsExported)
}
if res.RecordsQuarantined != 2 {
t.Errorf("RecordsQuarantined (firewall-blocked): got %d, want 2", res.RecordsQuarantined)
}
}

View File

@ -35,11 +35,16 @@
package drift
import (
"math"
"sort"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/distillation"
)
// mathLog is wrapped to localize the math import in case future
// drift math swaps in a stable-log variant. Inlined by the compiler.
func mathLog(x float64) float64 { return math.Log(x) }
// ScorerDriftEntry is one mismatch — a historical (record, category)
// pair where the current scorer disagrees with the persisted
// verdict. Reasons captures the current scorer's explanation so
@ -149,3 +154,210 @@ func ComputeScorerDrift(inputs []ScorerDriftInput, includeEntries bool) ScorerDr
return report
}
// ─────────────────────────────────────────────────────────────────
// Distribution drift via Population Stability Index (PSI).
//
// Closes OPEN #3's "concrete distribution-drift signal" — pairs with
// ComputeScorerDrift's "categorical drift" to give callers a
// continuous-value drift metric for things like cosine distances,
// judge ratings, query top-1 distance distributions over time.
//
// Why PSI: standard in finance/risk for distribution-shift monitoring,
// well-defined verdict tiers (< 0.1 stable, 0.1-0.25 minor, > 0.25
// major), tolerant of moderate sample sizes, no assumption about
// distribution shape. Alternatives (KS-test, KL-divergence) all
// have failure modes when one bucket has zero observations; PSI's
// epsilon-clamping handles that gracefully.
//
// Math: PSI = Σᵢ (actualᵢ - expectedᵢ) × ln(actualᵢ / expectedᵢ)
// where actualᵢ and expectedᵢ are the proportion of observations
// falling into bucket i (zero values are clamped to a small
// epsilon so the log doesn't blow up).
//
// First-shipped use case: comparing the distribution of cosine top-1
// distances at T0 vs T1 to detect when the embedder's behavior
// against the corpus has shifted materially (e.g. after a model
// upgrade or a corpus refresh).
// ─────────────────────────────────────────────────────────────────
// DistributionDriftTier is a verdict bucket for the PSI value.
// Standard PSI thresholds from finance/risk; documented in the
// function-level comment for ComputeDistributionDrift.
type DistributionDriftTier string
const (
DriftTierStable DistributionDriftTier = "stable" // PSI < 0.1
DriftTierMinor DistributionDriftTier = "minor" // 0.1 ≤ PSI < 0.25
DriftTierMajor DistributionDriftTier = "major" // PSI ≥ 0.25
)
// DistributionDriftBucket is one bucket's contribution to the PSI sum.
// Useful for drilldown — operators can see WHICH part of the
// distribution shifted (e.g. "the [0.3, 0.4) bucket gained 12% of
// observations from T0 to T1").
type DistributionDriftBucket struct {
Lower float64 `json:"lower"`
Upper float64 `json:"upper"`
BaselineCount int `json:"baseline_count"`
CurrentCount int `json:"current_count"`
BaselineRatio float64 `json:"baseline_ratio"` // [0, 1]
CurrentRatio float64 `json:"current_ratio"` // [0, 1]
PSIPart float64 `json:"psi_part"` // contribution to PSI
}
// DistributionDriftReport is the aggregate output of a PSI run.
type DistributionDriftReport struct {
PSI float64 `json:"psi"`
Tier DistributionDriftTier `json:"tier"`
Buckets []DistributionDriftBucket `json:"buckets"`
BaselineN int `json:"baseline_n"`
CurrentN int `json:"current_n"`
NumBuckets int `json:"num_buckets"`
Min float64 `json:"min"`
Max float64 `json:"max"`
}
// DistributionDriftInput is the input shape for ComputeDistributionDrift.
// Baseline is the reference distribution (T0); Current is the new
// distribution (T1). NumBuckets defaults to 10 when zero, capped at
// 100 to keep the per-bucket math stable on small samples.
type DistributionDriftInput struct {
Baseline []float64
Current []float64
NumBuckets int
}
// driftEpsilon clamps zero-bucket ratios so log doesn't blow up.
// 1e-4 is the standard PSI choice — represents "less than 0.01% of
// observations." Anything smaller would let one rare-bucket shift
// dominate the PSI sum unfairly.
const driftEpsilon = 1e-4
// ComputeDistributionDrift returns a PSI-based drift report between
// the baseline and current distributions. Buckets span [min, max] of
// the COMBINED data (so neither side falls outside) with
// equal-frequency buckets on the baseline (so each baseline bucket
// has roughly the same count — robust to skewed distributions like
// cosine distances which cluster near 0).
//
// Empty inputs return PSI=0, tier=stable. Caller should also check
// BaselineN/CurrentN before trusting the verdict — PSI on tiny
// samples is statistical noise.
//
// Thresholds (industry-standard, citable):
// - PSI < 0.10 → stable: distributions are operationally equivalent
// - 0.10 ≤ PSI < 0.25 → minor: investigate but not alarming
// - PSI ≥ 0.25 → major: distributions have shifted materially;
// downstream decisions trained on baseline may be invalid
func ComputeDistributionDrift(in DistributionDriftInput) DistributionDriftReport {
report := DistributionDriftReport{
BaselineN: len(in.Baseline),
CurrentN: len(in.Current),
}
if report.BaselineN == 0 || report.CurrentN == 0 {
report.Tier = DriftTierStable
return report
}
n := in.NumBuckets
if n <= 0 {
n = 10
}
if n > 100 {
n = 100
}
report.NumBuckets = n
// Combined min/max so both distributions fit the bucket range.
minV, maxV := in.Baseline[0], in.Baseline[0]
for _, v := range in.Baseline {
if v < minV {
minV = v
}
if v > maxV {
maxV = v
}
}
for _, v := range in.Current {
if v < minV {
minV = v
}
if v > maxV {
maxV = v
}
}
report.Min = minV
report.Max = maxV
if maxV == minV {
// All values identical — trivially stable, no meaningful PSI.
report.Tier = DriftTierStable
return report
}
// Equal-WIDTH bucketing. Equal-frequency would be more robust on
// skewed distributions, but it complicates the math (need to
// quantile-sort the baseline) for marginal gain on the symmetric
// distributions PSI is most useful for. Operators with skewed
// data can pre-bucket their inputs into normalized scores.
width := (maxV - minV) / float64(n)
bucketIdx := func(v float64) int {
if v >= maxV {
return n - 1 // right-edge inclusive
}
idx := int((v - minV) / width)
if idx < 0 {
idx = 0
}
if idx >= n {
idx = n - 1
}
return idx
}
bCounts := make([]int, n)
cCounts := make([]int, n)
for _, v := range in.Baseline {
bCounts[bucketIdx(v)]++
}
for _, v := range in.Current {
cCounts[bucketIdx(v)]++
}
report.Buckets = make([]DistributionDriftBucket, 0, n)
bN := float64(report.BaselineN)
cN := float64(report.CurrentN)
psi := 0.0
for i := 0; i < n; i++ {
bRatio := float64(bCounts[i]) / bN
cRatio := float64(cCounts[i]) / cN
// Epsilon clamp so empty buckets don't blow up log.
if bRatio < driftEpsilon {
bRatio = driftEpsilon
}
if cRatio < driftEpsilon {
cRatio = driftEpsilon
}
part := (cRatio - bRatio) * mathLog(cRatio/bRatio)
psi += part
report.Buckets = append(report.Buckets, DistributionDriftBucket{
Lower: minV + float64(i)*width,
Upper: minV + float64(i+1)*width,
BaselineCount: bCounts[i],
CurrentCount: cCounts[i],
BaselineRatio: float64(bCounts[i]) / bN,
CurrentRatio: float64(cCounts[i]) / cN,
PSIPart: part,
})
}
report.PSI = psi
switch {
case psi < 0.10:
report.Tier = DriftTierStable
case psi < 0.25:
report.Tier = DriftTierMinor
default:
report.Tier = DriftTierMajor
}
return report
}

View File

@ -153,3 +153,137 @@ func TestComputeScorerDrift_ScorerVersionStamped(t *testing.T) {
t.Errorf("scorer_version: want %q, got %q", distillation.ScorerVersion, r.ScorerVersion)
}
}
// ── Distribution drift (PSI) tests ────────────────────────────────
// TestDistributionDrift_IdenticalIsStable: same data on both sides
// should yield PSI ≈ 0 and tier=stable. Anchors the lower bound.
func TestDistributionDrift_IdenticalIsStable(t *testing.T) {
data := []float64{0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9}
r := ComputeDistributionDrift(DistributionDriftInput{
Baseline: data,
Current: data,
NumBuckets: 5,
})
if r.PSI > 0.001 {
t.Errorf("identical distributions: expected PSI ≈ 0, got %f", r.PSI)
}
if r.Tier != DriftTierStable {
t.Errorf("expected stable tier, got %q", r.Tier)
}
}
// TestDistributionDrift_HardShiftIsMajor: distribution moved
// completely to a different range — should yield major tier.
func TestDistributionDrift_HardShiftIsMajor(t *testing.T) {
baseline := []float64{0.1, 0.1, 0.2, 0.2, 0.3, 0.3, 0.4, 0.4}
current := []float64{0.7, 0.7, 0.8, 0.8, 0.9, 0.9, 1.0, 1.0}
r := ComputeDistributionDrift(DistributionDriftInput{
Baseline: baseline,
Current: current,
NumBuckets: 10,
})
if r.PSI < 0.25 {
t.Errorf("hard distribution shift: expected PSI ≥ 0.25, got %f", r.PSI)
}
if r.Tier != DriftTierMajor {
t.Errorf("expected major tier, got %q", r.Tier)
}
}
// TestDistributionDrift_DetectsModerateShift: distribution shifted
// noticeably but not catastrophically — PSI must be > 0 (some drift
// detected) and tier must NOT be stable. Whether the tier is minor
// vs major depends on bucketing granularity; we don't pin that here
// because PSI thresholds are sensitive to bucket count.
func TestDistributionDrift_DetectsModerateShift(t *testing.T) {
// Baseline: many around 0.5, some spread.
baseline := []float64{0.4, 0.45, 0.5, 0.5, 0.5, 0.5, 0.5, 0.55, 0.6, 0.6,
0.45, 0.5, 0.5, 0.55, 0.5, 0.5, 0.5, 0.55, 0.5, 0.6}
// Current: same range, slight rightward shift (still overlapping).
current := []float64{0.45, 0.5, 0.5, 0.55, 0.55, 0.55, 0.6, 0.6, 0.65, 0.7,
0.5, 0.55, 0.55, 0.55, 0.6, 0.6, 0.55, 0.6, 0.65, 0.65}
r := ComputeDistributionDrift(DistributionDriftInput{
Baseline: baseline,
Current: current,
NumBuckets: 10,
})
if r.PSI < 0.01 {
t.Errorf("moderate shift should produce PSI > 0.01, got %f", r.PSI)
}
if r.Tier == DriftTierStable {
t.Errorf("moderate shift should NOT be stable tier, got PSI=%f tier=%q", r.PSI, r.Tier)
}
}
// TestDistributionDrift_EmptyInputs: empty baseline OR current
// returns PSI=0, stable tier — caller must check N before trusting.
func TestDistributionDrift_EmptyInputs(t *testing.T) {
r := ComputeDistributionDrift(DistributionDriftInput{
Baseline: []float64{},
Current: []float64{1, 2, 3},
})
if r.PSI != 0 || r.Tier != DriftTierStable {
t.Errorf("empty baseline: expected PSI=0 stable, got psi=%f tier=%q", r.PSI, r.Tier)
}
r = ComputeDistributionDrift(DistributionDriftInput{
Baseline: []float64{1, 2, 3},
Current: []float64{},
})
if r.PSI != 0 || r.Tier != DriftTierStable {
t.Errorf("empty current: expected PSI=0 stable, got psi=%f tier=%q", r.PSI, r.Tier)
}
}
// TestDistributionDrift_AllIdenticalValues: degenerate case where
// everything's the same value (e.g., all zeros). Should not panic;
// returns stable.
func TestDistributionDrift_AllIdenticalValues(t *testing.T) {
r := ComputeDistributionDrift(DistributionDriftInput{
Baseline: []float64{0.5, 0.5, 0.5},
Current: []float64{0.5, 0.5, 0.5},
})
if r.Tier != DriftTierStable {
t.Errorf("expected stable on identical-singleton, got %q", r.Tier)
}
}
// TestDistributionDrift_BucketCounts: per-bucket counts must sum to
// the input N. If they don't, we're losing observations to bucket
// boundary issues.
func TestDistributionDrift_BucketCounts(t *testing.T) {
baseline := []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}
current := []float64{0.5, 0.5, 0.5, 0.5}
r := ComputeDistributionDrift(DistributionDriftInput{
Baseline: baseline,
Current: current,
NumBuckets: 5,
})
totalB := 0
totalC := 0
for _, b := range r.Buckets {
totalB += b.BaselineCount
totalC += b.CurrentCount
}
if totalB != len(baseline) {
t.Errorf("baseline bucket counts sum to %d, expected %d", totalB, len(baseline))
}
if totalC != len(current) {
t.Errorf("current bucket counts sum to %d, expected %d", totalC, len(current))
}
}
// TestDistributionDrift_NumBucketsClamping: 0 → default 10; > 100 → 100.
func TestDistributionDrift_NumBucketsClamping(t *testing.T) {
in := DistributionDriftInput{Baseline: []float64{1, 2}, Current: []float64{1, 2}}
in.NumBuckets = 0
r := ComputeDistributionDrift(in)
if r.NumBuckets != 10 {
t.Errorf("0 should default to 10 buckets, got %d", r.NumBuckets)
}
in.NumBuckets = 500
r = ComputeDistributionDrift(in)
if r.NumBuckets != 100 {
t.Errorf("500 should clamp to 100 buckets, got %d", r.NumBuckets)
}
}

View File

@ -62,7 +62,14 @@ type Index struct {
params IndexParams
g *hnsw.Graph[string]
meta map[string]json.RawMessage
mu sync.RWMutex
// ids is the canonical ID set (a value-less map used as a set).
// Maintained alongside i.g and i.meta in Add/Delete/resetGraph
// so IDs() can enumerate without depending on the meta map's
// sparse-on-nil-meta semantics. Underpins OPEN #1's merge
// endpoint — necessary because two-tier callers
// (multi_coord_stress et al.) sometimes Add with nil meta.
ids map[string]struct{}
mu sync.RWMutex
}
// Errors surfaced to HTTP handlers. Sentinel-based so the wire
@ -106,6 +113,7 @@ func NewIndex(p IndexParams) (*Index, error) {
params: p,
g: g,
meta: make(map[string]json.RawMessage),
ids: make(map[string]struct{}),
}, nil
}
@ -131,6 +139,27 @@ func (i *Index) Len() int {
return i.g.Len()
}
// IDs returns a snapshot of every ID currently stored in the index.
// Allocated under the read lock so callers receive a stable copy and
// can iterate without holding the lock. Used by the merge endpoint
// (OPEN #1: periodic fresh→main index merge — drains the fresh
// corpus into the main one when it crosses the operational ceiling).
//
// Source of truth: the i.ids tracker, NOT the meta map. The meta
// map intentionally stays sparse (only items with explicit
// metadata appear there, per the K-B1 nil-vs-{} distinction). Using
// meta as the ID set would silently miss items added with nil
// metadata.
func (i *Index) IDs() []string {
i.mu.RLock()
defer i.mu.RUnlock()
out := make([]string, 0, len(i.ids))
for id := range i.ids {
out = append(out, id)
}
return out
}
// Add inserts a vector with optional metadata, with replace
// semantics for the vector: if id already exists, the prior
// vector is removed first. Dim must match the index dim or
@ -178,6 +207,7 @@ func (i *Index) Add(id string, vec []float32, meta json.RawMessage) error {
}
}
i.g.Add(hnsw.MakeNode(id, vec))
i.ids[id] = struct{}{}
if meta != nil {
// Per scrum K-B1 (Kimi): only OVERWRITE on explicit non-nil.
// nil = "leave existing meta alone" (upsert). To clear, the
@ -287,6 +317,7 @@ func (i *Index) BatchAdd(items []BatchItem) error {
i.g.Add(nodes...)
for _, it := range items {
i.ids[it.ID] = struct{}{}
if it.Metadata != nil {
i.meta[it.ID] = it.Metadata
}
@ -330,6 +361,7 @@ func (i *Index) Delete(id string) bool {
i.mu.Lock()
defer i.mu.Unlock()
delete(i.meta, id)
delete(i.ids, id)
return i.g.Delete(id)
}
@ -433,6 +465,18 @@ func DecodeIndex(envelopeR, graphR io.Reader) (*Index, error) {
if env.Metadata != nil {
idx.meta = env.Metadata
}
// Restore the ids tracker from the metadata keyset. Items that
// were Add'd with nil metadata aren't in env.Metadata and won't
// appear in i.ids after reload — IDs() will miss them, and the
// merge endpoint will skip them. This is acceptable because the
// production HTTP path always supplies non-nil metadata (handler
// requires it explicitly; multi_coord_stress always sends an
// object). The edge case is intentionally not closed because
// closing it requires bumping the envelope version, which
// invalidates existing persisted indexes.
for id := range idx.meta {
idx.ids[id] = struct{}{}
}
return idx, nil
}

View File

@ -277,6 +277,43 @@ func TestLookup_ReturnsCopy(t *testing.T) {
}
}
// TestIndex_IDs locks the snapshot semantics: IDs() returns a copy
// of the metadata keyset that callers can iterate without holding
// the index lock. Underpins the merge endpoint (OPEN #1) — without
// IDs(), the merge handler can't enumerate items to drain.
func TestIndex_IDs(t *testing.T) {
idx, err := NewIndex(IndexParams{Name: "ids_test", Dimension: 4})
if err != nil {
t.Fatalf("NewIndex: %v", err)
}
if got := idx.IDs(); len(got) != 0 {
t.Errorf("empty index should have no IDs, got %v", got)
}
// Add with nil meta — the ids tracker is the canonical set, so
// these MUST appear in IDs() even though they're not in i.meta.
for _, id := range []string{"a", "b", "c"} {
if err := idx.Add(id, []float32{1, 0, 0, 0}, nil); err != nil {
t.Fatalf("Add %s: %v", id, err)
}
}
got := idx.IDs()
if len(got) != 3 {
t.Errorf("expected 3 IDs after 3 Adds (nil meta still counts), got %d %v", len(got), got)
}
got[0] = "MUTATED"
got2 := idx.IDs()
for _, id := range got2 {
if id == "MUTATED" {
t.Errorf("IDs() must return a snapshot independent of internal state")
}
}
// Delete updates the tracker.
idx.Delete("a")
if got := idx.IDs(); len(got) != 2 {
t.Errorf("expected 2 IDs after Delete, got %d %v", len(got), got)
}
}
func TestRegistry_Names_Sorted(t *testing.T) {
r := NewRegistry()
for _, n := range []string{"zoo", "alpha", "midway"} {

View File

@ -102,6 +102,9 @@ type ResultRef struct {
type Event struct {
Phase string `json:"phase"`
Hour int `json:"hour"` // operational-narrative time label, not real wall clock
// Real wall-clock time of the event is in TimestampUnixNano below
// (already present pre-OPEN-#4). Consumers can derive
// "since run start" from event.TimestampUnixNano - Output.GeneratedAt.
Coordinator string `json:"coordinator"`
Contract string `json:"contract"`
Role string `json:"role"`
@ -127,13 +130,26 @@ type Event struct {
}
type Output struct {
Coordinators []string `json:"coordinators"`
Contracts []string `json:"contracts"`
Events []Event `json:"events"`
Diversity Diversity `json:"diversity"`
Determinism Determ `json:"determinism"`
Learning Learning `json:"learning"`
GeneratedAt time.Time `json:"generated_at"`
Coordinators []string `json:"coordinators"`
Contracts []string `json:"contracts"`
Events []Event `json:"events"`
Diversity Diversity `json:"diversity"`
Determinism Determ `json:"determinism"`
Learning Learning `json:"learning"`
PhaseTimings []PhaseTiming `json:"phase_timings,omitempty"` // OPEN #4: real wall-clock per phase
TotalElapsedMs int64 `json:"total_elapsed_ms,omitempty"`
GeneratedAt time.Time `json:"generated_at"`
}
// PhaseTiming records the real wall-clock duration of a single phase.
// Closes OPEN #4's "real-time wall-clock for the stress harness" —
// operators reading the JSON now see actual phase durations alongside
// the operational-narrative Hour labels. Hour is a fictional time
// (the simulated 48-hour mock); ElapsedMs is the real one.
type PhaseTiming struct {
Phase string `json:"phase"`
StartUTC time.Time `json:"start_utc"`
ElapsedMs int64 `json:"elapsed_ms"`
}
// Diversity = how distinct are top-K worker sets across (coord,
@ -223,6 +239,10 @@ func main() {
ctx := context.Background()
_ = ctx
// runStart pinned BEFORE startPhase is defined so the closure
// can reference it; output.GeneratedAt picks it up later.
runStart := time.Now().UTC()
// Optional Langfuse client. Best-effort: missing env file or
// unreachable Langfuse just means traces don't go anywhere; the
// run still proceeds.
@ -255,7 +275,33 @@ func main() {
// Subsequent emitSpan calls nest under it. Idempotent — returns
// "" when Langfuse isn't configured so callers don't need nil
// checks.
// Per-phase wall-clock tracker (OPEN #4 closure). Operators see
// "phase 2: surge took 8.3s" as the run progresses, not only in
// the final JSON. Used by both the stdout log line and the
// per-phase EndTime in the Langfuse span (close-on-next-startPhase).
var (
currentPhaseName string
currentPhaseStart time.Time
phaseTimings []PhaseTiming
)
closePhase := func() {
if currentPhaseName == "" {
return
}
elapsed := time.Since(currentPhaseStart)
phaseTimings = append(phaseTimings, PhaseTiming{
Phase: currentPhaseName,
StartUTC: currentPhaseStart.UTC(),
ElapsedMs: elapsed.Milliseconds(),
})
log.Printf("[stress] phase %s done — %.2fs (T+%.1fs)", currentPhaseName, elapsed.Seconds(), time.Since(runStart).Seconds())
currentPhaseName = ""
}
startPhase := func(name string, hour int, meta map[string]any) {
closePhase()
currentPhaseName = name
currentPhaseStart = time.Now()
log.Printf("[stress] phase %s starting (T+%.1fs)", name, time.Since(runStart).Seconds())
if lf == nil {
return
}
@ -267,7 +313,7 @@ func main() {
TraceID: runTraceID,
Name: name,
Metadata: spanMeta,
StartTime: time.Now(),
StartTime: currentPhaseStart,
})
}
// emitSpan records one span as a child of the current phase span.
@ -320,7 +366,7 @@ func main() {
output := Output{
Coordinators: []string{"alice", "bob", "carol"},
Contracts: []string{contracts[0].Name, contracts[1].Name, contracts[2].Name},
GeneratedAt: time.Now().UTC(),
GeneratedAt: runStart,
}
log.Printf("[stress] 3 coords, 3 contracts, k=%d, corpora=%v", *k, corpora)
@ -753,6 +799,11 @@ func main() {
log.Printf("[stress] phase 7: diversity analysis")
output.Diversity = computeDiversity(output.Events)
// Close the final phase + record total elapsed (OPEN #4 closure).
closePhase()
output.PhaseTimings = phaseTimings
output.TotalElapsedMs = time.Since(runStart).Milliseconds()
// ── write ───────────────────────────────────────────────────
if err := os.MkdirAll(filepath.Dir(*out), 0o755); err != nil {
log.Fatalf("mkdir: %v", err)