/**
 * Lakehouse Observer — autonomous iteration loop.
 *
 * Runs continuously alongside the agent gateway. Watches every operation,
 * logs outcomes, detects failures, and feeds learnings back so agents
 * improve over time without retraining.
 *
 * Three loops:
 *   1. OPERATION OBSERVER — wraps gateway calls, timestamps and logs every
 *      success/failure to the lakehouse.
 *   2. ERROR ANALYZER — periodically takes failures that have not been
 *      analyzed yet, asks a local model to diagnose patterns, and writes the
 *      recommendations to the gateway log.
 *   3. PLAYBOOK BUILDER — once enough new successful ops of the same endpoint
 *      accumulate, consolidates them into a reusable playbook entry.
 *
 * This is the "third-party witness" J asked for — it watches what
 * agents do and helps them not repeat mistakes.
 */

const GATEWAY = process.env.GATEWAY_URL || "http://localhost:3700";
const LAKEHOUSE = process.env.LAKEHOUSE_URL || "http://localhost:3100";

/** Fallback observer cycle length, in seconds. */
const DEFAULT_CYCLE_SECS = 30;

/**
 * Parses the OBSERVER_CYCLE setting (seconds between observer cycles).
 *
 * Falls back to DEFAULT_CYCLE_SECS when the value is missing, non-numeric,
 * or not positive. A NaN or 0 delay would otherwise give the main loop no
 * meaningful pause between cycles.
 */
function parseCycleSecs(raw: string | undefined): number {
  const secs = Number.parseInt(raw ?? "", 10);
  return Number.isFinite(secs) && secs > 0 ? secs : DEFAULT_CYCLE_SECS;
}

const CYCLE_SECS = parseCycleSecs(process.env.OBSERVER_CYCLE);

/** Maximum number of operations kept in the in-memory buffer. */
const MAX_RECENT_OPS = 1000;
/** Playbook consolidation runs once every this many cycles. */
const CONSOLIDATE_EVERY_CYCLES = 5;
/** Minimum number of not-yet-consolidated successes before consolidating at all. */
const MIN_SUCCESSES_TO_CONSOLIDATE = 5;
/** Minimum number of new successes on one endpoint to emit a playbook entry. */
const MIN_OPS_PER_PLAYBOOK = 3;
/** How many of the newest unanalyzed failures are sent to the model. */
const MAX_FAILURES_PER_ANALYSIS = 10;

const CSV_HEADER =
  "timestamp,endpoint,input_summary,success,duration_ms,output_summary,error";

// ─── Observed operation log ───

/** One recorded gateway call. */
interface ObservedOp {
  /** ISO-8601 time at which the call finished. */
  timestamp: string;
  endpoint: string;
  /** Caller-supplied description of the request. */
  input_summary: string;
  success: boolean;
  duration_ms: number;
  /** Short summary of the response, or `ERROR: <message>` on failure. */
  output_summary: string;
  error?: string;
}

/** Bounded log of recent operations, oldest first. */
const recentOps: ObservedOp[] = [];

/** Failures already fed to the error analyzer, so each one is analyzed once. */
const analyzedFailures = new WeakSet<ObservedOp>();

/** Successes already folded into a playbook entry, so each one is counted once. */
const consolidatedOps = new WeakSet<ObservedOp>();

/** POSTs `payload` as a JSON body to `url`. */
async function postJson(url: string, payload: unknown): Promise<Response> {
  return fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(payload),
  });
}

/**
 * Converts any thrown value or gateway `error` payload into a string, so
 * downstream code (CSV escaping, log messages) can rely on string input.
 */
function errorText(e: unknown): string {
  if (typeof e === "string") return e;
  if (e instanceof Error) return e.message;
  try {
    const json = JSON.stringify(e);
    if (json !== undefined) return json;
  } catch {
    // Unserializable (e.g. circular or BigInt); fall through to String().
  }
  return String(e);
}

// ─── Wrapped gateway caller — every call gets observed ───

/**
 * Calls a gateway endpoint and records the outcome.
 *
 * The request is POSTed as JSON to `${GATEWAY}${endpoint}`. The call counts
 * as a failure when the request throws, the response body is not JSON, the
 * body has a truthy `error` field, or the HTTP status is not 2xx. Failures
 * are captured in the returned `op` rather than thrown. Every call is pushed
 * onto the in-memory buffer and persisted (best-effort) to the lakehouse
 * before this function resolves.
 *
 * @param endpoint gateway path, appended verbatim to GATEWAY
 * @param body JSON-serializable request payload
 * @param description short human-readable summary stored as `input_summary`
 * @returns the parsed response body (or `{ error }` when the call threw)
 *   together with the recorded operation
 */
