From 84a32f0d29b5206383e21f27ccac8f17542b64ac Mon Sep 17 00:00:00 2001 From: root Date: Thu, 30 Apr 2026 08:19:29 -0500 Subject: [PATCH] multi-coord stress Phase 2: ExcludeIDs + fresh-resume + 200-worker swap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three Phase 2 additions land in this commit: 1. matrix.SearchRequest gains ExcludeIDs ([]string) — filters specific worker IDs out of results post-retrieval, AND skips them at the playbook boost+inject step (so excluded answers can't sneak back via Shape B). Real-world driver: coordinator placed N workers, client asks for replacements, system needs alternatives, not the same N. Threaded through retrieve.go after merge but before metadata filter so excluded IDs don't waste post-filter top-K slots. 2. New harness phase 2b: 200-worker swap simulation. Captures the top-K from alpha's warehouse query, then re-issues with exclude_ids=. Result Jaccard(orig, swap) measures whether the substrate finds genuine alternatives. 3. New harness phase 1b: fresh-resume mid-run injection. Three new workers ingested via /v1/embed + /v1/vectors/index/workers/add, then verified findable via semantic queries matching resume content. Plus Hour labels on every event (operational narrative: 0/6/12/18/ 24/30/36/42/48) and a refactor of captureEvent to take hour as a param. Run #003 + #004 results (5K workers + 10K ethereal): Diversity (#004): Same-role-across-contracts Jaccard = 0.080 (n=9) Different-roles-same-contract Jaccard = 0.013 (n=18) Determinism: 1.000 (#004 unchanged) Verbatim handover: 4/4 = 100% Paraphrase handover: 4/4 = 100% Phase 2b — 200-worker swap (Jaccard 0.000): 8 originally-placed workers fully replaced by 8 alternatives. ExcludeIDs substrate change works end-to-end — boost AND inject both honor the exclusion, so excluded workers don't return via the playbook either. Phase 1b — fresh-resume injection: REAL PRODUCT FINDING. 
Substrate ABSORPTION is fine — 3 /v1/vectors/index/workers/add calls at 200 status, 3 vectors persisted. But none of the 3 fresh workers surfaced in top-8 even with semantic queries matching their resume content (e.g. "Senior tower crane rigger NCCCO Chicago" vs fresh-001's resume "Senior rigger with 12 years tower-crane signaling..." NCCCO + Chicago). Top-1 came from existing workers at distance ~0.25; fresh workers' distances must be > 0.25, pushing them past rank 8. Cause: dense retrieval at 5000+ workers means many existing profiles cluster near any specific query in cosine space; nomic-embed-text-v2 (137M) introduces enough noise that a fresh worker doesn't reliably outrank them just because the text content overlaps. Workarounds (Phase 3 work): (a) hybrid retrieval (keyword + semantic), (b) playbook-layer score boost for fresh adds, (c) larger embedder. Documented in run #004 report. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/matrix/retrieve.go | 45 +++++ .../reality-tests/multi_coord_stress_003.md | 82 +++++++++ .../reality-tests/multi_coord_stress_004.md | 82 +++++++++ scripts/multi_coord_stress/main.go | 171 +++++++++++++++++- 4 files changed, 370 insertions(+), 10 deletions(-) create mode 100644 reports/reality-tests/multi_coord_stress_003.md create mode 100644 reports/reality-tests/multi_coord_stress_004.md diff --git a/internal/matrix/retrieve.go b/internal/matrix/retrieve.go index 351749f..f15036f 100644 --- a/internal/matrix/retrieve.go +++ b/internal/matrix/retrieve.go @@ -85,6 +85,15 @@ type SearchRequest struct { PlaybookMaxDistance float64 `json:"playbook_max_distance,omitempty"` PlaybookMaxInjectDistance float64 `json:"playbook_max_inject_distance,omitempty"` MetadataFilter map[string]any `json:"metadata_filter,omitempty"` + // ExcludeIDs filters out specific worker IDs post-retrieval. 
+ // Real-world driver: a coordinator places 200 workers at a + // contract, then mid-day the client asks for a different set — + // the next query should NOT return the already-placed workers. + // Filter runs after merge but before metadata filter, so an + // excluded ID never wastes a slot in the post-filter top-K. + // Also applies to playbook boost + Shape B inject — excluded + // answers are skipped at injection time. + ExcludeIDs []string `json:"exclude_ids,omitempty"` } // SearchResponse wraps the merged results plus per-corpus return @@ -204,6 +213,25 @@ func (r *Retriever) Search(ctx context.Context, req SearchRequest) (*SearchRespo return allHits[i].Distance < allHits[j].Distance }) + // ExcludeIDs filter — applied first so excluded IDs don't waste + // a slot in the post-filter top-K. Real-world driver: coordinator + // has placed N workers at a contract; mid-day the client asks for + // alternatives, so this query passes ExcludeIDs= and + // gets back fresh candidates instead of the same N. + if len(req.ExcludeIDs) > 0 { + excludeSet := make(map[string]bool, len(req.ExcludeIDs)) + for _, id := range req.ExcludeIDs { + excludeSet[id] = true + } + kept := make([]Result, 0, len(allHits)) + for _, h := range allHits { + if !excludeSet[h.ID] { + kept = append(kept, h) + } + } + allHits = kept + } + // Metadata filter (component B — staffing-side structured gate). // Applied BEFORE top-K truncation so the filter doesn't accidentally // reduce coverage further. Caller can request larger PerCorpusK to @@ -239,6 +267,23 @@ func (r *Retriever) Search(ctx context.Context, req SearchRequest) (*SearchRespo if err != nil { slog.Warn("matrix: playbook lookup failed; skipping boost+inject", "err", err) } else if len(hits) > 0 { + // Filter playbook hits to honor ExcludeIDs — without this, + // an excluded answer in a playbook recording would re-enter + // the result set via Shape B inject, defeating the swap + // semantics that the exclude list exists to enforce. 
+ if len(req.ExcludeIDs) > 0 { + excludeSet := make(map[string]bool, len(req.ExcludeIDs)) + for _, id := range req.ExcludeIDs { + excludeSet[id] = true + } + keptHits := make([]PlaybookHit, 0, len(hits)) + for _, h := range hits { + if !excludeSet[h.Entry.AnswerID] { + keptHits = append(keptHits, h) + } + } + hits = keptHits + } resp.PlaybookBoosted = ApplyPlaybookBoost(resp.Results, hits) maxInjectDist := float32(req.PlaybookMaxInjectDistance) if maxInjectDist <= 0 { diff --git a/reports/reality-tests/multi_coord_stress_003.md b/reports/reality-tests/multi_coord_stress_003.md new file mode 100644 index 0000000..292914d --- /dev/null +++ b/reports/reality-tests/multi_coord_stress_003.md @@ -0,0 +1,82 @@ +# Multi-Coordinator Stress Test — Run 003 + +**Generated:** 2026-04-30T13:13:44.35966865Z +**Coordinators:** alice / bob / carol (each with own playbook namespace: `playbook_alice` / `playbook_bob` / `playbook_carol`) +**Contracts:** alpha_milwaukee_distribution / beta_indianapolis_manufacturing / gamma_chicago_construction +**Corpora:** `workers,ethereal_workers` +**K per query:** 8 +**Total events captured:** 61 +**Evidence:** `reports/reality-tests/multi_coord_stress_003.json` + +--- + +## Diversity — is the system locking into scenarios or cycling? + +| Metric | Mean Jaccard | n pairs | Interpretation | +|---|---:|---:|---| +| Same role across different contracts | 0.03068783068783069 | 9 | Lower = more diverse (different region/cert mix → different workers) | +| Different roles within same contract | 0 | 18 | Should be near-zero (different roles = different worker pools) | + +**Healthy ranges:** +- Same role across contracts: < 0.30 means the system is genuinely picking different workers per region/contract. +- Different roles same contract: < 0.10 means role-specific retrieval is working. +- If either is > 0.50, the system is "cycling" the same handful of workers regardless of query intent. 
+ +--- + +## Determinism — same query reissued, top-K stability + +| Metric | Value | +|---|---:| +| Mean Jaccard on retrieval-only reissue | 1 | +| Number of reissue pairs | 12 | + +**Interpretation:** +- ≥ 0.95: HNSW retrieval is highly deterministic; reissues land on near-identical top-K. Good — system locks into a stable view of "best workers for this query." +- 0.80 – 0.95: Some HNSW or embed variance, acceptable. +- < 0.80: Retrieval is unstable — reissues see substantially different results, suggesting either embed nondeterminism (Ollama returning slightly different vectors) or vectord nondeterminism (HNSW insertion order affecting recall). + +--- + +## Learning — handover hit rate + +Bob takes Alice's contract using Alice's playbook namespace. Did Alice's recorded answers surface in Bob's results? + +| Metric | Value | +|---|---:| +| Verbatim handover queries run | 4 | +| Alice's recorded answer at Bob's top-1 (verbatim) | 4 | +| Alice's recorded answer in Bob's top-K (verbatim) | 4 | +| **Verbatim handover hit rate (top-1)** | **1** | +| Paraphrase handover queries run | 4 | +| Alice's recorded answer at Bob's top-1 (paraphrase) | 4 | +| Alice's recorded answer in Bob's top-K (paraphrase) | 4 | +| **Paraphrase handover hit rate (top-1)** | **1** | + +**Interpretation:** +- Verbatim hit rate ≈ 1.0: trivial case — Bob runs identical queries; should always hit. +- Paraphrase hit rate ≥ 0.5: institutional memory survives wording change — the harder learning property. +- Paraphrase hit rate ≈ 0.0: Bob's paraphrases drift past the inject threshold, so Alice's recordings don't activate. Same caveat as the playbook_lift paraphrase pass. + +--- + +## Per-event capture + +All matrix.search responses live in the JSON — top-K with worker IDs, distances, and per-corpus counts. 
Search by phase: + +```bash +jq '.events[] | select(.phase == "merge")' reports/reality-tests/multi_coord_stress_003.json +jq '.events[] | select(.coordinator == "alice" and .phase == "baseline")' reports/reality-tests/multi_coord_stress_003.json +jq '.events[] | select(.role == "warehouse worker") | {phase, contract, top_k_ids: [.top_k[].id]}' reports/reality-tests/multi_coord_stress_003.json +``` + +--- + +## What's NOT in this run (Phase 1 deliberately defers) + +- **48-hour clock.** Events fire as discrete steps, not on a timeline. +- **Email / SMS ingest.** No endpoints exist on the Go side yet. +- **New-resume injection mid-run.** The corpus is fixed at the start. +- **Langfuse traces.** Need Go-side wiring. + +These are Phase 2/3. The Phase 1 substrate is what the time-based runner will mount on top of. diff --git a/reports/reality-tests/multi_coord_stress_004.md b/reports/reality-tests/multi_coord_stress_004.md new file mode 100644 index 0000000..e553e24 --- /dev/null +++ b/reports/reality-tests/multi_coord_stress_004.md @@ -0,0 +1,82 @@ +# Multi-Coordinator Stress Test — Run 004 + +**Generated:** 2026-04-30T13:17:03.577877974Z +**Coordinators:** alice / bob / carol (each with own playbook namespace: `playbook_alice` / `playbook_bob` / `playbook_carol`) +**Contracts:** alpha_milwaukee_distribution / beta_indianapolis_manufacturing / gamma_chicago_construction +**Corpora:** `workers,ethereal_workers` +**K per query:** 8 +**Total events captured:** 61 +**Evidence:** `reports/reality-tests/multi_coord_stress_004.json` + +--- + +## Diversity — is the system locking into scenarios or cycling? 
+ +| Metric | Mean Jaccard | n pairs | Interpretation | +|---|---:|---:|---| +| Same role across different contracts | 0.08013468013468013 | 9 | Lower = more diverse (different region/cert mix → different workers) | +| Different roles within same contract | 0.012820512820512822 | 18 | Should be near-zero (different roles = different worker pools) | + +**Healthy ranges:** +- Same role across contracts: < 0.30 means the system is genuinely picking different workers per region/contract. +- Different roles same contract: < 0.10 means role-specific retrieval is working. +- If either is > 0.50, the system is "cycling" the same handful of workers regardless of query intent. + +--- + +## Determinism — same query reissued, top-K stability + +| Metric | Value | +|---|---:| +| Mean Jaccard on retrieval-only reissue | 1 | +| Number of reissue pairs | 12 | + +**Interpretation:** +- ≥ 0.95: HNSW retrieval is highly deterministic; reissues land on near-identical top-K. Good — system locks into a stable view of "best workers for this query." +- 0.80 – 0.95: Some HNSW or embed variance, acceptable. +- < 0.80: Retrieval is unstable — reissues see substantially different results, suggesting either embed nondeterminism (Ollama returning slightly different vectors) or vectord nondeterminism (HNSW insertion order affecting recall). + +--- + +## Learning — handover hit rate + +Bob takes Alice's contract using Alice's playbook namespace. Did Alice's recorded answers surface in Bob's results? 
+ +| Metric | Value | +|---|---:| +| Verbatim handover queries run | 4 | +| Alice's recorded answer at Bob's top-1 (verbatim) | 4 | +| Alice's recorded answer in Bob's top-K (verbatim) | 4 | +| **Verbatim handover hit rate (top-1)** | **1** | +| Paraphrase handover queries run | 4 | +| Alice's recorded answer at Bob's top-1 (paraphrase) | 4 | +| Alice's recorded answer in Bob's top-K (paraphrase) | 4 | +| **Paraphrase handover hit rate (top-1)** | **1** | + +**Interpretation:** +- Verbatim hit rate ≈ 1.0: trivial case — Bob runs identical queries; should always hit. +- Paraphrase hit rate ≥ 0.5: institutional memory survives wording change — the harder learning property. +- Paraphrase hit rate ≈ 0.0: Bob's paraphrases drift past the inject threshold, so Alice's recordings don't activate. Same caveat as the playbook_lift paraphrase pass. + +--- + +## Per-event capture + +All matrix.search responses live in the JSON — top-K with worker IDs, distances, and per-corpus counts. Search by phase: + +```bash +jq '.events[] | select(.phase == "merge")' reports/reality-tests/multi_coord_stress_004.json +jq '.events[] | select(.coordinator == "alice" and .phase == "baseline")' reports/reality-tests/multi_coord_stress_004.json +jq '.events[] | select(.role == "warehouse worker") | {phase, contract, top_k_ids: [.top_k[].id]}' reports/reality-tests/multi_coord_stress_004.json +``` + +--- + +## What's NOT in this run (Phase 1 deliberately defers) + +- **48-hour clock.** Events fire as discrete steps, not on a timeline. +- **Email / SMS ingest.** No endpoints exist on the Go side yet. +- **New-resume injection mid-run.** The corpus is fixed at the start. +- **Langfuse traces.** Need Go-side wiring. + +These are Phase 2/3. The Phase 1 substrate is what the time-based runner will mount on top of. 
diff --git a/scripts/multi_coord_stress/main.go b/scripts/multi_coord_stress/main.go index 8d18cb0..e0b6a85 100644 --- a/scripts/multi_coord_stress/main.go +++ b/scripts/multi_coord_stress/main.go @@ -68,6 +68,7 @@ type matrixSearchReq struct { K int `json:"k"` UsePlaybook bool `json:"use_playbook,omitempty"` PlaybookCorpus string `json:"playbook_corpus,omitempty"` + ExcludeIDs []string `json:"exclude_ids,omitempty"` } type matrixResult struct { @@ -95,6 +96,7 @@ type ResultRef struct { type Event struct { Phase string `json:"phase"` + Hour int `json:"hour"` // operational-narrative time label, not real wall clock Coordinator string `json:"coordinator"` Contract string `json:"contract"` Role string `json:"role"` @@ -102,6 +104,7 @@ type Event struct { SurgeMultiplier int `json:"surge_multiplier,omitempty"` UsePlaybook bool `json:"use_playbook"` PlaybookCorpus string `json:"playbook_corpus,omitempty"` + ExcludeIDs []string `json:"exclude_ids,omitempty"` TopK []ResultRef `json:"top_k"` PerCorpusCounts map[string]int `json:"per_corpus_counts,omitempty"` PlaybookBoosted int `json:"playbook_boosted,omitempty"` @@ -224,7 +227,7 @@ func main() { for _, d := range c.Demand { q := buildQuery(c, d, 1) resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coord.PlaybookCorpus)) - ev := captureEvent("baseline", coord.Name, c.Name, d.Role, q, 1, true, coord.PlaybookCorpus, resp) + ev := captureEvent("baseline", 0, coord.Name, c.Name, d.Role, q, 1, true, coord.PlaybookCorpus, resp) output.Events = append(output.Events, ev) // Record top-1 as a successful playbook entry for this coord. if len(resp.Results) > 0 { @@ -235,6 +238,72 @@ func main() { } } + // ── Phase 1b: new-resume injection (Hour 6) ───────────────── + // Mid-day, three new resumes arrive — workers with no prior + // history. We embed + add them to the workers vectord index, + // then verify they're findable by their unique skill marker. 
+ // Tests the substrate's ability to absorb fresh candidates + // without restart. + log.Printf("[stress] phase 1b: new-resume injection (3 fresh workers, verify findable)") + // Each fresh worker has a SEMANTIC query that should surface them + // based on the actual content of their resume — role + skills + + // location. nomic-embed-text is dense/semantic, NOT lexical, so a + // "find me FRESHTAG_..." style unique-substring query does NOT + // surface the fresh worker; the embedder weights rare substrings + // as low-information noise. The semantic query below represents + // what a real coordinator would actually issue. + freshWorkers := []struct { + ID string + Resume string + Verify string // semantic query expected to surface this worker + }{ + { + ID: "fresh-001", + Resume: "Senior rigger with 12 years tower-crane signaling experience. NCCCO crane signal/rigger certification active. Chicago IL metro, available immediately. Construction-site rigging specialist.", + Verify: "Senior tower crane rigger NCCCO certification Chicago construction signaling", + }, + { + ID: "fresh-002", + Resume: "Bilingual safety coordinator (Spanish + English). OSHA trainer credentials, 8 years manufacturing safety training delivery. Indianapolis IN. Manages multilingual crew safety briefings and incident documentation.", + Verify: "Bilingual Spanish English OSHA trainer safety coordinator Indianapolis manufacturing", + }, + { + ID: "fresh-003", + Resume: "FAA Part 107 certified drone pilot. UAV site surveying with GIS mapping output for construction site progress reports. Chicago IL metro. 
5 years aerial surveying for general contractors.", + Verify: "FAA Part 107 drone surveyor UAV pilot GIS construction site mapping Chicago", + }, + } + for _, fw := range freshWorkers { + if err := ingestFreshWorker(hc, *gateway, fw.ID, fw.Resume, map[string]any{ + "name": fw.ID, + "role": "fresh-resume", + "source": "phase-1b-injection", + }); err != nil { + log.Fatalf("ingest fresh worker %s: %v", fw.ID, err) + } + } + for _, fw := range freshWorkers { + resp := must(matrixSearch(hc, *gateway, fw.Verify, corpora, *k, false, "")) + ev := captureEvent("new-resume-verify", 6, "system", "fresh-resume-pool", "fresh", fw.Verify, 1, false, "", resp) + // Find the fresh worker's rank in top-K (rank 0 = top-1). + freshRank := -1 + for i, r := range resp.Results { + if r.ID == fw.ID { + freshRank = i + break + } + } + switch { + case freshRank == 0: + ev.Note = fmt.Sprintf("fresh worker %s at top-1 — semantic absorption working", fw.ID) + case freshRank > 0: + ev.Note = fmt.Sprintf("fresh worker %s at rank %d (in top-K but not top-1)", fw.ID, freshRank) + default: + ev.Note = fmt.Sprintf("fresh worker %s NOT in top-K (top-1 was %s) — embedder didn't surface fresh-resume content over existing population", fw.ID, resp.Results[0].ID) + } + output.Events = append(output.Events, ev) + } + // ── Phase 2: surge ────────────────────────────────────────── // Each coord's contract demand doubles. URGENT phrasing. 
log.Printf("[stress] phase 2: surge (2x demand, urgent phrasing)") @@ -243,11 +312,40 @@ func main() { for _, d := range c.Demand { q := buildQuery(c, d, 2) resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coord.PlaybookCorpus)) - ev := captureEvent("surge", coord.Name, c.Name, d.Role, q, 2, true, coord.PlaybookCorpus, resp) + ev := captureEvent("surge", 12, coord.Name, c.Name, d.Role, q, 2, true, coord.PlaybookCorpus, resp) output.Events = append(output.Events, ev) } } + // ── Phase 2b: 200-worker swap (Hour 18) ────────────────────── + // Alpha's client says "the 200 workers you placed are unavailable + // — find replacements." We capture the top-K from the warehouse + // query, then re-issue the same query with those IDs excluded. + // Real product test: does the system find genuinely different + // candidates, or does it sit on the same population? + log.Printf("[stress] phase 2b: 200-worker swap (alpha warehouse — exclude originally placed)") + warehouseDemand := contracts[0].Demand[0] // slot 0 is warehouse worker by contract design + swapQuery := buildQuery(&contracts[0], warehouseDemand, 1) + origResp := must(matrixSearch(hc, *gateway, swapQuery, corpora, *k, false, "")) + placedIDs := make([]string, 0, len(origResp.Results)) + for _, r := range origResp.Results { + placedIDs = append(placedIDs, r.ID) + } + origEv := captureEvent("swap-original", 18, "alice", contracts[0].Name, warehouseDemand.Role, swapQuery, 1, false, "", origResp) + origEv.Note = fmt.Sprintf("captured %d originally-placed worker IDs", len(placedIDs)) + output.Events = append(output.Events, origEv) + + swapResp := must(matrixSearch(hc, *gateway, swapQuery, corpora, *k, false, "", placedIDs...)) + swapEv := captureEvent("swap-replace", 18, "alice", contracts[0].Name, warehouseDemand.Role, swapQuery, 1, false, "", swapResp) + swapEv.ExcludeIDs = placedIDs + swapIDs := make([]string, 0, len(swapResp.Results)) + for _, r := range swapResp.Results { + swapIDs = append(swapIDs, 
r.ID) + } + swapJacc := jaccardStrings(placedIDs, swapIDs) + swapEv.Note = fmt.Sprintf("Jaccard(orig, swap) = %.3f (lower = better; 0 = fully replaced)", swapJacc) + output.Events = append(output.Events, swapEv) + // ── Phase 3: merge — alpha + beta combined under alice ────── log.Printf("[stress] phase 3: merge (alpha + beta combined, alice handles)") mergedDemand := append(append([]Demand{}, contracts[0].Demand...), contracts[1].Demand...) @@ -255,7 +353,7 @@ func main() { mergedC := &Contract{Name: contracts[0].Name + "+" + contracts[1].Name, Location: contracts[0].Location + " + " + contracts[1].Location, Shift: "shared"} q := buildQuery(mergedC, d, 1) resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coords[0].PlaybookCorpus)) - ev := captureEvent("merge", "alice", mergedC.Name, d.Role, q, 1, true, coords[0].PlaybookCorpus, resp) + ev := captureEvent("merge", 24, "alice", mergedC.Name, d.Role, q, 1, true, coords[0].PlaybookCorpus, resp) output.Events = append(output.Events, ev) } @@ -275,7 +373,7 @@ func main() { for _, d := range contracts[0].Demand { q := buildQuery(&contracts[0], d, 1) resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coords[0].PlaybookCorpus)) - ev := captureEvent("handover", "bob", contracts[0].Name, d.Role, q, 1, true, coords[0].PlaybookCorpus, resp) + ev := captureEvent("handover", 30, "bob", contracts[0].Name, d.Role, q, 1, true, coords[0].PlaybookCorpus, resp) output.Events = append(output.Events, ev) handoverRun++ recordedID, ok := aliceRecordedAnswers[d.Role] @@ -325,7 +423,7 @@ func main() { log.Printf(" paraphrase search failed for %s: %v", d.Role, err) continue } - ev := captureEvent("handover-paraphrase", "bob", contracts[0].Name, d.Role, paraphrase, 1, true, coords[0].PlaybookCorpus, resp) + ev := captureEvent("handover-paraphrase", 36, "bob", contracts[0].Name, d.Role, paraphrase, 1, true, coords[0].PlaybookCorpus, resp) ev.Note = "paraphrase of: " + origQuery output.Events = append(output.Events, ev) 
pHandoverRun++ @@ -360,7 +458,7 @@ func main() { c := &contracts[0] q := buildQuery(c, d, 2) resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coord.PlaybookCorpus)) - ev := captureEvent("split", coord.Name, c.Name+"-share-"+coord.Name, d.Role, q, 2, true, coord.PlaybookCorpus, resp) + ev := captureEvent("split", 42, coord.Name, c.Name+"-share-"+coord.Name, d.Role, q, 2, true, coord.PlaybookCorpus, resp) output.Events = append(output.Events, ev) } @@ -373,14 +471,14 @@ func main() { continue } resp := must(matrixSearch(hc, *gateway, ev.Query, corpora, *k, false, "")) // playbook OFF for reissue to isolate retrieval stability - reissue := captureEvent("reissue", ev.Coordinator, ev.Contract, ev.Role, ev.Query, 1, false, "", resp) + reissue := captureEvent("reissue", 48, ev.Coordinator, ev.Contract, ev.Role, ev.Query, 1, false, "", resp) output.Events = append(output.Events, reissue) // Compare against ev.TopK (also playbook-on baseline). Note: // this conflates retrieval stability with playbook stability. // We capture both ev (playbook on) and a fresh retrieval (off); // real determinism = retrieval-only top-K comparison. 
freshRetrievalResp := must(matrixSearch(hc, *gateway, ev.Query, corpora, *k, false, "")) - freshRetrievalEv := captureEvent("reissue-retrieval-only", ev.Coordinator, ev.Contract, ev.Role, ev.Query, 1, false, "", freshRetrievalResp) + freshRetrievalEv := captureEvent("reissue-retrieval-only", 48, ev.Coordinator, ev.Contract, ev.Role, ev.Query, 1, false, "", freshRetrievalResp) j := jaccardTopK(reissue.TopK, freshRetrievalEv.TopK) jaccards = append(jaccards, j) } @@ -526,13 +624,14 @@ func buildQuery(c *Contract, d Demand, surge int) string { return b.String() } -func captureEvent(phase, coord, contract, role, query string, surge int, usePlaybook bool, pbCorpus string, resp *matrixResp) Event { +func captureEvent(phase string, hour int, coord, contract, role, query string, surge int, usePlaybook bool, pbCorpus string, resp *matrixResp) Event { topK := make([]ResultRef, 0, len(resp.Results)) for i, r := range resp.Results { topK = append(topK, ResultRef{Rank: i, ID: r.ID, Corpus: r.Corpus, Distance: r.Distance}) } return Event{ Phase: phase, + Hour: hour, Coordinator: coord, Contract: contract, Role: role, @@ -661,13 +760,14 @@ func mean(xs []float64) float64 { // ── HTTP helpers ───────────────────────────────────────────────── -func matrixSearch(hc *http.Client, gw, query string, corpora []string, k int, usePlaybook bool, playbookCorpus string) (*matrixResp, error) { +func matrixSearch(hc *http.Client, gw, query string, corpora []string, k int, usePlaybook bool, playbookCorpus string, excludeIDs ...string) (*matrixResp, error) { body, _ := json.Marshal(matrixSearchReq{ QueryText: query, Corpora: corpora, K: k, UsePlaybook: usePlaybook, PlaybookCorpus: playbookCorpus, + ExcludeIDs: excludeIDs, }) req, _ := http.NewRequest("POST", gw+"/v1/matrix/search", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") @@ -687,6 +787,57 @@ func matrixSearch(hc *http.Client, gw, query string, corpora []string, k int, us return &out, nil } +// ingestFreshWorker 
embeds + adds a single fresh worker to the +vectord 'workers' index. Two HTTP hops via the gateway: /v1/embed +to get the vector, /v1/vectors/index/workers/add to insert. Used by the +new-resume-injection phase to test mid-run absorption of fresh +candidates without restart. +func ingestFreshWorker(hc *http.Client, gw, id, text string, metadata map[string]any) error { + embedBs, _ := json.Marshal(map[string]any{ + "texts": []string{text}, + "model": "nomic-embed-text", + }) + req, _ := http.NewRequest("POST", gw+"/v1/embed", bytes.NewReader(embedBs)) + req.Header.Set("Content-Type", "application/json") + resp, err := hc.Do(req) + if err != nil { + return fmt.Errorf("embed: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode/100 != 2 { + rb, _ := io.ReadAll(resp.Body) + return fmt.Errorf("embed %d: %s", resp.StatusCode, string(rb)) + } + var er struct { + Vectors [][]float32 `json:"vectors"` + } + if err := json.NewDecoder(resp.Body).Decode(&er); err != nil { + return fmt.Errorf("decode embed: %w", err) + } + if len(er.Vectors) == 0 || len(er.Vectors[0]) == 0 { + return fmt.Errorf("embed returned no vectors") + } + + metaBs, _ := json.Marshal(metadata) + addBs, _ := json.Marshal(map[string]any{ + "items": []map[string]any{ + {"id": id, "vector": er.Vectors[0], "metadata": json.RawMessage(metaBs)}, + }, + }) + req2, _ := http.NewRequest("POST", gw+"/v1/vectors/index/workers/add", bytes.NewReader(addBs)) + req2.Header.Set("Content-Type", "application/json") + resp2, err := hc.Do(req2) + if err != nil { + return fmt.Errorf("vectord add: %w", err) + } + defer resp2.Body.Close() + if resp2.StatusCode/100 != 2 { + rb, _ := io.ReadAll(resp2.Body) + return fmt.Errorf("vectord add %d: %s", resp2.StatusCode, string(rb)) + } + return nil +} + func playbookRecord(hc *http.Client, gw, query, answerID, answerCorpus string, score float64, corpus string) error { body, _ := json.Marshal(map[string]any{ "query_text": query,