lakehouse/mcp-server/observer.ts
profit 6e39d8778f
All checks were successful
lakehouse/auditor all checks passed (3 findings, all info)
Cohesion: Python inventory + integration plan + Phase A verdict indexing
Three artifacts in one PR:

1. docs/PYTHON_INVENTORY.md — every .py file in the repo classified:
   Production (sidecar routers + 3 systemd services), Documented
   (kb_measure, kb_staffer_report), Manual (one-off tools), Dead
   (sidecar/sidecar/lab_ui.py + pipeline_lab.py are genuinely
   not imported anywhere).

2. docs/COHESION_INTEGRATION_PLAN.md — the "smarter DB" loop J
   called out as missing. Six phases A-F. Phase A ships here; B-F
   are named + sequenced for follow-up PRs. Each phase adds ONE
   wire of the loop; no single PR does them all.

3. Phase A wire (auditor verdicts → observer + KB):
   - auditor/audit.ts: after assembleVerdict, fire-and-forget POST
     to :3800/event with source="auditor" AND append to
     data/_kb/outcomes.jsonl with kind="audit". Errors log + drop
     — the verdict is still on disk at _auditor/verdicts/.
   - mcp-server/observer.ts: extend source union to include
     "auditor" | "bot" (was "mcp" | "scenario" only, which silently
     coerced my first auditor POST to source="scenario"). Accept
     body.ok OR body.success. Accept body.audit_duration_ms as a
     fallback for duration_ms. Uses body.one_liner as
     output_summary when set.

Live-verified after observer restart:
   re-audit PR #6 → verdict=request_changes, 4 findings (1 warn)
     observer: by_source={'auditor': 1}  (previously coerced to 'scenario')
     _kb/outcomes.jsonl tail: kind=audit sig=pr6-7fe47bab
       pr=6 overall=request_changes

