diff --git a/reports/reality-tests/multi_coord_stress_011.md b/reports/reality-tests/multi_coord_stress_011.md new file mode 100644 index 0000000..9101c4b --- /dev/null +++ b/reports/reality-tests/multi_coord_stress_011.md @@ -0,0 +1,82 @@ +# Multi-Coordinator Stress Test — Run 011 + +**Generated:** 2026-04-30T21:41:26.801002955Z +**Coordinators:** alice / bob / carol (each with own playbook namespace: `playbook_alice` / `playbook_bob` / `playbook_carol`) +**Contracts:** alpha_milwaukee_distribution / beta_indianapolis_manufacturing / gamma_chicago_construction +**Corpora:** `workers,ethereal_workers` +**K per query:** 8 +**Total events captured:** 67 +**Evidence:** `reports/reality-tests/multi_coord_stress_011.json` + +--- + +## Diversity — is the system locking into scenarios or cycling? + +| Metric | Mean Jaccard | n pairs | Interpretation | +|---|---:|---:|---| +| Same role across different contracts | 0.025641025641025644 | 9 | Lower = more diverse (different region/cert mix → different workers) | +| Different roles within same contract | 0.06996336996336996 | 18 | Should be near-zero (different roles = different worker pools) | + +**Healthy ranges:** +- Same role across contracts: < 0.30 means the system is genuinely picking different workers per region/contract. +- Different roles same contract: < 0.10 means role-specific retrieval is working. +- If either is > 0.50, the system is "cycling" the same handful of workers regardless of query intent. + +--- + +## Determinism — same query reissued, top-K stability + +| Metric | Value | +|---|---:| +| Mean Jaccard on retrieval-only reissue | 1 | +| Number of reissue pairs | 12 | + +**Interpretation:** +- ≥ 0.95: HNSW retrieval is highly deterministic; reissues land on near-identical top-K. Good — system locks into a stable view of "best workers for this query." +- 0.80 – 0.95: Some HNSW or embed variance, acceptable. 
+- < 0.80: Retrieval is unstable — reissues see substantially different results, suggesting either embed nondeterminism (Ollama returning slightly different vectors) or vectord nondeterminism (HNSW insertion order affecting recall). + +--- + +## Learning — handover hit rate + +Bob takes Alice's contract using Alice's playbook namespace. Did Alice's recorded answers surface in Bob's results? + +| Metric | Value | +|---|---:| +| Verbatim handover queries run | 4 | +| Alice's recorded answer at Bob's top-1 (verbatim) | 4 | +| Alice's recorded answer in Bob's top-K (verbatim) | 4 | +| **Verbatim handover hit rate (top-1)** | **1** | +| Paraphrase handover queries run | 4 | +| Alice's recorded answer at Bob's top-1 (paraphrase) | 4 | +| Alice's recorded answer in Bob's top-K (paraphrase) | 4 | +| **Paraphrase handover hit rate (top-1)** | **1** | + +**Interpretation:** +- Verbatim hit rate ≈ 1.0: trivial case — Bob runs identical queries; should always hit. +- Paraphrase hit rate ≥ 0.5: institutional memory survives wording change — the harder learning property. +- Paraphrase hit rate ≈ 0.0: Bob's paraphrases drift past the inject threshold, so Alice's recordings don't activate. Same caveat as the playbook_lift paraphrase pass. + +--- + +## Per-event capture + +All matrix.search responses live in the JSON — top-K with worker IDs, distances, and per-corpus counts. Search by phase: + +```bash +jq '.events[] | select(.phase == "merge")' reports/reality-tests/multi_coord_stress_011.json +jq '.events[] | select(.coordinator == "alice" and .phase == "baseline")' reports/reality-tests/multi_coord_stress_011.json +jq '.events[] | select(.role == "warehouse worker") | {phase, contract, top_k_ids: [.top_k[].id]}' reports/reality-tests/multi_coord_stress_011.json +``` + +--- + +## What's NOT in this run (Phase 1 deliberately defers) + +- **48-hour clock.** Events fire as discrete steps, not on a timeline. +- **Email / SMS ingest.** No endpoints exist on the Go side yet. 
+- **New-resume injection mid-run.** The corpus is fixed at the start. +- **Langfuse traces.** Need Go-side wiring. + +These are Phase 2/3. The Phase 1 substrate is what the time-based runner will mount on top of. diff --git a/scripts/multi_coord_stress/main.go b/scripts/multi_coord_stress/main.go index 1fa6fab..73f2f49 100644 --- a/scripts/multi_coord_stress/main.go +++ b/scripts/multi_coord_stress/main.go @@ -224,10 +224,20 @@ func main() { // unreachable Langfuse just means traces don't go anywhere; the // run still proceeds. var lf *langfuse.Client + var runTraceID string + var currentPhaseSpanID string if *langfuseEnv != "" { if creds, err := loadLangfuseEnv(*langfuseEnv); err == nil { lf = langfuse.New(creds.URL, creds.PublicKey, creds.SecretKey, nil) log.Printf("[stress] Langfuse client live → %s", creds.URL) + runTraceID = lf.Trace(ctx, langfuse.TraceInput{ + Name: "multi_coord_stress run", + Tags: []string{"stress", "multi-coord"}, + Metadata: map[string]any{ + "corpora": corpora, + "k": *k, + }, + }) defer func() { if err := lf.Flush(context.Background()); err != nil { log.Printf("[stress] Langfuse final flush: %v", err) @@ -238,6 +248,68 @@ func main() { } } + // startPhase begins a new phase span (child of the run trace). + // Subsequent emitSpan calls nest under it. It is a no-op when + // Langfuse isn't configured, so callers don't need nil + // checks. + startPhase := func(name string, hour int, meta map[string]any) { + if lf == nil { + return + } + spanMeta := map[string]any{"hour": hour} + for k, v := range meta { + spanMeta[k] = v + } + currentPhaseSpanID = lf.Span(ctx, langfuse.SpanInput{ + TraceID: runTraceID, + Name: name, + Metadata: spanMeta, + StartTime: time.Now(), + }) + } + // emitSpan records one span as a child of the current phase span. + // Callers capture start := time.Now() beforehand so the recorded + // duration is real (not 0). 
+ emitSpan := func(name string, start time.Time, input, output any, level string) { + if lf == nil { + return + } + lf.Span(ctx, langfuse.SpanInput{ + TraceID: runTraceID, + ParentID: currentPhaseSpanID, + Name: name, + Input: input, + Output: output, + StartTime: start, + EndTime: time.Now(), + Level: level, + }) + } + // tracedSearch wraps matrixSearch with span emission. Every + // search-call-site in the phases below uses this so Langfuse + // captures every retrieval with its inputs (query, playbook, + // excludes) and outputs (top-K ids, top-1 distance, boost/inject + // counts). Caller still must() if they want the fail-fast behavior; + // errors here are emitted as ERROR spans + propagated. + tracedSearch := func(spanName, query string, searchCorpora []string, usePlaybook bool, pbCorpus string, excludeIDs ...string) *matrixResp { + start := time.Now() + resp, err := matrixSearch(hc, *gateway, query, searchCorpora, *k, usePlaybook, pbCorpus, excludeIDs...) + if err != nil { + emitSpan(spanName, start, + map[string]any{"query": query, "corpora": searchCorpora, "k": *k, "use_playbook": usePlaybook, "playbook_corpus": pbCorpus, "exclude_n": len(excludeIDs)}, + map[string]any{"error": err.Error()}, "ERROR") + log.Fatalf("[stress] %v", err) + } + topIDs := make([]string, 0, len(resp.Results)) + for _, r := range resp.Results { + topIDs = append(topIDs, r.ID) + } + emitSpan(spanName, start, + map[string]any{"query": query, "corpora": searchCorpora, "k": *k, "use_playbook": usePlaybook, "playbook_corpus": pbCorpus, "exclude_n": len(excludeIDs)}, + map[string]any{"top_k_ids": topIDs, "top1_distance": firstDistance(resp.Results), "playbook_boosted": resp.PlaybookBoosted, "playbook_injected": resp.PlaybookInjected}, "") + return resp + } + output := Output{ Coordinators: []string{"alice", "bob", "carol"}, Contracts: []string{contracts[0].Name, contracts[1].Name, contracts[2].Name}, @@ -251,11 +323,12 @@ func main() { // playbook entries (top-1 of each as a synthetic 
"successful // match" outcome) into their personal namespace. log.Printf("[stress] phase 1: baseline") + startPhase("phase.baseline", 0, nil) for _, coord := range coords { c := assignments[coord.Name] for _, d := range c.Demand { q := buildQuery(c, d, 1) - resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coord.PlaybookCorpus)) + resp := tracedSearch("matrix.search.baseline", q, corpora, true, coord.PlaybookCorpus) ev := captureEvent("baseline", 0, coord.Name, c.Name, d.Role, q, 1, true, coord.PlaybookCorpus, resp) output.Events = append(output.Events, ev) // Record top-1 as a successful playbook entry for this coord. @@ -274,6 +347,7 @@ func main() { // Tests the substrate's ability to absorb fresh candidates // without restart. log.Printf("[stress] phase 1b: new-resume injection (3 fresh workers, verify findable)") + startPhase("phase.new_resume_injection", 6, nil) // Each fresh worker has a SEMANTIC query that should surface them // based on the actual content of their resume — role + skills + // location. nomic-embed-text is dense/semantic, NOT lexical, so a @@ -321,7 +395,7 @@ func main() { verifyCorpora := append([]string{}, corpora...) verifyCorpora = append(verifyCorpora, freshIdx) for _, fw := range freshWorkers { - resp := must(matrixSearch(hc, *gateway, fw.Verify, verifyCorpora, *k, false, "")) + resp := tracedSearch("matrix.search.fresh_verify", fw.Verify, verifyCorpora, false, "") ev := captureEvent("new-resume-verify", 6, "system", "fresh-resume-pool", "fresh", fw.Verify, 1, false, "", resp) // Find the fresh worker's rank in top-K (rank 0 = top-1). freshRank := -1 @@ -356,19 +430,7 @@ func main() { // fire in their preferred order); this phase verifies the // recording surface and the search-from-inbox flow work. 
log.Printf("[stress] phase 1c: inbox burst (6 events, priority-ordered)") - var inboxTraceID string - if lf != nil { - inboxTraceID = lf.Trace(ctx, langfuse.TraceInput{ - Name: "multi_coord_stress phase 1c inbox burst", - Tags: []string{"stress", "inbox", "phase-1c"}, - Metadata: map[string]any{ - "hour": 9, - "corpora": corpora, - "k": *k, - "event_count": 6, - }, - }) - } + startPhase("phase.inbox_burst", 9, map[string]any{"event_count": 6}) type inboxEvent struct { Priority string // "urgent" | "high" | "medium" | "low" Type string // "email" | "sms" @@ -425,82 +487,43 @@ func main() { log.Printf(" inbox record failed (%s): %v", ie.Priority, err) continue } - if lf != nil && inboxTraceID != "" { - lf.Span(ctx, langfuse.SpanInput{ - TraceID: inboxTraceID, - Name: "observerd.inbox.record", - Input: map[string]any{"type": ie.Type, "sender": ie.Sender, "priority": ie.Priority, "subject": ie.Subject, "body_chars": len(ie.Body)}, - Output: map[string]any{"accepted": true}, - StartTime: stepStart, - EndTime: time.Now(), - Metadata: map[string]any{"coordinator": ie.Coord}, - }) - } + emitSpan("observerd.inbox.record", stepStart, + map[string]any{"type": ie.Type, "sender": ie.Sender, "priority": ie.Priority, "subject": ie.Subject, "body_chars": len(ie.Body), "coordinator": ie.Coord}, + map[string]any{"accepted": true}, "") // 2. LLM parses the body into a structured demand. 
parseStart := time.Now() parsed, perr := parseInboxDemand(hc, *ollama, *judgeModel, ie.Body) - parseEnd := time.Now() if perr != nil { - if lf != nil && inboxTraceID != "" { - lf.Span(ctx, langfuse.SpanInput{ - TraceID: inboxTraceID, - Name: "llm.parse_demand", - Input: map[string]any{"body": ie.Body, "model": *judgeModel}, - Output: map[string]any{"error": perr.Error()}, - StartTime: parseStart, - EndTime: parseEnd, - Level: "ERROR", - }) - } + emitSpan("llm.parse_demand", parseStart, + map[string]any{"body": ie.Body, "model": *judgeModel}, + map[string]any{"error": perr.Error()}, "ERROR") log.Printf(" inbox demand parse failed (%s): %v", ie.Priority, perr) continue } - if lf != nil && inboxTraceID != "" { - lf.Span(ctx, langfuse.SpanInput{ - TraceID: inboxTraceID, - Name: "llm.parse_demand", - Input: map[string]any{"body": ie.Body, "model": *judgeModel}, - Output: parsed, - StartTime: parseStart, - EndTime: parseEnd, - }) - } + emitSpan("llm.parse_demand", parseStart, + map[string]any{"body": ie.Body, "model": *judgeModel}, + parsed, "") // 3. Build a query string from the parsed demand and search. 
query := parsed.AsQuery() coord := coordByName(coords, ie.Coord) searchStart := time.Now() resp, err := matrixSearch(hc, *gateway, query, corpora, *k, true, coord.PlaybookCorpus) - searchEnd := time.Now() if err != nil { + emitSpan("matrix.search.inbox", searchStart, + map[string]any{"query": query, "corpora": corpora, "k": *k}, + map[string]any{"error": err.Error()}, "ERROR") log.Printf(" inbox-triggered search failed (%s): %v", ie.Priority, err) continue } - if lf != nil && inboxTraceID != "" { - topIDs := make([]string, 0, len(resp.Results)) - for _, r := range resp.Results { - topIDs = append(topIDs, r.ID) - } - lf.Span(ctx, langfuse.SpanInput{ - TraceID: inboxTraceID, - Name: "matrix.search", - Input: map[string]any{ - "query": query, - "corpora": corpora, - "k": *k, - "playbook_corpus": coord.PlaybookCorpus, - }, - Output: map[string]any{ - "top_k_ids": topIDs, - "top1_distance": firstDistance(resp.Results), - "playbook_boosted": resp.PlaybookBoosted, - "playbook_injected": resp.PlaybookInjected, - }, - StartTime: searchStart, - EndTime: searchEnd, - }) + topIDs := make([]string, 0, len(resp.Results)) + for _, r := range resp.Results { + topIDs = append(topIDs, r.ID) } + emitSpan("matrix.search.inbox", searchStart, + map[string]any{"query": query, "corpora": corpora, "k": *k, "playbook_corpus": coord.PlaybookCorpus}, + map[string]any{"top_k_ids": topIDs, "top1_distance": firstDistance(resp.Results), "playbook_boosted": resp.PlaybookBoosted, "playbook_injected": resp.PlaybookInjected}, "") ev := captureEvent("inbox-triggered-search", 9, ie.Coord, "inbox-burst", ie.Subject, query, 1, true, coord.PlaybookCorpus, resp) parsedJSON, _ := json.Marshal(parsed) ev.Note = fmt.Sprintf("inbox %s/%s from %s · LLM-parsed demand: %s", ie.Type, ie.Priority, ie.Sender, string(parsedJSON)) @@ -510,20 +533,9 @@ func main() { judgeStart := time.Now() rating := judgeInboxResult(hc, *ollama, *judgeModel, ie.Body, resp.Results[0]) ev.JudgeRating = rating - if lf != nil && 
inboxTraceID != "" { - lf.Span(ctx, langfuse.SpanInput{ - TraceID: inboxTraceID, - Name: "llm.judge_top1", - Input: map[string]any{ - "original_body": ie.Body, - "top1_id": resp.Results[0].ID, - "top1_corpus": resp.Results[0].Corpus, - }, - Output: map[string]any{"rating": rating}, - StartTime: judgeStart, - EndTime: time.Now(), - }) - } + emitSpan("llm.judge_top1", judgeStart, + map[string]any{"original_body": ie.Body, "top1_id": resp.Results[0].ID, "top1_corpus": resp.Results[0].Corpus}, + map[string]any{"rating": rating}, "") } output.Events = append(output.Events, ev) } @@ -531,11 +543,12 @@ func main() { // ── Phase 2: surge ────────────────────────────────────────── // Each coord's contract demand doubles. URGENT phrasing. log.Printf("[stress] phase 2: surge (2x demand, urgent phrasing)") + startPhase("phase.surge", 12, nil) for _, coord := range coords { c := assignments[coord.Name] for _, d := range c.Demand { q := buildQuery(c, d, 2) - resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coord.PlaybookCorpus)) + resp := tracedSearch("matrix.search.surge", q, corpora, true, coord.PlaybookCorpus) ev := captureEvent("surge", 12, coord.Name, c.Name, d.Role, q, 2, true, coord.PlaybookCorpus, resp) output.Events = append(output.Events, ev) } @@ -548,9 +561,10 @@ func main() { // Real product test: does the system find genuinely different // candidates, or does it sit on the same population? 
log.Printf("[stress] phase 2b: 200-worker swap (alpha warehouse — exclude originally placed)") + startPhase("phase.swap_200_workers", 18, nil) warehouseDemand := contracts[0].Demand[0] // slot 0 is warehouse worker by contract design swapQuery := buildQuery(&contracts[0], warehouseDemand, 1) - origResp := must(matrixSearch(hc, *gateway, swapQuery, corpora, *k, false, "")) + origResp := tracedSearch("matrix.search.swap_orig", swapQuery, corpora, false, "") placedIDs := make([]string, 0, len(origResp.Results)) for _, r := range origResp.Results { placedIDs = append(placedIDs, r.ID) @@ -559,7 +573,7 @@ func main() { origEv.Note = fmt.Sprintf("captured %d originally-placed worker IDs", len(placedIDs)) output.Events = append(output.Events, origEv) - swapResp := must(matrixSearch(hc, *gateway, swapQuery, corpora, *k, false, "", placedIDs...)) + swapResp := tracedSearch("matrix.search.swap_replace", swapQuery, corpora, false, "", placedIDs...) swapEv := captureEvent("swap-replace", 18, "alice", contracts[0].Name, warehouseDemand.Role, swapQuery, 1, false, "", swapResp) swapEv.ExcludeIDs = placedIDs swapIDs := make([]string, 0, len(swapResp.Results)) @@ -572,11 +586,12 @@ func main() { // ── Phase 3: merge — alpha + beta combined under alice ────── log.Printf("[stress] phase 3: merge (alpha + beta combined, alice handles)") + startPhase("phase.merge", 24, nil) mergedDemand := append(append([]Demand{}, contracts[0].Demand...), contracts[1].Demand...) 
for _, d := range mergedDemand { mergedC := &Contract{Name: contracts[0].Name + "+" + contracts[1].Name, Location: contracts[0].Location + " + " + contracts[1].Location, Shift: "shared"} q := buildQuery(mergedC, d, 1) - resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coords[0].PlaybookCorpus)) + resp := tracedSearch("matrix.search.merge", q, corpora, true, coords[0].PlaybookCorpus) ev := captureEvent("merge", 24, "alice", mergedC.Name, d.Role, q, 1, true, coords[0].PlaybookCorpus, resp) output.Events = append(output.Events, ev) } @@ -585,6 +600,7 @@ func main() { // alice's playbook namespace. Tests whether Alice's recordings // surface in Bob's results when Bob runs Alice's contract. log.Printf("[stress] phase 4: handover (bob takes alpha, using alice's playbook)") + startPhase("phase.handover_verbatim", 30, nil) aliceRecordedAnswers := map[string]string{} // role → recorded answer id for _, ev := range output.Events { if ev.Phase == "baseline" && ev.Coordinator == "alice" && len(ev.TopK) > 0 { @@ -596,7 +612,7 @@ func main() { handoverRun := 0 for _, d := range contracts[0].Demand { q := buildQuery(&contracts[0], d, 1) - resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coords[0].PlaybookCorpus)) + resp := tracedSearch("matrix.search.handover_verbatim", q, corpora, true, coords[0].PlaybookCorpus) ev := captureEvent("handover", 30, "bob", contracts[0].Name, d.Role, q, 1, true, coords[0].PlaybookCorpus, resp) output.Events = append(output.Events, ev) handoverRun++ @@ -632,21 +648,25 @@ func main() { // naturally introduce? 
if *withParaphraseHandover { log.Printf("[stress] phase 4b: paraphrase handover (bob runs paraphrased versions of alice's queries)") + startPhase("phase.handover_paraphrase", 36, nil) pHandoverRun := 0 pTop1 := 0 pTopK := 0 for _, d := range contracts[0].Demand { origQuery := buildQuery(&contracts[0], d, 1) + paraStart := time.Now() paraphrase, err := generateParaphrase(hc, *ollama, *judgeModel, origQuery) if err != nil { + emitSpan("llm.paraphrase", paraStart, + map[string]any{"original": origQuery, "model": *judgeModel}, + map[string]any{"error": err.Error()}, "ERROR") log.Printf(" paraphrase gen failed for %s: %v", d.Role, err) continue } - resp, err := matrixSearch(hc, *gateway, paraphrase, corpora, *k, true, coords[0].PlaybookCorpus) - if err != nil { - log.Printf(" paraphrase search failed for %s: %v", d.Role, err) - continue - } + emitSpan("llm.paraphrase", paraStart, + map[string]any{"original": origQuery, "model": *judgeModel}, + map[string]any{"paraphrase": paraphrase}, "") + resp := tracedSearch("matrix.search.handover_paraphrase", paraphrase, corpora, true, coords[0].PlaybookCorpus) ev := captureEvent("handover-paraphrase", 36, "bob", contracts[0].Name, d.Role, paraphrase, 1, true, coords[0].PlaybookCorpus, resp) ev.Note = "paraphrase of: " + origQuery output.Events = append(output.Events, ev) @@ -677,11 +697,12 @@ func main() { // ── Phase 5: split — surge re-distributed across 3 coords ── log.Printf("[stress] phase 5: split (alpha surge spread across all 3 coords)") + startPhase("phase.split", 42, nil) for i, d := range contracts[0].Demand { coord := coords[i%len(coords)] c := &contracts[0] q := buildQuery(c, d, 2) - resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coord.PlaybookCorpus)) + resp := tracedSearch("matrix.search.split", q, corpora, true, coord.PlaybookCorpus) ev := captureEvent("split", 42, coord.Name, c.Name+"-share-"+coord.Name, d.Role, q, 2, true, coord.PlaybookCorpus, resp) output.Events = append(output.Events, ev) } @@ 
-689,19 +710,20 @@ func main() { // ── Phase 6: non-determinism check ───────────────────────── // Reissue each baseline query once and compare top-K Jaccard. log.Printf("[stress] phase 6: non-determinism (reissue baselines, measure Jaccard)") + startPhase("phase.reissue", 48, nil) jaccards := []float64{} for _, ev := range output.Events { if ev.Phase != "baseline" { continue } - resp := must(matrixSearch(hc, *gateway, ev.Query, corpora, *k, false, "")) // playbook OFF for reissue to isolate retrieval stability + resp := tracedSearch("matrix.search.reissue", ev.Query, corpora, false, "") reissue := captureEvent("reissue", 48, ev.Coordinator, ev.Contract, ev.Role, ev.Query, 1, false, "", resp) output.Events = append(output.Events, reissue) // Compare against ev.TopK (also playbook-on baseline). Note: // this conflates retrieval stability with playbook stability. // We capture both ev (playbook on) and a fresh retrieval (off); // real determinism = retrieval-only top-K comparison. - freshRetrievalResp := must(matrixSearch(hc, *gateway, ev.Query, corpora, *k, false, "")) + freshRetrievalResp := tracedSearch("matrix.search.reissue_retrieval_only", ev.Query, corpora, false, "") freshRetrievalEv := captureEvent("reissue-retrieval-only", 48, ev.Coordinator, ev.Contract, ev.Role, ev.Query, 1, false, "", freshRetrievalResp) j := jaccardTopK(reissue.TopK, freshRetrievalEv.TopK) jaccards = append(jaccards, j)