multi_coord_stress: thread role through matrix retrieve + playbook record

Real wire-up gap discovered post-scrum: Demand.Role was already
extracted at every call site in multi_coord_stress (44 occurrences,
both contract-driven and LLM-parsed inbox-triggered paths), but
neither matrixSearch nor playbookRecord accepted role in their
signatures. Cross-role gate (real_001..real_004 work) was bypassed
for the entire multi-coord harness — recordings and queries went
through with empty role, gate fell back to lenient behavior.

Fix:
- matrixSearchReq gains query_role field
- matrixSearch signature: (..., query, role string, ...)
- tracedSearch wrapper gains role param + emits it in span input
  metadata for Langfuse visibility
- playbookRecord signature: (..., query, role, ...) — body emits
  role only when non-empty (preserves backward compat at API)
- 14 call sites updated:
    contract-driven Demand loops → d.Role
    LLM-parsed inbox path → parsed.Role (qwen2.5 already extracts it)
    swap path (warehouseDemand) → warehouseDemand.Role
    reissue path → ev.Role (captured at original event time)
    fresh-verify (resume snippet, no role concept) → ""

Build clean, vet clean, all tests pass. Cross-role gate now fires
end-to-end across the multi-coord harness — matches the playbook_lift
harness's coverage from the original real_001 fix.

This closes the symmetric gap to scripts/playbook_lift's existing
wire-through. Both production-shape harnesses now exercise the role
gate; future reality tests automatically inherit the protection.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-30 23:10:49 -05:00
parent cca32344f3
commit 356d76b4b0
2 changed files with 42 additions and 22 deletions

View File

@ -267,6 +267,7 @@ The list is intentionally short. Items move to closed when the work demands them
| (fix) | LLM-based role extractor (real_004): `roleExtractor` struct with regex-first → qwen2.5 format=json fallback → per-process cache. Opt-in via `-llm-role-extract` flag + `LLM_ROLE_EXTRACT=1` env. Off-by-default preserves real_003b shipping config. 8 new tests including `TestRoleExtractor_ClosesCrossRoleShorthandBleed` — the load-bearing witness pairing with the matrix-side `TestInjectPlaybookMisses_RoleGateRejectsCrossRole` to prove the extraction-layer + gate-layer compose correctly on the exact real_003 failure mode. Findings: `reports/reality-tests/real_004_findings.md`. |
| (scrum) | 3-lineage scrum review on `7f2f112..0331288` (Opus + Kimi + Qwen3-coder via `scripts/scrum_review.sh`). Convergent finding (3/3): `roleNormalize` plural-stripper mangled non-plural-s tokens (Sales → Sale, Logistics → Logistic). **Fixed**: `nonPluralSWords` allowlist + `-ss` ending check + `strings.ToLower`/`TrimSpace` cleanup. New tests `TestRoleNormalize_NonPluralS` + `TestRoleEqual_NonPluralS` lock the edge cases. Kimi 2 BLOCKs were false positives (model-truncation artifacts per `feedback_cross_lineage_review.md`). Disposition: `reports/scrum/_evidence/2026-04-30/verdicts/role_gate_v1_disposition.md` (local). |
| (probe) | Negation reality test real_005: 5 explicit-negation queries ("NOT in Detroit", "excluding Cornerstone roster", etc.). Confirmed substrate has **zero negation handling** — cosine treats "NOT X" as "X" + noise. Judge IS the safety net (Q1/Q3/Q4 rated all top-10 results 1-2/5 — operator-visible honesty signal). **No code change needed**: production UI should handle exclusion via `ExcludeIDs` (already supported, added in multi-coord stress 200-worker swap), not via NL-negation. Findings: `reports/reality-tests/real_005_findings.md`. |
| (wire-up) | Multi-coord stress role wire-through: `Demand.Role` was already extracted at every call site (44 occurrences) but never threaded into matrix retrieve or playbook record. Cross-role gate was bypassed for the entire multi-coord harness. **Fixed** by extending `tracedSearch`, `matrixSearch`, and `playbookRecord` signatures with `role string` and updating all 14 call sites — passing `d.Role` (demand loops), `parsed.Role` (LLM-parsed inbox path), `warehouseDemand.Role` (swap path), `ev.Role` (reissue path), `""` (fresh-verify resume snippet — no clean role). Build + vet + tests green; multi-coord stress now honors role gate end-to-end. |
Plus on Rust side (`8de94eb`, `3d06868`): qwen2.5 → qwen3.5:latest backport in active defaults; distillation acceptance reports regenerated (run_hash refresh, reproducibility property still holds).

