Checks failed — lakehouse/auditor, 1 blocking issue: cloud: claim not backed — "journal event verified live (total_events_created 0→1 after probe)."
## Infrastructure (scrum loop hardening)
crates/gateway/src/v1/openrouter.rs — new OpenRouter provider
Direct HTTPS to openrouter.ai/api/v1/chat/completions with an OpenAI-compatible
request shape. Key resolution order: OPENROUTER_API_KEY env → /home/profit/.env →
/root/llm_team_config.json (shares the LLM Team UI's quota). Added after iter 5 hit
repeated Ollama Cloud 502s on kimi-k2:1t — a different provider backbone serves as
a rescue rung. Unit tests pin the URL stripping and the OpenAI wire shape.
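A minimal TypeScript sketch of that resolution order (the real resolver is Rust — openrouter::resolve_openrouter_key in the new module — and the JSON field name below is an assumption about that config file's shape):

```ts
// Illustrative only: env var first, then the dotenv file, then the LLM Team config.
import { readFileSync, existsSync } from "node:fs";

function resolveOpenRouterKey(): string | null {
  // 1. Environment variable wins.
  if (process.env.OPENROUTER_API_KEY) return process.env.OPENROUTER_API_KEY;

  // 2. Fall back to the dotenv file.
  if (existsSync("/home/profit/.env")) {
    const line = readFileSync("/home/profit/.env", "utf8")
      .split("\n")
      .find((l) => l.startsWith("OPENROUTER_API_KEY="));
    if (line) return line.slice("OPENROUTER_API_KEY=".length).trim();
  }

  // 3. Last resort: the LLM Team config (shares its quota).
  //    The field name is assumed for this sketch.
  if (existsSync("/root/llm_team_config.json")) {
    try {
      const cfg = JSON.parse(readFileSync("/root/llm_team_config.json", "utf8"));
      if (typeof cfg.openrouter_api_key === "string") return cfg.openrouter_api_key;
    } catch { /* malformed config — treat as missing */ }
  }
  return null;
}
```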
crates/gateway/src/v1/mod.rs + main.rs
Added an `"openrouter" | "openrouter_free"` arm to the /v1/chat dispatch.
V1State.openrouter_key is loaded at startup via openrouter::resolve_openrouter_key(),
mirroring the Ollama Cloud pattern. Startup log:
"v1: OpenRouter key loaded — /v1/chat provider=openrouter enabled"
tests/real-world/scrum_master_pipeline.ts
* 9-rung ladder — kimi-k2:1t → qwen3-coder:480b → deepseek-v3.1:671b →
mistral-large-3:675b → gpt-oss:120b → qwen3.5:397b → openrouter/gpt-oss-120b:free
→ openrouter/gemma-3-27b-it:free → local qwen3.5:latest.
Added qwen3-coder:480b as rung 2 after live probes confirmed it rescues
kimi-k2:1t 502s cleanly (0.9s latency, substantive reviews).
Dropped devstral-2 (displaced by qwen3-coder), kimi-k2.6 (not available),
and minimax-m2.7 (returned 0 chars / 400 thinking tokens).
Local fallback promoted to qwen3.5:latest per J's direction, 2026-04-24.
* MAX_ATTEMPTS bumped 6 → 9 to accommodate the rescue tier (see the ladder sketch after this list).
* Tree-split scratchpad fixed — it was concatenating shard markers directly
into the reviewer input, causing kimi-k2:1t to write titles like
"Forensic Audit Report – file.rs (shard 3)". It now uses internal §N§
markers during accumulation and runs a proper reduce step that collapses
per-shard digests into ONE coherent file-level synthesis with the markers
stripped, matching the Phase 21 aibridge::tree_split map→reduce design.
Falls back to the stripped scratchpad if the reducer returns a thin result.
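A minimal sketch of the ladder walk described above — rung order mirrors the list, the reviewOnce callback stands in for the pipeline's real per-model call, and the thinness check is a hypothetical stand-in:

```ts
// Walk rungs in order, take the first substantive review, otherwise escalate.
const LADDER = [
  "kimi-k2:1t",
  "qwen3-coder:480b",
  "deepseek-v3.1:671b",
  "mistral-large-3:675b",
  "gpt-oss:120b",
  "qwen3.5:397b",
  "openrouter/gpt-oss-120b:free",
  "openrouter/gemma-3-27b-it:free",
  "qwen3.5:latest", // local fallback
];
const MAX_ATTEMPTS = 9; // bumped from 6 to cover the rescue tier

// Hypothetical thinness check — the pipeline's actual criterion may differ.
const isThin = (review: string) => review.trim().length < 200;

type ReviewFn = (model: string, input: string) => Promise<string>;

async function reviewWithRescue(input: string, reviewOnce: ReviewFn): Promise<string> {
  let lastError = "no attempts made";
  for (let rung = 0; rung < Math.min(MAX_ATTEMPTS, LADDER.length); rung++) {
    const model = LADDER[rung];
    try {
      const review = await reviewOnce(model, input);
      if (!isThin(review)) return review;              // first substantive review wins
      lastError = `${model} returned a thin review`;
    } catch (e) {
      lastError = `${model}: ${(e as Error).message}`; // e.g. Ollama Cloud 502 → next rung
    }
  }
  throw new Error(`all rungs exhausted — last error: ${lastError}`);
}
```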
tests/real-world/scrum_applier.ts — NEW (737 lines)
The auto-apply pipeline. Reads scrum_reviews.jsonl, filters for rows where
gradient_tier ∈ {auto, dry_run} AND confidence_avg ≥ MIN_CONF (default 90),
asks the reviewer model for concrete old_string/new_string patch JSON,
applies each patch via text replacement, runs cargo check after each file,
commits if the build is green and reverts if it is red. Deny-list: /etc/,
config/, ops/, auditor/, docs/, data/, mcp-server/, ui/, sidecar/, scripts/.
Hard caps: per-patch confidence ≥ MIN_CONF, old_string must match exactly
once in the file, max 20 lines per patch. Never runs on main without an
explicit LH_APPLIER_BRANCH override. Audit trail in data/_kb/auto_apply.jsonl.
Empirical behavior (dry-run over iter 4 reviews): 5 eligible files →
1 green commit-ready, 2 build-red reverts, 2 all-rejected. The build-green
gate caught 2 bad patches before they would have merged.
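A minimal sketch of the eligibility and per-patch safety gates described above; the row and patch field names follow the description, everything else is illustrative (the real applier also walks the patch list per file and drives the cargo check / commit / revert cycle):

```ts
// Eligibility + per-patch gates for the auto-apply pipeline (sketch).
interface ReviewRow { file: string; gradient_tier: string; confidence_avg: number }
interface Patch { file: string; old_string: string; new_string: string; confidence: number }

const MIN_CONF = 90; // default threshold from the description
const DENY = ["/etc/", "config/", "ops/", "auditor/", "docs/", "data/",
              "mcp-server/", "ui/", "sidecar/", "scripts/"];

function eligible(row: ReviewRow): boolean {
  return (row.gradient_tier === "auto" || row.gradient_tier === "dry_run")
    && row.confidence_avg >= MIN_CONF
    && !DENY.some((d) => row.file.startsWith(d));
}

async function applyIfSafe(patch: Patch): Promise<boolean> {
  if (patch.confidence < MIN_CONF) return false;
  if (patch.new_string.split("\n").length > 20) return false;  // max 20 lines per patch
  const src = await Bun.file(patch.file).text();
  // old_string must occur exactly once — ambiguous matches are rejected.
  if (src.split(patch.old_string).length !== 2) return false;
  await Bun.write(patch.file, src.replace(patch.old_string, patch.new_string));
  return true; // caller then runs `cargo check`; commit if green, revert if red
}
```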
mcp-server/observer.ts — LLM Team code_review escalation
When a sig_hash accumulates ≥3 failures (ESCALATION_THRESHOLD), the observer
fires a non-blocking POST /api/run?mode=code_review at localhost:5000 with the
failure-cluster context, parses facts/entities/relationships/file_hints from
the response, and writes them to a new data/_kb/observer_escalations.jsonl
surface. This answers J's vision of the observer triggering richer LLM Team
calls when failures pile up. Non-destructive: it runs in parallel to the
existing qwen2.5 analyzer and never replaces it. Escalated sig_hashes are
tracked in a session-local Set so LLM Team isn't re-hammered when a cluster
persists across observer cycles.
crates/aibridge/src/context.rs
First auto-applied patch produced by scrum_applier.ts (dry-run path — the
applier writes files in dry-run mode but doesn't commit; bug noted for an
iter 6 fix). Adds a #[deprecated] annotation to the inline estimate_tokens
helper, pointing callers to the centralized shared::model_matrix::ModelMatrix
entry point (P21-002 — duplicate token-estimator surfaces). cargo check
passes with the annotation (verified by the applier's own build gate).
## Visual Control Plane (UI)
ui/server.ts — Bun.serve on :3950 with /data/* fan-out:
/data/services, /data/reviews, /data/metrics, /data/trust, /data/overrides,
/data/findings, /data/outcomes, /data/audit_facts, /data/file/:path,
/data/refactor_signals, /data/search?q=, /data/signal_classes,
/data/logs/:svc (journalctl tail per systemd unit), /data/scrum_log.
Bug fix: tryFetch now always attempts JSON.parse before falling back to text
— the observer's Bun.serve returns JSON without an application/json
content-type, which previously rendered stats as a raw string ("0 ops" on the map).
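A sketch of the fixed helper shape (the real tryFetch signature may differ; returning an error object on network failure is an assumption):

```ts
// JSON-first fetch: parse the body as JSON regardless of content-type, since
// the observer replies with JSON under a plain-text header; fall back to the
// raw text only when parsing fails.
async function tryFetch(url: string): Promise<unknown> {
  try {
    const body = await (await fetch(url)).text();
    try {
      return JSON.parse(body);   // works even without application/json
    } catch {
      return body;               // genuinely non-JSON payloads stay as text
    }
  } catch (e) {
    return { error: (e as Error).message };
  }
}
```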
ui/index.html + ui.css — dark neo-brutalist shell. 6 views:
MAP (D3 force-graph + overlays) / TRACE (per-file iter history) /
TRAJECTORY (signal-class cards + refactor-signals table + reverse-index
search box) / METRICS (every card has SOURCE + GOOD lines explaining
where the number comes from and what the target trajectory is) /
KB (card grid with tooltips on every field) / CONSOLE (per-service
journalctl tabs).
ui/ui.js — polling client, D3 wiring, signal-class panel, refactor-signals
table, reverse-index search, per-service console tabs. Bug fix:
renderNodeContext had Object.entries() iterating string characters when
/health returned a plain string — it now guards with a typeof check so
"lakehouse ok" renders as one row instead of "0 l / 1 a / 2 k / ...".
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
mcp-server/observer.ts — 500 lines · 18 KiB · TypeScript
/**
 * Lakehouse Observer — autonomous iteration loop.
 *
 * Runs continuously alongside the agent gateway. Watches every operation,
 * logs outcomes, detects failures, and feeds learnings back so agents
 * improve over time without retraining.
 *
 * Three loops:
 * 1. OPERATION OBSERVER — wraps gateway calls, timestamps + logs every
 *    success/failure to the lakehouse
 * 2. ERROR ANALYZER — periodically reads the error log, asks a local
 *    model to diagnose patterns, writes recommendations
 * 3. PLAYBOOK BUILDER — after N successful ops of the same type,
 *    consolidates them into a reusable playbook entry
 *
 * This is the "third-party witness" J asked for — it watches what
 * agents do and helps them not repeat mistakes.
 */

const GATEWAY = process.env.GATEWAY_URL || "http://localhost:3700";
const LAKEHOUSE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
const CYCLE_SECS = parseInt(process.env.OBSERVER_CYCLE || "30");
// Phase 24 — observer now listens on an HTTP port for external ops
// (scenarios bypass the MCP:3700 layer by design). Default 3800.
const OBSERVER_PORT = parseInt(process.env.OBSERVER_PORT || "3800");

// ─── Observed operation log ───

interface ObservedOp {
  timestamp: string;
  endpoint: string;
  input_summary: string;
  success: boolean;
  duration_ms: number;
  output_summary: string;
  error?: string;
  // Phase 24 — optional provenance so error analyzer and playbook
  // builder can differentiate MCP-layer ops from scenario-sourced
  // events. Scenarios set source="scenario" + staffer_id + sig_hash.
  source?: "mcp" | "scenario" | "langfuse" | "overseer_correction";
  staffer_id?: string;
  sig_hash?: string;
  event_kind?: string;
  role?: string;
  city?: string;
  state?: string;
  count?: number;
  rescue_attempted?: boolean;
  rescue_succeeded?: boolean;
  // Overseer-correction-specific (2026-04-23): lets the analyzer
  // correlate corrections with the drift that prompted them and with
  // subsequent outcomes that either validated or invalidated the advice.
  task_class?: string;
  correction?: string;
  applied_at_turn?: number;
}

const recentOps: ObservedOp[] = [];

// Phase 24 — external ingest path. Scenarios POST outcome summaries
// here so the observer's analyzer + playbook builder see them. Called
// from the Bun.serve() handler below. Same ring buffer as the MCP-
// wrapped path so downstream loops don't need to know the source.
export function recordExternalOp(op: ObservedOp): void {
  recentOps.push({ ...op, source: op.source ?? "scenario" });
  if (recentOps.length > 2000) recentOps.shift();
}

// ─── Wrapped gateway caller — every call gets observed ───

export async function observed(
  endpoint: string,
  body: any,
  description: string,
): Promise<{ data: any; op: ObservedOp }> {
  const t0 = Date.now();
  let data: any;
  let error: string | undefined;
  let success = true;

  try {
    const resp = await fetch(`${GATEWAY}${endpoint}`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(body),
    });
    data = await resp.json();
    if (data.error) {
      success = false;
      error = data.error;
    }
  } catch (e: any) {
    success = false;
    error = e.message;
    data = { error: e.message };
  }

  const op: ObservedOp = {
    timestamp: new Date().toISOString(),
    endpoint,
    input_summary: description,
    success,
    duration_ms: Date.now() - t0,
    output_summary: success
      ? summarize(data)
      : `ERROR: ${error}`,
    error,
  };

  recentOps.push(op);
  if (recentOps.length > 1000) recentOps.shift();

  // Persist to lakehouse
  await persistOp(op);

  return { data, op };
}

function summarize(data: any): string {
  if (data.sql_matches !== undefined) return `hybrid: ${data.sql_matches} sql → ${data.vector_reranked} results`;
  if (data.rows) return `${data.row_count || data.rows.length} rows`;
  if (data.answer) return `answer: ${data.answer.slice(0, 80)}...`;
  if (data.sources) return `${data.sources.length} sources`;
  return JSON.stringify(data).slice(0, 100);
}

// Phase 24 honesty fix — the old persistOp used /ingest/file which
// REPLACES the dataset (flagged in feedback_ingest_replace_semantics.md).
// Every op silently wiped all prior ops. Now we append a JSONL line to
// data/_observer/ops.jsonl so the historical trace is durable. The
// observer analyzer + playbook builder read from this file when it
// outgrows the 2000-entry in-memory ring.
async function persistOp(op: ObservedOp) {
  try {
    const { mkdir, appendFile } = await import("node:fs/promises");
    await mkdir("data/_observer", { recursive: true });
    await appendFile("data/_observer/ops.jsonl", JSON.stringify(op) + "\n");
  } catch {
    // Persistence is best-effort; in-memory ring still works.
  }
}

// ─── LLM Team escalation (code_review mode) ───
//
// When recent failures on a single sig_hash cross a threshold the
// local qwen2.5 analysis is probably insufficient. J's 2026-04-24
// direction: "the observer would trigger to give more context" —
// route failure clusters to LLM Team's specialized code_review mode
// (via /api/run) so richer structured signal lands in the KB for
// scrum + auditor + playbook memory to consume next pass.
//
// Non-destructive: runs in parallel to the existing qwen2.5 analysis,
// never replaces it. Writes to data/_kb/observer_escalations.jsonl
// as a dedicated audit surface.

const LLM_TEAM = process.env.LH_LLM_TEAM_URL ?? "http://localhost:5000";
const LLM_TEAM_ESCALATIONS = "/home/profit/lakehouse/data/_kb/observer_escalations.jsonl";
const ESCALATION_THRESHOLD = 3; // N+ failures on same sig_hash triggers

async function escalateFailureClusterToLLMTeam(sigHash: string, cluster: ObservedOp[]) {
  // Package the failure cluster as a single context blob for code_review mode.
  const context = cluster.slice(-8).map((o, i) =>
    `[${i + 1}] endpoint=${o.endpoint} input=${o.input_summary} error=${o.error ?? "?"}`
  ).join("\n");

  try {
    const resp = await fetch(`${LLM_TEAM}/api/run?mode=code_review`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        input: `sig_hash=${sigHash} · ${cluster.length} failures on same signature:\n\n${context}\n\nReview this failure pattern. What is the root cause? What code change would prevent it? Respond with structured facts + specific file hints.`,
      }),
      signal: AbortSignal.timeout(60000),
    });
    if (!resp.ok) {
      console.error(`[observer] LLM Team code_review ${resp.status}: ${(await resp.text()).slice(0, 200)}`);
      return;
    }
    const j: any = await resp.json();

    // Write an audit row. Fields are deliberately permissive — LLM
    // Team's response shape can evolve without breaking this write.
    const row = {
      ts: new Date().toISOString(),
      source: "observer_escalation",
      mode: "code_review",
      sig_hash: sigHash,
      cluster_size: cluster.length,
      cluster_staffer: cluster[0]?.staffer_id,
      cluster_endpoint: cluster[0]?.endpoint,
      llm_team_run_id: j.run_id ?? j.llm_team_run_id ?? null,
      facts: j.facts ?? [],
      entities: j.entities ?? [],
      relationships: j.relationships ?? [],
      raw_response: typeof j.response === "string" ? j.response.slice(0, 2000) : null,
      recommended_files: j.file_hints ?? j.files ?? [],
    };
    const { appendFile } = await import("node:fs/promises");
    await appendFile(LLM_TEAM_ESCALATIONS, JSON.stringify(row) + "\n");
    console.error(
      `[observer] escalated sig_hash=${sigHash.slice(0, 8)} · cluster=${cluster.length} · facts=${row.facts.length} entities=${row.entities.length}`
    );
  } catch (e) {
    console.error(`[observer] LLM Team escalation failed: ${(e as Error).message}`);
  }
}

// Track which sig_hashes we've already escalated this session so we
// don't hammer LLM Team on every analyzeErrors tick when a cluster
// persists across cycles.
const escalatedSigHashes = new Set<string>();

async function maybeEscalate(failures: ObservedOp[]) {
  // Group failures by sig_hash
  const bySig = new Map<string, ObservedOp[]>();
  for (const f of failures) {
    const k = f.sig_hash ?? "__no_sig__";
    (bySig.get(k) ?? bySig.set(k, []).get(k)!).push(f);
  }
  for (const [sigHash, cluster] of bySig) {
    if (sigHash === "__no_sig__") continue;
    if (cluster.length < ESCALATION_THRESHOLD) continue;
    if (escalatedSigHashes.has(sigHash)) continue;
    escalatedSigHashes.add(sigHash);
    // Fire-and-forget — don't block the existing analyzer loop.
    escalateFailureClusterToLLMTeam(sigHash, cluster).catch(() => {});
  }
}

// ─── Error analyzer loop ───

async function analyzeErrors() {
  // Read recent failures
  const failures = recentOps.filter(op => !op.success);
  if (failures.length === 0) return;

  // NEW 2026-04-24: escalate recurring sig_hash clusters to LLM Team
  // code_review mode. Runs in parallel to the local qwen2.5 analysis
  // below — non-blocking, richer downstream signal for scrum/auditor.
  maybeEscalate(failures).catch(() => {});

  const errorSummary = failures.slice(-10).map(f =>
    `[${f.endpoint}] ${f.input_summary}: ${f.error}`
  ).join("\n");

  // Ask local model to diagnose
  try {
    const resp = await fetch(`${LAKEHOUSE}/ai/generate`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        prompt: `You are a system reliability observer. Analyze these recent failures and suggest fixes:

${errorSummary}

For each error:
1. What likely caused it?
2. How should the agent adjust its approach?
3. Should this be added to the playbook as a "don't do this"?

Be specific and actionable. Under 200 words.`,
        model: "qwen2.5",
        max_tokens: 400,
        temperature: 0.2,
      }),
    });
    const analysis = await resp.json();
    if (analysis.text) {
      console.error(`[observer] Error analysis:\n${analysis.text}`);
      // Log the analysis as a playbook entry
      await fetch(`${GATEWAY}/log`, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          operation: `error_analysis: ${failures.length} failures`,
          approach: "LLM-analyzed error patterns",
          result: analysis.text.slice(0, 500),
          context: errorSummary.slice(0, 500),
        }),
      });
    }
  } catch (e) {
    console.error(`[observer] Analysis failed: ${e}`);
  }
}

// ─── Playbook consolidation ───

async function consolidatePlaybooks() {
  const successes = recentOps.filter(op => op.success);
  if (successes.length < 5) return;

  // Group by endpoint
  const groups: Record<string, ObservedOp[]> = {};
  for (const op of successes) {
    const key = op.endpoint;
    if (!groups[key]) groups[key] = [];
    groups[key].push(op);
  }

  for (const [endpoint, ops] of Object.entries(groups)) {
    if (ops.length < 3) continue;

    const avgDuration = ops.reduce((s, o) => s + o.duration_ms, 0) / ops.length;
    const pattern = ops.slice(-3).map(o => o.input_summary).join("; ");

    await fetch(`${GATEWAY}/log`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        operation: `consolidated: ${ops.length} successful ${endpoint} calls`,
        approach: `common pattern: ${pattern.slice(0, 200)}`,
        result: `avg_duration=${avgDuration.toFixed(0)}ms, ${ops.length} successes`,
        context: `endpoint=${endpoint}`,
      }),
    }).catch(() => {});
  }
}

// ─── HTTP listener for external ops (Phase 24) ───

// Scenarios POST per-event outcomes here so the observer's analyzer +
// playbook builder see them alongside MCP-wrapped ops. Read-only stats
// also exposed at /stats for external health checks.
function startHttpListener() {
  Bun.serve({
    port: OBSERVER_PORT,
    hostname: "0.0.0.0",
    fetch(req) {
      const url = new URL(req.url);
      if (req.method === "GET" && url.pathname === "/health") {
        return new Response(JSON.stringify({ status: "ok", ops_in_ring: recentOps.length }));
      }
      if (req.method === "GET" && url.pathname === "/stats") {
        const bySource = new Map<string, number>();
        for (const o of recentOps) {
          const k = o.source ?? "mcp";
          bySource.set(k, (bySource.get(k) ?? 0) + 1);
        }
        return new Response(JSON.stringify({
          total: recentOps.length,
          successes: recentOps.filter(o => o.success).length,
          failures: recentOps.filter(o => !o.success).length,
          by_source: Object.fromEntries(bySource),
          recent_scenario_ops: recentOps
            .filter(o => o.source === "scenario")
            .slice(-10)
            .map(o => ({ ts: o.timestamp, ok: o.success, staffer: o.staffer_id, kind: o.event_kind, role: o.role })),
        }));
      }
      if (req.method === "POST" && url.pathname === "/event") {
        return req.json().then((body: any) => {
          const op: ObservedOp = {
            timestamp: body.timestamp ?? new Date().toISOString(),
            endpoint: body.endpoint ?? "scenario:fill",
            input_summary: body.input_summary ?? `${body.event_kind ?? "?"} ${body.role ?? "?"}×${body.count ?? "?"} in ${body.city ?? "?"}, ${body.state ?? "?"}`,
            success: !!body.success,
            duration_ms: Number(body.duration_ms ?? 0),
            output_summary: body.output_summary ?? (body.success ? "filled" : (body.error ?? "failed")),
            error: body.error,
            // Respect the client's provenance if set (langfuse bridge
            // sends source:"langfuse", etc.). Default to "scenario"
            // to keep legacy Phase 24 callers working.
            source: body.source ?? "scenario",
            staffer_id: body.staffer_id,
            sig_hash: body.sig_hash,
            event_kind: body.event_kind,
            role: body.role,
            city: body.city,
            state: body.state,
            count: body.count,
            rescue_attempted: !!body.rescue_attempted,
            rescue_succeeded: !!body.rescue_succeeded,
          };
          recordExternalOp(op);
          persistOp(op).catch(() => {});
          return new Response(JSON.stringify({ accepted: true, ring_size: recentOps.length }));
        }).catch((e: Error) =>
          new Response(JSON.stringify({ error: e.message }), { status: 400 }));
      }
      return new Response("not found", { status: 404 });
    },
  });
  console.error(`[observer] HTTP listener bound to 0.0.0.0:${OBSERVER_PORT}`);
}

// ─── Overseer corrections tailer (2026-04-23) ───

// The gateway's /v1/respond loop writes T3 overseer corrections to
// data/_kb/overseer_corrections.jsonl. Tail it once per cycle and
// inject each new row into the same recentOps ring that analyzeErrors
// + consolidatePlaybooks read — so a correction that just fired shows
// up alongside the outcomes it was meant to repair, and the analyzer
// can flag patterns like "three corrections on staffing.fill with the
// same advice — underlying problem isn't a drift, it's a data gap".
const CORRECTIONS_PATH = process.env.OVERSEER_CORRECTIONS_PATH
  ?? "/home/profit/lakehouse/data/_kb/overseer_corrections.jsonl";
let correctionsCursor = 0; // byte offset

async function tailOverseerCorrections(): Promise<number> {
  const f = Bun.file(CORRECTIONS_PATH);
  if (!(await f.exists())) return 0;
  const size = f.size;
  if (size <= correctionsCursor) return 0;

  // Read only the suffix since the last cursor; keeps tail work
  // bounded even as the file grows.
  const text = await f.slice(correctionsCursor, size).text();
  correctionsCursor = size;

  let forwarded = 0;
  for (const line of text.split("\n")) {
    if (!line.trim()) continue;
    let row: any;
    try { row = JSON.parse(line); } catch { continue; }
    const op: ObservedOp = {
      timestamp: row.created_at ?? new Date().toISOString(),
      endpoint: `overseer:${row.model ?? "gpt-oss:120b"}`,
      input_summary: `${row.task_class ?? "?"}: ${row.reason ?? "escalation"}`,
      // Correction itself is neither success nor failure — it's a
      // mitigation attempt. We mark success=true so analyzeErrors
      // doesn't count it as a failure, but the preview lets the
      // analyzer see what was tried.
      success: true,
      duration_ms: Number(row.usage?.latency_ms ?? 0),
      output_summary: String(row.correction ?? "").slice(0, 200),
      source: "overseer_correction",
      sig_hash: row.sig_hash,
      task_class: row.task_class,
      correction: String(row.correction ?? ""),
      applied_at_turn: Number(row.applied_at_turn ?? 0),
    };
    recordExternalOp(op);
    forwarded++;
  }
  return forwarded;
}

// ─── Main loop ───

async function main() {
  console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}, port=${OBSERVER_PORT}`);

  // Run a health check first
  const health = await fetch(`${GATEWAY}/health`).then(r => r.json()).catch(() => null);
  if (!health) {
    console.error("[observer] gateway unreachable — exiting");
    process.exit(1);
  }
  console.error(`[observer] gateway healthy: ${JSON.stringify(health)}`);

  // Phase 24 — bind HTTP listener so scenarios can POST outcomes.
  startHttpListener();

  // Main loop
  let cycle = 0;
  while (true) {
    await Bun.sleep(CYCLE_SECS * 1000);
    cycle++;

    // Every cycle: tail the overseer corrections KB stream, then
    // analyze errors. Order matters — if an overseer correction just
    // landed for a sig_hash that previously failed, the analyzer
    // should see both.
    const newCorrections = await tailOverseerCorrections();
    if (newCorrections > 0) {
      console.error(`[observer] pulled ${newCorrections} new overseer correction(s) into ring`);
    }
    await analyzeErrors();

    // Every 5 cycles: consolidate playbooks
    if (cycle % 5 === 0) {
      await consolidatePlaybooks();
    }

    const scenarioOps = recentOps.filter(o => o.source === "scenario").length;
    const langfuseOps = recentOps.filter(o => o.source === "langfuse").length;
    const correctionOps = recentOps.filter(o => o.source === "overseer_correction").length;
    const stats = {
      cycle,
      total_ops: recentOps.length,
      successes: recentOps.filter(o => o.success).length,
      failures: recentOps.filter(o => !o.success).length,
      scenario_ops: scenarioOps,
      langfuse_ops: langfuseOps,
      overseer_corrections: correctionOps,
    };
    console.error(`[observer] cycle ${cycle}: ${JSON.stringify(stats)}`);
  }
}

// Export the main loop so other entrypoints can start the observer
// (the observed() wrapper is already exported at its definition).
export { main as startObserver };

// Run if executed directly
if (import.meta.main) {
  main().catch(console.error);
}