diff --git a/crates/vectord/src/drift_synth.rs b/crates/vectord/src/drift_synth.rs
new file mode 100644
index 0000000..6ab10a9
--- /dev/null
+++ b/crates/vectord/src/drift_synth.rs
@@ -0,0 +1,328 @@
+// Phase 45 slice 4 — T3 drift-correction synthesis.
+//
+// When a playbook is flagged for doc drift, the auditor / operator
+// wants to know WHAT changed and WHAT the playbook should do
+// differently — not just that something drifted. This module asks
+// gpt-oss:120b on Ollama Cloud to synthesize that explanation and
+// appends it to data/_kb/doc_drift_corrections.jsonl.
+//
+// Fire-and-forget: hooked into /check via tokio::spawn so the
+// response to the operator returns immediately. Cloud failures are
+// logged and dropped — they never block the flag itself.
+
+use serde::{Deserialize, Serialize};
+use std::time::Duration;
+
+const CLOUD_URL: &str = "https://ollama.com/api/generate";
+const SYNTH_MODEL: &str = "gpt-oss:120b";
+const CALL_TIMEOUT_SECS: u64 = 90;
+const CORRECTIONS_FILE: &str = "/home/profit/lakehouse/data/_kb/doc_drift_corrections.jsonl";
+
+/// One drift-correction record. Mirrors the shape declared in the
+/// docs/CONTROL_PLANE_PRD.md Phase 45 spec.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DriftCorrection {
+    pub playbook_id: String,
+    pub operation: String,
+    pub tool: String,
+    pub version_seen: String,
+    pub current_snippet_hash: Option<String>,
+    pub diff_summary: String,
+    pub recommended_action: String,
+    pub generated_at: String,
+    pub model: String,
+}
+
+/// Read the Ollama Cloud key from the same sources v1/ollama_cloud.rs
+/// uses. Returns None if not available — caller should skip synthesis.
+pub fn resolve_cloud_key() -> Option<String> {
+    if let Ok(k) = std::env::var("OLLAMA_CLOUD_KEY") {
+        if !k.trim().is_empty() { return Some(k.trim().to_string()); }
+    }
+    if let Ok(raw) = std::fs::read_to_string("/root/llm_team_config.json") {
+        if let Ok(v) = serde_json::from_str::<serde_json::Value>(&raw) {
+            if let Some(k) = v.pointer("/providers/ollama_cloud/api_key").and_then(|x| x.as_str()) {
+                if !k.trim().is_empty() { return Some(k.trim().to_string()); }
+            }
+        }
+    }
+    if let Ok(k) = std::env::var("OLLAMA_CLOUD_API_KEY") {
+        if !k.trim().is_empty() { return Some(k.trim().to_string()); }
+    }
+    None
+}
+
+/// Ask the cloud model to produce a diff_summary + recommended_action
+/// for a playbook's specific drift. Returns a complete DriftCorrection
+/// ready to append. Caller-provided docs_preview (from the context7
+/// bridge's diff response) is optional but dramatically improves
+/// specificity when available.
+pub async fn synthesize(
+    playbook_id: &str,
+    operation: &str,
+    tool: &str,
+    version_seen: &str,
+    current_snippet_hash: Option<&str>,
+    docs_preview: Option<&str>,
+    cloud_key: &str,
+) -> Result<DriftCorrection, String> {
+    let prompt = build_prompt(tool, version_seen, operation, docs_preview);
+    let body = serde_json::json!({
+        "model": SYNTH_MODEL,
+        "prompt": prompt,
+        "stream": false,
+        "options": { "temperature": 0.2, "num_predict": 600 },
+    });
+    let client = reqwest::Client::builder()
+        .timeout(Duration::from_secs(CALL_TIMEOUT_SECS))
+        .build()
+        .map_err(|e| format!("client build: {e}"))?;
+    let resp = client
+        .post(CLOUD_URL)
+        .bearer_auth(cloud_key)
+        .json(&body)
+        .send()
+        .await
+        .map_err(|e| format!("cloud unreachable: {e}"))?;
+    if !resp.status().is_success() {
+        return Err(format!(
+            "cloud {}: {}",
+            resp.status(),
+            resp.text().await.unwrap_or_default().chars().take(200).collect::<String>(),
+        ));
+    }
+    let j: serde_json::Value = resp.json().await.map_err(|e| format!("parse response: {e}"))?;
+    let text: &str = j.get("response").and_then(|v| v.as_str()).unwrap_or("");
+    let parsed = extract_json_object(text).ok_or_else(|| {
+        format!("cloud output not valid JSON — head: {}", text.chars().take(200).collect::<String>())
+    })?;
+    let diff_summary = parsed
+        .get("diff_summary").and_then(|v| v.as_str())
+        .unwrap_or("").trim().to_string();
+    let recommended_action = parsed
+        .get("recommended_action").and_then(|v| v.as_str())
+        .unwrap_or("").trim().to_string();
+    if diff_summary.is_empty() && recommended_action.is_empty() {
+        return Err("cloud returned JSON but diff_summary and recommended_action are both empty".into());
+    }
+    Ok(DriftCorrection {
+        playbook_id: playbook_id.to_string(),
+        operation: operation.to_string(),
+        tool: tool.to_string(),
+        version_seen: version_seen.to_string(),
+        current_snippet_hash: current_snippet_hash.map(String::from),
+        diff_summary,
+        recommended_action,
+        generated_at: chrono::Utc::now().to_rfc3339(),
+        model: SYNTH_MODEL.to_string(),
+    })
+}
+
+/// Append one correction as a JSONL line. Creates the parent dir and
+/// file if missing. Atomic at the line level (a single write_all of
+/// one buffer); there is no hard guarantee against concurrent writers
+/// interleaving bytes, but an O_APPEND write of a typical correction
+/// (< 4 KB) on a local Linux filesystem lands as one unit in practice.
+pub async fn append_correction(c: &DriftCorrection) -> Result<(), String> {
+    use tokio::fs::{create_dir_all, OpenOptions};
+    use tokio::io::AsyncWriteExt;
+
+    let path = std::path::PathBuf::from(CORRECTIONS_FILE);
+    if let Some(parent) = path.parent() {
+        create_dir_all(parent).await.map_err(|e| format!("mkdir _kb: {e}"))?;
+    }
+    let mut f = OpenOptions::new()
+        .create(true)
+        .append(true)
+        .open(&path)
+        .await
+        .map_err(|e| format!("open corrections: {e}"))?;
+    let mut line = serde_json::to_string(c).map_err(|e| format!("serialize: {e}"))?;
+    line.push('\n');
+    f.write_all(line.as_bytes()).await.map_err(|e| format!("write corrections: {e}"))?;
+    Ok(())
+}
+
+/// Spawn a fire-and-forget synthesis + append task. Never blocks the
+/// caller: failures are logged at warn, successes append the
+/// correction. The caller (typically a check or scan handler) doesn't
+/// need to await anything — the operator's response goes out
+/// immediately.
+pub fn spawn_synthesize_and_append(
+    playbook_id: String,
+    operation: String,
+    tool: String,
+    version_seen: String,
+    current_snippet_hash: Option<String>,
+    docs_preview: Option<String>,
+) {
+    let Some(key) = resolve_cloud_key() else {
+        tracing::warn!(
+            target: "drift_synth",
+            %playbook_id,
+            %tool,
+            "no cloud key — synthesis skipped"
+        );
+        return;
+    };
+    tokio::spawn(async move {
+        let r = synthesize(
+            &playbook_id,
+            &operation,
+            &tool,
+            &version_seen,
+            current_snippet_hash.as_deref(),
+            docs_preview.as_deref(),
+            &key,
+        ).await;
+        match r {
+            Ok(c) => match append_correction(&c).await {
+                Ok(_) => tracing::info!(
+                    target: "drift_synth",
+                    playbook_id = %c.playbook_id,
+                    tool = %c.tool,
+                    "drift correction appended"
+                ),
+                Err(e) => tracing::warn!(target: "drift_synth", "append failed: {e}"),
+            },
+            Err(e) => tracing::warn!(target: "drift_synth", "synthesis failed: {e}"),
+        }
+    });
+}
+
+fn build_prompt(tool: &str, version_seen: &str, operation: &str, preview: Option<&str>) -> String {
+    let preview_block = match preview {
+        Some(p) if !p.trim().is_empty() => format!(
+            "Current docs preview (first ~1000 chars from context7):\n---\n{}\n---\n",
+            p.chars().take(1000).collect::<String>(),
+        ),
+        _ => "(no current docs preview available)\n".to_string(),
+    };
+    format!(
+        "A playbook references tool `{tool}` at version_seen=`{version_seen}`. \
+The docs for that tool have drifted upstream and the playbook's captured \
+snippet_hash no longer matches.
+
+Playbook operation: {operation}
+
+{preview_block}
+Respond with STRICT JSON ONLY, no prose, no markdown fences. Shape:
+{{
+  \"diff_summary\": \"<1-2 sentences describing what meaningful change in the tool's docs is visible in the preview, or state clearly that the preview is insufficient to tell>\",
+  \"recommended_action\": \"<1-2 sentences on what the playbook should do differently now, given what you can see>\"
+}}
+
+If the preview doesn't give you enough information, say so in diff_summary honestly. Don't fabricate a diff."
+    )
+}
+
+fn extract_json_object(text: &str) -> Option<serde_json::Value> {
+    // Strip common code-fence wrappers.
+    let cleaned = text
+        .trim()
+        .trim_start_matches("```json")
+        .trim_start_matches("```")
+        .trim_end_matches("```")
+        .trim();
+    // Brace-depth scan. Known limitation: a brace inside a JSON string
+    // literal throws the count off; acceptable for this constrained
+    // two-field output shape.
+    let bytes = cleaned.as_bytes();
+    let mut depth = 0i32;
+    let mut start: Option<usize> = None;
+    for (i, &b) in bytes.iter().enumerate() {
+        match b {
+            b'{' => {
+                if depth == 0 {
+                    start = Some(i);
+                }
+                depth += 1;
+            }
+            b'}' => {
+                depth -= 1;
+                if depth == 0 {
+                    if let Some(s) = start {
+                        if let Ok(v) = serde_json::from_str::<serde_json::Value>(&cleaned[s..=i]) {
+                            return Some(v);
+                        }
+                    }
+                    start = None;
+                }
+            }
+            _ => {}
+        }
+    }
+    None
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn extract_json_finds_first_complete_object() {
+        let text = r#"prose here {"a": 1, "b": [1,2]} trailing"#;
+        let got = extract_json_object(text).expect("should parse");
+        assert_eq!(got["a"], 1);
+    }
+
+    #[test]
+    fn extract_json_handles_code_fences() {
+        let text = "```json\n{\"diff_summary\": \"x\", \"recommended_action\": \"y\"}\n```";
+        let got = extract_json_object(text).unwrap();
+        assert_eq!(got["diff_summary"], "x");
+        assert_eq!(got["recommended_action"], "y");
+    }
+
+    #[test]
+    fn extract_json_rejects_unclosed_object() {
+        let text = "{ \"a\": 1, \"b\": ";
+        assert!(extract_json_object(text).is_none());
+    }
+
+    #[tokio::test]
+    async fn drift_correction_jsonl_roundtrip() {
+        // Use a temp path under /tmp so we don't pollute the real
+        // data/_kb. append_correction is hard-bound to
+        // CORRECTIONS_FILE, so rather than mocking globals we mirror
+        // its write path with a local helper here and verify the
+        // JSONL line shape roundtrips.
+        let tmp = std::env::temp_dir().join("lakehouse_drift_synth_test");
+        let _ = tokio::fs::remove_dir_all(&tmp).await;
+        let nested = tmp.join("sub/dir/doc_drift_corrections.jsonl");
+        tokio::fs::create_dir_all(nested.parent().unwrap()).await.unwrap();
+
+        let c = DriftCorrection {
+            playbook_id: "pb-1".into(),
+            operation: "fill: X in Y, Z".into(),
+            tool: "docker".into(),
+            version_seen: "24.0.7".into(),
+            current_snippet_hash: Some("abc".into()),
+            diff_summary: "docs changed default TLS policy".into(),
+            recommended_action: "update the playbook step to set TLS explicitly".into(),
+            generated_at: "2026-04-22T00:00:00Z".into(),
+            model: "gpt-oss:120b".into(),
+        };
+        let line = serde_json::to_string(&c).unwrap();
+        tokio::fs::write(&nested, format!("{line}\n")).await.unwrap();
+
+        // Read back and verify shape.
+        let back = tokio::fs::read_to_string(&nested).await.unwrap();
+        assert!(back.ends_with('\n'), "line should end with newline");
+        let parsed: DriftCorrection = serde_json::from_str(back.trim()).unwrap();
+        assert_eq!(parsed.playbook_id, "pb-1");
+        assert_eq!(parsed.tool, "docker");
+        let _ = tokio::fs::remove_dir_all(&tmp).await;
+    }
+
+    #[test]
+    fn build_prompt_includes_tool_and_operation() {
+        let p = build_prompt("terraform", "1.5.0", "fill: Electrician x2 in Cleveland, OH", Some("provider registry changed"));
+        assert!(p.contains("terraform"));
+        assert!(p.contains("1.5.0"));
+        assert!(p.contains("Cleveland"));
+        assert!(p.contains("provider registry changed"));
+        assert!(p.contains("STRICT JSON"));
+    }
+
+    #[test]
+    fn build_prompt_handles_missing_preview_honestly() {
+        let p = build_prompt("docker", "24.0", "fill: X", None);
+        assert!(p.contains("no current docs preview available"));
+    }
+}
diff --git a/crates/vectord/src/lib.rs b/crates/vectord/src/lib.rs
index c937fc5..a1e345c 100644
--- a/crates/vectord/src/lib.rs
+++ b/crates/vectord/src/lib.rs
@@ -9,6 +9,7 @@ pub mod index_registry;
 pub mod jobs;
 pub mod playbook_memory;
 pub mod doc_drift;
+pub mod drift_synth;
 pub mod promotion;
 pub mod refresh;
 pub mod store;
diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs
index 99d971f..caab297 100644
--- a/crates/vectord/src/service.rs
+++ b/crates/vectord/src/service.rs
@@ -136,6 +136,8 @@ pub fn router(state: VectorState) -> Router {
     // Phase 45 slice 3 — doc drift detection + human re-admission.
     .route("/playbook_memory/doc_drift/check/{id}", post(check_doc_drift))
     .route("/playbook_memory/doc_drift/resolve/{id}", post(resolve_doc_drift))
+    // Phase 45 slice 4 — batch scan across all active playbooks.
+    .route("/playbook_memory/doc_drift/scan", post(scan_doc_drift))
     .with_state(state)
 }

@@ -2565,12 +2567,135 @@ async fn check_doc_drift(
         false
     };

+    // Phase 45 slice 4 — fire-and-forget T3 synthesis per drifted tool.
+    // Never blocks this response: spawn_synthesize_and_append uses
+    // tokio::spawn internally, logs a warning on any failure, and
+    // appends to data/_kb/doc_drift_corrections.jsonl on success. If
+    // no cloud key is present, synthesis is skipped silently.
+    if flagged {
+        for r in &results {
+            if let crate::doc_drift::DriftOutcome::Drifted { current_snippet_hash, source_url: _ } = &r.outcome {
+                crate::drift_synth::spawn_synthesize_and_append(
+                    entry.playbook_id.clone(),
+                    entry.operation.clone(),
+                    r.tool.clone(),
+                    r.version_seen.clone(),
+                    Some(current_snippet_hash.clone()),
+                    None, // docs_preview not returned by check_all_refs yet; slice 4.1 extension
+                );
+            }
+        }
+    }
+
     Ok(Json(serde_json::json!({
         "playbook_id": id,
         "checked_tools": results.iter().map(|r| &r.tool).collect::<Vec<_>>(),
         "drifted": any_drifted,
         "flagged": flagged,
         "per_tool": per_tool,
+        "synthesis_spawned": flagged,
     })))
 }
+
+/// Phase 45 slice 4 — batch scan across all active playbooks.
+///
+/// POST /vectors/playbook_memory/doc_drift/scan
+/// body: { city?: string, state?: string, max_entries?: number }
+///
+/// Iterates every active (non-retired, non-superseded) playbook that
+/// has at least one doc_ref, optionally filtered by (city, state).
+/// For each: queries the bridge, flags it if drifted, fires synthesis.
+///
+/// NOT paginated — with more than ~100 playbooks carrying doc_refs
+/// this will take a while (each check is a bridge HTTP call; the
+/// bridge caches for 5 min, so re-runs are fast). The response reports
+/// counts per outcome so an operator can tell whether a retry is
+/// warranted.
+#[derive(serde::Deserialize, Default)]
+struct ScanDocDriftRequest {
+    #[serde(default)]
+    city: Option<String>,
+    #[serde(default)]
+    state: Option<String>,
+    #[serde(default)]
+    max_entries: Option<usize>,
+}
+
+async fn scan_doc_drift(
+    State(state): State<VectorState>,
+    body: Option<Json<ScanDocDriftRequest>>,
+) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
+    use crate::doc_drift::{check_all_refs, DriftCheckerConfig, DriftOutcome};
+
+    let req = body.map(|Json(r)| r).unwrap_or_default();
+    let cfg = DriftCheckerConfig::default();
+    let entries = state.playbook_memory.snapshot().await;
+
+    let matches_filter = |e: &crate::playbook_memory::PlaybookEntry| -> bool {
+        if e.retired_at.is_some() || e.superseded_at.is_some() { return false; }
+        if e.doc_refs.is_empty() { return false; }
+        if let Some(c) = &req.city {
+            let ec = e.city.as_deref().unwrap_or("");
+            if !ec.eq_ignore_ascii_case(c) { return false; }
+        }
+        if let Some(s) = &req.state {
+            let es = e.state.as_deref().unwrap_or("");
+            if !es.eq_ignore_ascii_case(s) { return false; }
+        }
+        true
+    };
+
+    let eligible: Vec<_> = entries.iter().filter(|e| matches_filter(e)).collect();
+    let limit = req.max_entries.unwrap_or(eligible.len());
+
+    let mut scanned = 0usize;
+    let mut drifted_count = 0usize;
+    let mut newly_flagged = 0usize;
+    let mut unknown_count = 0usize;
+    let mut synthesis_spawned = 0usize;
+    let mut details: Vec<serde_json::Value> = Vec::new();
+
+    for e in eligible.iter().take(limit) {
+        scanned += 1;
+        let results = check_all_refs(&cfg, &e.doc_refs).await;
+        let any_drifted = results.iter().any(|r| matches!(r.outcome, DriftOutcome::Drifted { .. }));
+        let any_unknown = results.iter().any(|r| matches!(r.outcome, DriftOutcome::Unknown { .. }));
+        if any_unknown && !any_drifted { unknown_count += 1; }
+        if any_drifted {
+            drifted_count += 1;
+            if state.playbook_memory.flag_doc_drift(&e.playbook_id).await.unwrap_or(false) {
+                newly_flagged += 1;
+            }
+            for r in &results {
+                if let DriftOutcome::Drifted { current_snippet_hash, .. } = &r.outcome {
+                    crate::drift_synth::spawn_synthesize_and_append(
+                        e.playbook_id.clone(),
+                        e.operation.clone(),
+                        r.tool.clone(),
+                        r.version_seen.clone(),
+                        Some(current_snippet_hash.clone()),
+                        None,
+                    );
+                    synthesis_spawned += 1;
+                }
+            }
+        }
+        details.push(serde_json::json!({
+            "playbook_id": e.playbook_id,
+            "drifted": any_drifted,
+            "unknown": any_unknown,
+            "tools": results.iter().map(|r| &r.tool).collect::<Vec<_>>(),
+        }));
+    }
+
+    Ok(Json(serde_json::json!({
+        "scanned": scanned,
+        "eligible": eligible.len(),
+        "drifted": drifted_count,
+        "newly_flagged": newly_flagged,
+        "unknown": unknown_count,
+        "synthesis_spawned": synthesis_spawned,
+        "filter": { "city": req.city, "state": req.state, "max_entries": req.max_entries },
+        "details": details,
     })))
 }
diff --git a/mcp-server/context7_bridge.test.ts b/mcp-server/context7_bridge.test.ts
new file mode 100644
index 0000000..4574771
--- /dev/null
+++ b/mcp-server/context7_bridge.test.ts
@@ -0,0 +1,45 @@
+// Unit tests for context7_bridge.ts pure helpers (slice 2 backfill).
+// Run: bun test mcp-server/context7_bridge.test.ts
+//
+// The HTTP-bound functions (resolveLibraryId, fetchDocsText,
+// getCurrent) need network access and are exercised by the hybrid
+// fixture on the auditor side — not re-tested here, to avoid
+// duplicating what's already integration-covered. This file only
+// exercises the pure, deterministic helpers.
+
+import { test, expect } from "bun:test";
+import { normalizeTool, hashContent } from "./context7_bridge.ts";
+
+test("normalizeTool lowercases + trims", () => {
+  expect(normalizeTool("Docker")).toBe("docker");
+  expect(normalizeTool(" TERRAFORM ")).toBe("terraform");
+  expect(normalizeTool("Next.JS")).toBe("next.js");
+});
+
+test("normalizeTool preserves internal chars + dashes", () => {
+  expect(normalizeTool("react-dom")).toBe("react-dom");
+  expect(normalizeTool("@anthropic/sdk")).toBe("@anthropic/sdk");
+});
+
+test("hashContent is deterministic", () => {
+  const a = hashContent("hello world");
+  const b = hashContent("hello world");
+  expect(a).toBe(b);
+});
+
+test("hashContent is sensitive to single-char change", () => {
+  const a = hashContent("docker run -it ubuntu bash");
+  const b = hashContent("docker run -it ubuntu bashh"); // extra h
+  expect(a).not.toBe(b);
+});
+
+test("hashContent returns 16 hex chars", () => {
+  const h = hashContent("any input");
+  expect(h).toMatch(/^[0-9a-f]{16}$/);
+});
+
+test("hashContent differs for empty vs whitespace", () => {
+  const a = hashContent("");
+  const b = hashContent(" ");
+  expect(a).not.toBe(b);
+});
diff --git a/mcp-server/context7_bridge.ts b/mcp-server/context7_bridge.ts
index 4e705f1..6b70d6a 100644
--- a/mcp-server/context7_bridge.ts
+++ b/mcp-server/context7_bridge.ts
@@ -36,7 +36,8 @@ interface CachedDoc {

 const cache = new Map<string, CachedDoc>();

-function normalizeTool(s: string): string {
+// Exported for unit testing — pure functions that the bridge composes.
+export function normalizeTool(s: string): string {
   return s.trim().toLowerCase();
 }

@@ -69,7 +70,7 @@ async function fetchDocsText(libraryId: string): Promise<string> {
   return await r.text();
 }

-function hashContent(s: string): string {
+export function hashContent(s: string): string {
   return createHash("sha256").update(s).digest("hex").slice(0, 16);
 }

@@ -105,6 +106,11 @@ function jsonResponse(body: unknown, status: number = 200): Response {
   });
 }

+// Only start the HTTP server when this file is run as the entry
+// point. When imported by tests (or future callers), the pure
+// helpers are exposed without starting a second server on the same
+// port (systemd service lakehouse-context7-bridge already owns it).
+if (import.meta.main) {
 Bun.serve({
   port: PORT,
   hostname: "0.0.0.0",
@@ -176,3 +182,4 @@ Bun.serve({
 });

 console.log(`[context7-bridge] listening on :${PORT} (context7 base: ${CONTEXT7_BASE})`);
+} // end of if (import.meta.main) guard
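
For a quick operator-side smoke check of the new scan endpoint, a minimal sketch (not part of the patch): it assumes vectord is reachable at http://localhost:8080 and that the router is mounted under /vectors, per the handler's doc comment; both the host and port are deployment-specific. The corrections path comes from CORRECTIONS_FILE above.

// Trigger a bounded scan, then tail the corrections JSONL.
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(120))
        .build()?;

    // max_entries keeps the bridge call volume predictable on a first run.
    let resp: serde_json::Value = client
        .post("http://localhost:8080/vectors/playbook_memory/doc_drift/scan")
        .json(&serde_json::json!({ "city": "Cleveland", "state": "OH", "max_entries": 25 }))
        .send()
        .await?
        .json()
        .await?;
    println!(
        "scanned={} drifted={} synthesis_spawned={}",
        resp["scanned"], resp["drifted"], resp["synthesis_spawned"]
    );

    // Synthesis is fire-and-forget, so corrections land asynchronously;
    // wait a beat before reading rather than checking immediately.
    tokio::time::sleep(Duration::from_secs(10)).await;
    let raw = tokio::fs::read_to_string(
        "/home/profit/lakehouse/data/_kb/doc_drift_corrections.jsonl",
    ).await.unwrap_or_default();
    for line in raw.lines().rev().take(5) {
        let c: serde_json::Value = serde_json::from_str(line)?;
        println!("{} [{}]: {}", c["playbook_id"], c["tool"], c["recommended_action"]);
    }
    Ok(())
}

Because the scan response only says how many synthesis tasks were spawned, the JSONL file is the ground truth for what actually landed.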
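
One caveat worth pinning with a follow-up test: extract_json_object scans brace depth without tracking JSON string literals, so a closing brace inside a string value defeats the counter and the object is rejected even though it is valid JSON. A sketch for the drift_synth tests module, written under the assumption the scanner stays as-is:

    #[test]
    fn extract_json_misses_object_with_brace_in_string() {
        // The stray `}` inside "x}" drops depth to zero early; the
        // partial slice fails to parse and the scanner gives up.
        let text = r#"{"diff_summary": "config moved from {tool}.yaml", "recommended_action": "x}"}"#;
        // This pins today's known limitation, not desired behavior;
        // flip the assertion if the scanner learns to skip strings.
        assert!(extract_json_object(text).is_none());
    }

Since the prompt asks for one to two sentences per field, brace-bearing values should be rare, but a diff_summary quoting a code snippet could hit this path.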