diff --git a/mcp-server/observer.ts b/mcp-server/observer.ts
index 84920c9..13a2be2 100644
--- a/mcp-server/observer.ts
+++ b/mcp-server/observer.ts
@@ -20,6 +20,9 @@
 const GATEWAY = process.env.GATEWAY_URL || "http://localhost:3700";
 const LAKEHOUSE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
 const CYCLE_SECS = parseInt(process.env.OBSERVER_CYCLE || "30");
+// Phase 24 — observer now listens on an HTTP port for external ops
+// (scenarios bypass the MCP:3700 layer by design). Default 3800.
+const OBSERVER_PORT = parseInt(process.env.OBSERVER_PORT || "3800", 10);
 
 // ─── Observed operation log ───
 
@@ -31,10 +34,32 @@ interface ObservedOp {
   duration_ms: number;
   output_summary: string;
   error?: string;
+  // Phase 24 — optional provenance so error analyzer and playbook
+  // builder can differentiate MCP-layer ops from scenario-sourced
+  // events. Scenarios set source="scenario" + staffer_id + sig_hash.
+  source?: "mcp" | "scenario";
+  staffer_id?: string;
+  sig_hash?: string;
+  event_kind?: string;
+  role?: string;
+  city?: string;
+  state?: string;
+  count?: number;
+  rescue_attempted?: boolean;
+  rescue_succeeded?: boolean;
 }
 
 const recentOps: ObservedOp[] = [];
 
