/**
 * Lakehouse Observer — autonomous iteration loop.
 *
 * Runs continuously alongside the agent gateway. Watches every operation,
 * logs outcomes, detects failures, and feeds learnings back so agents
 * improve over time without retraining.
 *
 * Three loops:
 *   1. OPERATION OBSERVER — wraps gateway calls, timestamps + logs every
 *      success/failure to the lakehouse
 *   2. ERROR ANALYZER — periodically reads the error log, asks a local
 *      model to diagnose patterns, writes recommendations
 *   3. PLAYBOOK BUILDER — after N successful ops of the same type,
 *      consolidates them into a reusable playbook entry
 *
 * This is the "third-party witness" J asked for — it watches what
 * agents do and helps them not repeat mistakes.
 */
import { filterChunks } from "./relevance";

const GATEWAY = process.env.GATEWAY_URL || "http://localhost:3700";
const LAKEHOUSE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
const CYCLE_SECS = parseInt(process.env.OBSERVER_CYCLE || "30");

// Phase 24 — observer now listens on an HTTP port for external ops
// (scenarios bypass the MCP:3700 layer by design). Default 3800.
const OBSERVER_PORT = parseInt(process.env.OBSERVER_PORT || "3800");

// ─── Observed operation log ───

interface ObservedOp {
  timestamp: string;
  endpoint: string;
  input_summary: string;
  success: boolean;
  duration_ms: number;
  output_summary: string;
  error?: string;
  // Phase 24 — optional provenance so error analyzer and playbook
  // builder can differentiate MCP-layer ops from scenario-sourced
  // events. Scenarios set source="scenario" + staffer_id + sig_hash.
  source?: "mcp" | "scenario" | "langfuse" | "overseer_correction";
  staffer_id?: string;
  sig_hash?: string;
  event_kind?: string;
  role?: string;
  city?: string;
  state?: string;
  count?: number;
  rescue_attempted?: boolean;
  rescue_succeeded?: boolean;
  // Overseer-correction-specific (2026-04-23): lets the analyzer
  // correlate corrections with the drift that prompted them and with
  // subsequent outcomes that either validated or invalidated the advice.
  task_class?: string;
  correction?: string;
  applied_at_turn?: number;
}

const recentOps: ObservedOp[] = [];

// Phase 24 — external ingest path. Scenarios POST outcome summaries
// here so the observer's analyzer + playbook builder see them. Called
// from the Bun.serve() handler below. Same ring buffer as the MCP-
// wrapped path so downstream loops don't need to know the source.
export function recordExternalOp(op: ObservedOp): void {
  recentOps.push({ ...op, source: op.source ?? "scenario" });
  if (recentOps.length > 2000) recentOps.shift();
}
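// Usage sketch (illustrative — the field values below are hypothetical,
// not taken from a real scenario run): an in-process producer can push
// an outcome straight into the ring without going through HTTP.
//
//   recordExternalOp({
//     timestamp: new Date().toISOString(),
//     endpoint: "scenario:fill",
//     input_summary: "fill RN×3 in Austin, TX",
//     success: false,
//     duration_ms: 412,
//     output_summary: "no candidates",
//     error: "no candidates",
//     staffer_id: "s-42",
//     sig_hash: "abc123",
//   });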
// ─── Wrapped gateway caller — every call gets observed ───

export async function observed(
  endpoint: string,
  body: any,
  description: string,
): Promise<{ data: any; op: ObservedOp }> {
  const t0 = Date.now();
  let data: any;
  let error: string | undefined;
  let success = true;
  try {
    const resp = await fetch(`${GATEWAY}${endpoint}`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(body),
    });
    data = await resp.json();
    if (data.error) {
      success = false;
      error = data.error;
    }
  } catch (e: any) {
    success = false;
    error = e.message;
    data = { error: e.message };
  }
  const op: ObservedOp = {
    timestamp: new Date().toISOString(),
    endpoint,
    input_summary: description,
    success,
    duration_ms: Date.now() - t0,
    output_summary: success ? summarize(data) : `ERROR: ${error}`,
    error,
  };
  recentOps.push(op);
  if (recentOps.length > 2000) recentOps.shift();
  // Persist durably (append-only JSONL — see persistOp)
  await persistOp(op);
  return { data, op };
}

function summarize(data: any): string {
  if (data.sql_matches !== undefined)
    return `hybrid: ${data.sql_matches} sql → ${data.vector_reranked} results`;
  if (data.rows) return `${data.row_count || data.rows.length} rows`;
  if (data.answer) return `answer: ${data.answer.slice(0, 80)}...`;
  if (data.sources) return `${data.sources.length} sources`;
  return JSON.stringify(data).slice(0, 100);
}

// Phase 24 honesty fix — the old persistOp used /ingest/file which
// REPLACES the dataset (flagged in feedback_ingest_replace_semantics.md).
// Every op silently wiped all prior ops. Now we append a JSONL line to
// data/_observer/ops.jsonl so the historical trace is durable. The
// observer analyzer + playbook builder read from this file when it
// outgrows the 2000-entry in-memory ring.
async function persistOp(op: ObservedOp) {
  try {
    const { mkdir, appendFile } = await import("node:fs/promises");
    await mkdir("data/_observer", { recursive: true });
    await appendFile("data/_observer/ops.jsonl", JSON.stringify(op) + "\n");
  } catch {
    // Persistence is best-effort; in-memory ring still works.
  }
}

// ─── LLM Team escalation (code_review mode) ───
//
// When recent failures on a single sig_hash cross a threshold the
// local-model analysis is probably insufficient. J's 2026-04-24
// direction: "the observer would trigger to give more context" —
// route failure clusters to LLM Team's specialized code_review mode
// (via /api/run) so richer structured signal lands in the KB for
// scrum + auditor + playbook memory to consume next pass.
//
// Non-destructive: runs in parallel to the existing local diagnose
// call (qwen3.5:latest after the 2026-04-30 bump), never replaces
// it. Writes to data/_kb/observer_escalations.jsonl as a dedicated
// audit surface.
const LLM_TEAM = process.env.LH_LLM_TEAM_URL ?? "http://localhost:5000";
const LLM_TEAM_ESCALATIONS =
  "/home/profit/lakehouse/data/_kb/observer_escalations.jsonl";
const ESCALATION_THRESHOLD = 3; // N+ failures on same sig_hash triggers
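// Usage sketch for the exported wrapper (illustrative — "/hybrid_search"
// and its body are hypothetical; pass whatever gateway endpoint the
// agent actually calls):
//
//   const { data, op } = await observed(
//     "/hybrid_search",
//     { query: "open RN shifts in Austin" },
//     "hybrid search: open RN shifts",
//   );
//   if (!op.success) console.error(`call failed: ${op.error}`);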
// ─── KB enrichment helper (2026-04-26) ────────────────────────────
// Mirrors what scrum_master_pipeline already does on every per-file
// review: queries pathway_memory bug fingerprints + the lakehouse_arch
// matrix corpus, then asks qwen3.5:latest to synthesize a tight
// briefing. We reuse the same primitives so observer escalations carry
// the same compounding context the scrum loop builds — no new index
// surfaces, no new corpora.
//
// `task_class` is derived from the cluster (most ops use the same one);
// pathway/bug_fingerprints is permissive about a null file_path, so
// non-code clusters (scenario fills, v1.chat events) just see broader
// matches via task_class alone.
//
// Returns "" when there's no useful signal — caller treats empty as
// "no preamble" and skips the prepend.
async function buildKbPreamble(sigHash: string, cluster: ObservedOp[]): Promise<string> {
  const sample = cluster[0];
  const taskClass =
    sample?.event_kind ??
    (sample?.source === "scenario" ? "scenario_fill" : "observer_escalation");

  // Step 1: pathway bug fingerprints. Best-effort; null filePath just
  // widens the query at the matrix-index level.
  let fingerprints: {
    flag: { kind: string };
    pattern_key: string;
    example: string;
    occurrences: number;
  }[] = [];
  try {
    const r = await fetch(`${LAKEHOUSE}/vectors/pathway/bug_fingerprints`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({
        task_class: taskClass,
        file_path: null,
        signal_class: null,
        limit: 5,
      }),
      signal: AbortSignal.timeout(5000),
    });
    if (r.ok) fingerprints = ((await r.json()) as any).fingerprints ?? [];
  } catch {}

  // Step 2: architectural matrix (lakehouse_arch_v1) — ADRs/PRD/plan
  // intent. Cluster summary is the search query.
  const clusterSummary = cluster
    .slice(-5)
    .map(o => `${o.endpoint ?? "?"} ${o.input_summary ?? ""} ${o.error ?? ""}`)
    .join(" | ");
  let matrixChunks: { doc_id?: string; chunk_text?: string; score?: number }[] = [];
  try {
    const r = await fetch(`${LAKEHOUSE}/vectors/search`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({
        index_name: "lakehouse_arch_v1",
        query: `${taskClass} ${clusterSummary}`,
        top_k: 4,
      }),
      signal: AbortSignal.timeout(5000),
    });
    if (r.ok) matrixChunks = ((await r.json()) as any).results ?? [];
  } catch {}

  // Step 3: gold-standard prior answers (lakehouse_answers_v1) — past
  // scrum reviews + observer escalations. This is where the BIG-model
  // results we save live; future small-model handlers retrieve them
  // here as scaffolding so the cheap rung gets near-paid quality.
  let answerChunks: { doc_id?: string; chunk_text?: string; score?: number }[] = [];
  try {
    const r = await fetch(`${LAKEHOUSE}/vectors/search`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({
        index_name: "lakehouse_answers_v1",
        query: `${taskClass} ${clusterSummary}`,
        top_k: 3,
      }),
      signal: AbortSignal.timeout(5000),
    });
    if (r.ok) answerChunks = ((await r.json()) as any).results ?? [];
  } catch {}

  if (fingerprints.length === 0 && matrixChunks.length === 0 && answerChunks.length === 0)
    return "";

  // Step 4: synthesis via local model (qwen3.5:latest, provider=ollama).
  // Compresses the raw bundle to a 1-2 sentence briefing the cloud
  // reviewer can actually use. If the local model is down/slow, fall
  // back to the raw dump rather than blocking the escalation path.
  const rawBundle = [
    fingerprints.length > 0
      ? "PRIOR BUG PATTERNS (pathway memory):\n" +
        fingerprints
          .map(
            (fp, i) =>
              `${i + 1}. [${fp.flag.kind}] ${fp.pattern_key} (×${fp.occurrences}) e.g. ${fp.example.slice(0, 120)}`,
          )
          .join("\n")
      : "",
    matrixChunks.length > 0
      ? "RELATED ARCHITECTURE CONTEXT:\n" +
        matrixChunks
          .map((c, i) => `${i + 1}. [${c.doc_id ?? "?"}] ${(c.chunk_text ?? "").slice(0, 200)}`)
          .join("\n")
      : "",
    answerChunks.length > 0
      ? "PRIOR GOLD-STANDARD ANSWERS (similar past reviews + escalations):\n" +
        answerChunks
          .map((c, i) => `${i + 1}. [${c.doc_id ?? "?"}] ${(c.chunk_text ?? "").slice(0, 240)}`)
          .join("\n")
      : "",
  ]
    .filter(Boolean)
    .join("\n\n");

  const synthPrompt = `A failure cluster (sig_hash=${sigHash.slice(0, 8)}, ${cluster.length} occurrences, task_class=${taskClass}) is about to be escalated for diagnosis. Here are prior signals from our knowledge base:

${rawBundle}

Output a single paragraph (≤300 chars) briefing the cloud reviewer on which prior signals are most likely relevant to this cluster. If nothing matches, say so plainly. No preamble, no markdown.`;

  let synthesized = "";
  try {
    const r = await fetch(`${LAKEHOUSE}/v1/chat`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({
        provider: "ollama",
        model: "qwen3.5:latest",
        messages: [{ role: "user", content: synthPrompt }],
        max_tokens: 200,
        temperature: 0.1,
        think: false,
      }),
      signal: AbortSignal.timeout(15000),
    });
    if (r.ok) {
      const j = (await r.json()) as any;
      synthesized = (j?.choices?.[0]?.message?.content ?? "").trim();
    }
  } catch {}

  const body = synthesized.length > 0 ? synthesized : rawBundle;
  return `═══ KB CONTEXT — prior signals on this task class (synthesized by qwen3.5:latest) ═══\n${body}\n═══\n\n`;
}
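// Shape sketch of a returned preamble (content is illustrative — real
// briefings come from qwen3.5:latest, or the raw bundle on fallback):
//
//   ═══ KB CONTEXT — prior signals on this task class (synthesized by qwen3.5:latest) ═══
//   Two prior scenario_fill clusters failed the same way; the gold-standard
//   answer for sig abc123 points at stale rate-card data, not code.
//   ═══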
async function escalateFailureClusterToLLMTeam(sigHash: string, cluster: ObservedOp[]) {
  // Package the failure cluster as a single context blob. Originally
  // I routed this to LLM Team's `code_review` mode at /api/run, but
  // that mode isn't registered in llm_team_ui.py — it returned
  // "Unknown mode" on every call. Revised 2026-04-24: route directly
  // to the gateway's /v1/chat with provider=ollama_cloud + qwen3-coder:480b
  // (the coding specialist that's rung 2 of the scrum ladder, proven
  // to produce substantive structured reviews). Fire-and-forget so
  // downstream failures don't block observer's normal loop.
  const context = cluster
    .slice(-8)
    .map((o, i) => `[${i + 1}] endpoint=${o.endpoint} input=${o.input_summary} error=${o.error ?? "?"}`)
    .join("\n");
  const kbPreamble = await buildKbPreamble(sigHash, cluster);
  const prompt = `${kbPreamble}sig_hash=${sigHash} · ${cluster.length} failures on the same signature:\n\n${context}\n\nReview this failure cluster. Identify:\n1. Likely root cause (single sentence).\n2. Files most likely responsible (path hints).\n3. Concrete fix direction (under 3 sentences).\n4. Confidence: NN%\n\nBe specific, not generic.`;
  try {
    // 2026-04-26: switched from ollama_cloud/qwen3-coder:480b (weekly
    // 429 quota was blocking escalations) to paid OpenRouter
    // deepseek-v3.1-terminus — 671B reasoning specialist, $0.21 in /
    // $0.79 out per M tokens (under the $0.85/M ceiling J set), 164K
    // ctx. Per-escalation cost: ~$0.0006 (typical 500-token prompt +
    // 300-token completion).
    const resp = await fetch(`${LAKEHOUSE}/v1/chat`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        provider: "openrouter",
        model: "deepseek/deepseek-v3.1-terminus",
        messages: [{ role: "user", content: prompt }],
        max_tokens: 800,
        temperature: 0.2,
      }),
      signal: AbortSignal.timeout(60000),
    });
    if (!resp.ok) {
      console.error(
        `[observer] escalation /v1/chat ${resp.status}: ${(await resp.text()).slice(0, 200)}`,
      );
      return;
    }
    const j: any = await resp.json();
    const analysis = j?.choices?.[0]?.message?.content ?? "";
    // Audit row stays schema-compatible with the prior implementation —
    // downstream consumers see structured fields regardless of the
    // review-source change. Facts/entities stay empty (this call is
    // direct-model, not extract-mode); the raw analysis carries the
    // signal.
    const row = {
      ts: new Date().toISOString(),
      source: "observer_escalation",
      mode: "direct_chat_deepseek_v3_1_terminus",
      sig_hash: sigHash,
      cluster_size: cluster.length,
      cluster_staffer: cluster[0]?.staffer_id,
      cluster_endpoint: cluster[0]?.endpoint,
      prompt_tokens: j?.usage?.prompt_tokens ?? 0,
      completion_tokens: j?.usage?.completion_tokens ?? 0,
      kb_preamble_chars: kbPreamble.length,
      analysis: analysis.slice(0, 4000),
    };
    const { appendFile } = await import("node:fs/promises");
    await appendFile(LLM_TEAM_ESCALATIONS, JSON.stringify(row) + "\n");
    console.error(
      `[observer] escalated sig_hash=${sigHash.slice(0, 8)} · cluster=${cluster.length} · ${analysis.length} chars`,
    );
  } catch (e) {
    console.error(`[observer] escalation failed: ${(e as Error).message}`);
  }
}
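// Illustrative audit row (values hypothetical; schema matches `row` above):
//
//   {"ts":"2026-04-26T18:02:11.000Z","source":"observer_escalation",
//    "mode":"direct_chat_deepseek_v3_1_terminus","sig_hash":"abc123",
//    "cluster_size":4,"cluster_endpoint":"scenario:fill",
//    "prompt_tokens":512,"completion_tokens":288,"kb_preamble_chars":310,
//    "analysis":"1. Likely root cause: ..."}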
// Track which sig_hashes we've already escalated this session so we
// don't hammer LLM Team on every analyzeErrors tick when a cluster
// persists across cycles.
const escalatedSigHashes = new Set<string>();

// ─── Hand-review for scrum/agent candidate responses (2026-04-25) ───
//
// Observer is OUTSIDE the scrum loop's epistemic scope, so its verdict
// can be treated as truth about whether a candidate review is grounded.
// Two-tier evaluator:
//   1. Try cloud LLM (Grok 4.1 fast via OpenRouter — see cloudHandReview)
//      — semantic judgment with response + source excerpt + grounding
//      stats as context.
//   2. On cloud failure (throttle/timeout) → deterministic heuristic
//      over grounding_pct + total_quotes. Marked source: "heuristic"
//      so consumers can tell which rung produced the verdict.
// Every verdict is persisted to data/_kb/observer_reviews.jsonl.
const OBSERVER_REVIEWS = "/home/profit/lakehouse/data/_kb/observer_reviews.jsonl";

interface HandReviewInput {
  file_path: string;
  model: string;
  response: string;
  source_content: string;
  grounding_stats: { total: number; grounded: number; groundedPct: number | null };
  attempt: number;
}

interface HandReviewVerdict {
  verdict: "accept" | "reject" | "cycle";
  confidence: number;
  notes: string;
  source: "cloud" | "heuristic";
}

async function handReview(input: HandReviewInput): Promise<HandReviewVerdict> {
  const t0 = Date.now();
  let verdict: HandReviewVerdict;
  try {
    verdict = await cloudHandReview(input);
  } catch (e) {
    console.error(`[observer/review] cloud failed (${(e as Error).message}); using heuristic`);
    verdict = heuristicHandReview(input);
  }
  // Persist regardless of source so we can later compare cloud vs
  // heuristic verdicts on the same input and tune the heuristic.
  const row = {
    ts: new Date().toISOString(),
    file_path: input.file_path,
    model: input.model,
    attempt: input.attempt,
    response_chars: input.response.length,
    grounding_stats: input.grounding_stats,
    verdict: verdict.verdict,
    confidence: verdict.confidence,
    notes: verdict.notes,
    source: verdict.source,
    duration_ms: Date.now() - t0,
  };
  try {
    const { appendFile } = await import("node:fs/promises");
    await appendFile(OBSERVER_REVIEWS, JSON.stringify(row) + "\n");
  } catch {
    /* best-effort persistence */
  }
  return verdict;
}

async function cloudHandReview(input: HandReviewInput): Promise<HandReviewVerdict> {
  const grounded = input.grounding_stats.grounded;
  const total = input.grounding_stats.total;
  const pct = input.grounding_stats.groundedPct;
  // Truncate to keep the prompt under typical context windows.
  // 2000 + 4000 = ~6000 chars ≈ 1500 tokens, plus response context.
  const responseExcerpt = input.response.slice(0, 2000);
  const sourceExcerpt = input.source_content.slice(0, 4000);
  const prompt = `You are a code-review quality observer. Decide whether the following automated review is grounded in the actual source — not invented, not hallucinated.

FILE: ${input.file_path}
MODEL: ${input.model}
ATTEMPT: ${input.attempt}
ANCHOR GROUNDING: ${grounded}/${total} backtick-quoted snippets matched the source verbatim${pct !== null ? ` (${pct}%)` : ""}

REVIEW (first 2000 chars):
\`\`\`
${responseExcerpt}
\`\`\`

SOURCE EXCERPT (first 4000 chars):
\`\`\`
${sourceExcerpt}
\`\`\`

Respond ONLY with a JSON object:
{ "verdict": "accept" | "reject" | "cycle", "confidence": 0-100, "notes": "<1-2 sentences on what makes this grounded or hallucinated>" }

- accept: review references real symbols/lines in source; findings could be acted on.
- reject: review invents APIs, fabricates calls, contradicts source. Do NOT record.
- cycle: review is mediocre — partially grounded but wrong shape, try a stronger model.`;

  // Hand-review uses paid OpenRouter so it sidesteps the Ollama Cloud
  // throttle that drove every prior iter into the heuristic fallback.
  // Grok 4.1 fast: $0.20 in / $0.50 out per M tokens, 2M ctx. A typical
  // hand-review (~6K input + 300 output tokens) costs ~$0.0014. Selected
  // via J directive 2026-04-25 ("best model under $0.72/M").
  const resp = await fetch(`${LAKEHOUSE}/v1/chat`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      provider: "openrouter",
      model: "x-ai/grok-4.1-fast",
      messages: [{ role: "user", content: prompt }],
      max_tokens: 300,
      temperature: 0.0,
    }),
    signal: AbortSignal.timeout(45000),
  });
  if (!resp.ok) {
    throw new Error(`/v1/chat ${resp.status}: ${(await resp.text()).slice(0, 200)}`);
  }
  const j: any = await resp.json();
  const content = (j?.choices?.[0]?.message?.content ?? "").trim();
  // Pull the JSON object from the response — the model may wrap it in prose.
  const m = content.match(/\{[\s\S]*\}/);
  if (!m) throw new Error(`no JSON object in response: ${content.slice(0, 100)}`);
  const parsed = JSON.parse(m[0]);
  const v = String(parsed.verdict ?? "accept").toLowerCase();
  return {
    verdict: v === "reject" || v === "cycle" ? (v as "reject" | "cycle") : "accept",
    confidence: Number(parsed.confidence ?? 50),
    notes: String(parsed.notes ?? "").slice(0, 500),
    source: "cloud",
  };
}
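// Example verdict the cloud rung is expected to return (illustrative):
//
//   { "verdict": "cycle", "confidence": 62,
//     "notes": "Half the quoted snippets match the source, but the fix
//     suggestions target functions that don't exist in this file." }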
function heuristicHandReview(input: HandReviewInput): HandReviewVerdict {
  // Deterministic fallback when cloud is throttled. Conservative:
  // only flip to reject when the evidence is overwhelming, otherwise
  // accept (fall-open principle — observer is policy, not blocker).
  const total = input.grounding_stats.total;
  const pct = input.grounding_stats.groundedPct;
  const respLen = input.response.length;
  // Too short to be a real review
  if (respLen < 1500) {
    return { verdict: "reject", confidence: 80, notes: `response too short (${respLen} chars)`, source: "heuristic" };
  }
  // Below 5 quotes — not enough signal to judge grounding; accept
  if (total < 5 || pct === null) {
    return { verdict: "accept", confidence: 50, notes: `insufficient quote signal (${total} quotes); accepting`, source: "heuristic" };
  }
  // Very heavy hallucination
  if (pct < 20) {
    return { verdict: "reject", confidence: 85, notes: `low grounding (${pct}% of ${total} quotes)`, source: "heuristic" };
  }
  // Mediocre — cycle to a stronger model
  if (pct < 50) {
    return { verdict: "cycle", confidence: 65, notes: `mediocre grounding (${pct}% of ${total} quotes); try stronger`, source: "heuristic" };
  }
  // Good enough
  return { verdict: "accept", confidence: 75, notes: `grounding ${pct}% of ${total} quotes`, source: "heuristic" };
}

async function maybeEscalate(failures: ObservedOp[]) {
  // Group failures by sig_hash
  const bySig = new Map<string, ObservedOp[]>();
  for (const f of failures) {
    const k = f.sig_hash ?? "__no_sig__";
    const cluster = bySig.get(k) ?? [];
    cluster.push(f);
    bySig.set(k, cluster);
  }
  for (const [sigHash, cluster] of bySig) {
    if (sigHash === "__no_sig__") continue;
    if (cluster.length < ESCALATION_THRESHOLD) continue;
    if (escalatedSigHashes.has(sigHash)) continue;
    escalatedSigHashes.add(sigHash);
    // Fire-and-forget — don't block the existing analyzer loop.
    escalateFailureClusterToLLMTeam(sigHash, cluster).catch(() => {});
  }
}
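// Worked example of heuristicHandReview's bands (numbers hypothetical):
// a 3200-char review with 10 backtick quotes, 4 of which match the
// source verbatim, gives pct = 40 — inside the 20–50 band — so the
// heuristic returns {"verdict":"cycle","confidence":65}.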
// ─── Error analyzer loop ───

async function analyzeErrors() {
  // Read recent failures
  const failures = recentOps.filter(op => !op.success);
  if (failures.length === 0) return;

  // NEW 2026-04-24: escalate recurring sig_hash clusters to the cloud
  // reviewer (see escalateFailureClusterToLLMTeam for the current
  // route). Runs in parallel to the local diagnose call below —
  // non-blocking, richer downstream signal for scrum/auditor.
  maybeEscalate(failures).catch(() => {});

  const errorSummary = failures
    .slice(-10)
    .map(f => `[${f.endpoint}] ${f.input_summary}: ${f.error}`)
    .join("\n");

  // Ask local model to diagnose. Phase 44 migration (2026-04-27):
  // /v1/chat instead of legacy /ai/generate so /v1/usage tracks the
  // call + Langfuse traces it. 2026-04-30 model bump: qwen2.5 →
  // qwen3.5:latest to match the small-model-pipeline local-tier default.
  try {
    const resp = await fetch(`${LAKEHOUSE}/v1/chat`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        model: "qwen3.5:latest",
        provider: "ollama",
        messages: [{
          role: "user",
          content: `You are a system reliability observer. Analyze these recent failures and suggest fixes:

${errorSummary}

For each error:
1. What likely caused it?
2. How should the agent adjust its approach?
3. Should this be added to the playbook as a "don't do this"?

Be specific and actionable. Under 200 words.`,
        }],
        max_tokens: 400,
        temperature: 0.2,
      }),
    });
    const analysis = (await resp.json()) as any;
    const analysisText = analysis?.choices?.[0]?.message?.content ?? "";
    if (analysisText) {
      console.error(`[observer] Error analysis:\n${analysisText}`);
      // Log the analysis as a playbook entry
      await fetch(`${GATEWAY}/log`, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          operation: `error_analysis: ${failures.length} failures`,
          approach: "LLM-analyzed error patterns",
          result: analysisText.slice(0, 500),
          context: errorSummary.slice(0, 500),
        }),
      });
    }
  } catch (e) {
    console.error(`[observer] Analysis failed: ${e}`);
  }
}

// ─── Playbook consolidation ───

async function consolidatePlaybooks() {
  const successes = recentOps.filter(op => op.success);
  if (successes.length < 5) return;

  // Group by endpoint
  const groups: Record<string, ObservedOp[]> = {};
  for (const op of successes) {
    const key = op.endpoint;
    if (!groups[key]) groups[key] = [];
    groups[key].push(op);
  }

  for (const [endpoint, ops] of Object.entries(groups)) {
    if (ops.length < 3) continue;
    const avgDuration = ops.reduce((s, o) => s + o.duration_ms, 0) / ops.length;
    const pattern = ops.slice(-3).map(o => o.input_summary).join("; ");
    await fetch(`${GATEWAY}/log`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        operation: `consolidated: ${ops.length} successful ${endpoint} calls`,
        approach: `common pattern: ${pattern.slice(0, 200)}`,
        result: `avg_duration=${avgDuration.toFixed(0)}ms, ${ops.length} successes`,
        context: `endpoint=${endpoint}`,
      }),
    }).catch(() => {});
  }
}
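// Illustrative /log payload produced by consolidatePlaybooks (values
// hypothetical; shape matches the JSON.stringify body above):
//
//   { "operation": "consolidated: 7 successful /hybrid_search calls",
//     "approach": "common pattern: open RN shifts; late Q3 fills; ...",
//     "result": "avg_duration=184ms, 7 successes",
//     "context": "endpoint=/hybrid_search" }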
// ─── HTTP listener for external ops (Phase 24) ───
// Scenarios POST per-event outcomes here so the observer's analyzer +
// playbook builder see them alongside MCP-wrapped ops. Read-only stats
// also exposed at /stats for external health checks.
function startHttpListener() {
  Bun.serve({
    port: OBSERVER_PORT,
    hostname: "0.0.0.0",
    fetch(req) {
      const url = new URL(req.url);

      if (req.method === "GET" && url.pathname === "/health") {
        return new Response(JSON.stringify({ status: "ok", ops_in_ring: recentOps.length }));
      }

      if (req.method === "GET" && url.pathname === "/stats") {
        const bySource = new Map<string, number>();
        for (const o of recentOps) {
          const k = o.source ?? "mcp";
          bySource.set(k, (bySource.get(k) ?? 0) + 1);
        }
        return new Response(JSON.stringify({
          total: recentOps.length,
          successes: recentOps.filter(o => o.success).length,
          failures: recentOps.filter(o => !o.success).length,
          by_source: Object.fromEntries(bySource),
          recent_scenario_ops: recentOps
            .filter(o => o.source === "scenario")
            .slice(-10)
            .map(o => ({ ts: o.timestamp, ok: o.success, staffer: o.staffer_id, kind: o.event_kind, role: o.role })),
        }));
      }

      // ─── Hand-review endpoint (2026-04-25) ───
      // scrum/agent posts a candidate response + source content + grounding
      // stats. Observer evaluates via cloud LLM (Grok 4.1 fast — see
      // cloudHandReview) with semantic context and returns {verdict,
      // confidence, notes}. On cloud throttle, falls back to a
      // deterministic heuristic over the grounding stats so the loop
      // keeps moving with honest signal.
      //
      // This is the policy layer scrum was missing — pre-2026-04-25 the
      // scrum_master applied a hardcoded grounding-rate threshold inline,
      // which baked judgment into the wrong layer. Now scrum reports data
      // (response + source + stats) and observer decides accept/reject/cycle.
      if (req.method === "POST" && url.pathname === "/review") {
        return req.json()
          .then((body: any) => handReview(body))
          .then((verdict) => new Response(JSON.stringify(verdict), {
            headers: { "content-type": "application/json" },
          }))
          .catch((e: Error) => new Response(
            JSON.stringify({ verdict: "accept", notes: `observer error: ${e.message}`, source: "heuristic" }),
            {
              status: 200, // fall-open shape — scrum keeps moving on observer failure
              headers: { "content-type": "application/json" },
            },
          ));
      }

      if (req.method === "POST" && url.pathname === "/event") {
        return req.json().then((body: any) => {
          const op: ObservedOp = {
            timestamp: body.timestamp ?? new Date().toISOString(),
            endpoint: body.endpoint ?? "scenario:fill",
            input_summary: body.input_summary ??
              `${body.event_kind ?? "?"} ${body.role ?? "?"}×${body.count ?? "?"} in ${body.city ?? "?"}, ${body.state ?? "?"}`,
            success: !!body.success,
            duration_ms: Number(body.duration_ms ?? 0),
            output_summary: body.output_summary ?? (body.success ? "filled" : (body.error ?? "failed")),
            error: body.error,
            // Respect the client's provenance if set (langfuse bridge
            // sends source:"langfuse", etc.). Default to "scenario"
            // to keep legacy Phase 24 callers working.
            source: body.source ?? "scenario",
            staffer_id: body.staffer_id,
            sig_hash: body.sig_hash,
            event_kind: body.event_kind,
            role: body.role,
            city: body.city,
            state: body.state,
            count: body.count,
            rescue_attempted: !!body.rescue_attempted,
            rescue_succeeded: !!body.rescue_succeeded,
          };
          recordExternalOp(op);
          persistOp(op).catch(() => {});
          return new Response(JSON.stringify({ accepted: true, ring_size: recentOps.length }));
        }).catch((e: Error) => new Response(JSON.stringify({ error: e.message }), { status: 400 }));
      }

      // ─── Relevance filter (2026-04-25) ───
      // Drops "adjacency pollution" from matrix-retrieved chunks before
      // they reach the reviewer LLM. Caller (scrum/agent) posts the
      // focus file + candidate chunks; observer scores via heuristic
      // (path/symbol/token signals) and returns kept + dropped lists.
      // Pure function — no I/O, safe to call hot.
      if (req.method === "POST" && url.pathname === "/relevance") {
        return req.json().then((body: any) => {
          const focus = body.focus_file ?? body.focus ?? {};
          const chunks = body.chunks ?? [];
          const threshold = typeof body.threshold === "number" ? body.threshold : 0.3;
          const result = filterChunks(focus, chunks, threshold);
          return new Response(JSON.stringify(result), {
            headers: { "content-type": "application/json" },
          });
        }).catch((e: Error) => new Response(JSON.stringify({ error: e.message }), { status: 400 }));
      }

      return new Response("not found", { status: 404 });
    },
  });
  console.error(`[observer] HTTP listener bound to 0.0.0.0:${OBSERVER_PORT}`);
}
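// Example external ingest (field values illustrative; run against a
// live observer on the default port):
//
//   curl -s -X POST http://localhost:3800/event \
//     -H 'content-type: application/json' \
//     -d '{"success":false,"event_kind":"fill","role":"RN","city":"Austin","state":"TX","count":3,"error":"no candidates","staffer_id":"s-42","sig_hash":"abc123"}'
//
//   → {"accepted":true,"ring_size":128}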
// ─── Overseer corrections tailer (2026-04-23) ───
// The gateway's /v1/respond loop writes T3 overseer corrections to
// data/_kb/overseer_corrections.jsonl. Tail it once per cycle and
// inject each new row into the same recentOps ring that analyzeErrors
// + consolidatePlaybooks read — so a correction that just fired shows
// up alongside the outcomes it was meant to repair, and the analyzer
// can flag patterns like "three corrections on staffing.fill with the
// same advice — underlying problem isn't a drift, it's a data gap".
const CORRECTIONS_PATH = process.env.OVERSEER_CORRECTIONS_PATH ??
  "/home/profit/lakehouse/data/_kb/overseer_corrections.jsonl";
let correctionsCursor = 0; // byte offset

async function tailOverseerCorrections(): Promise<number> {
  const f = Bun.file(CORRECTIONS_PATH);
  if (!(await f.exists())) return 0;
  const size = f.size;
  if (size <= correctionsCursor) return 0;
  // Read only the suffix since the last cursor; keeps tail work
  // bounded even as the file grows.
  const text = await f.slice(correctionsCursor, size).text();
  correctionsCursor = size;
  let forwarded = 0;
  for (const line of text.split("\n")) {
    if (!line.trim()) continue;
    let row: any;
    try {
      row = JSON.parse(line);
    } catch {
      continue;
    }
    const op: ObservedOp = {
      timestamp: row.created_at ?? new Date().toISOString(),
      endpoint: `overseer:${row.model ?? "claude-opus-4-7"}`,
      input_summary: `${row.task_class ?? "?"}: ${row.reason ?? "escalation"}`,
      // A correction itself is neither success nor failure — it's a
      // mitigation attempt. We mark success=true so analyzeErrors
      // doesn't count it as a failure, but the preview lets the
      // analyzer see what was tried.
      success: true,
      duration_ms: Number(row.usage?.latency_ms ?? 0),
      output_summary: String(row.correction ?? "").slice(0, 200),
      source: "overseer_correction",
      sig_hash: row.sig_hash,
      task_class: row.task_class,
      correction: String(row.correction ?? ""),
      applied_at_turn: Number(row.applied_at_turn ?? 0),
    };
    recordExternalOp(op);
    forwarded++;
  }
  return forwarded;
}
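// Illustrative corrections row (field values hypothetical; only the
// fields read above matter to the tailer):
//
//   {"created_at":"2026-04-23T09:14:02Z","model":"claude-opus-4-7",
//    "task_class":"staffing.fill","reason":"drift: stale rate card",
//    "correction":"Re-anchor quotes to the current rate card first.",
//    "sig_hash":"abc123","applied_at_turn":4,"usage":{"latency_ms":2100}}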
// ─── Main loop ───

async function main() {
  console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}, port=${OBSERVER_PORT}`);

  // Run a health check first
  const health = await fetch(`${GATEWAY}/health`)
    .then(r => (r.ok ? r.text() : null))
    .catch(() => null);
  if (!health) {
    console.error("[observer] gateway unreachable — exiting");
    process.exit(1);
  }
  console.error(`[observer] gateway healthy: ${JSON.stringify(health)}`);

  // Phase 24 — bind HTTP listener so scenarios can POST outcomes.
  startHttpListener();

  // Main loop
  let cycle = 0;
  while (true) {
    await Bun.sleep(CYCLE_SECS * 1000);
    cycle++;

    // Every cycle: tail the overseer corrections KB stream, then
    // analyze errors. Order matters — if an overseer correction just
    // landed for a sig_hash that previously failed, the analyzer
    // should see both.
    const newCorrections = await tailOverseerCorrections();
    if (newCorrections > 0) {
      console.error(`[observer] pulled ${newCorrections} new overseer correction(s) into ring`);
    }
    await analyzeErrors();

    // Every 5 cycles: consolidate playbooks
    if (cycle % 5 === 0) {
      await consolidatePlaybooks();
    }

    const scenarioOps = recentOps.filter(o => o.source === "scenario").length;
    const langfuseOps = recentOps.filter(o => o.source === "langfuse").length;
    const correctionOps = recentOps.filter(o => o.source === "overseer_correction").length;
    const stats = {
      cycle,
      total_ops: recentOps.length,
      successes: recentOps.filter(o => o.success).length,
      failures: recentOps.filter(o => !o.success).length,
      scenario_ops: scenarioOps,
      langfuse_ops: langfuseOps,
      overseer_corrections: correctionOps,
    };
    console.error(`[observer] cycle ${cycle}: ${JSON.stringify(stats)}`);
  }
}

// Export the main loop for external runners (the `observed` wrapper is
// already exported inline above for other agents to use).
export { main as startObserver };

// Run if executed directly
if (import.meta.main) {
  main().catch(console.error);
}
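// Run sketch (filename and env values are illustrative — adjust to the
// repo's actual layout):
//
//   GATEWAY_URL=http://localhost:3700 \
//   LAKEHOUSE_URL=http://localhost:3100 \
//   OBSERVER_PORT=3800 OBSERVER_CYCLE=30 \
//   bun run observer.ts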