Migrates the four TypeScript /generate callers to the gateway's
/v1/chat surface so every LLM call lands on /v1/usage and Langfuse:
- tests/multi-agent/agent.ts::generate() provider="ollama"
- tests/agent_test/agent_harness.ts::callAgent provider="ollama"
- bot/propose.ts::generateProposal provider="ollama_cloud"
- mcp-server/observer.ts (error analysis) provider="ollama"
Each migration follows the same pattern as the prior generateCloud()
migration (already on /v1/chat since 2026-04-24): replace
`fetch(SIDECAR/generate)` with `fetch(GATEWAY/v1/chat)`, swap the
prompt-style body for an OpenAI-compatible messages array, and
extract content from `choices[0].message.content` instead of `text`.
Same upstream models in every case — the gateway is the new home for
the call; transport is otherwise unchanged.
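
For reference, the shape of one migrated call (illustrative sketch;
SIDECAR/GATEWAY stand in for the real base-URL constants, and the
model name is an example):

    // Before: legacy sidecar surface, prompt-style body
    const r = await fetch(`${SIDECAR}/generate`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ model: "qwen2.5", prompt }),
    });
    const text = ((await r.json()) as any).text;

    // After: gateway /v1/chat, OpenAI-compat messages
    const r2 = await fetch(`${GATEWAY}/v1/chat`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        provider: "ollama",
        model: "qwen2.5",
        messages: [{ role: "user", content: prompt }],
      }),
    });
    const text2 = ((await r2.json()) as any)?.choices?.[0]?.message?.content ?? "";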
Adds scripts/check_phase44_callers.sh — a fail-loud regression guard
that exits non-zero if any non-adapter file fetches /generate or
api/generate. Adapter files (crates/gateway, crates/aibridge,
sidecar/) are exempt. The pre-tightening regex flagged prose mentions
in comments; the shipped regex requires a `fetch(...)` or
`client.post(...)` call shape, so comments don't trip it.
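
The call-shape rule, rendered as an illustrative TypeScript regex (the
shipped guard is a shell script; this only sketches the matching idea):

    // Flags call-shaped hits:    fetch(`${SIDECAR}/generate`, ...)
    //                            client.post("/api/generate", ...)
    // Ignores prose mentions:    // we used to call /api/generate here
    const CALL_SHAPED = /(fetch|client\.post)\s*\([^)]*\/(api\/)?generate/;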
Verification:
- bun build mcp-server/observer.ts → compiles
- bun build tests/multi-agent/agent.ts → compiles
- bun build tests/agent_test/agent_harness.ts → compiles
- bun build bot/propose.ts → compiles
- ./scripts/check_phase44_callers.sh → ✅ clean
- systemctl restart lakehouse-observer → active
Phase 44 part 2 (deferred):
- crates/aibridge/src/client.rs:118 still posts to the sidecar's
  /generate directly. AiClient is the foundational Rust LLM caller
  used by 8+ vectord modules; migrating it is a workspace-wide
  refactor that needs its own commit. Plan: keep AiClient as the
  local-transport layer for the gateway's `provider=ollama` arm, but
  introduce a thin `/v1/chat` wrapper for external callers (vectord
  autotune, agent, rag, refresh, supervisor, playbook_memory).
- tests/real-world/hard_task_escalation.ts: a comment mentions
  /api/generate but doesn't actually call it. The comment is left
  intentionally as historical context; the regex no longer flags it.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
/**
 * Lakehouse Observer — autonomous iteration loop.
 *
 * Runs continuously alongside the agent gateway. Watches every operation,
 * logs outcomes, detects failures, and feeds learnings back so agents
 * improve over time without retraining.
 *
 * Three loops:
 *   1. OPERATION OBSERVER — wraps gateway calls, timestamps + logs every
 *      success/failure to the lakehouse
 *   2. ERROR ANALYZER — periodically reads the error log, asks a local
 *      model to diagnose patterns, writes recommendations
 *   3. PLAYBOOK BUILDER — after N successful ops of the same type,
 *      consolidates them into a reusable playbook entry
 *
 * This is the "third-party witness" J asked for — it watches what
 * agents do and helps them not repeat mistakes.
 */

import { filterChunks } from "./relevance";

const GATEWAY = process.env.GATEWAY_URL || "http://localhost:3700";
const LAKEHOUSE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
const CYCLE_SECS = parseInt(process.env.OBSERVER_CYCLE || "30");
// Phase 24 — observer now listens on an HTTP port for external ops
// (scenarios bypass the MCP:3700 layer by design). Default 3800.
const OBSERVER_PORT = parseInt(process.env.OBSERVER_PORT || "3800");

// ─── Observed operation log ───

interface ObservedOp {
  timestamp: string;
  endpoint: string;
  input_summary: string;
  success: boolean;
  duration_ms: number;
  output_summary: string;
  error?: string;
  // Phase 24 — optional provenance so error analyzer and playbook
  // builder can differentiate MCP-layer ops from scenario-sourced
  // events. Scenarios set source="scenario" + staffer_id + sig_hash.
  source?: "mcp" | "scenario" | "langfuse" | "overseer_correction";
  staffer_id?: string;
  sig_hash?: string;
  event_kind?: string;
  role?: string;
  city?: string;
  state?: string;
  count?: number;
  rescue_attempted?: boolean;
  rescue_succeeded?: boolean;
  // Overseer-correction-specific (2026-04-23): lets the analyzer
  // correlate corrections with the drift that prompted them and with
  // subsequent outcomes that either validated or invalidated the advice.
  task_class?: string;
  correction?: string;
  applied_at_turn?: number;
}

const recentOps: ObservedOp[] = [];

// Phase 24 — external ingest path. Scenarios POST outcome summaries
// here so the observer's analyzer + playbook builder see them. Called
// from the Bun.serve() handler below. Same ring buffer as the MCP-
// wrapped path so downstream loops don't need to know the source.
export function recordExternalOp(op: ObservedOp): void {
  recentOps.push({ ...op, source: op.source ?? "scenario" });
  if (recentOps.length > 2000) recentOps.shift();
}

// ─── Wrapped gateway caller — every call gets observed ───

export async function observed(
  endpoint: string,
  body: any,
  description: string,
): Promise<{ data: any; op: ObservedOp }> {
  const t0 = Date.now();
  let data: any;
  let error: string | undefined;
  let success = true;

  try {
    const resp = await fetch(`${GATEWAY}${endpoint}`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(body),
    });
    data = await resp.json();
    if (data.error) {
      success = false;
      error = data.error;
    }
  } catch (e: any) {
    success = false;
    error = e.message;
    data = { error: e.message };
  }

  const op: ObservedOp = {
    timestamp: new Date().toISOString(),
    endpoint,
    input_summary: description,
    success,
    duration_ms: Date.now() - t0,
    output_summary: success
      ? summarize(data)
      : `ERROR: ${error}`,
    error,
  };
  recentOps.push(op);
  // Cap matches the 2000-entry ring documented at persistOp and used by
  // recordExternalOp.
  if (recentOps.length > 2000) recentOps.shift();

  // Persist to lakehouse
  await persistOp(op);

  return { data, op };
}
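
// Example (illustrative, not part of the runtime path): an agent wraps a
// gateway call so the outcome lands in the ring + ops.jsonl trace. The
// "/search/hybrid" endpoint name here is hypothetical:
//
//   const { data, op } = await observed(
//     "/search/hybrid",
//     { query: "open RN shifts in Austin" },
//     "hybrid search: open RN shifts",
//   );
//   if (!op.success) console.error(op.error);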

function summarize(data: any): string {
  if (data.sql_matches !== undefined) return `hybrid: ${data.sql_matches} sql → ${data.vector_reranked} results`;
  if (data.rows) return `${data.row_count || data.rows.length} rows`;
  if (data.answer) return `answer: ${data.answer.slice(0, 80)}...`;
  if (data.sources) return `${data.sources.length} sources`;
  return JSON.stringify(data).slice(0, 100);
}

// Phase 24 honesty fix — the old persistOp used /ingest/file which
// REPLACES the dataset (flagged in feedback_ingest_replace_semantics.md).
// Every op silently wiped all prior ops. Now we append a JSONL line to
// data/_observer/ops.jsonl so the historical trace is durable. The
// observer analyzer + playbook builder read from this file when it
// outgrows the 2000-entry in-memory ring.
async function persistOp(op: ObservedOp) {
  try {
    const { mkdir, appendFile } = await import("node:fs/promises");
    await mkdir("data/_observer", { recursive: true });
    await appendFile("data/_observer/ops.jsonl", JSON.stringify(op) + "\n");
  } catch {
    // Persistence is best-effort; in-memory ring still works.
  }
}
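
// One ops.jsonl line looks like (illustrative values):
//   {"timestamp":"2026-04-27T18:02:11.521Z","endpoint":"/search/hybrid",
//    "input_summary":"hybrid search: open RN shifts","success":false,
//    "duration_ms":842,"output_summary":"ERROR: fetch failed","error":"fetch failed"}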

// ─── LLM Team escalation (code_review mode) ───
//
// When recent failures on a single sig_hash cross a threshold the
// local qwen2.5 analysis is probably insufficient. J's 2026-04-24
// direction: "the observer would trigger to give more context" —
// route failure clusters to LLM Team's specialized code_review mode
// (via /api/run) so richer structured signal lands in the KB for
// scrum + auditor + playbook memory to consume next pass.
//
// Non-destructive: runs in parallel to the existing qwen2.5 analysis,
// never replaces it. Writes to data/_kb/observer_escalations.jsonl
// as a dedicated audit surface.

const LLM_TEAM = process.env.LH_LLM_TEAM_URL ?? "http://localhost:5000";
const LLM_TEAM_ESCALATIONS = "/home/profit/lakehouse/data/_kb/observer_escalations.jsonl";
const ESCALATION_THRESHOLD = 3; // N+ failures on same sig_hash triggers

// ─── KB enrichment helper (2026-04-26) ────────────────────────────
// Mirrors what scrum_master_pipeline already does on every per-file
// review: queries pathway_memory bug fingerprints + the lakehouse_arch
// matrix corpus, then asks qwen3.5:latest to synthesize a tight
// briefing. We reuse the same primitives so observer escalations carry
// the same compounding context the scrum loop builds — no new index
// surfaces, no new corpora.
//
// `task_class` is derived from the cluster (most ops use the same one);
// pathway/bug_fingerprints is permissive about a null file_path, so
// non-code clusters (scenario fills, v1.chat events) just see broader
// matches via task_class alone.
//
// Returns "" when there's no useful signal — caller treats empty as
// "no preamble" and skips the prepend.
async function buildKbPreamble(sigHash: string, cluster: ObservedOp[]): Promise<string> {
  const sample = cluster[0];
  const taskClass = sample?.event_kind
    ?? (sample?.source === "scenario" ? "scenario_fill" : "observer_escalation");

  // Step 1: pathway bug fingerprints. Best-effort; null filePath just
  // widens the query at the matrix-index level.
  let fingerprints: { flag: { kind: string }; pattern_key: string; example: string; occurrences: number }[] = [];
  try {
    const r = await fetch(`${LAKEHOUSE}/vectors/pathway/bug_fingerprints`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({ task_class: taskClass, file_path: null, signal_class: null, limit: 5 }),
      signal: AbortSignal.timeout(5000),
    });
    if (r.ok) fingerprints = (await r.json() as any).fingerprints ?? [];
  } catch {}

  // Step 2: architectural matrix (lakehouse_arch_v1) — ADRs/PRD/plan
  // intent. Cluster summary is the search query.
  const clusterSummary = cluster.slice(-5).map(o =>
    `${o.endpoint ?? "?"} ${o.input_summary ?? ""} ${o.error ?? ""}`
  ).join(" | ");
  let matrixChunks: { doc_id?: string; chunk_text?: string; score?: number }[] = [];
  try {
    const r = await fetch(`${LAKEHOUSE}/vectors/search`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({ index_name: "lakehouse_arch_v1", query: `${taskClass} ${clusterSummary}`, top_k: 4 }),
      signal: AbortSignal.timeout(5000),
    });
    if (r.ok) matrixChunks = (await r.json() as any).results ?? [];
  } catch {}

  // Step 3: gold-standard prior answers (lakehouse_answers_v1) — past
  // scrum reviews + observer escalations. This is where the BIG-model
  // results we save live; future small-model handlers retrieve them
  // here as scaffolding so the cheap rung gets near-paid quality.
  let answerChunks: { doc_id?: string; chunk_text?: string; score?: number }[] = [];
  try {
    const r = await fetch(`${LAKEHOUSE}/vectors/search`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({ index_name: "lakehouse_answers_v1", query: `${taskClass} ${clusterSummary}`, top_k: 3 }),
      signal: AbortSignal.timeout(5000),
    });
    if (r.ok) answerChunks = (await r.json() as any).results ?? [];
  } catch {}

  if (fingerprints.length === 0 && matrixChunks.length === 0 && answerChunks.length === 0) return "";

  // Step 4: synthesis via local model (qwen3.5:latest, provider=ollama).
  // Compresses the raw bundle to a 1-2 sentence briefing the cloud
  // reviewer can actually use. If the local model is down/slow, fall back
  // to the raw dump rather than blocking the escalation path.
  const rawBundle = [
    fingerprints.length > 0
      ? "PRIOR BUG PATTERNS (pathway memory):\n" + fingerprints.map((fp, i) =>
          `${i + 1}. [${fp.flag.kind}] ${fp.pattern_key} (×${fp.occurrences}) e.g. ${fp.example.slice(0, 120)}`
        ).join("\n")
      : "",
    matrixChunks.length > 0
      ? "RELATED ARCHITECTURE CONTEXT:\n" + matrixChunks.map((c, i) =>
          `${i + 1}. [${c.doc_id ?? "?"}] ${(c.chunk_text ?? "").slice(0, 200)}`
        ).join("\n")
      : "",
    answerChunks.length > 0
      ? "PRIOR GOLD-STANDARD ANSWERS (similar past reviews + escalations):\n" + answerChunks.map((c, i) =>
          `${i + 1}. [${c.doc_id ?? "?"}] ${(c.chunk_text ?? "").slice(0, 240)}`
        ).join("\n")
      : "",
  ].filter(Boolean).join("\n\n");

  const synthPrompt = `A failure cluster (sig_hash=${sigHash.slice(0, 8)}, ${cluster.length} occurrences, task_class=${taskClass}) is about to be escalated for diagnosis. Here are prior signals from our knowledge base:

${rawBundle}

Output a single paragraph (≤300 chars) briefing the cloud reviewer on which prior signals are most likely relevant to this cluster. If nothing matches, say so plainly. No preamble, no markdown.`;

  let synthesized = "";
  try {
    const r = await fetch(`${LAKEHOUSE}/v1/chat`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({
        provider: "ollama",
        model: "qwen3.5:latest",
        messages: [{ role: "user", content: synthPrompt }],
        max_tokens: 200,
        temperature: 0.1,
        think: false,
      }),
      signal: AbortSignal.timeout(15000),
    });
    if (r.ok) {
      const j = await r.json() as any;
      synthesized = (j?.choices?.[0]?.message?.content ?? "").trim();
    }
  } catch {}

  const body = synthesized.length > 0 ? synthesized : rawBundle;
  return `═══ KB CONTEXT — prior signals on this task class (synthesized by qwen3.5:latest) ═══\n${body}\n═══\n\n`;
}

async function escalateFailureClusterToLLMTeam(sigHash: string, cluster: ObservedOp[]) {
  // Package the failure cluster as a single context blob. Originally
  // I routed this to LLM Team's `code_review` mode at /api/run, but
  // that mode isn't registered in llm_team_ui.py — it returned
  // "Unknown mode" on every call. Revised 2026-04-24: route directly
  // to the gateway's /v1/chat with provider=ollama_cloud + qwen3-coder:480b
  // (the coding specialist that's rung 2 of the scrum ladder, proven
  // to produce substantive structured reviews). Fire-and-forget so
  // downstream failures don't block observer's normal loop.
  const context = cluster.slice(-8).map((o, i) =>
    `[${i + 1}] endpoint=${o.endpoint} input=${o.input_summary} error=${o.error ?? "?"}`
  ).join("\n");
  const kbPreamble = await buildKbPreamble(sigHash, cluster);
  const prompt = `${kbPreamble}sig_hash=${sigHash} · ${cluster.length} failures on the same signature:\n\n${context}\n\nReview this failure cluster. Identify:\n1. Likely root cause (single sentence).\n2. Files most likely responsible (path hints).\n3. Concrete fix direction (under 3 sentences).\n4. Confidence: NN%\n\nBe specific, not generic.`;

  try {
    // 2026-04-26: switched from ollama_cloud/qwen3-coder:480b (weekly
    // 429 quota was blocking escalations) to paid OpenRouter
    // deepseek-v3.1-terminus — 671B reasoning specialist, $0.21 in /
    // $0.79 out per M tokens (under the $0.85/M ceiling J set), 164K
    // ctx. Per-escalation cost: ~$0.0006 (typical 500-token prompt +
    // 300-token completion).
    const resp = await fetch(`${LAKEHOUSE}/v1/chat`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        provider: "openrouter",
        model: "deepseek/deepseek-v3.1-terminus",
        messages: [{ role: "user", content: prompt }],
        max_tokens: 800,
        temperature: 0.2,
      }),
      signal: AbortSignal.timeout(60000),
    });
    if (!resp.ok) {
      console.error(`[observer] escalation /v1/chat ${resp.status}: ${(await resp.text()).slice(0, 200)}`);
      return;
    }
    const j: any = await resp.json();
    const analysis = j?.choices?.[0]?.message?.content ?? "";

    // Audit row stays schema-compatible with the prior implementation —
    // downstream consumers see structured fields regardless of the
    // review-source change. Facts/entities stay empty (this call is
    // direct-model, not extract-mode); the raw analysis carries the
    // signal.
    const row = {
      ts: new Date().toISOString(),
      source: "observer_escalation",
      mode: "direct_chat_deepseek_v3_1_terminus",
      sig_hash: sigHash,
      cluster_size: cluster.length,
      cluster_staffer: cluster[0]?.staffer_id,
      cluster_endpoint: cluster[0]?.endpoint,
      prompt_tokens: j?.usage?.prompt_tokens ?? 0,
      completion_tokens: j?.usage?.completion_tokens ?? 0,
      kb_preamble_chars: kbPreamble.length,
      analysis: analysis.slice(0, 4000),
    };
    const { appendFile } = await import("node:fs/promises");
    await appendFile(LLM_TEAM_ESCALATIONS, JSON.stringify(row) + "\n");
    console.error(
      `[observer] escalated sig_hash=${sigHash.slice(0, 8)} · cluster=${cluster.length} · ${analysis.length} chars`
    );
  } catch (e) {
    console.error(`[observer] escalation failed: ${(e as Error).message}`);
  }
}

// Track which sig_hashes we've already escalated this session so we
// don't hammer the escalation model on every analyzeErrors tick when a
// cluster persists across cycles.
const escalatedSigHashes = new Set<string>();

// ─── Hand-review for scrum/agent candidate responses (2026-04-25) ───
//
// Observer is OUTSIDE the scrum loop's epistemic scope, so its verdict
// can be treated as truth about whether a candidate review is grounded.
// Two-tier evaluator:
//   1. Try a cloud LLM (currently x-ai/grok-4.1-fast via OpenRouter;
//      see cloudHandReview) — semantic judgment with response + source
//      excerpt + grounding stats as context.
//   2. On cloud failure (throttle/timeout) → deterministic heuristic
//      over grounding_pct + total_quotes. Marked source: "heuristic"
//      so consumers can tell which rung produced the verdict.
// Every verdict is persisted to data/_kb/observer_reviews.jsonl.

const OBSERVER_REVIEWS = "/home/profit/lakehouse/data/_kb/observer_reviews.jsonl";

interface HandReviewInput {
  file_path: string;
  model: string;
  response: string;
  source_content: string;
  grounding_stats: { total: number; grounded: number; groundedPct: number | null };
  attempt: number;
}

interface HandReviewVerdict {
  verdict: "accept" | "reject" | "cycle";
  confidence: number;
  notes: string;
  source: "cloud" | "heuristic";
}

async function handReview(input: HandReviewInput): Promise<HandReviewVerdict> {
  const t0 = Date.now();
  let verdict: HandReviewVerdict;

  try {
    verdict = await cloudHandReview(input);
  } catch (e) {
    console.error(`[observer/review] cloud failed (${(e as Error).message}); using heuristic`);
    verdict = heuristicHandReview(input);
  }

  // Persist regardless of source so we can later compare cloud vs
  // heuristic verdicts on the same input and tune the heuristic.
  const row = {
    ts: new Date().toISOString(),
    file_path: input.file_path,
    model: input.model,
    attempt: input.attempt,
    response_chars: input.response.length,
    grounding_stats: input.grounding_stats,
    verdict: verdict.verdict,
    confidence: verdict.confidence,
    notes: verdict.notes,
    source: verdict.source,
    duration_ms: Date.now() - t0,
  };
  try {
    const { appendFile } = await import("node:fs/promises");
    await appendFile(OBSERVER_REVIEWS, JSON.stringify(row) + "\n");
  } catch { /* best-effort persistence */ }

  return verdict;
}

async function cloudHandReview(input: HandReviewInput): Promise<HandReviewVerdict> {
  const grounded = input.grounding_stats.grounded;
  const total = input.grounding_stats.total;
  const pct = input.grounding_stats.groundedPct;
  // Truncate to keep the prompt under typical context windows.
  // 2000 + 4000 = ~6000 chars ≈ 1500 tokens, plus response context.
  const responseExcerpt = input.response.slice(0, 2000);
  const sourceExcerpt = input.source_content.slice(0, 4000);

  const prompt = `You are a code-review quality observer. Decide whether the following automated review is grounded in the actual source — not invented, not hallucinated.

FILE: ${input.file_path}
MODEL: ${input.model}
ATTEMPT: ${input.attempt}
ANCHOR GROUNDING: ${grounded}/${total} backtick-quoted snippets matched the source verbatim${pct !== null ? ` (${pct}%)` : ""}

REVIEW (first 2000 chars):
\`\`\`
${responseExcerpt}
\`\`\`

SOURCE EXCERPT (first 4000 chars):
\`\`\`
${sourceExcerpt}
\`\`\`

Respond ONLY with a JSON object:
{
"verdict": "accept" | "reject" | "cycle",
"confidence": 0-100,
"notes": "<1-2 sentences on what makes this grounded or hallucinated>"
}

- accept: review references real symbols/lines in source; findings could be acted on.
- reject: review invents APIs, fabricates calls, contradicts source. Do NOT record.
- cycle: review is mediocre — partially grounded but wrong shape, try a stronger model.`;

  // Hand-review uses paid OpenRouter so it sidesteps the Ollama Cloud
  // throttle that drove every prior iter into the heuristic fallback.
  // Grok 4.1 fast: $0.20 in / $0.50 out per M tokens, 2M ctx. A typical
  // hand-review (~6K input + 300 output) costs ~$0.0014. Selected via
  // J directive 2026-04-25 ("best model under $0.72/M").
  const resp = await fetch(`${LAKEHOUSE}/v1/chat`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      provider: "openrouter",
      model: "x-ai/grok-4.1-fast",
      messages: [{ role: "user", content: prompt }],
      max_tokens: 300,
      temperature: 0.0,
    }),
    signal: AbortSignal.timeout(45000),
  });
  if (!resp.ok) {
    throw new Error(`/v1/chat ${resp.status}: ${(await resp.text()).slice(0, 200)}`);
  }
  const j: any = await resp.json();
  const content = (j?.choices?.[0]?.message?.content ?? "").trim();
  // Pull JSON object from the response — model may wrap it in prose.
  const m = content.match(/\{[\s\S]*\}/);
  if (!m) throw new Error(`no JSON object in response: ${content.slice(0, 100)}`);
  const parsed = JSON.parse(m[0]);
  const v = String(parsed.verdict ?? "accept").toLowerCase();
  return {
    verdict: (v === "reject" || v === "cycle") ? v as "reject" | "cycle" : "accept",
    confidence: Number(parsed.confidence ?? 50),
    notes: String(parsed.notes ?? "").slice(0, 500),
    source: "cloud",
  };
}

function heuristicHandReview(input: HandReviewInput): HandReviewVerdict {
  // Deterministic fallback when cloud is throttled. Conservative:
  // only flip to reject when the evidence is overwhelming, otherwise
  // accept (fall-open principle — observer is policy, not blocker).
  const total = input.grounding_stats.total;
  const pct = input.grounding_stats.groundedPct;
  const respLen = input.response.length;

  // Too short to be a real review
  if (respLen < 1500) {
    return { verdict: "reject", confidence: 80, notes: `response too short (${respLen} chars)`, source: "heuristic" };
  }
  // Below 5 quotes — not enough signal to judge grounding; accept
  if (total < 5 || pct === null) {
    return { verdict: "accept", confidence: 50, notes: `insufficient quote signal (${total} quotes); accepting`, source: "heuristic" };
  }
  // Very heavy hallucination
  if (pct < 20) {
    return { verdict: "reject", confidence: 85, notes: `low grounding (${pct}% of ${total} quotes)`, source: "heuristic" };
  }
  // Mediocre — cycle to a stronger model
  if (pct < 50) {
    return { verdict: "cycle", confidence: 65, notes: `mediocre grounding (${pct}% of ${total} quotes); try stronger`, source: "heuristic" };
  }
  // Good enough
  return { verdict: "accept", confidence: 75, notes: `grounding ${pct}% of ${total} quotes`, source: "heuristic" };
}
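
// Verdict ladder of heuristicHandReview at a glance (same thresholds as above):
//   respLen < 1500    → reject (too short to be a real review)
//   total < 5 quotes  → accept (insufficient signal; fall open)
//   pct < 20          → reject (heavy hallucination)
//   pct < 50          → cycle  (mediocre; try a stronger model)
//   otherwise         → accept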

async function maybeEscalate(failures: ObservedOp[]) {
  // Group failures by sig_hash
  const bySig = new Map<string, ObservedOp[]>();
  for (const f of failures) {
    const k = f.sig_hash ?? "__no_sig__";
    (bySig.get(k) ?? bySig.set(k, []).get(k)!).push(f);
  }
  for (const [sigHash, cluster] of bySig) {
    if (sigHash === "__no_sig__") continue;
    if (cluster.length < ESCALATION_THRESHOLD) continue;
    if (escalatedSigHashes.has(sigHash)) continue;
    escalatedSigHashes.add(sigHash);
    // Fire-and-forget — don't block the existing analyzer loop.
    escalateFailureClusterToLLMTeam(sigHash, cluster).catch(() => {});
  }
}

// ─── Error analyzer loop ───

async function analyzeErrors() {
  // Read recent failures
  const failures = recentOps.filter(op => !op.success);
  if (failures.length === 0) return;

  // NEW 2026-04-24: escalate recurring sig_hash clusters for cloud
  // review (see escalateFailureClusterToLLMTeam). Runs in parallel to
  // the local qwen2.5 analysis below — non-blocking, richer downstream
  // signal for scrum/auditor.
  maybeEscalate(failures).catch(() => {});

  const errorSummary = failures.slice(-10).map(f =>
    `[${f.endpoint}] ${f.input_summary}: ${f.error}`
  ).join("\n");

  // Ask local model to diagnose. Phase 44 migration (2026-04-27):
  // /v1/chat instead of legacy /ai/generate so /v1/usage tracks the
  // call + Langfuse traces it. Same upstream model (qwen2.5 local).
  try {
    const resp = await fetch(`${LAKEHOUSE}/v1/chat`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        model: "qwen2.5",
        provider: "ollama",
        messages: [{
          role: "user",
          content: `You are a system reliability observer. Analyze these recent failures and suggest fixes:

${errorSummary}

For each error:
1. What likely caused it?
2. How should the agent adjust its approach?
3. Should this be added to the playbook as a "don't do this"?

Be specific and actionable. Under 200 words.`,
        }],
        max_tokens: 400,
        temperature: 0.2,
      }),
    });
    const analysis = await resp.json() as any;
    const analysisText = analysis?.choices?.[0]?.message?.content ?? "";
    if (analysisText) {
      console.error(`[observer] Error analysis:\n${analysisText}`);
      // Log the analysis as a playbook entry
      await fetch(`${GATEWAY}/log`, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          operation: `error_analysis: ${failures.length} failures`,
          approach: "LLM-analyzed error patterns",
          result: analysisText.slice(0, 500),
          context: errorSummary.slice(0, 500),
        }),
      });
    }
  } catch (e) {
    console.error(`[observer] Analysis failed: ${e}`);
  }
}

// ─── Playbook consolidation ───

async function consolidatePlaybooks() {
  const successes = recentOps.filter(op => op.success);
  if (successes.length < 5) return;

  // Group by endpoint
  const groups: Record<string, ObservedOp[]> = {};
  for (const op of successes) {
    const key = op.endpoint;
    if (!groups[key]) groups[key] = [];
    groups[key].push(op);
  }

  for (const [endpoint, ops] of Object.entries(groups)) {
    if (ops.length < 3) continue;

    const avgDuration = ops.reduce((s, o) => s + o.duration_ms, 0) / ops.length;
    const pattern = ops.slice(-3).map(o => o.input_summary).join("; ");

    await fetch(`${GATEWAY}/log`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        operation: `consolidated: ${ops.length} successful ${endpoint} calls`,
        approach: `common pattern: ${pattern.slice(0, 200)}`,
        result: `avg_duration=${avgDuration.toFixed(0)}ms, ${ops.length} successes`,
        context: `endpoint=${endpoint}`,
      }),
    }).catch(() => {});
  }
}

// ─── HTTP listener for external ops (Phase 24) ───

// Scenarios POST per-event outcomes here so the observer's analyzer +
// playbook builder see them alongside MCP-wrapped ops. Read-only stats
// also exposed at /stats for external health checks.
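//
// Example (illustrative values): a scenario posts one fill outcome:
//   curl -s -X POST localhost:3800/event -H 'content-type: application/json' \
//     -d '{"event_kind":"shift_fill","role":"RN","count":3,"city":"Austin",
//          "state":"TX","success":true,"duration_ms":412,"sig_hash":"ab12cd34"}'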
function startHttpListener() {
  Bun.serve({
    port: OBSERVER_PORT,
    hostname: "0.0.0.0",
    fetch(req) {
      const url = new URL(req.url);
      if (req.method === "GET" && url.pathname === "/health") {
        return new Response(JSON.stringify({ status: "ok", ops_in_ring: recentOps.length }));
      }
      if (req.method === "GET" && url.pathname === "/stats") {
        const bySource = new Map<string, number>();
        for (const o of recentOps) {
          const k = o.source ?? "mcp";
          bySource.set(k, (bySource.get(k) ?? 0) + 1);
        }
        return new Response(JSON.stringify({
          total: recentOps.length,
          successes: recentOps.filter(o => o.success).length,
          failures: recentOps.filter(o => !o.success).length,
          by_source: Object.fromEntries(bySource),
          recent_scenario_ops: recentOps
            .filter(o => o.source === "scenario")
            .slice(-10)
            .map(o => ({ ts: o.timestamp, ok: o.success, staffer: o.staffer_id, kind: o.event_kind, role: o.role })),
        }));
      }
      // ─── Hand-review endpoint (2026-04-25) ───
      // scrum/agent posts a candidate response + source content + grounding
      // stats. Observer evaluates via a cloud LLM (currently
      // x-ai/grok-4.1-fast via OpenRouter; see cloudHandReview) with
      // semantic context and returns {verdict, confidence, notes}. On
      // cloud throttle, falls back to a deterministic heuristic over the
      // grounding stats so the loop keeps moving with honest signal.
      //
      // This is the policy layer scrum was missing — pre-2026-04-25 the
      // scrum_master applied a hardcoded grounding-rate threshold inline,
      // which baked judgment into the wrong layer. Now scrum reports data
      // (response + source + stats) and observer decides accept/reject/cycle.
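      //
      // Example round-trip (illustrative values):
      //   POST /review {"file_path":"bot/propose.ts","model":"x-ai/grok-4.1-fast",
      //     "attempt":1,"response":"...","source_content":"...",
      //     "grounding_stats":{"total":12,"grounded":9,"groundedPct":75}}
      //   → {"verdict":"accept","confidence":75,"notes":"...","source":"cloud"}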
      if (req.method === "POST" && url.pathname === "/review") {
        return req.json().then((body: any) => handReview(body))
          .then((verdict) => new Response(JSON.stringify(verdict), {
            headers: { "content-type": "application/json" },
          }))
          .catch((e: Error) =>
            new Response(JSON.stringify({ verdict: "accept", notes: `observer error: ${e.message}`, source: "heuristic" }), {
              status: 200, // fall-open shape — scrum keeps moving on observer failure
              headers: { "content-type": "application/json" },
            }));
      }
      if (req.method === "POST" && url.pathname === "/event") {
        return req.json().then((body: any) => {
          const op: ObservedOp = {
            timestamp: body.timestamp ?? new Date().toISOString(),
            endpoint: body.endpoint ?? "scenario:fill",
            input_summary: body.input_summary ?? `${body.event_kind ?? "?"} ${body.role ?? "?"}×${body.count ?? "?"} in ${body.city ?? "?"}, ${body.state ?? "?"}`,
            success: !!body.success,
            duration_ms: Number(body.duration_ms ?? 0),
            output_summary: body.output_summary ?? (body.success ? "filled" : (body.error ?? "failed")),
            error: body.error,
            // Respect the client's provenance if set (langfuse bridge
            // sends source:"langfuse", etc.). Default to "scenario"
            // to keep legacy Phase 24 callers working.
            source: body.source ?? "scenario",
            staffer_id: body.staffer_id,
            sig_hash: body.sig_hash,
            event_kind: body.event_kind,
            role: body.role,
            city: body.city,
            state: body.state,
            count: body.count,
            rescue_attempted: !!body.rescue_attempted,
            rescue_succeeded: !!body.rescue_succeeded,
          };
          recordExternalOp(op);
          persistOp(op).catch(() => {});
          return new Response(JSON.stringify({ accepted: true, ring_size: recentOps.length }));
        }).catch((e: Error) =>
          new Response(JSON.stringify({ error: e.message }), { status: 400 }));
      }
      // ─── Relevance filter (2026-04-25) ───
      // Drops "adjacency pollution" from matrix-retrieved chunks before
      // they reach the reviewer LLM. Caller (scrum/agent) posts the
      // focus file + candidate chunks; observer scores via heuristic
      // (path/symbol/token signals) and returns kept + dropped lists.
      // Pure function — no I/O, safe to call hot.
      if (req.method === "POST" && url.pathname === "/relevance") {
        return req.json().then((body: any) => {
          const focus = body.focus_file ?? body.focus ?? {};
          const chunks = body.chunks ?? [];
          const threshold = typeof body.threshold === "number" ? body.threshold : 0.3;
          const result = filterChunks(focus, chunks, threshold);
          return new Response(JSON.stringify(result), {
            headers: { "content-type": "application/json" },
          });
        }).catch((e: Error) =>
          new Response(JSON.stringify({ error: e.message }), { status: 400 }));
      }
      return new Response("not found", { status: 404 });
    },
  });
  console.error(`[observer] HTTP listener bound to 0.0.0.0:${OBSERVER_PORT}`);
}

// ─── Overseer corrections tailer (2026-04-23) ───

// The gateway's /v1/respond loop writes T3 overseer corrections to
// data/_kb/overseer_corrections.jsonl. Tail it once per cycle and
// inject each new row into the same recentOps ring that analyzeErrors
// + consolidatePlaybooks read — so a correction that just fired shows
// up alongside the outcomes it was meant to repair, and the analyzer
// can flag patterns like "three corrections on staffing.fill with the
// same advice — underlying problem isn't a drift, it's a data gap".
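//
// One corrections row looks like (illustrative values):
//   {"created_at":"2026-04-27T17:55:03Z","model":"gpt-oss:120b",
//    "task_class":"staffing.fill","reason":"drift: repeated overfill",
//    "correction":"cap fills at the requested count","sig_hash":"ab12cd34",
//    "usage":{"latency_ms":2100},"applied_at_turn":3}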
const CORRECTIONS_PATH = process.env.OVERSEER_CORRECTIONS_PATH
  ?? "/home/profit/lakehouse/data/_kb/overseer_corrections.jsonl";
let correctionsCursor = 0; // byte offset

async function tailOverseerCorrections(): Promise<number> {
  const f = Bun.file(CORRECTIONS_PATH);
  if (!(await f.exists())) return 0;
  const size = f.size;
  if (size <= correctionsCursor) return 0;

  // Read only the suffix since the last cursor; keeps tail work
  // bounded even as the file grows.
  const text = await f.slice(correctionsCursor, size).text();
  correctionsCursor = size;

  let forwarded = 0;
  for (const line of text.split("\n")) {
    if (!line.trim()) continue;
    let row: any;
    try { row = JSON.parse(line); } catch { continue; }
    const op: ObservedOp = {
      timestamp: row.created_at ?? new Date().toISOString(),
      endpoint: `overseer:${row.model ?? "gpt-oss:120b"}`,
      input_summary: `${row.task_class ?? "?"}: ${row.reason ?? "escalation"}`,
      // Correction itself is neither success nor failure — it's a
      // mitigation attempt. We mark success=true so analyzeErrors
      // doesn't count it as a failure, but the preview lets the
      // analyzer see what was tried.
      success: true,
      duration_ms: Number(row.usage?.latency_ms ?? 0),
      output_summary: String(row.correction ?? "").slice(0, 200),
      source: "overseer_correction",
      sig_hash: row.sig_hash,
      task_class: row.task_class,
      correction: String(row.correction ?? ""),
      applied_at_turn: Number(row.applied_at_turn ?? 0),
    };
    recordExternalOp(op);
    forwarded++;
  }
  return forwarded;
}

// ─── Main loop ───

async function main() {
  console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}, port=${OBSERVER_PORT}`);

  // Run a health check first
  const health = await fetch(`${GATEWAY}/health`).then(r => r.ok ? r.text() : null).catch(() => null);
  if (!health) {
    console.error("[observer] gateway unreachable — exiting");
    process.exit(1);
  }
  console.error(`[observer] gateway healthy: ${JSON.stringify(health)}`);

  // Phase 24 — bind HTTP listener so scenarios can POST outcomes.
  startHttpListener();

  // Main loop
  let cycle = 0;
  while (true) {
    await Bun.sleep(CYCLE_SECS * 1000);
    cycle++;

    // Every cycle: tail the overseer corrections KB stream, then
    // analyze errors. Order matters — if an overseer correction just
    // landed for a sig_hash that previously failed, the analyzer
    // should see both.
    const newCorrections = await tailOverseerCorrections();
    if (newCorrections > 0) {
      console.error(`[observer] pulled ${newCorrections} new overseer correction(s) into ring`);
    }
    await analyzeErrors();

    // Every 5 cycles: consolidate playbooks
    if (cycle % 5 === 0) {
      await consolidatePlaybooks();
    }

    const scenarioOps = recentOps.filter(o => o.source === "scenario").length;
    const langfuseOps = recentOps.filter(o => o.source === "langfuse").length;
    const correctionOps = recentOps.filter(o => o.source === "overseer_correction").length;
    const stats = {
      cycle,
      total_ops: recentOps.length,
      successes: recentOps.filter(o => o.success).length,
      failures: recentOps.filter(o => !o.success).length,
      scenario_ops: scenarioOps,
      langfuse_ops: langfuseOps,
      overseer_corrections: correctionOps,
    };
    console.error(`[observer] cycle ${cycle}: ${JSON.stringify(stats)}`);
  }
}

// Allow other processes to start the observer loop directly
// (observed() is already exported at its definition above).
export { main as startObserver };

// Run if executed directly
if (import.meta.main) {
  main().catch(console.error);
}