Three new systemd services: - lakehouse-agent (:3700) — REST gateway wrapping all lakehouse tools. Clean JSON in/out, no protocol complexity. 9 endpoints: /search, /sql, /match, /worker/:id, /ask, /log, /playbooks, /profile/:id, /vram - lakehouse-observer — watches operations, logs to lakehouse, asks local model to diagnose failure patterns, consolidates successful patterns into playbooks every 5 cycles - Stdio MCP transport preserved for Claude Code integration AGENT_INSTRUCTIONS.md: complete operating manual for sub-agents. Rules: never hallucinate, SQL first for structured questions, hybrid for matching, log every success, check playbooks before complex tasks. Observer loop: observed() wrapper timestamps + persists every gateway call → error analyzer reads failures + asks LLM for diagnosis → playbook consolidator groups successes by endpoint pattern All three designed for zero human intervention — agents operate, observer watches, playbooks accumulate, iteration happens internally. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
236 lines
7.0 KiB
TypeScript
236 lines
7.0 KiB
TypeScript
/**
 * Lakehouse Observer — autonomous iteration loop.
 *
 * Runs continuously alongside the agent gateway. Watches every operation,
 * logs outcomes, detects failures, and feeds learnings back so agents
 * improve over time without retraining.
 *
 * Three loops:
 *   1. OPERATION OBSERVER — wraps gateway calls, timestamps them, and logs
 *      every success/failure to the lakehouse
 *   2. ERROR ANALYZER — each cycle, reviews recently recorded failures, asks a
 *      local model to diagnose patterns, and logs its recommendations
 *   3. PLAYBOOK BUILDER — periodically groups successful ops by endpoint and,
 *      once enough of the same kind accumulate, consolidates them into a
 *      reusable playbook entry
 *
 * This is the "third-party witness" J asked for — it watches what
 * agents do and helps them not repeat mistakes.
 */
|
|
|
|
const GATEWAY = process.env.GATEWAY_URL || "http://localhost:3700";
const LAKEHOUSE = process.env.LAKEHOUSE_URL || "http://localhost:3100";

/**
 * Parse a strictly positive base-10 integer from an environment value.
 *
 * Falls back to `fallback` when the value is missing, non-numeric, zero, or
 * negative. A NaN or 0 cycle length would make the main loop's sleep
 * effectively zero and spin the observer in a tight loop.
 */
