diff --git a/crates/vectord/Cargo.toml b/crates/vectord/Cargo.toml
index 86b4b36..d50a6e3 100644
--- a/crates/vectord/Cargo.toml
+++ b/crates/vectord/Cargo.toml
@@ -26,3 +26,4 @@ chrono = { workspace = true }
 instant-distance = { workspace = true }
 uuid = { workspace = true }
 sha2 = { workspace = true }
+reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
diff --git a/crates/vectord/src/doc_drift.rs b/crates/vectord/src/doc_drift.rs
new file mode 100644
index 0000000..aa1c310
--- /dev/null
+++ b/crates/vectord/src/doc_drift.rs
@@ -0,0 +1,166 @@
+// Phase 45 slice 3 — context7 bridge client for doc-drift detection.
+//
+// Calls the Bun context7 bridge on :3900 (mcp-server/context7_bridge.ts)
+// which itself wraps context7's public API. For each DocRef on a
+// playbook, queries /docs/:tool/diff?since=<hash> and parses
+// the response.
+//
+// Kept deliberately thin: no caching here (bridge caches for 5 min),
+// no retry logic (transient fail = report as "unknown" drift status,
+// don't flag). The handler decides whether to flag the playbook based
+// on whether ANY tool came back drifted=true.
+
+use serde::Deserialize;
+use std::time::Duration;
+
+use crate::playbook_memory::DocRef;
+
+const DEFAULT_BRIDGE_URL: &str = "http://localhost:3900";
+const CALL_TIMEOUT_SECS: u64 = 15;
+
+#[derive(Debug, Clone)]
+pub struct DriftCheck {
+    pub tool: String,
+    pub version_seen: String,
+    pub outcome: DriftOutcome,
+}
+
+#[derive(Debug, Clone)]
+pub enum DriftOutcome {
+    /// Bridge reports drift (current snippet_hash != previous).
+    Drifted { current_snippet_hash: String, source_url: Option<String> },
+    /// Bridge reports no drift — the stored snippet_hash still matches
+    /// the current context7 docs.
+    Unchanged,
+    /// Bridge unreachable, 404 on the tool, or returned unparseable
+    /// data. We deliberately don't flag on these — a down bridge
+    /// shouldn't silently mark every playbook drift-flagged.
+    Unknown { reason: String },
+}
+
+#[derive(Debug, Clone, Deserialize)]
+struct BridgeDiffResponse {
+    drifted: bool,
+    current_snippet_hash: Option<String>,
+    source_url: Option<String>,
+}
+
+pub struct DriftCheckerConfig {
+    pub bridge_url: String,
+}
+
+impl Default for DriftCheckerConfig {
+    fn default() -> Self {
+        Self {
+            bridge_url: std::env::var("LH_BRIDGE_URL")
+                .unwrap_or_else(|_| DEFAULT_BRIDGE_URL.to_string()),
+        }
+    }
+}
+
+/// For every doc_ref, ask the bridge whether it drifted against the
+/// recorded snippet_hash. Returns per-tool outcomes.
+pub async fn check_all_refs(
+    cfg: &DriftCheckerConfig,
+    doc_refs: &[DocRef],
+) -> Vec<DriftCheck> {
+    let client = reqwest::Client::builder()
+        .timeout(Duration::from_secs(CALL_TIMEOUT_SECS))
+        .build()
+        .expect("reqwest client build");
+
+    let mut out = Vec::with_capacity(doc_refs.len());
+    for r in doc_refs {
+        let hash = r.snippet_hash.as_deref().unwrap_or("");
+        if hash.is_empty() {
+            // No hash to compare against — can't detect drift. Report
+            // unknown so the caller isn't forced to flag.
+            out.push(DriftCheck {
+                tool: r.tool.clone(),
+                version_seen: r.version_seen.clone(),
+                outcome: DriftOutcome::Unknown {
+                    reason: "no snippet_hash recorded on doc_ref".into(),
+                },
+            });
+            continue;
+        }
+        let url = format!(
+            "{}/docs/{}/diff?since={}",
+            cfg.bridge_url.trim_end_matches('/'),
+            urlencoding_minimal(&r.tool),
+            urlencoding_minimal(hash),
+        );
+        let outcome = match client.get(&url).send().await {
+            Err(e) => DriftOutcome::Unknown {
+                reason: format!("bridge unreachable: {}", e),
+            },
+            Ok(resp) => {
+                if resp.status() == reqwest::StatusCode::NOT_FOUND {
+                    DriftOutcome::Unknown {
+                        reason: format!("bridge 404 — no context7 library for tool '{}'", r.tool),
+                    }
+                } else if !resp.status().is_success() {
+                    let status = resp.status();
+                    DriftOutcome::Unknown {
+                        reason: format!("bridge {}: {}", status, resp.text().await.unwrap_or_default()),
+                    }
+                } else {
+                    match resp.json::<BridgeDiffResponse>().await {
+                        Err(e) => DriftOutcome::Unknown {
+                            reason: format!("bridge response parse: {}", e),
+                        },
+                        Ok(body) => {
+                            if body.drifted {
+                                DriftOutcome::Drifted {
+                                    current_snippet_hash: body.current_snippet_hash.unwrap_or_default(),
+                                    source_url: body.source_url,
+                                }
+                            } else {
+                                DriftOutcome::Unchanged
+                            }
+                        }
+                    }
+                }
+            }
+        };
+        out.push(DriftCheck {
+            tool: r.tool.clone(),
+            version_seen: r.version_seen.clone(),
+            outcome,
+        });
+    }
+    out
+}
+
+/// Minimal URL path+query encoder for ASCII tool names. We're not
+/// pulling in the `urlencoding` crate for this single use — tool
+/// names are short alphanumeric (docker/terraform/react), snippet
+/// hashes are hex. Handles space + `/` + `?` + `&` defensively so
+/// unusual tool names don't corrupt the URL.
+fn urlencoding_minimal(s: &str) -> String {
+    let mut out = String::with_capacity(s.len());
+    for b in s.bytes() {
+        match b {
+            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.'
+                | b'~' => out.push(b as char),
+            _ => out.push_str(&format!("%{:02X}", b)),
+        }
+    }
+    out
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn urlencoding_handles_ascii_safe_chars_passthrough() {
+        assert_eq!(urlencoding_minimal("docker"), "docker");
+        assert_eq!(urlencoding_minimal("next.js"), "next.js");
+    }
+
+    #[test]
+    fn urlencoding_encodes_slash_and_space() {
+        assert_eq!(urlencoding_minimal("foo/bar"), "foo%2Fbar");
+        assert_eq!(urlencoding_minimal("foo bar"), "foo%20bar");
+    }
+}
diff --git a/crates/vectord/src/lib.rs b/crates/vectord/src/lib.rs
index 0f0be40..c937fc5 100644
--- a/crates/vectord/src/lib.rs
+++ b/crates/vectord/src/lib.rs
@@ -8,6 +8,7 @@ pub mod hnsw;
 pub mod index_registry;
 pub mod jobs;
 pub mod playbook_memory;
