root 61c7b55e48 multi-coord stress harness — Phase 1 of 48-hour mock
Three coordinators (alice / bob / carol) with three contracts
(Milwaukee distribution / Indianapolis manufacturing / Chicago
construction). 7-phase scenario runner: baseline → surge → merge →
handover → split → reissue → analysis. Each coord has a separate
playbook namespace (playbook_{name}) so institutional memory stays
isolated by default but transferable on demand.

Phase 1 deliberately skips the 48-hour clock, email/SMS endpoints,
and Langfuse tracing — those are Phase 2/3.

Run #001 (52 events, 4 queries × 3 coords × 2 demand flavors):

  Diversity:
    Different-roles-same-contract Jaccard = 0.004 (n=18)
      → role-specific retrieval is working as intended. Different
        roles within one contract pull almost entirely disjoint
        worker pools: the system is not cycling through the same
        candidates; it locks onto per-role retrieval.
    Same-role-across-contracts Jaccard = N/A (n=0)
      → TEST-DESIGN ISSUE: the 3 contracts use distinct role names
        per industry (warehouse worker / production worker / general
        laborer), so no exact-name overlaps exist. Phase 2 should
        either share at least one role across contracts OR add a
        skill-based diversity metric.

  Determinism: Jaccard = 1.000 (n=12)
    → HNSW + Ollama retrieval was fully deterministic on identical
      query text across all 12 reissued pairs. coder/hnsw +
      nomic-embed-text are stable.

  Learning: handover hit rate = 4/4 = 100%
    → Bob inherits Alice's recordings perfectly when bob runs
      identical queries with alice's playbook namespace. CAVEAT:
      this tests the trivial verbatim case, not paraphrase handover.
      The harder test (bob runs paraphrased queries with alice's
      playbook) is Phase 2 work.

Per-event capture in JSON: every matrix.search response is logged
with phase / coordinator / contract / role / query / top-K IDs +
distances + per-corpus counts + boosted/injected counts. Reviewable
via:
  jq '.events[] | select(.phase == "merge")'
  jq '.events[] | select(.coordinator == "alice")'
  jq '.events[] | select(.role == "warehouse worker")'

Notable finding from per-event: carol's "general laborer" and "crane
operator" queries both surface w-1009 as top-1, with crane operator
at distance 0.098 (very tight) and general laborer at 0.297. The
system found a worker who legitimately covers both roles — realistic
for small construction crews.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 07:55:29 -05:00


// Multi-coordinator stress harness — Phase 1 of the 48-hour mock.
//
// Three coordinators (Alice, Bob, Carol) each own a contract with a
// different demand profile. They run queries against the matrix
// indexer with separate playbook namespaces. The harness fires
// scenario phases (baseline → surge → merge → handover → split) and
// captures every response so we can verify:
//
// 1. Diversity — different (coord, contract, role) triples should
//    surface DIFFERENT top-K worker IDs. If everything returns the
//    same handful of workers, the system is "cycling," not locking
//    onto scenario-specific pools.
// 2. Determinism — the same query reissued should return near-
//    identical top-K (controlled variance from HNSW + judge, if any).
// 3. Learning — after Alice records playbook entries for her
//    contract's queries, Bob takes over the same contract using
//    Alice's playbook namespace; Alice's recordings should surface
//    in Bob's results.
//
// Phase 1 deliberately skips: a time-based event clock (events fire
// sequentially), email/SMS ingest (no integration yet), and Langfuse
// tracing (would need Go-side wiring). Those are Phase 2/3.
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"time"
)

// ── data shapes ──────────────────────────────────────────────────
type Demand struct {
	Role     string   `json:"role"`
	Count    int      `json:"count"`
	Skills   []string `json:"skills"`
	Certs    []string `json:"certs"`
	InRoster *bool    `json:"in_roster,omitempty"` // nil = assume true
}

type Contract struct {
	Name     string   `json:"name"`
	Client   string   `json:"client"`
	Location string   `json:"location"`
	Shift    string   `json:"shift"`
	Demand   []Demand `json:"demand"`
}

type Coordinator struct {
	Name           string
	PlaybookCorpus string
}

