profit ac01fffd9a checkpoint: matrix-agent-validated (2026-04-25)
Architectural snapshot of the lakehouse codebase at the point where the
full matrix-driven agent loop with Mem0 versioning + deletion was
validated end-to-end.

WHAT THIS REPO IS
A clean single-commit snapshot of the lakehouse code. Heavy test data
(.parquet datasets, vector indexes) excluded — see REPLICATION.md for
regen path. Full lakehouse history at git.agentview.dev/profit/lakehouse.

WHAT WAS PROVEN
- Vector retrieval across multi-corpora matrix (chicago_permits + entity
  briefs + sec_tickers + distilled procedural + llm_team runs)
- Observer hand-review (cloud + heuristic fallback) gating each candidate
- Local-model agent loop (qwen3.5:latest) with tool use + scratchpad
- Playbook seal on success → next-iter retrieval surfaces it as preamble
- Mem0 versioning + deletion in pathway_memory:
    * UPSERT: ADD on new workflow, UPDATE bumps replay_count on identical
    * REVISE: chains versions, parent.superseded_at + superseded_by stamped
    * RETIRE: marks specific trace retired with reason, excluded from retrieval
    * HISTORY: walks chain root→tip, cycle-safe

KEY DIRECTORIES
- crates/vectord/src/pathway_memory.rs — Mem0 ops live here
- crates/vectord/src/playbook_memory.rs — original Mem0 reference
- tests/agent_test/ — local-model agent harness + PRD + session archives
- scripts/dump_raw_corpus.sh — MinIO bucket dump (raw test corpus)
- scripts/vectorize_raw_corpus.ts — corpus → vector indexes
- scripts/analyze_chicago_contracts.ts — real inference pipeline
- scripts/seal_agent_playbook.ts — Mem0 upsert from agent traces

Replication: see REPLICATION.md for Debian 13 clean install + cloud-only
adaptation (no local Ollama).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 19:43:27 -05:00

456 lines
20 KiB
TypeScript