function parsePositiveInt(raw: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(raw ?? "", 10);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

/** Seconds to sleep between observer cycles (env `OBSERVER_CYCLE`, default 30). */
const CYCLE_SECS = parsePositiveInt(process.env.OBSERVER_CYCLE, 30);
|
|
|
|
// ─── Observed operation log ───
|
|
|
|
/** A single recorded gateway call and its outcome. */
interface ObservedOp {
  /** ISO-8601 time, taken when the call finished and the record was built. */
  timestamp: string;
  /** Gateway path that was called, e.g. "/search". */
  endpoint: string;
  /** Caller-supplied description of the request. */
  input_summary: string;
  /** False when the call threw or the response reported an error. */
  success: boolean;
  /** Wall-clock time spent on the call, in milliseconds. */
  duration_ms: number;
  /** Short summary of the response, or "ERROR: …" on failure. */
  output_summary: string;
  /** Error message, present only for failed calls. */
  error?: string;
}

/** In-memory buffer of the most recently observed operations, oldest first. */
const recentOps: Array<ObservedOp> = [];
|
|
|
|
// ─── Wrapped gateway caller — every call gets observed ───
|
|
|
|
/**
 * Call a gateway endpoint and record the outcome.
 *
 * POSTs `body` as JSON to `${GATEWAY}${endpoint}`. The outcome is appended
 * to the in-memory `recentOps` buffer, which is capped at 1000 entries by
 * dropping the oldest, and is then persisted to the lakehouse.
 *
 * A call counts as failed when:
 *   - the request or JSON parsing throws,
 *   - the parsed body has a truthy `error` field, or
 *   - the HTTP status is not 2xx.
 *
 * @param endpoint    Gateway path to POST to, e.g. "/search".
 * @param body        JSON-serializable request payload.
 * @param description Human-readable summary of the request, stored as `input_summary`.
 * @returns The parsed response body (or `{ error }` when the request itself
 *          failed) together with the recorded op.
 */
export async function observed(
  endpoint: string,
  body: any,
  description: string,
): Promise<{ data: any; op: ObservedOp }> {
  const t0 = Date.now();
  let data: any;
  let error: string | undefined;

  try {
    const resp = await fetch(`${GATEWAY}${endpoint}`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(body),
    });
    data = await resp.json();
    if (data.error) {
      // Always store a string: persistOp's CSV escaping calls string methods,
      // and an object error would otherwise make this whole call reject.
      error = typeof data.error === "string" ? data.error : JSON.stringify(data.error);
    } else if (!resp.ok) {
      // fetch() does not reject on HTTP errors; without this, a 4xx/5xx reply
      // lacking an `error` field would be recorded as a success.
      error = `HTTP ${resp.status} ${resp.statusText}`.trim();
    }
  } catch (e: unknown) {
    // Non-Error throws have no .message; keep something meaningful.
    error = e instanceof Error ? e.message : String(e);
    data = { error };
  }

  const success = error === undefined;

  const op: ObservedOp = {
    timestamp: new Date().toISOString(),
    endpoint,
    input_summary: description,
    success,
    duration_ms: Date.now() - t0,
    output_summary: success ? summarize(data) : `ERROR: ${error}`,
    error,
  };

  recentOps.push(op);
  if (recentOps.length > 1000) recentOps.shift();

  // Persist to lakehouse
  await persistOp(op);

  return { data, op };
}
|
|
|
|
/**
 * Produce a short, human-readable summary of a gateway response body.
 *
 * Response shapes are recognized by their fields, checked in this order:
 *   - hybrid match results (`sql_matches` / `vector_reranked`)
 *   - row sets (`rows`, preferring `row_count` when truthy)
 *   - generated answers (`answer`, cut to 80 chars)
 *   - source lists (`sources`)
 *
 * Anything else falls back to its JSON text cut to 100 chars. That includes
 * `null` and non-object JSON values.
 */
function summarize(data: any): string {
  // null and primitives have none of the fields below; `null.x` would throw.
  if (data === null || typeof data !== "object") {
    return (JSON.stringify(data) ?? String(data)).slice(0, 100);
  }
  if (data.sql_matches !== undefined) return `hybrid: ${data.sql_matches} sql → ${data.vector_reranked} results`;
  if (data.rows) return `${data.row_count || data.rows.length} rows`;
  if (data.answer) {
    const answer = typeof data.answer === "string" ? data.answer : JSON.stringify(data.answer);
    // Only signal truncation when text was actually cut off.
    return `answer: ${answer.slice(0, 80)}${answer.length > 80 ? "..." : ""}`;
  }
  if (data.sources) return `${data.sources.length} sources`;
  return JSON.stringify(data).slice(0, 100);
}
|
|
|
|
/**
 * Best-effort persistence of one observed op to the lakehouse.
 *
 * Builds a two-line CSV (header plus one row) and uploads it as a multipart
 * file to `${LAKEHOUSE}/ingest/file?name=observed_operations`. Upload failures
 * are reported to stderr but never thrown, so observation cannot break the
 * call being observed.
 */
async function persistOp(op: ObservedOp) {
  // Every text column is quoted and escaped, including endpoint/timestamp,
  // which the caller controls and could contain quotes. String(...) guards
  // against non-string values reaching esc().
  const quoted = (value: unknown) => `"${esc(String(value ?? ""))}"`;

  const header = "timestamp,endpoint,input_summary,success,duration_ms,output_summary,error";
  const row = [
    quoted(op.timestamp),
    quoted(op.endpoint),
    quoted(op.input_summary),
    String(op.success),
    String(op.duration_ms),
    quoted(op.output_summary),
    quoted(op.error || ""),
  ].join(",");
  const csv = `${header}\n${row}`;

  const form = new FormData();
  form.append("file", new Blob([csv], { type: "text/csv" }), "op.csv");

  try {
    const resp = await fetch(`${LAKEHOUSE}/ingest/file?name=observed_operations`, { method: "POST", body: form });
    if (!resp.ok) console.error(`[observer] persist failed: HTTP ${resp.status}`);
  } catch (e: unknown) {
    console.error(`[observer] persist failed: ${e instanceof Error ? e.message : String(e)}`);
  }
}
|
|
|
|
/**
 * Escape text for use inside a double-quoted CSV field.
 *
 * Doubles embedded quotes and flattens every line break (LF, CR or CRLF) to a
 * single space, so each record stays on one physical line.
 */