+pub mod doc_drift;
 pub mod promotion;
 pub mod refresh;
 pub mod store;
diff --git a/crates/vectord/src/playbook_memory.rs b/crates/vectord/src/playbook_memory.rs
index b74b304..b8c4fd9 100644
--- a/crates/vectord/src/playbook_memory.rs
+++ b/crates/vectord/src/playbook_memory.rs
@@ -398,6 +398,64 @@ impl PlaybookMemory {
         Ok(touched)
     }
 
+    /// Phase 45 slice 3 — stamp `doc_drift_flagged_at` on a playbook.
+    /// Idempotent: if already flagged and not yet reviewed, this is a
+    /// no-op. Callers should check the return value: Ok(true) means we
+    /// made a new flag, Ok(false) means already flagged (unreviewed) or
+    /// playbook not found, Err means persist failed.
+    pub async fn flag_doc_drift(&self, playbook_id: &str) -> Result<bool> {
+        let mut touched = false;
+        {
+            let mut state = self.state.write().await;
+            for e in state.entries.iter_mut() {
+                if e.playbook_id == playbook_id {
+                    // Skip if already flagged + unreviewed (avoid
+                    // churning the timestamp on every re-check).
+                    if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() {
+                        break;
+                    }
+                    e.doc_drift_flagged_at = Some(chrono::Utc::now().to_rfc3339());
+                    e.doc_drift_reviewed_at = None; // fresh flag clears any stale review
+                    touched = true;
+                    break;
+                }
+            }
+        }
+        if touched {
+            self.persist().await?;
+        }
+        Ok(touched)
+    }
+
+    /// Phase 45 slice 3 — stamp `doc_drift_reviewed_at` on a playbook,
+    /// re-admitting it to boost calculation. Idempotent.
+    pub async fn resolve_doc_drift(&self, playbook_id: &str) -> Result<bool> {
+        let mut touched = false;
+        {
+            let mut state = self.state.write().await;
+            for e in state.entries.iter_mut() {
+                if e.playbook_id == playbook_id && e.doc_drift_flagged_at.is_some()
+                    && e.doc_drift_reviewed_at.is_none()
+                {
+                    e.doc_drift_reviewed_at = Some(chrono::Utc::now().to_rfc3339());
+                    touched = true;
+                    break;
+                }
+            }
+        }
+        if touched {
+            self.persist().await?;
+        }
+        Ok(touched)
+    }
+
+    /// Read-only: get a playbook entry by id. Used by the drift-check
+    /// handler to read doc_refs without exposing the full state lock.
+    pub async fn get_entry(&self, playbook_id: &str) -> Option<PlaybookEntry> {
+        let state = self.state.read().await;
+        state.entries.iter().find(|e| e.playbook_id == playbook_id).cloned()
+    }
+
     /// Phase 25 — retire every entry matching (city, state) whose
     /// schema_fingerprint doesn't match the current one. Entries with
     /// no fingerprint (legacy) are skipped — caller can use