The shape of the loop is now visible to downstream consumers. Phase
B (auditor's kb_query check reads these audit rows for history)
lands in a follow-up PR. Phase C-F similar.

NOT in this PR:
- Actually deleting lab_ui.py + pipeline_lab.py (operator decision,
  called out in the inventory doc)
- Cleaning up the 5 overlapping Python scripts (same)
- Phases B-F of the cohesion plan (separate PRs per wire)
- Integration test that asserts "smarter DB" across runs (Phase F)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 17:22:42 -05:00

355 lines
12 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* Lakehouse Observer — autonomous iteration loop.
*
* Runs continuously alongside the agent gateway. Watches every operation,
* logs outcomes, detects failures, and feeds learnings back so agents
* improve over time without retraining.
*
* Three loops:
* 1. OPERATION OBSERVER — wraps gateway calls, timestamps + logs every
* success/failure to the lakehouse
* 2. ERROR ANALYZER — periodically reads the error log, asks a local
* model to diagnose patterns, writes recommendations
* 3. PLAYBOOK BUILDER — after N successful ops of the same type,
* consolidates them into a reusable playbook entry
*
* This is the "third-party witness" J asked for — it watches what
* agents do and helps them not repeat mistakes.
*/
const GATEWAY = process.env.GATEWAY_URL || "http://localhost:3700";
const LAKEHOUSE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
const CYCLE_SECS = parseInt(process.env.OBSERVER_CYCLE || "30");
// Phase 24 — observer now listens on an HTTP port for external ops
// (scenarios bypass the MCP:3700 layer by design). Default 3800.
const OBSERVER_PORT = parseInt(process.env.OBSERVER_PORT || "3800");
// ─── Observed operation log ───
interface ObservedOp {
timestamp: string;
endpoint: string;
input_summary: string;
success: boolean;
duration_ms: number;
output_summary: string;
error?: string;
// Phase 24 — optional provenance so error analyzer and playbook
// builder can differentiate MCP-layer ops from scenario-sourced
// events. Scenarios set source="scenario" + staffer_id + sig_hash.
// Phase A (cohesion plan 2026-04-22) — auditor verdicts also post
// to :3800/event with source="auditor"; stats endpoint counts them
// separately so KB readers can see audit cadence alongside
// scenario cadence.
source?: "mcp" | "scenario" | "auditor" | "bot";
staffer_id?: string;
sig_hash?: string;
event_kind?: string;
role?: string;
city?: string;
state?: string;
count?: number;
rescue_attempted?: boolean;
rescue_succeeded?: boolean;
}
const recentOps: ObservedOp[] = [];
// Phase 24 — external ingest path. Scenarios POST outcome summaries
// here so the observer's analyzer + playbook builder see them. Called
// from the Bun.serve() handler below. Same ring buffer as the MCP-
// wrapped path so downstream loops don't need to know the source.
export function recordExternalOp(op: ObservedOp): void {
recentOps.push({ ...op, source: op.source ?? "scenario" });
if (recentOps.length > 2000) recentOps.shift();
}
// ─── Wrapped gateway caller — every call gets observed ───
export async function observed(
endpoint: string,
body: any,
description: string,
): Promise<{ data: any; op: ObservedOp }> {
const t0 = Date.now();
let data: any;
let error: string | undefined;
let success = true;
try {
const resp = await fetch(`${GATEWAY}${endpoint}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
});
data = await resp.json();
if (data.error) {
success = false;
error = data.error;
}
} catch (e: any) {
success = false;
error = e.message;
data = { error: e.message };
}
const op: ObservedOp = {
timestamp: new Date().toISOString(),
endpoint,
input_summary: description,
success,
duration_ms: Date.now() - t0,
output_summary: success
? summarize(data)
: `ERROR: ${error}`,
error,
};
recentOps.push(op);
if (recentOps.length > 1000) recentOps.shift();
// Persist to lakehouse
await persistOp(op);
return { data, op };
}
function summarize(data: any): string {
if (data.sql_matches !== undefined) return `hybrid: ${data.sql_matches} sql → ${data.vector_reranked} results`;
if (data.rows) return `${data.row_count || data.rows.length} rows`;
if (data.answer) return `answer: ${data.answer.slice(0, 80)}...`;
if (data.sources) return `${data.sources.length} sources`;
return JSON.stringify(data).slice(0, 100);
}
// Phase 24 honesty fix — the old persistOp used /ingest/file which
// REPLACES the dataset (flagged in feedback_ingest_replace_semantics.md).
// Every op silently wiped all prior ops. Now we append a JSONL line to
// data/_observer/ops.jsonl so the historical trace is durable. The
// observer analyzer + playbook builder read from this file when it
// outgrows the 2000-entry in-memory ring.
async function persistOp(op: ObservedOp) {
try {
const { mkdir, appendFile } = await import("node:fs/promises");
await mkdir("data/_observer", { recursive: true });
await appendFile("data/_observer/ops.jsonl", JSON.stringify(op) + "\n");
} catch {
// Persistence is best-effort; in-memory ring still works.
}
}
// ─── Error analyzer loop ───
async function analyzeErrors() {
// Read recent failures
const failures = recentOps.filter(op => !op.success);
if (failures.length === 0) return;
const errorSummary = failures.slice(-10).map(f =>
`[${f.endpoint}] ${f.input_summary}: ${f.error}`
).join("\n");
// Ask local model to diagnose
try {
const resp = await fetch(`${LAKEHOUSE}/ai/generate`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
prompt: `You are a system reliability observer. Analyze these recent failures and suggest fixes:
${errorSummary}
For each error:
1. What likely caused it?
2. How should the agent adjust its approach?
3. Should this be added to the playbook as a "don't do this"?
Be specific and actionable. Under 200 words.`,
model: "qwen2.5",
max_tokens: 400,
temperature: 0.2,
}),
});
const analysis = await resp.json();
if (analysis.text) {
console.error(`[observer] Error analysis:\n${analysis.text}`);
// Log the analysis as a playbook entry
await fetch(`${GATEWAY}/log`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
operation: `error_analysis: ${failures.length} failures`,
approach: "LLM-analyzed error patterns",
result: analysis.text.slice(0, 500),
context: errorSummary.slice(0, 500),
}),
});
}
} catch (e) {
console.error(`[observer] Analysis failed: ${e}`);
}
}
// ─── Playbook consolidation ───
async function consolidatePlaybooks() {
const successes = recentOps.filter(op => op.success);
if (successes.length < 5) return;
// Group by endpoint
const groups: Record<string, ObservedOp[]> = {};
for (const op of successes) {
const key = op.endpoint;
if (!groups[key]) groups[key] = [];
groups[key].push(op);
}
for (const [endpoint, ops] of Object.entries(groups)) {
if (ops.length < 3) continue;
const avgDuration = ops.reduce((s, o) => s + o.duration_ms, 0) / ops.length;
const pattern = ops.slice(-3).map(o => o.input_summary).join("; ");
await fetch(`${GATEWAY}/log`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
operation: `consolidated: ${ops.length} successful ${endpoint} calls`,
approach: `common pattern: ${pattern.slice(0, 200)}`,
result: `avg_duration=${avgDuration.toFixed(0)}ms, ${ops.length} successes`,
context: `endpoint=${endpoint}`,
}),
}).catch(() => {});
}
}
// ─── HTTP listener for external ops (Phase 24) ───
// Scenarios POST per-event outcomes here so the observer's analyzer +
// playbook builder see them alongside MCP-wrapped ops. Read-only stats
// also exposed at /stats for external health checks.
function startHttpListener() {
Bun.serve({
port: OBSERVER_PORT,
hostname: "0.0.0.0",
fetch(req) {
const url = new URL(req.url);
if (req.method === "GET" && url.pathname === "/health") {
return new Response(JSON.stringify({ status: "ok", ops_in_ring: recentOps.length }));
}
if (req.method === "GET" && url.pathname === "/stats") {
const bySource = new Map<string, number>();
for (const o of recentOps) {
const k = o.source ?? "mcp";
bySource.set(k, (bySource.get(k) ?? 0) + 1);
}
return new Response(JSON.stringify({
total: recentOps.length,
successes: recentOps.filter(o => o.success).length,
failures: recentOps.filter(o => !o.success).length,
by_source: Object.fromEntries(bySource),
recent_scenario_ops: recentOps
.filter(o => o.source === "scenario")
.slice(-10)
.map(o => ({ ts: o.timestamp, ok: o.success, staffer: o.staffer_id, kind: o.event_kind, role: o.role })),
}));
}
if (req.method === "POST" && url.pathname === "/event") {
return req.json().then((body: any) => {
// Accept caller-provided source when present and valid.
// Unrecognized sources fall back to "scenario" for
// backward-compat with older callers.
const allowedSource = ["mcp", "scenario", "auditor", "bot"] as const;
const bodySrc = typeof body.source === "string" ? body.source : "";
const source = (allowedSource as readonly string[]).includes(bodySrc)
? (bodySrc as typeof allowedSource[number])
: "scenario";
// Phase A: auditor+bot POST with ok=true|false (not success)
// for symmetry with the kind_of of their domain. Accept either.
const success = typeof body.success === "boolean" ? body.success
: typeof body.ok === "boolean" ? body.ok
: false;
const op: ObservedOp = {
timestamp: body.timestamp ?? new Date().toISOString(),
endpoint: body.endpoint ?? (source === "auditor" ? "auditor:verdict"
: source === "bot" ? "bot:cycle"
: "scenario:fill"),
input_summary: body.input_summary ?? `${body.event_kind ?? "?"} ${body.role ?? "?"}×${body.count ?? "?"} in ${body.city ?? "?"}, ${body.state ?? "?"}`,
success,
duration_ms: Number(body.duration_ms ?? body.audit_duration_ms ?? 0),
output_summary: body.output_summary ?? body.one_liner ?? (success ? "ok" : (body.error ?? "failed")),
error: body.error,
source,
staffer_id: body.staffer_id,
sig_hash: body.sig_hash,
event_kind: body.event_kind,
role: body.role,
city: body.city,
state: body.state,
count: body.count,
rescue_attempted: !!body.rescue_attempted,
rescue_succeeded: !!body.rescue_succeeded,
};
recordExternalOp(op);
persistOp(op).catch(() => {});
return new Response(JSON.stringify({ accepted: true, ring_size: recentOps.length }));
}).catch((e: Error) =>
new Response(JSON.stringify({ error: e.message }), { status: 400 }));
}
return new Response("not found", { status: 404 });
},
});
console.error(`[observer] HTTP listener bound to 0.0.0.0:${OBSERVER_PORT}`);
}
// ─── Main loop ───
async function main() {
console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}, port=${OBSERVER_PORT}`);
// Run a health check first
const health = await fetch(`${GATEWAY}/health`).then(r => r.json()).catch(() => null);
if (!health) {
console.error("[observer] gateway unreachable — exiting");
process.exit(1);
}
console.error(`[observer] gateway healthy: ${JSON.stringify(health)}`);
// Phase 24 — bind HTTP listener so scenarios can POST outcomes.
startHttpListener();
// Main loop
let cycle = 0;
while (true) {
await Bun.sleep(CYCLE_SECS * 1000);
cycle++;
// Every cycle: analyze errors if any
await analyzeErrors();
// Every 5 cycles: consolidate playbooks
if (cycle % 5 === 0) {
await consolidatePlaybooks();
}
const scenarioOps = recentOps.filter(o => o.source === "scenario").length;
const stats = {
cycle,
total_ops: recentOps.length,
successes: recentOps.filter(o => o.success).length,
failures: recentOps.filter(o => !o.success).length,
scenario_ops: scenarioOps,
};
console.error(`[observer] cycle ${cycle}: ${JSON.stringify(stats)}`);
}
}
// Export the observed wrapper for other agents to use
export { main as startObserver };
// Run if executed directly
if (import.meta.main) {
main().catch(console.error);
}