// ── matrix.search wire shapes ────────────────────────────────────
type matrixSearchReq struct {
	QueryText      string   `json:"query_text"`
	Corpora        []string `json:"corpora"`
	K              int      `json:"k"`
	UsePlaybook    bool     `json:"use_playbook,omitempty"`
	PlaybookCorpus string   `json:"playbook_corpus,omitempty"`
}

type matrixResult struct {
	ID       string          `json:"id"`
	Distance float32         `json:"distance"`
	Corpus   string          `json:"corpus"`
	Metadata json.RawMessage `json:"metadata,omitempty"`
}

type matrixResp struct {
	Results          []matrixResult `json:"results"`
	PerCorpusCounts  map[string]int `json:"per_corpus_counts"`
	PlaybookBoosted  int            `json:"playbook_boosted,omitempty"`
	PlaybookInjected int            `json:"playbook_injected,omitempty"`
}

// ── event capture ────────────────────────────────────────────────
type ResultRef struct {
	Rank     int     `json:"rank"`
	ID       string  `json:"id"`
	Corpus   string  `json:"corpus"`
	Distance float32 `json:"distance"`
}

type Event struct {
	Phase             string         `json:"phase"`
	Coordinator       string         `json:"coordinator"`
	Contract          string         `json:"contract"`
	Role              string         `json:"role"`
	Query             string         `json:"query"`
	SurgeMultiplier   int            `json:"surge_multiplier,omitempty"`
	UsePlaybook       bool           `json:"use_playbook"`
	PlaybookCorpus    string         `json:"playbook_corpus,omitempty"`
	TopK              []ResultRef    `json:"top_k"`
	PerCorpusCounts   map[string]int `json:"per_corpus_counts,omitempty"`
	PlaybookBoosted   int            `json:"playbook_boosted,omitempty"`
	PlaybookInjected  int            `json:"playbook_injected,omitempty"`
	Note              string         `json:"note,omitempty"`
	TimestampUnixNano int64          `json:"ts_ns"`
}

type Output struct {
	Coordinators []string  `json:"coordinators"`
	Contracts    []string  `json:"contracts"`
	Events       []Event   `json:"events"`
	Diversity    Diversity `json:"diversity"`
	Determinism  Determ    `json:"determinism"`
	Learning     Learning  `json:"learning"`
	GeneratedAt  time.Time `json:"generated_at"`
}

// Diversity = how distinct the top-K worker sets are across (coord,
// contract, role) triples that SHOULD differ. We compute the mean
// Jaccard similarity for same-role-across-contracts pairs and for
// different-roles-same-contract pairs (lower = more diverse).
type Diversity struct {
	SameRoleAcrossContractsMeanJaccard    float64 `json:"same_role_across_contracts_mean_jaccard"`
	DifferentRolesSameContractMeanJaccard float64 `json:"different_roles_same_contract_mean_jaccard"`
	NumPairsSameRoleAcrossContracts       int     `json:"num_pairs_same_role_across_contracts"`
	NumPairsDifferentRolesSameContract    int     `json:"num_pairs_different_roles_same_contract"`
}

// Determ = same query reissued — top-K should be near-identical.
// Jaccard close to 1.0 = stable/deterministic; < 0.95 = some HNSW
// or judge variance.
type Determ struct {
	MeanJaccard      float64 `json:"mean_jaccard"`
	NumReissuedPairs int     `json:"num_reissued_pairs"`
}

// Learning = handover signal. After Alice records playbooks for her
// contract, Bob runs the same queries with Alice's playbook namespace.
// We measure: do Alice's recorded answer IDs surface in Bob's top-K?
type Learning struct {
	HandoverQueriesRun       int     `json:"handover_queries_run"`
	RecordedAnswersTop1Count int     `json:"recorded_answers_top1_count"`
	RecordedAnswersTopKCount int     `json:"recorded_answers_topk_count"`
	HandoverHitRate          float64 `json:"handover_hit_rate"`
}

