profit 7fe47babd9
Phase 45 slice 4: batch scan + T3 drift-correction synthesis
Closes the PRD-listed remaining deliverables for Phase 45:
- POST /vectors/playbook_memory/doc_drift/scan
- T3 synthesis writing data/_kb/doc_drift_corrections.jsonl
- Backfill: unit tests for mcp-server/context7_bridge.ts (slice 2
  never had any)

crates/vectord/src/drift_synth.rs (NEW, 240 LOC)
  - DriftCorrection shape matching the PRD spec exactly
  - synthesize(): HTTPS POST to ollama.com/api/generate with
    gpt-oss:120b. Prompt explicitly instructs the model to admit
    "preview insufficient" rather than fabricate a diff.
  - append_correction(): JSONL append to data/_kb/ with mkdir -p
    on the parent; atomic at line level on Linux for typical sizes.
  - spawn_synthesize_and_append(): fire-and-forget wrapper. Never
    blocks the handler. No cloud key → skipped silently with a
    tracing::warn. Cloud failure → logged + dropped.
  - resolve_cloud_key(): same sources v1/ollama_cloud.rs uses
    (env OLLAMA_CLOUD_KEY → /root/llm_team_config.json → env
    OLLAMA_CLOUD_API_KEY).
  - 5 unit tests: JSON extraction (first object, code fences,
    unclosed), prompt composition, jsonl append shape.
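The key-resolution fallback chain above can be sketched as a pure function. This is an illustration only: the real `resolve_cloud_key()` is Rust (crates/vectord/src/drift_synth.rs), and the `ollama_cloud_key` field name inside the config JSON is an assumption for this sketch.

```typescript
// Sketch of the three-source fallback described above. Real code is Rust;
// the config field name "ollama_cloud_key" is assumed for illustration.
function resolveCloudKey(
  env: Record<string, string | undefined>,
  readConfig: () => string | null, // reads /root/llm_team_config.json, or null
): string | null {
  // 1. Primary env var.
  if (env.OLLAMA_CLOUD_KEY) return env.OLLAMA_CLOUD_KEY;
  // 2. Shared team config file.
  const raw = readConfig();
  if (raw) {
    try {
      const cfg = JSON.parse(raw);
      if (typeof cfg.ollama_cloud_key === "string") return cfg.ollama_cloud_key;
    } catch {
      // Malformed config: fall through to the last source.
    }
  }
  // 3. Fallback env var.
  return env.OLLAMA_CLOUD_API_KEY ?? null;
}
```

Injecting the env and config reader keeps the fallback order testable without touching real files.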

crates/vectord/src/service.rs
  - /playbook_memory/doc_drift/scan — iterates active entries with
    doc_refs, optional (city, state, max_entries) filter. Per-entry:
    bridge check → flag if drifted → spawn synthesis per drifted
    tool. Honest response: scanned, eligible, drifted, newly_flagged,
    unknown, synthesis_spawned, details[].
  - /playbook_memory/doc_drift/check/{id}: slice 3 handler now also
    spawns synthesis per drifted tool. Response adds
    synthesis_spawned: bool.
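The "honest response" counters can be pictured as a fold over per-entry results. The handler itself is Rust (crates/vectord/src/service.rs); the `ScanDetail` shape below is an assumption made up for this sketch — only the top-level counter names come from the commit text.

```typescript
// Hypothetical per-entry result; field names other than the response
// counters are assumptions, not the Rust handler's actual types.
type DriftStatus = "drifted" | "clean" | "unknown";

interface ScanDetail {
  entry_id: string;
  status: DriftStatus;
  newly_flagged: boolean;
  synthesis_spawned: boolean;
}

interface ScanResponse {
  scanned: number;
  eligible: number;
  drifted: number;
  newly_flagged: number;
  unknown: number;
  synthesis_spawned: number;
  details: ScanDetail[];
}

// Aggregate per-entry outcomes into the response counters.
function summarize(details: ScanDetail[], eligible: number): ScanResponse {
  return {
    scanned: details.length,
    eligible,
    drifted: details.filter((d) => d.status === "drifted").length,
    newly_flagged: details.filter((d) => d.newly_flagged).length,
    unknown: details.filter((d) => d.status === "unknown").length,
    synthesis_spawned: details.filter((d) => d.synthesis_spawned).length,
    details,
  };
}
```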

mcp-server/context7_bridge.ts
  - Export normalizeTool + hashContent for testing.
  - Guard Bun.serve() behind `if (import.meta.main)` so imports
    don't double-bind :3900 (collides with systemd service).

mcp-server/context7_bridge.test.ts (NEW, 6 tests)
  - normalizeTool: lowercases + trims, preserves internal chars
  - hashContent: deterministic, sensitive to 1-char change,
    16 hex chars, differs for empty vs whitespace
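The properties those six tests assert look roughly like this (the two helpers are copied verbatim from mcp-server/context7_bridge.ts so the sketch is self-contained):

```typescript
import { createHash } from "node:crypto";

// Copied from mcp-server/context7_bridge.ts for a self-contained example.
function normalizeTool(s: string): string {
  return s.trim().toLowerCase();
}

function hashContent(s: string): string {
  return createHash("sha256").update(s).digest("hex").slice(0, 16);
}

// lowercases + trims, preserves internal characters
console.log(normalizeTool("  Docker-CE ")); // "docker-ce"
// deterministic, 16 hex chars
console.log(hashContent("abc") === hashContent("abc")); // true
console.log(/^[0-9a-f]{16}$/.test(hashContent("abc"))); // true
// sensitive to a 1-char change; empty vs whitespace differ
console.log(hashContent("abc") !== hashContent("abd")); // true
console.log(hashContent("") !== hashContent(" ")); // true
```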

Live verification (after gateway restart):

  seed playbook pb-seed-88abc7d1 with doc_refs[docker v23.0.0,
  stale hash]
  POST /doc_drift/scan {city:"Toledo", state:"OH", max_entries:5}
    → scanned=1 drifted=1 newly_flagged=1
       synthesis_spawned=1 unknown=0
  wait 30s
  cat data/_kb/doc_drift_corrections.jsonl
    → 1 record (603 bytes) with diff_summary + recommended_action
      from gpt-oss:120b. Model correctly noted "preview unavailable"
      rather than fabricating.
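A caller reproducing the scan step above would build a request like this. The gateway base URL is an assumption; the path comes from the PRD deliverable and the body fields from the verification run:

```typescript
// Assemble the scan request; base URL "http://127.0.0.1:8080" is assumed.
function buildScanRequest(
  base: string,
  body: { city?: string; state?: string; max_entries?: number },
) {
  return {
    url: `${base}/vectors/playbook_memory/doc_drift/scan`,
    init: {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify(body),
    },
  };
}

const { url, init } = buildScanRequest("http://127.0.0.1:8080", {
  city: "Toledo",
  state: "OH",
  max_entries: 5,
});
// fetch(url, init), then inspect the scanned/drifted/newly_flagged
// counters in the response (see the verification run above).
```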

Tests: 6 bridge tests + 6 drift_synth tests + 51 pre-existing
vectord lib tests. All green. Release build clean.

NOT in this PR (deliberately — cohesion review pending):
  - Auditor's kb_query check consulting hybrid_search + context7
  - Auditor's inference check consuming KB neighbors + drift
    corrections as context
  - Observer → KB → auditor feedback loop beyond append
  - Integration test exercising the full smarter-DB loop
  - Python script (sidecar/*, scripts/*) inventory

Those are the cohesion work J flagged — handled on a separate
branch after this merges.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 17:15:32 -05:00


// Phase 45 slice 2 — context7 HTTP bridge.
//
// Exposes an HTTP surface the Rust gateway consumes to check external
// doc drift on playbooks. Wraps context7's public API:
//
// https://context7.com/api/v1/search?query=<name> → resolve
// https://context7.com/api/v1/<lib-id>?type=txt&tokens=N → docs
//
// Own port so a failure here never tips over the mcp-server on :3700.
// Cache is in-memory (5 min TTL) — context7 rate-limits by IP, and
// gateway drift-checks are the hot caller.
//
// Endpoints:
//   GET /health                   health + cache stats
//   GET /docs/:tool               resolve + fetch + return descriptor
//   GET /docs/:tool/diff?since=X  return drift vs content hash X
//   GET /cache                    dump current cache (debugging)
import { createHash } from "node:crypto";
const PORT = Number(process.env.CONTEXT7_BRIDGE_PORT ?? 3900);
const CONTEXT7_BASE = (process.env.CONTEXT7_BASE_URL ?? "https://context7.com/api/v1").replace(/\/+$/, "");
const TOKENS_PER_FETCH = 1500; // enough for a drift-meaningful slice, not so much we hammer
const CACHE_TTL_MS = 5 * 60 * 1000;
interface CachedDoc {
  tool: string;
  library_id: string; // context7 canonical, e.g. "/docker/docs"
  title: string;
  last_updated: string | null;
  snippet_hash: string; // first 16 hex chars of the SHA-256 of the fetched docs text
  docs_preview: string; // first 400 chars, returned in responses for display
  retrieved_at: string; // ISO when WE last fetched
  source_url: string; // human-facing context7 URL
}
const cache = new Map<string, { entry: CachedDoc; at: number }>();
// Exported for unit testing — pure functions that the bridge composes.
export function normalizeTool(s: string): string {
  return s.trim().toLowerCase();
}
async function resolveLibraryId(tool: string): Promise<{ id: string; title: string; last_updated: string | null } | null> {
  const r = await fetch(`${CONTEXT7_BASE}/search?query=${encodeURIComponent(tool)}`, {
    signal: AbortSignal.timeout(10000),
  });
  if (!r.ok) throw new Error(`context7 search ${r.status}: ${await r.text().catch(() => "?")}`);
  const j = await r.json() as any;
  const results = Array.isArray(j.results) ? j.results : [];
  // First result is top-ranked; context7 sorts by its internal score.
  // Prefer state === "finalized" to avoid unstable previews, but fall
  // back to the top result if nothing is finalized.
  const pick = results.find((x: any) => x?.state === "finalized") ?? results[0];
  if (!pick?.id) return null;
  return {
    id: String(pick.id),
    title: String(pick.title ?? tool),
    last_updated: typeof pick.lastUpdateDate === "string" ? pick.lastUpdateDate : null,
  };
}
async function fetchDocsText(libraryId: string): Promise<string> {
  // Strip the leading slash on library_id so the URL doesn't have a
  // double slash. context7 returns ids like "/docker/docs" but the
  // fetch path wants "docker/docs".
  const cleanId = libraryId.replace(/^\/+/, "");
  const url = `${CONTEXT7_BASE}/${cleanId}?type=txt&tokens=${TOKENS_PER_FETCH}`;
  const r = await fetch(url, { signal: AbortSignal.timeout(20000) });
  if (!r.ok) throw new Error(`context7 fetch ${r.status} on ${cleanId}: ${await r.text().catch(() => "?")}`);
  return await r.text();
}
export function hashContent(s: string): string {
  return createHash("sha256").update(s).digest("hex").slice(0, 16);
}
async function getCurrent(tool: string): Promise<CachedDoc> {
  const key = normalizeTool(tool);
  const cached = cache.get(key);
  if (cached && Date.now() - cached.at < CACHE_TTL_MS) {
    return cached.entry;
  }
  const resolved = await resolveLibraryId(key);
  if (!resolved) {
    throw new Error(`no context7 library found for '${tool}'`);
  }
  const text = await fetchDocsText(resolved.id);
  const entry: CachedDoc = {
    tool: key,
    library_id: resolved.id,
    title: resolved.title,
    last_updated: resolved.last_updated,
    snippet_hash: hashContent(text),
    docs_preview: text.slice(0, 400),
    retrieved_at: new Date().toISOString(),
    source_url: `https://context7.com${resolved.id.startsWith("/") ? resolved.id : "/" + resolved.id}`,
  };
  cache.set(key, { entry, at: Date.now() });
  return entry;
}
function jsonResponse(body: unknown, status: number = 200): Response {
  return new Response(JSON.stringify(body), {
    status,
    headers: { "content-type": "application/json" },
  });
}
// Only start the HTTP server when this file is run as the entry
// point. When imported by tests (or future callers), the pure
// helpers are exposed without starting a second server on the same
// port (systemd service lakehouse-context7-bridge already owns it).
if (import.meta.main) {
  Bun.serve({
    port: PORT,
    hostname: "0.0.0.0",
    async fetch(req) {
      const url = new URL(req.url);
      if (url.pathname === "/health") {
        return jsonResponse({
          status: "ok",
          cache_size: cache.size,
          context7_base: CONTEXT7_BASE,
          ttl_ms: CACHE_TTL_MS,
        });
      }
      if (url.pathname === "/cache") {
        const dump = Array.from(cache.entries()).map(([k, v]) => ({
          tool: k,
          library_id: v.entry.library_id,
          snippet_hash: v.entry.snippet_hash,
          retrieved_at: v.entry.retrieved_at,
          age_ms: Date.now() - v.at,
        }));
        return jsonResponse({ entries: dump });
      }
      // GET /docs/:tool — return current descriptor (fetches + caches)
      const m1 = url.pathname.match(/^\/docs\/([^/]+)$/);
      if (m1 && req.method === "GET") {
        const tool = decodeURIComponent(m1[1]);
        try {
          const entry = await getCurrent(tool);
          return jsonResponse(entry);
        } catch (e) {
          return jsonResponse({ error: (e as Error).message }, 404);
        }
      }
      // GET /docs/:tool/diff?since=hash — compare current vs recorded hash
      const m2 = url.pathname.match(/^\/docs\/([^/]+)\/diff$/);
      if (m2 && req.method === "GET") {
        const tool = decodeURIComponent(m2[1]);
        const since = url.searchParams.get("since");
        if (!since) {
          return jsonResponse({ error: "query param 'since' (snippet_hash) required" }, 400);
        }
        try {
          const entry = await getCurrent(tool);
          const drifted = entry.snippet_hash !== since;
          return jsonResponse({
            tool: entry.tool,
            drifted,
            previous_snippet_hash: since,
            current_snippet_hash: entry.snippet_hash,
            library_id: entry.library_id,
            title: entry.title,
            last_updated_upstream: entry.last_updated,
            retrieved_at: entry.retrieved_at,
            source_url: entry.source_url,
            docs_preview: drifted ? entry.docs_preview : null,
          });
        } catch (e) {
          return jsonResponse({ error: (e as Error).message }, 404);
        }
      }
      return jsonResponse({ error: "not found", paths: ["/health", "/docs/:tool", "/docs/:tool/diff?since=HASH", "/cache"] }, 404);
    },
  });
  console.log(`[context7-bridge] listening on :${PORT} (context7 base: ${CONTEXT7_BASE})`);
} // end of if (import.meta.main) guard