function esc(s: string) { return s.replace(/"/g, '""').replace(/\r\n|[\r\n]/g, ' '); }
|
|
|
|
// ─── Error analyzer loop ───
|
|
|
|
/**
 * Failed ops that have already been diagnosed and logged.
 * A WeakSet, so it does not keep ops alive after `observed()` trims them
 * out of `recentOps`.
 */
const analyzedFailures = new WeakSet<ObservedOp>();

/**
 * Diagnose newly recorded failures with the local model and log the result.
 *
 * Steps:
 *   1. Collect failed ops in `recentOps` that have not been analyzed yet.
 *   2. Send the 10 most recent of them to `${LAKEHOUSE}/ai/generate`.
 *   3. Print the model's analysis and record it through the gateway's
 *      `/log` endpoint.
 *
 * All pending failures are marked as analyzed only after that log call
 * succeeds. A model or gateway outage is therefore retried next cycle, while
 * a successful analysis is never repeated. Errors are logged, never thrown.
 */
async function analyzeErrors() {
  // Skip failures already handled; otherwise the same ones would be re-sent
  // to the model and re-logged as a new entry on every cycle.
  const pending = recentOps.filter(op => !op.success && !analyzedFailures.has(op));
  if (pending.length === 0) return;

  // Keep the prompt small: only the 10 most recent new failures are shown.
  const errorSummary = pending.slice(-10).map(f =>
    `[${f.endpoint}] ${f.input_summary}: ${f.error}`
  ).join("\n");

  // Ask local model to diagnose
  try {
    const resp = await fetch(`${LAKEHOUSE}/ai/generate`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        prompt: `You are a system reliability observer. Analyze these recent failures and suggest fixes:

${errorSummary}

For each error:
1. What likely caused it?
2. How should the agent adjust its approach?
3. Should this be added to the playbook as a "don't do this"?

Be specific and actionable. Under 200 words.`,
        model: "qwen2.5",
        max_tokens: 400,
        temperature: 0.2,
      }),
    });
    if (!resp.ok) {
      console.error(`[observer] Analysis failed: HTTP ${resp.status}`);
      return;
    }

    const analysis = await resp.json();
    const text = analysis?.text;
    // No usable text: leave the failures pending so the next cycle retries.
    if (typeof text !== "string" || text.length === 0) return;

    console.error(`[observer] Error analysis:\n${text}`);

    // Log the analysis as a playbook entry
    const logResp = await fetch(`${GATEWAY}/log`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        operation: `error_analysis: ${pending.length} failures`,
        approach: "LLM-analyzed error patterns",
        result: text.slice(0, 500),
        context: errorSummary.slice(0, 500),
      }),
    });
    if (!logResp.ok) {
      console.error(`[observer] Analysis log failed: HTTP ${logResp.status}`);
      return;
    }

    for (const failure of pending) analyzedFailures.add(failure);
  } catch (e) {
    console.error(`[observer] Analysis failed: ${e}`);
  }
}
|
|
|
|
// ─── Playbook consolidation ───
|
|
|
|
/**
 * Successful ops already folded into a logged playbook entry.
 * A WeakSet, so it does not keep ops alive after `observed()` trims them
 * out of `recentOps`.
 */
const consolidatedOps = new WeakSet<ObservedOp>();

/**
 * Fold recurring successful calls into playbook entries.
 *
 * Groups successful ops in `recentOps` that are not yet consolidated by
 * endpoint. For every endpoint with at least 3 such ops, posts a summary to
 * the gateway's `/log` endpoint: call count, average duration, and the last 3
 * input summaries. Does nothing unless at least 5 such ops are pending overall.
 *
 * Ops are marked consolidated only after their entry is logged, so the same
 * calls are not re-logged on later runs. Log failures go to stderr and are
 * never thrown.
 */
