lakehouse/mcp-server/observer.ts
root b532ae61f1 Agent gateway + observer — autonomous internal operation
Three new systemd services:
- lakehouse-agent (:3700) — REST gateway wrapping all lakehouse tools.
  Clean JSON in/out, no protocol complexity. 9 endpoints: /search,
  /sql, /match, /worker/:id, /ask, /log, /playbooks, /profile/:id, /vram
- lakehouse-observer — watches operations, logs to lakehouse, asks
  local model to diagnose failure patterns, consolidates successful
  patterns into playbooks every 5 cycles
- Stdio MCP transport preserved for Claude Code integration

AGENT_INSTRUCTIONS.md: complete operating manual for sub-agents.
Rules: never hallucinate, SQL first for structured questions, hybrid
for matching, log every success, check playbooks before complex tasks.

Observer loop:
  observed() wrapper timestamps + persists every gateway call →
  error analyzer reads failures + asks LLM for diagnosis →
  playbook consolidator groups successes by endpoint pattern

All three designed for zero human intervention — agents operate,
observer watches, playbooks accumulate, iteration happens internally.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-17 00:00:08 -05:00

236 lines
7.0 KiB
TypeScript

/**
* Lakehouse Observer — autonomous iteration loop.
*
* Runs continuously alongside the agent gateway. Watches every operation,
* logs outcomes, detects failures, and feeds learnings back so agents
* improve over time without retraining.
*
* Three loops:
* 1. OPERATION OBSERVER — wraps gateway calls, timestamps + logs every
* success/failure to the lakehouse
* 2. ERROR ANALYZER — periodically reads the error log, asks a local
* model to diagnose patterns, writes recommendations
* 3. PLAYBOOK BUILDER — after N successful ops of the same type,
* consolidates them into a reusable playbook entry
*
* This is the "third-party witness" J asked for — it watches what
* agents do and helps them not repeat mistakes.
*/
const GATEWAY = process.env.GATEWAY_URL || "http://localhost:3700";
const LAKEHOUSE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
const CYCLE_SECS = parseInt(process.env.OBSERVER_CYCLE || "30");
// ─── Observed operation log ───
interface ObservedOp {
timestamp: string;
endpoint: string;
input_summary: string;
success: boolean;
duration_ms: number;
output_summary: string;
error?: string;
}
const recentOps: ObservedOp[] = [];
// ─── Wrapped gateway caller — every call gets observed ───
export async function observed(
endpoint: string,
body: any,
description: string,
): Promise<{ data: any; op: ObservedOp }> {
const t0 = Date.now();
let data: any;
let error: string | undefined;
let success = true;
try {
const resp = await fetch(`${GATEWAY}${endpoint}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
});
data = await resp.json();
if (data.error) {
success = false;
error = data.error;
}
} catch (e: any) {
success = false;
error = e.message;
data = { error: e.message };
}
const op: ObservedOp = {
timestamp: new Date().toISOString(),
endpoint,
input_summary: description,
success,
duration_ms: Date.now() - t0,
output_summary: success
? summarize(data)
: `ERROR: ${error}`,
error,
};
recentOps.push(op);
if (recentOps.length > 1000) recentOps.shift();
// Persist to lakehouse
await persistOp(op);
return { data, op };
}
function summarize(data: any): string {
if (data.sql_matches !== undefined) return `hybrid: ${data.sql_matches} sql → ${data.vector_reranked} results`;
if (data.rows) return `${data.row_count || data.rows.length} rows`;
if (data.answer) return `answer: ${data.answer.slice(0, 80)}...`;
if (data.sources) return `${data.sources.length} sources`;
return JSON.stringify(data).slice(0, 100);
}
async function persistOp(op: ObservedOp) {
const csv = `timestamp,endpoint,input_summary,success,duration_ms,output_summary,error
"${op.timestamp}","${op.endpoint}","${esc(op.input_summary)}",${op.success},${op.duration_ms},"${esc(op.output_summary)}","${esc(op.error || "")}"`;
const form = new FormData();
form.append("file", new Blob([csv], { type: "text/csv" }), "op.csv");
await fetch(`${LAKEHOUSE}/ingest/file?name=observed_operations`, { method: "POST", body: form }).catch(() => {});
}
function esc(s: string) { return s.replace(/"/g, '""').replace(/\n/g, ' '); }
// ─── Error analyzer loop ───
async function analyzeErrors() {
// Read recent failures
const failures = recentOps.filter(op => !op.success);
if (failures.length === 0) return;
const errorSummary = failures.slice(-10).map(f =>
`[${f.endpoint}] ${f.input_summary}: ${f.error}`
).join("\n");
// Ask local model to diagnose
try {
const resp = await fetch(`${LAKEHOUSE}/ai/generate`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
prompt: `You are a system reliability observer. Analyze these recent failures and suggest fixes:
${errorSummary}
For each error:
1. What likely caused it?
2. How should the agent adjust its approach?
3. Should this be added to the playbook as a "don't do this"?
Be specific and actionable. Under 200 words.`,
model: "qwen2.5",
max_tokens: 400,
temperature: 0.2,
}),
});
const analysis = await resp.json();
if (analysis.text) {
console.error(`[observer] Error analysis:\n${analysis.text}`);
// Log the analysis as a playbook entry
await fetch(`${GATEWAY}/log`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
operation: `error_analysis: ${failures.length} failures`,
approach: "LLM-analyzed error patterns",
result: analysis.text.slice(0, 500),
context: errorSummary.slice(0, 500),
}),
});
}
} catch (e) {
console.error(`[observer] Analysis failed: ${e}`);
}
}
// ─── Playbook consolidation ───
async function consolidatePlaybooks() {
const successes = recentOps.filter(op => op.success);
if (successes.length < 5) return;
// Group by endpoint
const groups: Record<string, ObservedOp[]> = {};
for (const op of successes) {
const key = op.endpoint;
if (!groups[key]) groups[key] = [];
groups[key].push(op);
}
for (const [endpoint, ops] of Object.entries(groups)) {
if (ops.length < 3) continue;
const avgDuration = ops.reduce((s, o) => s + o.duration_ms, 0) / ops.length;
const pattern = ops.slice(-3).map(o => o.input_summary).join("; ");
await fetch(`${GATEWAY}/log`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
operation: `consolidated: ${ops.length} successful ${endpoint} calls`,
approach: `common pattern: ${pattern.slice(0, 200)}`,
result: `avg_duration=${avgDuration.toFixed(0)}ms, ${ops.length} successes`,
context: `endpoint=${endpoint}`,
}),
}).catch(() => {});
}
}
// ─── Main loop ───
async function main() {
console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}`);
// Run a health check first
const health = await fetch(`${GATEWAY}/health`).then(r => r.json()).catch(() => null);
if (!health) {
console.error("[observer] gateway unreachable — exiting");
process.exit(1);
}
console.error(`[observer] gateway healthy: ${JSON.stringify(health)}`);
// Main loop
let cycle = 0;
while (true) {
await Bun.sleep(CYCLE_SECS * 1000);
cycle++;
// Every cycle: analyze errors if any
await analyzeErrors();
// Every 5 cycles: consolidate playbooks
if (cycle % 5 === 0) {
await consolidatePlaybooks();
}
const stats = {
cycle,
total_ops: recentOps.length,
successes: recentOps.filter(o => o.success).length,
failures: recentOps.filter(o => !o.success).length,
};
console.error(`[observer] cycle ${cycle}: ${JSON.stringify(stats)}`);
}
}
// Export the observed wrapper for other agents to use
export { main as startObserver };
// Run if executed directly
if (import.meta.main) {
main().catch(console.error);
}