lakehouse/mcp-server/observer.ts
root 25ea3de836
Some checks failed
lakehouse/auditor 1 blocking issue: cloud: claim not backed — "journal event verified live (total_events_created 0→1 after probe)."
observer: fix LLM Team escalation — route to /v1/chat qwen3-coder:480b instead of dead mode
Discovery 2026-04-24: /api/run?mode=code_review returns "Unknown mode"
(error response from llm_team_ui.py). The 2026-04-24 observer escalation
wiring pointed at a dead endpoint and was failing silently. My earlier
claim of "9 registered LLM Team modes" came from GET probes that all
returned 405 — I interpreted that as "POST-only endpoints exist" when
it just means "GET is not allowed for anything, and on POST only `extract`
is registered."

Rewire: observer's escalateFailureClusterToLLMTeam now hits
  POST /v1/chat { provider: "ollama_cloud", model: "qwen3-coder:480b", ... }
which is the same coding specialist that sits at rung 2 of the scrum
ladder and reliably produces substantive reviews. A probe returned
1240 chars of analysis in ~8.7s.

Also tightens scrum_applier:
  * MODEL default: kimi-k2:1t → qwen3-coder:480b (coding specialist)
  * Size gate: 20 lines → 6 lines (surgical patches only)
  * Max patches per file: 3 → 2
  * Prompt: explicit forbidden-actions list (no struct renames, no
    function-signature changes, no new modules) and mechanical-only
    whitelist

These changes produced the first auto-applied commit (96b46cd), which
landed a 2-line import addition that passed cargo check. Zero-to-one.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 04:14:33 -05:00

/**
* Lakehouse Observer — autonomous iteration loop.
*
* Runs continuously alongside the agent gateway. Watches every operation,
* logs outcomes, detects failures, and feeds learnings back so agents
* improve over time without retraining.
*
* Three loops:
* 1. OPERATION OBSERVER — wraps gateway calls, timestamps + logs every
* success/failure to the lakehouse
* 2. ERROR ANALYZER — periodically reads the error log, asks a local
* model to diagnose patterns, writes recommendations
* 3. PLAYBOOK BUILDER — after N successful ops of the same type,
* consolidates them into a reusable playbook entry
*
* This is the "third-party witness" J asked for — it watches what
* agents do and helps them not repeat mistakes.
*/
const GATEWAY = process.env.GATEWAY_URL || "http://localhost:3700";
const LAKEHOUSE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
const CYCLE_SECS = parseInt(process.env.OBSERVER_CYCLE || "30");
// Phase 24 — observer now listens on an HTTP port for external ops
// (scenarios bypass the MCP:3700 layer by design). Default 3800.
const OBSERVER_PORT = parseInt(process.env.OBSERVER_PORT || "3800");
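// Typical launch with overrides (illustrative; adjust to your environment):
//   GATEWAY_URL=http://localhost:3700 OBSERVER_CYCLE=15 OBSERVER_PORT=3800 bun run observer.ts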
// ─── Observed operation log ───
interface ObservedOp {
timestamp: string;
endpoint: string;
input_summary: string;
success: boolean;
duration_ms: number;
output_summary: string;
error?: string;
// Phase 24 — optional provenance so error analyzer and playbook
// builder can differentiate MCP-layer ops from scenario-sourced
// events. Scenarios set source="scenario" + staffer_id + sig_hash.
source?: "mcp" | "scenario" | "langfuse" | "overseer_correction";
staffer_id?: string;
sig_hash?: string;
event_kind?: string;
role?: string;
city?: string;
state?: string;
count?: number;
rescue_attempted?: boolean;
rescue_succeeded?: boolean;
// Overseer-correction-specific (2026-04-23): lets the analyzer
// correlate corrections with the drift that prompted them and with
// subsequent outcomes that either validated or invalidated the advice.
task_class?: string;
correction?: string;
applied_at_turn?: number;
}
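// Illustrative only: a scenario-sourced op as it would sit in the ring
// and in data/_observer/ops.jsonl (field values invented for documentation):
//   {
//     "timestamp": "2026-04-24T09:00:00.000Z",
//     "endpoint": "scenario:fill",
//     "input_summary": "fill RN×3 in Austin, TX",
//     "success": false,
//     "duration_ms": 412,
//     "output_summary": "no candidates",
//     "error": "no candidates",
//     "source": "scenario",
//     "staffer_id": "st_0042",
//     "sig_hash": "ab12cd34",
//     "event_kind": "fill",
//     "role": "RN",
//     "city": "Austin",
//     "state": "TX",
//     "count": 3,
//     "rescue_attempted": false,
//     "rescue_succeeded": false
//   }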
const recentOps: ObservedOp[] = [];
// Phase 24 — external ingest path. Scenarios POST outcome summaries
// here so the observer's analyzer + playbook builder see them. Called
// from the Bun.serve() handler below. Same ring buffer as the MCP-
// wrapped path so downstream loops don't need to know the source.
export function recordExternalOp(op: ObservedOp): void {
recentOps.push({ ...op, source: op.source ?? "scenario" });
if (recentOps.length > 2000) recentOps.shift();
}
// ─── Wrapped gateway caller — every call gets observed ───
export async function observed(
endpoint: string,
body: any,
description: string,
): Promise<{ data: any; op: ObservedOp }> {
const t0 = Date.now();
let data: any;
let error: string | undefined;
let success = true;
try {
const resp = await fetch(`${GATEWAY}${endpoint}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
});
data = await resp.json();
if (data.error) {
success = false;
error = data.error;
}
} catch (e: any) {
success = false;
error = e.message;
data = { error: e.message };
}
const op: ObservedOp = {
timestamp: new Date().toISOString(),
endpoint,
input_summary: description,
success,
duration_ms: Date.now() - t0,
output_summary: success
? summarize(data)
: `ERROR: ${error}`,
error,
};
recentOps.push(op);
if (recentOps.length > 2000) recentOps.shift();
// Persist to lakehouse
await persistOp(op);
return { data, op };
}
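// Example use by another MCP tool (illustrative; the endpoint and body
// shown here are assumptions, not a confirmed gateway route):
//   const { data, op } = await observed("/query/hybrid", { q: "open RN shifts" }, "hybrid search: open RN shifts");
//   if (!op.success) console.error(`lookup failed: ${op.error}`);
// Compact one-line summary of a gateway response for the op log.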
function summarize(data: any): string {
if (data.sql_matches !== undefined) return `hybrid: ${data.sql_matches} sql → ${data.vector_reranked} results`;
if (data.rows) return `${data.row_count || data.rows.length} rows`;
if (data.answer) return `answer: ${data.answer.slice(0, 80)}...`;
if (data.sources) return `${data.sources.length} sources`;
return JSON.stringify(data).slice(0, 100);
}
// Phase 24 honesty fix — the old persistOp used /ingest/file which
// REPLACES the dataset (flagged in feedback_ingest_replace_semantics.md).
// Every op silently wiped all prior ops. Now we append a JSONL line to
// data/_observer/ops.jsonl so the historical trace is durable. The
// observer analyzer + playbook builder read from this file when it
// outgrows the 2000-entry in-memory ring.
async function persistOp(op: ObservedOp) {
try {
const { mkdir, appendFile } = await import("node:fs/promises");
await mkdir("data/_observer", { recursive: true });
await appendFile("data/_observer/ops.jsonl", JSON.stringify(op) + "\n");
} catch {
// Persistence is best-effort; in-memory ring still works.
}
}
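// Reading the trace back is not wired up in this file yet; a minimal
// sketch (assuming the file stays one JSON op per line) would be:
//   const text = await Bun.file("data/_observer/ops.jsonl").text();
//   const history = text.split("\n").filter(Boolean).map(l => JSON.parse(l) as ObservedOp);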
// ─── Failure-cluster escalation (rewired 2026-04-24) ───
//
// When recent failures on a single sig_hash cross a threshold, the
// local qwen2.5 analysis is probably insufficient. J's 2026-04-24
// direction: "the observer would trigger to give more context" —
// route failure clusters to a stronger reviewer so richer structured
// signal lands in the KB for scrum + auditor + playbook memory to
// consume next pass. The original wiring targeted LLM Team's
// code_review mode via /api/run; that mode isn't registered, so
// escalation now goes straight to the gateway's /v1/chat with
// qwen3-coder:480b (see escalateFailureClusterToLLMTeam below).
//
// Non-destructive: runs in parallel to the existing qwen2.5 analysis,
// never replaces it. Writes to data/_kb/observer_escalations.jsonl
// as a dedicated audit surface.
// LLM_TEAM is kept for reference; unused since the rewire to /v1/chat.
const LLM_TEAM = process.env.LH_LLM_TEAM_URL ?? "http://localhost:5000";
const LLM_TEAM_ESCALATIONS = "/home/profit/lakehouse/data/_kb/observer_escalations.jsonl";
const ESCALATION_THRESHOLD = 3; // N+ failures on the same sig_hash trigger escalation
async function escalateFailureClusterToLLMTeam(sigHash: string, cluster: ObservedOp[]) {
// Package the failure cluster as a single context blob. Originally
// I routed this to LLM Team's `code_review` mode at /api/run, but
// that mode isn't registered in llm_team_ui.py — it returned
// "Unknown mode" on every call. Revised 2026-04-24: route directly
// to the gateway's /v1/chat with provider=ollama_cloud + qwen3-coder:480b
// (the coding specialist that's rung 2 of the scrum ladder, proven
// to produce substantive structured reviews). Fire-and-forget so
// downstream failures don't block observer's normal loop.
const context = cluster.slice(-8).map((o, i) =>
`[${i + 1}] endpoint=${o.endpoint} input=${o.input_summary} error=${o.error ?? "?"}`
).join("\n");
const prompt = `sig_hash=${sigHash} · ${cluster.length} failures on the same signature:\n\n${context}\n\nReview this failure cluster. Identify:\n1. Likely root cause (single sentence).\n2. Files most likely responsible (path hints).\n3. Concrete fix direction (under 3 sentences).\n4. Confidence: NN%\n\nBe specific, not generic.`;
try {
const resp = await fetch(`${LAKEHOUSE}/v1/chat`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
provider: "ollama_cloud",
model: "qwen3-coder:480b",
messages: [{ role: "user", content: prompt }],
max_tokens: 800,
temperature: 0.2,
}),
signal: AbortSignal.timeout(60000),
});
if (!resp.ok) {
console.error(`[observer] escalation /v1/chat ${resp.status}: ${(await resp.text()).slice(0, 200)}`);
return;
}
const j: any = await resp.json();
const analysis = j?.choices?.[0]?.message?.content ?? "";
// Audit row stays schema-compatible with the prior implementation —
// downstream consumers see structured fields regardless of the
// review-source change. Facts/entities stay empty (this call is
// direct-model, not extract-mode); the raw analysis carries the
// signal.
const row = {
ts: new Date().toISOString(),
source: "observer_escalation",
mode: "direct_chat_qwen3_coder_480b",
sig_hash: sigHash,
cluster_size: cluster.length,
cluster_staffer: cluster[0]?.staffer_id,
cluster_endpoint: cluster[0]?.endpoint,
prompt_tokens: j?.usage?.prompt_tokens ?? 0,
completion_tokens: j?.usage?.completion_tokens ?? 0,
analysis: analysis.slice(0, 4000),
};
const { appendFile } = await import("node:fs/promises");
await appendFile(LLM_TEAM_ESCALATIONS, JSON.stringify(row) + "\n");
console.error(
`[observer] escalated sig_hash=${sigHash.slice(0, 8)} · cluster=${cluster.length} · ${analysis.length} chars`
);
} catch (e) {
console.error(`[observer] escalation failed: ${(e as Error).message}`);
}
}
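// Illustrative escalation row as it would land in observer_escalations.jsonl
// (values invented for documentation):
//   {"ts":"2026-04-24T09:05:12.000Z","source":"observer_escalation",
//    "mode":"direct_chat_qwen3_coder_480b","sig_hash":"ab12cd34","cluster_size":4,
//    "cluster_staffer":"st_0042","cluster_endpoint":"scenario:fill",
//    "prompt_tokens":512,"completion_tokens":310,"analysis":"1. Likely root cause: ..."}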
// Track which sig_hashes we've already escalated this session so we
// don't hammer LLM Team on every analyzeErrors tick when a cluster
// persists across cycles.
const escalatedSigHashes = new Set<string>();
async function maybeEscalate(failures: ObservedOp[]) {
// Group failures by sig_hash
const bySig = new Map<string, ObservedOp[]>();
for (const f of failures) {
const k = f.sig_hash ?? "__no_sig__";
const cluster = bySig.get(k) ?? [];
cluster.push(f);
bySig.set(k, cluster);
}
for (const [sigHash, cluster] of bySig) {
if (sigHash === "__no_sig__") continue;
if (cluster.length < ESCALATION_THRESHOLD) continue;
if (escalatedSigHashes.has(sigHash)) continue;
escalatedSigHashes.add(sigHash);
// Fire-and-forget — don't block the existing analyzer loop.
escalateFailureClusterToLLMTeam(sigHash, cluster).catch(() => {});
}
}
// ─── Error analyzer loop ───
async function analyzeErrors() {
// Read recent failures
const failures = recentOps.filter(op => !op.success);
if (failures.length === 0) return;
// NEW 2026-04-24: escalate recurring sig_hash clusters to the
// qwen3-coder:480b reviewer via /v1/chat. Runs in parallel to the local
// qwen2.5 analysis below — non-blocking, richer downstream signal for
// scrum/auditor.
maybeEscalate(failures).catch(() => {});
const errorSummary = failures.slice(-10).map(f =>
`[${f.endpoint}] ${f.input_summary}: ${f.error}`
).join("\n");
// Ask local model to diagnose
try {
const resp = await fetch(`${LAKEHOUSE}/ai/generate`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
prompt: `You are a system reliability observer. Analyze these recent failures and suggest fixes:
${errorSummary}
For each error:
1. What likely caused it?
2. How should the agent adjust its approach?
3. Should this be added to the playbook as a "don't do this"?
Be specific and actionable. Under 200 words.`,
model: "qwen2.5",
max_tokens: 400,
temperature: 0.2,
}),
});
const analysis = await resp.json();
if (analysis.text) {
console.error(`[observer] Error analysis:\n${analysis.text}`);
// Log the analysis as a playbook entry
await fetch(`${GATEWAY}/log`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
operation: `error_analysis: ${failures.length} failures`,
approach: "LLM-analyzed error patterns",
result: analysis.text.slice(0, 500),
context: errorSummary.slice(0, 500),
}),
});
}
} catch (e) {
console.error(`[observer] Analysis failed: ${e}`);
}
}
// ─── Playbook consolidation ───
async function consolidatePlaybooks() {
const successes = recentOps.filter(op => op.success);
if (successes.length < 5) return;
// Group by endpoint
const groups: Record<string, ObservedOp[]> = {};
for (const op of successes) {
const key = op.endpoint;
if (!groups[key]) groups[key] = [];
groups[key].push(op);
}
for (const [endpoint, ops] of Object.entries(groups)) {
if (ops.length < 3) continue;
const avgDuration = ops.reduce((s, o) => s + o.duration_ms, 0) / ops.length;
const pattern = ops.slice(-3).map(o => o.input_summary).join("; ");
await fetch(`${GATEWAY}/log`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
operation: `consolidated: ${ops.length} successful ${endpoint} calls`,
approach: `common pattern: ${pattern.slice(0, 200)}`,
result: `avg_duration=${avgDuration.toFixed(0)}ms, ${ops.length} successes`,
context: `endpoint=${endpoint}`,
}),
}).catch(() => {});
}
}
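// Illustrative /log payload emitted by the consolidation pass
// (endpoint and numbers invented for documentation):
//   {
//     "operation": "consolidated: 7 successful /query/hybrid calls",
//     "approach": "common pattern: open RN shifts; open LPN shifts; per-diem CNA coverage",
//     "result": "avg_duration=842ms, 7 successes",
//     "context": "endpoint=/query/hybrid"
//   }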
// ─── HTTP listener for external ops (Phase 24) ───
// Scenarios POST per-event outcomes here so the observer's analyzer +
// playbook builder see them alongside MCP-wrapped ops. Read-only stats
// also exposed at /stats for external health checks.
function startHttpListener() {
Bun.serve({
port: OBSERVER_PORT,
hostname: "0.0.0.0",
fetch(req) {
const url = new URL(req.url);
if (req.method === "GET" && url.pathname === "/health") {
return new Response(JSON.stringify({ status: "ok", ops_in_ring: recentOps.length }));
}
if (req.method === "GET" && url.pathname === "/stats") {
const bySource = new Map<string, number>();
for (const o of recentOps) {
const k = o.source ?? "mcp";
bySource.set(k, (bySource.get(k) ?? 0) + 1);
}
return new Response(JSON.stringify({
total: recentOps.length,
successes: recentOps.filter(o => o.success).length,
failures: recentOps.filter(o => !o.success).length,
by_source: Object.fromEntries(bySource),
recent_scenario_ops: recentOps
.filter(o => o.source === "scenario")
.slice(-10)
.map(o => ({ ts: o.timestamp, ok: o.success, staffer: o.staffer_id, kind: o.event_kind, role: o.role })),
}));
}
if (req.method === "POST" && url.pathname === "/event") {
return req.json().then((body: any) => {
const op: ObservedOp = {
timestamp: body.timestamp ?? new Date().toISOString(),
endpoint: body.endpoint ?? "scenario:fill",
input_summary: body.input_summary ?? `${body.event_kind ?? "?"} ${body.role ?? "?"}×${body.count ?? "?"} in ${body.city ?? "?"}, ${body.state ?? "?"}`,
success: !!body.success,
duration_ms: Number(body.duration_ms ?? 0),
output_summary: body.output_summary ?? (body.success ? "filled" : (body.error ?? "failed")),
error: body.error,
// Respect the client's provenance if set (langfuse bridge
// sends source:"langfuse", etc.). Default to "scenario"
// to keep legacy Phase 24 callers working.
source: body.source ?? "scenario",
staffer_id: body.staffer_id,
sig_hash: body.sig_hash,
event_kind: body.event_kind,
role: body.role,
city: body.city,
state: body.state,
count: body.count,
rescue_attempted: !!body.rescue_attempted,
rescue_succeeded: !!body.rescue_succeeded,
};
recordExternalOp(op);
persistOp(op).catch(() => {});
return new Response(JSON.stringify({ accepted: true, ring_size: recentOps.length }));
}).catch((e: Error) =>
new Response(JSON.stringify({ error: e.message }), { status: 400 }));
}
return new Response("not found", { status: 404 });
},
});
console.error(`[observer] HTTP listener bound to 0.0.0.0:${OBSERVER_PORT}`);
}
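// How a scenario reports an outcome (illustrative values):
//   curl -s -X POST http://localhost:3800/event \
//     -H 'Content-Type: application/json' \
//     -d '{"success":false,"event_kind":"fill","role":"RN","count":3,
//          "city":"Austin","state":"TX","staffer_id":"st_0042",
//          "sig_hash":"ab12cd34","error":"no candidates","duration_ms":412}'
// Ring contents and per-source counts are visible at GET /stats.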
// ─── Overseer corrections tailer (2026-04-23) ───
// The gateway's /v1/respond loop writes T3 overseer corrections to
// data/_kb/overseer_corrections.jsonl. Tail it once per cycle and
// inject each new row into the same recentOps ring that analyzeErrors
// + consolidatePlaybooks read — so a correction that just fired shows
// up alongside the outcomes it was meant to repair, and the analyzer
// can flag patterns like "three corrections on staffing.fill with the
// same advice — underlying problem isn't a drift, it's a data gap".
const CORRECTIONS_PATH = process.env.OVERSEER_CORRECTIONS_PATH
?? "/home/profit/lakehouse/data/_kb/overseer_corrections.jsonl";
let correctionsCursor = 0; // byte offset
async function tailOverseerCorrections(): Promise<number> {
const f = Bun.file(CORRECTIONS_PATH);
if (!(await f.exists())) return 0;
const size = f.size;
if (size <= correctionsCursor) return 0;
// Read only the suffix since the last cursor; keeps tail work
// bounded even as the file grows.
const text = await f.slice(correctionsCursor, size).text();
correctionsCursor = size;
let forwarded = 0;
for (const line of text.split("\n")) {
if (!line.trim()) continue;
let row: any;
try { row = JSON.parse(line); } catch { continue; }
const op: ObservedOp = {
timestamp: row.created_at ?? new Date().toISOString(),
endpoint: `overseer:${row.model ?? "gpt-oss:120b"}`,
input_summary: `${row.task_class ?? "?"}: ${row.reason ?? "escalation"}`,
// Correction itself is neither success nor failure — it's a
// mitigation attempt. We mark success=true so analyzeErrors
// doesn't count it as a failure, but the preview lets the
// analyzer see what was tried.
success: true,
duration_ms: Number(row.usage?.latency_ms ?? 0),
output_summary: String(row.correction ?? "").slice(0, 200),
source: "overseer_correction",
sig_hash: row.sig_hash,
task_class: row.task_class,
correction: String(row.correction ?? ""),
applied_at_turn: Number(row.applied_at_turn ?? 0),
};
recordExternalOp(op);
forwarded++;
}
return forwarded;
}
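// Illustrative overseer_corrections.jsonl row as this tailer expects it
// (field values invented; only the keys read above matter):
//   {"created_at":"2026-04-23T22:10:00.000Z","model":"gpt-oss:120b",
//    "task_class":"staffing.fill","reason":"drift: repeated empty shortlist",
//    "correction":"Widen the radius filter before giving up on a fill.",
//    "sig_hash":"ab12cd34","applied_at_turn":3,"usage":{"latency_ms":2140}}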
// ─── Main loop ───
async function main() {
console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}, port=${OBSERVER_PORT}`);
// Run a health check first
const health = await fetch(`${GATEWAY}/health`).then(r => r.json()).catch(() => null);
if (!health) {
console.error("[observer] gateway unreachable — exiting");
process.exit(1);
}
console.error(`[observer] gateway healthy: ${JSON.stringify(health)}`);
// Phase 24 — bind HTTP listener so scenarios can POST outcomes.
startHttpListener();
// Main loop
let cycle = 0;
while (true) {
await Bun.sleep(CYCLE_SECS * 1000);
cycle++;
// Every cycle: tail the overseer corrections KB stream, then
// analyze errors. Order matters — if an overseer correction just
// landed for a sig_hash that previously failed, the analyzer
// should see both.
const newCorrections = await tailOverseerCorrections();
if (newCorrections > 0) {
console.error(`[observer] pulled ${newCorrections} new overseer correction(s) into ring`);
}
await analyzeErrors();
// Every 5 cycles: consolidate playbooks
if (cycle % 5 === 0) {
await consolidatePlaybooks();
}
const scenarioOps = recentOps.filter(o => o.source === "scenario").length;
const langfuseOps = recentOps.filter(o => o.source === "langfuse").length;
const correctionOps = recentOps.filter(o => o.source === "overseer_correction").length;
const stats = {
cycle,
total_ops: recentOps.length,
successes: recentOps.filter(o => o.success).length,
failures: recentOps.filter(o => !o.success).length,
scenario_ops: scenarioOps,
langfuse_ops: langfuseOps,
overseer_corrections: correctionOps,
};
console.error(`[observer] cycle ${cycle}: ${JSON.stringify(stats)}`);
}
}
// Export the observed wrapper for other agents to use
export { main as startObserver };
// Run if executed directly
if (import.meta.main) {
main().catch(console.error);
}