+// Phase 24 — external ingest path. Scenarios POST outcome summaries
+// here so the observer's analyzer + playbook builder see them. Called
+// from the Bun.serve() handler below. Same ring buffer as the MCP-
+// wrapped path so downstream loops don't need to know the source.
+export function recordExternalOp(op: ObservedOp): void {
+  recentOps.push({ ...op, source: op.source ?? "scenario" });
+  if (recentOps.length > 2000) recentOps.shift();
+}
+
 // ─── Wrapped gateway caller — every call gets observed ───
 
 export async function observed(
@@ -93,16 +118,22 @@ function summarize(data: any): string {
   return JSON.stringify(data).slice(0, 100);
 }
 
+// Phase 24 honesty fix — the old persistOp used /ingest/file which
+// REPLACES the dataset (flagged in feedback_ingest_replace_semantics.md).
+// Every op silently wiped all prior ops. Now we append a JSONL line to
+// data/_observer/ops.jsonl so the historical trace is durable. The
+// observer analyzer + playbook builder read from this file when it
+// outgrows the 2000-entry in-memory ring.
 async function persistOp(op: ObservedOp) {
-  const csv = `timestamp,endpoint,input_summary,success,duration_ms,output_summary,error
-"${op.timestamp}","${op.endpoint}","${esc(op.input_summary)}",${op.success},${op.duration_ms},"${esc(op.output_summary)}","${esc(op.error || "")}"`;
-
-  const form = new FormData();
-  form.append("file", new Blob([csv], { type: "text/csv" }), "op.csv");
-  await fetch(`${LAKEHOUSE}/ingest/file?name=observed_operations`, { method: "POST", body: form }).catch(() => {});
+  try {
+    const { mkdir, appendFile } = await import("node:fs/promises");
+    await mkdir("data/_observer", { recursive: true });
+    await appendFile("data/_observer/ops.jsonl", JSON.stringify(op) + "\n");
+  } catch {
+    // Persistence is best-effort; in-memory ring still works.
+  }
 }
 
-function esc(s: string) { return s.replace(/"/g, '""').replace(/\n/g, ' '); }
 
 // ─── Error analyzer loop ───
 
@@ -189,10 +220,74 @@ async function consolidatePlaybooks() {
   }
 }
 
+// ─── HTTP listener for external ops (Phase 24) ───
+
+// Scenarios POST per-event outcomes here so the observer's analyzer +
+// playbook builder see them alongside MCP-wrapped ops. Read-only stats
+// also exposed at /stats for external health checks.
+function startHttpListener() {
+  Bun.serve({
+    port: OBSERVER_PORT,
+    hostname: "0.0.0.0",
+    fetch(req) {
+      const url = new URL(req.url);
+      if (req.method === "GET" && url.pathname === "/health") {
+        return Response.json({ status: "ok", ops_in_ring: recentOps.length });
+      }
+      if (req.method === "GET" && url.pathname === "/stats") {
+        const bySource = new Map<string, number>();
+        for (const o of recentOps) {
+          const k = o.source ?? "mcp";
+          bySource.set(k, (bySource.get(k) ?? 0) + 1);
+        }
+        return Response.json({
+          total: recentOps.length,
+          successes: recentOps.filter(o => o.success).length,
+          failures: recentOps.filter(o => !o.success).length,
+          by_source: Object.fromEntries(bySource),
+          recent_scenario_ops: recentOps
+            .filter(o => o.source === "scenario")
+            .slice(-10)
+            .map(o => ({ ts: o.timestamp, ok: o.success, staffer: o.staffer_id, kind: o.event_kind, role: o.role })),
+        });
+      }
+      if (req.method === "POST" && url.pathname === "/event") {
+        return req.json().then((body: any) => {
+          const op: ObservedOp = {
+            timestamp: body.timestamp ?? new Date().toISOString(),
+            endpoint: body.endpoint ?? "scenario:fill",
+            input_summary: body.input_summary ?? `${body.event_kind ?? "?"} ${body.role ?? "?"}×${body.count ?? "?"} in ${body.city ?? "?"}, ${body.state ?? "?"}`,
+            success: !!body.success,
+            duration_ms: Number(body.duration_ms) || 0,
+            output_summary: body.output_summary ?? (body.success ? "filled" : (body.error ?? "failed")),
+            error: body.error,
+            source: "scenario",
+            staffer_id: body.staffer_id,
+            sig_hash: body.sig_hash,
+            event_kind: body.event_kind,
+            role: body.role,
+            city: body.city,
+            state: body.state,
+            count: body.count,
+            rescue_attempted: !!body.rescue_attempted,
+            rescue_succeeded: !!body.rescue_succeeded,
+          };
+          recordExternalOp(op);
+          persistOp(op).catch(() => {});
+          return Response.json({ accepted: true, ring_size: recentOps.length });
+        }).catch((e: Error) =>
+          Response.json({ error: e.message }, { status: 400 }));
+      }
+      return new Response("not found", { status: 404 });
+    },
+  });
+  console.error(`[observer] HTTP listener bound to 0.0.0.0:${OBSERVER_PORT}`);
+}
+
 // ─── Main loop ───
 
 async function main() {
-  console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}`);
+  console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}, port=${OBSERVER_PORT}`);
 
   // Run a health check first
   const health = await fetch(`${GATEWAY}/health`).then(r => r.json()).catch(() => null);
@@ -202,6 +297,9 @@ async function main() {
   }
   console.error(`[observer] gateway healthy: ${JSON.stringify(health)}`);
 
+  // Phase 24 — bind HTTP listener so scenarios can POST outcomes.
+  startHttpListener();
+
   // Main loop
   let cycle = 0;
   while (true) {
@@ -216,11 +314,13 @@ async function main() {
       await consolidatePlaybooks();
     }
 
+    const scenarioOps = recentOps.filter(o => o.source === "scenario").length;
     const stats = {
       cycle,
       total_ops: recentOps.length,
       successes: recentOps.filter(o => o.success).length,
       failures: recentOps.filter(o => !o.success).length,
+      scenario_ops: scenarioOps,
     };
     console.error(`[observer] cycle ${cycle}: ${JSON.stringify(stats)}`);
   }
diff --git a/tests/multi-agent/scenario.ts b/tests/multi-agent/scenario.ts
index 06f14da..e3c2088 100644
--- a/tests/multi-agent/scenario.ts
+++ b/tests/multi-agent/scenario.ts
@@ -35,8 +35,27 @@ import {
   reviewerPrompt,
   GATEWAY,
 } from "./agent.ts";
-import { indexRun, recommendFor, loadRecommendation, snapshotConfig, type PathwayRecommendation } from "./kb.ts";
+import { indexRun, recommendFor, loadRecommendation, snapshotConfig, computeSignature, type PathwayRecommendation } from "./kb.ts";
 import { createHash } from "node:crypto";
+
+// Phase 24 — observer HTTP endpoint. Scenarios POST per-event
+// outcomes here so the observer's analyzer + playbook builder see
+// them. Best-effort: if observer is down, scenario still proceeds.
+const OBSERVER_URL = process.env.LH_OBSERVER_URL ?? "http://localhost:3800";
+
+async function postObserverEvent(payload: Record<string, unknown>): Promise<void> {
+  try {
+    await fetch(`${OBSERVER_URL}/event`, {
+      method: "POST",
+      headers: { "content-type": "application/json" },
+      body: JSON.stringify(payload),
+      signal: AbortSignal.timeout(2000),
+    });
+  } catch {
+    // Observer down — silent skip. The scenario's own logs are still
+    // the source of truth; observer integration is an augmentation.
+  }
+}
 import { mkdir, writeFile, appendFile } from "node:fs/promises";
 import { join } from "node:path";
 
@@ -1608,6 +1627,29 @@ async function main() {
       ctx.gap_signals.push({ event: event.at, category: category.trim(), detail: rest.join(":").trim() });
     }
 
+    // Phase 24 — emit outcome to observer so its ERROR_ANALYZER and
+    // PLAYBOOK_BUILDER loops see scenario events alongside MCP ops.
+    // Best-effort; observer being down doesn't affect scenario flow.
+    postObserverEvent({
+      timestamp: new Date().toISOString(),
+      endpoint: "scenario:fill",
+      success: result.ok,
+      duration_ms: Math.round((result.duration_secs ?? 0) * 1000),
+      staffer_id: spec.staffer?.id,
+      sig_hash: computeSignature(spec),
+      event_kind: result.event.kind,
+      role: result.event.role,
+      city: result.event.city,
+      state: result.event.state,
+      count: result.event.count,
+      error: result.error,
+      rescue_attempted: !!result.retry_remediation,
+      rescue_succeeded: !!result.retry_remediation && result.ok,
+      output_summary: result.ok
+        ? `filled ${result.fills.length}/${result.event.count} in ${result.turns} turns, ${result.playbook_citations?.length ?? 0} cites`
+        : `FAIL: ${(result.error ?? "unknown").slice(0, 80)}`,
+    }).catch(() => {});
+
     // Option B — T3 checkpoint after every misplacement, and every N-th event.
     const isLast = i === spec.events.length - 1;
     const nthHit = T3_CHECKPOINT_EVERY > 0 && ((i + 1) % T3_CHECKPOINT_EVERY === 0);