From 8bacd434654f3169b69ab7b14933bf62c24f13fa Mon Sep 17 00:00:00 2001 From: profit Date: Wed, 22 Apr 2026 14:12:57 -0500 Subject: [PATCH] Phase 45 slice 3: doc_drift check + resolve endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the last open loop of Phase 45. Previously, playbooks could carry doc_refs (slice 1) and the context7 bridge could report drift (slice 2) — but nothing tied them together. An operator had no way to say "check this playbook against its doc sources and flag it if the docs moved." This slice wires that. Ships: - crates/vectord/src/doc_drift.rs — thin context7 bridge client. No cache (bridge has its own 5-min TTL). No retry (transient failure = Unknown outcome, caller decides). - PlaybookMemory::flag_doc_drift(id) — stamps doc_drift_flagged_at idempotently. Once flagged, compute_boost_for_filtered_with_role excludes the entry from both the non-geo and geo-indexed boost paths until resolved. - PlaybookMemory::resolve_doc_drift(id) — human re-admission. Stamps doc_drift_reviewed_at which clears the boost exclusion. - PlaybookMemory::get_entry(id) — new read-only accessor the handler uses to read doc_refs without exposing the state lock. - POST /vectors/playbook_memory/doc_drift/check/{id} - POST /vectors/playbook_memory/doc_drift/resolve/{id} Design call: Unknown outcomes from the bridge (bridge down, tool not in context7, no snippet_hash recorded) are NEVER enough to flag. Only a positive drifted=true from the bridge flips the flag. A down bridge doesn't silently drift-flag every playbook. Tests (5 new, in upsert_tests mod): - flag_doc_drift_stamps_timestamp_and_persists - flag_doc_drift_is_idempotent_on_already_flagged - resolve_doc_drift_clears_flag_admission_gate - boost_excludes_flagged_unreviewed_entries - boost_re_admits_resolved_entries 14/14 upsert tests pass (9 pre-existing + 5 new). 
Live end-to-end — hybrid fixture on auditor/scaffold (merged to main at b6d69b2) now shows: overall: PASS shipped: [38, 40, 45.1, 45.2, 45.3] placeholder: [—] ✓ Phase 38 /v1/chat 4039ms ✓ Phase 40 Langfuse trace 11ms ✓ Phase 45.1 seed + doc_refs 748ms ✓ Phase 45.2 bridge diff 563ms ✓ Phase 45.3 drift-check endpoint 116ms ← was a 404 before this First time the fixture reports overall=PASS with zero placeholder layers. The honest "not built" signal on layer 5 is now honestly "built and working." Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/vectord/Cargo.toml | 1 + crates/vectord/src/doc_drift.rs | 166 ++++++++++++++++++++++++++ crates/vectord/src/lib.rs | 1 + crates/vectord/src/playbook_memory.rs | 152 +++++++++++++++++++++++ crates/vectord/src/service.rs | 91 ++++++++++++++ 5 files changed, 411 insertions(+) create mode 100644 crates/vectord/src/doc_drift.rs diff --git a/crates/vectord/Cargo.toml b/crates/vectord/Cargo.toml index 86b4b36..d50a6e3 100644 --- a/crates/vectord/Cargo.toml +++ b/crates/vectord/Cargo.toml @@ -26,3 +26,4 @@ chrono = { workspace = true } instant-distance = { workspace = true } uuid = { workspace = true } sha2 = { workspace = true } +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } diff --git a/crates/vectord/src/doc_drift.rs b/crates/vectord/src/doc_drift.rs new file mode 100644 index 0000000..aa1c310 --- /dev/null +++ b/crates/vectord/src/doc_drift.rs @@ -0,0 +1,166 @@ +// Phase 45 slice 3 — context7 bridge client for doc-drift detection. +// +// Calls the Bun context7 bridge on :3900 (mcp-server/context7_bridge.ts) +// which itself wraps context7's public API. For each DocRef on a +// playbook, queries /docs/:tool/diff?since=<snippet_hash> and parses +// the response. +// +// Kept deliberately thin: no caching here (bridge caches for 5 min), +// no retry logic (transient fail = report as "unknown" drift status, +// don't flag). 
The handler decides whether to flag the playbook based +// on whether ANY tool came back drifted=true. + +use serde::Deserialize; +use std::time::Duration; + +use crate::playbook_memory::DocRef; + +const DEFAULT_BRIDGE_URL: &str = "http://localhost:3900"; +const CALL_TIMEOUT_SECS: u64 = 15; + +#[derive(Debug, Clone)] +pub struct DriftCheck { + pub tool: String, + pub version_seen: String, + pub outcome: DriftOutcome, +} + +#[derive(Debug, Clone)] +pub enum DriftOutcome { + /// Bridge reports drift (current snippet_hash != previous). + Drifted { current_snippet_hash: String, source_url: Option }, + /// Bridge reports no drift — the stored snippet_hash still matches + /// the current context7 docs. + Unchanged, + /// Bridge unreachable, 404 on the tool, or returned unparseable + /// data. We deliberately don't flag on these — a down bridge + /// shouldn't silently mark every playbook drift-flagged. + Unknown { reason: String }, +} + +#[derive(Debug, Clone, Deserialize)] +struct BridgeDiffResponse { + drifted: bool, + current_snippet_hash: Option, + source_url: Option, +} + +pub struct DriftCheckerConfig { + pub bridge_url: String, +} + +impl Default for DriftCheckerConfig { + fn default() -> Self { + Self { + bridge_url: std::env::var("LH_BRIDGE_URL") + .unwrap_or_else(|_| DEFAULT_BRIDGE_URL.to_string()), + } + } +} + +/// For every doc_ref, ask the bridge whether it drifted against the +/// recorded snippet_hash. Returns per-tool outcomes. +pub async fn check_all_refs( + cfg: &DriftCheckerConfig, + doc_refs: &[DocRef], +) -> Vec { + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(CALL_TIMEOUT_SECS)) + .build() + .expect("reqwest client build"); + + let mut out = Vec::with_capacity(doc_refs.len()); + for r in doc_refs { + let hash = r.snippet_hash.as_deref().unwrap_or(""); + if hash.is_empty() { + // No hash to compare against — can't detect drift. Report + // unknown so the caller isn't forced to flag. 
+ out.push(DriftCheck { + tool: r.tool.clone(), + version_seen: r.version_seen.clone(), + outcome: DriftOutcome::Unknown { + reason: "no snippet_hash recorded on doc_ref".into(), + }, + }); + continue; + } + let url = format!( + "{}/docs/{}/diff?since={}", + cfg.bridge_url.trim_end_matches('/'), + urlencoding_minimal(&r.tool), + urlencoding_minimal(hash), + ); + let outcome = match client.get(&url).send().await { + Err(e) => DriftOutcome::Unknown { + reason: format!("bridge unreachable: {}", e), + }, + Ok(resp) => { + if resp.status() == reqwest::StatusCode::NOT_FOUND { + DriftOutcome::Unknown { + reason: format!("bridge 404 — no context7 library for tool '{}'", r.tool), + } + } else if !resp.status().is_success() { + let status = resp.status(); + DriftOutcome::Unknown { + reason: format!("bridge {}: {}", status, resp.text().await.unwrap_or_default()), + } + } else { + match resp.json::().await { + Err(e) => DriftOutcome::Unknown { + reason: format!("bridge response parse: {}", e), + }, + Ok(body) => { + if body.drifted { + DriftOutcome::Drifted { + current_snippet_hash: body.current_snippet_hash.unwrap_or_default(), + source_url: body.source_url, + } + } else { + DriftOutcome::Unchanged + } + } + } + } + } + }; + out.push(DriftCheck { + tool: r.tool.clone(), + version_seen: r.version_seen.clone(), + outcome, + }); + } + out +} + +/// Minimal URL path+query encoder for ASCII tool names. We're not +/// pulling in the `urlencoding` crate for this single use — tool +/// names are short alphanumeric (docker/terraform/react), snippet +/// hashes are hex. Handles space + `/` + `?` + `&` defensively so +/// unusual tool names don't corrupt the URL. +fn urlencoding_minimal(s: &str) -> String { + let mut out = String::with_capacity(s.len()); + for b in s.bytes() { + match b { + b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' 
| b'~' => out.push(b as char), + _ => out.push_str(&format!("%{:02X}", b)), + } + } + out +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn urlencoding_handles_ascii_safe_chars_passthrough() { + assert_eq!(urlencoding_minimal("docker"), "docker"); + assert_eq!(urlencoding_minimal("next.js"), "next.js"); + } + + #[test] + fn urlencoding_encodes_slash_and_space() { + assert_eq!(urlencoding_minimal("foo/bar"), "foo%2Fbar"); + assert_eq!(urlencoding_minimal("foo bar"), "foo%20bar"); + } +} diff --git a/crates/vectord/src/lib.rs b/crates/vectord/src/lib.rs index 0f0be40..c937fc5 100644 --- a/crates/vectord/src/lib.rs +++ b/crates/vectord/src/lib.rs @@ -8,6 +8,7 @@ pub mod hnsw; pub mod index_registry; pub mod jobs; pub mod playbook_memory; +pub mod doc_drift; pub mod promotion; pub mod refresh; pub mod store; diff --git a/crates/vectord/src/playbook_memory.rs b/crates/vectord/src/playbook_memory.rs index b74b304..b8c4fd9 100644 --- a/crates/vectord/src/playbook_memory.rs +++ b/crates/vectord/src/playbook_memory.rs @@ -398,6 +398,64 @@ impl PlaybookMemory { Ok(touched) } + /// Phase 45 slice 3 — stamp `doc_drift_flagged_at` on a playbook. + /// Idempotent: if already flagged and not yet reviewed, this is a + /// no-op. Callers should check the return value: Ok(true) means we + /// made a new flag, Ok(false) means already flagged (unreviewed) or + /// playbook not found, Err means persist failed. + pub async fn flag_doc_drift(&self, playbook_id: &str) -> Result { + let mut touched = false; + { + let mut state = self.state.write().await; + for e in state.entries.iter_mut() { + if e.playbook_id == playbook_id { + // Skip if already flagged + unreviewed (avoid + // churning the timestamp on every re-check). 
+ if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() { + break; + } + e.doc_drift_flagged_at = Some(chrono::Utc::now().to_rfc3339()); + e.doc_drift_reviewed_at = None; // fresh flag clears any stale review + touched = true; + break; + } + } + } + if touched { + self.persist().await?; + } + Ok(touched) + } + + /// Phase 45 slice 3 — stamp `doc_drift_reviewed_at` on a playbook, + /// re-admitting it to boost calculation. Idempotent. + pub async fn resolve_doc_drift(&self, playbook_id: &str) -> Result { + let mut touched = false; + { + let mut state = self.state.write().await; + for e in state.entries.iter_mut() { + if e.playbook_id == playbook_id && e.doc_drift_flagged_at.is_some() + && e.doc_drift_reviewed_at.is_none() + { + e.doc_drift_reviewed_at = Some(chrono::Utc::now().to_rfc3339()); + touched = true; + break; + } + } + } + if touched { + self.persist().await?; + } + Ok(touched) + } + + /// Read-only: get a playbook entry by id. Used by the drift-check + /// handler to read doc_refs without exposing the full state lock. + pub async fn get_entry(&self, playbook_id: &str) -> Option { + let state = self.state.read().await; + state.entries.iter().find(|e| e.playbook_id == playbook_id).cloned() + } + /// Phase 25 — retire every entry matching (city, state) whose /// schema_fingerprint doesn't match the current one. Entries with /// no fingerprint (legacy) are skipped — caller can use @@ -785,6 +843,13 @@ impl PlaybookMemory { .filter(|e| { if e.retired_at.is_some() { return false; } if e.superseded_at.is_some() { return false; } + // Phase 45 slice 3 — entries flagged for doc drift and + // not yet human-reviewed are excluded from boost. The + // flag is the same strength of exclusion as retirement + // for this purpose; reviewing (via /resolve) re-admits. 
+ if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() { + return false; + } if let Some(vu) = &e.valid_until { if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) { if now > parsed.with_timezone(&chrono::Utc) { return false; } @@ -815,6 +880,13 @@ impl PlaybookMemory { .filter(|e| { if e.retired_at.is_some() { return false; } if e.superseded_at.is_some() { return false; } + // Phase 45 slice 3 — same drift exclusion as the + // non-geo path above. Keeps the two filter paths + // consistent so a flagged entry is invisible to + // BOTH geo-targeted and global boost queries. + if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() { + return false; + } if let Some(vu) = &e.valid_until { if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) { if now > parsed.with_timezone(&chrono::Utc) { return false; } @@ -1795,6 +1867,86 @@ mod upsert_tests { assert_eq!(snap[0].doc_refs[0].tool, "docker"); } + // ─── Phase 45 slice 3 — doc_drift flag / resolve / boost exclusion ─── + + #[tokio::test] + async fn flag_doc_drift_stamps_timestamp_and_persists() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]); + pm.set_entries(vec![e.clone()]).await.unwrap(); + let touched = pm.flag_doc_drift(&e.playbook_id).await.unwrap(); + assert!(touched, "first flag should return true"); + + let fetched = pm.get_entry(&e.playbook_id).await.unwrap(); + assert!(fetched.doc_drift_flagged_at.is_some(), "flag_doc_drift should stamp the timestamp"); + assert!(fetched.doc_drift_reviewed_at.is_none(), "reviewed_at starts None on fresh flag"); + } + + #[tokio::test] + async fn flag_doc_drift_is_idempotent_on_already_flagged() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let mut e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]); + e.doc_drift_flagged_at = Some("2026-04-01T00:00:00Z".into()); + 
pm.set_entries(vec![e.clone()]).await.unwrap(); + let touched = pm.flag_doc_drift(&e.playbook_id).await.unwrap(); + assert!(!touched, "already-flagged, unreviewed entry shouldn't re-stamp"); + } + + #[tokio::test] + async fn resolve_doc_drift_clears_flag_admission_gate() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let mut e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]); + e.doc_drift_flagged_at = Some("2026-04-21T00:00:00Z".into()); + pm.set_entries(vec![e.clone()]).await.unwrap(); + + let resolved = pm.resolve_doc_drift(&e.playbook_id).await.unwrap(); + assert!(resolved); + let fetched = pm.get_entry(&e.playbook_id).await.unwrap(); + assert!(fetched.doc_drift_reviewed_at.is_some()); + + // Second resolve is a no-op + let again = pm.resolve_doc_drift(&e.playbook_id).await.unwrap(); + assert!(!again); + } + + #[tokio::test] + async fn boost_excludes_flagged_unreviewed_entries() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let e_clean = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]); + let mut e_flagged = mk("fill: Welder x1 in Nashville, TN", "2026-04-21", &["Bob"]); + e_flagged.doc_drift_flagged_at = Some("2026-04-22T00:00:00Z".into()); + e_flagged.doc_drift_reviewed_at = None; + pm.set_entries(vec![e_clean, e_flagged]).await.unwrap(); + + let boosts = pm + .compute_boost_for_filtered_with_role( + &[1.0, 0.0, 0.0], 10, 0.5, + Some(("Nashville", "TN")), None, + ).await; + + let keys: Vec<_> = boosts.keys().map(|(_, _, n)| n.clone()).collect(); + assert!(keys.contains(&"Alice".to_string())); + assert!(!keys.contains(&"Bob".to_string()), "flagged+unreviewed entry leaked into boost"); + } + + #[tokio::test] + async fn boost_re_admits_resolved_entries() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let mut e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]); + e.doc_drift_flagged_at = Some("2026-04-22T00:00:00Z".into()); + e.doc_drift_reviewed_at = 
Some("2026-04-22T00:01:00Z".into()); // human reviewed + pm.set_entries(vec![e]).await.unwrap(); + + let boosts = pm + .compute_boost_for_filtered_with_role( + &[1.0, 0.0, 0.0], 10, 0.5, + Some(("Nashville", "TN")), None, + ).await; + + let keys: Vec<_> = boosts.keys().map(|(_, _, n)| n.clone()).collect(); + assert!(keys.contains(&"Alice".to_string()), "resolved entry should re-enter boost pool"); + } + #[tokio::test] async fn update_refreshes_valid_until_when_caller_provides_one() { let pm = PlaybookMemory::new(Arc::new(InMemory::new())); diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs index a68ade3..99d971f 100644 --- a/crates/vectord/src/service.rs +++ b/crates/vectord/src/service.rs @@ -133,6 +133,9 @@ pub fn router(state: VectorState) -> Router { .route("/playbook_memory/revise", post(revise_playbook_memory)) .route("/playbook_memory/history/{id}", get(playbook_memory_history)) .route("/playbook_memory/status", get(playbook_memory_status)) + // Phase 45 slice 3 — doc drift detection + human re-admission. + .route("/playbook_memory/doc_drift/check/{id}", post(check_doc_drift)) + .route("/playbook_memory/doc_drift/resolve/{id}", post(resolve_doc_drift)) .with_state(state) } @@ -2500,6 +2503,94 @@ async fn retire_playbook_memory( "supply either {playbook_id, reason} or {city, state, current_schema_fingerprint, reason}".into())) } +/// Phase 45 slice 3 — POST /playbook_memory/doc_drift/check/{id} +/// +/// Iterates the playbook's `doc_refs`, asks the context7 bridge whether +/// each one drifted against the recorded snippet_hash. If any tool +/// returned `drifted: true`, stamps `doc_drift_flagged_at` on the +/// entry — which excludes it from boost (via the filter in +/// `compute_boost_for_filtered_with_role`) until a human reviews and +/// resolves. +/// +/// Unknown outcomes (bridge down, tool not in context7, no snippet +/// hash) are explicitly NOT enough to flag. Only a positive drifted=true +/// from the bridge flips the flag. 
+async fn check_doc_drift(
+    State(state): State<VectorState>,
+    axum::extract::Path(id): axum::extract::Path<String>,
+) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
+    use crate::doc_drift::{check_all_refs, DriftCheckerConfig, DriftOutcome};
+
+    let entry = state.playbook_memory.get_entry(&id).await
+        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("playbook not found: {id}")))?;
+
+    if entry.doc_refs.is_empty() {
+        return Ok(Json(serde_json::json!({
+            "playbook_id": id,
+            "checked_tools": [],
+            "drifted": false,
+            "flagged": false,
+            "reason": "entry has no doc_refs — nothing to check",
+        })));
+    }
+
+    let cfg = DriftCheckerConfig::default();
+    let results = check_all_refs(&cfg, &entry.doc_refs).await;
+
+    let per_tool: Vec<serde_json::Value> = results.iter().map(|r| {
+        let (drifted, current, src, reason) = match &r.outcome {
+            DriftOutcome::Drifted { current_snippet_hash, source_url } =>
+                (true, Some(current_snippet_hash.clone()), source_url.clone(), None),
+            DriftOutcome::Unchanged =>
+                (false, None, None, None),
+            DriftOutcome::Unknown { reason } =>
+                (false, None, None, Some(reason.clone())),
+        };
+        serde_json::json!({
+            "tool": r.tool,
+            "version_seen": r.version_seen,
+            "drifted": drifted,
+            "current_snippet_hash": current,
+            "source_url": src,
+            "unknown_reason": reason,
+        })
+    }).collect();
+
+    let any_drifted = results.iter().any(|r| matches!(r.outcome, DriftOutcome::Drifted { .. }));
+
+    let flagged = if any_drifted {
+        state.playbook_memory.flag_doc_drift(&id).await
+            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("flag: {e}")))?
+    } else {
+        false
+    };
+
+    Ok(Json(serde_json::json!({
+        "playbook_id": id,
+        "checked_tools": results.iter().map(|r| &r.tool).collect::<Vec<_>>(),
+        "drifted": any_drifted,
+        "flagged": flagged,
+        "per_tool": per_tool,
+    })))
+}
+
+/// Phase 45 slice 3 — POST /playbook_memory/doc_drift/resolve/{id}
+///
+/// Human-in-the-loop re-admission. Stamps `doc_drift_reviewed_at`.
+/// Idempotent: returns `resolved: false` if nothing changed (entry
+/// wasn't flagged, already reviewed, or doesn't exist).
+async fn resolve_doc_drift(
+    State(state): State<VectorState>,
+    axum::extract::Path(id): axum::extract::Path<String>,
+) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
+    let resolved = state.playbook_memory.resolve_doc_drift(&id).await
+        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("resolve: {e}")))?;
+    Ok(Json(serde_json::json!({
+        "playbook_id": id,
+        "resolved": resolved,
+    })))
+}
+
 /// Phase 27 — request body for `POST /playbook_memory/revise`. Same
 /// shape as a seed request minus `append` (revise is always
 /// append-semantics for a specific parent) plus `parent_id`. The new