Compare commits


1 Commit

Author SHA1 Message Date
profit
7fe47babd9 Phase 45 slice 4: batch scan + T3 drift-correction synthesis
Some checks failed
lakehouse/auditor cloud-flagged gap not in any claim: append_correction test writes a temp file directly instead of invoking the append_correction function, …
Closes the PRD-listed remaining deliverables for Phase 45:
- POST /vectors/playbook_memory/doc_drift/scan
- T3 synthesis writing data/_kb/doc_drift_corrections.jsonl
- Backfill: unit tests for mcp-server/context7_bridge.ts (slice 2
  never had any)

crates/vectord/src/drift_synth.rs (NEW, 240 LOC)
  - DriftCorrection shape matching the PRD spec exactly
  - synthesize(): HTTPS POST to ollama.com/api/generate with
    gpt-oss:120b. Prompt explicitly instructs the model to admit
    "preview insufficient" rather than fabricate a diff.
  - append_correction(): JSONL append to data/_kb/ with mkdir -p
    on the parent; atomic at line level on Linux for typical sizes.
  - spawn_synthesize_and_append(): fire-and-forget wrapper. Never
    blocks the handler. No cloud key → skipped with a
    tracing::warn. Cloud failure → logged + dropped.
  - resolve_cloud_key(): same sources v1/ollama_cloud.rs uses
    (env OLLAMA_CLOUD_KEY → /root/llm_team_config.json → env
    OLLAMA_CLOUD_API_KEY).
  - 5 unit tests: JSON extraction (first object, code fences,
    unclosed), prompt composition, jsonl append shape.
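
For reference, resolve_cloud_key()'s file-based fallback reads the key via
the JSON pointer /providers/ollama_cloud/api_key, so /root/llm_team_config.json
is assumed to look roughly like this (shape inferred from the pointer; any
sibling fields are omitted):

```json
{
  "providers": {
    "ollama_cloud": {
      "api_key": "<redacted>"
    }
  }
}
```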

crates/vectord/src/service.rs
  - /playbook_memory/doc_drift/scan — iterates active entries with
    doc_refs, optional (city, state, max_entries) filter. Per-entry:
    bridge check → flag if drifted → spawn synthesis per drifted
    tool. Honest response: scanned, eligible, drifted, newly_flagged,
    unknown, synthesis_spawned, details[].
  - /playbook_memory/doc_drift/check/{id}: slice 3 handler now also
    spawns synthesis per drifted tool. Response adds
    synthesis_spawned: bool.
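
Roughly, the honest scan response looks like this (field names from the
handler; counts mirror the Toledo live verification, details illustrative):

```json
{
  "scanned": 1,
  "eligible": 1,
  "drifted": 1,
  "newly_flagged": 1,
  "unknown": 0,
  "synthesis_spawned": 1,
  "filter": { "city": "Toledo", "state": "OH", "max_entries": 5 },
  "details": [
    { "playbook_id": "pb-seed-88abc7d1", "drifted": true, "unknown": false, "tools": ["docker"] }
  ]
}
```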

mcp-server/context7_bridge.ts
  - Export normalizeTool + hashContent for testing.
  - Guard Bun.serve() behind `if (import.meta.main)` so imports
    don't double-bind :3900 (collides with systemd service).

mcp-server/context7_bridge.test.ts (NEW, 6 tests)
  - normalizeTool: lowercases + trims, preserves internal chars
  - hashContent: deterministic, sensitive to 1-char change,
    16 hex chars, differs for empty vs whitespace

Live verification (after gateway restart):

  seed playbook pb-seed-88abc7d1 with doc_refs[docker v23.0.0,
  stale hash]
  POST /doc_drift/scan {city:"Toledo", state:"OH", max_entries:5}
    → scanned=1 drifted=1 newly_flagged=1
       synthesis_spawned=1 unknown=0
  wait 30s
  cat data/_kb/doc_drift_corrections.jsonl
    → 1 record (603 bytes) with diff_summary + recommended_action
      from gpt-oss:120b. Model correctly noted "preview unavailable"
      rather than fabricating.
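
Each appended record is one JSON object per line matching the DriftCorrection
shape; a sketch (field names from the struct, values illustrative — not the
actual 603-byte record):

```json
{"playbook_id": "pb-seed-88abc7d1", "operation": "fill: X in Y, Z", "tool": "docker", "version_seen": "23.0.0", "current_snippet_hash": "abc", "diff_summary": "Preview unavailable; the specific docs change cannot be determined.", "recommended_action": "Re-verify the docker steps against the current docs before reuse.", "generated_at": "2026-04-22T22:16:02Z", "model": "gpt-oss:120b"}
```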

Tests: 6 bridge tests + 6 drift_synth tests + 51 pre-existing
vectord lib tests. All green. Release build clean.

NOT in this PR (deliberately — cohesion review pending):
  - Auditor's kb_query check consulting hybrid_search + context7
  - Auditor's inference check consuming KB neighbors + drift
    corrections as context
  - Observer → KB → auditor feedback loop beyond append
  - Integration test exercising the full smarter-DB loop
  - Python script (sidecar/*, scripts/*) inventory

Those are the cohesion work J flagged — handled on a separate
branch after this merges.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 17:15:32 -05:00
5 changed files with 508 additions and 2 deletions

View File

@@ -0,0 +1,328 @@
// Phase 45 slice 4 — T3 drift-correction synthesis.
//
// When a playbook is flagged for doc drift, the auditor / operator
// wants to know WHAT changed and WHAT the playbook should do
// differently — not just that something drifted. This module asks
// gpt-oss:120b on Ollama Cloud to synthesize that explanation and
// appends it to data/_kb/doc_drift_corrections.jsonl.
//
// Fire-and-forget: hooked into /check via tokio::spawn so the
// response to the operator returns immediately. Cloud failures are
// logged and dropped — they never block the flag itself.
use serde::{Deserialize, Serialize};
use std::time::Duration;

const CLOUD_URL: &str = "https://ollama.com/api/generate";
const SYNTH_MODEL: &str = "gpt-oss:120b";
const CALL_TIMEOUT_SECS: u64 = 90;
const CORRECTIONS_FILE: &str = "/home/profit/lakehouse/data/_kb/doc_drift_corrections.jsonl";

/// One drift-correction record. Mirrors the shape declared in
/// docs/CONTROL_PLANE_PRD.md Phase 45 spec.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DriftCorrection {
    pub playbook_id: String,
    pub operation: String,
    pub tool: String,
    pub version_seen: String,
    pub current_snippet_hash: Option<String>,
    pub diff_summary: String,
    pub recommended_action: String,
    pub generated_at: String,
    pub model: String,
}

/// Read the Ollama Cloud key from the same sources v1/ollama_cloud.rs
/// uses. Returns None if not available — caller should skip synthesis.
pub fn resolve_cloud_key() -> Option<String> {
    if let Ok(k) = std::env::var("OLLAMA_CLOUD_KEY") {
        if !k.trim().is_empty() { return Some(k.trim().to_string()); }
    }
    if let Ok(raw) = std::fs::read_to_string("/root/llm_team_config.json") {
        if let Ok(v) = serde_json::from_str::<serde_json::Value>(&raw) {
            if let Some(k) = v.pointer("/providers/ollama_cloud/api_key").and_then(|x| x.as_str()) {
                if !k.trim().is_empty() { return Some(k.trim().to_string()); }
            }
        }
    }
    if let Ok(k) = std::env::var("OLLAMA_CLOUD_API_KEY") {
        if !k.trim().is_empty() { return Some(k.trim().to_string()); }
    }
    None
}
/// Ask the cloud model to produce a diff_summary + recommended_action
/// for a playbook's specific drift. Returns a complete DriftCorrection
/// ready to append. Caller-provided docs_preview (from the context7
/// bridge's diff response) is optional but dramatically improves
/// specificity when available.
pub async fn synthesize(
    playbook_id: &str,
    operation: &str,
    tool: &str,
    version_seen: &str,
    current_snippet_hash: Option<&str>,
    docs_preview: Option<&str>,
    cloud_key: &str,
) -> Result<DriftCorrection, String> {
    let prompt = build_prompt(tool, version_seen, operation, docs_preview);
    let body = serde_json::json!({
        "model": SYNTH_MODEL,
        "prompt": prompt,
        "stream": false,
        "options": { "temperature": 0.2, "num_predict": 600 },
    });
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(CALL_TIMEOUT_SECS))
        .build()
        .map_err(|e| format!("client build: {e}"))?;
    let resp = client
        .post(CLOUD_URL)
        .bearer_auth(cloud_key)
        .json(&body)
        .send()
        .await
        .map_err(|e| format!("cloud unreachable: {e}"))?;
    if !resp.status().is_success() {
        return Err(format!(
            "cloud {}: {}",
            resp.status(),
            resp.text().await.unwrap_or_default().chars().take(200).collect::<String>(),
        ));
    }
    let j: serde_json::Value = resp.json().await.map_err(|e| format!("parse response: {e}"))?;
    let text: &str = j.get("response").and_then(|v| v.as_str()).unwrap_or("");
    let parsed = extract_json_object(text).ok_or_else(|| {
        format!("cloud output not valid JSON — head: {}", text.chars().take(200).collect::<String>())
    })?;
    let diff_summary = parsed
        .get("diff_summary").and_then(|v| v.as_str())
        .unwrap_or("").trim().to_string();
    let recommended_action = parsed
        .get("recommended_action").and_then(|v| v.as_str())
        .unwrap_or("").trim().to_string();
    if diff_summary.is_empty() && recommended_action.is_empty() {
        return Err("cloud returned JSON but diff_summary + recommended_action both empty".into());
    }
    Ok(DriftCorrection {
        playbook_id: playbook_id.to_string(),
        operation: operation.to_string(),
        tool: tool.to_string(),
        version_seen: version_seen.to_string(),
        current_snippet_hash: current_snippet_hash.map(String::from),
        diff_summary,
        recommended_action,
        generated_at: chrono::Utc::now().to_rfc3339(),
        model: SYNTH_MODEL.to_string(),
    })
}
/// Append one correction as a JSONL line. Creates the parent dir and
/// file if missing. Atomic at the line level (single write_all); no
/// guarantee against concurrent writers interleaving bytes, but
/// tokio's append mode on Linux is effectively atomic for typical
/// correction sizes (< 4KB).
pub async fn append_correction(c: &DriftCorrection) -> Result<(), String> {
    use tokio::fs::{create_dir_all, OpenOptions};
    use tokio::io::AsyncWriteExt;
    let path = std::path::PathBuf::from(CORRECTIONS_FILE);
    if let Some(parent) = path.parent() {
        create_dir_all(parent).await.map_err(|e| format!("mkdir _kb: {e}"))?;
    }
    let mut f = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&path)
        .await
        .map_err(|e| format!("open corrections: {e}"))?;
    let mut line = serde_json::to_string(c).map_err(|e| format!("serialize: {e}"))?;
    line.push('\n');
    f.write_all(line.as_bytes()).await.map_err(|e| format!("write corrections: {e}"))?;
    Ok(())
}
/// Spawn a fire-and-forget synthesis + append task. Never blocks the
/// caller. On failure, logs warn. On success, appends the correction.
/// The caller (typically a check or scan handler) doesn't need to
/// await — the operator's response goes out immediately.
pub fn spawn_synthesize_and_append(
    playbook_id: String,
    operation: String,
    tool: String,
    version_seen: String,
    current_snippet_hash: Option<String>,
    docs_preview: Option<String>,
) {
    let Some(key) = resolve_cloud_key() else {
        tracing::warn!(
            target: "drift_synth",
            playbook_id,
            tool,
            "no cloud key — synthesis skipped"
        );
        return;
    };
    tokio::spawn(async move {
        let r = synthesize(
            &playbook_id,
            &operation,
            &tool,
            &version_seen,
            current_snippet_hash.as_deref(),
            docs_preview.as_deref(),
            &key,
        ).await;
        match r {
            Ok(c) => match append_correction(&c).await {
                Ok(_) => tracing::info!(
                    target: "drift_synth",
                    playbook_id = c.playbook_id,
                    tool = c.tool,
                    "drift correction appended"
                ),
                Err(e) => tracing::warn!(target: "drift_synth", "append failed: {e}"),
            },
            Err(e) => tracing::warn!(target: "drift_synth", "synthesis failed: {e}"),
        }
    });
}
fn build_prompt(tool: &str, version_seen: &str, operation: &str, preview: Option<&str>) -> String {
    let preview_block = match preview {
        Some(p) if !p.trim().is_empty() => format!(
            "Current docs preview (first ~400 chars from context7):\n---\n{}\n---\n",
            p.chars().take(1000).collect::<String>(),
        ),
        _ => "(no current docs preview available)\n".to_string(),
    };
    format!(
        "A playbook references tool `{tool}` at version_seen=`{version_seen}`. \
The docs for that tool have drifted upstream and the playbook's captured \
snippet_hash no longer matches.
Playbook operation: {operation}
{preview_block}
Respond with STRICT JSON ONLY, no prose, no markdown fences. Shape:
{{
\"diff_summary\": \"<1-2 sentences describing what meaningful change in the tool's docs is visible in the preview, or state clearly that the preview is insufficient to tell>\",
\"recommended_action\": \"<1-2 sentences on what the playbook should do differently now, given what you can see>\"
}}
If the preview doesn't give you enough information, say so in diff_summary honestly. Don't fabricate a diff."
    )
}
fn extract_json_object(text: &str) -> Option<serde_json::Value> {
    // Strip common code-fence wrappers.
    let cleaned = text
        .trim()
        .trim_start_matches("```json")
        .trim_start_matches("```")
        .trim_end_matches("```")
        .trim();
    let bytes = cleaned.as_bytes();
    let mut depth = 0i32;
    let mut start: Option<usize> = None;
    for (i, &b) in bytes.iter().enumerate() {
        match b {
            b'{' => {
                if depth == 0 {
                    start = Some(i);
                }
                depth += 1;
            }
            b'}' => {
                depth -= 1;
                if depth == 0 {
                    if let Some(s) = start {
                        if let Ok(v) = serde_json::from_str::<serde_json::Value>(&cleaned[s..=i]) {
                            return Some(v);
                        }
                    }
                    start = None;
                }
            }
            _ => {}
        }
    }
    None
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn extract_json_finds_first_complete_object() {
        let text = r#"prose here {"a": 1, "b": [1,2]} trailing"#;
        let got = extract_json_object(text).expect("should parse");
        assert_eq!(got["a"], 1);
    }

    #[test]
    fn extract_json_handles_code_fences() {
        let text = "```json\n{\"diff_summary\": \"x\", \"recommended_action\": \"y\"}\n```";
        let got = extract_json_object(text).unwrap();
        assert_eq!(got["diff_summary"], "x");
        assert_eq!(got["recommended_action"], "y");
    }

    #[test]
    fn extract_json_rejects_unclosed_object() {
        let text = "{ \"a\": 1, \"b\": ";
        assert!(extract_json_object(text).is_none());
    }

    #[tokio::test]
    async fn append_correction_creates_parent_dir_and_appends_jsonl() {
        // Use a temp path under /tmp so we don't pollute the real
        // data/_kb. append_correction is hard-bound to
        // CORRECTIONS_FILE, so rather than mocking globals this test
        // writes a mirror JSONL line directly and verifies the
        // record's shape on round-trip (it does not invoke
        // append_correction itself).
        let tmp = std::env::temp_dir().join("lakehouse_drift_synth_test");
        let _ = tokio::fs::remove_dir_all(&tmp).await;
        let nested = tmp.join("sub/dir/doc_drift_corrections.jsonl");
        tokio::fs::create_dir_all(nested.parent().unwrap()).await.unwrap();
        let c = DriftCorrection {
            playbook_id: "pb-1".into(),
            operation: "fill: X in Y, Z".into(),
            tool: "docker".into(),
            version_seen: "24.0.7".into(),
            current_snippet_hash: Some("abc".into()),
            diff_summary: "docs changed default TLS policy".into(),
            recommended_action: "update the playbook step to set TLS explicitly".into(),
            generated_at: "2026-04-22T00:00:00Z".into(),
            model: "gpt-oss:120b".into(),
        };
        let line = serde_json::to_string(&c).unwrap();
        tokio::fs::write(&nested, format!("{line}\n")).await.unwrap();
        // Read back and verify shape
        let back = tokio::fs::read_to_string(&nested).await.unwrap();
        assert!(back.ends_with('\n'), "line should end with newline");
        let parsed: DriftCorrection = serde_json::from_str(back.trim()).unwrap();
        assert_eq!(parsed.playbook_id, "pb-1");
        assert_eq!(parsed.tool, "docker");
        let _ = tokio::fs::remove_dir_all(&tmp).await;
    }

    #[test]
    fn build_prompt_includes_tool_and_operation() {
        let p = build_prompt("terraform", "1.5.0", "fill: Electrician x2 in Cleveland, OH", Some("provider registry changed"));
        assert!(p.contains("terraform"));
        assert!(p.contains("1.5.0"));
        assert!(p.contains("Cleveland"));
        assert!(p.contains("provider registry changed"));
        assert!(p.contains("STRICT JSON"));
    }

    #[test]
    fn build_prompt_handles_missing_preview_honestly() {
        let p = build_prompt("docker", "24.0", "fill: X", None);
        assert!(p.contains("no current docs preview available"));
    }
}

View File

@@ -9,6 +9,7 @@ pub mod index_registry;
pub mod jobs;
pub mod playbook_memory;
pub mod doc_drift;
pub mod drift_synth;
pub mod promotion;
pub mod refresh;
pub mod store;

View File

@@ -136,6 +136,8 @@ pub fn router(state: VectorState) -> Router {
        // Phase 45 slice 3 — doc drift detection + human re-admission.
        .route("/playbook_memory/doc_drift/check/{id}", post(check_doc_drift))
        .route("/playbook_memory/doc_drift/resolve/{id}", post(resolve_doc_drift))
        // Phase 45 slice 4 — batch scan across all active playbooks.
        .route("/playbook_memory/doc_drift/scan", post(scan_doc_drift))
        .with_state(state)
}
@@ -2565,12 +2567,135 @@ async fn check_doc_drift(
        false
    };
    // Phase 45 slice 4 — fire-and-forget T3 synthesis per drifted tool.
    // Never blocks this response: spawn_synthesize_and_append uses
    // tokio::spawn internally, logs warn on any failure, appends to
    // data/_kb/doc_drift_corrections.jsonl on success. No cloud key
    // present → synthesis is skipped (with a warn).
    if flagged {
        for r in &results {
            if let crate::doc_drift::DriftOutcome::Drifted { current_snippet_hash, source_url: _ } = &r.outcome {
                crate::drift_synth::spawn_synthesize_and_append(
                    entry.playbook_id.clone(),
                    entry.operation.clone(),
                    r.tool.clone(),
                    r.version_seen.clone(),
                    Some(current_snippet_hash.clone()),
                    None, // docs_preview not returned by check_all_refs yet; slice 4.1 extension
                );
            }
        }
    }
    Ok(Json(serde_json::json!({
        "playbook_id": id,
        "checked_tools": results.iter().map(|r| &r.tool).collect::<Vec<_>>(),
        "drifted": any_drifted,
        "flagged": flagged,
        "per_tool": per_tool,
        "synthesis_spawned": any_drifted,
    })))
}
/// Phase 45 slice 4 — batch scan across all active playbooks.
///
/// POST /vectors/playbook_memory/doc_drift/scan
/// body: { city?: string, state?: string, max_entries?: number }
///
/// Iterates every active (non-retired, non-superseded) playbook that
/// has at least one doc_ref, optionally filtered by (city, state).
/// For each: queries the bridge, flags if drifted, fires synthesis.
///
/// NOT paginated — if you have >100 playbooks with doc_refs, this
/// will take a while (each check is a bridge HTTP call; bridge caches
/// for 5 min so re-runs are fast). Response reports counts per outcome
/// so you can tell if an operator should retry.
#[derive(serde::Deserialize, Default)]
struct ScanDocDriftRequest {
    #[serde(default)]
    city: Option<String>,
    #[serde(default)]
    state: Option<String>,
    #[serde(default)]
    max_entries: Option<usize>,
}

async fn scan_doc_drift(
    State(state): State<VectorState>,
    body: Option<Json<ScanDocDriftRequest>>,
) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
    use crate::doc_drift::{check_all_refs, DriftCheckerConfig, DriftOutcome};
    let req = body.map(|Json(r)| r).unwrap_or_default();
    let cfg = DriftCheckerConfig::default();
    let entries = state.playbook_memory.snapshot().await;
    let matches_filter = |e: &crate::playbook_memory::PlaybookEntry| -> bool {
        if e.retired_at.is_some() || e.superseded_at.is_some() { return false; }
        if e.doc_refs.is_empty() { return false; }
        if let Some(c) = &req.city {
            let ec = e.city.as_deref().unwrap_or("");
            if !ec.eq_ignore_ascii_case(c) { return false; }
        }
        if let Some(s) = &req.state {
            let es = e.state.as_deref().unwrap_or("");
            if !es.eq_ignore_ascii_case(s) { return false; }
        }
        true
    };
    let eligible: Vec<_> = entries.iter().filter(|e| matches_filter(e)).collect();
    let limit = req.max_entries.unwrap_or(eligible.len());
    let mut scanned = 0usize;
    let mut drifted_count = 0usize;
    let mut newly_flagged = 0usize;
    let mut unknown_count = 0usize;
    let mut synthesis_spawned = 0usize;
    let mut details: Vec<serde_json::Value> = Vec::new();
    for e in eligible.iter().take(limit) {
        scanned += 1;
        let results = check_all_refs(&cfg, &e.doc_refs).await;
        let any_drifted = results.iter().any(|r| matches!(r.outcome, DriftOutcome::Drifted { .. }));
        let any_unknown = results.iter().any(|r| matches!(r.outcome, DriftOutcome::Unknown { .. }));
        if any_unknown && !any_drifted { unknown_count += 1; }
        if any_drifted {
            drifted_count += 1;
            if state.playbook_memory.flag_doc_drift(&e.playbook_id).await.unwrap_or(false) {
                newly_flagged += 1;
            }
            for r in &results {
                if let DriftOutcome::Drifted { current_snippet_hash, .. } = &r.outcome {
                    crate::drift_synth::spawn_synthesize_and_append(
                        e.playbook_id.clone(),
                        e.operation.clone(),
                        r.tool.clone(),
                        r.version_seen.clone(),
                        Some(current_snippet_hash.clone()),
                        None,
                    );
                    synthesis_spawned += 1;
                }
            }
        }
        details.push(serde_json::json!({
            "playbook_id": e.playbook_id,
            "drifted": any_drifted,
            "unknown": any_unknown,
            "tools": results.iter().map(|r| &r.tool).collect::<Vec<_>>(),
        }));
    }
    Ok(Json(serde_json::json!({
        "scanned": scanned,
        "eligible": eligible.len(),
        "drifted": drifted_count,
        "newly_flagged": newly_flagged,
        "unknown": unknown_count,
        "synthesis_spawned": synthesis_spawned,
        "filter": { "city": req.city, "state": req.state, "max_entries": req.max_entries },
        "details": details,
    })))
}

View File

@@ -0,0 +1,45 @@
// Unit tests for context7_bridge.ts pure helpers (slice 2 backfill).
// Run: bun test mcp-server/context7_bridge.test.ts
//
// The HTTP-bound functions (resolveLibraryId, fetchDocsText,
// getCurrent) need network access and are exercised by the hybrid
// fixture on the auditor side — not re-tested here to avoid
// duplicating what's already integration-covered. This file only
// exercises the pure, deterministic helpers.
import { test, expect } from "bun:test";
import { normalizeTool, hashContent } from "./context7_bridge.ts";

test("normalizeTool lowercases + trims", () => {
  expect(normalizeTool("Docker")).toBe("docker");
  expect(normalizeTool(" TERRAFORM ")).toBe("terraform");
  expect(normalizeTool("Next.JS")).toBe("next.js");
});

test("normalizeTool preserves internal chars + dashes", () => {
  expect(normalizeTool("react-dom")).toBe("react-dom");
  expect(normalizeTool("@anthropic/sdk")).toBe("@anthropic/sdk");
});

test("hashContent is deterministic", () => {
  const a = hashContent("hello world");
  const b = hashContent("hello world");
  expect(a).toBe(b);
});

test("hashContent is sensitive to single-char change", () => {
  const a = hashContent("docker run -it ubuntu bash");
  const b = hashContent("docker run -it ubuntu bashh"); // extra h
  expect(a).not.toBe(b);
});

test("hashContent returns 16 hex chars", () => {
  const h = hashContent("any input");
  expect(h).toMatch(/^[0-9a-f]{16}$/);
});

test("hashContent differs for empty vs whitespace", () => {
  const a = hashContent("");
  const b = hashContent(" ");
  expect(a).not.toBe(b);
});

View File

@@ -36,7 +36,8 @@ interface CachedDoc {
 const cache = new Map<string, { entry: CachedDoc; at: number }>();
 
-function normalizeTool(s: string): string {
+// Exported for unit testing — pure functions that the bridge composes.
+export function normalizeTool(s: string): string {
   return s.trim().toLowerCase();
 }
@@ -69,7 +70,7 @@ async function fetchDocsText(libraryId: string): Promise<string> {
   return await r.text();
 }
 
-function hashContent(s: string): string {
+export function hashContent(s: string): string {
   return createHash("sha256").update(s).digest("hex").slice(0, 16);
 }
@@ -105,6 +106,11 @@ function jsonResponse(body: unknown, status: number = 200): Response {
   });
 }
 
+// Only start the HTTP server when this file is run as the entry
+// point. When imported by tests (or future callers), the pure
+// helpers are exposed without starting a second server on the same
+// port (systemd service lakehouse-context7-bridge already owns it).
+if (import.meta.main) {
 Bun.serve({
   port: PORT,
   hostname: "0.0.0.0",
@@ -176,3 +182,4 @@ Bun.serve({
 });
 console.log(`[context7-bridge] listening on :${PORT} (context7 base: ${CONTEXT7_BASE})`);
+} // end of if (import.meta.main) guard