export async function observed(
  endpoint: string,
  body: any,
  description: string,
): Promise<{ data: any; op: ObservedOp }> {
  const t0 = Date.now();
  let data: any;
  let error: string | undefined;

  try {
    const resp = await postJson(`${GATEWAY}${endpoint}`, body);
    data = await resp.json();
    if (data?.error) {
      // The gateway may report structured errors; normalize them to text.
      error = errorText(data.error);
    } else if (!resp.ok) {
      // A non-2xx status without an `error` field is still a failure.
      error = `HTTP ${resp.status}`;
    }
  } catch (e: unknown) {
    // Network failure, non-JSON body, or an unserializable request body.
    error = errorText(e);
    data = { error };
  }

  const success = error === undefined;
  const op: ObservedOp = {
    timestamp: new Date().toISOString(),
    endpoint,
    input_summary: description,
    success,
    duration_ms: Date.now() - t0,
    output_summary: success ? summarize(data) : `ERROR: ${error}`,
    error,
  };

  recentOps.push(op);
  if (recentOps.length > MAX_RECENT_OPS) recentOps.shift();

  // Persist to lakehouse (best-effort; persistOp logs instead of throwing).
  await persistOp(op);
  return { data, op };
}

/**
 * Produces a one-line description of a successful gateway response.
 * Recognizes hybrid-search, row, answer, and source payloads. Anything else
 * (including non-objects) falls back to the first 100 characters of its JSON
 * form.
 */
function summarize(data: any): string {
  if (data !== null && typeof data === "object") {
    if (data.sql_matches !== undefined) {
      return `hybrid: ${data.sql_matches} sql → ${data.vector_reranked} results`;
    }
    if (data.rows) return `${data.row_count || data.rows.length} rows`;
    // String() guards against non-string answers, which have no .slice().
    if (data.answer) return `answer: ${String(data.answer).slice(0, 80)}...`;
    if (data.sources) return `${data.sources.length} sources`;
  }
  // JSON.stringify(undefined) returns undefined; String() keeps this total.
  return String(JSON.stringify(data)).slice(0, 100);
}

/**
 * Uploads one operation as a single-row CSV file to the lakehouse ingest
 * endpoint under the name `observed_operations`.
 *
 * Best-effort: transport errors and non-2xx responses are logged to stderr,
 * never thrown, so an unavailable lakehouse cannot break the observed call.
 */
async function persistOp(op: ObservedOp): Promise<void> {
  const quote = (s: string) => `"${esc(s)}"`;
  const row = [
    quote(op.timestamp),
    quote(op.endpoint),
    quote(op.input_summary),
    String(op.success),
    String(op.duration_ms),
    quote(op.output_summary),
    quote(op.error ?? ""),
  ].join(",");
  const csv = `${CSV_HEADER}\n${row}`;

  const form = new FormData();
  form.append("file", new Blob([csv], { type: "text/csv" }), "op.csv");
  try {
    const resp = await fetch(`${LAKEHOUSE}/ingest/file?name=observed_operations`, {
      method: "POST",
      body: form,
    });
    if (!resp.ok) console.error(`[observer] persist failed: HTTP ${resp.status}`);
  } catch (e: unknown) {
    console.error(`[observer] persist failed: ${errorText(e)}`);
  }
}

/**
 * Prepares text for a double-quoted CSV field. Embedded quotes are doubled,
 * and every line-break style (CRLF, CR, LF) is flattened to a space so the
 * record stays on one line.
 */
