lakehouse/crates/vectord/src/pathway_memory.rs
root 0a0843b605
Some checks failed
lakehouse/auditor 4 blocking issues: todo!() macro call in tests/real-world/scrum_master_pipeline.ts
ADR-021: semantic-correctness layer lands in pathway_memory (A+B+C)
Phase A — data model (vectord/src/pathway_memory.rs):
  + SemanticFlag enum (9 variants: UnitMismatch, TypeConfusion,
    NullableConfusion, OffByOne, StaleReference, PseudoImpl, DeadCode,
    WarningNoise, BoundaryViolation) as #[serde(tag = "kind")]
  + TypeHint { source, symbol, type_repr }
  + BugFingerprint { flag, pattern_key, example, occurrences }
  + PathwayTrace gains semantic_flags, type_hints_used, bug_fingerprints
    all #[serde(default)] for back-compat deserialization of pre-ADR-021
    traces on disk
  + build_pathway_vec now tokenizes flag:{variant} + bug:{flag}:{key}
    so traces with different bug histories cluster separately in the
    similarity gate (proven by pathway_vec_differs_when_bug_fingerprint_added
    test)
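
The clustering claim in the last Phase A item can be illustrated with a minimal, standard-library-only sketch. It is not the crate's `build_pathway_vec`: it assumes FNV-1a as a stand-in for SHA-256 to stay dependency-free, and `fnv1a`, `toy_vec` and `dot` are hypothetical names. It shows that appending `flag:` and `bug:` tokens moves the normalized vector, so the similarity to the clean trace drops below 1.

```rust
// Sketch only: FNV-1a stands in for the SHA-256 used by the real
// build_pathway_vec; `fnv1a`, `toy_vec` and `dot` are hypothetical names.
const BUCKETS: usize = 32;

fn fnv1a(token: &str) -> u64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
    for b in token.bytes() {
        h ^= b as u64;
        h = h.wrapping_mul(0x0000_0100_0000_01b3);
    }
    h
}

/// Hash every token into two buckets, then L2-normalize the counts.
fn toy_vec(tokens: &[&str]) -> Vec<f32> {
    let mut buckets = vec![0f32; BUCKETS];
    for t in tokens {
        let h = fnv1a(t);
        buckets[(h & 0xff) as usize % BUCKETS] += 1.0;
        buckets[((h >> 32) & 0xff) as usize % BUCKETS] += 1.0;
    }
    let norm = buckets.iter().map(|v| v * v).sum::<f32>().sqrt();
    if norm > 0.0 {
        for v in &mut buckets {
            *v /= norm;
        }
    }
    buckets
}

/// Dot product; equals cosine similarity for L2-normalized inputs.
fn dot(a: &[f32], b: &[f32]) -> f32 {
    a.iter().zip(b).map(|(x, y)| x * y).sum()
}

fn main() {
    let clean = [
        "scrum_review",
        "crates/queryd/src/delta.rs",
        "signal:CONVERGING",
        "rung:2",
        "model:qwen3-coder:480b",
        "accepted:true",
    ];
    // Same trace metadata plus one flag token and one fingerprint token.
    let mut buggy = clean.to_vec();
    buggy.push("flag:UnitMismatch");
    buggy.push("bug:UnitMismatch:<pattern_key>");
    let sim = dot(&toy_vec(&clean), &toy_vec(&buggy));
    println!("similarity clean vs buggy: {sim:.3}");
}
```

Whether the drop is large enough to fall under a given similarity threshold depends on how many other tokens the two traces share.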

Phase B — producer (scrum_master_pipeline.ts):
  + Prompt addendum: each finding must carry `**Flag: <CATEGORY>**` tag
    alongside the existing Confidence: NN% tag. 9 category choices plus
    `None` for improvements that aren't bug-shaped.
  + Parser extracts tagged flags from reviewer markdown; falls back to
    bare-word match if reviewer omits the label. Deduplicated per trace.
  + PathwayTracePayload gains semantic_flags / type_hints_used /
    bug_fingerprints fields. Wire format matches Rust serde tagged enum
    so TS and Rust interop directly.
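
The tag extraction described in Phase B can be sketched as follows. The real parser lives in `scrum_master_pipeline.ts` and is not shown here, so this Rust version is only an illustration: `extract_flags`, the substring-based fallback, and the choice to skip the fallback once any `**Flag:` label is present are all assumptions.

```rust
// Sketch only: the real parser is TypeScript and not shown in this file;
// `extract_flags` and its exact matching rules are assumptions.
const CATEGORIES: [&str; 9] = [
    "UnitMismatch",
    "TypeConfusion",
    "NullableConfusion",
    "OffByOne",
    "StaleReference",
    "PseudoImpl",
    "DeadCode",
    "WarningNoise",
    "BoundaryViolation",
];

/// Return the distinct categories named in `markdown`.
fn extract_flags(markdown: &str) -> Vec<&'static str> {
    let mut found: Vec<&'static str> = Vec::new();
    let mut saw_tag = false;
    // Pass 1: explicit `**Flag: <CATEGORY>**` tags, deduplicated.
    for (idx, tag) in markdown.match_indices("**Flag:") {
        saw_tag = true;
        let rest = &markdown[idx + tag.len()..];
        if let Some(end) = rest.find("**") {
            let name = rest[..end].trim();
            // `None` (or any unknown name) is not a category and is skipped.
            if let Some(cat) = CATEGORIES.iter().copied().find(|c| *c == name) {
                if !found.contains(&cat) {
                    found.push(cat);
                }
            }
        }
    }
    // Pass 2: bare-word fallback, only when the reviewer omitted the label.
    if !saw_tag {
        for cat in CATEGORIES {
            if markdown.contains(cat) {
                found.push(cat);
            }
        }
    }
    found
}

fn main() {
    let review = "1. base_rows subtracts a file count. **Confidence: 90%** **Flag: UnitMismatch**\n\
                  2. Same subtraction again. **Flag: UnitMismatch**\n\
                  3. Rename for clarity. **Flag: None**";
    println!("{:?}", extract_flags(review));
}
```

Keeping the fallback off whenever a label is present avoids promoting a category name that is merely mentioned in a finding tagged `None`.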

Phase C — pre-review enrichment:
  + new `/vectors/pathway/bug_fingerprints` endpoint aggregates
    occurrences by (flag, pattern_key) across traces sharing a narrow
    fingerprint, sorts by frequency, returns top-K.
  + scrum calls it before the ladder and prepends a PATHWAY MEMORY
    preamble to the reviewer prompt ("these patterns appeared N times
    on this file area before — check for recurrences"). Empty on
    fresh install; grows as the matrix index learns.
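
How the aggregated fingerprints might become a prompt preamble can be sketched like this. The commit states only that a PATHWAY MEMORY preamble is prepended and that it is empty on a fresh install; the `Aggregated` struct, `render_preamble`, and the exact wording below are hypothetical.

```rust
// Sketch only: the preamble wording, `Aggregated`, and `render_preamble`
// are assumptions, not the pipeline's actual format.
struct Aggregated {
    flag: String,
    example: String,
    occurrences: u32,
}

/// Render the top-K fingerprints for one file area as a prompt preamble.
fn render_preamble(area: &str, top: &[Aggregated]) -> String {
    if top.is_empty() {
        // Fresh install or unseen fingerprint: nothing to prepend.
        return String::new();
    }
    let mut out = format!("PATHWAY MEMORY ({area}): check for recurrences\n");
    for fp in top {
        out.push_str(&format!(
            "- {} appeared {} time(s) before, e.g. `{}`\n",
            fp.flag, fp.occurrences, fp.example
        ));
    }
    out
}

fn main() {
    let top = vec![Aggregated {
        flag: "UnitMismatch".into(),
        example: "base_rows = pre_filter_rows - delta_count".into(),
        occurrences: 4,
    }];
    let preamble = render_preamble("crates/queryd", &top);
    let prompt = format!("{preamble}Review the following file...");
    println!("{prompt}");
}
```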

Tests: 27 pathway_memory tests green (was 18). New tests:
  - pathway_trace_deserializes_without_new_fields_backcompat
  - semantic_flag_serializes_as_tagged_enum
  - bug_fingerprint_roundtrips_through_serde
  - pathway_vec_differs_when_bug_fingerprint_added
  - semantic_flag_discriminates_by_variant
  - bug_fingerprints_aggregate_by_pattern_key (sums occurrences, sorts desc)
  - bug_fingerprints_empty_for_unseen_fingerprint
  - bug_fingerprints_respects_limit
  - insert_preserves_semantic_fields (roundtrip via persist + reload)

Workspace warnings unchanged at 11.

What's still queued (not this commit):
  - type_hints_used population from catalogd column types + Arrow schema
  - bug_fingerprint extraction from reviewer output (Phase D — for now
    semantic_flags populate but the fingerprint key requires parsing
    code-shape from the finding; next iteration's work)
  - auditor → pathway audit_consensus update wire (explicit-fail gate)

Why this commit matters: the mechanical applier's gates are syntactic
(warning count, patch size, rationale-token alignment). The
queryd/delta.rs base_rows bug (86901f8) was found by human reading —
unit mismatch between row counts and file counts. At 100 bugs this
deep, humans can't catch them all; the matrix index has to learn the
shapes. This commit gives it the fields to learn into and the surface
to read from.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 05:49:10 -05:00


//! Pathway memory — full backtrack-able context for scrum/auditor reviews.
//!
//! Consensus-designed (10-probe N=3 ensemble, see
//! `data/_kb/consensus_reducer_design_*.json`). The reducer emits a
//! `PathwayTrace` sidecar alongside its legacy summary. Traces are
//! fingerprinted narrowly (`task_class + file_prefix + signal_class`) for
//! generalizing hot-swap, and embedded via normalized-metadata-token
//! concatenation so the HNSW similarity search can discriminate between
//! pathways that share a fingerprint but diverged in ladder/KB choices.
//!
//! The hot-swap decision requires six conditions in AND:
//! 1. narrow fingerprint match
//! 2. audit_consensus is not an explicit fail (`None` is allowed
//!    during bootstrap; `pass == false` blocks)
//! 3. replay_count >= 3
//! 4. replays_succeeded / replay_count >= 0.80
//! 5. NOT retired
//! 6. similarity(new, stored) >= 0.90
//!
//! Any replay reports its outcome via `record_replay_outcome`; pathways
//! whose success rate drops below 0.80 after >=3 replays are marked
//! retired and excluded from further hot-swap consideration. This is the
//! self-correcting learning loop — a pathway that worked once but breaks
//! under distribution shift removes itself automatically.
use std::collections::HashMap;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use storaged::ops;
use tokio::sync::RwLock;
const STATE_KEY: &str = "_pathway_memory/state.json";
/// Outcome of one ladder rung attempt. Captured for every attempt,
/// regardless of whether it was accepted — rejections are signal too.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LadderAttempt {
pub rung: u8,
pub model: String,
pub latency_ms: u64,
pub accepted: bool,
pub reject_reason: Option<String>,
}
/// Provenance of a RAG chunk retrieved for this review. The
/// `cosine_score` is the similarity as returned by the index; `rank` is
/// 0-indexed order in the top-K result list.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct KbChunkRef {
pub source_doc: String,
pub chunk_id: String,
pub cosine_score: f32,
pub rank: u8,
}
/// Signal emitted by mcp-server/observer classifier.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ObserverSignal {
pub class: String,
pub priors: Vec<String>,
pub prior_iter_outcomes: Vec<String>,
}
/// Context7-bridge lookup snapshot.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct BridgeHit {
pub library: String,
pub version: String,
}
/// Call to LLM Team (/api/run?mode=extract) or auditor N=3 consensus.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SubPipelineCall {
pub pipeline: String, // "llm_team_extract" / "audit_consensus" / etc.
pub result_summary: String,
}
/// N=3 independent consensus re-check result.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct AuditConsensus {
pub pass: bool,
pub models: Vec<String>,
pub disagreements: u32,
}
// ─── ADR-021: Semantic correctness layer ────────────────────────────
//
// SemanticFlag names the CATEGORY of bug found. Scrum reviewer attaches
// these to findings (via prompt instruction to tag); the matrix index
// uses them for "same crate has seen N unit mismatches" preemption.
//
// Discipline: extend this enum only when a real bug is found that
// doesn't fit an existing variant. Avoid the "add a vague variant just
// in case" anti-pattern — it dilutes the grammar the index learns from.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(tag = "kind")]
pub enum SemanticFlag {
/// Operation combines values with different units (e.g.
/// `row_count - file_count`, `bytes - rows`). Instance that motivated
/// ADR-021: queryd/delta.rs base_rows = pre_filter_rows - delta_count.
UnitMismatch,
/// Same type, wrong role (e.g. treating a PK as a row index).
TypeConfusion,
/// Unwrap-without-check or nullable-treated-as-non-null paths.
NullableConfusion,
/// Off-by-one in loops / ranges / slice bounds.
OffByOne,
/// Reference to a deprecated / removed / moved symbol that the
/// compiler hasn't flagged (trait method shadowing, feature flags).
StaleReference,
/// Pseudo-implementation: stub body, `todo!()`, or function named
/// for work it doesn't actually do. Distinct from DeadCode — pseudo
/// is CALLED but doesn't do its job.
PseudoImpl,
/// Unreachable or uncalled code that compiles but serves no purpose.
DeadCode,
/// Code compiles green but emits a warning the workspace baseline
/// didn't have. The applier's new-warning gate already catches these
/// at commit time; flagging at review time lets the matrix index
/// surface "this file area tends to produce warning noise."
WarningNoise,
/// Operation crosses a layer/crate boundary it shouldn't (e.g. a
/// hot-path function calling a cloud API, or a catalog op mutating
/// storage directly).
BoundaryViolation,
}
/// What schema/type context was surfaced to the reviewer when this
/// pathway was produced. Empty = bootstrap path (reviewer got no
/// type context); populated = we fed the model typed info to work with.
/// Drift in this field over time is the feedback signal for "are we
/// getting smarter at enriching prompts?"
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct TypeHint {
/// Where the hint came from: "catalogd" | "arrow_schema" |
/// "rust_struct" | "truth_rule" | "manual".
pub source: String,
/// The identifier being typed (field name, variable, column).
pub symbol: String,
/// The type as extracted (stringly-typed is fine — this is a
/// retrieval key, not a compiler representation).
pub type_repr: String,
}
/// Stable hash of a bug pattern. Used by the matrix index to retrieve
/// "similar-shaped bugs" across files. The `pattern_key` is the field
/// that's semantically load-bearing; `occurrences` is how many times
/// this exact signature has appeared in this pathway's file history.
/// `example` is one representative code snippet so the prompt can
/// quote it back to future reviewers.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct BugFingerprint {
pub flag: SemanticFlag,
/// SHA256 of the structural pattern (e.g. for UnitMismatch:
/// `"row_count-file_count"` → its hash). Stable across minor
/// token-level variation so the same bug shape clusters.
pub pattern_key: String,
pub example: String,
pub occurrences: u32,
}
/// Full backtrack-able context for one reviewed file. Lives alongside
/// the reducer's summary — summary is what the reviewer LLM sees, this
/// is what the auditor / future iterations / hot-swap use.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct PathwayTrace {
pub pathway_id: String, // SHA256(task_class|file_prefix|signal_class)
pub task_class: String,
pub file_path: String,
pub signal_class: Option<String>,
pub created_at: DateTime<Utc>,
pub ladder_attempts: Vec<LadderAttempt>,
pub kb_chunks: Vec<KbChunkRef>,
pub observer_signals: Vec<ObserverSignal>,
pub bridge_hits: Vec<BridgeHit>,
pub sub_pipeline_calls: Vec<SubPipelineCall>,
pub audit_consensus: Option<AuditConsensus>,
pub reducer_summary: String,
pub final_verdict: String,
/// Normalized-metadata-token embedding. Dimension fixed per index
/// version (current: 32, sufficient to distinguish task/file/signal
/// combinations without requiring an external embedding model —
/// round-3 consensus said "small metadata tokens", not "full JSON").
pub pathway_vec: Vec<f32>,
/// Number of times this pathway has been replayed via hot-swap.
/// Replay only begins after first insert; initial insert itself is
/// NOT a replay. Probation of ≥3 replays is required before the
/// success-rate gate can fire.
pub replay_count: u32,
pub replays_succeeded: u32,
/// ADR-021 semantic-correctness layer. Populated by scrum reviewer
/// via explicit prompt-level tagging of findings. Empty on existing
/// traces (pre-ADR-021 inserts); additive field so back-compat
/// deserialization works via serde default.
#[serde(default)]
pub semantic_flags: Vec<SemanticFlag>,
/// Schema/type context fed to the reviewer during this pathway's
/// review. Starts empty (bootstrap); fills as we wire catalogd +
/// arrow_schema + truth_rule enrichment into the prompt pipeline.
#[serde(default)]
pub type_hints_used: Vec<TypeHint>,
/// Bug patterns caught on this file/pathway — the matrix index's
/// retrieval key for "have we seen this shape here before?"
#[serde(default)]
pub bug_fingerprints: Vec<BugFingerprint>,
/// Marked true when replay_count >= 3 AND success_rate < 0.80.
/// Retired pathways are excluded from hot-swap forever. (If the
/// underlying file / task / signal characteristics genuinely change
/// such that a retired pathway would work again, a new PathwayTrace
/// with a fresh id will be inserted — retirement is per-id.)
pub retired: bool,
}
impl PathwayTrace {
/// Compute the narrow fingerprint id from task_class + file_prefix
/// + signal_class. `file_prefix` is the first two path segments
/// ("crates/queryd", not "crates/queryd/src/service.rs") so that
/// related files in the same crate share pathways.
pub fn compute_id(task_class: &str, file_path: &str, signal_class: Option<&str>) -> String {
let prefix = file_prefix(file_path);
let sig = signal_class.unwrap_or("");
let mut hasher = Sha256::new();
hasher.update(task_class.as_bytes());
hasher.update(b"|");
hasher.update(prefix.as_bytes());
hasher.update(b"|");
hasher.update(sig.as_bytes());
format!("{:x}", hasher.finalize())
}
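/// Fraction of recorded replays that succeeded. Returns 0.0 when no
/// replay has been recorded yet, so a fresh pathway can never pass
/// the success-rate gate.
pub fn success_rate(&self) -> f32 {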
if self.replay_count == 0 {
return 0.0;
}
self.replays_succeeded as f32 / self.replay_count as f32
}
}
/// First two path segments, so `crates/queryd/src/service.rs` →
/// `crates/queryd`. This is intentional — similar files in the same
/// crate often share task characteristics (e.g., all files in
/// `crates/queryd/` are SQL-path Rust code), so fingerprinting on the
/// crate-level prefix lets the hot-swap generalize across files within
/// the crate. Exactly-matching file paths still match (same prefix).
pub fn file_prefix(path: &str) -> String {
let parts: Vec<&str> = path.split('/').take(2).collect();
parts.join("/")
}
/// Build the pathway vector from trace metadata. Intentionally simple —
/// deterministic bag-of-tokens hash into 32 buckets, normalized. Round-3
/// consensus said "small metadata tokens, not full JSON." An external
/// embedding model would work too but adds a dependency, failure mode,
/// and drift risk the consensus flagged.
pub fn build_pathway_vec(trace: &PathwayTrace) -> Vec<f32> {
let mut buckets = vec![0f32; 32];
let mut tokens: Vec<String> = Vec::new();
tokens.push(trace.task_class.clone());
tokens.push(trace.file_path.clone());
if let Some(s) = &trace.signal_class {
tokens.push(format!("signal:{s}"));
}
for a in &trace.ladder_attempts {
tokens.push(format!("rung:{}", a.rung));
tokens.push(format!("model:{}", a.model));
tokens.push(format!("accepted:{}", a.accepted));
}
for k in &trace.kb_chunks {
tokens.push(format!("kb:{}", k.source_doc));
}
for o in &trace.observer_signals {
tokens.push(format!("class:{}", o.class));
}
for b in &trace.bridge_hits {
tokens.push(format!("lib:{}", b.library));
}
for s in &trace.sub_pipeline_calls {
tokens.push(format!("pipeline:{}", s.pipeline));
}
// ADR-021: include semantic flags + bug fingerprints in the
// embedding so pathways with the same narrow fingerprint but
// different bug histories cluster separately. "This file has
// had 3 unit mismatches" is a different pathway from "this file
// is clean" — similarity gate should see them as distinct.
for f in &trace.semantic_flags {
tokens.push(format!("flag:{:?}", f));
}
for bp in &trace.bug_fingerprints {
tokens.push(format!("bug:{:?}:{}", bp.flag, bp.pattern_key));
}
for t in &tokens {
let mut h = Sha256::new();
h.update(t.as_bytes());
let d = h.finalize();
// Two bucket writes per token: use different byte windows to
// spread probability across buckets even when tokens share a
// common prefix.
let b1 = (d[0] as usize) % 32;
let b2 = (d[8] as usize) % 32;
buckets[b1] += 1.0;
buckets[b2] += 1.0;
}
// L2 normalize so cosine similarity becomes a dot product.
let norm: f32 = buckets.iter().map(|v| v * v).sum::<f32>().sqrt();
if norm > 0.0 {
for v in &mut buckets {
*v /= norm;
}
}
buckets
}
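/// Dot product of `a` and `b`. This equals cosine similarity only when
/// both inputs are L2-normalized, which `build_pathway_vec` guarantees
/// for its output. Returns 0.0 when the lengths differ.
pub fn cosine(a: &[f32], b: &[f32]) -> f32 {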
if a.len() != b.len() {
return 0.0;
}
a.iter().zip(b.iter()).map(|(x, y)| x * y).sum::<f32>()
}
#[derive(Default, Clone, Serialize, Deserialize)]
struct PathwayMemoryState {
pathways: HashMap<String, Vec<PathwayTrace>>, // key = pathway_id (narrow fingerprint)
last_updated_at: i64,
}
#[derive(Clone)]
pub struct PathwayMemory {
state: Arc<RwLock<PathwayMemoryState>>,
store: Arc<dyn ObjectStore>,
}
#[derive(Debug, Serialize)]
pub struct HotSwapCandidate {
pub pathway_id: String,
pub similarity: f32,
pub replay_count: u32,
pub success_rate: f32,
pub recommended_rung: u8,
pub recommended_model: String,
}
impl PathwayMemory {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self {
state: Arc::new(RwLock::new(PathwayMemoryState::default())),
store,
}
}
pub async fn load_from_storage(&self) -> Result<usize, String> {
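// A missing state key is expected on a fresh install. Note that any
// other read error is also treated as "no persisted state" here.
let data = match ops::get(&self.store, STATE_KEY).await {
Ok(d) => d,
Err(_) => return Ok(0),
};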
let persisted: PathwayMemoryState = serde_json::from_slice(&data)
.map_err(|e| format!("parse pathway_memory state: {e}"))?;
let n: usize = persisted.pathways.values().map(|v| v.len()).sum();
*self.state.write().await = persisted;
tracing::info!("pathway_memory: loaded {n} traces from {STATE_KEY}");
Ok(n)
}
async fn persist(&self) -> Result<(), String> {
let snapshot = self.state.read().await.clone();
let bytes = serde_json::to_vec_pretty(&snapshot).map_err(|e| e.to_string())?;
ops::put(&self.store, STATE_KEY, bytes.into()).await
}
/// Insert a new pathway trace. Called by scrum_master_pipeline at
/// the end of each file's review. Computes the pathway_vec from
/// metadata if the caller didn't supply one. Appends to the bucket
/// for this pathway_id — multiple traces can share a fingerprint
/// (each represents one review of the same file/task/signal combo).
pub async fn insert(&self, mut trace: PathwayTrace) -> Result<(), String> {
if trace.pathway_vec.is_empty() {
trace.pathway_vec = build_pathway_vec(&trace);
}
let mut s = self.state.write().await;
s.pathways
.entry(trace.pathway_id.clone())
.or_default()
.push(trace);
s.last_updated_at = Utc::now().timestamp_millis();
drop(s);
self.persist().await
}
/// Query for a hot-swap candidate. Returns `None` if no eligible
/// pathway exists — caller should run the full ladder. Returns
/// `Some(cand)` if all gates pass — caller can short-circuit to
/// `cand.recommended_rung` / `cand.recommended_model`.
///
/// Gates (all must hold):
/// - narrow fingerprint match (same task/file_prefix/signal)
/// - audit_consensus on the stored trace is not an explicit fail
///   (`None` is allowed during bootstrap)
/// - replay_count >= 3 (probation)
/// - success_rate >= 0.80
/// - NOT retired
/// - similarity(query_vec, stored.pathway_vec) >= 0.90
pub async fn query_hot_swap(
&self,
task_class: &str,
file_path: &str,
signal_class: Option<&str>,
query_vec: &[f32],
) -> Option<HotSwapCandidate> {
let id = PathwayTrace::compute_id(task_class, file_path, signal_class);
let s = self.state.read().await;
let candidates = s.pathways.get(&id)?;
let mut best: Option<(f32, &PathwayTrace)> = None;
for p in candidates {
if p.retired {
continue;
}
// audit_consensus gate: explicit FAIL blocks hot-swap. A null
// audit_consensus (auditor hasn't seen this pathway yet) is
// NOT a block — the success_rate gate below still requires
// ≥3 real-world replays at ≥80% success before a pathway
// becomes hot-swap eligible, so the learning loop itself
// provides the safety net during bootstrap. Once the auditor
// pipeline wires pathway audit updates, this gate tightens
// automatically: any explicit audit_consensus.pass == false
// here will skip the candidate.
if let Some(ac) = &p.audit_consensus {
if !ac.pass {
continue;
}
}
if p.replay_count < 3 {
continue;
}
if p.success_rate() < 0.80 {
continue;
}
let sim = cosine(query_vec, &p.pathway_vec);
if sim < 0.90 {
continue;
}
if best.as_ref().map(|(b, _)| sim > *b).unwrap_or(true) {
best = Some((sim, p));
}
}
let (similarity, p) = best?;
// The "recommended" rung is the first accepted attempt in the
// stored pathway — that's the one the ladder converged on.
let accepted = p.ladder_attempts.iter().find(|a| a.accepted)?;
Some(HotSwapCandidate {
pathway_id: p.pathway_id.clone(),
similarity,
replay_count: p.replay_count,
success_rate: p.success_rate(),
recommended_rung: accepted.rung,
recommended_model: accepted.model.clone(),
})
}
/// Record the outcome of a hot-swap replay. Increments replay_count
/// unconditionally; increments replays_succeeded iff succeeded;
/// retires the pathway if replay_count >= 3 and success_rate falls
/// below 0.80. Mistral's learning loop in code.
pub async fn record_replay_outcome(
&self,
pathway_id: &str,
succeeded: bool,
) -> Result<(), String> {
let mut s = self.state.write().await;
// The bucket key is the narrow id. A bucket can hold several traces
// (one per review), so the outcome is recorded against the most
// recently inserted one.
let bucket = s
.pathways
.get_mut(pathway_id)
.ok_or_else(|| format!("pathway {pathway_id} not found"))?;
let p = bucket
.last_mut()
.ok_or_else(|| format!("pathway {pathway_id} has empty bucket"))?;
p.replay_count = p.replay_count.saturating_add(1);
if succeeded {
p.replays_succeeded = p.replays_succeeded.saturating_add(1);
}
if p.replay_count >= 3 && p.success_rate() < 0.80 {
p.retired = true;
}
s.last_updated_at = Utc::now().timestamp_millis();
drop(s);
self.persist().await
}
/// ADR-021 Phase C: retrieve aggregated bug fingerprints for a
/// narrow fingerprint (task_class + file_prefix + signal_class).
/// Scrum pipeline calls this BEFORE running the ladder and prepends
/// the result to the reviewer prompt as historical context.
///
/// Returns at most `limit` most-frequent patterns across all traces
/// sharing the narrow id. Frequency is summed `occurrences` — a
/// fingerprint seen in 3 traces with occurrences 2/1/1 comes back
/// as occurrences=4 so the preempt-prompt can say "this pattern
/// appeared 4 times on this crate."
pub async fn bug_fingerprints_for(
&self,
task_class: &str,
file_path: &str,
signal_class: Option<&str>,
limit: usize,
) -> Vec<BugFingerprint> {
let id = PathwayTrace::compute_id(task_class, file_path, signal_class);
let s = self.state.read().await;
let Some(traces) = s.pathways.get(&id) else { return Vec::new(); };
// Aggregate by (flag, pattern_key) and sum occurrences. Keep a
// representative example (first one seen is fine — bug examples
// are semantically equivalent within a pattern_key by design).
let mut agg: HashMap<(String, String), (SemanticFlag, String, u32)> = HashMap::new();
for t in traces {
for bp in &t.bug_fingerprints {
let key = (format!("{:?}", bp.flag), bp.pattern_key.clone());
let entry = agg.entry(key).or_insert_with(|| {
(bp.flag.clone(), bp.example.clone(), 0)
});
entry.2 = entry.2.saturating_add(bp.occurrences);
}
}
let mut out: Vec<BugFingerprint> = agg
.into_iter()
.map(|((_, pk), (flag, ex, occ))| BugFingerprint {
flag,
pattern_key: pk,
example: ex,
occurrences: occ,
})
.collect();
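// Sort by frequency, descending. Ties are broken on pattern_key so
// the top-K is deterministic despite HashMap iteration order.
out.sort_by(|a, b| {
b.occurrences
.cmp(&a.occurrences)
.then_with(|| a.pattern_key.cmp(&b.pattern_key))
});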
out.truncate(limit);
out
}
pub async fn stats(&self) -> PathwayMemoryStats {
let s = self.state.read().await;
let mut total = 0usize;
let mut retired = 0usize;
let mut with_audit_pass = 0usize;
let mut total_replays = 0u64;
let mut successful_replays = 0u64;
for bucket in s.pathways.values() {
for p in bucket {
total += 1;
if p.retired {
retired += 1;
}
if p.audit_consensus.as_ref().map(|a| a.pass).unwrap_or(false) {
with_audit_pass += 1;
}
total_replays += p.replay_count as u64;
successful_replays += p.replays_succeeded as u64;
}
}
PathwayMemoryStats {
total_pathways: total,
retired,
with_audit_pass,
total_replays,
successful_replays,
reuse_rate: if total == 0 {
0.0
} else {
total_replays as f32 / total as f32
},
replay_success_rate: if total_replays == 0 {
0.0
} else {
successful_replays as f32 / total_replays as f32
},
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PathwayMemoryStats {
pub total_pathways: usize,
pub retired: usize,
pub with_audit_pass: usize,
pub total_replays: u64,
pub successful_replays: u64,
pub reuse_rate: f32, // total_replays / total_pathways
pub replay_success_rate: f32, // successful_replays / total_replays
}
#[cfg(test)]
mod tests {
use super::*;
use object_store::memory::InMemory;
fn mk_store() -> Arc<dyn ObjectStore> {
Arc::new(InMemory::new())
}
fn mk_trace(id_tag: &str, audit_pass: bool, replays: u32, succ: u32) -> PathwayTrace {
let pathway_id =
PathwayTrace::compute_id("scrum_review", &format!("crates/{id_tag}/src/x.rs"), Some("CONVERGING"));
let attempts = vec![LadderAttempt {
rung: 2,
model: "qwen3-coder:480b".into(),
latency_ms: 1000,
accepted: true,
reject_reason: None,
}];
let mut trace = PathwayTrace {
pathway_id,
task_class: "scrum_review".into(),
file_path: format!("crates/{id_tag}/src/x.rs"),
signal_class: Some("CONVERGING".into()),
created_at: Utc::now(),
ladder_attempts: attempts,
kb_chunks: vec![KbChunkRef {
source_doc: "PRD.md".into(),
chunk_id: "c1".into(),
cosine_score: 0.88,
rank: 0,
}],
observer_signals: vec![],
bridge_hits: vec![],
sub_pipeline_calls: vec![],
audit_consensus: Some(AuditConsensus {
pass: audit_pass,
models: vec!["qwen3-coder:480b".into(), "gpt-oss:120b".into(), "kimi-k2:1t".into()],
disagreements: 0,
}),
reducer_summary: "ok".into(),
final_verdict: "accepted".into(),
pathway_vec: vec![],
semantic_flags: vec![],
type_hints_used: vec![],
bug_fingerprints: vec![],
replay_count: replays,
replays_succeeded: succ,
retired: false,
};
trace.pathway_vec = build_pathway_vec(&trace);
trace
}
#[test]
fn file_prefix_takes_first_two_segments() {
assert_eq!(file_prefix("crates/queryd/src/service.rs"), "crates/queryd");
assert_eq!(file_prefix("crates/gateway"), "crates/gateway");
assert_eq!(file_prefix("README.md"), "README.md");
assert_eq!(file_prefix(""), "");
}
#[test]
fn compute_id_is_deterministic() {
let a = PathwayTrace::compute_id("scrum", "crates/queryd/src/x.rs", Some("LOOPING"));
let b = PathwayTrace::compute_id("scrum", "crates/queryd/src/x.rs", Some("LOOPING"));
assert_eq!(a, b);
}
#[test]
fn compute_id_generalizes_across_same_prefix() {
// Same prefix + task + signal → same id. That IS the narrow
// generalization — it's what lets hot-swap fire for different
// files in the same crate that share the task/signal profile.
let a = PathwayTrace::compute_id("scrum", "crates/queryd/src/a.rs", Some("L"));
let b = PathwayTrace::compute_id("scrum", "crates/queryd/src/b.rs", Some("L"));
assert_eq!(a, b);
}
#[test]
fn compute_id_differs_on_signal_class() {
let a = PathwayTrace::compute_id("scrum", "crates/q/s", Some("CONVERGING"));
let b = PathwayTrace::compute_id("scrum", "crates/q/s", Some("LOOPING"));
assert_ne!(a, b);
}
#[test]
fn cosine_handles_mismatched_lengths() {
assert_eq!(cosine(&[1.0, 0.0], &[1.0]), 0.0);
}
#[test]
fn cosine_of_identical_normalized_is_one() {
let v = vec![0.6, 0.8];
let c = cosine(&v, &v);
assert!((c - 1.0).abs() < 1e-5);
}
#[test]
fn success_rate_is_zero_before_any_replay() {
let t = mk_trace("a", true, 0, 0);
assert_eq!(t.success_rate(), 0.0);
}
#[test]
fn success_rate_ratio() {
let t = mk_trace("a", true, 4, 3);
assert!((t.success_rate() - 0.75).abs() < 1e-5);
}
#[tokio::test]
async fn insert_and_stats_roundtrip() {
let mem = PathwayMemory::new(mk_store());
mem.insert(mk_trace("a", true, 0, 0)).await.unwrap();
let stats = mem.stats().await;
assert_eq!(stats.total_pathways, 1);
assert_eq!(stats.retired, 0);
assert_eq!(stats.with_audit_pass, 1);
}
#[tokio::test]
async fn hot_swap_rejects_when_probation_not_met() {
// Probation: replay_count must be >= 3 before success-rate gate
// can fire. A fresh pathway with 0 replays must NEVER hot-swap
// even if its similarity is 1.0 and audit passes.
let mem = PathwayMemory::new(mk_store());
let trace = mk_trace("a", true, 0, 0);
let qvec = trace.pathway_vec.clone();
mem.insert(trace).await.unwrap();
let got = mem
.query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec)
.await;
assert!(got.is_none(), "fresh pathway must not hot-swap");
}
#[tokio::test]
async fn hot_swap_rejects_when_audit_explicitly_fails() {
let mem = PathwayMemory::new(mk_store());
let trace = mk_trace("a", false, 5, 5); // audit FAILED explicitly
let qvec = trace.pathway_vec.clone();
mem.insert(trace).await.unwrap();
let got = mem
.query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec)
.await;
assert!(got.is_none(), "pathway with explicit audit FAIL must not hot-swap");
}
#[tokio::test]
async fn hot_swap_accepts_unaudited_pathway_for_bootstrap() {
// v1 bootstrap: auditor doesn't update pathway audit_consensus
// until Phase N+1 wires it. Until then, null audit_consensus
// must NOT block hot-swap — the success_rate + probation gates
// alone prove safety. Once auditor wires up, explicit audit
// failures will re-introduce the block (see previous test).
let mem = PathwayMemory::new(mk_store());
let mut trace = mk_trace("a", true, 5, 5);
trace.audit_consensus = None; // bootstrap path
trace.pathway_vec = build_pathway_vec(&trace);
let qvec = trace.pathway_vec.clone();
mem.insert(trace).await.unwrap();
let got = mem
.query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec)
.await;
assert!(got.is_some(), "unaudited pathway with good replay history must hot-swap");
}
#[tokio::test]
async fn hot_swap_rejects_when_success_rate_below_80pct() {
// 10 replays, 7 succeeded = 70% — below the 0.80 threshold.
let mem = PathwayMemory::new(mk_store());
let trace = mk_trace("a", true, 10, 7);
let qvec = trace.pathway_vec.clone();
mem.insert(trace).await.unwrap();
let got = mem
.query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec)
.await;
assert!(got.is_none());
}
#[tokio::test]
async fn hot_swap_accepts_when_all_gates_pass() {
let mem = PathwayMemory::new(mk_store());
let trace = mk_trace("a", true, 5, 5); // 100% success after 5 replays
let qvec = trace.pathway_vec.clone();
mem.insert(trace).await.unwrap();
let got = mem
.query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec)
.await;
let cand = got.expect("should hot-swap");
assert!(cand.similarity >= 0.90);
assert_eq!(cand.recommended_rung, 2);
assert_eq!(cand.recommended_model, "qwen3-coder:480b");
}
#[tokio::test]
async fn record_replay_retires_pathway_on_failure_pattern() {
let mem = PathwayMemory::new(mk_store());
let trace = mk_trace("a", true, 0, 0);
let pid = trace.pathway_id.clone();
mem.insert(trace).await.unwrap();
// Three replays, all fail → success_rate = 0.0 → retired.
mem.record_replay_outcome(&pid, false).await.unwrap();
mem.record_replay_outcome(&pid, false).await.unwrap();
mem.record_replay_outcome(&pid, false).await.unwrap();
let stats = mem.stats().await;
assert_eq!(stats.retired, 1, "3 failures after insert must retire");
}
#[tokio::test]
async fn record_replay_does_not_retire_before_probation() {
let mem = PathwayMemory::new(mk_store());
let trace = mk_trace("a", true, 0, 0);
let pid = trace.pathway_id.clone();
mem.insert(trace).await.unwrap();
// Two replays (below probation of 3), both fail. Should NOT
// retire yet — probation requires minimum 3 data points.
mem.record_replay_outcome(&pid, false).await.unwrap();
mem.record_replay_outcome(&pid, false).await.unwrap();
let stats = mem.stats().await;
assert_eq!(stats.retired, 0, "only 2 replays → below probation floor");
}
#[tokio::test]
async fn retired_pathway_never_hot_swaps_again() {
let mem = PathwayMemory::new(mk_store());
let trace = mk_trace("a", true, 0, 0);
let pid = trace.pathway_id.clone();
let qvec = trace.pathway_vec.clone();
mem.insert(trace).await.unwrap();
for _ in 0..3 {
mem.record_replay_outcome(&pid, false).await.unwrap();
}
// Now record 10 successes to push success_rate well above 0.80.
// Pathway is still retired — retirement is sticky by design, to
// prevent oscillation on noise.
for _ in 0..10 {
mem.record_replay_outcome(&pid, true).await.unwrap();
}
let got = mem
.query_hot_swap("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), &qvec)
.await;
assert!(got.is_none(), "retirement must be sticky");
}
#[tokio::test]
async fn pathway_vec_differs_for_different_models() {
// Two pathways with same fingerprint but different ladder
// models should have different embeddings so the similarity
// gate can discriminate. This is what enables narrow fingerprint
// + similarity-vec to cluster correctly.
let a = mk_trace("a", true, 5, 5);
let mut b = a.clone();
b.ladder_attempts[0].model = "kimi-k2:1t".into();
b.pathway_vec = build_pathway_vec(&b);
let sim = cosine(&a.pathway_vec, &b.pathway_vec);
assert!(sim < 1.0, "different models → different embeddings");
assert!(sim > 0.5, "shared fingerprint → embeddings still related");
}
// ─── ADR-021 semantic-correctness layer tests ───────────────────
#[test]
fn pathway_trace_deserializes_without_new_fields_backcompat() {
// Critical: existing traces on disk (persisted before ADR-021)
// must still deserialize. serde(default) on the three new fields
// is the back-compat mechanism — verify it holds.
let json = r#"{
"pathway_id": "abc",
"task_class": "scrum_review",
"file_path": "crates/x/y.rs",
"signal_class": null,
"created_at": "2026-04-24T00:00:00Z",
"ladder_attempts": [],
"kb_chunks": [],
"observer_signals": [],
"bridge_hits": [],
"sub_pipeline_calls": [],
"audit_consensus": null,
"reducer_summary": "old trace",
"final_verdict": "accepted",
"pathway_vec": [],
"replay_count": 0,
"replays_succeeded": 0,
"retired": false
}"#;
let t: PathwayTrace = serde_json::from_str(json).expect("must deserialize pre-ADR-021 trace");
assert!(t.semantic_flags.is_empty());
assert!(t.type_hints_used.is_empty());
assert!(t.bug_fingerprints.is_empty());
assert_eq!(t.reducer_summary, "old trace");
}
#[test]
fn semantic_flag_serializes_as_tagged_enum() {
// Verify the wire format: the "kind" tag field lets TS/JSON clients
// pattern-match on the variant name without knowing variant ordering.
let s = serde_json::to_string(&SemanticFlag::UnitMismatch).unwrap();
assert!(s.contains("UnitMismatch"), "got: {s}");
assert!(s.contains("kind"), "must be tagged enum for TS interop, got: {s}");
}
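// Hedged companion sketch (an addition, not part of ADR-021 itself):
// pins the exact JSON shape instead of relying on substring matches.
// Assumptions: SemanticFlag uses #[serde(tag = "kind")] with unit
// variants and no rename_all attribute, and it derives Deserialize,
// PartialEq and Debug (the BugFingerprint roundtrip test relies on the
// same derives). Under those assumptions serde's internally tagged
// representation emits a one-field object for a unit variant.
#[test]
fn semantic_flag_wire_format_is_single_kind_object() {
let s = serde_json::to_string(&SemanticFlag::NullableConfusion).unwrap();
assert_eq!(s, r#"{"kind":"NullableConfusion"}"#);
// The same object must parse back to the same variant.
let back: SemanticFlag = serde_json::from_str(&s).unwrap();
assert_eq!(back, SemanticFlag::NullableConfusion);
}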
#[test]
fn bug_fingerprint_roundtrips_through_serde() {
let bp = BugFingerprint {
flag: SemanticFlag::UnitMismatch,
pattern_key: "row_count-file_count".into(),
example: "base_rows = pre_filter_rows - delta_count".into(),
occurrences: 1,
};
let s = serde_json::to_string(&bp).unwrap();
let parsed: BugFingerprint = serde_json::from_str(&s).unwrap();
assert_eq!(parsed, bp);
}
#[test]
fn pathway_vec_differs_when_bug_fingerprint_added() {
// A trace with a known bug history should embed differently
// from a clean trace with the same ladder/KB. This is the
// compounding signal: "same file, different bug history."
let clean = mk_trace("a", true, 5, 5);
let mut flagged = clean.clone();
flagged.semantic_flags.push(SemanticFlag::UnitMismatch);
flagged.bug_fingerprints.push(BugFingerprint {
flag: SemanticFlag::UnitMismatch,
pattern_key: "row_count-file_count".into(),
example: "x = y - z".into(),
occurrences: 1,
});
flagged.pathway_vec = build_pathway_vec(&flagged);
let sim = cosine(&clean.pathway_vec, &flagged.pathway_vec);
assert!(sim < 1.0, "bug history must shift the embedding");
assert!(sim > 0.3, "shared fingerprint should keep them loosely related");
}
#[test]
fn semantic_flag_discriminates_by_variant() {
// Two traces with different flag classes should embed to
// different points. Validates that the index can retrieve
// "files with UnitMismatch history" separately from
// "files with NullableConfusion history."
let mut a = mk_trace("x", true, 5, 5);
a.semantic_flags.push(SemanticFlag::UnitMismatch);
a.pathway_vec = build_pathway_vec(&a);
let mut b = a.clone();
b.semantic_flags = vec![SemanticFlag::NullableConfusion];
b.pathway_vec = build_pathway_vec(&b);
let sim = cosine(&a.pathway_vec, &b.pathway_vec);
assert!(sim < 1.0, "different flag variants → different embeddings");
}
#[tokio::test]
async fn bug_fingerprints_aggregate_by_pattern_key() {
// Three traces on the same narrow fingerprint — two with the
// same bug pattern, one with a different pattern. The aggregator
// must sum occurrences for the shared key and sort by count.
let mem = PathwayMemory::new(mk_store());
let mut t1 = mk_trace("q", true, 0, 0);
t1.bug_fingerprints.push(BugFingerprint {
flag: SemanticFlag::UnitMismatch,
pattern_key: "row-file".into(),
example: "a - b".into(),
occurrences: 2,
});
let mut t2 = mk_trace("q", true, 0, 0);
t2.bug_fingerprints.push(BugFingerprint {
flag: SemanticFlag::UnitMismatch,
pattern_key: "row-file".into(),
example: "x - y".into(),
occurrences: 1,
});
let mut t3 = mk_trace("q", true, 0, 0);
t3.bug_fingerprints.push(BugFingerprint {
flag: SemanticFlag::OffByOne,
pattern_key: "len-1".into(),
example: "items[len]".into(),
occurrences: 1,
});
mem.insert(t1).await.unwrap();
mem.insert(t2).await.unwrap();
mem.insert(t3).await.unwrap();
let fps = mem
.bug_fingerprints_for("scrum_review", "crates/q/src/x.rs", Some("CONVERGING"), 10)
.await;
assert_eq!(fps.len(), 2, "two distinct patterns after aggregation");
// First should be the aggregated UnitMismatch (3 total occurrences)
assert_eq!(fps[0].pattern_key, "row-file");
assert_eq!(fps[0].occurrences, 3);
assert_eq!(fps[1].pattern_key, "len-1");
assert_eq!(fps[1].occurrences, 1);
}
#[tokio::test]
async fn bug_fingerprints_empty_for_unseen_fingerprint() {
let mem = PathwayMemory::new(mk_store());
let fps = mem
.bug_fingerprints_for("scrum_review", "crates/never_seen/x.rs", None, 5)
.await;
assert!(fps.is_empty());
}
#[tokio::test]
async fn bug_fingerprints_respects_limit() {
let mem = PathwayMemory::new(mk_store());
for i in 0..10u32 {
let mut t = mk_trace("q", true, 0, 0);
t.bug_fingerprints.push(BugFingerprint {
flag: SemanticFlag::OffByOne,
pattern_key: format!("p{i}"),
example: String::new(),
// Increasing counts: insertion order is the reverse of the
// expected output, so the assertions below fail unless the
// aggregator really sorts by occurrences.
occurrences: i + 1,
});
mem.insert(t).await.unwrap();
}
let fps = mem
.bug_fingerprints_for("scrum_review", "crates/q/src/x.rs", Some("CONVERGING"), 3)
.await;
assert_eq!(fps.len(), 3);
// Highest occurrences first.
assert_eq!(fps[0].pattern_key, "p9");
assert_eq!(fps[0].occurrences, 10);
}
#[tokio::test]
async fn insert_preserves_semantic_fields() {
let mem = PathwayMemory::new(mk_store());
let mut t = mk_trace("a", true, 0, 0);
t.semantic_flags.push(SemanticFlag::UnitMismatch);
t.type_hints_used.push(TypeHint {
source: "arrow_schema".into(),
symbol: "pre_filter_rows".into(),
type_repr: "usize (sum of batch.num_rows)".into(),
});
t.bug_fingerprints.push(BugFingerprint {
flag: SemanticFlag::UnitMismatch,
pattern_key: "row-minus-file".into(),
example: "pre_filter_rows - delta_count".into(),
occurrences: 1,
});
mem.insert(t).await.unwrap();
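// Reload from store via a fresh handle. The pathway count proves the
// trace was persisted; the fingerprint query checks that at least the
// bug_fingerprints field roundtrips along with the pre-ADR-021 ones.
let mem2 = PathwayMemory::new(mem.store.clone());
mem2.load_from_storage().await.unwrap();
let stats = mem2.stats().await;
assert_eq!(stats.total_pathways, 1);
let fps = mem2
.bug_fingerprints_for("scrum_review", "crates/a/src/x.rs", Some("CONVERGING"), 10)
.await;
assert_eq!(fps.len(), 1, "bug fingerprint must survive reload");
assert_eq!(fps[0].pattern_key, "row-minus-file");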
}
}