diff --git a/crates/vectord/src/playbook_memory.rs b/crates/vectord/src/playbook_memory.rs index 033c1da..dd5ae5e 100644 --- a/crates/vectord/src/playbook_memory.rs +++ b/crates/vectord/src/playbook_memory.rs @@ -213,6 +213,28 @@ impl PlaybookMemory { query_embedding: &[f32], top_k_playbooks: usize, base_weight: f32, + ) -> HashMap<(String, String, String), BoostEntry> { + self.compute_boost_for_filtered(query_embedding, top_k_playbooks, base_weight, None).await + } + + /// Same as `compute_boost_for` but only considers playbooks whose + /// (city, state) matches the caller's target. This is the honest + /// fix for the "boosts=170 matched=0" pathology: globally-ranked + /// semantic neighbors include playbooks from every city the query + /// could never reach via its SQL filter. When the caller knows the + /// target geo, restricting here collapses noise and raises the + /// endorsed-worker hit rate. Pass None for the original behavior. + /// + /// 2026-04-21 — added after a corpus-density batch of 25 runs + /// showed only 6/40 successful (role, city) combos ever triggered + /// a citation on subsequent runs. Diagnostic logging proved the + /// boost map had 170 keys but the 50-candidate pool matched 0. + pub async fn compute_boost_for_filtered( + &self, + query_embedding: &[f32], + top_k_playbooks: usize, + base_weight: f32, + target_geo: Option<(&str, &str)>, ) -> HashMap<(String, String, String), BoostEntry> { let state = self.state.read().await; let entries = state.entries.clone(); @@ -225,8 +247,18 @@ impl PlaybookMemory { drop(state); // Brute-force cosine. Empty / missing embeddings just skip. + // When target_geo is set, pre-filter to matching playbooks BEFORE + // cosine sort — that way top-k is within the city, not across + // all cities. 
let mut scored: Vec<(f32, &PlaybookEntry)> = entries .iter() + .filter(|e| match (target_geo, &e.city, &e.state) { + (None, _, _) => true, + (Some((tc, ts)), Some(ec), Some(es)) => { + ec.eq_ignore_ascii_case(tc) && es.eq_ignore_ascii_case(ts) + } + _ => false, + }) .filter_map(|e| e.embedding.as_ref().map(|v| (cosine(query_embedding, v), e))) .collect(); scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal)); diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs index a124921..73ea114 100644 --- a/crates/vectord/src/service.rs +++ b/crates/vectord/src/service.rs @@ -803,22 +803,61 @@ async fn hybrid_search( // set. Additive boost on the existing vector score, then re-sort. if req.use_playbook_memory { let boost_k = req.playbook_memory_k.unwrap_or(playbook_memory::DEFAULT_TOP_K_PLAYBOOKS); + // Extract target (city, state) from the SQL filter so + // compute_boost_for can skip playbooks from other cities that + // would never intersect the candidate pool. The executor's + // filter shape is stable: `... city = 'Toledo' AND state = 'OH' ...`. + // Case-insensitive match, tolerant of single quotes and spaces. + let target_geo = req.sql_filter.as_deref().and_then(extract_target_geo); // We embedded the question as `qv` above — reuse it for the // playbook similarity lookup so we don't double-pay Ollama. - let boosts = state.playbook_memory.compute_boost_for(&qv, boost_k, 0.5).await; + let boosts = state.playbook_memory + .compute_boost_for_filtered( + &qv, + boost_k, + 0.5, + target_geo.as_ref().map(|(c, s)| (c.as_str(), s.as_str())), + ) + .await; + + // Diagnostics for Phase 19 boost pipeline. Logged so item 3 + // investigation has ground truth: + // - boosts.len(): how many (city,state,name) keys surfaced for + // this query (0 = playbook_memory found nothing semantically + // similar to the question). + // - parsed: how many candidate chunks parsed cleanly into + // (name,city,state) via parse_worker_chunk. 
+        //  - matched: how many parsed keys matched an entry in boosts.
+        // 2026-04-21 — 20-scenario batch showed 34/40 ok combos never
+        // got a citation. These counters pin whether the gap is on the
+        // SIMILARITY side (boosts empty) or the MATCH side (parsed vs
+        // boosted keys mismatch — e.g. name format drift).
+        let mut parsed_count = 0usize;
+        let mut matched_count = 0usize;
         for src in sources.iter_mut() {
             // Parse "{Name} — {Role} in {City}, {State}. …" chunk. Being
             // defensive: chunks from other datasets may not follow this
             // exact shape, so absent fields just skip the boost.
             if let Some((name, city, state)) = parse_worker_chunk(&src.chunk_text) {
+                parsed_count += 1;
                 let key = (city, state, name);
                 if let Some(entry) = boosts.get(&key) {
                     src.score += entry.boost;
                     src.playbook_boost = entry.boost;
                     src.playbook_citations = entry.citations.clone();
+                    matched_count += 1;
                 }
             }
         }
+        tracing::info!(
+            "playbook_boost: boosts={} sources={} parsed={} matched={} target_geo={:?} (query='{}')",
+            boosts.len(),
+            sources.len(),
+            parsed_count,
+            matched_count,
+            target_geo,
+            req.question.chars().take(60).collect::<String>(),
+        );
         // Re-rank: boosted scores can flip ordering.
         sources.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
         // Finally trim to the caller's requested top_k — we pulled fetch_k
@@ -2059,6 +2098,54 @@ struct LanceRecallQuery {
 /// "{Name} — {Role} in {City}, {State}. Skills: …".
 /// Returns None if the chunk doesn't match the shape; callers simply
 /// skip the boost for that hit.
+/// Pull (city, state) out of a SQL filter that uses
+/// `city = 'Toledo' AND state = 'OH'` style equality. Returns None if
+/// either is missing — the caller keeps the original global boost map
+/// behavior (no geo narrowing). Case-insensitive on the column name
+/// so `CITY=` or `City =` also work. 
+fn extract_target_geo(sql_filter: &str) -> Option<(String, String)> {
+    fn grab_eq(src: &str, col: &str) -> Option<String> {
+        // Very small parser, resilient enough for the executor's
+        // filter shapes. Matches `col = 'value'` or `col='value'` with
+        // case-insensitive column name.
+        let lower = src.to_ascii_lowercase();
+        let col_lower = col.to_ascii_lowercase();
+        let mut search_from = 0usize;
+        while let Some(off) = lower[search_from..].find(&col_lower) {
+            let pos = search_from + off;
+            // Require word boundary before the column name so "city"
+            // inside "velocity" or "home_city" doesn't false-match.
+            let prior_ok = pos == 0
+                || !lower.as_bytes()[pos - 1].is_ascii_alphanumeric()
+                    && lower.as_bytes()[pos - 1] != b'_';
+            let after = pos + col_lower.len();
+            if !prior_ok || after >= src.len() {
+                search_from = pos + col_lower.len();
+                continue;
+            }
+            // Walk past whitespace, require '='.
+            let mut i = after;
+            while i < src.len() && src.as_bytes()[i] == b' ' { i += 1; }
+            if i >= src.len() || src.as_bytes()[i] != b'=' { search_from = pos + col_lower.len(); continue; }
+            i += 1;
+            while i < src.len() && src.as_bytes()[i] == b' ' { i += 1; }
+            // Value is single-quoted literal; extract until the next '.
+            if i >= src.len() || src.as_bytes()[i] != b'\'' { search_from = pos + col_lower.len(); continue; }
+            i += 1;
+            let start = i;
+            while i < src.len() && src.as_bytes()[i] != b'\'' { i += 1; }
+            if i > start {
+                return Some(src[start..i].to_string());
+            }
+            search_from = pos + col_lower.len();
+        }
+        None
+    }
+    let city = grab_eq(sql_filter, "city")?;
+    let state = grab_eq(sql_filter, "state")?;
+    Some((city, state))
+}
+
 fn parse_worker_chunk(chunk: &str) -> Option<(String, String, String)> {
     // "Name — Role in City, ST. 
…" → split on "—" then " in " then "," let (name_part, rest) = chunk.split_once('—')?; diff --git a/scripts/kb_measure.py b/scripts/kb_measure.py new file mode 100755 index 0000000..376f8bf --- /dev/null +++ b/scripts/kb_measure.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python3 +"""Aggregate KB state for item 3 decision. + +Reads data/_kb/*.jsonl and tests/multi-agent/playbooks/*/results.json +to answer: + - How many distinct signatures exist? + - Total runs, avg ok rate, avg citations per event? + - Which (role, city) combos have NEVER gotten a citation? + - Recommender confidence progression (cold → medium → high)? + - Mean turn count trend across runs (proxy for efficiency). + +Run after `scripts/run_kb_batch.sh` completes. Writes a markdown +summary to tests/multi-agent/playbooks/kb_measurement.md and prints +to stdout. +""" +import json +import os +import sys +from collections import Counter, defaultdict +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +KB = ROOT / "data" / "_kb" +PLAYBOOKS = ROOT / "tests" / "multi-agent" / "playbooks" + + +def load_jsonl(p): + if not p.exists(): + return [] + out = [] + for line in p.read_text().splitlines(): + if line.strip(): + try: + out.append(json.loads(line)) + except json.JSONDecodeError: + pass + return out + + +def main(): + sigs = load_jsonl(KB / "signatures.jsonl") + outcomes = load_jsonl(KB / "outcomes.jsonl") + recs = load_jsonl(KB / "pathway_recommendations.jsonl") + corrections = load_jsonl(KB / "error_corrections.jsonl") + + # --- Basic counts --- + print(f"Signatures: {len(sigs)}") + print(f"Outcomes: {len(outcomes)}") + print(f"Recommendations: {len(recs)}") + print(f"Error corrections: {len(corrections)}") + print() + + # --- Recommender confidence progression --- + conf_counts = Counter(r.get("confidence", "?") for r in recs) + print(f"Recommender confidence distribution:") + for c in ("high", "medium", "low"): + print(f" {c:8s}: {conf_counts.get(c, 0)}") + print() + + # Time-ordered 
confidence + recs_sorted = sorted(recs, key=lambda r: r.get("generated_at", "")) + neighbor_counts = [len(r.get("neighbors_consulted", [])) for r in recs_sorted] + if neighbor_counts: + print(f"Neighbors consulted over time (first → last):") + print(f" first 3: {neighbor_counts[:3]}") + print(f" last 3: {neighbor_counts[-3:]}") + print(f" max: {max(neighbor_counts)}") + print() + + # --- Fill rate + citation density per run --- + if outcomes: + total_ok = sum(o["ok_events"] for o in outcomes) + total_events = sum(o["total_events"] for o in outcomes) + total_cites = sum(o.get("total_citations", 0) for o in outcomes) + total_turns = sum(o.get("total_turns", 0) for o in outcomes) + print(f"Fill rate: {total_ok}/{total_events} = {100*total_ok/max(1,total_events):.1f}%") + print(f"Avg citations per run: {total_cites/len(outcomes):.2f}") + print(f"Avg turns per run: {total_turns/len(outcomes):.1f}") + print() + + # First 5 runs vs last 5 — does it get better? + sorted_out = sorted(outcomes, key=lambda o: o.get("created_at", "")) + if len(sorted_out) >= 10: + first = sorted_out[:5] + last = sorted_out[-5:] + fok = sum(o["ok_events"] for o in first) / sum(o["total_events"] for o in first) + lok = sum(o["ok_events"] for o in last) / sum(o["total_events"] for o in last) + fcit = sum(o.get("total_citations", 0) for o in first) / 5 + lcit = sum(o.get("total_citations", 0) for o in last) / 5 + print(f"First 5 runs ok rate: {100*fok:.1f}% avg cites: {fcit:.2f}") + print(f"Last 5 runs ok rate: {100*lok:.1f}% avg cites: {lcit:.2f}") + print() + + # --- Per-(role, city) citation coverage --- + cite_by_combo = Counter() + combo_attempts = Counter() + for o in outcomes: + for ev in o.get("per_event", []): + key = (ev.get("role", "?"), "?") # city not in per_event summary + combo_attempts[key] += 1 + # Read the playbook dirs for full event detail (has city) + cites_by_role_city = defaultdict(lambda: {"attempts": 0, "citations": 0, "ok": 0}) + for o in outcomes: + run_dir = PLAYBOOKS / 
o["run_id"] + results_file = run_dir / "results.json" + if not results_file.exists(): + continue + try: + results = json.loads(results_file.read_text()) + except Exception: + continue + for r in results: + e = r.get("event", {}) + key = (e.get("role"), e.get("city"), e.get("state")) + cites_by_role_city[key]["attempts"] += 1 + cites_by_role_city[key]["citations"] += len(r.get("playbook_citations") or []) + if r.get("ok"): + cites_by_role_city[key]["ok"] += 1 + + combos_with_cites = [(k, v) for k, v in cites_by_role_city.items() if v["citations"] > 0] + combos_zero_cites = [(k, v) for k, v in cites_by_role_city.items() if v["citations"] == 0 and v["ok"] > 0] + print(f"(role, city, state) combos with any citation: {len(combos_with_cites)}") + print(f"(role, city, state) combos with ok fills but 0 cites: {len(combos_zero_cites)}") + print() + if combos_with_cites: + print("Top 10 combos by citation count:") + for (role, city, state), v in sorted(combos_with_cites, key=lambda x: -x[1]["citations"])[:10]: + print(f" {role:25s} {city:15s} {state}: {v['citations']} cites across {v['attempts']} attempts ({v['ok']} ok)") + print() + + # --- Write markdown report --- + lines = ["# KB Measurement Report", ""] + lines.append(f"Generated from {len(outcomes)} runs across {len(sigs)} distinct signatures.") + lines.append("") + lines.append("## Recommender confidence") + for c in ("high", "medium", "low"): + lines.append(f"- {c}: {conf_counts.get(c, 0)}") + lines.append("") + lines.append("## Overall fill + citation") + if outcomes: + lines.append(f"- Fill rate: **{total_ok}/{total_events}** ({100*total_ok/max(1,total_events):.1f}%)") + lines.append(f"- Avg citations per run: **{total_cites/len(outcomes):.2f}**") + lines.append(f"- Avg turns per run: {total_turns/len(outcomes):.1f}") + lines.append("") + lines.append("## Citation coverage by (role, city, state)") + lines.append(f"- Combos with ≥1 citation: {len(combos_with_cites)}") + lines.append(f"- Combos with ok fills but 0 
citations: {len(combos_zero_cites)}") + lines.append("") + lines.append("## Item 3 decision signal") + if combos_zero_cites: + lines.append("Non-zero: there are **combos that succeeded but never triggered playbook_memory boost**. Candidates for item 3 investigation:") + for (role, city, state), v in combos_zero_cites[:5]: + lines.append(f"- {role} in {city}, {state}: {v['ok']}/{v['attempts']} ok, 0 cites") + else: + lines.append("All ok combos got at least some citation firing. Boost mechanism is healthy; raising the cap may help but isn't forced.") + lines.append("") + out = PLAYBOOKS / "kb_measurement.md" + out.write_text("\n".join(lines)) + print(f"✓ markdown report → {out}") + + +if __name__ == "__main__": + main()