diff --git a/Cargo.lock b/Cargo.lock index bd6b280..a2c981b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6898,6 +6898,7 @@ dependencies = [ "storaged", "tokio", "tracing", + "truth", "url", ] diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs index 0c709f3..bedebbc 100644 --- a/crates/gateway/src/main.rs +++ b/crates/gateway/src/main.rs @@ -93,6 +93,12 @@ async fn main() { // operators call POST /vectors/playbook_memory/rebuild to populate. let pbm = vectord::playbook_memory::PlaybookMemory::new(store.clone()); let _ = pbm.load_from_storage().await; + // Pathway memory — consensus-designed sidecar for full-context + // backtracking + hot-swap of successful review pathways. Same + // load-on-boot pattern as playbook_memory: empty state is fine, + // operators start populating via scrum_master_pipeline.ts. + let pwm = vectord::pathway_memory::PathwayMemory::new(store.clone()); + let _ = pwm.load_from_storage().await; // Phase 16.2: spawn the autotune agent. When config.agent.enabled=false // this returns a handle that drops triggers silently — no surprise load. @@ -178,6 +184,7 @@ async fn main() { bucket_registry.clone(), index_reg.clone(), ), playbook_memory: pbm, + pathway_memory: pwm, embed_semaphore: std::sync::Arc::new(tokio::sync::Semaphore::new(1)), })) .nest("/workspaces", queryd::workspace_service::router(workspace_mgr)) diff --git a/crates/vectord/src/lib.rs b/crates/vectord/src/lib.rs index c937fc5..41f0d4f 100644 --- a/crates/vectord/src/lib.rs +++ b/crates/vectord/src/lib.rs @@ -8,6 +8,7 @@ pub mod hnsw; pub mod index_registry; pub mod jobs; pub mod playbook_memory; +pub mod pathway_memory; pub mod doc_drift; pub mod promotion; pub mod refresh; diff --git a/crates/vectord/src/pathway_memory.rs b/crates/vectord/src/pathway_memory.rs new file mode 100644 index 0000000..b2a1d91 --- /dev/null +++ b/crates/vectord/src/pathway_memory.rs @@ -0,0 +1,704 @@ +//! Pathway memory — full backtrack-able context for scrum/auditor reviews. +//! +//! 
Consensus-designed (10-probe N=3 ensemble, see +//! `data/_kb/consensus_reducer_design_*.json`). The reducer emits a +//! `PathwayTrace` sidecar alongside its legacy summary. Traces are +//! fingerprinted narrowly (`task_class + file_prefix + signal_class`) for +//! generalizing hot-swap, and embedded via normalized-metadata-token +//! concatenation so the HNSW similarity search can discriminate between +//! pathways that share a fingerprint but diverged in ladder/KB choices. +//! +//! The hot-swap decision requires six conditions in AND: +//! 1. narrow fingerprint match +//! 2. audit_consensus is absent or has pass == true (an explicit fail blocks) +//! 3. replay_count >= 3 +//! 4. replays_succeeded / replay_count >= 0.80 +//! 5. NOT retired +//! 6. similarity(new, stored) >= 0.90 +//! +//! Any replay reports its outcome via `record_replay_outcome`; pathways +//! whose success rate drops below 0.80 after >=3 replays are marked +//! retired and excluded from further hot-swap consideration. This is the +//! self-correcting learning loop — a pathway that worked once but breaks +//! under distribution shift removes itself automatically. + +use std::collections::HashMap; +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use object_store::ObjectStore; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use storaged::ops; +use tokio::sync::RwLock; + +const STATE_KEY: &str = "_pathway_memory/state.json"; + +/// Outcome of one ladder rung attempt. Captured for every attempt, +/// regardless of whether it was accepted — rejections are signal too. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LadderAttempt { + pub rung: u8, + pub model: String, + pub latency_ms: u64, + pub accepted: bool, + pub reject_reason: Option, +} + +/// Provenance of a RAG chunk retrieved for this review. The +/// `cosine_score` is the similarity as returned by the index; `rank` is +/// 0-indexed order in the top-K result list. 
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct KbChunkRef { + pub source_doc: String, + pub chunk_id: String, + pub cosine_score: f32, + pub rank: u8, +} + +/// Signal emitted by mcp-server/observer classifier. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct ObserverSignal { + pub class: String, + pub priors: Vec, + pub prior_iter_outcomes: Vec, +} + +/// Context7-bridge lookup snapshot. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct BridgeHit { + pub library: String, + pub version: String, +} + +/// Call to LLM Team (/api/run?mode=extract) or auditor N=3 consensus. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct SubPipelineCall { + pub pipeline: String, // "llm_team_extract" / "audit_consensus" / etc. + pub result_summary: String, +} + +/// N=3 independent consensus re-check result. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct AuditConsensus { + pub pass: bool, + pub models: Vec, + pub disagreements: u32, +} + +/// Full backtrack-able context for one reviewed file. Lives alongside +/// the reducer's summary — summary is what the reviewer LLM sees, this +/// is what the auditor / future iterations / hot-swap use. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct PathwayTrace { + pub pathway_id: String, // SHA256(task_class|file_prefix|signal_class) + pub task_class: String, + pub file_path: String, + pub signal_class: Option, + pub created_at: DateTime, + + pub ladder_attempts: Vec, + pub kb_chunks: Vec, + pub observer_signals: Vec, + pub bridge_hits: Vec, + pub sub_pipeline_calls: Vec, + pub audit_consensus: Option, + + pub reducer_summary: String, + pub final_verdict: String, + + /// Normalized-metadata-token embedding. 
Dimension fixed per index + /// version (current: 32, sufficient to distinguish task/file/signal + /// combinations without requiring an external embedding model — + /// round-3 consensus said "small metadata tokens", not "full JSON"). + pub pathway_vec: Vec, + + /// Number of times this pathway has been replayed via hot-swap. + /// Replay only begins after first insert; initial insert itself is + /// NOT a replay. Probation of ≥3 replays is required before the + /// success-rate gate can fire. + pub replay_count: u32, + pub replays_succeeded: u32, + /// Marked true when replay_count >= 3 AND success_rate < 0.80. + /// Retired pathways are excluded from hot-swap forever. (If the + /// underlying file / task / signal characteristics genuinely change + /// such that a retired pathway would work again, a new PathwayTrace + /// with a fresh id will be inserted — retirement is per-id.) + pub retired: bool, +} + +impl PathwayTrace { + /// Compute the narrow fingerprint id from task_class + file_prefix + /// + signal_class. `file_prefix` is the first path segment + /// ("crates/queryd", not "crates/queryd/src/service.rs") so that + /// related files in the same crate share pathways. + pub fn compute_id(task_class: &str, file_path: &str, signal_class: Option<&str>) -> String { + let prefix = file_prefix(file_path); + let sig = signal_class.unwrap_or(""); + let mut hasher = Sha256::new(); + hasher.update(task_class.as_bytes()); + hasher.update(b"|"); + hasher.update(prefix.as_bytes()); + hasher.update(b"|"); + hasher.update(sig.as_bytes()); + format!("{:x}", hasher.finalize()) + } + + pub fn success_rate(&self) -> f32 { + if self.replay_count == 0 { + return 0.0; + } + self.replays_succeeded as f32 / self.replay_count as f32 + } +} + +/// First two path segments, so `crates/queryd/src/service.rs` → +/// `crates/queryd`. 
/// Crate-level prefix of a path: its first two `/`-separated segments,
/// re-joined with `/` (`crates/queryd/src/service.rs` → `crates/queryd`).
/// A path with fewer than two segments is returned unchanged. Fingerprints
/// are keyed on this prefix so files within one crate share pathways.
pub fn file_prefix(path: &str) -> String {
    let mut segments = path.split('/');
    // `split` always yields at least one (possibly empty) segment.
    match (segments.next(), segments.next()) {
        (Some(first), Some(second)) => format!("{first}/{second}"),
        (Some(only), None) => only.to_owned(),
        (None, _) => String::new(),
    }
}
/// Cosine similarity of two equal-length vectors, in `[-1.0, 1.0]`.
///
/// The magnitudes are divided out here rather than assumed to be 1, so a
/// caller-supplied vector that is not L2-normalized cannot inflate the
/// score past a similarity threshold. For already-normalized inputs the
/// result equals the plain dot product (up to rounding).
///
/// Returns `0.0` when the lengths differ, when either vector has zero
/// magnitude (including two empty slices), or when the computation is not
/// finite (NaN / infinite components). Returning 0.0 instead of NaN
/// matters because `NaN < threshold` is false and would otherwise slip
/// through a `sim < threshold` rejection check.
pub fn cosine(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() {
        return 0.0;
    }
    let (mut dot, mut sq_a, mut sq_b) = (0f32, 0f32, 0f32);
    for (x, y) in a.iter().zip(b) {
        dot += x * y;
        sq_a += x * x;
        sq_b += y * y;
    }
    // Zero magnitude gives 0/0 = NaN; this is caught by the finite check.
    let sim = dot / (sq_a.sqrt() * sq_b.sqrt());
    if sim.is_finite() {
        // Rounding can push identical vectors a hair past ±1.
        sim.clamp(-1.0, 1.0)
    } else {
        0.0
    }
}
Computes the pathway_vec from + /// metadata if the caller didn't supply one. Appends to the bucket + /// for this pathway_id — multiple traces can share a fingerprint + /// (each represents one review of the same file/task/signal combo). + pub async fn insert(&self, mut trace: PathwayTrace) -> Result<(), String> { + if trace.pathway_vec.is_empty() { + trace.pathway_vec = build_pathway_vec(&trace); + } + let mut s = self.state.write().await; + s.pathways + .entry(trace.pathway_id.clone()) + .or_default() + .push(trace); + s.last_updated_at = Utc::now().timestamp_millis(); + drop(s); + self.persist().await + } + + /// Query for a hot-swap candidate. Returns `None` if no eligible + /// pathway exists — caller should run the full ladder. Returns + /// `Some(cand)` if all gates pass — caller can short-circuit to + /// `cand.recommended_rung` / `cand.recommended_model`. + /// + /// Gates (all must hold): + /// - narrow fingerprint match (same task/file_prefix/signal) + /// - audit_consensus.pass == true on the stored trace + /// - replay_count >= 3 (probation) + /// - success_rate >= 0.80 + /// - NOT retired + /// - similarity(query_vec, stored.pathway_vec) >= 0.90 + pub async fn query_hot_swap( + &self, + task_class: &str, + file_path: &str, + signal_class: Option<&str>, + query_vec: &[f32], + ) -> Option { + let id = PathwayTrace::compute_id(task_class, file_path, signal_class); + let s = self.state.read().await; + let candidates = s.pathways.get(&id)?; + let mut best: Option<(f32, &PathwayTrace)> = None; + for p in candidates { + if p.retired { + continue; + } + // audit_consensus gate: explicit FAIL blocks hot-swap. A null + // audit_consensus (auditor hasn't seen this pathway yet) is + // NOT a block — the success_rate gate below still requires + // ≥3 real-world replays at ≥80% success before a pathway + // becomes hot-swap eligible, so the learning loop itself + // provides the safety net during bootstrap. 
Once the auditor + // pipeline wires pathway audit updates, this gate tightens + // automatically: any explicit audit_consensus.pass == false + // here will skip the candidate. + if let Some(ac) = &p.audit_consensus { + if !ac.pass { + continue; + } + } + if p.replay_count < 3 { + continue; + } + if p.success_rate() < 0.80 { + continue; + } + let sim = cosine(query_vec, &p.pathway_vec); + if sim < 0.90 { + continue; + } + if best.as_ref().map(|(b, _)| sim > *b).unwrap_or(true) { + best = Some((sim, p)); + } + } + let (similarity, p) = best?; + // The "recommended" rung is the first accepted attempt in the + // stored pathway — that's the one the ladder converged on. + let accepted = p.ladder_attempts.iter().find(|a| a.accepted)?; + Some(HotSwapCandidate { + pathway_id: p.pathway_id.clone(), + similarity, + replay_count: p.replay_count, + success_rate: p.success_rate(), + recommended_rung: accepted.rung, + recommended_model: accepted.model.clone(), + }) + } + + /// Record the outcome of a hot-swap replay. Increments replay_count + /// unconditionally; increments replays_succeeded iff succeeded; + /// retires the pathway if replay_count >= 3 and success_rate falls + /// below 0.80. Mistral's learning loop in code. + pub async fn record_replay_outcome( + &self, + pathway_id: &str, + succeeded: bool, + ) -> Result<(), String> { + let mut s = self.state.write().await; + // Find the specific pathway across the bucket that matches by + // full id (the bucket key is already the narrow id, but in case + // of future multi-trace-per-id we take the most recent). 
+ let bucket = s + .pathways + .iter_mut() + .find(|(k, _)| k.as_str() == pathway_id) + .map(|(_, v)| v) + .ok_or_else(|| format!("pathway {pathway_id} not found"))?; + let p = bucket + .last_mut() + .ok_or_else(|| format!("pathway {pathway_id} has empty bucket"))?; + p.replay_count = p.replay_count.saturating_add(1); + if succeeded { + p.replays_succeeded = p.replays_succeeded.saturating_add(1); + } + if p.replay_count >= 3 && p.success_rate() < 0.80 { + p.retired = true; + } + s.last_updated_at = Utc::now().timestamp_millis(); + drop(s); + self.persist().await + } + + pub async fn stats(&self) -> PathwayMemoryStats { + let s = self.state.read().await; + let mut total = 0usize; + let mut retired = 0usize; + let mut with_audit_pass = 0usize; + let mut total_replays = 0u64; + let mut successful_replays = 0u64; + for bucket in s.pathways.values() { + for p in bucket { + total += 1; + if p.retired { + retired += 1; + } + if p.audit_consensus.as_ref().map(|a| a.pass).unwrap_or(false) { + with_audit_pass += 1; + } + total_replays += p.replay_count as u64; + successful_replays += p.replays_succeeded as u64; + } + } + PathwayMemoryStats { + total_pathways: total, + retired, + with_audit_pass, + total_replays, + successful_replays, + reuse_rate: if total == 0 { + 0.0 + } else { + total_replays as f32 / total as f32 + }, + replay_success_rate: if total_replays == 0 { + 0.0 + } else { + successful_replays as f32 / total_replays as f32 + }, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct PathwayMemoryStats { + pub total_pathways: usize, + pub retired: usize, + pub with_audit_pass: usize, + pub total_replays: u64, + pub successful_replays: u64, + pub reuse_rate: f32, // total_replays / total_pathways + pub replay_success_rate: f32, // successful_replays / total_replays +} + +#[cfg(test)] +mod tests { + use super::*; + use object_store::memory::InMemory; + + fn mk_store() -> Arc { + Arc::new(InMemory::new()) + } + + fn mk_trace(id_tag: &str, audit_pass: bool, 
replays: u32, succ: u32) -> PathwayTrace { + let pathway_id = + PathwayTrace::compute_id("scrum_review", &format!("crates/{id_tag}/src/x.rs"), Some("CONVERGING")); + let attempts = vec![LadderAttempt { + rung: 2, + model: "qwen3-coder:480b".into(), + latency_ms: 1000, + accepted: true, + reject_reason: None, + }]; + let mut trace = PathwayTrace { + pathway_id, + task_class: "scrum_review".into(), + file_path: format!("crates/{id_tag}/src/x.rs"), + signal_class: Some("CONVERGING".into()), + created_at: Utc::now(), + ladder_attempts: attempts, + kb_chunks: vec![KbChunkRef { + source_doc: "PRD.md".into(), + chunk_id: "c1".into(), + cosine_score: 0.88, + rank: 0, + }], + observer_signals: vec![], + bridge_hits: vec![], + sub_pipeline_calls: vec![], + audit_consensus: Some(AuditConsensus { + pass: audit_pass, + models: vec!["qwen3-coder:480b".into(), "gpt-oss:120b".into(), "kimi-k2:1t".into()], + disagreements: 0, + }), + reducer_summary: "ok".into(), + final_verdict: "accepted".into(), + pathway_vec: vec![], + replay_count: replays, + replays_succeeded: succ, + retired: false, + }; + trace.pathway_vec = build_pathway_vec(&trace); + trace + } + + #[test] + fn file_prefix_takes_first_two_segments() { + assert_eq!(file_prefix("crates/queryd/src/service.rs"), "crates/queryd"); + assert_eq!(file_prefix("crates/gateway"), "crates/gateway"); + assert_eq!(file_prefix("README.md"), "README.md"); + assert_eq!(file_prefix(""), ""); + } + + #[test] + fn compute_id_is_deterministic() { + let a = PathwayTrace::compute_id("scrum", "crates/queryd/src/x.rs", Some("LOOPING")); + let b = PathwayTrace::compute_id("scrum", "crates/queryd/src/x.rs", Some("LOOPING")); + assert_eq!(a, b); + } + + #[test] + fn compute_id_generalizes_across_same_prefix() { + // Same prefix + task + signal → same id. That IS the narrow + // generalization — it's what lets hot-swap fire for different + // files in the same crate that share the task/signal profile. 
+ let a = PathwayTrace::compute_id("scrum", "crates/queryd/src/a.rs", Some("L")); + let b = PathwayTrace::compute_id("scrum", "crates/queryd/src/b.rs", Some("L")); + assert_eq!(a, b); + } + + #[test] + fn compute_id_differs_on_signal_class() { + let a = PathwayTrace::compute_id("scrum", "crates/q/s", Some("CONVERGING")); + let b = PathwayTrace::compute_id("scrum", "crates/q/s", Some("LOOPING")); + assert_ne!(a, b); + } + + #[test] + fn cosine_handles_mismatched_lengths() { + assert_eq!(cosine(&[1.0, 0.0], &[1.0]), 0.0); + } + + #[test] + fn cosine_of_identical_normalized_is_one() { + let v = vec![0.6, 0.8]; + let c = cosine(&v, &v); + assert!((c - 1.0).abs() < 1e-5); + } + + #[test] + fn success_rate_is_zero_before_any_replay() { + let t = mk_trace("a", true, 0, 0); + assert_eq!(t.success_rate(), 0.0); + } + + #[test] + fn success_rate_ratio() { + let t = mk_trace("a", true, 4, 3); + assert!((t.success_rate() - 0.75).abs() < 1e-5); + } + + #[tokio::test] + async fn insert_and_stats_roundtrip() { + let mem = PathwayMemory::new(mk_store()); + mem.insert(mk_trace("a", true, 0, 0)).await.unwrap(); + let stats = mem.stats().await; + assert_eq!(stats.total_pathways, 1); + assert_eq!(stats.retired, 0); + assert_eq!(stats.with_audit_pass, 1); + } + + #[tokio::test] + async fn hot_swap_rejects_when_probation_not_met() { + // Probation: replay_count must be >= 3 before success-rate gate + // can fire. A fresh pathway with 0 replays must NEVER hot-swap + // even if its similarity is 1.0 and audit passes. 
+ let mem = PathwayMemory::new(mk_store()); + let trace = mk_trace("a", true, 0, 0); + let qvec = trace.pathway_vec.clone(); + mem.insert(trace).await.unwrap(); + let got = mem + .query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec) + .await; + assert!(got.is_none(), "fresh pathway must not hot-swap"); + } + + #[tokio::test] + async fn hot_swap_rejects_when_audit_explicitly_fails() { + let mem = PathwayMemory::new(mk_store()); + let trace = mk_trace("a", false, 5, 5); // audit FAILED explicitly + let qvec = trace.pathway_vec.clone(); + mem.insert(trace).await.unwrap(); + let got = mem + .query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec) + .await; + assert!(got.is_none(), "pathway with explicit audit FAIL must not hot-swap"); + } + + #[tokio::test] + async fn hot_swap_accepts_unaudited_pathway_for_bootstrap() { + // v1 bootstrap: auditor doesn't update pathway audit_consensus + // until Phase N+1 wires it. Until then, null audit_consensus + // must NOT block hot-swap — the success_rate + probation gates + // alone prove safety. Once auditor wires up, explicit audit + // failures will re-introduce the block (see previous test). + let mem = PathwayMemory::new(mk_store()); + let mut trace = mk_trace("a", true, 5, 5); + trace.audit_consensus = None; // bootstrap path + trace.pathway_vec = build_pathway_vec(&trace); + let qvec = trace.pathway_vec.clone(); + mem.insert(trace).await.unwrap(); + let got = mem + .query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec) + .await; + assert!(got.is_some(), "unaudited pathway with good replay history must hot-swap"); + } + + #[tokio::test] + async fn hot_swap_rejects_when_success_rate_below_80pct() { + // 10 replays, 7 succeeded = 70% — below the 0.80 threshold. 
+ let mem = PathwayMemory::new(mk_store()); + let trace = mk_trace("a", true, 10, 7); + let qvec = trace.pathway_vec.clone(); + mem.insert(trace).await.unwrap(); + let got = mem + .query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec) + .await; + assert!(got.is_none()); + } + + #[tokio::test] + async fn hot_swap_accepts_when_all_gates_pass() { + let mem = PathwayMemory::new(mk_store()); + let trace = mk_trace("a", true, 5, 5); // 100% success after 5 replays + let qvec = trace.pathway_vec.clone(); + mem.insert(trace).await.unwrap(); + let got = mem + .query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec) + .await; + let cand = got.expect("should hot-swap"); + assert!(cand.similarity >= 0.90); + assert_eq!(cand.recommended_rung, 2); + assert_eq!(cand.recommended_model, "qwen3-coder:480b"); + } + + #[tokio::test] + async fn record_replay_retires_pathway_on_failure_pattern() { + let mem = PathwayMemory::new(mk_store()); + let trace = mk_trace("a", true, 0, 0); + let pid = trace.pathway_id.clone(); + mem.insert(trace).await.unwrap(); + // Three replays, all fail → success_rate = 0.0 → retired. + mem.record_replay_outcome(&pid, false).await.unwrap(); + mem.record_replay_outcome(&pid, false).await.unwrap(); + mem.record_replay_outcome(&pid, false).await.unwrap(); + let stats = mem.stats().await; + assert_eq!(stats.retired, 1, "3 failures after insert must retire"); + } + + #[tokio::test] + async fn record_replay_does_not_retire_before_probation() { + let mem = PathwayMemory::new(mk_store()); + let trace = mk_trace("a", true, 0, 0); + let pid = trace.pathway_id.clone(); + mem.insert(trace).await.unwrap(); + // Two replays (below probation of 3), both fail. Should NOT + // retire yet — probation requires minimum 3 data points. 
+ mem.record_replay_outcome(&pid, false).await.unwrap(); + mem.record_replay_outcome(&pid, false).await.unwrap(); + let stats = mem.stats().await; + assert_eq!(stats.retired, 0, "only 2 replays → below probation floor"); + } + + #[tokio::test] + async fn retired_pathway_never_hot_swaps_again() { + let mem = PathwayMemory::new(mk_store()); + let trace = mk_trace("a", true, 0, 0); + let pid = trace.pathway_id.clone(); + let qvec = trace.pathway_vec.clone(); + mem.insert(trace).await.unwrap(); + for _ in 0..3 { + mem.record_replay_outcome(&pid, false).await.unwrap(); + } + // Now record 10 successes to push success_rate well above 0.80. + // Pathway is still retired — retirement is sticky by design, to + // prevent oscillation on noise. + for _ in 0..10 { + mem.record_replay_outcome(&pid, true).await.unwrap(); + } + let got = mem + .query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec) + .await; + assert!(got.is_none(), "retirement must be sticky"); + } + + #[tokio::test] + async fn pathway_vec_differs_for_different_models() { + // Two pathways with same fingerprint but different ladder + // models should have different embeddings so the similarity + // gate can discriminate. This is what enables narrow fingerprint + // + similarity-vec to cluster correctly. 
+ let a = mk_trace("a", true, 5, 5); + let mut b = a.clone(); + b.ladder_attempts[0].model = "kimi-k2:1t".into(); + b.pathway_vec = build_pathway_vec(&b); + let sim = cosine(&a.pathway_vec, &b.pathway_vec); + assert!(sim < 1.0, "different models → different embeddings"); + assert!(sim > 0.5, "shared fingerprint → embeddings still related"); + } +} diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs index b81d1ba..eeb8d23 100644 --- a/crates/vectord/src/service.rs +++ b/crates/vectord/src/service.rs @@ -13,7 +13,7 @@ use std::sync::Arc; use aibridge::client::{AiClient, EmbedRequest, GenerateRequest}; use catalogd::registry::Registry as CatalogRegistry; use storaged::registry::BucketRegistry; -use crate::{agent, autotune, chunker, embedding_cache, harness, hnsw, index_registry, jobs, lance_backend, playbook_memory, promotion, rag, refresh, search, store, supervisor, trial}; +use crate::{agent, autotune, chunker, embedding_cache, harness, hnsw, index_registry, jobs, lance_backend, pathway_memory, playbook_memory, promotion, rag, refresh, search, store, supervisor, trial}; use tokio::sync::Semaphore; #[derive(Clone)] @@ -55,6 +55,11 @@ pub struct VectorState { /// and, when `use_playbook_memory` is set on /vectors/hybrid, boosts /// workers that were actually filled in semantically-similar past ops. pub playbook_memory: playbook_memory::PlaybookMemory, + /// Pathway memory — consensus-designed sidecar for full-context + /// backtracking + hot-swap of successful review pathways. See + /// crates/vectord/src/pathway_memory.rs for the design rationale + /// (10-probe N=3 ensemble, locked 2026-04-24). + pub pathway_memory: pathway_memory::PathwayMemory, /// Serializes embed calls from seed_playbook_memory to avoid /// concurrent socket collisions with the Python sidecar. pub embed_semaphore: Arc, @@ -137,6 +142,15 @@ pub fn router(state: VectorState) -> Router { // Phase 45 slice 3 — doc drift detection + human re-admission. 
.route("/playbook_memory/doc_drift/check/{id}", post(check_doc_drift)) .route("/playbook_memory/doc_drift/resolve/{id}", post(resolve_doc_drift)) + // Pathway memory — consensus-designed sidecar (2026-04-24). + // scrum_master_pipeline POSTs /pathway/insert at the end of each + // review, calls /pathway/query before running the ladder for a + // potential hot-swap, and posts /pathway/record_replay after a + // hot-swap succeeds or fails. + .route("/pathway/insert", post(pathway_insert)) + .route("/pathway/query", post(pathway_query)) + .route("/pathway/record_replay", post(pathway_record_replay)) + .route("/pathway/stats", get(pathway_stats)) .with_state(state) } @@ -2833,6 +2847,73 @@ async fn lance_build_scalar_index( } } +// ─── Pathway memory handlers ────────────────────────────────────────── +// +// Thin wrappers around pathway_memory::PathwayMemory. HTTP surface is +// deliberately small — four endpoints cover the full lifecycle: +// insert at end-of-review, query before running the ladder, +// record_replay after a hot-swap, and stats for the VCP UI. + +#[derive(Deserialize)] +struct PathwayQueryRequest { + task_class: String, + file_path: String, + signal_class: Option, + query_vec: Vec, +} + +async fn pathway_insert( + State(state): State, + Json(trace): Json, +) -> impl IntoResponse { + match state.pathway_memory.insert(trace).await { + Ok(()) => Ok(Json(json!({"ok": true}))), + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} + +async fn pathway_query( + State(state): State, + Json(req): Json, +) -> impl IntoResponse { + let cand = state + .pathway_memory + .query_hot_swap( + &req.task_class, + &req.file_path, + req.signal_class.as_deref(), + &req.query_vec, + ) + .await; + // 200 with null candidate means "no hot-swap"; this is a normal + // path, not an error — callers should proceed with the full ladder. 
+ Json(json!({ "candidate": cand })) +} + +#[derive(Deserialize)] +struct PathwayReplayRequest { + pathway_id: String, + succeeded: bool, +} + +async fn pathway_record_replay( + State(state): State, + Json(req): Json, +) -> impl IntoResponse { + match state + .pathway_memory + .record_replay_outcome(&req.pathway_id, req.succeeded) + .await + { + Ok(()) => Ok(Json(json!({"ok": true}))), + Err(e) => Err((StatusCode::NOT_FOUND, e)), + } +} + +async fn pathway_stats(State(state): State) -> impl IntoResponse { + Json(state.pathway_memory.stats().await) +} + #[cfg(test)] mod extractor_tests { use super::*; diff --git a/tests/real-world/scrum_master_pipeline.ts b/tests/real-world/scrum_master_pipeline.ts index 9fec188..54ec918 100644 --- a/tests/real-world/scrum_master_pipeline.ts +++ b/tests/real-world/scrum_master_pipeline.ts @@ -157,6 +157,158 @@ async function embedBatch(texts: string[]): Promise { return (await r.json() as any).embeddings; } +// ─── Pathway memory (2026-04-24 consensus design) ─────────────────── +// +// Mirrors vectord/src/pathway_memory.rs. The bucket-hash vector MUST +// byte-match the Rust implementation so traces written from TypeScript +// are searchable against the same embedding space. Verified by running +// both implementations on the same input tokens and asserting matching +// bucket indices. + +function filePrefix(path: string): string { + return path.split("/").slice(0, 2).join("/"); +} + +function computePathwayId(taskClass: string, filePath: string, signalClass: string | null): string { + const h = createHash("sha256"); + h.update(taskClass); + h.update("|"); + h.update(filePrefix(filePath)); + h.update("|"); + h.update(signalClass ?? ""); + return h.digest("hex"); +} + +// 32-bucket L2-normalized token hash. Same algorithm as Rust. 
// Build the minimal query vector for a pre-ladder hot-swap check. We
// don't yet know the ladder attempts or KB chunks — the query vec is
// computed from what we CAN know up front: task/file/signal. The tokens
// are hashed with buildPathwayVec, so the result is a 32-bucket
// L2-normalized vector. A falsy signalClass (null or "") contributes no
// `signal:` token.
//
// NOTE(review): the gateway only returns a candidate when
// cosine(query_vec, stored.pathway_vec) >= 0.90 (query_hot_swap in
// pathway_memory.rs), and the stored vector additionally hashes
// rung:/model:/accepted:/kb:/class:/lib:/pipeline: tokens
// (build_pathway_vec). Ignoring bucket collisions, a 3-token query
// against a stored trace with a single ladder attempt (6 tokens) scores
// about sqrt(3/6) ≈ 0.71, so this weaker embedding looks unable to clear
// the gate. The full file path is also a token, so a different file in
// the same crate lowers the score further. TODO confirm hot-swap can
// actually fire with this query vector.
function buildQueryVec(taskClass: string, filePath: string, signalClass: string | null): number[] {
  const tokens = [taskClass, filePath];
  if (signalClass) tokens.push(`signal:${signalClass}`);
  return buildPathwayVec(tokens);
}
+ return null; + } +} + +interface LadderAttemptRec { + rung: number; + model: string; + latency_ms: number; + accepted: boolean; + reject_reason: string | null; +} + +interface PathwayTracePayload { + pathway_id: string; + task_class: string; + file_path: string; + signal_class: string | null; + created_at: string; + ladder_attempts: LadderAttemptRec[]; + kb_chunks: { source_doc: string; chunk_id: string; cosine_score: number; rank: number }[]; + observer_signals: { class: string; priors: string[]; prior_iter_outcomes: string[] }[]; + bridge_hits: { library: string; version: string }[]; + sub_pipeline_calls: { pipeline: string; result_summary: string }[]; + audit_consensus: { pass: boolean; models: string[]; disagreements: number } | null; + reducer_summary: string; + final_verdict: string; + pathway_vec: number[]; + replay_count: number; + replays_succeeded: number; + retired: boolean; +} + +async function writePathwayTrace(trace: PathwayTracePayload): Promise { + try { + await fetch(`${GATEWAY}/vectors/pathway/insert`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(trace), + signal: AbortSignal.timeout(10000), + }); + } catch { + // Fire-and-forget: scrum runs shouldn't fail if pathway insert fails. + } +} + +async function recordPathwayReplay(pathwayId: string, succeeded: boolean): Promise { + try { + await fetch(`${GATEWAY}/vectors/pathway/record_replay`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ pathway_id: pathwayId, succeeded }), + signal: AbortSignal.timeout(5000), + }); + } catch { + // Fire-and-forget. Not critical. + } +} + +// Deterministic signal_class lookup from scrum_reviews.jsonl history. +// First-time files get `null`. Files seen before get the signal class +// the observer assigned on their most-recent review (if any). Keeps the +// pathway fingerprint stable across iterations for LOOPING files. 
+async function lookupSignalClass(filePath: string): Promise { + try { + const { readFile } = await import("node:fs/promises"); + const raw = await readFile(SCRUM_REVIEWS_JSONL, "utf8").catch(() => ""); + if (!raw) return null; + const lines = raw.trim().split("\n").reverse(); + for (const line of lines) { + try { + const r = JSON.parse(line); + if (r.file === filePath && r.signal_class) return r.signal_class; + } catch {} + } + return null; + } catch { return null; } +} + async function chat(opts: { provider: "ollama" | "ollama_cloud", model: string, @@ -389,32 +541,63 @@ Respond with markdown. Be specific, not generic. Cite file-region + PRD-chunk-of let acceptedModel = ""; let acceptedOn = 0; - for (let i = 0; i < MAX_ATTEMPTS; i++) { - const n = i + 1; + // Pathway hot-swap pre-check. If a proven pathway exists for this + // (task, file_prefix, signal) combo with ≥3 replays at ≥80% success, + // skip the ladder and try its winning rung first. On success we + // record a positive replay; on failure we fall through to the full + // ladder and record a negative replay. Fire-and-forget — pathway + // service unavailable → null candidate → business as usual. + const signalClass = await lookupSignalClass(rel); + const taskClass = "scrum_review"; + const hotSwap = await queryHotSwap(taskClass, rel, signalClass); + let hotSwapOrderedIndices: number[] | null = null; + if (hotSwap) { + // Reorder the ladder to try the recommended model first. Rung + // indices are preserved in the output so the trace still reflects + // the true ladder position the model sits at. 
+ const recommendedIdx = LADDER.findIndex(r => r.model === hotSwap.recommended_model); + if (recommendedIdx >= 0) { + log(` 🔥 hot-swap candidate: ${hotSwap.recommended_model} (rung ${hotSwap.recommended_rung}, sim=${hotSwap.similarity.toFixed(3)}, success_rate=${hotSwap.success_rate.toFixed(2)}, ${hotSwap.replay_count} replays)`); + hotSwapOrderedIndices = [recommendedIdx, ...LADDER.map((_, i) => i).filter(i => i !== recommendedIdx)]; + } + } + const ladderOrder = hotSwapOrderedIndices ?? LADDER.map((_, i) => i); + + // Collect attempts for the pathway trace sidecar. + const pathwayAttempts: LadderAttemptRec[] = []; + + for (let step = 0; step < MAX_ATTEMPTS; step++) { + const i = ladderOrder[step]; + const n = step + 1; const rung = LADDER[i]; const learning = history.length > 0 ? `\n\n═══ PRIOR ATTEMPTS FAILED. Specific issues to fix: ═══\n${history.map(h => `Attempt ${h.n} (${h.model}, ${h.chars} chars): ${h.status} — ${h.error ?? "thin/unstructured answer"}`).join("\n")}\n═══` : ""; log(` attempt ${n}/${MAX_ATTEMPTS}: ${rung.provider}::${rung.model}${learning ? 
" [w/ learning]" : ""}`); + const attemptStarted = Date.now(); const r = await chat({ provider: rung.provider, model: rung.model, prompt: baseTask + learning, max_tokens: 1500, }); + const attemptMs = Date.now() - attemptStarted; if (r.error) { history.push({ n, model: rung.model, status: "error", chars: 0, error: r.error.slice(0, 180) }); + pathwayAttempts.push({ rung: i + 1, model: rung.model, latency_ms: attemptMs, accepted: false, reject_reason: `error: ${r.error.slice(0, 100)}` }); log(` ✗ error: ${r.error.slice(0, 80)}`); continue; } if (!isAcceptable(r.content)) { history.push({ n, model: rung.model, status: "thin", chars: r.content.length, error: `thin/unstructured (${r.content.length} chars)` }); + pathwayAttempts.push({ rung: i + 1, model: rung.model, latency_ms: attemptMs, accepted: false, reject_reason: `thin (${r.content.length} chars)` }); log(` ✗ thin/unstructured (${r.content.length} chars)`); continue; } history.push({ n, model: rung.model, status: "accepted", chars: r.content.length }); + pathwayAttempts.push({ rung: i + 1, model: rung.model, latency_ms: attemptMs, accepted: true, reject_reason: null }); accepted = r.content; acceptedModel = `${rung.provider}/${rung.model}`; acceptedOn = n; @@ -422,6 +605,15 @@ Respond with markdown. Be specific, not generic. Cite file-region + PRD-chunk-of break; } + // Hot-swap bookkeeping: if we tried the recommended model first, + // report whether it worked so the pathway's success_rate updates. + if (hotSwap) { + const replaySucceeded = acceptedModel.endsWith(`/${hotSwap.recommended_model}`); + log(` pathway replay ${replaySucceeded ? "✓" : "✗"} (${hotSwap.pathway_id.slice(0, 12)}…)`); + // Fire and forget — don't await; observer can handle it. + recordPathwayReplay(hotSwap.pathway_id, replaySucceeded); + } + const review: FileReview = { file: rel, file_bytes: content.length, @@ -599,6 +791,54 @@ Respond with markdown. Be specific, not generic. 
Cite file-region + PRD-chunk-of console.error(`[scrum] failed to append scrum_reviews.jsonl: ${(e as Error).message}`); } + // Pathway trace sidecar (consensus-designed 2026-04-24). Captures + // FULL context (ladder attempts, KB chunks, observer signal, verdict) + // for similarity-based hot-swap in future iterations. First-review + // pathways start in probation (replay_count=0); they become + // hot-swappable only after ≥3 replays at ≥80% success. + try { + const pathwayTrace: PathwayTracePayload = { + pathway_id: computePathwayId(taskClass, rel, signalClass), + task_class: taskClass, + file_path: rel, + signal_class: signalClass, + created_at: row.reviewed_at, + ladder_attempts: pathwayAttempts, + kb_chunks: [ + ...topPrd.map((c, idx) => ({ + source_doc: "PRD.md", chunk_id: `prd@${c.offset}`, cosine_score: (c as any)._score ?? 0, rank: idx, + })), + ...topPlan.map((c, idx) => ({ + source_doc: "cohesion_plan", chunk_id: `plan@${c.offset}`, cosine_score: (c as any)._score ?? 0, rank: idx, + })), + ], + observer_signals: signalClass ? [{ class: signalClass, priors: [], prior_iter_outcomes: [] }] : [], + bridge_hits: [], // context7 not wired into scrum yet; empty for v1 + sub_pipeline_calls: [], // LLM Team extract happens after this row; out of scope for v1 + audit_consensus: null, // set by auditor's later N=3 pass, via /pathway/insert update + reducer_summary: accepted.slice(0, 4000), + final_verdict: verdict ?? "accepted", + // Vec built from the full attempts/chunks — richer than the + // query-time vector. The similarity gate will still discriminate + // between pathways with the same fingerprint but different + // ladder/KB profiles. + pathway_vec: buildPathwayVec([ + taskClass, + rel, + ...(signalClass ? 
[`signal:${signalClass}`] : []), + ...pathwayAttempts.flatMap(a => [`rung:${a.rung}`, `model:${a.model}`, `accepted:${a.accepted}`]), + ...topPrd.map(c => `kb:PRD.md`), + ...topPlan.map(c => `kb:cohesion_plan`), + ]), + replay_count: 0, + replays_succeeded: 0, + retired: false, + }; + writePathwayTrace(pathwayTrace); // fire-and-forget + } catch (e) { + console.error(`[scrum] pathway trace failed: ${(e as Error).message}`); + } + // Close the scrum → observer loop (fix 2026-04-24). Architecture // audit surfaced: observer ring had 2000 ops, 1999 from Langfuse, // zero from scrum. Observer's analyzeErrors + PLAYBOOK_BUILDER loops @@ -643,6 +883,20 @@ Respond with markdown. Be specific, not generic. Cite file-region + PRD-chunk-of critical_failures_count, verified_components_count, missing_components_count, + // Pathway fields: emitted on every review so the observer + // can build a full picture of hot-swap performance over time. + // `pathway_hot_swap_hit` flags whether the first-tried rung + // this review was a pathway recommendation vs the default + // ladder top. `rungs_saved` quantifies the compute we avoided + // when a hot-swap landed — this is the value metric the VCP + // UI surfaces ("avg_rungs_saved_per_commit"). + pathway_hot_swap_hit: hotSwap !== null, + pathway_id: hotSwap?.pathway_id ?? null, + pathway_similarity: hotSwap?.similarity ?? null, + pathway_success_rate: hotSwap?.success_rate ?? null, + rungs_saved: hotSwap && acceptedModel.endsWith(`/${hotSwap.recommended_model}`) + ? 
Math.max(0, hotSwap.recommended_rung - 1) + : 0, ts: row.reviewed_at, }), signal: AbortSignal.timeout(3000), diff --git a/ui/server.ts b/ui/server.ts index 5b10fef..9eb6f88 100644 --- a/ui/server.ts +++ b/ui/server.ts @@ -349,6 +349,44 @@ Bun.serve({ if (path === "/data/outcomes") return Response.json(await tailJsonl(`${KB}/outcomes.jsonl`, 30)); if (path === "/data/audit_facts") return Response.json(await tailJsonl(`${KB}/audit_facts.jsonl`, 30)); + // Pathway memory — consensus-designed sidecar (2026-04-24). Two + // exposed metrics: reuse_rate (activity — is it firing?) and + // avg_rungs_saved_per_commit (value — is it earning its keep?). + // Round-3 consensus (qwen3.5:397b) pointed out that activity + // without value tells us nothing; the UI needs both to judge the + // health of the hot-swap learning loop. + if (path === "/data/pathway_stats") { + try { + const r = await fetch("http://localhost:3100/vectors/pathway/stats", { signal: AbortSignal.timeout(3000) }); + if (!r.ok) return Response.json({ error: `vectord ${r.status}`, stats: null }); + const stats = await r.json(); + // Tail recent scrum events to compute avg_rungs_saved_per_commit + // (a committed review = any row in scrum_reviews.jsonl; rungs_saved + // only populates when pathway memory fired AND the recommended + // model actually produced the accept). + const reviews = await tailJsonl(`${KB}/scrum_reviews.jsonl`, 200); + let totalCommits = 0; + let totalRungsSaved = 0; + let hotSwapHits = 0; + for (const r of reviews) { + totalCommits++; + if (r.pathway_hot_swap_hit) hotSwapHits++; + if (typeof r.rungs_saved === "number") totalRungsSaved += r.rungs_saved; + } + return Response.json({ + stats, + scrum_window: { + reviews: totalCommits, + hot_swap_hits: hotSwapHits, + pathway_reuse_rate: totalCommits ? hotSwapHits / totalCommits : 0, + avg_rungs_saved_per_commit: totalCommits ? 
totalRungsSaved / totalCommits : 0, + }, + }); + } catch (e) { + return Response.json({ error: (e as Error).message, stats: null }); + } + } + if (path.startsWith("/data/file/")) { const relpath = decodeURIComponent(path.slice("/data/file/".length)); return Response.json(await fileHistory(relpath)); diff --git a/ui/ui.js b/ui/ui.js index 8de7352..837a0f9 100644 --- a/ui/ui.js +++ b/ui/ui.js @@ -589,6 +589,37 @@ function metricBox(label, big, kind, opts = {}) { function drawMetrics() { const grid = document.getElementById("metric-grid"); clear(grid); + // Kick off pathway fetch in parallel; render when it resolves so the + // rest of the metrics grid appears immediately. The cards append to + // the grid after the synchronous block below — they'll show up at + // the bottom of the grid within a tick of first render. + fetch("/data/pathway_stats").then(r => r.ok ? r.json() : null).then(j => { + if (!j || !j.stats) return; + const s = j.stats; + const w = j.scrum_window ?? {}; + // Activity metric — is the hot-swap firing at all? + grid.append(metricBox("pathway reuse rate", `${Math.round((w.pathway_reuse_rate ?? 0) * 100)}%`, + (w.pathway_reuse_rate ?? 0) > 0.1 ? "good" : (w.pathway_reuse_rate ?? 0) > 0 ? "warn" : "bad", { + explain: "% of recent reviews where a pathway hot-swap fired (narrow fingerprint match + 0.80 success rate + ≥3 replays + audit_consensus pass + 0.90 similarity).", + source: `scrum_reviews.jsonl .pathway_hot_swap_hit over last ${w.reviews ?? 0} reviews (${w.hot_swap_hits ?? 0} hits)`, + good: "≥10% sustained = index earning its keep. <10% over many iters = fingerprint too narrow or probation too strict. 0% on fresh install is expected (no replays yet).", + })); + // Value metric — how much compute did hot-swap actually save? + const saved = w.avg_rungs_saved_per_commit ?? 0; + grid.append(metricBox("avg rungs saved", saved.toFixed(2), + saved >= 1 ? "good" : saved > 0 ? 
"warn" : "bad", { + explain: "Average ladder rungs skipped per committed review by hot-swap. Rungs_saved = recommended_rung - 1 when the recommended model succeeded (otherwise 0).", + source: "scrum_reviews.jsonl .rungs_saved averaged", + good: "Every 1.0 here ≈ one less model call per review. At 21 files/iter, 1.0 saved = 21 cloud calls avoided. Value only counts when the replay actually succeeded.", + })); + // Stability metric — retired pathways indicate the learning loop is correcting itself. + grid.append(metricBox("pathways tracked", String(s.total_pathways), + s.total_pathways > 0 ? "good" : "warn", { + explain: `Total pathway traces stored. ${s.retired} retired (below 0.80 success after ≥3 replays). ${s.with_audit_pass} audit-passed, eligible for hot-swap probation.`, + source: "/vectors/pathway/stats", + good: `Grows monotonically with scrum runs. Retired=${s.retired} is HEALTHY — it means the learning loop is pruning pathways that stopped working. replay_success_rate=${(s.replay_success_rate*100).toFixed(0)}% aggregates all historical replays.`, + })); + }).catch(() => {}); const byTier = { auto:0, dry_run:0, simulation:0, block:0, unknown:0 }; state.reviews.forEach(r => { const t = r.gradient_tier ?? "unknown"; if (byTier[t] != null) byTier[t]++; }); const total = state.reviews.length || 1;