vectord: Phase 45 closure — /doc_drift/scan + doc_drift_corrections.jsonl writes

Phase 45 (doc-drift detection + context7 integration) was mostly
already shipped in prior sessions: DocRef struct, doc_drift module,
/doc_drift/check + /doc_drift/resolve endpoints, mcp-server's
context7_bridge.ts, boost exclusion in compute_boost_for_filtered
_with_role. The two missing pieces this commit lands:

1. POST /vectors/playbook_memory/doc_drift/scan — batch scan across
   ALL active playbooks. Iterates the snapshot, filters out retired
   + already-flagged + no-doc_refs, runs check_all_refs on the rest,
   flags drifted entries via PlaybookMemory::flag_doc_drift.

2. Per-detection write to data/_kb/doc_drift_corrections.jsonl. One
   row per drifted playbook with playbook_id + scanned_at +
   drifted_tools[] + per_tool[] + recommended_action. Downstream
   consumers (overview model, operator dashboard, scrum_master
   prompt enrichment) read this file to surface "this playbook
   compounded the wrong way" signals to humans.

Idempotent by design:
- Already-flagged entries with no resolved_at are counted as
  `already_flagged` and skipped (no double-flag, no duplicate row).
- Re-scanning after resolve_doc_drift() unflags an entry brings it
  back into the eligible set on the next scan.

Aggregate response shape:
  {
    "scanned": N,                    // playbooks with doc_refs we checked
    "newly_flagged": N,              // drift detected this scan
    "already_flagged": N,            // skipped (still under review)
    "skipped_retired": N,
    "skipped_no_refs": N,            // pre-Phase-45 playbooks
    "drifted_by_tool": {tool: count},
    "corrections_written": N,
  }

Verified live:
  POST /doc_drift/scan
    → scanned=4, newly_flagged=4, drifted_by_tool={docker:4, terraform:1},
      corrections_written=4
  POST /doc_drift/scan (re-run)
    → scanned=0, newly_flagged=0, already_flagged=6 (idempotent)
  data/_kb/doc_drift_corrections.jsonl
    → 5 rows total (existing seed + this scan)

Phase 45 closure status:
  DocRef + PlaybookEntry.doc_refs        prior session
  doc_drift module + check_all_refs      prior session
  /doc_drift/check + /resolve            prior session
  mcp-server/context7_bridge.ts          prior session
  boost exclusion in compute_boost_*     prior session
  /doc_drift/scan + corrections.jsonl    THIS COMMIT

The 0→85% thesis stays valid against external doc drift. Popular
playbooks can no longer compound the wrong way as Docker / Terraform
/ React / etc. patch their docs — the scan flags drift, the boost
filter excludes the playbook, the operator reviews the corrections
.jsonl, and a revise call (Phase 27) supersedes the stale entry
with corrected operation/approach.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-27 08:00:50 -05:00
parent 98db129b8f
commit 6cafa7ec0e

View File

