lakehouse/mcp-server/observer.ts
observer + scrum: gold-standard answer corpus for compounding context
The compose-don't-add discipline applied to the original ask: when big
models produce good results (scrum reviews + observer escalations),
save them into the matrix indexer so future small-model handlers can
retrieve them as scaffolding. The local model gets near-paid quality
at a fraction of the cost.

New: scripts/build_answers_corpus.ts indexes lakehouse_answers_v1
from data/_kb/scrum_reviews.jsonl + data/_kb/observer_escalations.jsonl.
doc_id prefixes ('review:' vs 'escalation:') let consumers apply
same-file gating to prior reviews while keeping escalations broad.

observer.ts: buildKbPreamble adds lakehouse_answers_v1 as a third
retrieval source alongside pathway/bug_fingerprints + lakehouse_arch_v1.
qwen3.5:latest synthesis now compresses three lenses into a single
briefing for the cloud reviewer.

scrum_master_pipeline.ts: epilogue dispatches a fire-and-forget rebuild
of lakehouse_answers_v1 after each run so this run's accepted reviews
are retrievable within ~30s. LH_SCRUM_SKIP_ANSWERS_REBUILD=1 disables.

Verified live: kb_preamble grew 416 → 727 chars after wiring the
third source; qwen3.5:latest synthesis (702 → 128 tokens) compresses
correctly; deepseek-v3.1-terminus diagnosis (301 → 148 tokens) is
sharper, citing architectural patterns (circuit breaker, adapter
files) instead of generic timeouts. Total cost per escalation
unchanged at ~$0.0002.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 18:49:36 -05:00

/**
* Lakehouse Observer — autonomous iteration loop.
*
* Runs continuously alongside the agent gateway. Watches every operation,
* logs outcomes, detects failures, and feeds learnings back so agents
* improve over time without retraining.
*
* Three loops:
* 1. OPERATION OBSERVER — wraps gateway calls, timestamps + logs every
* success/failure to the lakehouse
* 2. ERROR ANALYZER — periodically reads the error log, asks a local
* model to diagnose patterns, writes recommendations
* 3. PLAYBOOK BUILDER — after N successful ops of the same type,
* consolidates them into a reusable playbook entry
*
* This is the "third-party witness" J asked for — it watches what
* agents do and helps them not repeat mistakes.
*/
import { filterChunks } from "./relevance";
const GATEWAY = process.env.GATEWAY_URL || "http://localhost:3700";
const LAKEHOUSE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
const CYCLE_SECS = parseInt(process.env.OBSERVER_CYCLE || "30", 10);
// Phase 24 — observer now listens on an HTTP port for external ops
// (scenarios bypass the MCP:3700 layer by design). Default 3800.
const OBSERVER_PORT = parseInt(process.env.OBSERVER_PORT || "3800", 10);
// ─── Observed operation log ───
interface ObservedOp {
timestamp: string;
endpoint: string;
input_summary: string;
success: boolean;
duration_ms: number;
output_summary: string;
error?: string;
// Phase 24 — optional provenance so error analyzer and playbook
// builder can differentiate MCP-layer ops from scenario-sourced
// events. Scenarios set source="scenario" + staffer_id + sig_hash.
source?: "mcp" | "scenario" | "langfuse" | "overseer_correction";
staffer_id?: string;
sig_hash?: string;
event_kind?: string;
role?: string;
city?: string;
state?: string;
count?: number;
rescue_attempted?: boolean;
rescue_succeeded?: boolean;
// Overseer-correction-specific (2026-04-23): lets the analyzer
// correlate corrections with the drift that prompted them and with
// subsequent outcomes that either validated or invalidated the advice.
task_class?: string;
correction?: string;
applied_at_turn?: number;
}
const recentOps: ObservedOp[] = [];
// Phase 24 — external ingest path. Scenarios POST outcome summaries
// here so the observer's analyzer + playbook builder see them. Called
// from the Bun.serve() handler below. Same ring buffer as the MCP-
// wrapped path so downstream loops don't need to know the source.
export function recordExternalOp(op: ObservedOp): void {
recentOps.push({ ...op, source: op.source ?? "scenario" });
if (recentOps.length > 2000) recentOps.shift();
}
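// Illustrative only — a minimal sketch of the payload a scenario
// might POST to the /event endpoint below (staffer_id and sig_hash
// values are hypothetical; field names match the handler):
//
//   await fetch(`http://localhost:${OBSERVER_PORT}/event`, {
//     method: "POST",
//     headers: { "Content-Type": "application/json" },
//     body: JSON.stringify({
//       event_kind: "staffing.fill",
//       role: "line_cook", count: 2, city: "Austin", state: "TX",
//       success: false, error: "no candidates within radius",
//       duration_ms: 1240,
//       staffer_id: "stf_demo",   // hypothetical
//       sig_hash: "ab12cd34",     // hypothetical
//     }),
//   });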
// ─── Wrapped gateway caller — every call gets observed ───
export async function observed(
endpoint: string,
body: any,
description: string,
): Promise<{ data: any; op: ObservedOp }> {
const t0 = Date.now();
let data: any;
let error: string | undefined;
let success = true;
try {
const resp = await fetch(`${GATEWAY}${endpoint}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
});
data = await resp.json();
if (data.error) {
success = false;
error = data.error;
}
} catch (e: any) {
success = false;
error = e.message;
data = { error: e.message };
}
const op: ObservedOp = {
timestamp: new Date().toISOString(),
endpoint,
input_summary: description,
success,
duration_ms: Date.now() - t0,
output_summary: success
? summarize(data)
: `ERROR: ${error}`,
error,
};
recentOps.push(op);
// Match the 2000-entry cap used by recordExternalOp so both ingest
// paths share one ring size (persistOp's note below assumes 2000).
if (recentOps.length > 2000) recentOps.shift();
// Persist to lakehouse
await persistOp(op);
return { data, op };
}
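// Usage sketch — the endpoint name here is illustrative, not a route
// this file confirms exists on the gateway:
//
//   const { data, op } = await observed(
//     "/query/hybrid",                      // hypothetical route
//     { q: "open shifts in Austin" },
//     "hybrid search: open shifts",
//   );
//   if (!op.success) console.error(op.error);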
function summarize(data: any): string {
if (data.sql_matches !== undefined) return `hybrid: ${data.sql_matches} sql → ${data.vector_reranked} results`;
if (data.rows) return `${data.row_count || data.rows.length} rows`;
if (data.answer) return `answer: ${data.answer.slice(0, 80)}...`;
if (data.sources) return `${data.sources.length} sources`;
return JSON.stringify(data).slice(0, 100);
}
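// Examples (input shape → summary string):
//   { sql_matches: 12, vector_reranked: 5 } → "hybrid: 12 sql → 5 results"
//   { rows: [...], row_count: 40 }          → "40 rows"
//   { answer: "The schema maps…" }          → "answer: <first 80 chars>..."
//   { sources: [a, b, c] }                  → "3 sources"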
// Phase 24 honesty fix — the old persistOp used /ingest/file which
// REPLACES the dataset (flagged in feedback_ingest_replace_semantics.md).
// Every op silently wiped all prior ops. Now we append a JSONL line to
// data/_observer/ops.jsonl so the historical trace is durable. The
// observer analyzer + playbook builder read from this file when it
// outgrows the 2000-entry in-memory ring.
async function persistOp(op: ObservedOp) {
try {
const { mkdir, appendFile } = await import("node:fs/promises");
await mkdir("data/_observer", { recursive: true });
await appendFile("data/_observer/ops.jsonl", JSON.stringify(op) + "\n");
} catch {
// Persistence is best-effort; in-memory ring still works.
}
}
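// A minimal reader sketch for the durable trace. The analyzer-side
// loader isn't implemented in this file yet (only the append path is),
// so treat this as an assumption about how hydration could look:
export async function loadPersistedOps(): Promise<ObservedOp[]> {
const { readFile } = await import("node:fs/promises");
const text = await readFile("data/_observer/ops.jsonl", "utf8").catch(() => "");
// Skip blank or corrupt lines rather than failing the whole load.
return text.split("\n").filter(Boolean).flatMap(line => {
try { return [JSON.parse(line) as ObservedOp]; } catch { return []; }
});
}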
// ─── LLM Team escalation (code_review mode) ───
//
// When recent failures on a single sig_hash cross a threshold the
// local qwen2.5 analysis is probably insufficient. J's 2026-04-24
// direction: "the observer would trigger to give more context" —
// route failure clusters to LLM Team's specialized code_review mode
// (via /api/run) so richer structured signal lands in the KB for
// scrum + auditor + playbook memory to consume next pass. (Routing
// was later revised — see the 2026-04-24 note inside
// escalateFailureClusterToLLMTeam below.)
//
// Non-destructive: runs in parallel to the existing qwen2.5 analysis,
// never replaces it. Writes to data/_kb/observer_escalations.jsonl
// as a dedicated audit surface.
// Retained for reference — escalation now routes through /v1/chat
// (see escalateFailureClusterToLLMTeam), so this URL is unused.
const LLM_TEAM = process.env.LH_LLM_TEAM_URL ?? "http://localhost:5000";
const LLM_TEAM_ESCALATIONS = "/home/profit/lakehouse/data/_kb/observer_escalations.jsonl";
const ESCALATION_THRESHOLD = 3; // N+ failures on same sig_hash triggers
// ─── KB enrichment helper (2026-04-26) ────────────────────────────
// Mirrors what scrum_master_pipeline already does on every per-file
// review: queries pathway_memory bug fingerprints + the lakehouse_arch
// matrix corpus, then asks qwen3.5:latest to synthesize a tight
// briefing. We reuse the same primitives so observer escalations carry
// the same compounding context the scrum loop builds — no new index
// surfaces, no new corpora.
//
// `task_class` is derived from the cluster (most ops use the same one);
// pathway/bug_fingerprints is permissive about a null file_path, so
// non-code clusters (scenario fills, v1.chat events) just see broader
// matches via task_class alone.
//
// Returns "" when there's no useful signal — caller treats empty as
// "no preamble" and skips the prepend.
async function buildKbPreamble(sigHash: string, cluster: ObservedOp[]): Promise<string> {
const sample = cluster[0];
const taskClass = sample?.event_kind
?? (sample?.source === "scenario" ? "scenario_fill" : "observer_escalation");
// Step 1: pathway bug fingerprints. Best-effort; null filePath just
// widens the query at the matrix-index level.
let fingerprints: { flag: { kind: string }; pattern_key: string; example: string; occurrences: number }[] = [];
try {
const r = await fetch(`${LAKEHOUSE}/vectors/pathway/bug_fingerprints`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ task_class: taskClass, file_path: null, signal_class: null, limit: 5 }),
signal: AbortSignal.timeout(5000),
});
if (r.ok) fingerprints = (await r.json() as any).fingerprints ?? [];
} catch {}
// Step 2: architectural matrix (lakehouse_arch_v1) — ADRs/PRD/plan
// intent. Cluster summary is the search query.
const clusterSummary = cluster.slice(-5).map(o =>
`${o.endpoint ?? "?"} ${o.input_summary ?? ""} ${o.error ?? ""}`
).join(" | ");
let matrixChunks: { doc_id?: string; chunk_text?: string; score?: number }[] = [];
try {
const r = await fetch(`${LAKEHOUSE}/vectors/search`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ index_name: "lakehouse_arch_v1", query: `${taskClass} ${clusterSummary}`, top_k: 4 }),
signal: AbortSignal.timeout(5000),
});
if (r.ok) matrixChunks = (await r.json() as any).results ?? [];
} catch {}
// Step 3: gold-standard prior answers (lakehouse_answers_v1) — past
// scrum reviews + observer escalations. This is where the BIG-model
// results we save live; future small-model handlers retrieve them
// here as scaffolding so the cheap rung gets near-paid quality.
let answerChunks: { doc_id?: string; chunk_text?: string; score?: number }[] = [];
try {
const r = await fetch(`${LAKEHOUSE}/vectors/search`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ index_name: "lakehouse_answers_v1", query: `${taskClass} ${clusterSummary}`, top_k: 3 }),
signal: AbortSignal.timeout(5000),
});
if (r.ok) answerChunks = (await r.json() as any).results ?? [];
} catch {}
if (fingerprints.length === 0 && matrixChunks.length === 0 && answerChunks.length === 0) return "";
// Step 4: synthesis via local model (qwen3.5:latest, provider=ollama).
// Compresses the raw bundle to a 1-2 sentence briefing the cloud
// reviewer can actually use. If the local model is down or slow, fall
// back to the raw dump rather than blocking the escalation path.
const rawBundle = [
fingerprints.length > 0
? "PRIOR BUG PATTERNS (pathway memory):\n" + fingerprints.map((fp, i) =>
`${i + 1}. [${fp.flag.kind}] ${fp.pattern_key} (×${fp.occurrences}) e.g. ${fp.example.slice(0, 120)}`
).join("\n")
: "",
matrixChunks.length > 0
? "RELATED ARCHITECTURE CONTEXT:\n" + matrixChunks.map((c, i) =>
`${i + 1}. [${c.doc_id ?? "?"}] ${(c.chunk_text ?? "").slice(0, 200)}`
).join("\n")
: "",
answerChunks.length > 0
? "PRIOR GOLD-STANDARD ANSWERS (similar past reviews + escalations):\n" + answerChunks.map((c, i) =>
`${i + 1}. [${c.doc_id ?? "?"}] ${(c.chunk_text ?? "").slice(0, 240)}`
).join("\n")
: "",
].filter(Boolean).join("\n\n");
const synthPrompt = `A failure cluster (sig_hash=${sigHash.slice(0, 8)}, ${cluster.length} occurrences, task_class=${taskClass}) is about to be escalated for diagnosis. Here are prior signals from our knowledge base:
${rawBundle}
Output a single paragraph (≤300 chars) briefing the cloud reviewer on which prior signals are most likely relevant to this cluster. If nothing matches, say so plainly. No preamble, no markdown.`;
let synthesized = "";
try {
const r = await fetch(`${LAKEHOUSE}/v1/chat`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
provider: "ollama",
model: "qwen3.5:latest",
messages: [{ role: "user", content: synthPrompt }],
max_tokens: 200,
temperature: 0.1,
think: false,
}),
signal: AbortSignal.timeout(15000),
});
if (r.ok) {
const j = await r.json() as any;
synthesized = (j?.choices?.[0]?.message?.content ?? "").trim();
}
} catch {}
const body = synthesized.length > 0 ? synthesized : rawBundle;
return `═══ KB CONTEXT — prior signals on this task class (synthesized by qwen3.5:latest) ═══\n${body}\n═══\n\n`;
}
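// Shape of a non-empty return value (briefing text is illustrative):
//
//   ═══ KB CONTEXT — prior signals on this task class (synthesized by qwen3.5:latest) ═══
//   Two prior scenario_fill escalations hit the same adapter timeout;
//   the circuit-breaker ADR chunk is the most relevant prior signal.
//   ═══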
async function escalateFailureClusterToLLMTeam(sigHash: string, cluster: ObservedOp[]) {
// Package the failure cluster as a single context blob. Originally
// I routed this to LLM Team's `code_review` mode at /api/run, but
// that mode isn't registered in llm_team_ui.py — it returned
// "Unknown mode" on every call. Revised 2026-04-24: route directly
// to the gateway's /v1/chat with provider=ollama_cloud + qwen3-coder:480b
// (the coding specialist that's rung 2 of the scrum ladder, proven
// to produce substantive structured reviews). Fire-and-forget so
// downstream failures don't block observer's normal loop.
const context = cluster.slice(-8).map((o, i) =>
`[${i + 1}] endpoint=${o.endpoint} input=${o.input_summary} error=${o.error ?? "?"}`
).join("\n");
const kbPreamble = await buildKbPreamble(sigHash, cluster);
const prompt = `${kbPreamble}sig_hash=${sigHash} · ${cluster.length} failures on the same signature:\n\n${context}\n\nReview this failure cluster. Identify:\n1. Likely root cause (single sentence).\n2. Files most likely responsible (path hints).\n3. Concrete fix direction (under 3 sentences).\n4. Confidence: NN%\n\nBe specific, not generic.`;
try {
// 2026-04-26: switched from ollama_cloud/qwen3-coder:480b (weekly
// 429 quota was blocking escalations) to paid OpenRouter
// deepseek-v3.1-terminus — 671B reasoning specialist, $0.21 in /
// $0.79 out per M tokens (under the $0.85/M ceiling J set), 164K
// ctx. Per-escalation cost: ~$0.0006 (typical 500-token prompt +
// 300-token completion).
const resp = await fetch(`${LAKEHOUSE}/v1/chat`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
provider: "openrouter",
model: "deepseek/deepseek-v3.1-terminus",
messages: [{ role: "user", content: prompt }],
max_tokens: 800,
temperature: 0.2,
}),
signal: AbortSignal.timeout(60000),
});
if (!resp.ok) {
console.error(`[observer] escalation /v1/chat ${resp.status}: ${(await resp.text()).slice(0, 200)}`);
return;
}
const j: any = await resp.json();
const analysis = j?.choices?.[0]?.message?.content ?? "";
// Audit row stays schema-compatible with the prior implementation —
// downstream consumers see structured fields regardless of the
// review-source change. Facts/entities stay empty (this call is
// direct-model, not extract-mode); the raw analysis carries the
// signal.
const row = {
ts: new Date().toISOString(),
source: "observer_escalation",
mode: "direct_chat_deepseek_v3_1_terminus",
sig_hash: sigHash,
cluster_size: cluster.length,
cluster_staffer: cluster[0]?.staffer_id,
cluster_endpoint: cluster[0]?.endpoint,
prompt_tokens: j?.usage?.prompt_tokens ?? 0,
completion_tokens: j?.usage?.completion_tokens ?? 0,
kb_preamble_chars: kbPreamble.length,
analysis: analysis.slice(0, 4000),
};
const { appendFile } = await import("node:fs/promises");
await appendFile(LLM_TEAM_ESCALATIONS, JSON.stringify(row) + "\n");
console.error(
`[observer] escalated sig_hash=${sigHash.slice(0, 8)} · cluster=${cluster.length} · ${analysis.length} chars`
);
} catch (e) {
console.error(`[observer] escalation failed: ${(e as Error).message}`);
}
}
// Track which sig_hashes we've already escalated this session so we
// don't hammer LLM Team on every analyzeErrors tick when a cluster
// persists across cycles.
const escalatedSigHashes = new Set<string>();
// ─── Hand-review for scrum/agent candidate responses (2026-04-25) ───
//
// Observer is OUTSIDE the scrum loop's epistemic scope, so its verdict
// can be treated as truth about whether a candidate review is grounded.
// Two-tier evaluator:
// 1. Try cloud LLM (x-ai/grok-4.1-fast via OpenRouter — see
// cloudHandReview) — semantic judgment with response + source
// excerpt + grounding stats as context.
// 2. On cloud failure (throttle/timeout) → deterministic heuristic
// over grounding_pct + total_quotes. Marked source: "heuristic"
// so consumers can tell which rung produced the verdict.
// Every verdict is persisted to data/_kb/observer_reviews.jsonl.
const OBSERVER_REVIEWS = "/home/profit/lakehouse/data/_kb/observer_reviews.jsonl";
interface HandReviewInput {
file_path: string;
model: string;
response: string;
source_content: string;
grounding_stats: { total: number; grounded: number; groundedPct: number | null };
attempt: number;
}
interface HandReviewVerdict {
verdict: "accept" | "reject" | "cycle";
confidence: number;
notes: string;
source: "cloud" | "heuristic";
}
async function handReview(input: HandReviewInput): Promise<HandReviewVerdict> {
const t0 = Date.now();
let verdict: HandReviewVerdict;
try {
verdict = await cloudHandReview(input);
} catch (e) {
console.error(`[observer/review] cloud failed (${(e as Error).message}); using heuristic`);
verdict = heuristicHandReview(input);
}
// Persist regardless of source so we can later compare cloud vs
// heuristic verdicts on the same input and tune the heuristic.
const row = {
ts: new Date().toISOString(),
file_path: input.file_path,
model: input.model,
attempt: input.attempt,
response_chars: input.response.length,
grounding_stats: input.grounding_stats,
verdict: verdict.verdict,
confidence: verdict.confidence,
notes: verdict.notes,
source: verdict.source,
duration_ms: Date.now() - t0,
};
try {
const { appendFile } = await import("node:fs/promises");
await appendFile(OBSERVER_REVIEWS, JSON.stringify(row) + "\n");
} catch { /* best-effort persistence */ }
return verdict;
}
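// Sketch of the scrum-side call (the real caller lives in
// scrum_master_pipeline; values here are hypothetical, but the payload
// shape mirrors HandReviewInput above):
//
//   const verdict: HandReviewVerdict = await fetch(
//     `http://localhost:${OBSERVER_PORT}/review`, {
//       method: "POST",
//       headers: { "Content-Type": "application/json" },
//       body: JSON.stringify({
//         file_path: "src/example.ts",
//         model: "qwen3-coder:480b",
//         response: candidateReview,
//         source_content: fileText,
//         grounding_stats: { total: 12, grounded: 9, groundedPct: 75 },
//         attempt: 1,
//       }),
//     },
//   ).then(r => r.json());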
async function cloudHandReview(input: HandReviewInput): Promise<HandReviewVerdict> {
const grounded = input.grounding_stats.grounded;
const total = input.grounding_stats.total;
const pct = input.grounding_stats.groundedPct;
// Truncate to keep the prompt under typical context windows.
// 2000 + 4000 = ~6000 chars ≈ 1500 tokens, plus response context.
const responseExcerpt = input.response.slice(0, 2000);
const sourceExcerpt = input.source_content.slice(0, 4000);
const prompt = `You are a code-review quality observer. Decide whether the following automated review is grounded in the actual source — not invented, not hallucinated.
FILE: ${input.file_path}
MODEL: ${input.model}
ATTEMPT: ${input.attempt}
ANCHOR GROUNDING: ${grounded}/${total} backtick-quoted snippets matched the source verbatim${pct !== null ? ` (${pct}%)` : ""}
REVIEW (first 2000 chars):
\`\`\`
${responseExcerpt}
\`\`\`
SOURCE EXCERPT (first 4000 chars):
\`\`\`
${sourceExcerpt}
\`\`\`
Respond ONLY with a JSON object:
{
"verdict": "accept" | "reject" | "cycle",
"confidence": 0-100,
"notes": "<1-2 sentences on what makes this grounded or hallucinated>"
}
- accept: review references real symbols/lines in source; findings could be acted on.
- reject: review invents APIs, fabricates calls, contradicts source. Do NOT record.
- cycle: review is mediocre — partially grounded but wrong shape, try a stronger model.`;
// Hand-review uses paid OpenRouter so it sidesteps the Ollama Cloud
// throttle that drove every prior iter into the heuristic fallback.
// Grok 4.1 fast: $0.20 in / $0.50 out per M tokens, 2M ctx. A typical
// hand-review (~6K input + 300 output) costs ~$0.0014. Selected via
// J directive 2026-04-25 ("best model under $0.72/M").
const resp = await fetch(`${LAKEHOUSE}/v1/chat`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
provider: "openrouter",
model: "x-ai/grok-4.1-fast",
messages: [{ role: "user", content: prompt }],
max_tokens: 300,
temperature: 0.0,
}),
signal: AbortSignal.timeout(45000),
});
if (!resp.ok) {
throw new Error(`/v1/chat ${resp.status}: ${(await resp.text()).slice(0, 200)}`);
}
const j: any = await resp.json();
const content = (j?.choices?.[0]?.message?.content ?? "").trim();
// Pull JSON object from the response — model may wrap it in prose.
const m = content.match(/\{[\s\S]*\}/);
if (!m) throw new Error(`no JSON object in response: ${content.slice(0, 100)}`);
const parsed = JSON.parse(m[0]);
const v = String(parsed.verdict ?? "accept").toLowerCase();
return {
verdict: (v === "reject" || v === "cycle") ? v as "reject" | "cycle" : "accept",
confidence: Number(parsed.confidence ?? 50),
notes: String(parsed.notes ?? "").slice(0, 500),
source: "cloud",
};
}
function heuristicHandReview(input: HandReviewInput): HandReviewVerdict {
// Deterministic fallback when cloud is throttled. Conservative:
// only flip to reject when the evidence is overwhelming, otherwise
// accept (fall-open principle — observer is policy, not blocker).
const total = input.grounding_stats.total;
const pct = input.grounding_stats.groundedPct;
const respLen = input.response.length;
// Too short to be a real review
if (respLen < 1500) {
return { verdict: "reject", confidence: 80, notes: `response too short (${respLen} chars)`, source: "heuristic" };
}
// Below 5 quotes — not enough signal to judge grounding; accept
if (total < 5 || pct === null) {
return { verdict: "accept", confidence: 50, notes: `insufficient quote signal (${total} quotes); accepting`, source: "heuristic" };
}
// Very heavy hallucination
if (pct < 20) {
return { verdict: "reject", confidence: 85, notes: `low grounding (${pct}% of ${total} quotes)`, source: "heuristic" };
}
// Mediocre — cycle to a stronger model
if (pct < 50) {
return { verdict: "cycle", confidence: 65, notes: `mediocre grounding (${pct}% of ${total} quotes); try stronger`, source: "heuristic" };
}
// Good enough
return { verdict: "accept", confidence: 75, notes: `grounding ${pct}% of ${total} quotes`, source: "heuristic" };
}
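// How the tiers land (inputs → verdict):
//   respLen=900, any stats            → reject (too short)
//   total=3,  pct=null                → accept (insufficient signal)
//   total=10, pct=15                  → reject (heavy hallucination)
//   total=10, pct=40                  → cycle  (mediocre grounding)
//   total=10, pct=75                  → accept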
async function maybeEscalate(failures: ObservedOp[]) {
// Group failures by sig_hash
const bySig = new Map<string, ObservedOp[]>();
for (const f of failures) {
const k = f.sig_hash ?? "__no_sig__";
const bucket = bySig.get(k) ?? [];
bucket.push(f);
bySig.set(k, bucket);
}
for (const [sigHash, cluster] of bySig) {
if (sigHash === "__no_sig__") continue;
if (cluster.length < ESCALATION_THRESHOLD) continue;
if (escalatedSigHashes.has(sigHash)) continue;
escalatedSigHashes.add(sigHash);
// Fire-and-forget — don't block the existing analyzer loop.
escalateFailureClusterToLLMTeam(sigHash, cluster).catch(() => {});
}
}
// ─── Error analyzer loop ───
async function analyzeErrors() {
// Read recent failures
const failures = recentOps.filter(op => !op.success);
if (failures.length === 0) return;
// NEW 2026-04-24: escalate recurring sig_hash clusters for cloud
// diagnosis (deepseek-v3.1-terminus via /v1/chat — see
// escalateFailureClusterToLLMTeam). Runs in parallel to the local
// qwen2.5 analysis below — non-blocking, richer downstream signal
// for scrum/auditor.
maybeEscalate(failures).catch(() => {});
const errorSummary = failures.slice(-10).map(f =>
`[${f.endpoint}] ${f.input_summary}: ${f.error}`
).join("\n");
// Ask local model to diagnose
try {
const resp = await fetch(`${LAKEHOUSE}/ai/generate`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
prompt: `You are a system reliability observer. Analyze these recent failures and suggest fixes:
${errorSummary}
For each error:
1. What likely caused it?
2. How should the agent adjust its approach?
3. Should this be added to the playbook as a "don't do this"?
Be specific and actionable. Under 200 words.`,
model: "qwen2.5",
max_tokens: 400,
temperature: 0.2,
}),
});
const analysis = await resp.json();
if (analysis.text) {
console.error(`[observer] Error analysis:\n${analysis.text}`);
// Log the analysis as a playbook entry
await fetch(`${GATEWAY}/log`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
operation: `error_analysis: ${failures.length} failures`,
approach: "LLM-analyzed error patterns",
result: analysis.text.slice(0, 500),
context: errorSummary.slice(0, 500),
}),
});
}
} catch (e) {
console.error(`[observer] Analysis failed: ${e}`);
}
}
// ─── Playbook consolidation ───
async function consolidatePlaybooks() {
const successes = recentOps.filter(op => op.success);
if (successes.length < 5) return;
// Group by endpoint
const groups: Record<string, ObservedOp[]> = {};
for (const op of successes) {
const key = op.endpoint;
if (!groups[key]) groups[key] = [];
groups[key].push(op);
}
for (const [endpoint, ops] of Object.entries(groups)) {
if (ops.length < 3) continue;
const avgDuration = ops.reduce((s, o) => s + o.duration_ms, 0) / ops.length;
const pattern = ops.slice(-3).map(o => o.input_summary).join("; ");
await fetch(`${GATEWAY}/log`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
operation: `consolidated: ${ops.length} successful ${endpoint} calls`,
approach: `common pattern: ${pattern.slice(0, 200)}`,
result: `avg_duration=${avgDuration.toFixed(0)}ms, ${ops.length} successes`,
context: `endpoint=${endpoint}`,
}),
}).catch(() => {});
}
}
// ─── HTTP listener for external ops (Phase 24) ───
// Scenarios POST per-event outcomes here so the observer's analyzer +
// playbook builder see them alongside MCP-wrapped ops. Read-only stats
// also exposed at /stats for external health checks.
function startHttpListener() {
Bun.serve({
port: OBSERVER_PORT,
hostname: "0.0.0.0",
fetch(req) {
const url = new URL(req.url);
if (req.method === "GET" && url.pathname === "/health") {
return new Response(JSON.stringify({ status: "ok", ops_in_ring: recentOps.length }));
}
if (req.method === "GET" && url.pathname === "/stats") {
const bySource = new Map<string, number>();
for (const o of recentOps) {
const k = o.source ?? "mcp";
bySource.set(k, (bySource.get(k) ?? 0) + 1);
}
return new Response(JSON.stringify({
total: recentOps.length,
successes: recentOps.filter(o => o.success).length,
failures: recentOps.filter(o => !o.success).length,
by_source: Object.fromEntries(bySource),
recent_scenario_ops: recentOps
.filter(o => o.source === "scenario")
.slice(-10)
.map(o => ({ ts: o.timestamp, ok: o.success, staffer: o.staffer_id, kind: o.event_kind, role: o.role })),
}));
}
// ─── Hand-review endpoint (2026-04-25) ───
// scrum/agent posts a candidate response + source content + grounding
// stats. Observer evaluates via cloud LLM (x-ai/grok-4.1-fast) with
// semantic context and returns {verdict, confidence, notes}. On
// cloud throttle, falls back to a deterministic heuristic over the
// grounding stats so the loop keeps moving with honest signal.
//
// This is the policy layer scrum was missing — pre-2026-04-25 the
// scrum_master applied a hardcoded grounding-rate threshold inline,
// which baked judgment into the wrong layer. Now scrum reports data
// (response + source + stats) and observer decides accept/reject/cycle.
if (req.method === "POST" && url.pathname === "/review") {
return req.json().then((body: any) => handReview(body))
.then((verdict) => new Response(JSON.stringify(verdict), {
headers: { "content-type": "application/json" },
}))
.catch((e: Error) =>
new Response(JSON.stringify({ verdict: "accept", notes: `observer error: ${e.message}`, source: "heuristic" }), {
status: 200, // fall-open shape — scrum keeps moving on observer failure
headers: { "content-type": "application/json" },
}));
}
if (req.method === "POST" && url.pathname === "/event") {
return req.json().then((body: any) => {
const op: ObservedOp = {
timestamp: body.timestamp ?? new Date().toISOString(),
endpoint: body.endpoint ?? "scenario:fill",
input_summary: body.input_summary ?? `${body.event_kind ?? "?"} ${body.role ?? "?"}×${body.count ?? "?"} in ${body.city ?? "?"}, ${body.state ?? "?"}`,
success: !!body.success,
duration_ms: Number(body.duration_ms ?? 0),
output_summary: body.output_summary ?? (body.success ? "filled" : (body.error ?? "failed")),
error: body.error,
// Respect the client's provenance if set (langfuse bridge
// sends source:"langfuse", etc.). Default to "scenario"
// to keep legacy Phase 24 callers working.
source: body.source ?? "scenario",
staffer_id: body.staffer_id,
sig_hash: body.sig_hash,
event_kind: body.event_kind,
role: body.role,
city: body.city,
state: body.state,
count: body.count,
rescue_attempted: !!body.rescue_attempted,
rescue_succeeded: !!body.rescue_succeeded,
};
recordExternalOp(op);
persistOp(op).catch(() => {});
return new Response(JSON.stringify({ accepted: true, ring_size: recentOps.length }));
}).catch((e: Error) =>
new Response(JSON.stringify({ error: e.message }), { status: 400 }));
}
// ─── Relevance filter (2026-04-25) ───
// Drops "adjacency pollution" from matrix-retrieved chunks before
// they reach the reviewer LLM. Caller (scrum/agent) posts the
// focus file + candidate chunks; observer scores via heuristic
// (path/symbol/token signals) and returns kept + dropped lists.
// Pure function — no I/O, safe to call hot.
if (req.method === "POST" && url.pathname === "/relevance") {
return req.json().then((body: any) => {
const focus = body.focus_file ?? body.focus ?? {};
const chunks = body.chunks ?? [];
const threshold = typeof body.threshold === "number" ? body.threshold : 0.3;
const result = filterChunks(focus, chunks, threshold);
return new Response(JSON.stringify(result), {
headers: { "content-type": "application/json" },
});
}).catch((e: Error) =>
new Response(JSON.stringify({ error: e.message }), { status: 400 }));
}
return new Response("not found", { status: 404 });
},
});
console.error(`[observer] HTTP listener bound to 0.0.0.0:${OBSERVER_PORT}`);
}
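// External probe sketch (any HTTP client works; TS shown for
// consistency with the rest of the file):
//
//   const stats = await fetch(`http://localhost:${OBSERVER_PORT}/stats`)
//     .then(r => r.json());
//   // → { total, successes, failures, by_source: { scenario: 42, … },
//   //     recent_scenario_ops: [...] }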
// ─── Overseer corrections tailer (2026-04-23) ───
// The gateway's /v1/respond loop writes T3 overseer corrections to
// data/_kb/overseer_corrections.jsonl. Tail it once per cycle and
// inject each new row into the same recentOps ring that analyzeErrors
// + consolidatePlaybooks read — so a correction that just fired shows
// up alongside the outcomes it was meant to repair, and the analyzer
// can flag patterns like "three corrections on staffing.fill with the
// same advice — underlying problem isn't a drift, it's a data gap".
const CORRECTIONS_PATH = process.env.OVERSEER_CORRECTIONS_PATH
?? "/home/profit/lakehouse/data/_kb/overseer_corrections.jsonl";
let correctionsCursor = 0; // byte offset
async function tailOverseerCorrections(): Promise<number> {
const f = Bun.file(CORRECTIONS_PATH);
if (!(await f.exists())) return 0;
const size = f.size;
if (size <= correctionsCursor) return 0;
// Read only the suffix since the last cursor; keeps tail work
// bounded even as the file grows.
const text = await f.slice(correctionsCursor, size).text();
correctionsCursor = size;
let forwarded = 0;
for (const line of text.split("\n")) {
if (!line.trim()) continue;
let row: any;
try { row = JSON.parse(line); } catch { continue; }
const op: ObservedOp = {
timestamp: row.created_at ?? new Date().toISOString(),
endpoint: `overseer:${row.model ?? "gpt-oss:120b"}`,
input_summary: `${row.task_class ?? "?"}: ${row.reason ?? "escalation"}`,
// Correction itself is neither success nor failure — it's a
// mitigation attempt. We mark success=true so analyzeErrors
// doesn't count it as a failure, but the preview lets the
// analyzer see what was tried.
success: true,
duration_ms: Number(row.usage?.latency_ms ?? 0),
output_summary: String(row.correction ?? "").slice(0, 200),
source: "overseer_correction",
sig_hash: row.sig_hash,
task_class: row.task_class,
correction: String(row.correction ?? ""),
applied_at_turn: Number(row.applied_at_turn ?? 0),
};
recordExternalOp(op);
forwarded++;
}
return forwarded;
}
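// A minimal sketch of the JSONL row shape this tailer expects — the
// field names are exactly the ones read above; values illustrative:
//
//   {"created_at":"2026-04-23T12:00:00Z","model":"gpt-oss:120b",
//    "task_class":"staffing.fill","reason":"drift: radius ignored",
//    "correction":"re-anchor the search radius to the request city",
//    "sig_hash":"ab12cd34","applied_at_turn":3,
//    "usage":{"latency_ms":1800}}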
// ─── Main loop ───
async function main() {
console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}, port=${OBSERVER_PORT}`);
// Run a health check first
const health = await fetch(`${GATEWAY}/health`).then(r => r.ok ? r.text() : null).catch(() => null);
if (!health) {
console.error("[observer] gateway unreachable — exiting");
process.exit(1);
}
console.error(`[observer] gateway healthy: ${JSON.stringify(health)}`);
// Phase 24 — bind HTTP listener so scenarios can POST outcomes.
startHttpListener();
// Main loop
let cycle = 0;
while (true) {
await Bun.sleep(CYCLE_SECS * 1000);
cycle++;
// Every cycle: tail the overseer corrections KB stream, then
// analyze errors. Order matters — if an overseer correction just
// landed for a sig_hash that previously failed, the analyzer
// should see both.
const newCorrections = await tailOverseerCorrections();
if (newCorrections > 0) {
console.error(`[observer] pulled ${newCorrections} new overseer correction(s) into ring`);
}
await analyzeErrors();
// Every 5 cycles: consolidate playbooks
if (cycle % 5 === 0) {
await consolidatePlaybooks();
}
const scenarioOps = recentOps.filter(o => o.source === "scenario").length;
const langfuseOps = recentOps.filter(o => o.source === "langfuse").length;
const correctionOps = recentOps.filter(o => o.source === "overseer_correction").length;
const stats = {
cycle,
total_ops: recentOps.length,
successes: recentOps.filter(o => o.success).length,
failures: recentOps.filter(o => !o.success).length,
scenario_ops: scenarioOps,
langfuse_ops: langfuseOps,
overseer_corrections: correctionOps,
};
console.error(`[observer] cycle ${cycle}: ${JSON.stringify(stats)}`);
}
}
// Export the main loop so embedders can run the observer; observed()
// above is already a named export for other agents to use.
export { main as startObserver };
// Run if executed directly
if (import.meta.main) {
main().catch(console.error);
}