// ── main ─────────────────────────────────────────────────────────
func main() {
	var (
		gateway      = flag.String("gateway", "http://127.0.0.1:3110", "gateway base URL")
		contractsDir = flag.String("contracts", "tests/reality/contracts", "directory of contract JSON files")
		corporaCSV   = flag.String("corpora", "workers,ethereal_workers", "comma-separated matrix corpora")
		k            = flag.Int("k", 8, "top-k from matrix.search per query")
		out          = flag.String("out", "reports/reality-tests/multi_coord_stress_001.json", "output JSON path")
	)
	flag.Parse()

	contracts, err := loadContracts(*contractsDir)
	if err != nil {
		log.Fatalf("load contracts: %v", err)
	}
	if len(contracts) < 3 {
		log.Fatalf("need ≥3 contracts in %s, got %d", *contractsDir, len(contracts))
	}

	// First three contracts → coord assignments. Names are fixed so
	// playbook corpora are stable across runs (a rerun lands on the
	// same namespaces, exercising the persistence path indirectly).
	coords := []Coordinator{
		{Name: "alice", PlaybookCorpus: "playbook_alice"},
		{Name: "bob", PlaybookCorpus: "playbook_bob"},
		{Name: "carol", PlaybookCorpus: "playbook_carol"},
	}
	// Initial assignment: alice→alpha, bob→beta, carol→gamma.
	assignments := map[string]*Contract{
		"alice": &contracts[0],
		"bob":   &contracts[1],
		"carol": &contracts[2],
	}

	corpora := strings.Split(*corporaCSV, ",")
	hc := &http.Client{Timeout: 30 * time.Second}
	ctx := context.Background()
	_ = ctx // reserved for request-scoped timeouts/tracing in Phase 2

	output := Output{
		Coordinators: []string{"alice", "bob", "carol"},
		Contracts:    []string{contracts[0].Name, contracts[1].Name, contracts[2].Name},
		GeneratedAt:  time.Now().UTC(),
	}
	log.Printf("[stress] 3 coords, 3 contracts, k=%d, corpora=%v", *k, corpora)

	// ── Phase 1: baseline ───────────────────────────────────────
	// Each coord runs their own contract's role queries and records
	// the top-1 of each as a synthetic "successful match" playbook
	// entry in their personal namespace.
	log.Printf("[stress] phase 1: baseline")
	for _, coord := range coords {
		c := assignments[coord.Name]
		for _, d := range c.Demand {
			q := buildQuery(c, d, 1)
			resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coord.PlaybookCorpus))
			ev := captureEvent("baseline", coord.Name, c.Name, d.Role, q, 1, true, coord.PlaybookCorpus, resp)
			output.Events = append(output.Events, ev)
			// Record top-1 as a successful playbook entry for this coord.
			if len(resp.Results) > 0 {
				if err := playbookRecord(hc, *gateway, q, resp.Results[0].ID, resp.Results[0].Corpus, 1.0, coord.PlaybookCorpus); err != nil {
					log.Printf("  record (%s/%s): %v", coord.Name, d.Role, err)
				}
			}
		}
	}

	// ── Phase 2: surge ──────────────────────────────────────────
	// Each coord's contract demand doubles, with URGENT phrasing.
	log.Printf("[stress] phase 2: surge (2x demand, urgent phrasing)")
	for _, coord := range coords {
		c := assignments[coord.Name]
		for _, d := range c.Demand {
			q := buildQuery(c, d, 2)
			resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coord.PlaybookCorpus))
			ev := captureEvent("surge", coord.Name, c.Name, d.Role, q, 2, true, coord.PlaybookCorpus, resp)
			output.Events = append(output.Events, ev)
		}
	}

	// ── Phase 3: merge — alpha + beta combined under alice ──────
	log.Printf("[stress] phase 3: merge (alpha + beta combined, alice handles)")
	mergedDemand := append(append([]Demand{}, contracts[0].Demand...), contracts[1].Demand...)
	mergedC := &Contract{
		Name:     contracts[0].Name + "+" + contracts[1].Name,
		Location: contracts[0].Location + " + " + contracts[1].Location,
		Shift:    "shared",
	}
	for _, d := range mergedDemand {
		q := buildQuery(mergedC, d, 1)
		resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coords[0].PlaybookCorpus))
		ev := captureEvent("merge", "alice", mergedC.Name, d.Role, q, 1, true, coords[0].PlaybookCorpus, resp)
		output.Events = append(output.Events, ev)
	}

	// ── Phase 4: handover ───────────────────────────────────────
	// Bob takes the alpha contract USING alice's playbook namespace.
	// Tests whether Alice's recordings surface in Bob's results when
	// Bob runs Alice's contract.
	log.Printf("[stress] phase 4: handover (bob takes alpha, using alice's playbook)")
	aliceRecordedAnswers := map[string]string{} // role → recorded answer ID
	for _, ev := range output.Events {
		if ev.Phase == "baseline" && ev.Coordinator == "alice" && len(ev.TopK) > 0 {
			aliceRecordedAnswers[ev.Role] = ev.TopK[0].ID
		}
	}
	handoverHitsTop1 := 0
	handoverHitsTopK := 0
	handoverRun := 0
	for _, d := range contracts[0].Demand {
		q := buildQuery(&contracts[0], d, 1)
		resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coords[0].PlaybookCorpus))
		ev := captureEvent("handover", "bob", contracts[0].Name, d.Role, q, 1, true, coords[0].PlaybookCorpus, resp)
		output.Events = append(output.Events, ev)
		handoverRun++
		recordedID, ok := aliceRecordedAnswers[d.Role]
		if !ok {
			continue
		}
		if len(ev.TopK) > 0 && ev.TopK[0].ID == recordedID {
			handoverHitsTop1++
			handoverHitsTopK++
		} else {
			for _, r := range ev.TopK {
				if r.ID == recordedID {
					handoverHitsTopK++
					break
				}
			}
		}
	}
	output.Learning.HandoverQueriesRun = handoverRun
	output.Learning.RecordedAnswersTop1Count = handoverHitsTop1
	output.Learning.RecordedAnswersTopKCount = handoverHitsTopK
	if handoverRun > 0 {
		output.Learning.HandoverHitRate = float64(handoverHitsTop1) / float64(handoverRun)
	}

	// ── Phase 5: split — alpha surge re-distributed across 3 coords ──
	log.Printf("[stress] phase 5: split (alpha surge spread across all 3 coords)")
	for i, d := range contracts[0].Demand {
		coord := coords[i%len(coords)]
		c := &contracts[0]
		q := buildQuery(c, d, 2)
		resp := must(matrixSearch(hc, *gateway, q, corpora, *k, true, coord.PlaybookCorpus))
		ev := captureEvent("split", coord.Name, c.Name+"-share-"+coord.Name, d.Role, q, 2, true, coord.PlaybookCorpus, resp)
		output.Events = append(output.Events, ev)
	}

	// ── Phase 6: non-determinism check ──────────────────────────
	// Reissue each baseline query twice with the playbook OFF and
	// compare the two retrieval-only top-K sets. Comparing against
	// the playbook-on baseline top-K would conflate retrieval
	// stability with playbook stability, so both reissues bypass it.
	log.Printf("[stress] phase 6: non-determinism (reissue baselines, measure Jaccard)")
	jaccards := []float64{}
	// Appending inside this range is safe: range captured the slice
	// header up front, so the new "reissue" events are not revisited.
	for _, ev := range output.Events {
		if ev.Phase != "baseline" {
			continue
		}
		resp := must(matrixSearch(hc, *gateway, ev.Query, corpora, *k, false, ""))
		reissue := captureEvent("reissue", ev.Coordinator, ev.Contract, ev.Role, ev.Query, 1, false, "", resp)
		output.Events = append(output.Events, reissue)
		freshRetrievalResp := must(matrixSearch(hc, *gateway, ev.Query, corpora, *k, false, ""))
		// Not appended to output.Events; used only for the comparison.
		freshRetrievalEv := captureEvent("reissue-retrieval-only", ev.Coordinator, ev.Contract, ev.Role, ev.Query, 1, false, "", freshRetrievalResp)
		j := jaccardTopK(reissue.TopK, freshRetrievalEv.TopK)
		jaccards = append(jaccards, j)
	}
	output.Determinism.NumReissuedPairs = len(jaccards)
	output.Determinism.MeanJaccard = mean(jaccards)

	// ── Phase 7: diversity analysis ─────────────────────────────
	log.Printf("[stress] phase 7: diversity analysis")
	output.Diversity = computeDiversity(output.Events)

	// ── write ───────────────────────────────────────────────────
	if err := os.MkdirAll(filepath.Dir(*out), 0o755); err != nil {
		log.Fatalf("mkdir: %v", err)
	}
	bs, err := json.MarshalIndent(output, "", " ")
	if err != nil {
		log.Fatalf("marshal: %v", err)
	}
	if err := os.WriteFile(*out, bs, 0o644); err != nil {
		log.Fatalf("write %s: %v", *out, err)
	}
	log.Printf("[stress] DONE — events=%d", len(output.Events))
	log.Printf("[stress] diversity: same-role-across-contracts mean Jaccard = %.3f (n=%d)",
		output.Diversity.SameRoleAcrossContractsMeanJaccard, output.Diversity.NumPairsSameRoleAcrossContracts)
	log.Printf("[stress] diversity: different-roles-same-contract mean Jaccard = %.3f (n=%d)",
		output.Diversity.DifferentRolesSameContractMeanJaccard, output.Diversity.NumPairsDifferentRolesSameContract)
	log.Printf("[stress] determinism: mean Jaccard on reissue = %.3f (n=%d)",
		output.Determinism.MeanJaccard, output.Determinism.NumReissuedPairs)
	log.Printf("[stress] learning: handover hit rate (top-1) = %d/%d = %.0f%%",
		output.Learning.RecordedAnswersTop1Count, output.Learning.HandoverQueriesRun,
		output.Learning.HandoverHitRate*100)
	log.Printf("[stress] results → %s", *out)
}