function esc(s: string): string {
  return s.replace(/"/g, '""').replace(/\r\n|\r|\n/g, " ");
}

// ─── Error analyzer loop ───

/**
 * Sends the newest not-yet-analyzed failures to the local model and logs its
 * diagnosis to the gateway as a playbook entry.
 *
 * Once the model returns text, every pending failure is marked as analyzed,
 * so the same failures are not re-sent (and re-logged) on every cycle. If
 * the model call fails or returns no text, the failures stay pending and are
 * retried next cycle.
 */
async function analyzeErrors(): Promise<void> {
  const pending = recentOps.filter(op => !op.success && !analyzedFailures.has(op));
  if (pending.length === 0) return;

  const errorSummary = pending
    .slice(-MAX_FAILURES_PER_ANALYSIS)
    .map(f => `[${f.endpoint}] ${f.input_summary}: ${f.error}`)
    .join("\n");

  // Ask local model to diagnose
  try {
    const resp = await postJson(`${LAKEHOUSE}/ai/generate`, {
      prompt: `You are a system reliability observer. Analyze these recent failures and suggest fixes:

${errorSummary}

For each error:
1. What likely caused it?
2. How should the agent adjust its approach?
3. Should this be added to the playbook as a "don't do this"?

Be specific and actionable. Under 200 words.`,
      model: "qwen2.5",
      max_tokens: 400,
      temperature: 0.2,
    });
    const analysis = await resp.json();
    const text = analysis?.text;
    if (typeof text !== "string" || text === "") {
      console.error(`[observer] Analysis returned no text (HTTP ${resp.status})`);
      return;
    }

    for (const op of pending) analyzedFailures.add(op);
    console.error(`[observer] Error analysis:\n${text}`);

    // Log the analysis as a playbook entry
    await postJson(`${GATEWAY}/log`, {
      operation: `error_analysis: ${pending.length} failures`,
      approach: "LLM-analyzed error patterns",
      result: text.slice(0, 500),
      context: errorSummary.slice(0, 500),
    });
  } catch (e: unknown) {
    console.error(`[observer] Analysis failed: ${e}`);
  }
}

// ─── Playbook consolidation ───

/**
 * Groups successes that have not yet been consolidated by endpoint. For each
 * endpoint with at least MIN_OPS_PER_PLAYBOOK of them, logs one playbook
 * entry with the average duration and a sample of recent inputs.
 *
 * Ops are marked consolidated only after the gateway accepts the entry, so
 * each success contributes to at most one logged entry.
 */
async function consolidatePlaybooks(): Promise<void> {
  const fresh = recentOps.filter(op => op.success && !consolidatedOps.has(op));
  if (fresh.length < MIN_SUCCESSES_TO_CONSOLIDATE) return;

  // Group by endpoint
  const byEndpoint = new Map<string, ObservedOp[]>();
  for (const op of fresh) {
    const bucket = byEndpoint.get(op.endpoint);
    if (bucket) bucket.push(op);
    else byEndpoint.set(op.endpoint, [op]);
  }

  for (const [endpoint, ops] of byEndpoint) {
    if (ops.length < MIN_OPS_PER_PLAYBOOK) continue;

    const avgDuration = ops.reduce((sum, o) => sum + o.duration_ms, 0) / ops.length;
    // The last three inputs serve as a sample of the common pattern.
    const pattern = ops.slice(-3).map(o => o.input_summary).join("; ");

    try {
      const resp = await postJson(`${GATEWAY}/log`, {
        operation: `consolidated: ${ops.length} successful ${endpoint} calls`,
        approach: `common pattern: ${pattern.slice(0, 200)}`,
        result: `avg_duration=${avgDuration.toFixed(0)}ms, ${ops.length} successes`,
        context: `endpoint=${endpoint}`,
      });
      if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
      for (const op of ops) consolidatedOps.add(op);
    } catch (e: unknown) {
      console.error(`[observer] playbook log failed for ${endpoint}: ${errorText(e)}`);
    }
  }
}

// ─── Main loop ───

/**
 * Runs the observer forever.
 *
 * First checks `${GATEWAY}/health` and exits the process with code 1 if it is
 * unreachable or returns a falsy/non-JSON body. Then, every CYCLE_SECS
 * seconds, it analyzes new failures, consolidates playbooks every
 * CONSOLIDATE_EVERY_CYCLES cycles, and logs buffer statistics to stderr.
 * An error inside one cycle is logged and does not stop the loop.
 */
async function main(): Promise<void> {
  console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}`);

  // Run a health check first
  let health: unknown = null;
  try {
    const resp = await fetch(`${GATEWAY}/health`);
    health = await resp.json();
  } catch {
    health = null; // unreachable or non-JSON: treated as unhealthy below
  }
  if (!health) {
    console.error("[observer] gateway unreachable — exiting");
    process.exit(1);
  }
  console.error(`[observer] gateway healthy: ${JSON.stringify(health)}`);

  let cycle = 0;
  while (true) {
    await Bun.sleep(CYCLE_SECS * 1000);
    cycle++;

    try {
      // Every cycle: analyze errors if any
      await analyzeErrors();
      // Every CONSOLIDATE_EVERY_CYCLES cycles: consolidate playbooks
      if (cycle % CONSOLIDATE_EVERY_CYCLES === 0) await consolidatePlaybooks();
    } catch (e: unknown) {
      console.error(`[observer] cycle ${cycle} failed: ${errorText(e)}`);
    }

    let successes = 0;
    for (const op of recentOps) if (op.success) successes++;
    const stats = {
      cycle,
      total_ops: recentOps.length,
      successes,
      failures: recentOps.length - successes,
    };
    console.error(`[observer] cycle ${cycle}: ${JSON.stringify(stats)}`);
  }
}

// Expose the main loop to embedders; `observed` is exported at its definition.
export { main as startObserver };

// Run if executed directly
if (import.meta.main) {
  main().catch(console.error);
}