// Visual Control Plane server — v1
// Single Bun.serve process on :3950. Serves static index.html and
// /data/* endpoints that fan out to the live services + tail jsonl KB
// files. No build step, no node_modules. Restart via systemd or
// `bun run ui/server.ts`.
// Listen port — overridable via LH_UI_PORT; defaults to 3950.
const PORT = Number(process.env.LH_UI_PORT ?? 3950);
// Knowledge-base directory of append-only jsonl files tailed by the /data/* endpoints.
const KB = "/home/profit/lakehouse/data/_kb";
// Repo root — used for static UI assets and the scrum run archives under tests/.
const REPO = "/home/profit/lakehouse";
// Base URLs of the upstream services (all local loopback); probed by
// tryFetch with short timeouts so a down service degrades gracefully.
const GATEWAY = "http://localhost:3100";
const SIDECAR = "http://localhost:3200";
const OBSERVER = "http://localhost:3800";
const MCP = "http://localhost:3700";
const CONTEXT7 = "http://localhost:3900";
// Tail helper — return the last `n` parsed rows of a jsonl file.
// Reads the whole file (fine for the few-MB KB files). Rows that fail
// to parse are kept as { _raw, _error: "parse" } markers rather than
// silently dropped; a missing or unreadable file yields [].
async function tailJsonl(path: string, n = 50): Promise<any[]> {
  let text: string;
  try {
    text = await Bun.file(path).text();
  } catch {
    return [];
  }
  const rows = text.trim().split("\n").filter(Boolean).slice(-n);
  const parsed: any[] = [];
  for (const row of rows) {
    try {
      parsed.push(JSON.parse(row));
    } catch {
      parsed.push({ _raw: row, _error: "parse" });
    }
  }
  return parsed;
}
// Best-effort GET with a hard timeout. Returns parsed JSON when the
// body parses, the raw text otherwise, and null on any failure
// (timeout, network error, non-2xx status).
// Fix 2026-04-24: some upstream services (observer Bun.serve) return
// JSON without an application/json content-type. Don't rely on header
// — try parsing the body as JSON; fall back to raw text on failure.
async function tryFetch(url: string, timeout = 1500): Promise<any | null> {
  try {
    const res = await fetch(url, { signal: AbortSignal.timeout(timeout) });
    if (!res.ok) return null;
    const body = await res.text();
    try {
      return JSON.parse(body);
    } catch {
      return body;
    }
  } catch {
    return null;
  }
}
// Compact the massive /vectors/indexes response into just the shape the
// UI needs: [{name, source, dims, chunks, backend, bucket}], capped at
// 12 rows. `count` always reflects the full number of indexes.
async function indexesSummary(): Promise<any> {
  const resp = await tryFetch(`${GATEWAY}/vectors/indexes`);
  if (!Array.isArray(resp)) return { count: 0, items: [] };
  const items = resp.slice(0, 12).map((idx: any) => ({
    name: idx.index_name,
    source: idx.source,
    dims: idx.dimensions,
    chunks: idx.chunk_count,
    backend: idx.vector_backend,
    bucket: idx.bucket,
  }));
  return { count: resp.length, items };
}
// Snapshot of every service's health plus gateway subsystem stats,
// shaped for the UI graph. A down service shows status "down" with
// health null rather than failing the whole snapshot.
// Fix: the observer /stats, indexes summary, playbook-status and
// agent-status probes used to be awaited sequentially inside the
// return literal, serializing up to four extra round-trips after the
// Promise.all. Fold them into the same Promise.all so total latency is
// the slowest single probe, not a sum.
async function servicesSnapshot() {
  const [gw, sc, obs, mcp, c7, jstats, ustats, obsStats, vstats, pstats, astats] = await Promise.all([
    tryFetch(`${GATEWAY}/health`),
    tryFetch(`${SIDECAR}/health`),
    tryFetch(`${OBSERVER}/health`),
    tryFetch(`${MCP}/health`),
    tryFetch(`${CONTEXT7}/health`),
    tryFetch(`${GATEWAY}/journal/stats`),
    tryFetch(`${GATEWAY}/v1/usage`),
    tryFetch(`${OBSERVER}/stats`),
    indexesSummary(),
    tryFetch(`${GATEWAY}/vectors/playbook_memory/status`),
    tryFetch(`${GATEWAY}/vectors/agent/status`),
  ]);
  return {
    ts: new Date().toISOString(),
    nodes: [
      { id: "gateway", label: "Gateway :3100", status: gw ? "healthy" : "down", health: gw },
      { id: "sidecar", label: "Sidecar :3200", status: sc ? "healthy" : "down", health: sc },
      { id: "observer", label: "Observer :3800", status: obs ? "healthy" : "down", health: obs,
        stats: obsStats },
      { id: "mcp", label: "MCP :3700", status: mcp ? "healthy" : "down", health: mcp },
      { id: "context7", label: "Context7 :3900", status: c7 ? "healthy" : "down", health: c7 },
    ],
    // Virtual nodes — backed by gateway subsystems rather than own ports
    subsystems: [
      { id: "journal", label: "Journal", stats: jstats },
      { id: "usage", label: "Usage /v1", stats: ustats },
      { id: "vectord", label: "Vectord", stats: vstats },
      { id: "playbook", label: "Playbook", stats: pstats },
      { id: "agent", label: "Autotune", stats: astats },
    ],
  };
}
// Extract phrase-level markers that indicate "this should be removed,
// simplified, or refactored" across scrum suggestions. These are the
// signals that accumulate into a refactor recommendation.
// Matched against LOWERCASED suggestion text in refactorSignals(), and
// each phrase is compiled with `new RegExp(phrase, "gi")` — so keep
// every entry lowercase and free of regex metacharacters.
const REFACTOR_PHRASES = [
"should be removed", "remove this", "dead code", "unused", "unnecessary",
"duplicate of", "duplicates", "redundant",
"consolidate", "merge with", "extract into",
"refactor", "rewrite", "replace with",
"orphaned", "stale", "deprecated",
"pseudocode", "placeholder", "stub",
"split this file", "too large",
];
// Signal-class classifier — per file, given 2+ consecutive iterations'
// reviews, tag the file's behavior:
// CONVERGING — resolved > novel, score ↑
// LOOPING — 3+ same findings repeat, novel = 0, score flat
// ORBITING — novel findings each iter, no resolved (healthy depth)
// PLATEAU — score flat + findings flat (diminishing returns)
// MIXED — partial/unclear
// This is the foundation for iter-6+ auto-routing: each class gets a
// different sub-pipeline (specialist model, reviewer rotation, etc).
// Matched with plain String.includes against lowercased suggestion
// text in signalClasses() — keep entries lowercase; no regex syntax.
const SIGNAL_PHRASES = [
"pseudocode", "placeholder", "stub", "unwired", "missing", "dead code", "orphaned",
"duplicate", "redundant", "refactor", "rewrite", "remove", "unused", "unnecessary",
];
async function signalClasses(): Promise<any> {
  const runsDir = `${REPO}/tests/real-world/runs`;
  // Load every review, group by repo-relative file path, sort by mtime.
  const perFile: Record<string, Array<{ run: string; phrases: Set<string>; score: number | null; conf_avg: number | null; findings: number; ts: number }>> = {};
  try {
    const dirs = await Array.fromAsync(new Bun.Glob("scrum_*").scan({ cwd: runsDir, onlyFiles: false }));
    for (const d of dirs) {
      const files = await Array.fromAsync(new Bun.Glob("review_*.json").scan({ cwd: `${runsDir}/${d}` }));
      for (const f of files) {
        try {
          const p = `${runsDir}/${d}/${f}`;
          const bf = Bun.file(p); // one handle reused for text + stat
          const j = JSON.parse(await bf.text());
          const key = j.file?.replace("/home/profit/lakehouse/", "") ?? "?";
          const sug = (j.suggestions ?? "").toLowerCase();
          const phrases = new Set<string>();
          for (const ph of SIGNAL_PHRASES) if (sug.includes(ph)) phrases.add(ph);
          // Fix: the old pattern (\d(?:\.\d)?) captured only ONE leading
          // digit, so a perfect "10/10" matched at the trailing "0" and
          // parsed as score 0. Allow two digits, and drop captures above
          // 10 as false matches (e.g. a stray "95/10").
          const scoreMatch = sug.match(/(\d{1,2}(?:\.\d)?)\s*\/\s*10\b/);
          let score: number | null = scoreMatch ? parseFloat(scoreMatch[1]) : null;
          if (score != null && score > 10) score = null;
          // Confidence markers appear both as "Confidence: NN%" prose and
          // as "confidence": NN inside forensic-JSON output; harvest both.
          const mconf = [...sug.matchAll(/(?:confidence[*:\s]*\s*|\|\s*)(\d{1,3})\s*%/gi)].map(m => parseInt(m[1], 10));
          const jconf = [...sug.matchAll(/"confidence"\s*:\s*(\d{1,3})(?!\d)/gi)].map(m => parseInt(m[1], 10));
          const all = [...mconf, ...jconf].filter(x => 0 <= x && x <= 100);
          const conf_avg = all.length ? Math.round(all.reduce((a, b) => a + b, 0) / all.length) : null;
          const ts = (await bf.stat()).mtime.getTime();
          (perFile[key] ??= []).push({ run: d, phrases, score, conf_avg, findings: all.length, ts });
        } catch {} // best-effort: skip unreadable / unparseable reviews
      }
    }
  } catch (e) {
    return { error: String(e), classes: {} };
  }
  // Classify each file from its two most recent reviews.
  const classes: Record<string, any> = {};
  for (const [file, runs] of Object.entries(perFile)) {
    runs.sort((a, b) => a.ts - b.ts);
    if (runs.length < 2) { classes[file] = { cls: "NEW", runs: runs.length }; continue; }
    const last = runs[runs.length - 1];
    const prev = runs[runs.length - 2];
    const novel = [...last.phrases].filter(p => !prev.phrases.has(p));
    const resolved = [...prev.phrases].filter(p => !last.phrases.has(p));
    const looping = [...prev.phrases].filter(p => last.phrases.has(p));
    const dScore = (last.score != null && prev.score != null) ? last.score - prev.score : null;
    const dConf = (last.conf_avg != null && prev.conf_avg != null) ? last.conf_avg - prev.conf_avg : null;
    const dFindings = last.findings - prev.findings;
    let cls: string;
    if (dScore != null && dScore > 0 && resolved.length > novel.length) cls = "CONVERGING";
    else if (looping.length >= 3 && novel.length === 0 && (dScore == null || Math.abs(dScore) < 0.5)) cls = "LOOPING";
    else if (novel.length >= 2 && resolved.length === 0) cls = "ORBITING";
    else if (Math.abs(dFindings) <= 1 && (dScore == null || Math.abs(dScore) < 0.5)) cls = "PLATEAU";
    else cls = "MIXED";
    classes[file] = {
      cls,
      runs: runs.length,
      iter_span: `${runs[0].run}${last.run}`,
      prev_score: prev.score,
      last_score: last.score,
      delta_score: dScore,
      delta_conf: dConf,
      delta_findings: dFindings,
      novel,
      resolved,
      looping,
    };
  }
  // Summary counts
  const counts: Record<string, number> = {};
  for (const v of Object.values(classes)) counts[v.cls] = (counts[v.cls] ?? 0) + 1;
  return { generated_at: new Date().toISOString(), counts, classes };
}
async function refactorSignals(): Promise<any> {
  // Walk every accepted review across all scrum runs. For each file,
  // count how many times its suggestions mention a refactor phrase.
  // Return a sorted list — files most often flagged for refactor first.
  const runsDir = `${REPO}/tests/real-world/runs`;
  const perFile: Record<string, { file: string; hits: number; phrases: Record<string, number>; examples: string[]; iterations: number }> = {};
  // Fix: each phrase regex was rebuilt with `new RegExp` inside the
  // innermost loop — once per phrase per review file. Compile each
  // phrase once up front. Reuse with String.match is safe: it resets
  // lastIndex on global regexes. The "i" flag is kept for parity even
  // though `sug` is already lowercased.
  const phrasePatterns = REFACTOR_PHRASES.map(phrase => ({ phrase, re: new RegExp(phrase, "gi") }));
  try {
    const dirs = await Array.fromAsync(new Bun.Glob("scrum_*").scan({ cwd: runsDir, onlyFiles: false }));
    for (const d of dirs) {
      const files = await Array.fromAsync(new Bun.Glob("review_*.json").scan({ cwd: `${runsDir}/${d}` }));
      for (const f of files) {
        const p = `${runsDir}/${d}/${f}`;
        try {
          const j = JSON.parse(await Bun.file(p).text());
          const file = j.file?.replace("/home/profit/lakehouse/", "") ?? "?";
          const sug = (j.suggestions ?? "").toLowerCase();
          if (!perFile[file]) perFile[file] = { file, hits: 0, phrases: {}, examples: [], iterations: 0 };
          perFile[file].iterations++;
          for (const { phrase, re } of phrasePatterns) {
            const count = (sug.match(re) ?? []).length;
            if (count > 0) {
              perFile[file].hits += count;
              perFile[file].phrases[phrase] = (perFile[file].phrases[phrase] ?? 0) + count;
              // Pull one example sentence around the phrase
              if (perFile[file].examples.length < 3) {
                const idx = sug.indexOf(phrase);
                if (idx >= 0) {
                  const s = Math.max(0, idx - 60);
                  const e = Math.min(sug.length, idx + phrase.length + 80);
                  perFile[file].examples.push("…" + sug.slice(s, e).replace(/\s+/g, " ") + "…");
                }
              }
            }
          }
        } catch {} // best-effort: skip unreadable / unparseable reviews
      }
    }
  } catch (e) {
    return { error: String(e), signals: [] };
  }
  const signals = Object.values(perFile)
    .filter(x => x.hits > 0)
    .sort((a, b) => b.hits - a.hits)
    .slice(0, 30);
  return { signals, scanned: Object.keys(perFile).length };
}
// Grep-like substring search across every review's suggestions.
// Returns up to `limit` hits, each with the reviewed file, the run it
// came from, the escalated model, and a snippet around the first
// occurrence. Queries shorter than 2 chars short-circuit to no hits.
async function reverseIndex(query: string, limit = 20): Promise<any> {
  const runsDir = `${REPO}/tests/real-world/runs`;
  if (!query || query.length < 2) return { query, hits: [] };
  const needle = query.toLowerCase();
  const hits: any[] = [];
  try {
    const dirs = await Array.fromAsync(new Bun.Glob("scrum_*").scan({ cwd: runsDir, onlyFiles: false }));
    scan: for (const d of dirs) {
      const files = await Array.fromAsync(new Bun.Glob("review_*.json").scan({ cwd: `${runsDir}/${d}` }));
      for (const f of files) {
        const p = `${runsDir}/${d}/${f}`;
        try {
          const j = JSON.parse(await Bun.file(p).text());
          const sug = j.suggestions ?? "";
          const idx = sug.toLowerCase().indexOf(needle);
          if (idx < 0) continue;
          const start = Math.max(0, idx - 80);
          const end = Math.min(sug.length, idx + needle.length + 200);
          hits.push({
            file: j.file?.replace("/home/profit/lakehouse/", ""),
            run_id: d,
            model: j.escalated_to_model,
            snippet: sug.slice(start, end).replace(/\s+/g, " "),
          });
          if (hits.length >= limit) break scan;
        } catch {} // best-effort: skip unreadable / unparseable reviews
      }
    }
  } catch (e) {
    return { query, error: String(e), hits: [] };
  }
  return { query, hits };
}
// Walk all scrum_<id>/review_*.json files and gather every review for
// the given repo-relative file path. Returns timeline rows (one per
// run) sorted by reviewed_at.
async function fileHistory(relpath: string): Promise<any> {
  const runsDir = `${REPO}/tests/real-world/runs`;
  const out: any[] = [];
  try {
    const dirs = await Array.fromAsync(new Bun.Glob("scrum_*").scan({ cwd: runsDir, onlyFiles: false }));
    for (const d of dirs) {
      // Review filenames flatten the path: a/b/c.ts → review_a_b_c.ts.json
      const safe = relpath.replaceAll("/", "_");
      const p = `${runsDir}/${d}/review_${safe}.json`;
      const bf = Bun.file(p); // one handle reused for exists/text/stat
      if (!(await bf.exists())) continue;
      const j = JSON.parse(await bf.text());
      const sug = j.suggestions ?? "";
      // Fix: the old pattern (\d(?:\.\d)?) captured only ONE leading
      // digit, so a perfect "10/10" matched at the trailing "0" and
      // parsed as score 0. Allow two digits, and drop captures above 10
      // as false matches.
      const scoreMatch = sug.match(/(?:score[\s*:]*)?(\d{1,2}(?:\.\d)?)\s*\/\s*10\b/i);
      let score: number | null = scoreMatch ? parseFloat(scoreMatch[1]) : null;
      if (score != null && score > 10) score = null;
      // Confidence markers appear both as "Confidence: NN%" prose and as
      // "confidence": NN inside forensic-JSON output; harvest both.
      const confs = [...sug.matchAll(/(?:Confidence[*:\s]*\s*|\|\s*)(\d{1,3})\s*%/gi)]
        .map(m => parseInt(m[1], 10)).filter(x => x >= 0 && x <= 100);
      const jsonConfs = [...sug.matchAll(/"confidence"\s*:\s*(\d{1,3})(?!\d)/gi)]
        .map(m => parseInt(m[1], 10)).filter(x => x >= 0 && x <= 100);
      const all = [...confs, ...jsonConfs];
      const mt = await bf.stat();
      out.push({
        run_id: d,
        reviewed_at: j.reviewed_at ?? mt.mtime, // fall back to file mtime
        model: j.escalated_to_model,
        score,
        chars: sug.length,
        conf_avg: all.length ? Math.round(all.reduce((a, b) => a + b, 0) / all.length) : null,
        conf_min: all.length ? Math.min(...all) : null,
        findings: all.length, // count of confidence markers found
        output_format: sug.includes('"verdict"') ? "forensic_json" : "markdown",
        // first 1200 chars preview
        preview: sug.slice(0, 1200),
      });
    }
  } catch (e) {
    return { error: String(e), history: [] };
  }
  out.sort((a, b) => String(a.reviewed_at).localeCompare(String(b.reviewed_at)));
  return { file: relpath, history: out };
}
Bun.serve({
port: PORT,
hostname: "0.0.0.0",
async fetch(req) {
const url = new URL(req.url);
const path = url.pathname;
// Static shell
if (path === "/" || path === "/index.html") {
return new Response(Bun.file(`${REPO}/ui/index.html`));
}
if (path === "/ui.css") {
return new Response(Bun.file(`${REPO}/ui/ui.css`), { headers: { "content-type": "text/css" } });
}
if (path === "/ui.js") {
return new Response(Bun.file(`${REPO}/ui/ui.js`), { headers: { "content-type": "application/javascript" } });
}
// Data API
if (path === "/data/services") return Response.json(await servicesSnapshot());
if (path === "/data/reviews") {
const n = Number(url.searchParams.get("tail") ?? 50);
return Response.json(await tailJsonl(`${KB}/scrum_reviews.jsonl`, n));
}
if (path === "/data/findings") return Response.json(await tailJsonl(`${KB}/phase_sweep_findings.jsonl`));
if (path === "/data/metrics") return Response.json(await tailJsonl(`${KB}/scrum_loop_metrics.jsonl`));
if (path === "/data/trust") return Response.json(await tailJsonl(`${KB}/model_trust.jsonl`, 200));
if (path === "/data/overrides") return Response.json(await tailJsonl(`${KB}/human_overrides.jsonl`));
if (path === "/data/outcomes") return Response.json(await tailJsonl(`${KB}/outcomes.jsonl`, 30));
if (path === "/data/audit_facts") return Response.json(await tailJsonl(`${KB}/audit_facts.jsonl`, 30));
// Pathway memory — consensus-designed sidecar (2026-04-24). Two
// exposed metrics: reuse_rate (activity — is it firing?) and
// avg_rungs_saved_per_commit (value — is it earning its keep?).
// Round-3 consensus (qwen3.5:397b) pointed out that activity
// without value tells us nothing; the UI needs both to judge the
// health of the hot-swap learning loop.
if (path === "/data/pathway_stats") {
try {
const r = await fetch("http://localhost:3100/vectors/pathway/stats", { signal: AbortSignal.timeout(3000) });
if (!r.ok) return Response.json({ error: `vectord ${r.status}`, stats: null });
const stats = await r.json();
// Tail recent scrum events to compute avg_rungs_saved_per_commit
// (a committed review = any row in scrum_reviews.jsonl; rungs_saved
// only populates when pathway memory fired AND the recommended
// model actually produced the accept).
const reviews = await tailJsonl(`${KB}/scrum_reviews.jsonl`, 200);
let totalCommits = 0;
let totalRungsSaved = 0;
let hotSwapHits = 0;
for (const r of reviews) {
totalCommits++;
if (r.pathway_hot_swap_hit) hotSwapHits++;
if (typeof r.rungs_saved === "number") totalRungsSaved += r.rungs_saved;
}
return Response.json({
stats,
scrum_window: {
reviews: totalCommits,
hot_swap_hits: hotSwapHits,
pathway_reuse_rate: totalCommits ? hotSwapHits / totalCommits : 0,
avg_rungs_saved_per_commit: totalCommits ? totalRungsSaved / totalCommits : 0,
},
});
} catch (e) {
return Response.json({ error: (e as Error).message, stats: null });
}
}
if (path.startsWith("/data/file/")) {
const relpath = decodeURIComponent(path.slice("/data/file/".length));
return Response.json(await fileHistory(relpath));
}
if (path === "/data/refactor_signals") {
return Response.json(await refactorSignals());
}
if (path === "/data/signal_classes") {
return Response.json(await signalClasses());
}
if (path === "/data/search") {
const q = url.searchParams.get("q") ?? "";
return Response.json(await reverseIndex(q, 30));
}
// Per-service systemd log tail. Allowed service list is fixed so the
// :service path param can never be used to invoke arbitrary units.
if (path.startsWith("/data/logs/")) {
const svc = path.slice("/data/logs/".length).split("?")[0];
const UNITS: Record<string, string> = {
gateway: "lakehouse.service",
sidecar: "lakehouse-sidecar.service",
observer: "lakehouse-observer.service",
mcp: "lakehouse-agent.service",
context7: "lakehouse-context7-bridge.service",
auditor: "lakehouse-auditor.service",
langfuse: "lakehouse-langfuse-bridge.service",
};
const unit = UNITS[svc];
if (!unit) return Response.json({ error: "unknown service", allowed: Object.keys(UNITS) }, { status: 400 });
const n = Number(url.searchParams.get("n") ?? 60);
try {
// Use execFile-style API: pass args as array, never shell-interpolate
const proc = Bun.spawn(["journalctl", "-u", unit, "-n", String(n), "--no-pager", "--output=short-iso"], {
stdout: "pipe",
stderr: "pipe",
});
const text = await new Response(proc.stdout).text();
await proc.exited;
const lines = text.split("\n").filter(Boolean);
return Response.json({ service: svc, unit, lines });
} catch (e) {
return Response.json({ service: svc, unit, error: String(e), lines: [] });
}
}
// Live scrum log tail — best-effort
if (path === "/data/scrum_log") {
try {
const bg = await Array.fromAsync(new Bun.Glob("scrum_iter*.log").scan({ cwd: "/tmp" }));
if (bg.length === 0) return Response.json({ lines: [] });
bg.sort();
const latest = `/tmp/${bg[bg.length - 1]}`;
const text = await Bun.file(latest).text();
const lines = text.split("\n").slice(-80);
return Response.json({ file: latest, lines });
} catch (e) {
return Response.json({ error: String(e) });
}
}
return new Response("not found", { status: 404 });
},
});
console.log(`[ui] visual control plane listening on http://0.0.0.0:${PORT}`);