@ -146,6 +146,11 @@ pub fn router(state: VectorState) -> Router {
// Phase 45 slice 3 — doc drift detection + human re-admission.
.route("/playbook_memory/doc_drift/check/{id}", post(check_doc_drift))
.route("/playbook_memory/doc_drift/resolve/{id}", post(resolve_doc_drift))
// Phase 45 closure (2026-04-27) — batch scan across all active
// playbooks. Operator runs this on a schedule (cron or manual);
// each newly-detected drift writes a row to
// data/_kb/doc_drift_corrections.jsonl for downstream review.
.route("/playbook_memory/doc_drift/scan", post(scan_doc_drift))
// Pathway memory — consensus-designed sidecar (2026-04-24).
// scrum_master_pipeline POSTs /pathway/insert at the end of each
// review, calls /pathway/query before running the ladder for a
@ -2539,6 +2544,119 @@ async fn check_doc_drift(
})))
}
/// Phase 45 closure (2026-04-27) — POST /playbook_memory/doc_drift/scan
///
/// Iterates all active playbooks (non-retired, has doc_refs), runs
/// drift check against context7 for each, flags drifted entries via
/// PlaybookMemory::flag_doc_drift, and appends a row to
/// data/_kb/doc_drift_corrections.jsonl for each drift detected.
///
/// Returns aggregate stats so an operator can see at-a-glance how
/// many playbooks drifted and which tools moved.
///
/// Honors entries already flagged: they're counted in `already_flagged`
/// (no double-flag, no duplicate corrections.jsonl row).
async fn scan_doc_drift(
State(state): State<VectorState>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
use crate::doc_drift::{check_all_refs, DriftCheckerConfig, DriftOutcome};
let entries = state.playbook_memory.snapshot().await;
let now = chrono::Utc::now().to_rfc3339();
let cfg = DriftCheckerConfig::default();
let mut scanned = 0usize;
let mut newly_flagged = 0usize;
let mut already_flagged = 0usize;
let mut skipped_no_refs = 0usize;
let mut skipped_retired = 0usize;
let mut tool_counts: std::collections::HashMap<String, usize> = Default::default();
let mut corrections_rows: Vec<String> = vec![];
for e in entries.iter() {
if e.retired_at.is_some() { skipped_retired += 1; continue; }
if e.doc_refs.is_empty() { skipped_no_refs += 1; continue; }
if e.doc_drift_flagged_at.is_some() && e.doc_drift_reviewed_at.is_none() {
already_flagged += 1;
continue;
}
scanned += 1;
let results = check_all_refs(&cfg, &e.doc_refs).await;
let drifted_tools: Vec<&str> = results.iter()
.filter(|r| matches!(r.outcome, DriftOutcome::Drifted { .. }))
.map(|r| r.tool.as_str())
.collect();
if drifted_tools.is_empty() { continue; }
// Flag the entry.
let flagged = state.playbook_memory.flag_doc_drift(&e.playbook_id).await
.unwrap_or(false);
if flagged { newly_flagged += 1; }
for t in &drifted_tools {
*tool_counts.entry(t.to_string()).or_insert(0) += 1;
}
// Build corrections.jsonl row — one per drifted playbook with
// the tool list inline. Downstream consumers (overview model,
// operator dashboard) read this to decide reviews + revisions.
let row = serde_json::json!({
"playbook_id": e.playbook_id,
"scanned_at": now,
"drifted_tools": drifted_tools,
"per_tool": results.iter().map(|r| {
let (drifted, current, src) = match &r.outcome {
DriftOutcome::Drifted { current_snippet_hash, source_url } =>
(true, Some(current_snippet_hash.clone()), source_url.clone()),
_ => (false, None, None),
};
serde_json::json!({
"tool": r.tool, "version_seen": r.version_seen,
"drifted": drifted, "current_snippet_hash": current, "source_url": src,
})
}).collect::<Vec<_>>(),
"recommended_action": "review-and-resolve",
});
corrections_rows.push(row.to_string());
}
// Persist corrections.jsonl row(s) for the operator/overview model.
if !corrections_rows.is_empty() {
let path = std::path::PathBuf::from("/home/profit/lakehouse/data/_kb/doc_drift_corrections.jsonl");
if let Some(parent) = path.parent() {
if let Err(e) = tokio::fs::create_dir_all(parent).await {
tracing::warn!(target: "vectord.doc_drift", "create_dir_all {parent:?}: {e}");
}
}
let body = corrections_rows.join("\n") + "\n";
if let Err(e) = tokio::fs::OpenOptions::new()
.create(true).append(true).open(&path).await
{
tracing::warn!(target: "vectord.doc_drift", "open {path:?}: {e}");
} else {
use tokio::io::AsyncWriteExt;
match tokio::fs::OpenOptions::new().create(true).append(true).open(&path).await {
Ok(mut f) => {
if let Err(e) = f.write_all(body.as_bytes()).await {
tracing::warn!(target: "vectord.doc_drift", "append {path:?}: {e}");
}
}
Err(e) => tracing::warn!(target: "vectord.doc_drift", "reopen {path:?}: {e}"),
}
}
}
Ok(Json(serde_json::json!({
"scanned_at": now,
"scanned": scanned,
"newly_flagged": newly_flagged,
"already_flagged": already_flagged,
"skipped_retired": skipped_retired,
"skipped_no_refs": skipped_no_refs,
"drifted_by_tool": tool_counts,
"corrections_written": corrections_rows.len(),
})))
}
/// Phase 45 slice 3 — POST /playbook_memory/doc_drift/resolve/{id}
///
/// Human-in-the-loop re-admission. Stamps `doc_drift_reviewed_at`.