@@ -785,6 +843,13 @@ impl PlaybookMemory {
             .filter(|e| {
                 if e.retired_at.is_some() { return false; }
                 if e.superseded_at.is_some() { return false; }
+                // Phase 45 slice 3 — entries flagged for doc drift and
+                // not yet human-reviewed are excluded from boost. The
+                // flag is the same strength of exclusion as retirement
+                // for this purpose; reviewing (via /resolve) re-admits.
+ if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() { + return false; + } if let Some(vu) = &e.valid_until { if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) { if now > parsed.with_timezone(&chrono::Utc) { return false; } @@ -815,6 +880,13 @@ impl PlaybookMemory { .filter(|e| { if e.retired_at.is_some() { return false; } if e.superseded_at.is_some() { return false; } + // Phase 45 slice 3 — same drift exclusion as the + // non-geo path above. Keeps the two filter paths + // consistent so a flagged entry is invisible to + // BOTH geo-targeted and global boost queries. + if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() { + return false; + } if let Some(vu) = &e.valid_until { if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) { if now > parsed.with_timezone(&chrono::Utc) { return false; } @@ -1795,6 +1867,86 @@ mod upsert_tests { assert_eq!(snap[0].doc_refs[0].tool, "docker"); } + // ─── Phase 45 slice 3 — doc_drift flag / resolve / boost exclusion ─── + + #[tokio::test] + async fn flag_doc_drift_stamps_timestamp_and_persists() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]); + pm.set_entries(vec![e.clone()]).await.unwrap(); + let touched = pm.flag_doc_drift(&e.playbook_id).await.unwrap(); + assert!(touched, "first flag should return true"); + + let fetched = pm.get_entry(&e.playbook_id).await.unwrap(); + assert!(fetched.doc_drift_flagged_at.is_some(), "flag_doc_drift should stamp the timestamp"); + assert!(fetched.doc_drift_reviewed_at.is_none(), "reviewed_at starts None on fresh flag"); + } + + #[tokio::test] + async fn flag_doc_drift_is_idempotent_on_already_flagged() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let mut e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]); + e.doc_drift_flagged_at = Some("2026-04-01T00:00:00Z".into()); + 
pm.set_entries(vec![e.clone()]).await.unwrap(); + let touched = pm.flag_doc_drift(&e.playbook_id).await.unwrap(); + assert!(!touched, "already-flagged, unreviewed entry shouldn't re-stamp"); + } + + #[tokio::test] + async fn resolve_doc_drift_clears_flag_admission_gate() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let mut e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]); + e.doc_drift_flagged_at = Some("2026-04-21T00:00:00Z".into()); + pm.set_entries(vec![e.clone()]).await.unwrap(); + + let resolved = pm.resolve_doc_drift(&e.playbook_id).await.unwrap(); + assert!(resolved); + let fetched = pm.get_entry(&e.playbook_id).await.unwrap(); + assert!(fetched.doc_drift_reviewed_at.is_some()); + + // Second resolve is a no-op + let again = pm.resolve_doc_drift(&e.playbook_id).await.unwrap(); + assert!(!again); + } + + #[tokio::test] + async fn boost_excludes_flagged_unreviewed_entries() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let e_clean = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]); + let mut e_flagged = mk("fill: Welder x1 in Nashville, TN", "2026-04-21", &["Bob"]); + e_flagged.doc_drift_flagged_at = Some("2026-04-22T00:00:00Z".into()); + e_flagged.doc_drift_reviewed_at = None; + pm.set_entries(vec![e_clean, e_flagged]).await.unwrap(); + + let boosts = pm + .compute_boost_for_filtered_with_role( + &[1.0, 0.0, 0.0], 10, 0.5, + Some(("Nashville", "TN")), None, + ).await; + + let keys: Vec<_> = boosts.keys().map(|(_, _, n)| n.clone()).collect(); + assert!(keys.contains(&"Alice".to_string())); + assert!(!keys.contains(&"Bob".to_string()), "flagged+unreviewed entry leaked into boost"); + } + + #[tokio::test] + async fn boost_re_admits_resolved_entries() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let mut e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]); + e.doc_drift_flagged_at = Some("2026-04-22T00:00:00Z".into()); + e.doc_drift_reviewed_at = 
Some("2026-04-22T00:01:00Z".into()); // human reviewed + pm.set_entries(vec![e]).await.unwrap(); + + let boosts = pm + .compute_boost_for_filtered_with_role( + &[1.0, 0.0, 0.0], 10, 0.5, + Some(("Nashville", "TN")), None, + ).await; + + let keys: Vec<_> = boosts.keys().map(|(_, _, n)| n.clone()).collect(); + assert!(keys.contains(&"Alice".to_string()), "resolved entry should re-enter boost pool"); + } + #[tokio::test] async fn update_refreshes_valid_until_when_caller_provides_one() { let pm = PlaybookMemory::new(Arc::new(InMemory::new())); diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs index a68ade3..99d971f 100644 --- a/crates/vectord/src/service.rs +++ b/crates/vectord/src/service.rs @@ -133,6 +133,9 @@ pub fn router(state: VectorState) -> Router { .route("/playbook_memory/revise", post(revise_playbook_memory)) .route("/playbook_memory/history/{id}", get(playbook_memory_history)) .route("/playbook_memory/status", get(playbook_memory_status)) + // Phase 45 slice 3 — doc drift detection + human re-admission. + .route("/playbook_memory/doc_drift/check/{id}", post(check_doc_drift)) + .route("/playbook_memory/doc_drift/resolve/{id}", post(resolve_doc_drift)) .with_state(state) } @@ -2500,6 +2503,94 @@ async fn retire_playbook_memory( "supply either {playbook_id, reason} or {city, state, current_schema_fingerprint, reason}".into())) } +/// Phase 45 slice 3 — POST /playbook_memory/doc_drift/check/{id} +/// +/// Iterates the playbook's `doc_refs`, asks the context7 bridge whether +/// each one drifted against the recorded snippet_hash. If any tool +/// returned `drifted: true`, stamps `doc_drift_flagged_at` on the +/// entry — which excludes it from boost (via the filter in +/// `compute_boost_for_filtered_with_role`) until a human reviews and +/// resolves. +/// +/// Unknown outcomes (bridge down, tool not in context7, no snippet +/// hash) are explicitly NOT enough to flag. Only a positive drifted=true +/// from the bridge flips the flag. 
+async fn check_doc_drift(
+    State(state): State<VectorState>,
+    axum::extract::Path(id): axum::extract::Path<String>,
+) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
+    use crate::doc_drift::{check_all_refs, DriftCheckerConfig, DriftOutcome};
+
+    let entry = state.playbook_memory.get_entry(&id).await
+        .ok_or((StatusCode::NOT_FOUND, format!("playbook not found: {id}")))?;
+
+    if entry.doc_refs.is_empty() {
+        return Ok(Json(serde_json::json!({
+            "playbook_id": id,
+            "checked_tools": [],
+            "drifted": false,
+            "flagged": false,
+            "reason": "entry has no doc_refs — nothing to check",
+        })));
+    }
+
+    let cfg = DriftCheckerConfig::default();
+    let results = check_all_refs(&cfg, &entry.doc_refs).await;
+
+    let per_tool: Vec<serde_json::Value> = results.iter().map(|r| {
+        let (drifted, current, src, reason) = match &r.outcome {
+            DriftOutcome::Drifted { current_snippet_hash, source_url } =>
+                (true, Some(current_snippet_hash.clone()), source_url.clone(), None),
+            DriftOutcome::Unchanged =>
+                (false, None, None, None),
+            DriftOutcome::Unknown { reason } =>
+                (false, None, None, Some(reason.clone())),
+        };
+        serde_json::json!({
+            "tool": r.tool,
+            "version_seen": r.version_seen,
+            "drifted": drifted,
+            "current_snippet_hash": current,
+            "source_url": src,
+            "unknown_reason": reason,
+        })
+    }).collect();
+
+    let any_drifted = results.iter().any(|r| matches!(r.outcome, DriftOutcome::Drifted { .. }));
+
+    let flagged = if any_drifted {
+        state.playbook_memory.flag_doc_drift(&id).await
+            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("flag: {e}")))?
+    } else {
+        false
+    };
+
+    Ok(Json(serde_json::json!({
+        "playbook_id": id,
+        "checked_tools": results.iter().map(|r| &r.tool).collect::<Vec<_>>(),
+        "drifted": any_drifted,
+        "flagged": flagged,
+        "per_tool": per_tool,
+    })))
+}
+
+/// Phase 45 slice 3 — POST /playbook_memory/doc_drift/resolve/{id}
+///
+/// Human-in-the-loop re-admission. Stamps `doc_drift_reviewed_at`.
+/// Idempotent: returns `resolved: false` if nothing changed (entry
+/// wasn't flagged, already reviewed, or doesn't exist).
+async fn resolve_doc_drift(
+    State(state): State<VectorState>,
+    axum::extract::Path(id): axum::extract::Path<String>,
+) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
+    let resolved = state.playbook_memory.resolve_doc_drift(&id).await
+        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("resolve: {e}")))?;
+    Ok(Json(serde_json::json!({
+        "playbook_id": id,
+        "resolved": resolved,
+    })))
+}
+
 /// Phase 27 — request body for `POST /playbook_memory/revise`. Same
 /// shape as a seed request minus `append` (revise is always
 /// append-semantics for a specific parent) plus `parent_id`. The new