View File

@ -68,6 +68,7 @@ type Coordinator struct {
type matrixSearchReq struct {
QueryText string `json:"query_text"`
QueryRole string `json:"query_role,omitempty"`
Corpora []string `json:"corpora"`
K int `json:"k"`
UsePlaybook bool `json:"use_playbook,omitempty"`
@ -293,12 +294,16 @@ func main() {
// excludes) and outputs (top-K ids, top-1 distance, boost/inject
// counts). Caller still must() if they want the fail-fast behavior;
// errors here are emitted as ERROR spans + propagated.
tracedSearch := func(spanName, query string, searchCorpora []string, usePlaybook bool, pbCorpus string, excludeIDs ...string) *matrixResp {
// role parameter threads through the cross-role gate (real_001+
// fix). Empty role disables the gate — preserves current
// behavior for the swap/reissue/fresh-verify sites where there's
// no clean role concept.
tracedSearch := func(spanName, query, role string, searchCorpora []string, usePlaybook bool, pbCorpus string, excludeIDs ...string) *matrixResp {
start := time.Now()
resp, err := matrixSearch(hc, *gateway, query, searchCorpora, *k, usePlaybook, pbCorpus, excludeIDs...)
resp, err := matrixSearch(hc, *gateway, query, role, searchCorpora, *k, usePlaybook, pbCorpus, excludeIDs...)
if err != nil {
emitSpan(spanName, start,
map[string]any{"query": query, "corpora": searchCorpora, "k": *k, "use_playbook": usePlaybook, "playbook_corpus": pbCorpus, "exclude_n": len(excludeIDs)},
map[string]any{"query": query, "role": role, "corpora": searchCorpora, "k": *k, "use_playbook": usePlaybook, "playbook_corpus": pbCorpus, "exclude_n": len(excludeIDs)},
map[string]any{"error": err.Error()}, "ERROR")
log.Fatalf("[stress] %v", err)
}
@ -307,7 +312,7 @@ func main() {
topIDs = append(topIDs, r.ID)
}
emitSpan(spanName, start,
map[string]any{"query": query, "corpora": searchCorpora, "k": *k, "use_playbook": usePlaybook, "playbook_corpus": pbCorpus, "exclude_n": len(excludeIDs)},
map[string]any{"query": query, "role": role, "corpora": searchCorpora, "k": *k, "use_playbook": usePlaybook, "playbook_corpus": pbCorpus, "exclude_n": len(excludeIDs)},
map[string]any{"top_k_ids": topIDs, "top1_distance": firstDistance(resp.Results), "playbook_boosted": resp.PlaybookBoosted, "playbook_injected": resp.PlaybookInjected}, "")
return resp
}
@ -330,12 +335,12 @@ func main() {
c := assignments[coord.Name]
for _, d := range c.Demand {
q := buildQuery(c, d, 1)
resp := tracedSearch("matrix.search.baseline", q, corpora, true, coord.PlaybookCorpus)
resp := tracedSearch("matrix.search.baseline", q, d.Role, corpora, true, coord.PlaybookCorpus)
ev := captureEvent("baseline", 0, coord.Name, c.Name, d.Role, q, 1, true, coord.PlaybookCorpus, resp)
output.Events = append(output.Events, ev)
// Record top-1 as a successful playbook entry for this coord.
if len(resp.Results) > 0 {
if err := playbookRecord(hc, *gateway, q, resp.Results[0].ID, resp.Results[0].Corpus, 1.0, coord.PlaybookCorpus); err != nil {
if err := playbookRecord(hc, *gateway, q, d.Role, resp.Results[0].ID, resp.Results[0].Corpus, 1.0, coord.PlaybookCorpus); err != nil {
log.Printf(" record (%s/%s): %v", coord.Name, d.Role, err)
}
}
@ -397,7 +402,9 @@ func main() {
verifyCorpora := append([]string{}, corpora...)
verifyCorpora = append(verifyCorpora, freshIdx)
for _, fw := range freshWorkers {
resp := tracedSearch("matrix.search.fresh_verify", fw.Verify, verifyCorpora, false, "")
// fresh_verify uses a fresh-worker resume excerpt as the query
// — no role concept (it's an arbitrary substring), so pass "".
resp := tracedSearch("matrix.search.fresh_verify", fw.Verify, "", verifyCorpora, false, "")
ev := captureEvent("new-resume-verify", 6, "system", "fresh-resume-pool", "fresh", fw.Verify, 1, false, "", resp)
// Find the fresh worker's rank in top-K (rank 0 = top-1).
freshRank := -1
@ -511,7 +518,11 @@ func main() {
query := parsed.AsQuery()
coord := coordByName(coords, ie.Coord)
searchStart := time.Now()
resp, err := matrixSearch(hc, *gateway, query, corpora, *k, true, coord.PlaybookCorpus)
// parsed.Role comes from the LLM-parsed inbox demand (qwen2.5
// format=json) — same role-extraction shape as gen_real_queries
// emits, but on free-form inbox bodies. Threading it through
// closes the cross-role gate for the inbox-triggered path.
resp, err := matrixSearch(hc, *gateway, query, parsed.Role, corpora, *k, true, coord.PlaybookCorpus)
if err != nil {
emitSpan("matrix.search.inbox", searchStart,
map[string]any{"query": query, "corpora": corpora, "k": *k},
@ -555,7 +566,7 @@ func main() {
c := assignments[coord.Name]
for _, d := range c.Demand {
q := buildQuery(c, d, 2)
resp := tracedSearch("matrix.search.surge", q, corpora, true, coord.PlaybookCorpus)
resp := tracedSearch("matrix.search.surge", q, d.Role, corpora, true, coord.PlaybookCorpus)
ev := captureEvent("surge", 12, coord.Name, c.Name, d.Role, q, 2, true, coord.PlaybookCorpus, resp)
output.Events = append(output.Events, ev)
}
@ -571,7 +582,7 @@ func main() {
startPhase("phase.swap_200_workers", 18, nil)
warehouseDemand := contracts[0].Demand[0] // slot 0 is warehouse worker by contract design
swapQuery := buildQuery(&contracts[0], warehouseDemand, 1)
origResp := tracedSearch("matrix.search.swap_orig", swapQuery, corpora, false, "")
origResp := tracedSearch("matrix.search.swap_orig", swapQuery, warehouseDemand.Role, corpora, false, "")
placedIDs := make([]string, 0, len(origResp.Results))
for _, r := range origResp.Results {
placedIDs = append(placedIDs, r.ID)
@ -580,7 +591,7 @@ func main() {
origEv.Note = fmt.Sprintf("captured %d originally-placed worker IDs", len(placedIDs))
output.Events = append(output.Events, origEv)
swapResp := tracedSearch("matrix.search.swap_replace", swapQuery, corpora, false, "", placedIDs...)
swapResp := tracedSearch("matrix.search.swap_replace", swapQuery, warehouseDemand.Role, corpora, false, "", placedIDs...)
swapEv := captureEvent("swap-replace", 18, "alice", contracts[0].Name, warehouseDemand.Role, swapQuery, 1, false, "", swapResp)
swapEv.ExcludeIDs = placedIDs
swapIDs := make([]string, 0, len(swapResp.Results))
@ -598,7 +609,7 @@ func main() {
for _, d := range mergedDemand {
mergedC := &Contract{Name: contracts[0].Name + "+" + contracts[1].Name, Location: contracts[0].Location + " + " + contracts[1].Location, Shift: "shared"}
q := buildQuery(mergedC, d, 1)
resp := tracedSearch("matrix.search.merge", q, corpora, true, coords[0].PlaybookCorpus)
resp := tracedSearch("matrix.search.merge", q, d.Role, corpora, true, coords[0].PlaybookCorpus)
ev := captureEvent("merge", 24, "alice", mergedC.Name, d.Role, q, 1, true, coords[0].PlaybookCorpus, resp)
output.Events = append(output.Events, ev)
}
@ -619,7 +630,7 @@ func main() {
handoverRun := 0
for _, d := range contracts[0].Demand {
q := buildQuery(&contracts[0], d, 1)
resp := tracedSearch("matrix.search.handover_verbatim", q, corpora, true, coords[0].PlaybookCorpus)
resp := tracedSearch("matrix.search.handover_verbatim", q, d.Role, corpora, true, coords[0].PlaybookCorpus)
ev := captureEvent("handover", 30, "bob", contracts[0].Name, d.Role, q, 1, true, coords[0].PlaybookCorpus, resp)
output.Events = append(output.Events, ev)
handoverRun++
@ -673,7 +684,7 @@ func main() {
emitSpan("llm.paraphrase", paraStart,
map[string]any{"original": origQuery, "model": *judgeModel},
map[string]any{"paraphrase": paraphrase}, "")
resp := tracedSearch("matrix.search.handover_paraphrase", paraphrase, corpora, true, coords[0].PlaybookCorpus)
resp := tracedSearch("matrix.search.handover_paraphrase", paraphrase, d.Role, corpora, true, coords[0].PlaybookCorpus)
ev := captureEvent("handover-paraphrase", 36, "bob", contracts[0].Name, d.Role, paraphrase, 1, true, coords[0].PlaybookCorpus, resp)
ev.Note = "paraphrase of: " + origQuery
output.Events = append(output.Events, ev)
@ -709,7 +720,7 @@ func main() {
coord := coords[i%len(coords)]
c := &contracts[0]
q := buildQuery(c, d, 2)
resp := tracedSearch("matrix.search.split", q, corpora, true, coord.PlaybookCorpus)
resp := tracedSearch("matrix.search.split", q, d.Role, corpora, true, coord.PlaybookCorpus)
ev := captureEvent("split", 42, coord.Name, c.Name+"-share-"+coord.Name, d.Role, q, 2, true, coord.PlaybookCorpus, resp)
output.Events = append(output.Events, ev)
}
@ -723,14 +734,14 @@ func main() {
if ev.Phase != "baseline" {
continue
}
resp := tracedSearch("matrix.search.reissue", ev.Query, corpora, false, "")
resp := tracedSearch("matrix.search.reissue", ev.Query, ev.Role, corpora, false, "")
reissue := captureEvent("reissue", 48, ev.Coordinator, ev.Contract, ev.Role, ev.Query, 1, false, "", resp)
output.Events = append(output.Events, reissue)
// Compare against ev.TopK (also playbook-on baseline). Note:
// this conflates retrieval stability with playbook stability.
// We capture both ev (playbook on) and a fresh retrieval (off);
// real determinism = retrieval-only top-K comparison.
freshRetrievalResp := tracedSearch("matrix.search.reissue_retrieval_only", ev.Query, corpora, false, "")
freshRetrievalResp := tracedSearch("matrix.search.reissue_retrieval_only", ev.Query, ev.Role, corpora, false, "")
freshRetrievalEv := captureEvent("reissue-retrieval-only", 48, ev.Coordinator, ev.Contract, ev.Role, ev.Query, 1, false, "", freshRetrievalResp)
j := jaccardTopK(reissue.TopK, freshRetrievalEv.TopK)
jaccards = append(jaccards, j)
@ -1013,9 +1024,10 @@ func mean(xs []float64) float64 {
// ── HTTP helpers ─────────────────────────────────────────────────
func matrixSearch(hc *http.Client, gw, query string, corpora []string, k int, usePlaybook bool, playbookCorpus string, excludeIDs ...string) (*matrixResp, error) {
func matrixSearch(hc *http.Client, gw, query, role string, corpora []string, k int, usePlaybook bool, playbookCorpus string, excludeIDs ...string) (*matrixResp, error) {
body, _ := json.Marshal(matrixSearchReq{
QueryText: query,
QueryRole: role,
Corpora: corpora,
K: k,
UsePlaybook: usePlaybook,
@ -1373,16 +1385,23 @@ func coordByName(coords []Coordinator, name string) Coordinator {
return coords[0]
}
func playbookRecord(hc *http.Client, gw, query, answerID, answerCorpus string, score float64, corpus string) error {
body, _ := json.Marshal(map[string]any{
func playbookRecord(hc *http.Client, gw, query, role, answerID, answerCorpus string, score float64, corpus string) error {
body := map[string]any{
"query_text": query,
"answer_id": answerID,
"answer_corpus": answerCorpus,
"score": score,
"tags": []string{"multi-coord-stress"},
"corpus": corpus,
})
req, _ := http.NewRequest("POST", gw+"/v1/matrix/playbooks/record", bytes.NewReader(body))
}
// Role-aware recording (real_001+ cross-role bleed fix). Empty
// role omits the field; matrix gate falls back to lenient
// behavior, preserving callers that don't supply role.
if role != "" {
body["role"] = role
}
bodyBytes, _ := json.Marshal(body)
req, _ := http.NewRequest("POST", gw+"/v1/matrix/playbooks/record", bytes.NewReader(bodyBytes))
req.Header.Set("Content-Type", "application/json")
resp, err := hc.Do(req)
if err != nil {