// ── helpers ──────────────────────────────────────────────────────
func loadContracts(dir string) ([]Contract, error) {
	files, err := filepath.Glob(filepath.Join(dir, "contract_*.json"))
	if err != nil {
		return nil, err
	}
	if len(files) == 0 {
		return nil, fmt.Errorf("no contract_*.json files in %s", dir)
	}
	var out []Contract
	for _, f := range files {
		bs, err := os.ReadFile(f)
		if err != nil {
			return nil, err
		}
		var c Contract
		if err := json.Unmarshal(bs, &c); err != nil {
			return nil, fmt.Errorf("%s: %w", f, err)
		}
		out = append(out, c)
	}
	return out, nil
}

func buildQuery(c *Contract, d Demand, surge int) string {
	var b strings.Builder
	if surge > 1 {
		b.WriteString(fmt.Sprintf("URGENT: need %d ", d.Count*surge))
	} else {
		b.WriteString(fmt.Sprintf("Need %d ", d.Count))
	}
	b.WriteString(d.Role)
	if c.Location != "" {
		b.WriteString(" for ")
		b.WriteString(c.Location)
	}
	if c.Shift != "" {
		b.WriteString(", ")
		b.WriteString(c.Shift)
		b.WriteString(" shift")
	}
	if len(d.Certs) > 0 {
		b.WriteString(", certifications: ")
		b.WriteString(strings.Join(d.Certs, ", "))
	}
	if len(d.Skills) > 0 {
		b.WriteString(", skills: ")
		b.WriteString(strings.Join(d.Skills, ", "))
	}
	return b.String()
}

