//! Pathway memory — full backtrack-able context for scrum/auditor reviews.
//!
//! Consensus-designed (10-probe N=3 ensemble, see
//! `data/_kb/consensus_reducer_design_*.json`). The reducer emits a
//! `PathwayTrace` sidecar alongside its legacy summary. Traces are
//! fingerprinted narrowly (`task_class + file_prefix + signal_class`) so
//! hot-swap can generalize, and embedded via normalized-metadata-token
//! concatenation so the HNSW similarity search can discriminate between
//! pathways that share a fingerprint but diverged in ladder/KB choices.
//!
//! The hot-swap decision requires six conditions, ANDed together:
//! 1. narrow fingerprint match
//! 2. audit_consensus.pass == true
//! 3. replay_count >= 3
//! 4. replays_succeeded / replay_count >= 0.80
//! 5. NOT retired
//! 6. similarity(new, stored) >= 0.90
//!
//! Every replay reports its outcome via `record_replay_outcome`; pathways
//! whose success rate drops below 0.80 after >=3 replays are marked
//! retired and excluded from further hot-swap consideration. This is the
//! self-correcting learning loop — a pathway that worked once but breaks
//! under distribution shift removes itself automatically.
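//!
//! Illustrative loop (a sketch, not a doctest — the surrounding pipeline
//! wiring, the `store` handle, `qvec`, and `run_rung` are assumed, hence
//! `ignore`):
//!
//! ```ignore
//! let mem = PathwayMemory::new(store);
//! mem.load_from_storage().await?;
//! // Gate check: Some(..) only if all six hot-swap conditions hold.
//! if let Some(cand) = mem
//!     .query_hot_swap("scrum_review", "crates/queryd/src/x.rs", Some("CONVERGING"), &qvec)
//!     .await
//! {
//!     let ok = run_rung(cand.recommended_rung, &cand.recommended_model).await; // hypothetical
//!     // Feed the outcome back — this is what retires drifting pathways.
//!     mem.record_replay_outcome(&cand.pathway_id, ok).await?;
//! }
//! ```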
use std::collections::HashMap;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use storaged::ops;
use tokio::sync::RwLock;

const STATE_KEY: &str = "_pathway_memory/state.json";

/// Outcome of one ladder rung attempt. Captured for every attempt,
/// regardless of whether it was accepted — rejections are signal too.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LadderAttempt {
    pub rung: u8,
    pub model: String,
    pub latency_ms: u64,
    pub accepted: bool,
    pub reject_reason: Option<String>,
}

/// Provenance of a RAG chunk retrieved for this review. The
/// `cosine_score` is the similarity as returned by the index; `rank` is
/// the 0-indexed order in the top-K result list.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct KbChunkRef {
    pub source_doc: String,
    pub chunk_id: String,
    pub cosine_score: f32,
    pub rank: u8,
}

/// Signal emitted by the mcp-server/observer classifier.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ObserverSignal {
    pub class: String,
    pub priors: Vec<String>,
    pub prior_iter_outcomes: Vec<String>,
}

/// Context7-bridge lookup snapshot.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct BridgeHit {
    pub library: String,
    pub version: String,
}

/// Call to LLM Team (/api/run?mode=extract) or auditor N=3 consensus.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SubPipelineCall {
    pub pipeline: String, // "llm_team_extract" / "audit_consensus" / etc.
    pub result_summary: String,
}

/// N=3 independent consensus re-check result.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct AuditConsensus {
    pub pass: bool,
    pub models: Vec<String>,
    pub disagreements: u32,
}

// ─── ADR-021: Semantic correctness layer ────────────────────────────
//
// SemanticFlag names the CATEGORY of bug found. The scrum reviewer attaches
// these to findings (via prompt instruction to tag); the matrix index
// uses them for "same crate has seen N unit mismatches" preemption.
//
// Discipline: extend this enum only when a real bug is found that
// doesn't fit an existing variant. Avoid the "add a vague variant just
// in case" anti-pattern — it dilutes the grammar the index learns from.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(tag = "kind")]
pub enum SemanticFlag {
    /// Operation combines values with different units (e.g.
    /// `row_count - file_count`, `bytes - rows`). The instance that motivated
    /// ADR-021: queryd/delta.rs base_rows = pre_filter_rows - delta_count.
    UnitMismatch,
    /// Same type, wrong role (e.g. treating a PK as a row index).
    TypeConfusion,
    /// Unwrap-without-check or nullable-treated-as-non-null paths.
    NullableConfusion,
    /// Off-by-one in loops / ranges / slice bounds.
    OffByOne,
    /// Reference to a deprecated / removed / moved symbol that the
    /// compiler hasn't flagged (trait method shadowing, feature flags).
    StaleReference,
    /// Pseudo-implementation: stub body, `todo!()`, or a function named
    /// for work it doesn't actually do. Distinct from DeadCode — pseudo
    /// is CALLED but doesn't do its job.
    PseudoImpl,
    /// Unreachable or uncalled code that compiles but serves no purpose.
    DeadCode,
    /// Code compiles green but emits a warning the workspace baseline
    /// didn't have. The applier's new-warning gate already catches these
    /// at commit time; flagging at review time lets the matrix index
    /// surface "this file area tends to produce warning noise."
    WarningNoise,
    /// Operation crosses a layer/crate boundary it shouldn't (e.g. a
    /// hot-path function calling a cloud API, or a catalog op mutating
    /// storage directly).
    BoundaryViolation,
}

/// What schema/type context was surfaced to the reviewer when this
/// pathway was produced. Empty = bootstrap path (the reviewer got no
/// type context); populated = we fed the model typed info to work with.
/// Drift in this field over time is the feedback signal for "are we
/// getting smarter at enriching prompts?"
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct TypeHint {
    /// Where the hint came from: "catalogd" | "arrow_schema" |
    /// "rust_struct" | "truth_rule" | "manual".
    pub source: String,
    /// The identifier being typed (field name, variable, column).
    pub symbol: String,
    /// The type as extracted (stringly-typed is fine — this is a
    /// retrieval key, not a compiler representation).
    pub type_repr: String,
}

/// Stable hash of a bug pattern. Used by the matrix index to retrieve
/// "similar-shaped bugs" across files. The `pattern_key` is the field
/// that's semantically load-bearing; `occurrences` is how many times
/// this exact signature has appeared in this pathway's file history.
/// `example` is one representative code snippet so the prompt can
/// quote it back to future reviewers.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct BugFingerprint {
    pub flag: SemanticFlag,
    /// SHA256 of the structural pattern (e.g. for UnitMismatch:
    /// `"row_count-file_count"` → its hash). Stable across minor
    /// token-level variation so the same bug shape clusters.
    pub pattern_key: String,
    pub example: String,
    pub occurrences: u32,
}
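// Wire-format sketch (illustrative, derived from the serde attributes
// above — `#[serde(tag = "kind")]` puts the variant name under "kind",
// so a BugFingerprint serializes roughly as):
//
//   {"flag": {"kind": "UnitMismatch"},
//    "pattern_key": "row_count-file_count",
//    "example": "base_rows = pre_filter_rows - delta_count",
//    "occurrences": 1}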
/// Full backtrack-able context for one reviewed file. Lives alongside
/// the reducer's summary — the summary is what the reviewer LLM sees; this
/// is what the auditor / future iterations / hot-swap use.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct PathwayTrace {
    pub pathway_id: String, // SHA256(task_class|file_prefix|signal_class)
    pub task_class: String,
    pub file_path: String,
    pub signal_class: Option<String>,
    pub created_at: DateTime<Utc>,
    pub ladder_attempts: Vec<LadderAttempt>,
    pub kb_chunks: Vec<KbChunkRef>,
    pub observer_signals: Vec<ObserverSignal>,
    pub bridge_hits: Vec<BridgeHit>,
    pub sub_pipeline_calls: Vec<SubPipelineCall>,
    pub audit_consensus: Option<AuditConsensus>,
    pub reducer_summary: String,
    pub final_verdict: String,
    /// Normalized-metadata-token embedding. Dimension fixed per index
    /// version (current: 32, sufficient to distinguish task/file/signal
    /// combinations without requiring an external embedding model —
    /// round-3 consensus said "small metadata tokens", not "full JSON").
    pub pathway_vec: Vec<f32>,
    /// Number of times this pathway has been replayed via hot-swap.
    /// Replay only begins after the first insert; the initial insert itself
    /// is NOT a replay. Probation of ≥3 replays is required before the
    /// success-rate gate can fire.
    pub replay_count: u32,
    pub replays_succeeded: u32,
    /// ADR-021 semantic-correctness layer. Populated by the scrum reviewer
    /// via explicit prompt-level tagging of findings. Empty on existing
    /// traces (pre-ADR-021 inserts); additive field so back-compat
    /// deserialization works via serde default.
    #[serde(default)]
    pub semantic_flags: Vec<SemanticFlag>,
    /// Schema/type context fed to the reviewer during this pathway's
    /// review. Starts empty (bootstrap); fills as we wire catalogd +
    /// arrow_schema + truth_rule enrichment into the prompt pipeline.
    #[serde(default)]
    pub type_hints_used: Vec<TypeHint>,
    /// Bug patterns caught on this file/pathway — the matrix index's
    /// retrieval key for "have we seen this shape here before?"
    #[serde(default)]
    pub bug_fingerprints: Vec<BugFingerprint>,
    /// Marked true when replay_count >= 3 AND success_rate < 0.80.
    /// Retired pathways are excluded from hot-swap forever. (If the
    /// underlying file / task / signal characteristics genuinely change
    /// such that a retired pathway would work again, a new PathwayTrace
    /// with a fresh id will be inserted — retirement is per-id.)
    pub retired: bool,

    // ─── Mem0 versioning + deletion (J 2026-04-25 directive, mirrors
    // playbook_memory's Phase 25/27 patterns) ───
    /// UUID for THIS specific trace. pathway_id is the bucket key
    /// (shared by traces of the same task/file_prefix/signal); trace_uid
    /// addresses an individual trace within that bucket so retire/revise
    /// can target it precisely. Empty on legacy traces; populated by
    /// upsert/insert callers (or filled with a generated UUID on insert).
    #[serde(default)]
    pub trace_uid: String,
    /// Mem0-style version chain. v1 for original traces; bumped on
    /// `revise()`. Legacy traces deserialize as version=1 via default.
    #[serde(default = "default_version")]
    pub version: u32,
    /// trace_uid of the trace this one supersedes (None = root version).
    #[serde(default)]
    pub parent_trace_uid: Option<String>,
    /// Set when a newer version supersedes this trace. Excluded from
    /// retrieval (hot-swap, bug_fingerprints_for) once set.
    #[serde(default)]
    pub superseded_at: Option<String>,
    /// trace_uid of the new version. Pairs with superseded_at.
    #[serde(default)]
    pub superseded_by_trace_uid: Option<String>,
    /// Human-readable reason recorded with retire(). Pairs with
    /// `retired: true`. Empty on probation-driven retirements (those
    /// just set retired=true without a textual reason).
    #[serde(default)]
    pub retirement_reason: Option<String>,
}

fn default_version() -> u32 {
    1
}

impl PathwayTrace {
    /// Compute the narrow fingerprint id from task_class + file_prefix
    /// + signal_class. `file_prefix` is the first two path segments
    /// ("crates/queryd", not "crates/queryd/src/service.rs") so that
    /// related files in the same crate share pathways.
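    ///
    /// A minimal sketch of the generalization contract (`ignore`d):
    ///
    /// ```ignore
    /// let a = PathwayTrace::compute_id("scrum", "crates/queryd/src/a.rs", Some("LOOPING"));
    /// let b = PathwayTrace::compute_id("scrum", "crates/queryd/src/b.rs", Some("LOOPING"));
    /// assert_eq!(a, b); // same crate prefix + task + signal → same bucket
    /// ```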
    pub fn compute_id(task_class: &str, file_path: &str, signal_class: Option<&str>) -> String {
        let prefix = file_prefix(file_path);
        let sig = signal_class.unwrap_or("");
        let mut hasher = Sha256::new();
        hasher.update(task_class.as_bytes());
        hasher.update(b"|");
        hasher.update(prefix.as_bytes());
        hasher.update(b"|");
        hasher.update(sig.as_bytes());
        format!("{:x}", hasher.finalize())
    }

    pub fn success_rate(&self) -> f32 {
        if self.replay_count == 0 {
            return 0.0;
        }
        self.replays_succeeded as f32 / self.replay_count as f32
    }
}

/// First two path segments, so `crates/queryd/src/service.rs` →
/// `crates/queryd`. This is intentional — similar files in the same
/// crate often share task characteristics (e.g., all files in
/// `crates/queryd/` are SQL-path Rust code), so fingerprinting on the
/// crate-level prefix lets the hot-swap generalize across files within
/// the crate. Exactly-matching file paths still match (same prefix).
pub fn file_prefix(path: &str) -> String {
    let parts: Vec<&str> = path.split('/').take(2).collect();
    parts.join("/")
}

/// Build the pathway vector from trace metadata. Intentionally simple —
/// a deterministic bag-of-tokens hash into 32 buckets, normalized. Round-3
/// consensus said "small metadata tokens, not full JSON." An external
/// embedding model would work too, but it adds a dependency, a failure
/// mode, and a drift risk the consensus flagged.
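///
/// Usage sketch (`ignore`d; `a` and `b` are two `PathwayTrace` values):
///
/// ```ignore
/// let va = build_pathway_vec(&a);
/// let vb = build_pathway_vec(&b);
/// // Vectors come back L2-normalized, so cosine() is a plain dot product.
/// let sim = cosine(&va, &vb);
/// assert!(sim <= 1.0 + 1e-5);
/// ```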
pub fn build_pathway_vec(trace: &PathwayTrace) -> Vec<f32> {
    let mut buckets = vec![0f32; 32];
    let mut tokens: Vec<String> = Vec::new();
    tokens.push(trace.task_class.clone());
    tokens.push(trace.file_path.clone());
    if let Some(s) = &trace.signal_class {
        tokens.push(format!("signal:{s}"));
    }
    for a in &trace.ladder_attempts {
        tokens.push(format!("rung:{}", a.rung));
        tokens.push(format!("model:{}", a.model));
        tokens.push(format!("accepted:{}", a.accepted));
    }
    for k in &trace.kb_chunks {
        tokens.push(format!("kb:{}", k.source_doc));
    }
    for o in &trace.observer_signals {
        tokens.push(format!("class:{}", o.class));
    }
    for b in &trace.bridge_hits {
        tokens.push(format!("lib:{}", b.library));
    }
    for s in &trace.sub_pipeline_calls {
        tokens.push(format!("pipeline:{}", s.pipeline));
    }
    // ADR-021: include semantic flags + bug fingerprints in the
    // embedding so pathways with the same narrow fingerprint but
    // different bug histories cluster separately. "This file has
    // had 3 unit mismatches" is a different pathway from "this file
    // is clean" — the similarity gate should see them as distinct.
    for f in &trace.semantic_flags {
        tokens.push(format!("flag:{:?}", f));
    }
    for bp in &trace.bug_fingerprints {
        tokens.push(format!("bug:{:?}:{}", bp.flag, bp.pattern_key));
    }
    for t in &tokens {
        let mut h = Sha256::new();
        h.update(t.as_bytes());
        let d = h.finalize();
        // Two bucket writes per token: use different byte windows to
        // spread mass across buckets even when tokens share a
        // common prefix.
        let b1 = (d[0] as usize) % 32;
        let b2 = (d[8] as usize) % 32;
        buckets[b1] += 1.0;
        buckets[b2] += 1.0;
    }
    // L2-normalize so cosine similarity becomes a dot product.
    let norm: f32 = buckets.iter().map(|v| v * v).sum::<f32>().sqrt();
    if norm > 0.0 {
        for v in &mut buckets {
            *v /= norm;
        }
    }
    buckets
}

pub fn cosine(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() {
        return 0.0;
    }
    a.iter().zip(b.iter()).map(|(x, y)| x * y).sum::<f32>()
}

#[derive(Default, Clone, Serialize, Deserialize)]
struct PathwayMemoryState {
    pathways: HashMap<String, Vec<PathwayTrace>>, // key = pathway_id (narrow fingerprint)
    last_updated_at: i64,
}

#[derive(Clone)]
pub struct PathwayMemory {
    state: Arc<RwLock<PathwayMemoryState>>,
    store: Arc<dyn ObjectStore>,
}

#[derive(Debug, Serialize)]
pub struct HotSwapCandidate {
    pub pathway_id: String,
    /// trace_uid of the SPECIFIC trace this hot-swap recommendation
    /// came from. Lets a caller call /pathway/retire with single-trace
    /// precision when the observer rejects the result — the audit-consensus
    /// → retire wire (HANDOVER §queued, ADR-021).
    pub trace_uid: String,
    pub similarity: f32,
    pub replay_count: u32,
    pub success_rate: f32,
    pub recommended_rung: u8,
    pub recommended_model: String,
}

/// Mem0-style outcome of an upsert. Mirrors playbook_memory::UpsertOutcome
/// but adapts the UPDATE semantic to PathwayTrace's bucket model: there
/// is no notion of merging endorsed_names — each trace is an immutable
/// run record. UPDATE here means "we found a non-retired, non-superseded
/// trace with the same workflow shape; bumped its replay_count instead
/// of appending a duplicate." NOOP is reserved for the case where the
/// caller asked for an upsert that would change nothing observable.
#[derive(Debug, Serialize)]
pub enum PathwayUpsertOutcome {
    Added { pathway_id: String, trace_uid: String },
    Updated { pathway_id: String, trace_uid: String, replay_count: u32 },
    Noop { pathway_id: String, trace_uid: String },
}

/// Mem0-style outcome of revise — chains versions across traces.
#[derive(Debug, Serialize)]
pub struct PathwayReviseOutcome {
    pub parent_trace_uid: String,
    pub parent_version: u32,
    pub new_trace_uid: String,
    pub new_version: u32,
    pub superseded_at: String,
}

/// Compute a stable fingerprint for upsert dedup. Captures the
/// workflow shape: the sequence of (rung, model) pairs from
/// ladder_attempts, plus the final_verdict. Two traces with the same
/// fingerprint represent the same proven approach on the same task —
/// don't store duplicates.
fn workflow_fingerprint(trace: &PathwayTrace) -> String {
    let mut h = Sha256::new();
    h.update(trace.final_verdict.as_bytes());
    h.update(b"|");
    for a in &trace.ladder_attempts {
        h.update(a.model.as_bytes());
        h.update(b":");
        h.update(a.rung.to_string().as_bytes());
        h.update(b";");
    }
    format!("{:x}", h.finalize())
}

impl PathwayMemory {
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self {
            state: Arc::new(RwLock::new(PathwayMemoryState::default())),
            store,
        }
    }

    pub async fn load_from_storage(&self) -> Result<usize, String> {
        let data = match ops::get(&self.store, STATE_KEY).await {
            Ok(d) => d,
            Err(_) => return Ok(0),
        };
        let persisted: PathwayMemoryState = serde_json::from_slice(&data)
            .map_err(|e| format!("parse pathway_memory state: {e}"))?;
        let n: usize = persisted.pathways.values().map(|v| v.len()).sum();
        *self.state.write().await = persisted;
        tracing::info!("pathway_memory: loaded {n} traces from {STATE_KEY}");
        Ok(n)
    }

    async fn persist(&self) -> Result<(), String> {
        let snapshot = self.state.read().await.clone();
        let bytes = serde_json::to_vec_pretty(&snapshot).map_err(|e| e.to_string())?;
        ops::put(&self.store, STATE_KEY, bytes.into()).await
    }

    /// Insert a new pathway trace. Called by scrum_master_pipeline at
    /// the end of each file's review. Computes the pathway_vec from
    /// metadata if the caller didn't supply one. Appends to the bucket
    /// for this pathway_id — multiple traces can share a fingerprint
    /// (each represents one review of the same file/task/signal combo).
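    ///
    /// Call-shape sketch (`ignore`d; `trace` is one review's full
    /// context as built by the reducer):
    ///
    /// ```ignore
    /// mem.insert(trace).await?; // raw append — no dedup; see `upsert`
    /// ```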
    pub async fn insert(&self, mut trace: PathwayTrace) -> Result<(), String> {
        if trace.pathway_vec.is_empty() {
            trace.pathway_vec = build_pathway_vec(&trace);
        }
        if trace.trace_uid.is_empty() {
            trace.trace_uid = uuid::Uuid::new_v4().to_string();
        }
        if trace.version == 0 {
            trace.version = 1;
        }
        let mut s = self.state.write().await;
        s.pathways
            .entry(trace.pathway_id.clone())
            .or_default()
            .push(trace);
        s.last_updated_at = Utc::now().timestamp_millis();
        drop(s);
        self.persist().await
    }

    /// Mem0-style upsert. ADD if no existing live trace in the bucket
    /// matches this trace's workflow fingerprint. UPDATE (bump
    /// replay_count) if a match exists. NOOP is semantically equivalent
    /// to UPDATE here — kept for symmetry with playbook_memory and
    /// future-proofing if we add merge logic.
    ///
    /// "Live" means: not retired, not superseded.
    ///
    /// Replaces raw `insert` for callers that want dedup. Existing
    /// `insert` callers (scrum_master) keep raw-append semantics so
    /// behavior is back-compat.
    pub async fn upsert(&self, mut trace: PathwayTrace) -> Result<PathwayUpsertOutcome, String> {
        if trace.pathway_vec.is_empty() {
            trace.pathway_vec = build_pathway_vec(&trace);
        }
        if trace.trace_uid.is_empty() {
            trace.trace_uid = uuid::Uuid::new_v4().to_string();
        }
        if trace.version == 0 {
            trace.version = 1;
        }
        let new_fp = workflow_fingerprint(&trace);
        let mut s = self.state.write().await;
        let bucket = s.pathways.entry(trace.pathway_id.clone()).or_default();
        // Find a live trace (not retired, not superseded) with the same workflow.
        let mut existing_idx: Option<usize> = None;
        for (i, t) in bucket.iter().enumerate() {
            if t.retired {
                continue;
            }
            if t.superseded_at.is_some() {
                continue;
            }
            if workflow_fingerprint(t) == new_fp {
                existing_idx = Some(i);
                break;
            }
        }
        let pathway_id = trace.pathway_id.clone();
        let outcome = match existing_idx {
            None => {
                let trace_uid = trace.trace_uid.clone();
                bucket.push(trace);
                PathwayUpsertOutcome::Added { pathway_id, trace_uid }
            }
            Some(i) => {
                // UPDATE: bump replay counters on the existing trace
                // instead of duplicating. replays_succeeded only bumps
                // on an accepted final_verdict (mirrors record_replay logic).
                let existing = &mut bucket[i];
                existing.replay_count = existing.replay_count.saturating_add(1);
                if trace.final_verdict == "accepted" {
                    existing.replays_succeeded = existing.replays_succeeded.saturating_add(1);
                }
                PathwayUpsertOutcome::Updated {
                    pathway_id,
                    trace_uid: existing.trace_uid.clone(),
                    replay_count: existing.replay_count,
                }
            }
        };
        s.last_updated_at = Utc::now().timestamp_millis();
        drop(s);
        self.persist().await?;
        Ok(outcome)
    }

    /// Mem0-style retire. Marks a specific trace (by trace_uid) retired
    /// with a human-readable reason. Retired traces are excluded from
    /// hot-swap and bug_fingerprints retrieval. Idempotent: retiring an
    /// already-retired trace returns Ok(false) without modification.
    pub async fn retire(&self, trace_uid: &str, reason: &str) -> Result<bool, String> {
        let mut touched = false;
        {
            let mut s = self.state.write().await;
            'outer: for traces in s.pathways.values_mut() {
                for t in traces.iter_mut() {
                    if t.trace_uid == trace_uid && !t.retired {
                        t.retired = true;
                        t.retirement_reason = Some(reason.to_string());
                        touched = true;
                        break 'outer;
                    }
                }
            }
            if touched {
                s.last_updated_at = Utc::now().timestamp_millis();
            }
        }
        if touched {
            self.persist().await?;
        }
        Ok(touched)
    }

    /// Mem0-style revise. Supersedes the parent trace and chains the new
    /// version. The new version inherits parent_trace_uid; the parent gets
    /// superseded_at + superseded_by_trace_uid stamped. Rejects if the
    /// parent is retired or already superseded (revise the tip, not
    /// the middle of the chain).
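    ///
    /// Chain sketch (`ignore`d; `parent_uid` and `new_trace` assumed):
    ///
    /// ```ignore
    /// let out = mem.revise(&parent_uid, new_trace).await?;
    /// assert_eq!(out.new_version, out.parent_version + 1);
    /// // The parent is now superseded and drops out of hot-swap retrieval.
    /// ```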
    pub async fn revise(
        &self,
        parent_trace_uid: &str,
        mut new_trace: PathwayTrace,
    ) -> Result<PathwayReviseOutcome, String> {
        let now = Utc::now().to_rfc3339();
        if new_trace.pathway_vec.is_empty() {
            new_trace.pathway_vec = build_pathway_vec(&new_trace);
        }
        if new_trace.trace_uid.is_empty() {
            new_trace.trace_uid = uuid::Uuid::new_v4().to_string();
        }
        let mut s = self.state.write().await;
        // Locate the parent across all buckets.
        let mut parent_loc: Option<(String, usize)> = None;
        for (bucket_key, traces) in s.pathways.iter() {
            for (i, t) in traces.iter().enumerate() {
                if t.trace_uid == parent_trace_uid {
                    parent_loc = Some((bucket_key.clone(), i));
                    break;
                }
            }
            if parent_loc.is_some() {
                break;
            }
        }
        let (parent_bucket, parent_idx) = parent_loc
            .ok_or_else(|| format!("parent trace_uid '{parent_trace_uid}' not found"))?;
        // Validate parent state.
        {
            let parent = &s.pathways[&parent_bucket][parent_idx];
            if parent.retired {
                return Err(format!(
                    "cannot revise retired trace '{parent_trace_uid}' — retirement is terminal"
                ));
            }
            if parent.superseded_at.is_some() {
                return Err(format!(
                    "trace '{parent_trace_uid}' already superseded; revise the tip of the chain"
                ));
            }
        }
        let parent_version = s.pathways[&parent_bucket][parent_idx].version;
        let new_version = parent_version.saturating_add(1);
        let new_uid = new_trace.trace_uid.clone();
        new_trace.version = new_version;
        new_trace.parent_trace_uid = Some(parent_trace_uid.to_string());
        new_trace.superseded_at = None;
        new_trace.superseded_by_trace_uid = None;
        // Stamp the parent.
        {
            let parent_mut = &mut s.pathways.get_mut(&parent_bucket).unwrap()[parent_idx];
            parent_mut.superseded_at = Some(now.clone());
            parent_mut.superseded_by_trace_uid = Some(new_uid.clone());
        }
        // Append the new version (same bucket if same pathway_id).
        s.pathways
            .entry(new_trace.pathway_id.clone())
            .or_default()
            .push(new_trace);
        s.last_updated_at = Utc::now().timestamp_millis();
        drop(s);
        self.persist().await?;
        Ok(PathwayReviseOutcome {
            parent_trace_uid: parent_trace_uid.to_string(),
            parent_version,
            new_trace_uid: new_uid,
            new_version,
            superseded_at: now,
        })
    }

    /// Walk the version chain containing trace_uid. Returns root→tip.
    /// Empty if trace_uid is not found. Cycle-safe.
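    ///
    /// Shape sketch (`ignore`d; assumes one prior `revise`):
    ///
    /// ```ignore
    /// let chain = mem.history(&tip_uid).await;
    /// assert_eq!(chain.len(), 2);
    /// assert_eq!(chain[0].version, 1); // root first
    /// assert_eq!(chain[1].version, 2); // tip last
    /// ```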
    pub async fn history(&self, trace_uid: &str) -> Vec<PathwayTrace> {
        let s = self.state.read().await;
        // Build a trace_uid → trace map across all buckets.
        let mut by_uid: HashMap<String, PathwayTrace> = HashMap::new();
        for traces in s.pathways.values() {
            for t in traces {
                if !t.trace_uid.is_empty() {
                    by_uid.insert(t.trace_uid.clone(), t.clone());
                }
            }
        }
        let Some(seed) = by_uid.get(trace_uid).cloned() else {
            return Vec::new();
        };
        // Walk back to the root.
        let mut visited: std::collections::HashSet<String> = std::collections::HashSet::new();
        let mut root = seed.clone();
        while let Some(parent_id) = root.parent_trace_uid.clone() {
            if !visited.insert(parent_id.clone()) {
                break;
            }
            match by_uid.get(&parent_id) {
                Some(p) => root = p.clone(),
                None => break,
            }
        }
        // Walk forward to the tip.
        let mut chain = vec![root.clone()];
        let mut visited_fwd: std::collections::HashSet<String> = std::collections::HashSet::new();
        visited_fwd.insert(root.trace_uid.clone());
        let mut current = root;
        while let Some(succ) = current.superseded_by_trace_uid.clone() {
            if !visited_fwd.insert(succ.clone()) {
                break;
            }
            match by_uid.get(&succ) {
                Some(n) => {
                    chain.push(n.clone());
                    current = n.clone();
                }
                None => break,
            }
        }
        chain
    }

    /// Query for a hot-swap candidate. Returns `None` if no eligible
    /// pathway exists — the caller should run the full ladder. Returns
    /// `Some(cand)` if all gates pass — the caller can short-circuit to
    /// `cand.recommended_rung` / `cand.recommended_model`.
    ///
    /// Gates (all must hold):
    /// - narrow fingerprint match (same task/file_prefix/signal)
    /// - audit_consensus.pass == true on the stored trace
    /// - replay_count >= 3 (probation)
    /// - success_rate >= 0.80
    /// - NOT retired
    /// - similarity(query_vec, stored.pathway_vec) >= 0.90
    pub async fn query_hot_swap(
        &self,
        task_class: &str,
        file_path: &str,
        signal_class: Option<&str>,
        query_vec: &[f32],
    ) -> Option<HotSwapCandidate> {
        let id = PathwayTrace::compute_id(task_class, file_path, signal_class);
        let s = self.state.read().await;
        let candidates = s.pathways.get(&id)?;
        let mut best: Option<(f32, &PathwayTrace)> = None;
        for p in candidates {
            if p.retired {
                continue;
            }
            // Mem0 versioning: superseded traces are excluded from
            // retrieval — only the tip of each version chain counts.
            if p.superseded_at.is_some() {
                continue;
            }
            // audit_consensus gate: an explicit FAIL blocks hot-swap. A null
            // audit_consensus (the auditor hasn't seen this pathway yet) is
            // NOT a block — the success_rate gate below still requires
            // ≥3 real-world replays at ≥80% success before a pathway
            // becomes hot-swap eligible, so the learning loop itself
            // provides the safety net during bootstrap. Once the auditor
            // pipeline wires pathway audit updates, this gate tightens
            // automatically: any explicit audit_consensus.pass == false
            // here will skip the candidate.
            if let Some(ac) = &p.audit_consensus {
                if !ac.pass {
                    continue;
                }
            }
            if p.replay_count < 3 {
                continue;
            }
            if p.success_rate() < 0.80 {
                continue;
            }
            let sim = cosine(query_vec, &p.pathway_vec);
            if sim < 0.90 {
                continue;
            }
            if best.as_ref().map(|(b, _)| sim > *b).unwrap_or(true) {
                best = Some((sim, p));
            }
        }
        let (similarity, p) = best?;
        // The "recommended" rung is the first accepted attempt in the
        // stored pathway — that's the one the ladder converged on.
        let accepted = p.ladder_attempts.iter().find(|a| a.accepted)?;
        Some(HotSwapCandidate {
            pathway_id: p.pathway_id.clone(),
            trace_uid: p.trace_uid.clone(),
            similarity,
            replay_count: p.replay_count,
            success_rate: p.success_rate(),
            recommended_rung: accepted.rung,
            recommended_model: accepted.model.clone(),
        })
    }

    /// Record the outcome of a hot-swap replay. Increments replay_count
    /// unconditionally; increments replays_succeeded iff succeeded;
    /// retires the pathway if replay_count >= 3 and success_rate falls
    /// below 0.80. Mistral's learning loop in code.
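    ///
    /// Retirement-math sketch (`ignore`d):
    ///
    /// ```ignore
    /// // Three straight failures: replay_count = 3, success_rate = 0.0 < 0.80,
    /// // so the pathway retires itself and never hot-swaps again.
    /// for _ in 0..3 {
    ///     mem.record_replay_outcome(&pathway_id, false).await?;
    /// }
    /// ```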
    pub async fn record_replay_outcome(
        &self,
        pathway_id: &str,
        succeeded: bool,
    ) -> Result<(), String> {
        let mut s = self.state.write().await;
        // Find the specific pathway across the bucket that matches by
        // full id (the bucket key is already the narrow id, but in case
        // of future multi-trace-per-id we take the most recent).
        let bucket = s
            .pathways
            .iter_mut()
            .find(|(k, _)| k.as_str() == pathway_id)
            .map(|(_, v)| v)
            .ok_or_else(|| format!("pathway {pathway_id} not found"))?;
        let p = bucket
            .last_mut()
            .ok_or_else(|| format!("pathway {pathway_id} has empty bucket"))?;
        p.replay_count = p.replay_count.saturating_add(1);
        if succeeded {
            p.replays_succeeded = p.replays_succeeded.saturating_add(1);
        }
        if p.replay_count >= 3 && p.success_rate() < 0.80 {
            p.retired = true;
        }
        s.last_updated_at = Utc::now().timestamp_millis();
        drop(s);
        self.persist().await
    }

    /// ADR-021 Phase C: retrieve aggregated bug fingerprints for a
    /// narrow fingerprint (task_class + file_prefix + signal_class).
    /// The scrum pipeline calls this BEFORE running the ladder and prepends
    /// the result to the reviewer prompt as historical context.
    ///
    /// Returns at most `limit` most-frequent patterns across all traces
    /// sharing the narrow id. Frequency is summed `occurrences` — a
    /// fingerprint seen in 3 traces with occurrences 2/1/1 comes back
    /// as occurrences=4 so the preempt-prompt can say "this pattern
    /// appeared 4 times on this crate."
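    ///
    /// Prompt-enrichment sketch (`ignore`d; `render_preempt_block` is a
    /// hypothetical caller-side helper, not part of this module):
    ///
    /// ```ignore
    /// let fps = mem
    ///     .bug_fingerprints_for("scrum_review", "crates/queryd/src/x.rs", Some("CONVERGING"), 5)
    ///     .await;
    /// // e.g. "UnitMismatch `row-file` seen 4x on this crate: `a - b`"
    /// let preamble = render_preempt_block(&fps);
    /// ```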
    pub async fn bug_fingerprints_for(
        &self,
        task_class: &str,
        file_path: &str,
        signal_class: Option<&str>,
        limit: usize,
    ) -> Vec<BugFingerprint> {
        let id = PathwayTrace::compute_id(task_class, file_path, signal_class);
        let s = self.state.read().await;
        let Some(traces) = s.pathways.get(&id) else {
            return Vec::new();
        };
        // Aggregate by (flag, pattern_key) and sum occurrences. Keep a
        // representative example (the first one seen is fine — bug examples
        // are semantically equivalent within a pattern_key by design).
        let mut agg: HashMap<(String, String), (SemanticFlag, String, u32)> = HashMap::new();
        for t in traces {
            // Mem0 versioning: skip retired + superseded traces so
            // their bug patterns don't leak into future retrievals.
            if t.retired || t.superseded_at.is_some() {
                continue;
            }
            for bp in &t.bug_fingerprints {
                let key = (format!("{:?}", bp.flag), bp.pattern_key.clone());
                let entry = agg
                    .entry(key)
                    .or_insert_with(|| (bp.flag.clone(), bp.example.clone(), 0));
                entry.2 = entry.2.saturating_add(bp.occurrences);
            }
        }
        let mut out: Vec<BugFingerprint> = agg
            .into_iter()
            .map(|((_, pk), (flag, ex, occ))| BugFingerprint {
                flag,
                pattern_key: pk,
                example: ex,
                occurrences: occ,
            })
            .collect();
        out.sort_by(|a, b| b.occurrences.cmp(&a.occurrences));
        out.truncate(limit);
        out
    }

    pub async fn stats(&self) -> PathwayMemoryStats {
        let s = self.state.read().await;
        let mut total = 0usize;
        let mut retired = 0usize;
        let mut with_audit_pass = 0usize;
        let mut total_replays = 0u64;
        let mut successful_replays = 0u64;
        for bucket in s.pathways.values() {
            for p in bucket {
                total += 1;
                if p.retired {
                    retired += 1;
                }
                if p.audit_consensus.as_ref().map(|a| a.pass).unwrap_or(false) {
                    with_audit_pass += 1;
                }
                total_replays += p.replay_count as u64;
                successful_replays += p.replays_succeeded as u64;
            }
        }
        PathwayMemoryStats {
            total_pathways: total,
            retired,
            with_audit_pass,
            total_replays,
            successful_replays,
            reuse_rate: if total == 0 {
                0.0
            } else {
                total_replays as f32 / total as f32
            },
            replay_success_rate: if total_replays == 0 {
                0.0
            } else {
                successful_replays as f32 / total_replays as f32
            },
        }
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct PathwayMemoryStats {
    pub total_pathways: usize,
    pub retired: usize,
    pub with_audit_pass: usize,
    pub total_replays: u64,
    pub successful_replays: u64,
    pub reuse_rate: f32,          // total_replays / total_pathways
    pub replay_success_rate: f32, // successful_replays / total_replays
}

#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    fn mk_store() -> Arc<dyn ObjectStore> {
        Arc::new(InMemory::new())
    }

    fn mk_trace(id_tag: &str, audit_pass: bool, replays: u32, succ: u32) -> PathwayTrace {
        let pathway_id = PathwayTrace::compute_id(
            "scrum_review",
            &format!("crates/{id_tag}/src/x.rs"),
            Some("CONVERGING"),
        );
        let attempts = vec![LadderAttempt {
            rung: 2,
            model: "qwen3-coder:480b".into(),
            latency_ms: 1000,
            accepted: true,
            reject_reason: None,
        }];
        let mut trace = PathwayTrace {
            pathway_id,
            task_class: "scrum_review".into(),
            file_path: format!("crates/{id_tag}/src/x.rs"),
            signal_class: Some("CONVERGING".into()),
            created_at: Utc::now(),
            ladder_attempts: attempts,
            kb_chunks: vec![KbChunkRef {
                source_doc: "PRD.md".into(),
                chunk_id: "c1".into(),
                cosine_score: 0.88,
                rank: 0,
            }],
            observer_signals: vec![],
            bridge_hits: vec![],
            sub_pipeline_calls: vec![],
            audit_consensus: Some(AuditConsensus {
                pass: audit_pass,
                models: vec![
                    "qwen3-coder:480b".into(),
                    "gpt-oss:120b".into(),
                    "kimi-k2:1t".into(),
                ],
                disagreements: 0,
            }),
            reducer_summary: "ok".into(),
            final_verdict: "accepted".into(),
            pathway_vec: vec![],
            semantic_flags: vec![],
            type_hints_used: vec![],
            bug_fingerprints: vec![],
            replay_count: replays,
            replays_succeeded: succ,
            retired: false,
            // Mem0 fields: the defaults a fresh, un-versioned trace carries.
            trace_uid: String::new(),
            version: 1,
            parent_trace_uid: None,
            superseded_at: None,
            superseded_by_trace_uid: None,
            retirement_reason: None,
        };
        trace.pathway_vec = build_pathway_vec(&trace);
        trace
    }

    #[test]
    fn file_prefix_takes_first_two_segments() {
        assert_eq!(file_prefix("crates/queryd/src/service.rs"), "crates/queryd");
        assert_eq!(file_prefix("crates/gateway"), "crates/gateway");
        assert_eq!(file_prefix("README.md"), "README.md");
        assert_eq!(file_prefix(""), "");
    }

    #[test]
    fn compute_id_is_deterministic() {
        let a = PathwayTrace::compute_id("scrum", "crates/queryd/src/x.rs", Some("LOOPING"));
        let b = PathwayTrace::compute_id("scrum", "crates/queryd/src/x.rs", Some("LOOPING"));
        assert_eq!(a, b);
    }
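    // Illustrative sketch of the upsert dedup contract (added example —
    // two identical traces share a workflow fingerprint, so the second
    // must UPDATE the first rather than append a duplicate).
    #[tokio::test]
    async fn upsert_dedups_identical_workflow() {
        let mem = PathwayMemory::new(mk_store());
        let first = mem.upsert(mk_trace("a", true, 0, 0)).await.unwrap();
        assert!(matches!(first, PathwayUpsertOutcome::Added { .. }));
        // Same task/ladder/verdict → same workflow fingerprint → UPDATE.
        let second = mem.upsert(mk_trace("a", true, 0, 0)).await.unwrap();
        match second {
            PathwayUpsertOutcome::Updated { replay_count, .. } => {
                assert_eq!(replay_count, 1, "bump, don't duplicate");
            }
            other => panic!("expected Updated, got {other:?}"),
        }
        assert_eq!(mem.stats().await.total_pathways, 1);
    }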
    #[test]
    fn compute_id_generalizes_across_same_prefix() {
        // Same prefix + task + signal → same id. That IS the narrow
        // generalization — it's what lets hot-swap fire for different
        // files in the same crate that share the task/signal profile.
        let a = PathwayTrace::compute_id("scrum", "crates/queryd/src/a.rs", Some("L"));
        let b = PathwayTrace::compute_id("scrum", "crates/queryd/src/b.rs", Some("L"));
        assert_eq!(a, b);
    }

    #[test]
    fn compute_id_differs_on_signal_class() {
        let a = PathwayTrace::compute_id("scrum", "crates/q/s", Some("CONVERGING"));
        let b = PathwayTrace::compute_id("scrum", "crates/q/s", Some("LOOPING"));
        assert_ne!(a, b);
    }

    #[test]
    fn cosine_handles_mismatched_lengths() {
        assert_eq!(cosine(&[1.0, 0.0], &[1.0]), 0.0);
    }

    #[test]
    fn cosine_of_identical_normalized_is_one() {
        let v = vec![0.6, 0.8];
        let c = cosine(&v, &v);
        assert!((c - 1.0).abs() < 1e-5);
    }

    #[test]
    fn success_rate_is_zero_before_any_replay() {
        let t = mk_trace("a", true, 0, 0);
        assert_eq!(t.success_rate(), 0.0);
    }

    #[test]
    fn success_rate_ratio() {
        let t = mk_trace("a", true, 4, 3);
        assert!((t.success_rate() - 0.75).abs() < 1e-5);
    }

    #[tokio::test]
    async fn insert_and_stats_roundtrip() {
        let mem = PathwayMemory::new(mk_store());
        mem.insert(mk_trace("a", true, 0, 0)).await.unwrap();
        let stats = mem.stats().await;
        assert_eq!(stats.total_pathways, 1);
        assert_eq!(stats.retired, 0);
        assert_eq!(stats.with_audit_pass, 1);
    }

    #[tokio::test]
    async fn hot_swap_rejects_when_probation_not_met() {
        // Probation: replay_count must be >= 3 before the success-rate gate
        // can fire. A fresh pathway with 0 replays must NEVER hot-swap,
        // even if its similarity is 1.0 and audit passes.
        let mem = PathwayMemory::new(mk_store());
        let trace = mk_trace("a", true, 0, 0);
        let qvec = trace.pathway_vec.clone();
        mem.insert(trace).await.unwrap();
        let got = mem
            .query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec)
            .await;
        assert!(got.is_none(), "fresh pathway must not hot-swap");
    }

    #[tokio::test]
    async fn hot_swap_rejects_when_audit_explicitly_fails() {
        let mem = PathwayMemory::new(mk_store());
        let trace = mk_trace("a", false, 5, 5); // audit FAILED explicitly
        let qvec = trace.pathway_vec.clone();
        mem.insert(trace).await.unwrap();
        let got = mem
            .query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec)
            .await;
        assert!(got.is_none(), "pathway with explicit audit FAIL must not hot-swap");
    }

    #[tokio::test]
    async fn hot_swap_accepts_unaudited_pathway_for_bootstrap() {
        // v1 bootstrap: the auditor doesn't update pathway audit_consensus
        // until Phase N+1 wires it. Until then, a null audit_consensus
        // must NOT block hot-swap — the success_rate + probation gates
        // alone prove safety. Once the auditor wires up, explicit audit
        // failures will re-introduce the block (see the previous test).
        let mem = PathwayMemory::new(mk_store());
        let mut trace = mk_trace("a", true, 5, 5);
        trace.audit_consensus = None; // bootstrap path
        trace.pathway_vec = build_pathway_vec(&trace);
        let qvec = trace.pathway_vec.clone();
        mem.insert(trace).await.unwrap();
        let got = mem
            .query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec)
            .await;
        assert!(got.is_some(), "unaudited pathway with good replay history must hot-swap");
    }
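    // Illustrative sketch of the Mem0 exclusion rule (added example):
    // a superseded parent must drop out of hot-swap even when its own
    // gates would pass; the fresh, replay-less tip can't fire either.
    #[tokio::test]
    async fn hot_swap_skips_superseded_parent() {
        let mem = PathwayMemory::new(mk_store());
        let parent = mk_trace("a", true, 5, 5); // would hot-swap on its own
        let qvec = parent.pathway_vec.clone();
        let added = mem.upsert(parent).await.unwrap();
        let parent_uid = match added {
            PathwayUpsertOutcome::Added { trace_uid, .. } => trace_uid,
            other => panic!("expected Added, got {other:?}"),
        };
        // Supersede it; the new tip has 0 replays so probation blocks it.
        let tip = mk_trace("a", true, 0, 0);
        mem.revise(&parent_uid, tip).await.unwrap();
        let got = mem
            .query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec)
            .await;
        assert!(got.is_none(), "superseded parent must not hot-swap");
    }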
    #[tokio::test]
    async fn hot_swap_rejects_when_success_rate_below_80pct() {
        // 10 replays, 7 succeeded = 70% — below the 0.80 threshold.
        let mem = PathwayMemory::new(mk_store());
        let trace = mk_trace("a", true, 10, 7);
        let qvec = trace.pathway_vec.clone();
        mem.insert(trace).await.unwrap();
        let got = mem
            .query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec)
            .await;
        assert!(got.is_none());
    }

    #[tokio::test]
    async fn hot_swap_accepts_when_all_gates_pass() {
        let mem = PathwayMemory::new(mk_store());
        let trace = mk_trace("a", true, 5, 5); // 100% success after 5 replays
        let qvec = trace.pathway_vec.clone();
        mem.insert(trace).await.unwrap();
        let got = mem
            .query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec)
            .await;
        let cand = got.expect("should hot-swap");
        assert!(cand.similarity >= 0.90);
        assert_eq!(cand.recommended_rung, 2);
        assert_eq!(cand.recommended_model, "qwen3-coder:480b");
    }

    #[tokio::test]
    async fn record_replay_retires_pathway_on_failure_pattern() {
        let mem = PathwayMemory::new(mk_store());
        let trace = mk_trace("a", true, 0, 0);
        let pid = trace.pathway_id.clone();
        mem.insert(trace).await.unwrap();
        // Three replays, all fail → success_rate = 0.0 → retired.
        mem.record_replay_outcome(&pid, false).await.unwrap();
        mem.record_replay_outcome(&pid, false).await.unwrap();
        mem.record_replay_outcome(&pid, false).await.unwrap();
        let stats = mem.stats().await;
        assert_eq!(stats.retired, 1, "3 failures after insert must retire");
    }

    #[tokio::test]
    async fn record_replay_does_not_retire_before_probation() {
        let mem = PathwayMemory::new(mk_store());
        let trace = mk_trace("a", true, 0, 0);
        let pid = trace.pathway_id.clone();
        mem.insert(trace).await.unwrap();
        // Two replays (below probation of 3), both fail. Should NOT
        // retire yet — probation requires a minimum of 3 data points.
        mem.record_replay_outcome(&pid, false).await.unwrap();
        mem.record_replay_outcome(&pid, false).await.unwrap();
        let stats = mem.stats().await;
        assert_eq!(stats.retired, 0, "only 2 replays → below probation floor");
    }

    #[tokio::test]
    async fn retired_pathway_never_hot_swaps_again() {
        let mem = PathwayMemory::new(mk_store());
        let trace = mk_trace("a", true, 0, 0);
        let pid = trace.pathway_id.clone();
        let qvec = trace.pathway_vec.clone();
        mem.insert(trace).await.unwrap();
        for _ in 0..3 {
            mem.record_replay_outcome(&pid, false).await.unwrap();
        }
        // Now record 10 successes to push success_rate well above 0.80.
        // The pathway is still retired — retirement is sticky by design,
        // to prevent oscillation on noise.
        for _ in 0..10 {
            mem.record_replay_outcome(&pid, true).await.unwrap();
        }
        let got = mem
            .query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec)
            .await;
        assert!(got.is_none(), "retirement must be sticky");
    }

    #[tokio::test]
    async fn pathway_vec_differs_for_different_models() {
        // Two pathways with the same fingerprint but different ladder
        // models should have different embeddings so the similarity
        // gate can discriminate. This is what enables narrow fingerprint
        // + similarity-vec to cluster correctly.
        let a = mk_trace("a", true, 5, 5);
        let mut b = a.clone();
        b.ladder_attempts[0].model = "kimi-k2:1t".into();
        b.pathway_vec = build_pathway_vec(&b);
        let sim = cosine(&a.pathway_vec, &b.pathway_vec);
        assert!(sim < 1.0, "different models → different embeddings");
        assert!(sim > 0.5, "shared fingerprint → embeddings still related");
    }
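    // Illustrative sketch of retire()'s idempotency contract (added
    // example): the first call flips the flag and records the reason,
    // the second is a no-op that reports Ok(false).
    #[tokio::test]
    async fn retire_is_idempotent_per_trace_uid() {
        let mem = PathwayMemory::new(mk_store());
        let added = mem.upsert(mk_trace("a", true, 0, 0)).await.unwrap();
        let uid = match added {
            PathwayUpsertOutcome::Added { trace_uid, .. } => trace_uid,
            other => panic!("expected Added, got {other:?}"),
        };
        assert!(mem.retire(&uid, "observer rejected result").await.unwrap());
        assert!(
            !mem.retire(&uid, "second attempt").await.unwrap(),
            "already retired → Ok(false)"
        );
        assert_eq!(mem.stats().await.retired, 1);
    }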
    // ─── ADR-021 semantic-correctness layer tests ───────────────────

    #[test]
    fn pathway_trace_deserializes_without_new_fields_backcompat() {
        // Critical: existing traces on disk (persisted before ADR-021)
        // must still deserialize. serde(default) on the ADR-021 and Mem0
        // fields is the back-compat mechanism — verify it holds.
        let json = r#"{
            "pathway_id": "abc",
            "task_class": "scrum_review",
            "file_path": "crates/x/y.rs",
            "signal_class": null,
            "created_at": "2026-04-24T00:00:00Z",
            "ladder_attempts": [],
            "kb_chunks": [],
            "observer_signals": [],
            "bridge_hits": [],
            "sub_pipeline_calls": [],
            "audit_consensus": null,
            "reducer_summary": "old trace",
            "final_verdict": "accepted",
            "pathway_vec": [],
            "replay_count": 0,
            "replays_succeeded": 0,
            "retired": false
        }"#;
        let t: PathwayTrace =
            serde_json::from_str(json).expect("must deserialize pre-ADR-021 trace");
        assert!(t.semantic_flags.is_empty());
        assert!(t.type_hints_used.is_empty());
        assert!(t.bug_fingerprints.is_empty());
        assert_eq!(t.reducer_summary, "old trace");
    }

    #[test]
    fn semantic_flag_serializes_as_tagged_enum() {
        // Verifying the wire format — the tag field "kind" lets TS/JSON
        // clients pattern-match without needing to know variant ordering.
        let s = serde_json::to_string(&SemanticFlag::UnitMismatch).unwrap();
        assert!(s.contains("UnitMismatch"), "got: {s}");
        assert!(s.contains("kind"), "must be tagged enum for TS interop, got: {s}");
    }

    #[test]
    fn bug_fingerprint_roundtrips_through_serde() {
        let bp = BugFingerprint {
            flag: SemanticFlag::UnitMismatch,
            pattern_key: "row_count-file_count".into(),
            example: "base_rows = pre_filter_rows - delta_count".into(),
            occurrences: 1,
        };
        let s = serde_json::to_string(&bp).unwrap();
        let parsed: BugFingerprint = serde_json::from_str(&s).unwrap();
        assert_eq!(parsed, bp);
    }

    #[test]
    fn pathway_vec_differs_when_bug_fingerprint_added() {
        // A trace with a known bug history should embed differently
        // from a clean trace with the same ladder/KB. This is the
        // compounding signal: "same file, different bug history."
        let clean = mk_trace("a", true, 5, 5);
        let mut flagged = clean.clone();
        flagged.semantic_flags.push(SemanticFlag::UnitMismatch);
        flagged.bug_fingerprints.push(BugFingerprint {
            flag: SemanticFlag::UnitMismatch,
            pattern_key: "row_count-file_count".into(),
            example: "x = y - z".into(),
            occurrences: 1,
        });
        flagged.pathway_vec = build_pathway_vec(&flagged);
        let sim = cosine(&clean.pathway_vec, &flagged.pathway_vec);
        assert!(sim < 1.0, "bug history must shift the embedding");
        assert!(sim > 0.3, "shared fingerprint should keep them loosely related");
    }

    #[test]
    fn semantic_flag_discriminates_by_variant() {
        // Two traces with different flag classes should embed to
        // different points. Validates that the index can retrieve
        // "files with UnitMismatch history" separately from
        // "files with NullableConfusion history."
        let mut a = mk_trace("x", true, 5, 5);
        a.semantic_flags.push(SemanticFlag::UnitMismatch);
        a.pathway_vec = build_pathway_vec(&a);
        let mut b = a.clone();
        b.semantic_flags = vec![SemanticFlag::NullableConfusion];
        b.pathway_vec = build_pathway_vec(&b);
        let sim = cosine(&a.pathway_vec, &b.pathway_vec);
        assert!(sim < 1.0, "different flag variants → different embeddings");
    }
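    // Illustrative sketch of the revise() + history() version chain
    // (added example): one revision yields a root→tip chain of two,
    // with the version bumped on the tip.
    #[tokio::test]
    async fn history_returns_root_to_tip_after_revise() {
        let mem = PathwayMemory::new(mk_store());
        let added = mem.upsert(mk_trace("a", true, 0, 0)).await.unwrap();
        let root_uid = match added {
            PathwayUpsertOutcome::Added { trace_uid, .. } => trace_uid,
            other => panic!("expected Added, got {other:?}"),
        };
        let out = mem.revise(&root_uid, mk_trace("a", true, 0, 0)).await.unwrap();
        assert_eq!(out.parent_version, 1);
        assert_eq!(out.new_version, 2);
        // Walking from the tip recovers the whole chain, root first.
        let chain = mem.history(&out.new_trace_uid).await;
        assert_eq!(chain.len(), 2);
        assert_eq!(chain[0].trace_uid, root_uid);
        assert_eq!(chain[1].version, 2);
    }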
    #[tokio::test]
    async fn bug_fingerprints_aggregate_by_pattern_key() {
        // Three traces on the same narrow fingerprint — two with the
        // same bug pattern, one with a different pattern. The aggregator
        // must sum occurrences for the shared key and sort by count.
        let mem = PathwayMemory::new(mk_store());
        let mut t1 = mk_trace("q", true, 0, 0);
        t1.bug_fingerprints.push(BugFingerprint {
            flag: SemanticFlag::UnitMismatch,
            pattern_key: "row-file".into(),
            example: "a - b".into(),
            occurrences: 2,
        });
        let mut t2 = mk_trace("q", true, 0, 0);
        t2.bug_fingerprints.push(BugFingerprint {
            flag: SemanticFlag::UnitMismatch,
            pattern_key: "row-file".into(),
            example: "x - y".into(),
            occurrences: 1,
        });
        let mut t3 = mk_trace("q", true, 0, 0);
        t3.bug_fingerprints.push(BugFingerprint {
            flag: SemanticFlag::OffByOne,
            pattern_key: "len-1".into(),
            example: "items[len]".into(),
            occurrences: 1,
        });
        mem.insert(t1).await.unwrap();
        mem.insert(t2).await.unwrap();
        mem.insert(t3).await.unwrap();
        let fps = mem
            .bug_fingerprints_for("scrum_review", "crates/q/src/x.rs", Some("CONVERGING"), 10)
            .await;
        assert_eq!(fps.len(), 2, "two distinct patterns after aggregation");
        // First should be the aggregated UnitMismatch (3 total occurrences).
        assert_eq!(fps[0].pattern_key, "row-file");
        assert_eq!(fps[0].occurrences, 3);
        assert_eq!(fps[1].pattern_key, "len-1");
        assert_eq!(fps[1].occurrences, 1);
    }

    #[tokio::test]
    async fn bug_fingerprints_empty_for_unseen_fingerprint() {
        let mem = PathwayMemory::new(mk_store());
        let fps = mem
            .bug_fingerprints_for("scrum_review", "crates/never_seen/x.rs", None, 5)
            .await;
        assert!(fps.is_empty());
    }

    #[tokio::test]
    async fn bug_fingerprints_respects_limit() {
        let mem = PathwayMemory::new(mk_store());
        for i in 0..10 {
            let mut t = mk_trace("q", true, 0, 0);
            t.bug_fingerprints.push(BugFingerprint {
                flag: SemanticFlag::OffByOne,
                pattern_key: format!("p{i}"),
                example: "".into(),
                occurrences: (10 - i) as u32, // decreasing so the sort matters
            });
            mem.insert(t).await.unwrap();
        }
        let fps = mem
            .bug_fingerprints_for("scrum_review", "crates/q/src/x.rs", Some("CONVERGING"), 3)
            .await;
        assert_eq!(fps.len(), 3);
        // Highest occurrences first.
        assert_eq!(fps[0].pattern_key, "p0");
        assert_eq!(fps[0].occurrences, 10);
    }

    #[tokio::test]
    async fn insert_preserves_semantic_fields() {
        let mem = PathwayMemory::new(mk_store());
        let mut t = mk_trace("a", true, 0, 0);
        t.semantic_flags.push(SemanticFlag::UnitMismatch);
        t.type_hints_used.push(TypeHint {
            source: "arrow_schema".into(),
            symbol: "pre_filter_rows".into(),
            type_repr: "usize (sum of batch.num_rows)".into(),
        });
        t.bug_fingerprints.push(BugFingerprint {
            flag: SemanticFlag::UnitMismatch,
            pattern_key: "row-minus-file".into(),
            example: "pre_filter_rows - delta_count".into(),
            occurrences: 1,
        });
        mem.insert(t).await.unwrap();
        // Reload from the store via a fresh handle — proves persistence
        // roundtrips the new fields as well as the old ones.
        let mem2 = PathwayMemory::new(mem.store.clone());
        mem2.load_from_storage().await.unwrap();
        let stats = mem2.stats().await;
        assert_eq!(stats.total_pathways, 1);
    }
}