diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs index a8a44ce..20fe7bd 100644 --- a/crates/vectord/src/service.rs +++ b/crates/vectord/src/service.rs @@ -146,6 +146,11 @@ pub fn router(state: VectorState) -> Router { // Phase 45 slice 3 — doc drift detection + human re-admission. .route("/playbook_memory/doc_drift/check/{id}", post(check_doc_drift)) .route("/playbook_memory/doc_drift/resolve/{id}", post(resolve_doc_drift)) + // Phase 45 closure (2026-04-27) — batch scan across all active + // playbooks. Operator runs this on a schedule (cron or manual); + // each newly-detected drift writes a row to + // data/_kb/doc_drift_corrections.jsonl for downstream review. + .route("/playbook_memory/doc_drift/scan", post(scan_doc_drift)) // Pathway memory — consensus-designed sidecar (2026-04-24). // scrum_master_pipeline POSTs /pathway/insert at the end of each // review, calls /pathway/query before running the ladder for a @@ -2539,6 +2544,119 @@ async fn check_doc_drift( }))) } +/// Phase 45 closure (2026-04-27) — POST /playbook_memory/doc_drift/scan +/// +/// Iterates all active playbooks (non-retired, has doc_refs), runs +/// drift check against context7 for each, flags drifted entries via +/// PlaybookMemory::flag_doc_drift, and appends a row to +/// data/_kb/doc_drift_corrections.jsonl for each drift detected. +/// +/// Returns aggregate stats so an operator can see at-a-glance how +/// many playbooks drifted and which tools moved. +/// +/// Honors entries already flagged: they're counted in `already_flagged` +/// (no double-flag, no duplicate corrections.jsonl row). +async fn scan_doc_drift( + State(state): State, +) -> Result, (StatusCode, String)> { + use crate::doc_drift::{check_all_refs, DriftCheckerConfig, DriftOutcome}; + + let entries = state.playbook_memory.snapshot().await; + let now = chrono::Utc::now().to_rfc3339(); + let cfg = DriftCheckerConfig::default(); + + let mut scanned = 0usize; + let mut newly_flagged = 0usize; + let mut already_flagged = 0usize; + let mut skipped_no_refs = 0usize; + let mut skipped_retired = 0usize; + let mut tool_counts: std::collections::HashMap = Default::default(); + let mut corrections_rows: Vec = vec![]; + + for e in entries.iter() { + if e.retired_at.is_some() { skipped_retired += 1; continue; } + if e.doc_refs.is_empty() { skipped_no_refs += 1; continue; } + if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() { + already_flagged += 1; + continue; + } + scanned += 1; + let results = check_all_refs(&cfg, &e.doc_refs).await; + let drifted_tools: Vec<&str> = results.iter() + .filter(|r| matches!(r.outcome, DriftOutcome::Drifted { .. })) + .map(|r| r.tool.as_str()) + .collect(); + if drifted_tools.is_empty() { continue; } + + // Flag the entry. + let flagged = state.playbook_memory.flag_doc_drift(&e.playbook_id).await + .unwrap_or(false); + if flagged { newly_flagged += 1; } + for t in &drifted_tools { + *tool_counts.entry(t.to_string()).or_insert(0) += 1; + } + + // Build corrections.jsonl row — one per drifted playbook with + // the tool list inline. Downstream consumers (overview model, + // operator dashboard) read this to decide reviews + revisions. + let row = serde_json::json!({ + "playbook_id": e.playbook_id, + "scanned_at": now, + "drifted_tools": drifted_tools, + "per_tool": results.iter().map(|r| { + let (drifted, current, src) = match &r.outcome { + DriftOutcome::Drifted { current_snippet_hash, source_url } => + (true, Some(current_snippet_hash.clone()), source_url.clone()), + _ => (false, None, None), + }; + serde_json::json!({ + "tool": r.tool, "version_seen": r.version_seen, + "drifted": drifted, "current_snippet_hash": current, "source_url": src, + }) + }).collect::>(), + "recommended_action": "review-and-resolve", + }); + corrections_rows.push(row.to_string()); + } + + // Persist corrections.jsonl row(s) for the operator/overview model. + if !corrections_rows.is_empty() { + let path = std::path::PathBuf::from("/home/profit/lakehouse/data/_kb/doc_drift_corrections.jsonl"); + if let Some(parent) = path.parent() { + if let Err(e) = tokio::fs::create_dir_all(parent).await { + tracing::warn!(target: "vectord.doc_drift", "create_dir_all {parent:?}: {e}"); + } + } + let body = corrections_rows.join("\n") + "\n"; + if let Err(e) = tokio::fs::OpenOptions::new() + .create(true).append(true).open(&path).await + { + tracing::warn!(target: "vectord.doc_drift", "open {path:?}: {e}"); + } else { + use tokio::io::AsyncWriteExt; + match tokio::fs::OpenOptions::new().create(true).append(true).open(&path).await { + Ok(mut f) => { + if let Err(e) = f.write_all(body.as_bytes()).await { + tracing::warn!(target: "vectord.doc_drift", "append {path:?}: {e}"); + } + } + Err(e) => tracing::warn!(target: "vectord.doc_drift", "reopen {path:?}: {e}"), + } + } + } + + Ok(Json(serde_json::json!({ + "scanned_at": now, + "scanned": scanned, + "newly_flagged": newly_flagged, + "already_flagged": already_flagged, + "skipped_retired": skipped_retired, + "skipped_no_refs": skipped_no_refs, + "drifted_by_tool": tool_counts, + "corrections_written": corrections_rows.len(), + }))) +} + /// Phase 45 slice 3 — POST /playbook_memory/doc_drift/resolve/{id} /// /// Human-in-the-loop re-admission. Stamps `doc_drift_reviewed_at`.