Some checks failed
lakehouse/auditor 1 blocking issue: todo!() macro call in tests/real-world/scrum_master_pipeline.ts
escalateFailureClusterToLLMTeam now calls a new buildKbPreamble() that mirrors what scrum_master_pipeline does on every per-file review: queries /vectors/pathway/bug_fingerprints + /vectors/search against the lakehouse_arch_v1 corpus, then asks local qwen3.5:latest (provider=ollama) to synthesize a tight briefing. The synthesized preamble prepends the existing escalation prompt so the cloud reviewer sees historical context the same way scrum reviewers do. Reuses existing KB primitives — no new corpora, no new endpoints, no new abstractions. Same code path scrum already exercises 3+ times per review; observer joins the same compounding loop. Audit row gains kb_preamble_chars so we can later track enrichment yield per escalation. Empty preamble (both fingerprints + matrix return nothing) → empty string, prompt unchanged. Verified: qwen3.5:latest synthesis fires for every escalation with non-empty matrix hits (gateway log: 445→72 tokens, 3.1s). Matrix retrieval correctly surfaces PRD Phase 40/44 chunks for chat_completion clusters. Pathway memory stays consistent with scrum (84→87 traces); chat_completion task_class doesn't have fingerprints yet — graceful. Local-model synthesis was J's explicit ask: compress the raw bundle before the cloud call so the briefing is actionable, not a dump. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
820 lines
32 KiB
TypeScript
820 lines
32 KiB
TypeScript
/**
|
||
* Lakehouse Observer — autonomous iteration loop.
|
||
*
|
||
* Runs continuously alongside the agent gateway. Watches every operation,
|
||
* logs outcomes, detects failures, and feeds learnings back so agents
|
||
* improve over time without retraining.
|
||
*
|
||
* Three loops:
|
||
* 1. OPERATION OBSERVER — wraps gateway calls, timestamps + logs every
|
||
* success/failure to the lakehouse
|
||
* 2. ERROR ANALYZER — periodically reads the error log, asks a local
|
||
* model to diagnose patterns, writes recommendations
|
||
* 3. PLAYBOOK BUILDER — after N successful ops of the same type,
|
||
* consolidates them into a reusable playbook entry
|
||
*
|
||
* This is the "third-party witness" J asked for — it watches what
|
||
* agents do and helps them not repeat mistakes.
|
||
*/
|
||
|
||
import { filterChunks } from "./relevance";
|
||
|
||
const GATEWAY = process.env.GATEWAY_URL || "http://localhost:3700";
|
||
const LAKEHOUSE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
|
||
const CYCLE_SECS = parseInt(process.env.OBSERVER_CYCLE || "30");
|
||
// Phase 24 — observer now listens on an HTTP port for external ops
|
||
// (scenarios bypass the MCP:3700 layer by design). Default 3800.
|
||
const OBSERVER_PORT = parseInt(process.env.OBSERVER_PORT || "3800");
|
||
|
||
// ─── Observed operation log ───
|
||
|
||
interface ObservedOp {
|
||
timestamp: string;
|
||
endpoint: string;
|
||
input_summary: string;
|
||
success: boolean;
|
||
duration_ms: number;
|
||
output_summary: string;
|
||
error?: string;
|
||
// Phase 24 — optional provenance so error analyzer and playbook
|
||
// builder can differentiate MCP-layer ops from scenario-sourced
|
||
// events. Scenarios set source="scenario" + staffer_id + sig_hash.
|
||
source?: "mcp" | "scenario" | "langfuse" | "overseer_correction";
|
||
staffer_id?: string;
|
||
sig_hash?: string;
|
||
event_kind?: string;
|
||
role?: string;
|
||
city?: string;
|
||
state?: string;
|
||
count?: number;
|
||
rescue_attempted?: boolean;
|
||
rescue_succeeded?: boolean;
|
||
// Overseer-correction-specific (2026-04-23): lets the analyzer
|
||
// correlate corrections with the drift that prompted them and with
|
||
// subsequent outcomes that either validated or invalidated the advice.
|
||
task_class?: string;
|
||
correction?: string;
|
||
applied_at_turn?: number;
|
||
}
|
||
|
||
const recentOps: ObservedOp[] = [];
|
||
|
||
// Phase 24 — external ingest path. Scenarios POST outcome summaries
|
||
// here so the observer's analyzer + playbook builder see them. Called
|
||
// from the Bun.serve() handler below. Same ring buffer as the MCP-
|
||
// wrapped path so downstream loops don't need to know the source.
|
||
export function recordExternalOp(op: ObservedOp): void {
|
||
recentOps.push({ ...op, source: op.source ?? "scenario" });
|
||
if (recentOps.length > 2000) recentOps.shift();
|
||
}
|
||
|
||
// ─── Wrapped gateway caller — every call gets observed ───
|
||
|
||
export async function observed(
|
||
endpoint: string,
|
||
body: any,
|
||
description: string,
|
||
): Promise<{ data: any; op: ObservedOp }> {
|
||
const t0 = Date.now();
|
||
let data: any;
|
||
let error: string | undefined;
|
||
let success = true;
|
||
|
||
try {
|
||
const resp = await fetch(`${GATEWAY}${endpoint}`, {
|
||
method: "POST",
|
||
headers: { "Content-Type": "application/json" },
|
||
body: JSON.stringify(body),
|
||
});
|
||
data = await resp.json();
|
||
if (data.error) {
|
||
success = false;
|
||
error = data.error;
|
||
}
|
||
} catch (e: any) {
|
||
success = false;
|
||
error = e.message;
|
||
data = { error: e.message };
|
||
}
|
||
|
||
const op: ObservedOp = {
|
||
timestamp: new Date().toISOString(),
|
||
endpoint,
|
||
input_summary: description,
|
||
success,
|
||
duration_ms: Date.now() - t0,
|
||
output_summary: success
|
||
? summarize(data)
|
||
: `ERROR: ${error}`,
|
||
error,
|
||
};
|
||
|
||
recentOps.push(op);
|
||
if (recentOps.length > 1000) recentOps.shift();
|
||
|
||
// Persist to lakehouse
|
||
await persistOp(op);
|
||
|
||
return { data, op };
|
||
}
|
||
|
||
function summarize(data: any): string {
|
||
if (data.sql_matches !== undefined) return `hybrid: ${data.sql_matches} sql → ${data.vector_reranked} results`;
|
||
if (data.rows) return `${data.row_count || data.rows.length} rows`;
|
||
if (data.answer) return `answer: ${data.answer.slice(0, 80)}...`;
|
||
if (data.sources) return `${data.sources.length} sources`;
|
||
return JSON.stringify(data).slice(0, 100);
|
||
}
|
||
|
||
// Phase 24 honesty fix — the old persistOp used /ingest/file which
|
||
// REPLACES the dataset (flagged in feedback_ingest_replace_semantics.md).
|
||
// Every op silently wiped all prior ops. Now we append a JSONL line to
|
||
// data/_observer/ops.jsonl so the historical trace is durable. The
|
||
// observer analyzer + playbook builder read from this file when it
|
||
// outgrows the 2000-entry in-memory ring.
|
||
async function persistOp(op: ObservedOp) {
|
||
try {
|
||
const { mkdir, appendFile } = await import("node:fs/promises");
|
||
await mkdir("data/_observer", { recursive: true });
|
||
await appendFile("data/_observer/ops.jsonl", JSON.stringify(op) + "\n");
|
||
} catch {
|
||
// Persistence is best-effort; in-memory ring still works.
|
||
}
|
||
}
|
||
|
||
|
||
// ─── LLM Team escalation (code_review mode) ───
|
||
//
|
||
// When recent failures on a single sig_hash cross a threshold the
|
||
// local qwen2.5 analysis is probably insufficient. J's 2026-04-24
|
||
// direction: "the observer would trigger to give more context" —
|
||
// route failure clusters to LLM Team's specialized code_review mode
|
||
// (via /api/run) so richer structured signal lands in the KB for
|
||
// scrum + auditor + playbook memory to consume next pass.
|
||
//
|
||
// Non-destructive: runs in parallel to the existing qwen2.5 analysis,
|
||
// never replaces it. Writes to data/_kb/observer_escalations.jsonl
|
||
// as a dedicated audit surface.
|
||
|
||
const LLM_TEAM = process.env.LH_LLM_TEAM_URL ?? "http://localhost:5000";
|
||
const LLM_TEAM_ESCALATIONS = "/home/profit/lakehouse/data/_kb/observer_escalations.jsonl";
|
||
const ESCALATION_THRESHOLD = 3; // N+ failures on same sig_hash triggers
|
||
|
||
// ─── KB enrichment helper (2026-04-26) ────────────────────────────
|
||
// Mirrors what scrum_master_pipeline already does on every per-file
|
||
// review: queries pathway_memory bug fingerprints + the lakehouse_arch
|
||
// matrix corpus, then asks qwen3.5:latest to synthesize a tight
|
||
// briefing. We reuse the same primitives so observer escalations carry
|
||
// the same compounding context the scrum loop builds — no new index
|
||
// surfaces, no new corpora.
|
||
//
|
||
// `task_class` is derived from the cluster (most ops use the same one);
|
||
// pathway/bug_fingerprints is permissive about a null file_path, so
|
||
// non-code clusters (scenario fills, v1.chat events) just see broader
|
||
// matches via task_class alone.
|
||
//
|
||
// Returns "" when there's no useful signal — caller treats empty as
|
||
// "no preamble" and skips the prepend.
|
||
async function buildKbPreamble(sigHash: string, cluster: ObservedOp[]): Promise<string> {
|
||
const sample = cluster[0];
|
||
const taskClass = sample?.event_kind
|
||
?? (sample?.source === "scenario" ? "scenario_fill" : "observer_escalation");
|
||
|
||
// Step 1: pathway bug fingerprints. Best-effort; null filePath just
|
||
// widens the query at the matrix-index level.
|
||
let fingerprints: { flag: { kind: string }; pattern_key: string; example: string; occurrences: number }[] = [];
|
||
try {
|
||
const r = await fetch(`${LAKEHOUSE}/vectors/pathway/bug_fingerprints`, {
|
||
method: "POST",
|
||
headers: { "content-type": "application/json" },
|
||
body: JSON.stringify({ task_class: taskClass, file_path: null, signal_class: null, limit: 5 }),
|
||
signal: AbortSignal.timeout(5000),
|
||
});
|
||
if (r.ok) fingerprints = (await r.json() as any).fingerprints ?? [];
|
||
} catch {}
|
||
|
||
// Step 2: matrix retrieval against the architectural corpus we
|
||
// already maintain. Cluster summary is the search query.
|
||
const clusterSummary = cluster.slice(-5).map(o =>
|
||
`${o.endpoint ?? "?"} ${o.input_summary ?? ""} ${o.error ?? ""}`
|
||
).join(" | ");
|
||
let matrixChunks: { doc_id?: string; chunk_text?: string; score?: number }[] = [];
|
||
try {
|
||
const r = await fetch(`${LAKEHOUSE}/vectors/search`, {
|
||
method: "POST",
|
||
headers: { "content-type": "application/json" },
|
||
body: JSON.stringify({ index_name: "lakehouse_arch_v1", query: `${taskClass} ${clusterSummary}`, top_k: 5 }),
|
||
signal: AbortSignal.timeout(5000),
|
||
});
|
||
if (r.ok) matrixChunks = (await r.json() as any).results ?? [];
|
||
} catch {}
|
||
|
||
if (fingerprints.length === 0 && matrixChunks.length === 0) return "";
|
||
|
||
// Step 3: synthesis via local model (qwen3.5:latest, provider=ollama).
|
||
// Compresses the raw bundle to a 1-2 sentence briefing the cloud
|
||
// reviewer can actually use. If local model is down/slow, fall back
|
||
// to the raw dump rather than blocking the escalation path.
|
||
const rawBundle = [
|
||
fingerprints.length > 0
|
||
? "PRIOR BUG PATTERNS (pathway memory):\n" + fingerprints.map((fp, i) =>
|
||
`${i + 1}. [${fp.flag.kind}] ${fp.pattern_key} (×${fp.occurrences}) e.g. ${fp.example.slice(0, 120)}`
|
||
).join("\n")
|
||
: "",
|
||
matrixChunks.length > 0
|
||
? "RELATED ARCHITECTURE CONTEXT:\n" + matrixChunks.map((c, i) =>
|
||
`${i + 1}. [${c.doc_id ?? "?"}] ${(c.chunk_text ?? "").slice(0, 200)}`
|
||
).join("\n")
|
||
: "",
|
||
].filter(Boolean).join("\n\n");
|
||
|
||
const synthPrompt = `A failure cluster (sig_hash=${sigHash.slice(0, 8)}, ${cluster.length} occurrences, task_class=${taskClass}) is about to be escalated for diagnosis. Here are prior signals from our knowledge base:
|
||
|
||
${rawBundle}
|
||
|
||
Output a single paragraph (≤300 chars) briefing the cloud reviewer on which prior signals are most likely relevant to this cluster. If nothing matches, say so plainly. No preamble, no markdown.`;
|
||
|
||
let synthesized = "";
|
||
try {
|
||
const r = await fetch(`${LAKEHOUSE}/v1/chat`, {
|
||
method: "POST",
|
||
headers: { "content-type": "application/json" },
|
||
body: JSON.stringify({
|
||
provider: "ollama",
|
||
model: "qwen3.5:latest",
|
||
messages: [{ role: "user", content: synthPrompt }],
|
||
max_tokens: 200,
|
||
temperature: 0.1,
|
||
think: false,
|
||
}),
|
||
signal: AbortSignal.timeout(15000),
|
||
});
|
||
if (r.ok) {
|
||
const j = await r.json() as any;
|
||
synthesized = (j?.choices?.[0]?.message?.content ?? "").trim();
|
||
}
|
||
} catch {}
|
||
|
||
const body = synthesized.length > 0 ? synthesized : rawBundle;
|
||
return `═══ KB CONTEXT — prior signals on this task class (synthesized by qwen3.5:latest) ═══\n${body}\n═══\n\n`;
|
||
}
|
||
|
||
async function escalateFailureClusterToLLMTeam(sigHash: string, cluster: ObservedOp[]) {
|
||
// Package the failure cluster as a single context blob. Originally
|
||
// I routed this to LLM Team's `code_review` mode at /api/run, but
|
||
// that mode isn't registered in llm_team_ui.py — it returned
|
||
// "Unknown mode" on every call. Revised 2026-04-24: route directly
|
||
// to the gateway's /v1/chat with provider=ollama_cloud + qwen3-coder:480b
|
||
// (the coding specialist that's rung 2 of the scrum ladder, proven
|
||
// to produce substantive structured reviews). Fire-and-forget so
|
||
// downstream failures don't block observer's normal loop.
|
||
const context = cluster.slice(-8).map((o, i) =>
|
||
`[${i + 1}] endpoint=${o.endpoint} input=${o.input_summary} error=${o.error ?? "?"}`
|
||
).join("\n");
|
||
const kbPreamble = await buildKbPreamble(sigHash, cluster);
|
||
const prompt = `${kbPreamble}sig_hash=${sigHash} · ${cluster.length} failures on the same signature:\n\n${context}\n\nReview this failure cluster. Identify:\n1. Likely root cause (single sentence).\n2. Files most likely responsible (path hints).\n3. Concrete fix direction (under 3 sentences).\n4. Confidence: NN%\n\nBe specific, not generic.`;
|
||
|
||
try {
|
||
const resp = await fetch(`${LAKEHOUSE}/v1/chat`, {
|
||
method: "POST",
|
||
headers: { "Content-Type": "application/json" },
|
||
body: JSON.stringify({
|
||
provider: "ollama_cloud",
|
||
model: "qwen3-coder:480b",
|
||
messages: [{ role: "user", content: prompt }],
|
||
max_tokens: 800,
|
||
temperature: 0.2,
|
||
}),
|
||
signal: AbortSignal.timeout(60000),
|
||
});
|
||
if (!resp.ok) {
|
||
console.error(`[observer] escalation /v1/chat ${resp.status}: ${(await resp.text()).slice(0, 200)}`);
|
||
return;
|
||
}
|
||
const j: any = await resp.json();
|
||
const analysis = j?.choices?.[0]?.message?.content ?? "";
|
||
|
||
// Audit row stays schema-compatible with the prior implementation —
|
||
// downstream consumers see structured fields regardless of the
|
||
// review-source change. Facts/entities stay empty (this call is
|
||
// direct-model, not extract-mode); the raw analysis carries the
|
||
// signal.
|
||
const row = {
|
||
ts: new Date().toISOString(),
|
||
source: "observer_escalation",
|
||
mode: "direct_chat_qwen3_coder_480b",
|
||
sig_hash: sigHash,
|
||
cluster_size: cluster.length,
|
||
cluster_staffer: cluster[0]?.staffer_id,
|
||
cluster_endpoint: cluster[0]?.endpoint,
|
||
prompt_tokens: j?.usage?.prompt_tokens ?? 0,
|
||
completion_tokens: j?.usage?.completion_tokens ?? 0,
|
||
kb_preamble_chars: kbPreamble.length,
|
||
analysis: analysis.slice(0, 4000),
|
||
};
|
||
const { appendFile } = await import("node:fs/promises");
|
||
await appendFile(LLM_TEAM_ESCALATIONS, JSON.stringify(row) + "\n");
|
||
console.error(
|
||
`[observer] escalated sig_hash=${sigHash.slice(0, 8)} · cluster=${cluster.length} · ${analysis.length} chars`
|
||
);
|
||
} catch (e) {
|
||
console.error(`[observer] escalation failed: ${(e as Error).message}`);
|
||
}
|
||
}
|
||
|
||
// Track which sig_hashes we've already escalated this session so we
|
||
// don't hammer LLM Team on every analyzeErrors tick when a cluster
|
||
// persists across cycles.
|
||
const escalatedSigHashes = new Set<string>();
|
||
|
||
// ─── Hand-review for scrum/agent candidate responses (2026-04-25) ───
|
||
//
|
||
// Observer is OUTSIDE the scrum loop's epistemic scope, so its verdict
|
||
// can be treated as truth about whether a candidate review is grounded.
|
||
// Two-tier evaluator:
|
||
// 1. Try cloud LLM (qwen3-coder:480b) — semantic judgment with
|
||
// response + source excerpt + grounding stats as context.
|
||
// 2. On cloud failure (throttle/timeout) → deterministic heuristic
|
||
// over grounding_pct + total_quotes. Marked source: "heuristic"
|
||
// so consumers can tell which rung produced the verdict.
|
||
// Every verdict is persisted to data/_kb/observer_reviews.jsonl.
|
||
|
||
const OBSERVER_REVIEWS = "/home/profit/lakehouse/data/_kb/observer_reviews.jsonl";
|
||
|
||
interface HandReviewInput {
|
||
file_path: string;
|
||
model: string;
|
||
response: string;
|
||
source_content: string;
|
||
grounding_stats: { total: number; grounded: number; groundedPct: number | null };
|
||
attempt: number;
|
||
}
|
||
|
||
interface HandReviewVerdict {
|
||
verdict: "accept" | "reject" | "cycle";
|
||
confidence: number;
|
||
notes: string;
|
||
source: "cloud" | "heuristic";
|
||
}
|
||
|
||
async function handReview(input: HandReviewInput): Promise<HandReviewVerdict> {
|
||
const t0 = Date.now();
|
||
let verdict: HandReviewVerdict;
|
||
|
||
try {
|
||
verdict = await cloudHandReview(input);
|
||
} catch (e) {
|
||
console.error(`[observer/review] cloud failed (${(e as Error).message}); using heuristic`);
|
||
verdict = heuristicHandReview(input);
|
||
}
|
||
|
||
// Persist regardless of source so we can later compare cloud vs
|
||
// heuristic verdicts on the same input and tune the heuristic.
|
||
const row = {
|
||
ts: new Date().toISOString(),
|
||
file_path: input.file_path,
|
||
model: input.model,
|
||
attempt: input.attempt,
|
||
response_chars: input.response.length,
|
||
grounding_stats: input.grounding_stats,
|
||
verdict: verdict.verdict,
|
||
confidence: verdict.confidence,
|
||
notes: verdict.notes,
|
||
source: verdict.source,
|
||
duration_ms: Date.now() - t0,
|
||
};
|
||
try {
|
||
const { appendFile } = await import("node:fs/promises");
|
||
await appendFile(OBSERVER_REVIEWS, JSON.stringify(row) + "\n");
|
||
} catch { /* best-effort persistence */ }
|
||
|
||
return verdict;
|
||
}
|
||
|
||
async function cloudHandReview(input: HandReviewInput): Promise<HandReviewVerdict> {
|
||
const grounded = input.grounding_stats.grounded;
|
||
const total = input.grounding_stats.total;
|
||
const pct = input.grounding_stats.groundedPct;
|
||
// Truncate to keep the prompt under typical context windows.
|
||
// 2000 + 4000 = ~6000 chars ≈ 1500 tokens, plus response context.
|
||
const responseExcerpt = input.response.slice(0, 2000);
|
||
const sourceExcerpt = input.source_content.slice(0, 4000);
|
||
|
||
const prompt = `You are a code-review quality observer. Decide whether the following automated review is grounded in the actual source — not invented, not hallucinated.
|
||
|
||
FILE: ${input.file_path}
|
||
MODEL: ${input.model}
|
||
ATTEMPT: ${input.attempt}
|
||
ANCHOR GROUNDING: ${grounded}/${total} backtick-quoted snippets matched the source verbatim${pct !== null ? ` (${pct}%)` : ""}
|
||
|
||
REVIEW (first 2000 chars):
|
||
\`\`\`
|
||
${responseExcerpt}
|
||
\`\`\`
|
||
|
||
SOURCE EXCERPT (first 4000 chars):
|
||
\`\`\`
|
||
${sourceExcerpt}
|
||
\`\`\`
|
||
|
||
Respond ONLY with a JSON object:
|
||
{
|
||
"verdict": "accept" | "reject" | "cycle",
|
||
"confidence": 0-100,
|
||
"notes": "<1-2 sentences on what makes this grounded or hallucinated>"
|
||
}
|
||
|
||
- accept: review references real symbols/lines in source; findings could be acted on.
|
||
- reject: review invents APIs, fabricates calls, contradicts source. Do NOT record.
|
||
- cycle: review is mediocre — partially grounded but wrong shape, try a stronger model.`;
|
||
|
||
// Hand-review uses paid OpenRouter so it sidesteps the Ollama Cloud
|
||
// throttle that drove every prior iter into the heuristic fallback.
|
||
// Grok 4.1 fast: $0.20 in / $0.50 out per M tokens, 2M ctx. A typical
|
||
// hand-review (~6K input + 300 output) costs ~$0.0014. Selected via
|
||
// J directive 2026-04-25 ("best model under $0.72/M").
|
||
const resp = await fetch(`${LAKEHOUSE}/v1/chat`, {
|
||
method: "POST",
|
||
headers: { "Content-Type": "application/json" },
|
||
body: JSON.stringify({
|
||
provider: "openrouter",
|
||
model: "x-ai/grok-4.1-fast",
|
||
messages: [{ role: "user", content: prompt }],
|
||
max_tokens: 300,
|
||
temperature: 0.0,
|
||
}),
|
||
signal: AbortSignal.timeout(45000),
|
||
});
|
||
if (!resp.ok) {
|
||
throw new Error(`/v1/chat ${resp.status}: ${(await resp.text()).slice(0, 200)}`);
|
||
}
|
||
const j: any = await resp.json();
|
||
const content = (j?.choices?.[0]?.message?.content ?? "").trim();
|
||
// Pull JSON object from the response — model may wrap it in prose.
|
||
const m = content.match(/\{[\s\S]*\}/);
|
||
if (!m) throw new Error(`no JSON object in response: ${content.slice(0, 100)}`);
|
||
const parsed = JSON.parse(m[0]);
|
||
const v = String(parsed.verdict ?? "accept").toLowerCase();
|
||
return {
|
||
verdict: (v === "reject" || v === "cycle") ? v as "reject" | "cycle" : "accept",
|
||
confidence: Number(parsed.confidence ?? 50),
|
||
notes: String(parsed.notes ?? "").slice(0, 500),
|
||
source: "cloud",
|
||
};
|
||
}
|
||
|
||
function heuristicHandReview(input: HandReviewInput): HandReviewVerdict {
|
||
// Deterministic fallback when cloud is throttled. Conservative:
|
||
// only flip to reject when the evidence is overwhelming, otherwise
|
||
// accept (fall-open principle — observer is policy, not blocker).
|
||
const total = input.grounding_stats.total;
|
||
const pct = input.grounding_stats.groundedPct;
|
||
const respLen = input.response.length;
|
||
|
||
// Too short to be a real review
|
||
if (respLen < 1500) {
|
||
return { verdict: "reject", confidence: 80, notes: `response too short (${respLen} chars)`, source: "heuristic" };
|
||
}
|
||
// Below 5 quotes — not enough signal to judge grounding; accept
|
||
if (total < 5 || pct === null) {
|
||
return { verdict: "accept", confidence: 50, notes: `insufficient quote signal (${total} quotes); accepting`, source: "heuristic" };
|
||
}
|
||
// Very heavy hallucination
|
||
if (pct < 20) {
|
||
return { verdict: "reject", confidence: 85, notes: `low grounding (${pct}% of ${total} quotes)`, source: "heuristic" };
|
||
}
|
||
// Mediocre — cycle to a stronger model
|
||
if (pct < 50) {
|
||
return { verdict: "cycle", confidence: 65, notes: `mediocre grounding (${pct}% of ${total} quotes); try stronger`, source: "heuristic" };
|
||
}
|
||
// Good enough
|
||
return { verdict: "accept", confidence: 75, notes: `grounding ${pct}% of ${total} quotes`, source: "heuristic" };
|
||
}
|
||
|
||
async function maybeEscalate(failures: ObservedOp[]) {
|
||
// Group failures by sig_hash
|
||
const bySig = new Map<string, ObservedOp[]>();
|
||
for (const f of failures) {
|
||
const k = f.sig_hash ?? "__no_sig__";
|
||
(bySig.get(k) ?? bySig.set(k, []).get(k)!).push(f);
|
||
}
|
||
for (const [sigHash, cluster] of bySig) {
|
||
if (sigHash === "__no_sig__") continue;
|
||
if (cluster.length < ESCALATION_THRESHOLD) continue;
|
||
if (escalatedSigHashes.has(sigHash)) continue;
|
||
escalatedSigHashes.add(sigHash);
|
||
// Fire-and-forget — don't block the existing analyzer loop.
|
||
escalateFailureClusterToLLMTeam(sigHash, cluster).catch(() => {});
|
||
}
|
||
}
|
||
|
||
// ─── Error analyzer loop ───
|
||
|
||
async function analyzeErrors() {
|
||
// Read recent failures
|
||
const failures = recentOps.filter(op => !op.success);
|
||
if (failures.length === 0) return;
|
||
|
||
// NEW 2026-04-24: escalate recurring sig_hash clusters to LLM Team
|
||
// code_review mode. Runs in parallel to the local qwen2.5 analysis
|
||
// below — non-blocking, richer downstream signal for scrum/auditor.
|
||
maybeEscalate(failures).catch(() => {});
|
||
|
||
const errorSummary = failures.slice(-10).map(f =>
|
||
`[${f.endpoint}] ${f.input_summary}: ${f.error}`
|
||
).join("\n");
|
||
|
||
// Ask local model to diagnose
|
||
try {
|
||
const resp = await fetch(`${LAKEHOUSE}/ai/generate`, {
|
||
method: "POST",
|
||
headers: { "Content-Type": "application/json" },
|
||
body: JSON.stringify({
|
||
prompt: `You are a system reliability observer. Analyze these recent failures and suggest fixes:
|
||
|
||
${errorSummary}
|
||
|
||
For each error:
|
||
1. What likely caused it?
|
||
2. How should the agent adjust its approach?
|
||
3. Should this be added to the playbook as a "don't do this"?
|
||
|
||
Be specific and actionable. Under 200 words.`,
|
||
model: "qwen2.5",
|
||
max_tokens: 400,
|
||
temperature: 0.2,
|
||
}),
|
||
});
|
||
const analysis = await resp.json();
|
||
if (analysis.text) {
|
||
console.error(`[observer] Error analysis:\n${analysis.text}`);
|
||
// Log the analysis as a playbook entry
|
||
await fetch(`${GATEWAY}/log`, {
|
||
method: "POST",
|
||
headers: { "Content-Type": "application/json" },
|
||
body: JSON.stringify({
|
||
operation: `error_analysis: ${failures.length} failures`,
|
||
approach: "LLM-analyzed error patterns",
|
||
result: analysis.text.slice(0, 500),
|
||
context: errorSummary.slice(0, 500),
|
||
}),
|
||
});
|
||
}
|
||
} catch (e) {
|
||
console.error(`[observer] Analysis failed: ${e}`);
|
||
}
|
||
}
|
||
|
||
// ─── Playbook consolidation ───
|
||
|
||
async function consolidatePlaybooks() {
|
||
const successes = recentOps.filter(op => op.success);
|
||
if (successes.length < 5) return;
|
||
|
||
// Group by endpoint
|
||
const groups: Record<string, ObservedOp[]> = {};
|
||
for (const op of successes) {
|
||
const key = op.endpoint;
|
||
if (!groups[key]) groups[key] = [];
|
||
groups[key].push(op);
|
||
}
|
||
|
||
for (const [endpoint, ops] of Object.entries(groups)) {
|
||
if (ops.length < 3) continue;
|
||
|
||
const avgDuration = ops.reduce((s, o) => s + o.duration_ms, 0) / ops.length;
|
||
const pattern = ops.slice(-3).map(o => o.input_summary).join("; ");
|
||
|
||
await fetch(`${GATEWAY}/log`, {
|
||
method: "POST",
|
||
headers: { "Content-Type": "application/json" },
|
||
body: JSON.stringify({
|
||
operation: `consolidated: ${ops.length} successful ${endpoint} calls`,
|
||
approach: `common pattern: ${pattern.slice(0, 200)}`,
|
||
result: `avg_duration=${avgDuration.toFixed(0)}ms, ${ops.length} successes`,
|
||
context: `endpoint=${endpoint}`,
|
||
}),
|
||
}).catch(() => {});
|
||
}
|
||
}
|
||
|
||
// ─── HTTP listener for external ops (Phase 24) ───
|
||
|
||
// Scenarios POST per-event outcomes here so the observer's analyzer +
|
||
// playbook builder see them alongside MCP-wrapped ops. Read-only stats
|
||
// also exposed at /stats for external health checks.
|
||
function startHttpListener() {
|
||
Bun.serve({
|
||
port: OBSERVER_PORT,
|
||
hostname: "0.0.0.0",
|
||
fetch(req) {
|
||
const url = new URL(req.url);
|
||
if (req.method === "GET" && url.pathname === "/health") {
|
||
return new Response(JSON.stringify({ status: "ok", ops_in_ring: recentOps.length }));
|
||
}
|
||
if (req.method === "GET" && url.pathname === "/stats") {
|
||
const bySource = new Map<string, number>();
|
||
for (const o of recentOps) {
|
||
const k = o.source ?? "mcp";
|
||
bySource.set(k, (bySource.get(k) ?? 0) + 1);
|
||
}
|
||
return new Response(JSON.stringify({
|
||
total: recentOps.length,
|
||
successes: recentOps.filter(o => o.success).length,
|
||
failures: recentOps.filter(o => !o.success).length,
|
||
by_source: Object.fromEntries(bySource),
|
||
recent_scenario_ops: recentOps
|
||
.filter(o => o.source === "scenario")
|
||
.slice(-10)
|
||
.map(o => ({ ts: o.timestamp, ok: o.success, staffer: o.staffer_id, kind: o.event_kind, role: o.role })),
|
||
}));
|
||
}
|
||
// ─── Hand-review endpoint (2026-04-25) ───
|
||
// scrum/agent posts a candidate response + source content + grounding
|
||
// stats. Observer evaluates via cloud LLM (qwen3-coder:480b) with
|
||
// semantic context and returns {verdict, confidence, notes}. On
|
||
// cloud throttle, falls back to a deterministic heuristic over the
|
||
// grounding stats so the loop keeps moving with honest signal.
|
||
//
|
||
// This is the policy layer scrum was missing — pre-2026-04-25 the
|
||
// scrum_master applied a hardcoded grounding-rate threshold inline,
|
||
// which baked judgment into the wrong layer. Now scrum reports data
|
||
// (response + source + stats) and observer decides accept/reject/cycle.
|
||
if (req.method === "POST" && url.pathname === "/review") {
|
||
return req.json().then((body: any) => handReview(body))
|
||
.then((verdict) => new Response(JSON.stringify(verdict), {
|
||
headers: { "content-type": "application/json" },
|
||
}))
|
||
.catch((e: Error) =>
|
||
new Response(JSON.stringify({ verdict: "accept", notes: `observer error: ${e.message}`, source: "heuristic" }), {
|
||
status: 200, // fall-open shape — scrum keeps moving on observer failure
|
||
headers: { "content-type": "application/json" },
|
||
}));
|
||
}
|
||
if (req.method === "POST" && url.pathname === "/event") {
|
||
return req.json().then((body: any) => {
|
||
const op: ObservedOp = {
|
||
timestamp: body.timestamp ?? new Date().toISOString(),
|
||
endpoint: body.endpoint ?? "scenario:fill",
|
||
input_summary: body.input_summary ?? `${body.event_kind ?? "?"} ${body.role ?? "?"}×${body.count ?? "?"} in ${body.city ?? "?"}, ${body.state ?? "?"}`,
|
||
success: !!body.success,
|
||
duration_ms: Number(body.duration_ms ?? 0),
|
||
output_summary: body.output_summary ?? (body.success ? "filled" : (body.error ?? "failed")),
|
||
error: body.error,
|
||
// Respect the client's provenance if set (langfuse bridge
|
||
// sends source:"langfuse", etc.). Default to "scenario"
|
||
// to keep legacy Phase 24 callers working.
|
||
source: body.source ?? "scenario",
|
||
staffer_id: body.staffer_id,
|
||
sig_hash: body.sig_hash,
|
||
event_kind: body.event_kind,
|
||
role: body.role,
|
||
city: body.city,
|
||
state: body.state,
|
||
count: body.count,
|
||
rescue_attempted: !!body.rescue_attempted,
|
||
rescue_succeeded: !!body.rescue_succeeded,
|
||
};
|
||
recordExternalOp(op);
|
||
persistOp(op).catch(() => {});
|
||
return new Response(JSON.stringify({ accepted: true, ring_size: recentOps.length }));
|
||
}).catch((e: Error) =>
|
||
new Response(JSON.stringify({ error: e.message }), { status: 400 }));
|
||
}
|
||
// ─── Relevance filter (2026-04-25) ───
|
||
// Drops "adjacency pollution" from matrix-retrieved chunks before
|
||
// they reach the reviewer LLM. Caller (scrum/agent) posts the
|
||
// focus file + candidate chunks; observer scores via heuristic
|
||
// (path/symbol/token signals) and returns kept + dropped lists.
|
||
// Pure function — no I/O, safe to call hot.
|
||
if (req.method === "POST" && url.pathname === "/relevance") {
|
||
return req.json().then((body: any) => {
|
||
const focus = body.focus_file ?? body.focus ?? {};
|
||
const chunks = body.chunks ?? [];
|
||
const threshold = typeof body.threshold === "number" ? body.threshold : 0.3;
|
||
const result = filterChunks(focus, chunks, threshold);
|
||
return new Response(JSON.stringify(result), {
|
||
headers: { "content-type": "application/json" },
|
||
});
|
||
}).catch((e: Error) =>
|
||
new Response(JSON.stringify({ error: e.message }), { status: 400 }));
|
||
}
|
||
return new Response("not found", { status: 404 });
|
||
},
|
||
});
|
||
console.error(`[observer] HTTP listener bound to 0.0.0.0:${OBSERVER_PORT}`);
|
||
}
|
||
|
||
// ─── Overseer corrections tailer (2026-04-23) ───
|
||
|
||
// The gateway's /v1/respond loop writes T3 overseer corrections to
|
||
// data/_kb/overseer_corrections.jsonl. Tail it once per cycle and
|
||
// inject each new row into the same recentOps ring that analyzeErrors
|
||
// + consolidatePlaybooks read — so a correction that just fired shows
|
||
// up alongside the outcomes it was meant to repair, and the analyzer
|
||
// can flag patterns like "three corrections on staffing.fill with the
|
||
// same advice — underlying problem isn't a drift, it's a data gap".
|
||
const CORRECTIONS_PATH = process.env.OVERSEER_CORRECTIONS_PATH
|
||
?? "/home/profit/lakehouse/data/_kb/overseer_corrections.jsonl";
|
||
let correctionsCursor = 0; // byte offset
|
||
|
||
async function tailOverseerCorrections(): Promise<number> {
|
||
const f = Bun.file(CORRECTIONS_PATH);
|
||
if (!(await f.exists())) return 0;
|
||
const size = f.size;
|
||
if (size <= correctionsCursor) return 0;
|
||
|
||
// Read only the suffix since the last cursor; keeps tail work
|
||
// bounded even as the file grows.
|
||
const text = await f.slice(correctionsCursor, size).text();
|
||
correctionsCursor = size;
|
||
|
||
let forwarded = 0;
|
||
for (const line of text.split("\n")) {
|
||
if (!line.trim()) continue;
|
||
let row: any;
|
||
try { row = JSON.parse(line); } catch { continue; }
|
||
const op: ObservedOp = {
|
||
timestamp: row.created_at ?? new Date().toISOString(),
|
||
endpoint: `overseer:${row.model ?? "gpt-oss:120b"}`,
|
||
input_summary: `${row.task_class ?? "?"}: ${row.reason ?? "escalation"}`,
|
||
// Correction itself is neither success nor failure — it's a
|
||
// mitigation attempt. We mark success=true so analyzeErrors
|
||
// doesn't count it as a failure, but the preview lets the
|
||
// analyzer see what was tried.
|
||
success: true,
|
||
duration_ms: Number(row.usage?.latency_ms ?? 0),
|
||
output_summary: String(row.correction ?? "").slice(0, 200),
|
||
source: "overseer_correction",
|
||
sig_hash: row.sig_hash,
|
||
task_class: row.task_class,
|
||
correction: String(row.correction ?? ""),
|
||
applied_at_turn: Number(row.applied_at_turn ?? 0),
|
||
};
|
||
recordExternalOp(op);
|
||
forwarded++;
|
||
}
|
||
return forwarded;
|
||
}
|
||
|
||
// ─── Main loop ───
|
||
|
||
async function main() {
|
||
console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}, port=${OBSERVER_PORT}`);
|
||
|
||
// Run a health check first
|
||
const health = await fetch(`${GATEWAY}/health`).then(r => r.ok ? r.text() : null).catch(() => null);
|
||
if (!health) {
|
||
console.error("[observer] gateway unreachable — exiting");
|
||
process.exit(1);
|
||
}
|
||
console.error(`[observer] gateway healthy: ${JSON.stringify(health)}`);
|
||
|
||
// Phase 24 — bind HTTP listener so scenarios can POST outcomes.
|
||
startHttpListener();
|
||
|
||
// Main loop
|
||
let cycle = 0;
|
||
while (true) {
|
||
await Bun.sleep(CYCLE_SECS * 1000);
|
||
cycle++;
|
||
|
||
// Every cycle: tail the overseer corrections KB stream, then
|
||
// analyze errors. Order matters — if an overseer correction just
|
||
// landed for a sig_hash that previously failed, the analyzer
|
||
// should see both.
|
||
const newCorrections = await tailOverseerCorrections();
|
||
if (newCorrections > 0) {
|
||
console.error(`[observer] pulled ${newCorrections} new overseer correction(s) into ring`);
|
||
}
|
||
await analyzeErrors();
|
||
|
||
// Every 5 cycles: consolidate playbooks
|
||
if (cycle % 5 === 0) {
|
||
await consolidatePlaybooks();
|
||
}
|
||
|
||
const scenarioOps = recentOps.filter(o => o.source === "scenario").length;
|
||
const langfuseOps = recentOps.filter(o => o.source === "langfuse").length;
|
||
const correctionOps = recentOps.filter(o => o.source === "overseer_correction").length;
|
||
const stats = {
|
||
cycle,
|
||
total_ops: recentOps.length,
|
||
successes: recentOps.filter(o => o.success).length,
|
||
failures: recentOps.filter(o => !o.success).length,
|
||
scenario_ops: scenarioOps,
|
||
langfuse_ops: langfuseOps,
|
||
overseer_corrections: correctionOps,
|
||
};
|
||
console.error(`[observer] cycle ${cycle}: ${JSON.stringify(stats)}`);
|
||
}
|
||
}
|
||
|
||
// Export the observed wrapper for other agents to use
|
||
export { main as startObserver };
|
||
|
||
// Run if executed directly
|
||
if (import.meta.main) {
|
||
main().catch(console.error);
|
||
}
|