func captureEvent(phase, coord, contract, role, query string, surge int, usePlaybook bool, pbCorpus string, resp *matrixResp) Event {
	topK := make([]ResultRef, 0, len(resp.Results))
	for i, r := range resp.Results {
		topK = append(topK, ResultRef{Rank: i, ID: r.ID, Corpus: r.Corpus, Distance: r.Distance})
	}
	return Event{
		Phase:             phase,
		Coordinator:       coord,
		Contract:          contract,
		Role:              role,
		Query:             query,
		SurgeMultiplier:   surge,
		UsePlaybook:       usePlaybook,
		PlaybookCorpus:    pbCorpus,
		TopK:              topK,
		PerCorpusCounts:   resp.PerCorpusCounts,
		PlaybookBoosted:   resp.PlaybookBoosted,
		PlaybookInjected:  resp.PlaybookInjected,
		TimestampUnixNano: time.Now().UnixNano(),
	}
}

func computeDiversity(events []Event) Diversity {
	// Filter to baseline events for a clean apples-to-apples view.
	type key struct{ contract, role string }
	byKey := map[key][]string{}
	for _, ev := range events {
		if ev.Phase != "baseline" {
			continue
		}
		k := key{ev.Contract, ev.Role}
		ids := make([]string, len(ev.TopK))
		for i, r := range ev.TopK {
			ids[i] = r.ID
		}
		byKey[k] = ids
	}

	// Group top-K ID lists by role (across contracts) and by contract.
	type roleIDs struct {
		role string
		ids  []string
	}
	rolesSeen := map[string][][]string{}
	contractsSeen := map[string][]roleIDs{}
	for k, ids := range byKey {
		rolesSeen[k.role] = append(rolesSeen[k.role], ids)
		contractsSeen[k.contract] = append(contractsSeen[k.contract], roleIDs{k.role, ids})
	}

	var (
		sameRoleJacc  []float64
		diffRolesJacc []float64
	)
	// Same role across contracts: pair-wise Jaccard within each role.
	for _, idsList := range rolesSeen {
		for i := 0; i < len(idsList); i++ {
			for j := i + 1; j < len(idsList); j++ {
				sameRoleJacc = append(sameRoleJacc, jaccardStrings(idsList[i], idsList[j]))
			}
		}
	}
	// Different roles within the same contract.
	for _, items := range contractsSeen {
		for i := 0; i < len(items); i++ {
			for j := i + 1; j < len(items); j++ {
				if items[i].role == items[j].role {
					continue
				}
				diffRolesJacc = append(diffRolesJacc, jaccardStrings(items[i].ids, items[j].ids))
			}
		}
	}

	return Diversity{
		SameRoleAcrossContractsMeanJaccard:    mean(sameRoleJacc),
		DifferentRolesSameContractMeanJaccard: mean(diffRolesJacc),
		NumPairsSameRoleAcrossContracts:       len(sameRoleJacc),
		NumPairsDifferentRolesSameContract:    len(diffRolesJacc),
	}
}

