profit ac01fffd9a checkpoint: matrix-agent-validated (2026-04-25)
Architectural snapshot of the lakehouse codebase at the point where the
full matrix-driven agent loop with Mem0 versioning + deletion was
validated end-to-end.

WHAT THIS REPO IS
A clean single-commit snapshot of the lakehouse code. Heavy test data
(.parquet datasets, vector indexes) excluded — see REPLICATION.md for
regen path. Full lakehouse history at git.agentview.dev/profit/lakehouse.

WHAT WAS PROVEN
- Vector retrieval across multi-corpora matrix (chicago_permits + entity
  briefs + sec_tickers + distilled procedural + llm_team runs)
- Observer hand-review (cloud + heuristic fallback) gating each candidate
- Local-model agent loop (qwen3.5:latest) with tool use + scratchpad
- Playbook seal on success → next-iter retrieval surfaces it as preamble
- Mem0 versioning + deletion in pathway_memory (see the sketch after this list):
    * UPSERT: ADD on new workflow, UPDATE bumps replay_count on identical
    * REVISE: chains versions, parent.superseded_at + superseded_by stamped
    * RETIRE: marks specific trace retired with reason, excluded from retrieval
    * HISTORY: walks chain root→tip, cycle-safe
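
A minimal TypeScript sketch of the version-chain shape those four ops
manipulate (field names are illustrative assumptions, not the actual
pathway_memory schema; the real implementation is the Rust module listed
under KEY DIRECTORIES):

    interface PathwayTrace {
      id: string;
      workflow_hash: string;         // identity key UPSERT matches on
      replay_count: number;          // bumped by UPDATE on an identical workflow
      superseded_by: string | null;  // REVISE stamps these on the parent
      superseded_at: string | null;
      retired_reason: string | null; // RETIRE sets this; retrieval skips it
    }

    // HISTORY: walk the chain root→tip; the seen-set makes it cycle-safe.
    function history(byId: Map<string, PathwayTrace>, rootId: string): PathwayTrace[] {
      const chain: PathwayTrace[] = [];
      const seen = new Set<string>();
      let cur = byId.get(rootId);
      while (cur && !seen.has(cur.id)) {
        seen.add(cur.id);
        chain.push(cur);
        cur = cur.superseded_by ? byId.get(cur.superseded_by) : undefined;
      }
      return chain;
    }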

KEY DIRECTORIES
- crates/vectord/src/pathway_memory.rs — Mem0 ops live here
- crates/vectord/src/playbook_memory.rs — original Mem0 reference
- tests/agent_test/ — local-model agent harness + PRD + session archives
- scripts/dump_raw_corpus.sh — MinIO bucket dump (raw test corpus)
- scripts/vectorize_raw_corpus.ts — corpus → vector indexes
- scripts/analyze_chicago_contracts.ts — real inference pipeline
- scripts/seal_agent_playbook.ts — Mem0 upsert from agent traces

Replication: see REPLICATION.md for Debian 13 clean install + cloud-only
adaptation (no local Ollama).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 19:43:27 -05:00

// Phase 45 slice 2 — context7 HTTP bridge.
//
// Exposes an HTTP surface the Rust gateway consumes to check external
// doc drift on playbooks. Wraps context7's public API:
//
//   https://context7.com/api/v1/search?query=<name>          → resolve
//   https://context7.com/api/v1/<lib-id>?type=txt&tokens=N   → docs
//
// Runs on its own port so a failure here never tips over the mcp-server
// on :3700. The cache is in-memory (5 min TTL): context7 rate-limits by
// IP, and gateway drift-checks are the hot caller.
//
// Endpoints:
//   GET /health                    health + cache stats
//   GET /docs/:tool                resolve + fetch + return descriptor
//   GET /docs/:tool/diff?since=X   drift vs recorded content hash X
//   GET /cache                     dump current cache (debugging)
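//
// Typical gateway drift-check flow (illustrative values; the hash below
// is made up):
//
//   1. GET /docs/docker → descriptor; record its snippet_hash (e.g. "ab12cd34ef56ab12")
//   2. store that hash alongside the sealed playbook
//   3. later: GET /docs/docker/diff?since=ab12cd34ef56ab12
//      → { "drifted": false, ... } until the upstream docs change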
import { createHash } from "node:crypto";
const PORT = Number(process.env.CONTEXT7_BRIDGE_PORT ?? 3900);
const CONTEXT7_BASE = (process.env.CONTEXT7_BASE_URL ?? "https://context7.com/api/v1").replace(/\/+$/, "");
const TOKENS_PER_FETCH = 1500; // enough for a drift-meaningful slice, not so much that we hammer the API
const CACHE_TTL_MS = 5 * 60 * 1000;

interface CachedDoc {
  tool: string;
  library_id: string;   // context7 canonical, e.g. "/docker/docs"
  title: string;
  last_updated: string | null;
  snippet_hash: string; // SHA-256 of the first TOKENS_PER_FETCH tokens of fetched docs
  docs_preview: string; // first 400 chars, returned in responses for display
  retrieved_at: string; // ISO timestamp of when WE last fetched
  source_url: string;   // human-facing context7 URL
}

const cache = new Map<string, { entry: CachedDoc; at: number }>();

function normalizeTool(s: string): string {
  return s.trim().toLowerCase();
}

async function resolveLibraryId(tool: string): Promise<{ id: string; title: string; last_updated: string | null } | null> {
  const r = await fetch(`${CONTEXT7_BASE}/search?query=${encodeURIComponent(tool)}`, {
    signal: AbortSignal.timeout(10000),
  });
  if (!r.ok) throw new Error(`context7 search ${r.status}: ${await r.text().catch(() => "?")}`);
  const j = await r.json() as any;
  const results = Array.isArray(j.results) ? j.results : [];
  // Results come back top-ranked first; context7 sorts by its internal score.
  // Prefer the first entry whose state is "finalized" (others can have
  // unstable previews); fall back to the top result if none is finalized.
  const pick = results.find((x: any) => x?.state === "finalized") ?? results[0];
  if (!pick?.id) return null;
  return {
    id: String(pick.id),
    title: String(pick.title ?? tool),
    last_updated: typeof pick.lastUpdateDate === "string" ? pick.lastUpdateDate : null,
  };
}
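
// For reference, the search response shape consumed above looks roughly
// like this (an illustrative assumption derived from the field accesses,
// not an official spec):
//
//   { "results": [ { "id": "/docker/docs", "title": "Docker Docs",
//                    "state": "finalized", "lastUpdateDate": "..." } ] }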

async function fetchDocsText(libraryId: string): Promise<string> {
  // Strip the leading slash on library_id so the URL doesn't have a
  // double slash. context7 returns ids like "/docker/docs" but the
  // fetch path wants "docker/docs".
  const cleanId = libraryId.replace(/^\/+/, "");
  const url = `${CONTEXT7_BASE}/${cleanId}?type=txt&tokens=${TOKENS_PER_FETCH}`;
  const r = await fetch(url, { signal: AbortSignal.timeout(20000) });
  if (!r.ok) throw new Error(`context7 fetch ${r.status} on ${cleanId}: ${await r.text().catch(() => "?")}`);
  return await r.text();
}

function hashContent(s: string): string {
  // First 16 hex chars (64 bits) of the SHA-256: plenty for drift detection.
  return createHash("sha256").update(s).digest("hex").slice(0, 16);
}

async function getCurrent(tool: string): Promise<CachedDoc> {
  const key = normalizeTool(tool);
  const cached = cache.get(key);
  if (cached && Date.now() - cached.at < CACHE_TTL_MS) {
    return cached.entry;
  }
  const resolved = await resolveLibraryId(key);
  if (!resolved) {
    throw new Error(`no context7 library found for '${tool}'`);
  }
  const text = await fetchDocsText(resolved.id);
  const entry: CachedDoc = {
    tool: key,
    library_id: resolved.id,
    title: resolved.title,
    last_updated: resolved.last_updated,
    snippet_hash: hashContent(text),
    docs_preview: text.slice(0, 400),
    retrieved_at: new Date().toISOString(),
    source_url: `https://context7.com${resolved.id.startsWith("/") ? resolved.id : "/" + resolved.id}`,
  };
  cache.set(key, { entry, at: Date.now() });
  return entry;
}

function jsonResponse(body: unknown, status: number = 200): Response {
  return new Response(JSON.stringify(body), {
    status,
    headers: { "content-type": "application/json" },
  });
}

Bun.serve({
  port: PORT,
  hostname: "0.0.0.0",
  async fetch(req) {
    const url = new URL(req.url);

    if (url.pathname === "/health") {
      return jsonResponse({
        status: "ok",
        cache_size: cache.size,
        context7_base: CONTEXT7_BASE,
        ttl_ms: CACHE_TTL_MS,
      });
    }

    if (url.pathname === "/cache") {
      const dump = Array.from(cache.entries()).map(([k, v]) => ({
        tool: k,
        library_id: v.entry.library_id,
        snippet_hash: v.entry.snippet_hash,
        retrieved_at: v.entry.retrieved_at,
        age_ms: Date.now() - v.at,
      }));
      return jsonResponse({ entries: dump });
    }

    // GET /docs/:tool — return current descriptor (fetches + caches)
    const m1 = url.pathname.match(/^\/docs\/([^/]+)$/);
    if (m1 && req.method === "GET") {
      const tool = decodeURIComponent(m1[1]);
      try {
        const entry = await getCurrent(tool);
        return jsonResponse(entry);
      } catch (e) {
        // Resolve misses and upstream fetch failures both surface here as
        // 404 with the error message in the body.
        return jsonResponse({ error: (e as Error).message }, 404);
      }
    }

    // GET /docs/:tool/diff?since=hash — compare current vs recorded hash
    const m2 = url.pathname.match(/^\/docs\/([^/]+)\/diff$/);
    if (m2 && req.method === "GET") {
      const tool = decodeURIComponent(m2[1]);
      const since = url.searchParams.get("since");
      if (!since) {
        return jsonResponse({ error: "query param 'since' (snippet_hash) required" }, 400);
      }
      try {
        const entry = await getCurrent(tool);
        const drifted = entry.snippet_hash !== since;
        return jsonResponse({
          tool: entry.tool,
          drifted,
          previous_snippet_hash: since,
          current_snippet_hash: entry.snippet_hash,
          library_id: entry.library_id,
          title: entry.title,
          last_updated_upstream: entry.last_updated,
          retrieved_at: entry.retrieved_at,
          source_url: entry.source_url,
          docs_preview: drifted ? entry.docs_preview : null,
        });
      } catch (e) {
        return jsonResponse({ error: (e as Error).message }, 404);
      }
    }

    return jsonResponse(
      { error: "not found", paths: ["/health", "/docs/:tool", "/docs/:tool/diff?since=HASH", "/cache"] },
      404,
    );
  },
});
console.log(`[context7-bridge] listening on :${PORT} (context7 base: ${CONTEXT7_BASE})`);
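
// Quick smoke test (illustrative; the filename is assumed):
//   bun run context7_bridge.ts
//   curl -s localhost:3900/health
//   curl -s localhost:3900/docs/docker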