Merge pull request 'Phase 45 slice 3: doc_drift check + resolve endpoints' (#5) from phase/45-slice-3 into main
This commit is contained in:
commit
6d7b251607
@ -26,3 +26,4 @@ chrono = { workspace = true }
|
||||
instant-distance = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
sha2 = { workspace = true }
|
||||
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
|
||||
|
||||
166
crates/vectord/src/doc_drift.rs
Normal file
166
crates/vectord/src/doc_drift.rs
Normal file
@ -0,0 +1,166 @@
|
||||
// Phase 45 slice 3 — context7 bridge client for doc-drift detection.
|
||||
//
|
||||
// Calls the Bun context7 bridge on :3900 (mcp-server/context7_bridge.ts)
|
||||
// which itself wraps context7's public API. For each DocRef on a
|
||||
// playbook, queries /docs/:tool/diff?since=<snippet_hash> and parses
|
||||
// the response.
|
||||
//
|
||||
// Kept deliberately thin: no caching here (bridge caches for 5 min),
|
||||
// no retry logic (transient fail = report as "unknown" drift status,
|
||||
// don't flag). The handler decides whether to flag the playbook based
|
||||
// on whether ANY tool came back drifted=true.
|
||||
|
||||
use serde::Deserialize;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::playbook_memory::DocRef;
|
||||
|
||||
// Fallback bridge endpoint used when LH_BRIDGE_URL is unset (see
// DriftCheckerConfig::default). Matches mcp-server/context7_bridge.ts on :3900.
const DEFAULT_BRIDGE_URL: &str = "http://localhost:3900";
// Per-request timeout for bridge calls; a timed-out request surfaces as a
// send() error and is reported as DriftOutcome::Unknown, never as drift.
const CALL_TIMEOUT_SECS: u64 = 15;
|
||||
|
||||
/// Result of checking one `DocRef` against the context7 bridge: which
/// tool was checked, the doc version recorded on the playbook, and the
/// bridge's verdict (see [`DriftOutcome`]).
#[derive(Debug, Clone)]
pub struct DriftCheck {
    /// Tool name copied from the doc_ref (e.g. "docker").
    pub tool: String,
    /// Version string the playbook recorded for this tool's docs.
    pub version_seen: String,
    /// Drifted / Unchanged / Unknown verdict for this tool.
    pub outcome: DriftOutcome,
}
|
||||
|
||||
/// Per-tool verdict from the bridge. Only `Drifted` should cause a
/// playbook to be flagged; `Unknown` is deliberately non-flagging so a
/// degraded bridge cannot mass-flag playbooks.
#[derive(Debug, Clone)]
pub enum DriftOutcome {
    /// Bridge reports drift (current snippet_hash != previous).
    Drifted { current_snippet_hash: String, source_url: Option<String> },
    /// Bridge reports no drift — the stored snippet_hash still matches
    /// the current context7 docs.
    Unchanged,
    /// Bridge unreachable, 404 on the tool, or returned unparseable
    /// data. We deliberately don't flag on these — a down bridge
    /// shouldn't silently mark every playbook drift-flagged.
    Unknown { reason: String },
}
|
||||
|
||||
/// Wire shape of the bridge's `/docs/:tool/diff?since=…` JSON response.
/// NOTE(review): optional fields presumably omitted when `drifted` is
/// false — confirm against context7_bridge.ts.
#[derive(Debug, Clone, Deserialize)]
struct BridgeDiffResponse {
    // True when the bridge's current snippet hash differs from `since`.
    drifted: bool,
    // Current hash on drift; a missing value maps to "" via unwrap_or_default.
    current_snippet_hash: Option<String>,
    // Where the current docs came from, if the bridge reports it.
    source_url: Option<String>,
}
|
||||
|
||||
/// Configuration for the drift checker — currently just where the
/// context7 bridge lives.
pub struct DriftCheckerConfig {
    /// Base URL of the bridge; trailing slashes are trimmed at call time.
    pub bridge_url: String,
}
|
||||
|
||||
impl Default for DriftCheckerConfig {
    /// Read the bridge URL from `LH_BRIDGE_URL`, falling back to
    /// `DEFAULT_BRIDGE_URL` when the variable is unset or not valid UTF-8.
    fn default() -> Self {
        Self {
            bridge_url: std::env::var("LH_BRIDGE_URL")
                .unwrap_or_else(|_| DEFAULT_BRIDGE_URL.to_string()),
        }
    }
}
|
||||
|
||||
/// For every doc_ref, ask the bridge whether it drifted against the
/// recorded snippet_hash. Returns per-tool outcomes, one `DriftCheck`
/// per input `DocRef`, in input order.
///
/// Requests are issued sequentially (no retries, no caching — see the
/// module header); any transport/HTTP/parse failure for a tool becomes
/// `DriftOutcome::Unknown` rather than aborting the whole batch.
pub async fn check_all_refs(
    cfg: &DriftCheckerConfig,
    doc_refs: &[DocRef],
) -> Vec<DriftCheck> {
    // Client build only fails on invalid builder config, which is a
    // programming bug here — hence expect rather than Result plumbing.
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(CALL_TIMEOUT_SECS))
        .build()
        .expect("reqwest client build");

    let mut out = Vec::with_capacity(doc_refs.len());
    for r in doc_refs {
        let hash = r.snippet_hash.as_deref().unwrap_or("");
        if hash.is_empty() {
            // No hash to compare against — can't detect drift. Report
            // unknown so the caller isn't forced to flag.
            out.push(DriftCheck {
                tool: r.tool.clone(),
                version_seen: r.version_seen.clone(),
                outcome: DriftOutcome::Unknown {
                    reason: "no snippet_hash recorded on doc_ref".into(),
                },
            });
            continue;
        }
        // GET {bridge}/docs/{tool}/diff?since={hash} — both path segment
        // and query value are percent-encoded defensively.
        let url = format!(
            "{}/docs/{}/diff?since={}",
            cfg.bridge_url.trim_end_matches('/'),
            urlencoding_minimal(&r.tool),
            urlencoding_minimal(hash),
        );
        let outcome = match client.get(&url).send().await {
            // Transport error (DNS, refused, timeout) — bridge unreachable.
            Err(e) => DriftOutcome::Unknown {
                reason: format!("bridge unreachable: {}", e),
            },
            Ok(resp) => {
                if resp.status() == reqwest::StatusCode::NOT_FOUND {
                    // Tool has no context7 library — not a drift signal.
                    DriftOutcome::Unknown {
                        reason: format!("bridge 404 — no context7 library for tool '{}'", r.tool),
                    }
                } else if !resp.status().is_success() {
                    // Any other non-2xx: capture status + body for the reason.
                    let status = resp.status();
                    DriftOutcome::Unknown {
                        reason: format!("bridge {}: {}", status, resp.text().await.unwrap_or_default()),
                    }
                } else {
                    match resp.json::<BridgeDiffResponse>().await {
                        Err(e) => DriftOutcome::Unknown {
                            reason: format!("bridge response parse: {}", e),
                        },
                        Ok(body) => {
                            if body.drifted {
                                DriftOutcome::Drifted {
                                    current_snippet_hash: body.current_snippet_hash.unwrap_or_default(),
                                    source_url: body.source_url,
                                }
                            } else {
                                DriftOutcome::Unchanged
                            }
                        }
                    }
                }
            }
        };
        out.push(DriftCheck {
            tool: r.tool.clone(),
            version_seen: r.version_seen.clone(),
            outcome,
        });
    }
    out
}
|
||||
|
||||
/// Minimal percent-encoder for URL path/query components. We're not
/// pulling in the `urlencoding` crate for this single use — tool names
/// are short alphanumeric (docker/terraform/react) and snippet hashes
/// are hex, so almost everything passes through untouched.
///
/// Bytes in the RFC 3986 unreserved set (ALPHA / DIGIT / `-` `_` `.`
/// `~`) are kept as-is; every other byte becomes `%XX` with uppercase
/// hex, which defensively handles space, `/`, `?`, `&`, and any
/// non-ASCII UTF-8 bytes in unusual tool names.
fn urlencoding_minimal(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";
    let mut encoded = String::with_capacity(s.len());
    for byte in s.bytes() {
        let unreserved =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
        if unreserved {
            encoded.push(byte as char);
        } else {
            // Emit %XX without going through format! for each byte.
            encoded.push('%');
            encoded.push(HEX[usize::from(byte >> 4)] as char);
            encoded.push(HEX[usize::from(byte & 0x0F)] as char);
        }
    }
    encoded
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    // Unreserved ASCII (alphanumerics plus - _ . ~) must pass through
    // the encoder untouched.
    #[test]
    fn urlencoding_handles_ascii_safe_chars_passthrough() {
        assert_eq!(urlencoding_minimal("docker"), "docker");
        assert_eq!(urlencoding_minimal("next.js"), "next.js");
    }

    // Reserved characters must come out as uppercase %XX escapes so
    // they can't corrupt the path or query of the bridge URL.
    #[test]
    fn urlencoding_encodes_slash_and_space() {
        assert_eq!(urlencoding_minimal("foo/bar"), "foo%2Fbar");
        assert_eq!(urlencoding_minimal("foo bar"), "foo%20bar");
    }
}
|
||||
@ -8,6 +8,7 @@ pub mod hnsw;
|
||||
pub mod index_registry;
pub mod jobs;
pub mod playbook_memory;
// Phase 45 slice 3 — context7 bridge client for doc-drift detection.
pub mod doc_drift;
pub mod promotion;
pub mod refresh;
pub mod store;
|
||||
|
||||
@ -398,6 +398,64 @@ impl PlaybookMemory {
|
||||
Ok(touched)
|
||||
}
|
||||
|
||||
/// Phase 45 slice 3 — stamp `doc_drift_flagged_at` on a playbook.
|
||||
/// Idempotent: if already flagged and not yet reviewed, this is a
|
||||
/// no-op. Callers should check the return value: Ok(true) means we
|
||||
/// made a new flag, Ok(false) means already flagged (unreviewed) or
|
||||
/// playbook not found, Err means persist failed.
|
||||
pub async fn flag_doc_drift(&self, playbook_id: &str) -> Result<bool, String> {
|
||||
let mut touched = false;
|
||||
{
|
||||
let mut state = self.state.write().await;
|
||||
for e in state.entries.iter_mut() {
|
||||
if e.playbook_id == playbook_id {
|
||||
// Skip if already flagged + unreviewed (avoid
|
||||
// churning the timestamp on every re-check).
|
||||
if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() {
|
||||
break;
|
||||
}
|
||||
e.doc_drift_flagged_at = Some(chrono::Utc::now().to_rfc3339());
|
||||
e.doc_drift_reviewed_at = None; // fresh flag clears any stale review
|
||||
touched = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if touched {
|
||||
self.persist().await?;
|
||||
}
|
||||
Ok(touched)
|
||||
}
|
||||
|
||||
/// Phase 45 slice 3 — stamp `doc_drift_reviewed_at` on a playbook,
|
||||
/// re-admitting it to boost calculation. Idempotent.
|
||||
pub async fn resolve_doc_drift(&self, playbook_id: &str) -> Result<bool, String> {
|
||||
let mut touched = false;
|
||||
{
|
||||
let mut state = self.state.write().await;
|
||||
for e in state.entries.iter_mut() {
|
||||
if e.playbook_id == playbook_id && e.doc_drift_flagged_at.is_some()
|
||||
&& e.doc_drift_reviewed_at.is_none()
|
||||
{
|
||||
e.doc_drift_reviewed_at = Some(chrono::Utc::now().to_rfc3339());
|
||||
touched = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if touched {
|
||||
self.persist().await?;
|
||||
}
|
||||
Ok(touched)
|
||||
}
|
||||
|
||||
/// Read-only: get a playbook entry by id. Used by the drift-check
|
||||
/// handler to read doc_refs without exposing the full state lock.
|
||||
pub async fn get_entry(&self, playbook_id: &str) -> Option<PlaybookEntry> {
|
||||
let state = self.state.read().await;
|
||||
state.entries.iter().find(|e| e.playbook_id == playbook_id).cloned()
|
||||
}
|
||||
|
||||
/// Phase 25 — retire every entry matching (city, state) whose
|
||||
/// schema_fingerprint doesn't match the current one. Entries with
|
||||
/// no fingerprint (legacy) are skipped — caller can use
|
||||
@ -785,6 +843,13 @@ impl PlaybookMemory {
|
||||
.filter(|e| {
|
||||
if e.retired_at.is_some() { return false; }
|
||||
if e.superseded_at.is_some() { return false; }
|
||||
// Phase 45 slice 3 — entries flagged for doc drift and
|
||||
// not yet human-reviewed are excluded from boost. The
|
||||
// flag is the same strength of exclusion as retirement
|
||||
// for this purpose; reviewing (via /resolve) re-admits.
|
||||
if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() {
|
||||
return false;
|
||||
}
|
||||
if let Some(vu) = &e.valid_until {
|
||||
if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) {
|
||||
if now > parsed.with_timezone(&chrono::Utc) { return false; }
|
||||
@ -815,6 +880,13 @@ impl PlaybookMemory {
|
||||
.filter(|e| {
|
||||
if e.retired_at.is_some() { return false; }
|
||||
if e.superseded_at.is_some() { return false; }
|
||||
// Phase 45 slice 3 — same drift exclusion as the
|
||||
// non-geo path above. Keeps the two filter paths
|
||||
// consistent so a flagged entry is invisible to
|
||||
// BOTH geo-targeted and global boost queries.
|
||||
if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() {
|
||||
return false;
|
||||
}
|
||||
if let Some(vu) = &e.valid_until {
|
||||
if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) {
|
||||
if now > parsed.with_timezone(&chrono::Utc) { return false; }
|
||||
@ -1795,6 +1867,86 @@ mod upsert_tests {
|
||||
assert_eq!(snap[0].doc_refs[0].tool, "docker");
|
||||
}
|
||||
|
||||
    // ─── Phase 45 slice 3 — doc_drift flag / resolve / boost exclusion ───

    // A fresh flag stamps doc_drift_flagged_at and leaves reviewed_at None.
    #[tokio::test]
    async fn flag_doc_drift_stamps_timestamp_and_persists() {
        let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
        let e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
        pm.set_entries(vec![e.clone()]).await.unwrap();
        let touched = pm.flag_doc_drift(&e.playbook_id).await.unwrap();
        assert!(touched, "first flag should return true");

        let fetched = pm.get_entry(&e.playbook_id).await.unwrap();
        assert!(fetched.doc_drift_flagged_at.is_some(), "flag_doc_drift should stamp the timestamp");
        assert!(fetched.doc_drift_reviewed_at.is_none(), "reviewed_at starts None on fresh flag");
    }

    // Re-flagging an entry that is already flagged (and unreviewed)
    // must not re-stamp — returns Ok(false).
    #[tokio::test]
    async fn flag_doc_drift_is_idempotent_on_already_flagged() {
        let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
        let mut e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
        e.doc_drift_flagged_at = Some("2026-04-01T00:00:00Z".into());
        pm.set_entries(vec![e.clone()]).await.unwrap();
        let touched = pm.flag_doc_drift(&e.playbook_id).await.unwrap();
        assert!(!touched, "already-flagged, unreviewed entry shouldn't re-stamp");
    }

    // Resolve stamps reviewed_at exactly once; a second resolve is a no-op.
    #[tokio::test]
    async fn resolve_doc_drift_clears_flag_admission_gate() {
        let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
        let mut e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
        e.doc_drift_flagged_at = Some("2026-04-21T00:00:00Z".into());
        pm.set_entries(vec![e.clone()]).await.unwrap();

        let resolved = pm.resolve_doc_drift(&e.playbook_id).await.unwrap();
        assert!(resolved);
        let fetched = pm.get_entry(&e.playbook_id).await.unwrap();
        assert!(fetched.doc_drift_reviewed_at.is_some());

        // Second resolve is a no-op
        let again = pm.resolve_doc_drift(&e.playbook_id).await.unwrap();
        assert!(!again);
    }

    // Flagged + unreviewed entries must be invisible to boost queries.
    #[tokio::test]
    async fn boost_excludes_flagged_unreviewed_entries() {
        let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
        let e_clean = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
        let mut e_flagged = mk("fill: Welder x1 in Nashville, TN", "2026-04-21", &["Bob"]);
        e_flagged.doc_drift_flagged_at = Some("2026-04-22T00:00:00Z".into());
        e_flagged.doc_drift_reviewed_at = None;
        pm.set_entries(vec![e_clean, e_flagged]).await.unwrap();

        let boosts = pm
            .compute_boost_for_filtered_with_role(
                &[1.0, 0.0, 0.0], 10, 0.5,
                Some(("Nashville", "TN")), None,
            ).await;

        let keys: Vec<_> = boosts.keys().map(|(_, _, n)| n.clone()).collect();
        assert!(keys.contains(&"Alice".to_string()));
        assert!(!keys.contains(&"Bob".to_string()), "flagged+unreviewed entry leaked into boost");
    }

    // A reviewed (resolved) flag re-admits the entry to the boost pool.
    #[tokio::test]
    async fn boost_re_admits_resolved_entries() {
        let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
        let mut e = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
        e.doc_drift_flagged_at = Some("2026-04-22T00:00:00Z".into());
        e.doc_drift_reviewed_at = Some("2026-04-22T00:01:00Z".into()); // human reviewed
        pm.set_entries(vec![e]).await.unwrap();

        let boosts = pm
            .compute_boost_for_filtered_with_role(
                &[1.0, 0.0, 0.0], 10, 0.5,
                Some(("Nashville", "TN")), None,
            ).await;

        let keys: Vec<_> = boosts.keys().map(|(_, _, n)| n.clone()).collect();
        assert!(keys.contains(&"Alice".to_string()), "resolved entry should re-enter boost pool");
    }
|
||||
|
||||
#[tokio::test]
|
||||
async fn update_refreshes_valid_until_when_caller_provides_one() {
|
||||
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
|
||||
|
||||
@ -133,6 +133,9 @@ pub fn router(state: VectorState) -> Router {
|
||||
.route("/playbook_memory/revise", post(revise_playbook_memory))
|
||||
.route("/playbook_memory/history/{id}", get(playbook_memory_history))
|
||||
.route("/playbook_memory/status", get(playbook_memory_status))
|
||||
// Phase 45 slice 3 — doc drift detection + human re-admission.
|
||||
.route("/playbook_memory/doc_drift/check/{id}", post(check_doc_drift))
|
||||
.route("/playbook_memory/doc_drift/resolve/{id}", post(resolve_doc_drift))
|
||||
.with_state(state)
|
||||
}
|
||||
|
||||
@ -2500,6 +2503,94 @@ async fn retire_playbook_memory(
|
||||
"supply either {playbook_id, reason} or {city, state, current_schema_fingerprint, reason}".into()))
|
||||
}
|
||||
|
||||
/// Phase 45 slice 3 — POST /playbook_memory/doc_drift/check/{id}
///
/// Iterates the playbook's `doc_refs`, asks the context7 bridge whether
/// each one drifted against the recorded snippet_hash. If any tool
/// returned `drifted: true`, stamps `doc_drift_flagged_at` on the
/// entry — which excludes it from boost (via the filter in
/// `compute_boost_for_filtered_with_role`) until a human reviews and
/// resolves.
///
/// Unknown outcomes (bridge down, tool not in context7, no snippet
/// hash) are explicitly NOT enough to flag. Only a positive drifted=true
/// from the bridge flips the flag.
///
/// Responses: 404 when the playbook id is unknown; 500 when stamping the
/// flag fails to persist; otherwise 200 with per-tool results.
async fn check_doc_drift(
    State(state): State<VectorState>,
    axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    use crate::doc_drift::{check_all_refs, DriftCheckerConfig, DriftOutcome};

    let entry = state.playbook_memory.get_entry(&id).await
        .ok_or((StatusCode::NOT_FOUND, format!("playbook not found: {id}")))?;

    // Nothing to check — short-circuit with an explicit non-drifted body
    // so callers can tell "clean" apart from "uncheckable".
    if entry.doc_refs.is_empty() {
        return Ok(Json(serde_json::json!({
            "playbook_id": id,
            "checked_tools": [],
            "drifted": false,
            "flagged": false,
            "reason": "entry has no doc_refs — nothing to check",
        })));
    }

    // Bridge URL resolved from LH_BRIDGE_URL (or the localhost default).
    let cfg = DriftCheckerConfig::default();
    let results = check_all_refs(&cfg, &entry.doc_refs).await;

    // Flatten each outcome into a uniform JSON row; absent fields are null.
    let per_tool: Vec<serde_json::Value> = results.iter().map(|r| {
        let (drifted, current, src, reason) = match &r.outcome {
            DriftOutcome::Drifted { current_snippet_hash, source_url } =>
                (true, Some(current_snippet_hash.clone()), source_url.clone(), None),
            DriftOutcome::Unchanged =>
                (false, None, None, None),
            DriftOutcome::Unknown { reason } =>
                (false, None, None, Some(reason.clone())),
        };
        serde_json::json!({
            "tool": r.tool,
            "version_seen": r.version_seen,
            "drifted": drifted,
            "current_snippet_hash": current,
            "source_url": src,
            "unknown_reason": reason,
        })
    }).collect();

    let any_drifted = results.iter().any(|r| matches!(r.outcome, DriftOutcome::Drifted { .. }));

    // Flag only on a positive drift signal. `flagged` can come back false
    // even when drifted is true: flag_doc_drift is idempotent and reports
    // false for an already-flagged, unreviewed entry.
    let flagged = if any_drifted {
        state.playbook_memory.flag_doc_drift(&id).await
            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("flag: {e}")))?
    } else {
        false
    };

    Ok(Json(serde_json::json!({
        "playbook_id": id,
        "checked_tools": results.iter().map(|r| &r.tool).collect::<Vec<_>>(),
        "drifted": any_drifted,
        "flagged": flagged,
        "per_tool": per_tool,
    })))
}
|
||||
|
||||
/// Phase 45 slice 3 — POST /playbook_memory/doc_drift/resolve/{id}
|
||||
///
|
||||
/// Human-in-the-loop re-admission. Stamps `doc_drift_reviewed_at`.
|
||||
/// Idempotent: returns `resolved: false` if nothing changed (entry
|
||||
/// wasn't flagged, already reviewed, or doesn't exist).
|
||||
async fn resolve_doc_drift(
|
||||
State(state): State<VectorState>,
|
||||
axum::extract::Path(id): axum::extract::Path<String>,
|
||||
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
|
||||
let resolved = state.playbook_memory.resolve_doc_drift(&id).await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("resolve: {e}")))?;
|
||||
Ok(Json(serde_json::json!({
|
||||
"playbook_id": id,
|
||||
"resolved": resolved,
|
||||
})))
|
||||
}
|
||||
|
||||
/// Phase 27 — request body for `POST /playbook_memory/revise`. Same
|
||||
/// shape as a seed request minus `append` (revise is always
|
||||
/// append-semantics for a specific parent) plus `parent_id`. The new
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user