func jaccardTopK(a, b []ResultRef) float64 {
	aIDs := make([]string, len(a))
	bIDs := make([]string, len(b))
	for i, r := range a {
		aIDs[i] = r.ID
	}
	for i, r := range b {
		bIDs[i] = r.ID
	}
	return jaccardStrings(aIDs, bIDs)
}

// jaccardStrings treats both slices as sets; duplicates are ignored
// so they cannot inflate the union.
func jaccardStrings(a, b []string) float64 {
	if len(a) == 0 && len(b) == 0 {
		return 1.0
	}
	setA := map[string]bool{}
	for _, x := range a {
		setA[x] = true
	}
	setB := map[string]bool{}
	for _, x := range b {
		setB[x] = true
	}
	intersect := 0
	for x := range setB {
		if setA[x] {
			intersect++
		}
	}
	union := len(setA) + len(setB) - intersect
	if union == 0 {
		return 0
	}
	return float64(intersect) / float64(union)
}

func mean(xs []float64) float64 {
	if len(xs) == 0 {
		return 0
	}
	s := 0.0
	for _, x := range xs {
		s += x
	}
	return s / float64(len(xs))
}

// ── HTTP helpers ─────────────────────────────────────────────────
func matrixSearch(hc *http.Client, gw, query string, corpora []string, k int, usePlaybook bool, playbookCorpus string) (*matrixResp, error) {
	body, _ := json.Marshal(matrixSearchReq{
		QueryText:      query,
		Corpora:        corpora,
		K:              k,
		UsePlaybook:    usePlaybook,
		PlaybookCorpus: playbookCorpus,
	})
	req, _ := http.NewRequest("POST", gw+"/v1/matrix/search", bytes.NewReader(body))
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		rb, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("matrix.search %d: %s", resp.StatusCode, string(rb))
	}
	var out matrixResp
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	return &out, nil
}

func playbookRecord(hc *http.Client, gw, query, answerID, answerCorpus string, score float64, corpus string) error {
	body, _ := json.Marshal(map[string]any{
		"query_text":    query,
		"answer_id":     answerID,
		"answer_corpus": answerCorpus,
		"score":         score,
		"tags":          []string{"multi-coord-stress"},
		"corpus":        corpus,
	})
	req, _ := http.NewRequest("POST", gw+"/v1/matrix/playbooks/record", bytes.NewReader(body))
	req.Header.Set("Content-Type", "application/json")
	resp, err := hc.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		rb, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("playbook record %d: %s", resp.StatusCode, string(rb))
	}
	return nil
}

// must aborts the run on the first transport/API error (requires Go 1.18+).
func must[T any](v T, err error) T {
	if err != nil {
		log.Fatalf("[stress] %v", err)
	}
	return v
}