lakehouse/ui/server.ts
root 8b77d67c9c
Some checks failed
lakehouse/auditor 1 blocking issue: cloud: claim not backed — "journal event verified live (total_events_created 0→1 after probe)."
OpenRouter rescue ladder + tree-split reduce fix + observer→LLM Team + scrum_applier + first auto-applied patch
## Infrastructure (scrum loop hardening)

crates/gateway/src/v1/openrouter.rs — new OpenRouter provider
  Direct HTTPS to openrouter.ai/api/v1/chat/completions with OpenAI-compatible shape.
  Key resolution: OPENROUTER_API_KEY env → /home/profit/.env → /root/llm_team_config.json
  (shares LLM Team UI's quota). Added after iter 5 hit repeated Ollama Cloud 502s on
  kimi-k2:1t — different provider backbone as rescue rung. Unit tests pin the URL
  stripping and OpenAI wire shape.
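  The provider targets the standard OpenAI chat-completions wire shape; a minimal
  TypeScript sketch of the request being pinned (the `buildOpenRouterRequest`
  helper and its defaults are illustrative, not the Rust code in openrouter.rs):

```typescript
// Illustrative sketch of the OpenAI-compatible payload the provider sends.
// Field names follow the OpenAI chat-completions format; the helper name
// and defaults are hypothetical, not lifted from openrouter.rs.
type ChatMessage = { role: "system" | "user" | "assistant"; content: string };

function buildOpenRouterRequest(model: string, messages: ChatMessage[], apiKey: string) {
  return {
    url: "https://openrouter.ai/api/v1/chat/completions",
    init: {
      method: "POST",
      headers: {
        "content-type": "application/json",
        authorization: `Bearer ${apiKey}`,
      },
      body: JSON.stringify({ model, messages, stream: false }),
    },
  };
}
```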

crates/gateway/src/v1/mod.rs + main.rs
  Added `"openrouter" | "openrouter_free"` arm to /v1/chat dispatch.
  V1State.openrouter_key loaded at startup via openrouter::resolve_openrouter_key()
  mirroring the Ollama Cloud pattern. Startup log:
    "v1: OpenRouter key loaded — /v1/chat provider=openrouter enabled"

tests/real-world/scrum_master_pipeline.ts
  * 9-rung ladder — kimi-k2:1t → qwen3-coder:480b → deepseek-v3.1:671b →
    mistral-large-3:675b → gpt-oss:120b → qwen3.5:397b → openrouter/gpt-oss-120b:free
    → openrouter/gemma-3-27b-it:free → local qwen3.5:latest.
    Added qwen3-coder:480b as rung 2 after live probes confirmed it rescues
    kimi-k2:1t 502s cleanly (0.9s latency, substantive reviews).
    Dropped devstral-2 (displaced by qwen3-coder); dropped kimi-k2.6 (not available);
    dropped minimax-m2.7 (returned 0 chars / 400 thinking tokens).
    Local fallback promoted to qwen3.5:latest per J's direction (2026-04-24).

  * MAX_ATTEMPTS bumped 6 → 9 to accommodate the rescue tier.
  * Tree-split scratchpad fixed — was concatenating shard markers directly
    into the reviewer input, causing kimi-k2:1t to write titles like
    "Forensic Audit Report – file.rs (shard 3)". Now uses internal §N§
    markers during accumulation and runs a proper reduce step that
    collapses per-shard digests into ONE coherent file-level synthesis
    with markers stripped. Matches the Phase 21 aibridge::tree_split
    map→reduce design. Fallback to stripped scratchpad if reducer returns thin.
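  The ladder walk itself is simple index-into-a-list logic; a sketch assuming
  attempts map 1:1 onto rungs (the `nextRung` helper is illustrative — the real
  pipeline lives in scrum_master_pipeline.ts):

```typescript
// Rung names are from this commit; the walk helper is a sketch.
const LADDER = [
  "kimi-k2:1t", "qwen3-coder:480b", "deepseek-v3.1:671b",
  "mistral-large-3:675b", "gpt-oss:120b", "qwen3.5:397b",
  "openrouter/gpt-oss-120b:free", "openrouter/gemma-3-27b-it:free",
  "qwen3.5:latest", // local fallback
];
const MAX_ATTEMPTS = 9;

// attempt is 0-based; returns null once the ladder is exhausted.
function nextRung(attempt: number): string | null {
  if (attempt >= MAX_ATTEMPTS || attempt >= LADDER.length) return null;
  return LADDER[attempt];
}
```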

tests/real-world/scrum_applier.ts — NEW (737 lines)
  The auto-apply pipeline. Reads scrum_reviews.jsonl, filters rows where
  gradient_tier ∈ {auto, dry_run} AND confidence_avg ≥ MIN_CONF (default 90),
  asks the reviewer model for concrete old_string/new_string patch JSON,
  applies via text replacement, runs cargo check after each file, commits
  if green and reverts if red. Deny-list: /etc/, config/, ops/, auditor/,
  docs/, data/, mcp-server/, ui/, sidecar/, scripts/. Hard caps: per-patch
  confidence ≥ MIN_CONF, old_string must be exactly unique, max 20 lines per
  patch. Never runs on main without explicit LH_APPLIER_BRANCH override.
  Audit trail in data/_kb/auto_apply.jsonl.

  Empirical behavior (dry-run over iter 4 reviews):
    5 eligible files → 1 green commit-ready, 2 build-red reverts, 2 all-rejected
  The build-green gate caught 2 bad patches before they'd have merged.
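  The eligibility and per-patch safety gates described above can be sketched as
  pure predicates (the row shape, helper names, and constants mirror this
  description but are illustrative, not scrum_applier.ts itself):

```typescript
// Illustrative gates; thresholds and deny-list are from the description above.
type ReviewRow = { file: string; gradient_tier: string; confidence_avg: number };

const MIN_CONF = 90;
const DENY_PREFIXES = ["/etc/", "config/", "ops/", "auditor/", "docs/",
  "data/", "mcp-server/", "ui/", "sidecar/", "scripts/"];

// Row-level gate: tier must be auto/dry_run, confidence at threshold,
// and the target file outside the deny-list.
function eligible(row: ReviewRow): boolean {
  if (!["auto", "dry_run"].includes(row.gradient_tier)) return false;
  if (row.confidence_avg < MIN_CONF) return false;
  return !DENY_PREFIXES.some(p => row.file.startsWith(p));
}

// Patch-level gate: old_string must occur exactly once in the source
// so a text replacement can never land in the wrong place.
function uniqueOldString(source: string, oldString: string): boolean {
  const first = source.indexOf(oldString);
  return first >= 0 && source.indexOf(oldString, first + 1) === -1;
}
```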

mcp-server/observer.ts — LLM Team code_review escalation
  When a sig_hash accumulates ≥3 failures (ESCALATION_THRESHOLD), fire-and-forget
  POST /api/run?mode=code_review at localhost:5000 with the failure cluster context.
  Parses facts/entities/relationships/file_hints from the response. Writes to a
  new data/_kb/observer_escalations.jsonl surface. Answers J's vision of the
  observer triggering richer LLM Team calls when failures pile up.
  Non-blocking: runs parallel to existing qwen2.5 analyzer, never replaces it.
  Tracks escalated sig_hashes in a session-local Set to avoid re-hammering
  LLM Team when a cluster persists across observer cycles.
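  The threshold-plus-session-dedup behavior can be sketched as follows (only
  ESCALATION_THRESHOLD and the Set-based dedup are from the description; the
  helper itself is illustrative, not observer.ts):

```typescript
// Fires at most once per sig_hash per session, and only after the
// failure count reaches the threshold.
const ESCALATION_THRESHOLD = 3;
const escalated = new Set<string>();

function shouldEscalate(sigHash: string, failures: number): boolean {
  if (failures < ESCALATION_THRESHOLD) return false;
  if (escalated.has(sigHash)) return false; // already escalated this session
  escalated.add(sigHash);
  return true;
}
```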

crates/aibridge/src/context.rs
  First auto-applied patch produced by scrum_applier.ts (dry-run path —
  applier writes files in dry-run mode but doesn't commit; bug noted for
  iter 6 fix). Adds #[deprecated] annotation to the inline estimate_tokens
  helper pointing callers to the centralized shared::model_matrix::ModelMatrix
  entry point (P21-002 — duplicate token-estimator surfaces). Cargo check
  passes with the annotation (verified by applier's own build gate).

## Visual Control Plane (UI)

ui/server.ts — Bun.serve on :3950 with /data/* fan-out:
  /data/services, /data/reviews, /data/metrics, /data/trust, /data/overrides,
  /data/findings, /data/outcomes, /data/audit_facts, /data/file/:path,
  /data/refactor_signals, /data/search?q=, /data/signal_classes,
  /data/logs/:svc (journalctl tail per systemd unit), /data/scrum_log.
  Bug fix: tryFetch now always attempts JSON.parse before falling back to text
  — observer's Bun.serve returns JSON without an application/json content-type,
  which previously rendered its stats as a raw string ("0 ops" on the map).

ui/index.html + ui.css — dark neo-brutalist shell. 6 views:
  MAP (D3 force-graph + overlays) / TRACE (per-file iter history) /
  TRAJECTORY (signal-class cards + refactor-signals table + reverse-index
  search box) / METRICS (every card has SOURCE + GOOD lines explaining
  where the number comes from and what target trajectory means) /
  KB (card grid with tooltips on every field) / CONSOLE (per-service
  journalctl tabs).

ui/ui.js — polling client, D3 wiring, signal-class panel, refactor-signals
  table, reverse-index search, per-service console tabs. Bug fix:
  renderNodeContext had Object.entries() iterating string characters when
  /health returned a plain string — now guards with typeof check so
  "lakehouse ok" renders as one row instead of "0 l / 1 a / 2 k / ...".
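  A sketch of that guard (the `rowsFor` helper name is hypothetical; the real
  fix lives in renderNodeContext in ui.js):

```typescript
// Object.entries("lakehouse ok") iterates characters; guard on typeof
// first so a plain-string health payload becomes a single row.
function rowsFor(health: unknown): Array<[string, string]> {
  if (typeof health === "string") return [["health", health]];
  if (health && typeof health === "object") {
    return Object.entries(health as Record<string, unknown>)
      .map(([k, v]) => [k, String(v)] as [string, string]);
  }
  return [];
}
```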

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 03:45:35 -05:00


// Visual Control Plane server — v1
// Single Bun.serve process on :3950. Serves static index.html and
// /data/* endpoints that fan out to the live services + tail jsonl KB
// files. No build step, no node_modules. Restart via systemd or
// `bun run ui/server.ts`.
const PORT = Number(process.env.LH_UI_PORT ?? 3950);
const KB = "/home/profit/lakehouse/data/_kb";
const REPO = "/home/profit/lakehouse";
const GATEWAY = "http://localhost:3100";
const SIDECAR = "http://localhost:3200";
const OBSERVER = "http://localhost:3800";
const MCP = "http://localhost:3700";
const CONTEXT7 = "http://localhost:3900";
// Tail helper — return the last N parsed lines of a jsonl file. Reads
// the file in full, which is fine for files up to a few MB.
async function tailJsonl(path: string, n = 50): Promise<any[]> {
  try {
    const text = await Bun.file(path).text();
    const lines = text.trim().split("\n").filter(Boolean);
    const tail = lines.slice(-n);
    return tail.map(l => {
      try { return JSON.parse(l); } catch { return { _raw: l, _error: "parse" }; }
    });
  } catch (e) {
    return [];
  }
}
async function tryFetch(url: string, timeout = 1500): Promise<any | null> {
  try {
    const r = await fetch(url, { signal: AbortSignal.timeout(timeout) });
    if (!r.ok) return null;
    // Fix 2026-04-24: some upstream services (observer Bun.serve) return
    // JSON without an application/json content-type. Don't rely on header
    // — try parsing the body as JSON; fall back to raw text on failure.
    const body = await r.text();
    try { return JSON.parse(body); } catch { return body; }
  } catch {
    return null;
  }
}
// Compact the massive /vectors/indexes response into just the shape the
// UI needs: [{name, source, dims, chunks, backend, bucket}]
async function indexesSummary(): Promise<any> {
  const j = await tryFetch(`${GATEWAY}/vectors/indexes`);
  if (!Array.isArray(j)) return { count: 0, items: [] };
  const items = j.slice(0, 12).map((i: any) => ({
    name: i.index_name,
    source: i.source,
    dims: i.dimensions,
    chunks: i.chunk_count,
    backend: i.vector_backend,
    bucket: i.bucket,
  }));
  return { count: j.length, items };
}
async function servicesSnapshot() {
  const [gw, sc, obs, mcp, c7, jstats, ustats] = await Promise.all([
    tryFetch(`${GATEWAY}/health`),
    tryFetch(`${SIDECAR}/health`),
    tryFetch(`${OBSERVER}/health`),
    tryFetch(`${MCP}/health`),
    tryFetch(`${CONTEXT7}/health`),
    tryFetch(`${GATEWAY}/journal/stats`),
    tryFetch(`${GATEWAY}/v1/usage`),
  ]);
  return {
    ts: new Date().toISOString(),
    nodes: [
      { id: "gateway", label: "Gateway :3100", status: gw ? "healthy" : "down", health: gw },
      { id: "sidecar", label: "Sidecar :3200", status: sc ? "healthy" : "down", health: sc },
      { id: "observer", label: "Observer :3800", status: obs ? "healthy" : "down", health: obs,
        stats: await tryFetch(`${OBSERVER}/stats`) },
      { id: "mcp", label: "MCP :3700", status: mcp ? "healthy" : "down", health: mcp },
      { id: "context7", label: "Context7 :3900", status: c7 ? "healthy" : "down", health: c7 },
    ],
    // Virtual nodes — backed by gateway subsystems rather than own ports
    subsystems: [
      { id: "journal", label: "Journal", stats: jstats },
      { id: "usage", label: "Usage /v1", stats: ustats },
      { id: "vectord", label: "Vectord", stats: await indexesSummary() },
      { id: "playbook", label: "Playbook", stats: await tryFetch(`${GATEWAY}/vectors/playbook_memory/status`) },
      { id: "agent", label: "Autotune", stats: await tryFetch(`${GATEWAY}/vectors/agent/status`) },
    ],
  };
}
// Extract phrase-level markers that indicate "this should be removed,
// simplified, or refactored" across scrum suggestions. These are the
// signals that accumulate into a refactor recommendation.
const REFACTOR_PHRASES = [
  "should be removed", "remove this", "dead code", "unused", "unnecessary",
  "duplicate of", "duplicates", "redundant",
  "consolidate", "merge with", "extract into",
  "refactor", "rewrite", "replace with",
  "orphaned", "stale", "deprecated",
  "pseudocode", "placeholder", "stub",
  "split this file", "too large",
];
// Signal-class classifier — per file, given 2+ consecutive iterations'
// reviews, tag the file's behavior:
//   CONVERGING — resolved > novel, score ↑
//   LOOPING    — 3+ same findings repeat, novel = 0, score flat
//   ORBITING   — novel findings each iter, no resolved (healthy depth)
//   PLATEAU    — score flat + findings flat (diminishing returns)
//   MIXED      — partial/unclear
// This is the foundation for iter-6+ auto-routing: each class gets a
// different sub-pipeline (specialist model, reviewer rotation, etc).
const SIGNAL_PHRASES = [
  "pseudocode", "placeholder", "stub", "unwired", "missing", "dead code", "orphaned",
  "duplicate", "redundant", "refactor", "rewrite", "remove", "unused", "unnecessary",
];
async function signalClasses(): Promise<any> {
  const runsDir = `${REPO}/tests/real-world/runs`;
  // Load every review, group by file, sort by timestamp
  const perFile: Record<string, Array<{ run: string, phrases: Set<string>, score: number | null, conf_avg: number | null, findings: number, ts: number }>> = {};
  try {
    const dirs = await Array.fromAsync(new Bun.Glob("scrum_*").scan({ cwd: runsDir, onlyFiles: false }));
    for (const d of dirs) {
      const files = await Array.fromAsync(new Bun.Glob("review_*.json").scan({ cwd: `${runsDir}/${d}` }));
      for (const f of files) {
        try {
          const p = `${runsDir}/${d}/${f}`;
          const j = JSON.parse(await Bun.file(p).text());
          const key = j.file?.replace("/home/profit/lakehouse/", "") ?? "?";
          const sug = (j.suggestions ?? "").toLowerCase();
          const phrases = new Set<string>();
          for (const ph of SIGNAL_PHRASES) if (sug.includes(ph)) phrases.add(ph);
          const scoreMatch = sug.match(/(\d(?:\.\d)?)\s*\/\s*10\b/);
          const score = scoreMatch ? parseFloat(scoreMatch[1]) : null;
          const mconf = [...sug.matchAll(/(?:confidence[*:\s]*\s*|\|\s*)(\d{1,3})\s*%/gi)].map(m => parseInt(m[1], 10));
          const jconf = [...sug.matchAll(/"confidence"\s*:\s*(\d{1,3})(?!\d)/gi)].map(m => parseInt(m[1], 10));
          const all = [...mconf, ...jconf].filter(x => 0 <= x && x <= 100);
          const conf_avg = all.length ? Math.round(all.reduce((a, b) => a + b, 0) / all.length) : null;
          const ts = (await Bun.file(p).stat()).mtime.getTime();
          (perFile[key] ??= []).push({ run: d, phrases, score, conf_avg, findings: all.length, ts });
        } catch {}
      }
    }
  } catch (e) {
    return { error: String(e), classes: {} };
  }
  const classes: Record<string, any> = {};
  for (const [file, runs] of Object.entries(perFile)) {
    runs.sort((a, b) => a.ts - b.ts);
    if (runs.length < 2) { classes[file] = { cls: "NEW", runs: runs.length }; continue; }
    const last = runs[runs.length - 1];
    const prev = runs[runs.length - 2];
    const novel = [...last.phrases].filter(p => !prev.phrases.has(p));
    const resolved = [...prev.phrases].filter(p => !last.phrases.has(p));
    const looping = [...prev.phrases].filter(p => last.phrases.has(p));
    const dScore = (last.score != null && prev.score != null) ? last.score - prev.score : null;
    const dConf = (last.conf_avg != null && prev.conf_avg != null) ? last.conf_avg - prev.conf_avg : null;
    const dFindings = last.findings - prev.findings;
    let cls: string;
    if (dScore != null && dScore > 0 && resolved.length > novel.length) cls = "CONVERGING";
    else if (looping.length >= 3 && novel.length === 0 && (dScore == null || Math.abs(dScore) < 0.5)) cls = "LOOPING";
    else if (novel.length >= 2 && resolved.length === 0) cls = "ORBITING";
    else if (Math.abs(dFindings) <= 1 && (dScore == null || Math.abs(dScore) < 0.5)) cls = "PLATEAU";
    else cls = "MIXED";
    classes[file] = {
      cls,
      runs: runs.length,
      iter_span: `${runs[0].run}→${last.run}`,
      prev_score: prev.score,
      last_score: last.score,
      delta_score: dScore,
      delta_conf: dConf,
      delta_findings: dFindings,
      novel,
      resolved,
      looping,
    };
  }
  // Summary counts
  const counts: Record<string, number> = {};
  for (const v of Object.values(classes)) counts[v.cls] = (counts[v.cls] ?? 0) + 1;
  return { generated_at: new Date().toISOString(), counts, classes };
}
async function refactorSignals(): Promise<any> {
  // Walk every accepted review across all scrum runs. For each file,
  // count how many times its suggestions mention a refactor phrase.
  // Return a sorted list — files most often flagged for refactor first.
  const runsDir = `${REPO}/tests/real-world/runs`;
  const perFile: Record<string, { file: string; hits: number; phrases: Record<string, number>; examples: string[]; iterations: number }> = {};
  try {
    const dirs = await Array.fromAsync(new Bun.Glob("scrum_*").scan({ cwd: runsDir, onlyFiles: false }));
    for (const d of dirs) {
      const files = await Array.fromAsync(new Bun.Glob("review_*.json").scan({ cwd: `${runsDir}/${d}` }));
      for (const f of files) {
        const p = `${runsDir}/${d}/${f}`;
        try {
          const j = JSON.parse(await Bun.file(p).text());
          const file = j.file?.replace("/home/profit/lakehouse/", "") ?? "?";
          const sug = (j.suggestions ?? "").toLowerCase();
          if (!perFile[file]) perFile[file] = { file, hits: 0, phrases: {}, examples: [], iterations: 0 };
          perFile[file].iterations++;
          for (const phrase of REFACTOR_PHRASES) {
            const count = (sug.match(new RegExp(phrase, "gi")) ?? []).length;
            if (count > 0) {
              perFile[file].hits += count;
              perFile[file].phrases[phrase] = (perFile[file].phrases[phrase] ?? 0) + count;
              // Pull one example sentence around the phrase
              if (perFile[file].examples.length < 3) {
                const idx = sug.indexOf(phrase);
                if (idx >= 0) {
                  const s = Math.max(0, idx - 60);
                  const e = Math.min(sug.length, idx + phrase.length + 80);
                  perFile[file].examples.push("…" + sug.slice(s, e).replace(/\s+/g, " ") + "…");
                }
              }
            }
          }
        } catch {}
      }
    }
  } catch (e) {
    return { error: String(e), signals: [] };
  }
  const signals = Object.values(perFile)
    .filter(x => x.hits > 0)
    .sort((a, b) => b.hits - a.hits)
    .slice(0, 30);
  return { signals, scanned: Object.keys(perFile).length };
}
async function reverseIndex(query: string, limit = 20): Promise<any> {
  // Grep-like substring search across every review's suggestions.
  // Returns file + run + model + snippet per hit.
  const runsDir = `${REPO}/tests/real-world/runs`;
  if (!query || query.length < 2) return { query, hits: [] };
  const q = query.toLowerCase();
  const hits: any[] = [];
  try {
    const dirs = await Array.fromAsync(new Bun.Glob("scrum_*").scan({ cwd: runsDir, onlyFiles: false }));
    for (const d of dirs) {
      const files = await Array.fromAsync(new Bun.Glob("review_*.json").scan({ cwd: `${runsDir}/${d}` }));
      for (const f of files) {
        const p = `${runsDir}/${d}/${f}`;
        try {
          const j = JSON.parse(await Bun.file(p).text());
          const sug = j.suggestions ?? "";
          const lower = sug.toLowerCase();
          const idx = lower.indexOf(q);
          if (idx < 0) continue;
          const s = Math.max(0, idx - 80);
          const e = Math.min(sug.length, idx + q.length + 200);
          hits.push({
            file: j.file?.replace("/home/profit/lakehouse/", ""),
            run_id: d,
            model: j.escalated_to_model,
            snippet: sug.slice(s, e).replace(/\s+/g, " "),
          });
          if (hits.length >= limit) break;
        } catch {}
      }
      if (hits.length >= limit) break;
    }
  } catch (e) {
    return { query, error: String(e), hits: [] };
  }
  return { query, hits };
}
async function fileHistory(relpath: string): Promise<any> {
  // Walk all scrum_<id>/review_*.json files and gather every review
  // for this file path. Returns timeline rows keyed by run_id.
  const runsDir = `${REPO}/tests/real-world/runs`;
  const out: any[] = [];
  try {
    const dirs = await Array.fromAsync(new Bun.Glob("scrum_*").scan({ cwd: runsDir, onlyFiles: false }));
    for (const d of dirs) {
      const safe = relpath.replaceAll("/", "_");
      const p = `${runsDir}/${d}/review_${safe}.json`;
      if (await Bun.file(p).exists()) {
        const j = JSON.parse(await Bun.file(p).text());
        const sug = j.suggestions ?? "";
        const scoreMatch = sug.match(/(?:score[\s*:]*)?(\d(?:\.\d)?)\s*\/\s*10\b/i);
        const score = scoreMatch ? parseFloat(scoreMatch[1]) : null;
        const confs = [...sug.matchAll(/(?:Confidence[*:\s]*\s*|\|\s*)(\d{1,3})\s*%/gi)]
          .map(m => parseInt(m[1], 10)).filter(x => x >= 0 && x <= 100);
        const jsonConfs = [...sug.matchAll(/"confidence"\s*:\s*(\d{1,3})(?!\d)/gi)]
          .map(m => parseInt(m[1], 10)).filter(x => x >= 0 && x <= 100);
        const all = [...confs, ...jsonConfs];
        const mt = await Bun.file(p).stat();
        out.push({
          run_id: d,
          reviewed_at: j.reviewed_at ?? mt.mtime,
          model: j.escalated_to_model,
          score,
          chars: sug.length,
          conf_avg: all.length ? Math.round(all.reduce((a, b) => a + b, 0) / all.length) : null,
          conf_min: all.length ? Math.min(...all) : null,
          findings: all.length,
          output_format: sug.includes('"verdict"') ? "forensic_json" : "markdown",
          // first 1200 chars preview
          preview: sug.slice(0, 1200),
        });
      }
    }
  } catch (e) {
    return { error: String(e), history: [] };
  }
  out.sort((a, b) => String(a.reviewed_at).localeCompare(String(b.reviewed_at)));
  return { file: relpath, history: out };
}
Bun.serve({
  port: PORT,
  hostname: "0.0.0.0",
  async fetch(req) {
    const url = new URL(req.url);
    const path = url.pathname;
    // Static shell
    if (path === "/" || path === "/index.html") {
      return new Response(Bun.file(`${REPO}/ui/index.html`));
    }
    if (path === "/ui.css") {
      return new Response(Bun.file(`${REPO}/ui/ui.css`), { headers: { "content-type": "text/css" } });
    }
    if (path === "/ui.js") {
      return new Response(Bun.file(`${REPO}/ui/ui.js`), { headers: { "content-type": "application/javascript" } });
    }
    // Data API
    if (path === "/data/services") return Response.json(await servicesSnapshot());
    if (path === "/data/reviews") {
      const n = Number(url.searchParams.get("tail") ?? 50);
      return Response.json(await tailJsonl(`${KB}/scrum_reviews.jsonl`, n));
    }
    if (path === "/data/findings") return Response.json(await tailJsonl(`${KB}/phase_sweep_findings.jsonl`));
    if (path === "/data/metrics") return Response.json(await tailJsonl(`${KB}/scrum_loop_metrics.jsonl`));
    if (path === "/data/trust") return Response.json(await tailJsonl(`${KB}/model_trust.jsonl`, 200));
    if (path === "/data/overrides") return Response.json(await tailJsonl(`${KB}/human_overrides.jsonl`));
    if (path === "/data/outcomes") return Response.json(await tailJsonl(`${KB}/outcomes.jsonl`, 30));
    if (path === "/data/audit_facts") return Response.json(await tailJsonl(`${KB}/audit_facts.jsonl`, 30));
    if (path.startsWith("/data/file/")) {
      const relpath = decodeURIComponent(path.slice("/data/file/".length));
      return Response.json(await fileHistory(relpath));
    }
    if (path === "/data/refactor_signals") {
      return Response.json(await refactorSignals());
    }
    if (path === "/data/signal_classes") {
      return Response.json(await signalClasses());
    }
    if (path === "/data/search") {
      const q = url.searchParams.get("q") ?? "";
      return Response.json(await reverseIndex(q, 30));
    }
    // Per-service systemd log tail. Allowed service list is fixed so the
    // :service path param can never be used to invoke arbitrary units.
    if (path.startsWith("/data/logs/")) {
      const svc = path.slice("/data/logs/".length).split("?")[0];
      const UNITS: Record<string, string> = {
        gateway: "lakehouse.service",
        sidecar: "lakehouse-sidecar.service",
        observer: "lakehouse-observer.service",
        mcp: "lakehouse-agent.service",
        context7: "lakehouse-context7-bridge.service",
        auditor: "lakehouse-auditor.service",
        langfuse: "lakehouse-langfuse-bridge.service",
      };
      const unit = UNITS[svc];
      if (!unit) return Response.json({ error: "unknown service", allowed: Object.keys(UNITS) }, { status: 400 });
      const n = Number(url.searchParams.get("n") ?? 60);
      try {
        // Use execFile-style API: pass args as array, never shell-interpolate
        const proc = Bun.spawn(["journalctl", "-u", unit, "-n", String(n), "--no-pager", "--output=short-iso"], {
          stdout: "pipe",
          stderr: "pipe",
        });
        const text = await new Response(proc.stdout).text();
        await proc.exited;
        const lines = text.split("\n").filter(Boolean);
        return Response.json({ service: svc, unit, lines });
      } catch (e) {
        return Response.json({ service: svc, unit, error: String(e), lines: [] });
      }
    }
    // Live scrum log tail — best-effort
    if (path === "/data/scrum_log") {
      try {
        const bg = await Array.fromAsync(new Bun.Glob("scrum_iter*.log").scan({ cwd: "/tmp" }));
        if (bg.length === 0) return Response.json({ lines: [] });
        bg.sort();
        const latest = `/tmp/${bg[bg.length - 1]}`;
        const text = await Bun.file(latest).text();
        const lines = text.split("\n").slice(-80);
        return Response.json({ file: latest, lines });
      } catch (e) {
        return Response.json({ error: String(e) });
      }
    }
    return new Response("not found", { status: 404 });
  },
});
console.log(`[ui] visual control plane listening on http://0.0.0.0:${PORT}`);