Phase 45 slice 3: doc_drift check + resolve endpoints #5

Merged
profit merged 1 commit from phase/45-slice-3 into main 2026-04-22 19:14:13 +00:00
5 changed files with 411 additions and 0 deletions

View File

@ -26,3 +26,4 @@ chrono = { workspace = true }
instant-distance = { workspace = true }
uuid = { workspace = true }
sha2 = { workspace = true }
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }

View File

@ -0,0 +1,166 @@
// Phase 45 slice 3 — context7 bridge client for doc-drift detection.
//
// Calls the Bun context7 bridge on :3900 (mcp-server/context7_bridge.ts)
// which itself wraps context7's public API. For each DocRef on a
// playbook, queries /docs/:tool/diff?since=<snippet_hash> and parses
// the response.
//
// Kept deliberately thin: no caching here (bridge caches for 5 min),
// no retry logic (transient fail = report as "unknown" drift status,
// don't flag). The handler decides whether to flag the playbook based
// on whether ANY tool came back drifted=true.
use serde::Deserialize;
use std::time::Duration;
use crate::playbook_memory::DocRef;
/// Fallback bridge base URL, used when `LH_BRIDGE_URL` is unset
/// (see `DriftCheckerConfig::default`).
const DEFAULT_BRIDGE_URL: &str = "http://localhost:3900";
/// Per-request timeout (seconds) applied to the reqwest client in
/// `check_all_refs`.
const CALL_TIMEOUT_SECS: u64 = 15;
/// Result of checking one `DocRef` against the context7 bridge.
#[derive(Debug, Clone)]
pub struct DriftCheck {
    /// Tool name copied from the doc_ref (e.g. "docker").
    pub tool: String,
    /// Tool version recorded on the doc_ref, echoed back for the caller.
    pub version_seen: String,
    /// The bridge's verdict, or why we couldn't get one.
    pub outcome: DriftOutcome,
}
#[derive(Debug, Clone)]
pub enum DriftOutcome {
/// Bridge reports drift (current snippet_hash != previous).
Drifted { current_snippet_hash: String, source_url: Option<String> },
/// Bridge reports no drift — the stored snippet_hash still matches
/// the current context7 docs.
Unchanged,
/// Bridge unreachable, 404 on the tool, or returned unparseable
/// data. We deliberately don't flag on these — a down bridge
/// shouldn't silently mark every playbook drift-flagged.
Unknown { reason: String },
}
/// Wire shape of the bridge's `GET /docs/:tool/diff?since=<hash>` reply.
#[derive(Debug, Clone, Deserialize)]
struct BridgeDiffResponse {
    /// Whether the current docs hash differs from the `since` value.
    drifted: bool,
    /// The current hash; callers default it to "" when absent.
    current_snippet_hash: Option<String>,
    /// Optional link to the current docs.
    source_url: Option<String>,
}
/// Configuration for the drift checker — currently just the bridge base URL.
pub struct DriftCheckerConfig {
    /// Base URL of the context7 bridge. A trailing slash is tolerated;
    /// `check_all_refs` trims it before building request URLs.
    pub bridge_url: String,
}
impl Default for DriftCheckerConfig {
    /// Reads `LH_BRIDGE_URL` from the environment, falling back to
    /// `DEFAULT_BRIDGE_URL` (`http://localhost:3900`).
    fn default() -> Self {
        Self {
            bridge_url: std::env::var("LH_BRIDGE_URL")
                .unwrap_or_else(|_| DEFAULT_BRIDGE_URL.to_string()),
        }
    }
}
/// For every doc_ref, ask the bridge whether it drifted against the
/// recorded snippet_hash. Returns per-tool outcomes.
pub async fn check_all_refs(
cfg: &DriftCheckerConfig,
doc_refs: &[DocRef],
) -> Vec<DriftCheck> {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(CALL_TIMEOUT_SECS))
.build()
.expect("reqwest client build");
let mut out = Vec::with_capacity(doc_refs.len());
for r in doc_refs {
let hash = r.snippet_hash.as_deref().unwrap_or("");
if hash.is_empty() {
// No hash to compare against — can't detect drift. Report
// unknown so the caller isn't forced to flag.
out.push(DriftCheck {
tool: r.tool.clone(),
version_seen: r.version_seen.clone(),
outcome: DriftOutcome::Unknown {
reason: "no snippet_hash recorded on doc_ref".into(),
},
});
continue;
}
let url = format!(
"{}/docs/{}/diff?since={}",
cfg.bridge_url.trim_end_matches('/'),
urlencoding_minimal(&r.tool),
urlencoding_minimal(hash),
);
let outcome = match client.get(&url).send().await {
Err(e) => DriftOutcome::Unknown {
reason: format!("bridge unreachable: {}", e),
},
Ok(resp) => {
if resp.status() == reqwest::StatusCode::NOT_FOUND {
DriftOutcome::Unknown {
reason: format!("bridge 404 — no context7 library for tool '{}'", r.tool),
}
} else if !resp.status().is_success() {
let status = resp.status();
DriftOutcome::Unknown {
reason: format!("bridge {}: {}", status, resp.text().await.unwrap_or_default()),
}
} else {
match resp.json::<BridgeDiffResponse>().await {
Err(e) => DriftOutcome::Unknown {
reason: format!("bridge response parse: {}", e),
},
Ok(body) => {
if body.drifted {
DriftOutcome::Drifted {
current_snippet_hash: body.current_snippet_hash.unwrap_or_default(),
source_url: body.source_url,
}
} else {
DriftOutcome::Unchanged
}
}
}
}
}
};
out.push(DriftCheck {
tool: r.tool.clone(),
version_seen: r.version_seen.clone(),
outcome,
});
}
out
}
/// Minimal URL path+query encoder for ASCII tool names. We're not
/// pulling in the `urlencoding` crate for this single use — tool
/// names are short alphanumeric (docker/terraform/react), snippet
/// hashes are hex. Handles space + `/` + `?` + `&` defensively so
/// unusual tool names don't corrupt the URL. Non-safe bytes (including
/// each byte of multi-byte UTF-8 sequences) become `%XX`.
fn urlencoding_minimal(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";
    let mut out = String::with_capacity(s.len());
    for b in s.bytes() {
        match b {
            // RFC 3986 "unreserved" set passes through untouched.
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                out.push(b as char)
            }
            // Push "%XX" directly rather than allocating a fresh String
            // via `format!` for every escaped byte.
            _ => {
                out.push('%');
                out.push(HEX[(b >> 4) as usize] as char);
                out.push(HEX[(b & 0x0F) as usize] as char);
            }
        }
    }
    out
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Unreserved ASCII (alphanumerics plus `-_.~`) must pass through
    /// the encoder untouched.
    #[test]
    fn urlencoding_handles_ascii_safe_chars_passthrough() {
        for (input, expected) in [("docker", "docker"), ("next.js", "next.js")] {
            assert_eq!(urlencoding_minimal(input), expected);
        }
    }

    /// Reserved characters get percent-encoded so unusual tool names
    /// can't corrupt the request URL.
    #[test]
    fn urlencoding_encodes_slash_and_space() {
        for (input, expected) in [("foo/bar", "foo%2Fbar"), ("foo bar", "foo%20bar")] {
            assert_eq!(urlencoding_minimal(input), expected);
        }
    }
}

