diff --git a/reports/reality-tests/multi_coord_stress_001.md b/reports/reality-tests/multi_coord_stress_001.md new file mode 100644 index 0000000..81d1ce1 --- /dev/null +++ b/reports/reality-tests/multi_coord_stress_001.md @@ -0,0 +1,77 @@ +# Multi-Coordinator Stress Test — Run 001 + +**Generated:** 2026-04-30T12:54:09.621556469Z +**Coordinators:** alice / bob / carol (each with own playbook namespace: `playbook_alice` / `playbook_bob` / `playbook_carol`) +**Contracts:** alpha_milwaukee_distribution / beta_indianapolis_manufacturing / gamma_chicago_construction +**Corpora:** `workers,ethereal_workers` +**K per query:** 8 +**Total events captured:** 52 +**Evidence:** `reports/reality-tests/multi_coord_stress_001.json` + +--- + +## Diversity — is the system locking into scenarios or cycling? + +| Metric | Mean Jaccard | n pairs | Interpretation | +|---|---:|---:|---| +| Same role across different contracts | 0 | 0 | Lower = more diverse (different region/cert mix → different workers) | +| Different roles within same contract | 0.003703703703703704 | 18 | Should be near-zero (different roles = different worker pools) | + +**Healthy ranges:** +- Same role across contracts: < 0.30 means the system is genuinely picking different workers per region/contract. +- Different roles same contract: < 0.10 means role-specific retrieval is working. +- If either is > 0.50, the system is "cycling" the same handful of workers regardless of query intent. + +--- + +## Determinism — same query reissued, top-K stability + +| Metric | Value | +|---|---:| +| Mean Jaccard on retrieval-only reissue | 1 | +| Number of reissue pairs | 12 | + +**Interpretation:** +- ≥ 0.95: HNSW retrieval is highly deterministic; reissues land on near-identical top-K. Good — system locks into a stable view of "best workers for this query." +- 0.80 – 0.95: Some HNSW or embed variance, acceptable. 
+- < 0.80: Retrieval is unstable — reissues see substantially different results, suggesting either embed nondeterminism (Ollama returning slightly different vectors) or vectord nondeterminism (HNSW insertion order affecting recall). + +--- + +## Learning — handover hit rate + +Bob takes Alice's contract using Alice's playbook namespace. Did Alice's recorded answers surface in Bob's results? + +| Metric | Value | +|---|---:| +| Handover queries run | 4 | +| Alice's recorded answer at Bob's top-1 | 4 | +| Alice's recorded answer in Bob's top-K | 4 | +| **Handover hit rate (top-1)** | **1** | + +**Interpretation:** +- Hit rate ≥ 0.5: handover is meaningful — the second coordinator inherits the first's institutional memory. +- Hit rate ≈ 0.0: playbook namespace isolation is working but the playbook itself isn't transferable, OR Bob's queries don't match Alice's recordings closely enough. + +--- + +## Per-event capture + +All matrix.search responses live in the JSON — top-K with worker IDs, distances, and per-corpus counts. Search by phase: + +```bash +jq '.events[] | select(.phase == "merge")' reports/reality-tests/multi_coord_stress_001.json +jq '.events[] | select(.coordinator == "alice" and .phase == "baseline")' reports/reality-tests/multi_coord_stress_001.json +jq '.events[] | select(.role == "warehouse worker") | {phase, contract, top_k_ids: [.top_k[].id]}' reports/reality-tests/multi_coord_stress_001.json +``` + +--- + +## What's NOT in this run (Phase 1 deliberately defers) + +- **48-hour clock.** Events fire as discrete steps, not on a timeline. +- **Email / SMS ingest.** No endpoints exist on the Go side yet. +- **New-resume injection mid-run.** The corpus is fixed at the start. +- **Langfuse traces.** Need Go-side wiring. + +These are Phase 2/3. The Phase 1 substrate is what the time-based runner will mount on top of. 
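Reviewer note: every number in the diversity and determinism tables above is a mean Jaccard over top-K worker-ID sets, the same arithmetic the Go driver's `jaccardStrings` implements. For spot-checking a pair of events from the evidence JSON by hand, the set arithmetic can be reproduced with jq; the `w-…` IDs below are made-up placeholders, not data from the run:

```bash
# Jaccard = |A ∩ B| / |A ∪ B| over two top-K ID lists.
# The two arrays are hypothetical worker IDs, not captured results.
jq -n \
  --argjson a '["w-101","w-102","w-103","w-104"]' \
  --argjson b '["w-103","w-104","w-201","w-202"]' \
  '(($a - ($a - $b)) | length) / (($a + $b | unique) | length)'
# 2 shared IDs out of 6 distinct → 1/3
```

A score of 1.0 means identical sets (what the reissue check wants to see); a score near 0 means disjoint worker pools (what the diversity check wants to see).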
diff --git a/scripts/multi_coord_stress.sh b/scripts/multi_coord_stress.sh new file mode 100755 index 0000000..a7c48b0 --- /dev/null +++ b/scripts/multi_coord_stress.sh @@ -0,0 +1,266 @@ +#!/usr/bin/env bash +# Multi-coordinator stress harness — Phase 1 of the 48-hour mock. +# +# Three coordinators (Alice / Bob / Carol) own three distinct contracts +# (Milwaukee distribution, Indianapolis manufacturing, Chicago +# construction). The driver fires phases: +# 1. baseline — each coord runs their contract's role queries +# 2. surge — each contract's demand doubles (URGENT phrasing) +# 3. merge — alpha + beta combined under alice +# 4. handover — bob takes alpha, USING alice's playbook namespace +# 5. split — alpha surge re-distributed across all 3 coords +# 6. reissue — non-determinism check: same baselines reissued +# 7. analysis — diversity + determinism + learning metrics +# +# Phase 1 deliberately skips the 48-hour clock, email/SMS endpoints, +# and Langfuse wiring — those are Phase 2/3. +# +# Usage: +# ./scripts/multi_coord_stress.sh # run #001 +# RUN_ID=002 ./scripts/multi_coord_stress.sh +# K=12 ./scripts/multi_coord_stress.sh + +set -euo pipefail +cd "$(dirname "$0")/.." + +export PATH="$PATH:/usr/local/go/bin" + +RUN_ID="${RUN_ID:-001}" +WORKERS_LIMIT="${WORKERS_LIMIT:-5000}" +ETHEREAL_LIMIT="${ETHEREAL_LIMIT:-0}" +CORPORA="${CORPORA:-workers,ethereal_workers}" +K="${K:-8}" + +OUT_JSON="reports/reality-tests/multi_coord_stress_${RUN_ID}.json" +OUT_MD="reports/reality-tests/multi_coord_stress_${RUN_ID}.md" + +if ! curl -sS --max-time 3 http://localhost:11434/api/tags >/dev/null 2>&1; then + echo "[stress] Ollama not reachable on :11434 — skipping (need it for embeddings)" + exit 0 +fi + +echo "[stress] building binaries..." 
+go build -o bin/ ./cmd/storaged ./cmd/catalogd ./cmd/ingestd ./cmd/queryd \ + ./cmd/embedd ./cmd/vectord ./cmd/pathwayd ./cmd/observerd \ + ./cmd/matrixd ./cmd/gateway \ + ./scripts/staffing_workers ./scripts/multi_coord_stress + +pkill -f "bin/(storaged|catalogd|ingestd|queryd|embedd|vectord|pathwayd|observerd|matrixd|gateway)" 2>/dev/null || true +sleep 0.3 + +PIDS=() +TMP="$(mktemp -d)" +CFG="$TMP/stress.toml" + +cleanup() { + echo "[stress] cleanup" + for p in "${PIDS[@]:-}"; do [ -n "${p:-}" ] && kill "$p" 2>/dev/null || true; done + rm -rf "$TMP" +} +trap cleanup EXIT INT TERM + +cat > "$CFG" </dev/null 2>&1; then return 0; fi + sleep 0.05 + done + return 1 +} + +echo "[stress] launching stack..." +./bin/storaged -config "$CFG" > /tmp/stress_storaged.log 2>&1 & PIDS+=($!); poll_health 3211 || { echo "storaged failed"; exit 1; } +./bin/catalogd -config "$CFG" > /tmp/stress_catalogd.log 2>&1 & PIDS+=($!); poll_health 3212 || { echo "catalogd failed"; exit 1; } +./bin/ingestd -config "$CFG" > /tmp/stress_ingestd.log 2>&1 & PIDS+=($!); poll_health 3213 || { echo "ingestd failed"; exit 1; } +./bin/queryd -config "$CFG" > /tmp/stress_queryd.log 2>&1 & PIDS+=($!); poll_health 3214 || { echo "queryd failed"; exit 1; } +./bin/embedd -config "$CFG" > /tmp/stress_embedd.log 2>&1 & PIDS+=($!); poll_health 3216 || { echo "embedd failed"; exit 1; } +./bin/vectord -config "$CFG" > /tmp/stress_vectord.log 2>&1 & PIDS+=($!); poll_health 3215 || { echo "vectord failed"; exit 1; } +./bin/pathwayd -config "$CFG" > /tmp/stress_pathwayd.log 2>&1 & PIDS+=($!); poll_health 3217 || { echo "pathwayd failed"; exit 1; } +./bin/observerd -config "$CFG" > /tmp/stress_observerd.log 2>&1 & PIDS+=($!); poll_health 3219 || { echo "observerd failed"; exit 1; } +./bin/matrixd -config "$CFG" > /tmp/stress_matrixd.log 2>&1 & PIDS+=($!); poll_health 3218 || { echo "matrixd failed"; exit 1; } +./bin/gateway -config "$CFG" > /tmp/stress_gateway.log 2>&1 & PIDS+=($!); poll_health 3110 || { echo 
"gateway failed"; exit 1; }
+
+echo
+echo "[stress] ingest workers (limit=$WORKERS_LIMIT) into 'workers' corpus..."
+./bin/staffing_workers -limit "$WORKERS_LIMIT"
+
+echo
+echo "[stress] ingest ethereal_workers (limit=$ETHEREAL_LIMIT, 0=all) into 'ethereal_workers' corpus..."
+./bin/staffing_workers \
+  -parquet "/home/profit/lakehouse/data/datasets/ethereal_workers.parquet" \
+  -index-name ethereal_workers \
+  -id-prefix "e-" \
+  -limit "$ETHEREAL_LIMIT"
+
+echo
+echo "[stress] running multi-coord stress driver..."
+./bin/multi_coord_stress \
+  -gateway "http://127.0.0.1:3110" \
+  -contracts tests/reality/contracts \
+  -corpora "$CORPORA" \
+  -k "$K" \
+  -out "$OUT_JSON"
+
+echo
+echo "[stress] generating markdown report → $OUT_MD"
+
+# Render compact markdown from the JSON. Same shape as the lift harness
+# reports so reviewers can compare format.
+total=$(jq -r '.events | length' "$OUT_JSON")
+gen_at=$(jq -r '.generated_at' "$OUT_JSON")
+div_role=$(jq -r '.diversity.same_role_across_contracts_mean_jaccard' "$OUT_JSON")
+div_role_n=$(jq -r '.diversity.num_pairs_same_role_across_contracts' "$OUT_JSON")
+div_xrole=$(jq -r '.diversity.different_roles_same_contract_mean_jaccard' "$OUT_JSON")
+div_xrole_n=$(jq -r '.diversity.num_pairs_different_roles_same_contract' "$OUT_JSON")
+det_jacc=$(jq -r '.determinism.mean_jaccard' "$OUT_JSON")
+det_n=$(jq -r '.determinism.num_reissued_pairs' "$OUT_JSON")
+hand_run=$(jq -r '.learning.handover_queries_run' "$OUT_JSON")
+hand_top1=$(jq -r '.learning.recorded_answers_top1_count' "$OUT_JSON")
+hand_topk=$(jq -r '.learning.recorded_answers_topk_count' "$OUT_JSON")
+hand_rate=$(jq -r '.learning.handover_hit_rate' "$OUT_JSON")
+
+cat > "$OUT_MD" <<MDEOF
+# Multi-Coordinator Stress Test — Run ${RUN_ID}
+
+**Generated:** ${gen_at}
+**Coordinators:** alice / bob / carol (each with own playbook namespace: \`playbook_alice\` / \`playbook_bob\` / \`playbook_carol\`)
+**Contracts:** alpha_milwaukee_distribution / beta_indianapolis_manufacturing / gamma_chicago_construction
+**Corpora:** \`${CORPORA}\`
+**K per query:** ${K}
+**Total events captured:** ${total}
+**Evidence:** \`${OUT_JSON}\`
+
+---
+
+## Diversity — is the system locking into scenarios or cycling?
+
+| Metric | Mean Jaccard | n pairs | Interpretation |
+|---|---:|---:|---|
+| Same role across different contracts | ${div_role} | ${div_role_n} | Lower = more diverse (different region/cert mix → different workers) |
+| Different roles within same contract | ${div_xrole} | ${div_xrole_n} | Should be near-zero (different roles = different worker pools) |
+
+**Healthy ranges:**
+- Same role across contracts: < 0.30 means the system is genuinely picking different workers per region/contract.
+- Different roles same contract: < 0.10 means role-specific retrieval is working.
+- If either is > 0.50, the system is "cycling" the same handful of workers regardless of query intent.
+ +--- + +## Determinism — same query reissued, top-K stability + +| Metric | Value | +|---|---:| +| Mean Jaccard on retrieval-only reissue | ${det_jacc} | +| Number of reissue pairs | ${det_n} | + +**Interpretation:** +- ≥ 0.95: HNSW retrieval is highly deterministic; reissues land on near-identical top-K. Good — system locks into a stable view of "best workers for this query." +- 0.80 – 0.95: Some HNSW or embed variance, acceptable. +- < 0.80: Retrieval is unstable — reissues see substantially different results, suggesting either embed nondeterminism (Ollama returning slightly different vectors) or vectord nondeterminism (HNSW insertion order affecting recall). + +--- + +## Learning — handover hit rate + +Bob takes Alice's contract using Alice's playbook namespace. Did Alice's recorded answers surface in Bob's results? + +| Metric | Value | +|---|---:| +| Handover queries run | ${hand_run} | +| Alice's recorded answer at Bob's top-1 | ${hand_top1} | +| Alice's recorded answer in Bob's top-K | ${hand_topk} | +| **Handover hit rate (top-1)** | **${hand_rate}** | + +**Interpretation:** +- Hit rate ≥ 0.5: handover is meaningful — the second coordinator inherits the first's institutional memory. +- Hit rate ≈ 0.0: playbook namespace isolation is working but the playbook itself isn't transferable, OR Bob's queries don't match Alice's recordings closely enough. + +--- + +## Per-event capture + +All matrix.search responses live in the JSON — top-K with worker IDs, distances, and per-corpus counts. Search by phase: + +\`\`\`bash +jq '.events[] | select(.phase == "merge")' ${OUT_JSON} +jq '.events[] | select(.coordinator == "alice" and .phase == "baseline")' ${OUT_JSON} +jq '.events[] | select(.role == "warehouse worker") | {phase, contract, top_k_ids: [.top_k[].id]}' ${OUT_JSON} +\`\`\` + +--- + +## What's NOT in this run (Phase 1 deliberately defers) + +- **48-hour clock.** Events fire as discrete steps, not on a timeline. 
+- **Email / SMS ingest.** No endpoints exist on the Go side yet. +- **New-resume injection mid-run.** The corpus is fixed at the start. +- **Langfuse traces.** Need Go-side wiring. + +These are Phase 2/3. The Phase 1 substrate is what the time-based runner will mount on top of. +MDEOF + +echo +echo "[stress] DONE" +echo "[stress] evidence: $OUT_JSON" +echo "[stress] report: $OUT_MD" diff --git a/scripts/multi_coord_stress/main.go b/scripts/multi_coord_stress/main.go new file mode 100644 index 0000000..682a773 --- /dev/null +++ b/scripts/multi_coord_stress/main.go @@ -0,0 +1,590 @@ +// Multi-coordinator stress harness — Phase 1 of the 48-hour mock. +// +// Three coordinators (Alice, Bob, Carol) each own a contract with a +// different demand profile. They run queries against the matrix +// indexer with separate playbook namespaces. The harness fires +// scenario phases (baseline → surge → merge → handover → split) and +// captures every response so we can verify: +// +// 1. Diversity — different (coord, contract, role) triples should +// surface DIFFERENT top-K worker IDs. If everything returns the +// same handful of workers, the system is "cycling" not "locking +// into scenarios." +// 2. Non-determinism — same query reissued should return near- +// identical top-K (controlled variance from HNSW + judge, if any). +// 3. Learning — after Alice records playbook entries for her +// contract's queries, Bob takes over the same contract using +// Alice's playbook namespace; Alice's recordings should surface +// in Bob's results. +// +// Phase 1 deliberately skips: time-based event clock (events fire +// sequentially), email/SMS ingest (no integration yet), Langfuse +// tracing (would need Go-side wiring). Those are Phase 2/3. 
+package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "path/filepath" + "strings" + "time" +) + +// ── data shapes ────────────────────────────────────────────────── + +type Demand struct { + Role string `json:"role"` + Count int `json:"count"` + Skills []string `json:"skills"` + Certs []string `json:"certs"` + InRoster *bool `json:"in_roster,omitempty"` // nil = assume true +} + +type Contract struct { + Name string `json:"name"` + Client string `json:"client"` + Location string `json:"location"` + Shift string `json:"shift"` + Demand []Demand `json:"demand"` +} + +type Coordinator struct { + Name string + PlaybookCorpus string +} + +// ── matrix.search wire shapes ──────────────────────────────────── + +type matrixSearchReq struct { + QueryText string `json:"query_text"` + Corpora []string `json:"corpora"` + K int `json:"k"` + UsePlaybook bool `json:"use_playbook,omitempty"` + PlaybookCorpus string `json:"playbook_corpus,omitempty"` +} + +type matrixResult struct { + ID string `json:"id"` + Distance float32 `json:"distance"` + Corpus string `json:"corpus"` + Metadata json.RawMessage `json:"metadata,omitempty"` +} + +type matrixResp struct { + Results []matrixResult `json:"results"` + PerCorpusCounts map[string]int `json:"per_corpus_counts"` + PlaybookBoosted int `json:"playbook_boosted,omitempty"` + PlaybookInjected int `json:"playbook_injected,omitempty"` +} + +// ── event capture ──────────────────────────────────────────────── + +type ResultRef struct { + Rank int `json:"rank"` + ID string `json:"id"` + Corpus string `json:"corpus"` + Distance float32 `json:"distance"` +} + +type Event struct { + Phase string `json:"phase"` + Coordinator string `json:"coordinator"` + Contract string `json:"contract"` + Role string `json:"role"` + Query string `json:"query"` + SurgeMultiplier int `json:"surge_multiplier,omitempty"` + UsePlaybook bool `json:"use_playbook"` + PlaybookCorpus string 
`json:"playbook_corpus,omitempty"` + TopK []ResultRef `json:"top_k"` + PerCorpusCounts map[string]int `json:"per_corpus_counts,omitempty"` + PlaybookBoosted int `json:"playbook_boosted,omitempty"` + PlaybookInjected int `json:"playbook_injected,omitempty"` + Note string `json:"note,omitempty"` + TimestampUnixNano int64 `json:"ts_ns"` +} + +type Output struct { + Coordinators []string `json:"coordinators"` + Contracts []string `json:"contracts"` + Events []Event `json:"events"` + Diversity Diversity `json:"diversity"` + Determinism Determ `json:"determinism"` + Learning Learning `json:"learning"` + GeneratedAt time.Time `json:"generated_at"` +} + +// Diversity = how distinct are top-K worker sets across (coord, +// contract, role) triples that SHOULD differ. We compute mean Jaccard +// similarity for matched-role-across-contracts pairs (lower is more +// diverse) and matched-coord-different-roles pairs. +type Diversity struct { + SameRoleAcrossContractsMeanJaccard float64 `json:"same_role_across_contracts_mean_jaccard"` + DifferentRolesSameContractMeanJaccard float64 `json:"different_roles_same_contract_mean_jaccard"` + NumPairsSameRoleAcrossContracts int `json:"num_pairs_same_role_across_contracts"` + NumPairsDifferentRolesSameContract int `json:"num_pairs_different_roles_same_contract"` +} + +// Determ = same query reissued — top-K should be near-identical. +// Jaccard close to 1.0 = stable / deterministic, < 0.95 = some HNSW +// or judge variance. +type Determ struct { + MeanJaccard float64 `json:"mean_jaccard"` + NumReissuedPairs int `json:"num_reissued_pairs"` +} + +// Learning = handover signal. After Alice records playbooks for her +// contract, Bob runs the same queries with Alice's playbook namespace. +// We measure: do Alice's recorded answer IDs surface in Bob's top-K? 
+type Learning struct { + HandoverQueriesRun int `json:"handover_queries_run"` + RecordedAnswersTop1Count int `json:"recorded_answers_top1_count"` + RecordedAnswersTopKCount int `json:"recorded_answers_topk_count"` + HandoverHitRate float64 `json:"handover_hit_rate"` +} + +// ── main ───────────────────────────────────────────────────────── + +func main() { + var ( + gateway = flag.String("gateway", "http://127.0.0.1:3110", "gateway base URL") + contractsDir = flag.String("contracts", "tests/reality/contracts", "directory of contract JSON files") + corporaCSV = flag.String("corpora", "workers,ethereal_workers", "comma-separated matrix corpora") + k = flag.Int("k", 8, "top-k from matrix.search per query") + out = flag.String("out", "reports/reality-tests/multi_coord_stress_001.json", "output JSON path") + ) + flag.Parse() + + contracts, err := loadContracts(*contractsDir) + if err != nil { + log.Fatalf("load contracts: %v", err) + } + if len(contracts) < 3 { + log.Fatalf("need ≥3 contracts in %s, got %d", *contractsDir, len(contracts)) + } + + // First three contracts → coord assignments. Names are fixed so + // playbook corpora are stable across runs (rerun lands on same + // namespaces, exercising the persistence path indirectly). + coords := []Coordinator{ + {Name: "alice", PlaybookCorpus: "playbook_alice"}, + {Name: "bob", PlaybookCorpus: "playbook_bob"}, + {Name: "carol", PlaybookCorpus: "playbook_carol"}, + } + + // Initial assignment: alice→alpha, bob→beta, carol→gamma. 
+ assignments := map[string]*Contract{ + "alice": &contracts[0], + "bob": &contracts[1], + "carol": &contracts[2], + } + + corpora := strings.Split(*corporaCSV, ",") + hc := &http.Client{Timeout: 30 * time.Second} + ctx := context.Background() + _ = ctx + + output := Output{ + Coordinators: []string{"alice", "bob", "carol"}, + Contracts: []string{contracts[0].Name, contracts[1].Name, contracts[2].Name}, + GeneratedAt: time.Now().UTC(), + } + + log.Printf("[stress] 3 coords, 3 contracts, k=%d, corpora=%v", *k, corpora) + + // ── Phase 1: baseline ─────────────────────────────────────── + // Each coord runs their own contract's role queries. Records + // playbook entries (top-1 of each as a synthetic "successful + // match" outcome) into their personal namespace. + log.Printf("[stress] phase 1: baseline") + for _, coord := range coords { + c := assignments[coord.Name] + for _, d := range c.Demand { + q := buildQuery(c, d, 1) + resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coord.PlaybookCorpus)) + ev := captureEvent("baseline", coord.Name, c.Name, d.Role, q, 1, true, coord.PlaybookCorpus, resp) + output.Events = append(output.Events, ev) + // Record top-1 as a successful playbook entry for this coord. + if len(resp.Results) > 0 { + if err := playbookRecord(hc, *gateway, q, resp.Results[0].ID, resp.Results[0].Corpus, 1.0, coord.PlaybookCorpus); err != nil { + log.Printf(" record (%s/%s): %v", coord.Name, d.Role, err) + } + } + } + } + + // ── Phase 2: surge ────────────────────────────────────────── + // Each coord's contract demand doubles. URGENT phrasing. 
+ log.Printf("[stress] phase 2: surge (2x demand, urgent phrasing)") + for _, coord := range coords { + c := assignments[coord.Name] + for _, d := range c.Demand { + q := buildQuery(c, d, 2) + resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coord.PlaybookCorpus)) + ev := captureEvent("surge", coord.Name, c.Name, d.Role, q, 2, true, coord.PlaybookCorpus, resp) + output.Events = append(output.Events, ev) + } + } + + // ── Phase 3: merge — alpha + beta combined under alice ────── + log.Printf("[stress] phase 3: merge (alpha + beta combined, alice handles)") + mergedDemand := append(append([]Demand{}, contracts[0].Demand...), contracts[1].Demand...) + for _, d := range mergedDemand { + mergedC := &Contract{Name: contracts[0].Name + "+" + contracts[1].Name, Location: contracts[0].Location + " + " + contracts[1].Location, Shift: "shared"} + q := buildQuery(mergedC, d, 1) + resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coords[0].PlaybookCorpus)) + ev := captureEvent("merge", "alice", mergedC.Name, d.Role, q, 1, true, coords[0].PlaybookCorpus, resp) + output.Events = append(output.Events, ev) + } + + // ── Phase 4: handover — bob takes alpha contract, USING ───── + // alice's playbook namespace. Tests whether Alice's recordings + // surface in Bob's results when Bob runs Alice's contract. 
+ log.Printf("[stress] phase 4: handover (bob takes alpha, using alice's playbook)") + aliceRecordedAnswers := map[string]string{} // role → recorded answer id + for _, ev := range output.Events { + if ev.Phase == "baseline" && ev.Coordinator == "alice" && len(ev.TopK) > 0 { + aliceRecordedAnswers[ev.Role] = ev.TopK[0].ID + } + } + handoverHitsTop1 := 0 + handoverHitsTopK := 0 + handoverRun := 0 + for _, d := range contracts[0].Demand { + q := buildQuery(&contracts[0], d, 1) + resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coords[0].PlaybookCorpus)) + ev := captureEvent("handover", "bob", contracts[0].Name, d.Role, q, 1, true, coords[0].PlaybookCorpus, resp) + output.Events = append(output.Events, ev) + handoverRun++ + recordedID, ok := aliceRecordedAnswers[d.Role] + if !ok { + continue + } + if len(ev.TopK) > 0 && ev.TopK[0].ID == recordedID { + handoverHitsTop1++ + handoverHitsTopK++ + } else { + for _, r := range ev.TopK { + if r.ID == recordedID { + handoverHitsTopK++ + break + } + } + } + } + output.Learning.HandoverQueriesRun = handoverRun + output.Learning.RecordedAnswersTop1Count = handoverHitsTop1 + output.Learning.RecordedAnswersTopKCount = handoverHitsTopK + if handoverRun > 0 { + output.Learning.HandoverHitRate = float64(handoverHitsTop1) / float64(handoverRun) + } + + // ── Phase 5: split — surge re-distributed across 3 coords ── + log.Printf("[stress] phase 5: split (alpha surge spread across all 3 coords)") + for i, d := range contracts[0].Demand { + coord := coords[i%len(coords)] + c := &contracts[0] + q := buildQuery(c, d, 2) + resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coord.PlaybookCorpus)) + ev := captureEvent("split", coord.Name, c.Name+"-share-"+coord.Name, d.Role, q, 2, true, coord.PlaybookCorpus, resp) + output.Events = append(output.Events, ev) + } + + // ── Phase 6: non-determinism check ───────────────────────── + // Reissue each baseline query once and compare top-K Jaccard. 
+ log.Printf("[stress] phase 6: non-determinism (reissue baselines, measure Jaccard)") + jaccards := []float64{} + for _, ev := range output.Events { + if ev.Phase != "baseline" { + continue + } + resp := must(matrixSearch(hc, *gateway, ev.Query, corpora, *k, false, "")) // playbook OFF for reissue to isolate retrieval stability + reissue := captureEvent("reissue", ev.Coordinator, ev.Contract, ev.Role, ev.Query, 1, false, "", resp) + output.Events = append(output.Events, reissue) + // Compare against ev.TopK (also playbook-on baseline). Note: + // this conflates retrieval stability with playbook stability. + // We capture both ev (playbook on) and a fresh retrieval (off); + // real determinism = retrieval-only top-K comparison. + freshRetrievalResp := must(matrixSearch(hc, *gateway, ev.Query, corpora, *k, false, "")) + freshRetrievalEv := captureEvent("reissue-retrieval-only", ev.Coordinator, ev.Contract, ev.Role, ev.Query, 1, false, "", freshRetrievalResp) + j := jaccardTopK(reissue.TopK, freshRetrievalEv.TopK) + jaccards = append(jaccards, j) + } + output.Determinism.NumReissuedPairs = len(jaccards) + output.Determinism.MeanJaccard = mean(jaccards) + + // ── Phase 7: diversity analysis ───────────────────────────── + log.Printf("[stress] phase 7: diversity analysis") + output.Diversity = computeDiversity(output.Events) + + // ── write ─────────────────────────────────────────────────── + if err := os.MkdirAll(filepath.Dir(*out), 0o755); err != nil { + log.Fatalf("mkdir: %v", err) + } + bs, _ := json.MarshalIndent(output, "", " ") + if err := os.WriteFile(*out, bs, 0o644); err != nil { + log.Fatalf("write %s: %v", *out, err) + } + + log.Printf("[stress] DONE — events=%d", len(output.Events)) + log.Printf("[stress] diversity: same-role-across-contracts mean Jaccard = %.3f (n=%d)", + output.Diversity.SameRoleAcrossContractsMeanJaccard, output.Diversity.NumPairsSameRoleAcrossContracts) + log.Printf("[stress] different-roles-same-contract mean Jaccard = %.3f (n=%d)", 
+ output.Diversity.DifferentRolesSameContractMeanJaccard, output.Diversity.NumPairsDifferentRolesSameContract) + log.Printf("[stress] determinism: mean Jaccard on reissue = %.3f (n=%d)", + output.Determinism.MeanJaccard, output.Determinism.NumReissuedPairs) + log.Printf("[stress] learning: handover hit rate (top-1) = %d/%d = %.0f%%", + output.Learning.RecordedAnswersTop1Count, output.Learning.HandoverQueriesRun, + output.Learning.HandoverHitRate*100) + log.Printf("[stress] results → %s", *out) +} + +// ── helpers ────────────────────────────────────────────────────── + +func loadContracts(dir string) ([]Contract, error) { + files, err := filepath.Glob(filepath.Join(dir, "contract_*.json")) + if err != nil { + return nil, err + } + if len(files) == 0 { + return nil, fmt.Errorf("no contract_*.json files in %s", dir) + } + var out []Contract + for _, f := range files { + bs, err := os.ReadFile(f) + if err != nil { + return nil, err + } + var c Contract + if err := json.Unmarshal(bs, &c); err != nil { + return nil, fmt.Errorf("%s: %w", f, err) + } + out = append(out, c) + } + return out, nil +} + +func buildQuery(c *Contract, d Demand, surge int) string { + var b strings.Builder + if surge > 1 { + b.WriteString(fmt.Sprintf("URGENT: need %d ", d.Count*surge)) + } else { + b.WriteString(fmt.Sprintf("Need %d ", d.Count)) + } + b.WriteString(d.Role) + if c.Location != "" { + b.WriteString(" for ") + b.WriteString(c.Location) + } + if c.Shift != "" { + b.WriteString(", ") + b.WriteString(c.Shift) + b.WriteString(" shift") + } + if len(d.Certs) > 0 { + b.WriteString(", certifications: ") + b.WriteString(strings.Join(d.Certs, ", ")) + } + if len(d.Skills) > 0 { + b.WriteString(", skills: ") + b.WriteString(strings.Join(d.Skills, ", ")) + } + return b.String() +} + +func captureEvent(phase, coord, contract, role, query string, surge int, usePlaybook bool, pbCorpus string, resp *matrixResp) Event { + topK := make([]ResultRef, 0, len(resp.Results)) + for i, r := range 
resp.Results { + topK = append(topK, ResultRef{Rank: i, ID: r.ID, Corpus: r.Corpus, Distance: r.Distance}) + } + return Event{ + Phase: phase, + Coordinator: coord, + Contract: contract, + Role: role, + Query: query, + SurgeMultiplier: surge, + UsePlaybook: usePlaybook, + PlaybookCorpus: pbCorpus, + TopK: topK, + PerCorpusCounts: resp.PerCorpusCounts, + PlaybookBoosted: resp.PlaybookBoosted, + PlaybookInjected: resp.PlaybookInjected, + TimestampUnixNano: time.Now().UnixNano(), + } +} + +func computeDiversity(events []Event) Diversity { + // Filter to baseline events for clean apples-to-apples. + type key struct{ contract, role string } + byKey := map[key][]string{} + for _, ev := range events { + if ev.Phase != "baseline" { + continue + } + k := key{ev.Contract, ev.Role} + ids := make([]string, len(ev.TopK)) + for i, r := range ev.TopK { + ids[i] = r.ID + } + byKey[k] = ids + } + + // Same role across contracts: same `role`, different `contract`. + rolesSeen := map[string][][]string{} + contractsSeen := map[string][]struct { + role string + ids []string + }{} + for k, ids := range byKey { + rolesSeen[k.role] = append(rolesSeen[k.role], ids) + contractsSeen[k.contract] = append(contractsSeen[k.contract], struct { + role string + ids []string + }{k.role, ids}) + } + + var ( + sameRoleJacc []float64 + diffRolesJacc []float64 + ) + // Same-role-across-contracts: each role's idsSet pair-wise. + for _, idsList := range rolesSeen { + for i := 0; i < len(idsList); i++ { + for j := i + 1; j < len(idsList); j++ { + sameRoleJacc = append(sameRoleJacc, jaccardStrings(idsList[i], idsList[j])) + } + } + } + // Different-roles-same-contract. 
+ for _, items := range contractsSeen { + for i := 0; i < len(items); i++ { + for j := i + 1; j < len(items); j++ { + if items[i].role == items[j].role { + continue + } + diffRolesJacc = append(diffRolesJacc, jaccardStrings(items[i].ids, items[j].ids)) + } + } + } + + return Diversity{ + SameRoleAcrossContractsMeanJaccard: mean(sameRoleJacc), + DifferentRolesSameContractMeanJaccard: mean(diffRolesJacc), + NumPairsSameRoleAcrossContracts: len(sameRoleJacc), + NumPairsDifferentRolesSameContract: len(diffRolesJacc), + } +} + +func jaccardTopK(a, b []ResultRef) float64 { + aIDs := make([]string, len(a)) + bIDs := make([]string, len(b)) + for i, r := range a { + aIDs[i] = r.ID + } + for i, r := range b { + bIDs[i] = r.ID + } + return jaccardStrings(aIDs, bIDs) +} + +func jaccardStrings(a, b []string) float64 { + if len(a) == 0 && len(b) == 0 { + return 1.0 + } + setA := map[string]bool{} + for _, x := range a { + setA[x] = true + } + intersect := 0 + for _, x := range b { + if setA[x] { + intersect++ + } + } + union := len(setA) + for _, x := range b { + if !setA[x] { + union++ + } + } + if union == 0 { + return 0 + } + return float64(intersect) / float64(union) +} + +func mean(xs []float64) float64 { + if len(xs) == 0 { + return 0 + } + s := 0.0 + for _, x := range xs { + s += x + } + return s / float64(len(xs)) +} + +// ── HTTP helpers ───────────────────────────────────────────────── + +func matrixSearch(hc *http.Client, gw, query string, corpora []string, k int, usePlaybook bool, playbookCorpus string) (*matrixResp, error) { + body, _ := json.Marshal(matrixSearchReq{ + QueryText: query, + Corpora: corpora, + K: k, + UsePlaybook: usePlaybook, + PlaybookCorpus: playbookCorpus, + }) + req, _ := http.NewRequest("POST", gw+"/v1/matrix/search", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + resp, err := hc.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode/100 != 2 { + rb, _ := 
io.ReadAll(resp.Body) + return nil, fmt.Errorf("matrix.search %d: %s", resp.StatusCode, string(rb)) + } + var out matrixResp + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, err + } + return &out, nil +} + +func playbookRecord(hc *http.Client, gw, query, answerID, answerCorpus string, score float64, corpus string) error { + body, _ := json.Marshal(map[string]any{ + "query_text": query, + "answer_id": answerID, + "answer_corpus": answerCorpus, + "score": score, + "tags": []string{"multi-coord-stress"}, + "corpus": corpus, + }) + req, _ := http.NewRequest("POST", gw+"/v1/matrix/playbooks/record", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + resp, err := hc.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode/100 != 2 { + rb, _ := io.ReadAll(resp.Body) + return fmt.Errorf("playbook record %d: %s", resp.StatusCode, string(rb)) + } + return nil +} + +func must[T any](v T, err error) T { + if err != nil { + log.Fatalf("[stress] %v", err) + } + return v +} diff --git a/tests/reality/contracts/contract_alpha.json b/tests/reality/contracts/contract_alpha.json new file mode 100644 index 0000000..8705e5d --- /dev/null +++ b/tests/reality/contracts/contract_alpha.json @@ -0,0 +1,12 @@ +{ + "name": "alpha_milwaukee_distribution", + "client": "Northstar Logistics", + "location": "Milwaukee, WI metro", + "shift": "day", + "demand": [ + {"role": "warehouse worker", "count": 200, "skills": ["pallet jack", "inventory"], "certs": ["OSHA-30"]}, + {"role": "admin assistant", "count": 3, "skills": ["scheduling", "data entry"], "certs": []}, + {"role": "heavy equipment operator", "count": 2, "skills": ["forklift", "bobcat"], "certs": ["OSHA-30", "forklift cert"]}, + {"role": "industrial electrician", "count": 1, "skills": ["high voltage", "PLC"], "certs": ["journeyman"], "in_roster": false} + ] +} diff --git a/tests/reality/contracts/contract_beta.json 
b/tests/reality/contracts/contract_beta.json new file mode 100644 index 0000000..9f32ef2 --- /dev/null +++ b/tests/reality/contracts/contract_beta.json @@ -0,0 +1,12 @@ +{ + "name": "beta_indianapolis_manufacturing", + "client": "Crossroads Manufacturing", + "location": "Indianapolis, IN metro", + "shift": "swing", + "demand": [ + {"role": "production worker", "count": 150, "skills": ["assembly", "machine operation"], "certs": ["OSHA-10"]}, + {"role": "quality inspector", "count": 4, "skills": ["measurement", "documentation"], "certs": ["six-sigma yellow belt"]}, + {"role": "forklift operator", "count": 3, "skills": ["pallet jack", "inventory", "cold storage"], "certs": ["OSHA-30", "forklift cert"]}, + {"role": "bilingual safety coordinator", "count": 1, "skills": ["spanish", "english", "training"], "certs": ["OSHA trainer"], "in_roster": false} + ] +} diff --git a/tests/reality/contracts/contract_gamma.json b/tests/reality/contracts/contract_gamma.json new file mode 100644 index 0000000..3663bbc --- /dev/null +++ b/tests/reality/contracts/contract_gamma.json @@ -0,0 +1,12 @@ +{ + "name": "gamma_chicago_construction", + "client": "Loop Construction Group", + "location": "Chicago, IL metro", + "shift": "early-day", + "demand": [ + {"role": "general laborer", "count": 80, "skills": ["framing", "concrete", "rigging"], "certs": ["OSHA-10"]}, + {"role": "site superintendent", "count": 1, "skills": ["scheduling", "leadership", "blueprint reading"], "certs": ["OSHA-30", "first-aid"]}, + {"role": "crane operator", "count": 2, "skills": ["mobile crane", "rigging signals"], "certs": ["NCCCO crane cert"]}, + {"role": "drone surveyor", "count": 1, "skills": ["UAV piloting", "GIS", "site mapping"], "certs": ["FAA Part 107"], "in_roster": false} + ] +}
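Reviewer note: for poking at a running stack by hand, the matrix.search request the driver marshals can be rebuilt from the wire shapes in `scripts/multi_coord_stress/main.go` (`matrixSearchReq`). This is a sketch, not part of the patch; the query text is what `buildQuery` emits for `contract_alpha`'s warehouse-worker demand at surge 1:

```bash
# Rebuild the matrix.search body the Go driver sends. Field names come
# from matrixSearchReq; values mirror contract_alpha's first demand line.
jq -n '{
  query_text: "Need 200 warehouse worker for Milwaukee, WI metro, day shift, certifications: OSHA-30, skills: pallet jack, inventory",
  corpora: ["workers", "ethereal_workers"],
  k: 8,
  use_playbook: true,
  playbook_corpus: "playbook_alice"
}'
```

POSTing that body to `http://127.0.0.1:3110/v1/matrix/search` with `Content-Type: application/json` (e.g. `curl -d @-`) should return the shape the events capture: `results[]` with id/distance/corpus, `per_corpus_counts`, and the `playbook_boosted` / `playbook_injected` counters.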