async function consolidatePlaybooks() {
  const pending = recentOps.filter(op => op.success && !consolidatedOps.has(op));
  if (pending.length < 5) return;

  // Group by endpoint
  const byEndpoint = new Map<string, ObservedOp[]>();
  for (const op of pending) {
    const bucket = byEndpoint.get(op.endpoint);
    if (bucket) bucket.push(op);
    else byEndpoint.set(op.endpoint, [op]);
  }

  for (const [endpoint, ops] of byEndpoint) {
    if (ops.length < 3) continue;

    const avgDuration = ops.reduce((sum, o) => sum + o.duration_ms, 0) / ops.length;
    const pattern = ops.slice(-3).map(o => o.input_summary).join("; ");

    try {
      const resp = await fetch(`${GATEWAY}/log`, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          operation: `consolidated: ${ops.length} successful ${endpoint} calls`,
          approach: `common pattern: ${pattern.slice(0, 200)}`,
          result: `avg_duration=${avgDuration.toFixed(0)}ms, ${ops.length} successes`,
          context: `endpoint=${endpoint}`,
        }),
      });
      if (!resp.ok) {
        console.error(`[observer] Playbook log failed for ${endpoint}: HTTP ${resp.status}`);
        continue;
      }
      for (const o of ops) consolidatedOps.add(o);
    } catch (e: unknown) {
      console.error(`[observer] Playbook log failed for ${endpoint}: ${e instanceof Error ? e.message : String(e)}`);
    }
  }
}
|
|
|
|
// ─── Main loop ───
|
|
|
|
/**
 * Run the observer: verify the gateway is healthy, then loop forever.
 *
 * Exits the process with status 1 if `${GATEWAY}/health` is unreachable,
 * returns a non-2xx status, or returns a falsy JSON body.
 *
 * Each cycle it:
 *   1. sleeps `CYCLE_SECS` seconds,
 *   2. runs the error analyzer,
 *   3. consolidates playbooks (every 5th cycle only),
 *   4. prints op counts from `recentOps`.
 *
 * An exception inside one cycle is logged and the loop carries on.
 *
 * NOTE(review): the analyzer and consolidator read the in-memory `recentOps`
 * buffer, which is filled only by `observed()` calls made in this same
 * process. This loop never calls `observed()` itself. Run standalone, the
 * counts stay at zero unless something in-process uses `observed()`.
 * Confirm the intended wiring.
 */
async function main() {
  console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}`);

  // Run a health check first; a non-2xx reply counts as unhealthy.
  let health: unknown = null;
  try {
    const resp = await fetch(`${GATEWAY}/health`);
    if (resp.ok) health = await resp.json();
  } catch {
    health = null;
  }
  if (!health) {
    console.error("[observer] gateway unreachable — exiting");
    process.exit(1);
  }
  console.error(`[observer] gateway healthy: ${JSON.stringify(health)}`);

  // Main loop
  for (let cycle = 1; ; cycle++) {
    await Bun.sleep(CYCLE_SECS * 1000);

    // One bad cycle must not take the whole daemon down.
    try {
      // Every cycle: analyze errors if any
      await analyzeErrors();
      // Every 5 cycles: consolidate playbooks
      if (cycle % 5 === 0) await consolidatePlaybooks();
    } catch (e: unknown) {
      console.error(`[observer] cycle ${cycle} failed: ${e instanceof Error ? e.message : String(e)}`);
    }

    let successes = 0;
    for (const op of recentOps) if (op.success) successes++;
    const stats = {
      cycle,
      total_ops: recentOps.length,
      successes,
      failures: recentOps.length - successes,
    };
    console.error(`[observer] cycle ${cycle}: ${JSON.stringify(stats)}`);
  }
}
|
|
|
|
// Export the observer loop entry point (`main`) as `startObserver` so other modules can run it
|
|
export { main as startObserver };

// Run if executed directly (Bun sets `import.meta.main` for the entry file).
// A plain `.catch(console.error)` would let the process exit with status 0
// after a fatal error. Setting a non-zero exit code lets a supervisor
// (e.g. systemd) treat the crash as a failure.
if (import.meta.main) {
  main().catch((e: unknown) => {
    console.error("[observer] fatal:", e);
    process.exitCode = 1;
  });
}
|