View File

@ -8,6 +8,7 @@ pub mod hnsw;
pub mod index_registry;
pub mod jobs;
pub mod playbook_memory;
pub mod doc_drift;
pub mod promotion;
pub mod refresh;
pub mod store;

View File

@ -398,6 +398,64 @@ impl PlaybookMemory {
Ok(touched)
}
/// Phase 45 slice 3 — stamp `doc_drift_flagged_at` on a playbook.
/// Idempotent: if already flagged and not yet reviewed, this is a
/// no-op. Callers should check the return value: Ok(true) means we
/// made a new flag, Ok(false) means already flagged (unreviewed) or
/// playbook not found, Err means persist failed.
pub async fn flag_doc_drift(&self, playbook_id: &str) -> Result<bool, String> {
    let stamped = {
        let mut state = self.state.write().await;
        match state.entries.iter_mut().find(|e| e.playbook_id == playbook_id) {
            // Already flagged and awaiting review: keep the original
            // timestamp so repeated re-checks don't churn it.
            Some(e) if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() => {
                false
            }
            Some(e) => {
                e.doc_drift_flagged_at = Some(chrono::Utc::now().to_rfc3339());
                // A fresh flag invalidates any stale review stamp.
                e.doc_drift_reviewed_at = None;
                true
            }
            None => false,
        }
    };
    if stamped {
        self.persist().await?;
    }
    Ok(stamped)
}
/// Phase 45 slice 3 — stamp `doc_drift_reviewed_at` on a playbook,
/// re-admitting it to boost calculation. Idempotent: Ok(false) when
/// the entry is missing, was never flagged, or is already reviewed.
pub async fn resolve_doc_drift(&self, playbook_id: &str) -> Result<bool, String> {
    let reviewed = {
        let mut state = self.state.write().await;
        let hit = state.entries.iter_mut().find(|e| {
            e.playbook_id == playbook_id
                && e.doc_drift_flagged_at.is_some()
                && e.doc_drift_reviewed_at.is_none()
        });
        match hit {
            Some(e) => {
                e.doc_drift_reviewed_at = Some(chrono::Utc::now().to_rfc3339());
                true
            }
            None => false,
        }
    };
    if reviewed {
        self.persist().await?;
    }
    Ok(reviewed)
}
/// Read-only lookup of a playbook entry by id. Returns a clone so the
/// drift-check handler can read doc_refs without holding the state lock.
pub async fn get_entry(&self, playbook_id: &str) -> Option<PlaybookEntry> {
    let state = self.state.read().await;
    state
        .entries
        .iter()
        .find_map(|e| (e.playbook_id == playbook_id).then(|| e.clone()))
}
/// Phase 25 — retire every entry matching (city, state) whose
/// schema_fingerprint doesn't match the current one. Entries with
/// no fingerprint (legacy) are skipped — caller can use
@ -785,6 +843,13 @@ impl PlaybookMemory {
.filter(|e| {
if e.retired_at.is_some() { return false; }
if e.superseded_at.is_some() { return false; }
// Phase 45 slice 3 — entries flagged for doc drift and
// not yet human-reviewed are excluded from boost. The
// flag is the same strength of exclusion as retirement
// for this purpose; reviewing (via /resolve) re-admits.
if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() {
return false;
}
if let Some(vu) = &e.valid_until {
if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) {
if now > parsed.with_timezone(&chrono::Utc) { return false; }
@ -815,6 +880,13 @@ impl PlaybookMemory {
.filter(|e| {
if e.retired_at.is_some() { return false; }
if e.superseded_at.is_some() { return false; }
// Phase 45 slice 3 — same drift exclusion as the
// non-geo path above. Keeps the two filter paths
// consistent so a flagged entry is invisible to
// BOTH geo-targeted and global boost queries.
if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() {
return false;
}
if let Some(vu) = &e.valid_until {
if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) {
if now > parsed.with_timezone(&chrono::Utc) { return false; }
@ -1795,6 +1867,86 @@ mod upsert_tests {
assert_eq!(snap[0].doc_refs[0].tool, "docker");
}
// ─── Phase 45 slice 3 — doc_drift flag / resolve / boost exclusion ───
/// A fresh flag returns true, stamps `doc_drift_flagged_at`, and
/// leaves `doc_drift_reviewed_at` unset.
#[tokio::test]
async fn flag_doc_drift_stamps_timestamp_and_persists() {
    let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
    let entry = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
    pm.set_entries(vec![entry.clone()]).await.unwrap();

    assert!(
        pm.flag_doc_drift(&entry.playbook_id).await.unwrap(),
        "first flag should return true"
    );

    let after = pm.get_entry(&entry.playbook_id).await.unwrap();
    assert!(after.doc_drift_flagged_at.is_some(), "flag_doc_drift should stamp the timestamp");
    assert!(after.doc_drift_reviewed_at.is_none(), "reviewed_at starts None on fresh flag");
}
/// Re-flagging an already-flagged, unreviewed entry is a no-op —
/// the original flag timestamp is preserved.
#[tokio::test]
async fn flag_doc_drift_is_idempotent_on_already_flagged() {
    let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
    let mut entry = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
    entry.doc_drift_flagged_at = Some("2026-04-01T00:00:00Z".into());
    pm.set_entries(vec![entry.clone()]).await.unwrap();

    let touched = pm.flag_doc_drift(&entry.playbook_id).await.unwrap();
    assert!(!touched, "already-flagged, unreviewed entry shouldn't re-stamp");
}
/// Resolving stamps `doc_drift_reviewed_at`; resolving again changes
/// nothing.
#[tokio::test]
async fn resolve_doc_drift_clears_flag_admission_gate() {
    let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
    let mut entry = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
    entry.doc_drift_flagged_at = Some("2026-04-21T00:00:00Z".into());
    pm.set_entries(vec![entry.clone()]).await.unwrap();

    assert!(pm.resolve_doc_drift(&entry.playbook_id).await.unwrap());
    let after = pm.get_entry(&entry.playbook_id).await.unwrap();
    assert!(after.doc_drift_reviewed_at.is_some());

    // Second resolve is a no-op
    assert!(!pm.resolve_doc_drift(&entry.playbook_id).await.unwrap());
}
/// A flagged, unreviewed entry must be invisible to boost while a
/// clean sibling remains visible.
#[tokio::test]
async fn boost_excludes_flagged_unreviewed_entries() {
    let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
    let clean = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
    let mut flagged = mk("fill: Welder x1 in Nashville, TN", "2026-04-21", &["Bob"]);
    flagged.doc_drift_flagged_at = Some("2026-04-22T00:00:00Z".into());
    flagged.doc_drift_reviewed_at = None;
    pm.set_entries(vec![clean, flagged]).await.unwrap();

    let boosts = pm
        .compute_boost_for_filtered_with_role(
            &[1.0, 0.0, 0.0], 10, 0.5,
            Some(("Nashville", "TN")), None,
        ).await;
    let names: Vec<String> = boosts.keys().map(|(_, _, n)| n.clone()).collect();
    assert!(names.contains(&"Alice".to_string()));
    assert!(!names.contains(&"Bob".to_string()), "flagged+unreviewed entry leaked into boost");
}
/// Once a human has reviewed (resolved) the flag, the entry re-enters
/// the boost pool.
#[tokio::test]
async fn boost_re_admits_resolved_entries() {
    let pm = PlaybookMemory::new(Arc::new(InMemory::new()));
    let mut entry = mk("fill: Welder x1 in Nashville, TN", "2026-04-22", &["Alice"]);
    entry.doc_drift_flagged_at = Some("2026-04-22T00:00:00Z".into());
    entry.doc_drift_reviewed_at = Some("2026-04-22T00:01:00Z".into()); // human reviewed
    pm.set_entries(vec![entry]).await.unwrap();

    let boosts = pm
        .compute_boost_for_filtered_with_role(
            &[1.0, 0.0, 0.0], 10, 0.5,
            Some(("Nashville", "TN")), None,
        ).await;
    let names: Vec<String> = boosts.keys().map(|(_, _, n)| n.clone()).collect();
    assert!(names.contains(&"Alice".to_string()), "resolved entry should re-enter boost pool");
}
#[tokio::test]
async fn update_refreshes_valid_until_when_caller_provides_one() {
let pm = PlaybookMemory::new(Arc::new(InMemory::new()));

View File

@ -133,6 +133,9 @@ pub fn router(state: VectorState) -> Router {
.route("/playbook_memory/revise", post(revise_playbook_memory))
.route("/playbook_memory/history/{id}", get(playbook_memory_history))
.route("/playbook_memory/status", get(playbook_memory_status))
// Phase 45 slice 3 — doc drift detection + human re-admission.
.route("/playbook_memory/doc_drift/check/{id}", post(check_doc_drift))
.route("/playbook_memory/doc_drift/resolve/{id}", post(resolve_doc_drift))
.with_state(state)
}
@ -2500,6 +2503,94 @@ async fn retire_playbook_memory(
"supply either {playbook_id, reason} or {city, state, current_schema_fingerprint, reason}".into()))
}
/// Phase 45 slice 3 — POST /playbook_memory/doc_drift/check/{id}
///
/// Walks the playbook's `doc_refs` and asks the context7 bridge whether
/// each one drifted against the recorded snippet_hash. If any tool came
/// back `drifted: true`, stamps `doc_drift_flagged_at` on the entry —
/// which excludes it from boost (via the filter in
/// `compute_boost_for_filtered_with_role`) until a human reviews and
/// resolves.
///
/// Unknown outcomes (bridge down, tool not in context7, no snippet
/// hash) are explicitly NOT enough to flag. Only a positive
/// drifted=true from the bridge flips the flag.
async fn check_doc_drift(
    State(state): State<VectorState>,
    axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    use crate::doc_drift::{check_all_refs, DriftCheckerConfig, DriftOutcome};
    let Some(entry) = state.playbook_memory.get_entry(&id).await else {
        return Err((StatusCode::NOT_FOUND, format!("playbook not found: {id}")));
    };
    // No recorded refs → nothing to check. Say so explicitly rather
    // than returning a bare success that reads as "all clear".
    if entry.doc_refs.is_empty() {
        return Ok(Json(serde_json::json!({
            "playbook_id": id,
            "checked_tools": [],
            "drifted": false,
            "flagged": false,
            "reason": "entry has no doc_refs — nothing to check",
        })));
    }
    let results = check_all_refs(&DriftCheckerConfig::default(), &entry.doc_refs).await;
    // One JSON row per tool; fields not applicable to an outcome
    // serialize as null, matching the Option-based shape.
    let per_tool: Vec<serde_json::Value> = results
        .iter()
        .map(|check| match &check.outcome {
            DriftOutcome::Drifted { current_snippet_hash, source_url } => serde_json::json!({
                "tool": check.tool,
                "version_seen": check.version_seen,
                "drifted": true,
                "current_snippet_hash": current_snippet_hash,
                "source_url": source_url,
                "unknown_reason": serde_json::Value::Null,
            }),
            DriftOutcome::Unchanged => serde_json::json!({
                "tool": check.tool,
                "version_seen": check.version_seen,
                "drifted": false,
                "current_snippet_hash": serde_json::Value::Null,
                "source_url": serde_json::Value::Null,
                "unknown_reason": serde_json::Value::Null,
            }),
            DriftOutcome::Unknown { reason } => serde_json::json!({
                "tool": check.tool,
                "version_seen": check.version_seen,
                "drifted": false,
                "current_snippet_hash": serde_json::Value::Null,
                "source_url": serde_json::Value::Null,
                "unknown_reason": reason,
            }),
        })
        .collect();
    let any_drifted = results
        .iter()
        .any(|check| matches!(check.outcome, DriftOutcome::Drifted { .. }));
    // Only a definite drift flips the persistent flag.
    let flagged = if !any_drifted {
        false
    } else {
        state.playbook_memory.flag_doc_drift(&id).await
            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("flag: {e}")))?
    };
    let checked_tools: Vec<&String> = results.iter().map(|check| &check.tool).collect();
    Ok(Json(serde_json::json!({
        "playbook_id": id,
        "checked_tools": checked_tools,
        "drifted": any_drifted,
        "flagged": flagged,
        "per_tool": per_tool,
    })))
}
/// Phase 45 slice 3 — POST /playbook_memory/doc_drift/resolve/{id}
///
/// Human-in-the-loop re-admission: stamps `doc_drift_reviewed_at`.
/// Idempotent — `resolved: false` means nothing changed (entry wasn't
/// flagged, was already reviewed, or doesn't exist).
async fn resolve_doc_drift(
    State(state): State<VectorState>,
    axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    state
        .playbook_memory
        .resolve_doc_drift(&id)
        .await
        .map(|resolved| {
            Json(serde_json::json!({
                "playbook_id": id,
                "resolved": resolved,
            }))
        })
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("resolve: {e}")))
}
/// Phase 27 — request body for `POST /playbook_memory/revise`. Same
/// shape as a seed request minus `append` (revise is always
/// append-semantics for a specific parent) plus `parent_id`. The new