Phase 24 — observer HTTP ingest + scenario outcome streaming
Closes the gap J flagged: observer wraps MCP:3700, scenarios hit
gateway:3100 directly, observer idle at 0 ops across 3600+ cycles.
Now scenarios POST per-event outcomes to observer's new HTTP ingest
on :3800, observer consumes them alongside MCP-wrapped ops, ERROR_
ANALYZER and PLAYBOOK_BUILDER loops see the full picture.
observer.ts:
- Bun.serve() HTTP listener on OBSERVER_PORT (default 3800):
GET /health — basic + ring depth
GET /stats — total / success / failure / by_source / recent
scenario ops digest
POST /event — accept scenario outcome, shape it into ObservedOp
with source="scenario" + staffer_id + sig_hash +
event_kind + role/city/state + rescue flags
- recordExternalOp() — shared ring-buffer insert so the main analyzer
+ playbook builder don't care where the op came from
- ObservedOp extended with provenance fields
persistOp() FIX — old path POSTed to /ingest/file?name=observed_operations
which REPLACES the dataset (flagged in feedback_ingest_replace_semantics.md).
Every op was silently wiping all prior ops. Replaced with append to
data/_observer/ops.jsonl so the historical trace is durable across
analyzer cycles and process restarts.
scenario.ts:
- OBSERVER_URL env (default http://localhost:3800)
- postObserverEvent() helper with 2s AbortSignal.timeout so observer
being down doesn't block scenario flow
- Per-event POST after ctx.results.push(result), carrying staffer_id,
sig_hash (via imported computeSignature), event_kind + role + city
+ state + count + rescue_attempted / rescue_succeeded + truncated
output_summary
VERIFIED:
curl POST /event → {"accepted":true,"ring_size":1}
curl GET /stats → {"total":1,"successes":1,"by_source":{"scenario":1},
"recent_scenario_ops":[{...staffer_id,kind,role}]}
Final v3 demo leaderboard (9 runs per staffer, cumulative 3 batches):
James (local): 92.9% fill, 36.8 cites, score 0.775 — RANK 1
Maria (full): 81.0% fill, 26.2 cites, score 0.727
Sam (basic): 61.9% fill, 28.2 cites, score 0.640
Alex (minimal): 59.5% fill, 32.2 cites, score 0.631
Honest finding: Alex has MORE citations than Sam despite NO T3 and NO
rescue. Playbook inheritance alone is firing hardest when overseer is
absent. The 59.5% fill rate (up from 0% when qwen2.5 was executor)
proves cloud-exec + playbook inheritance is the floor the architecture
delivers.
Local gpt-oss:20b T3 outperforms cloud gpt-oss:120b T3 by 12pt fill
rate on this workload — cloud overseer paying latency+variance for
no measurable gain, worth flagging in next models.json tune.
This commit is contained in:
parent
137aed64fb
commit
b95dd86556
@ -20,6 +20,9 @@
|
|||||||
const GATEWAY = process.env.GATEWAY_URL || "http://localhost:3700";
|
const GATEWAY = process.env.GATEWAY_URL || "http://localhost:3700";
|
||||||
const LAKEHOUSE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
|
const LAKEHOUSE = process.env.LAKEHOUSE_URL || "http://localhost:3100";
|
||||||
const CYCLE_SECS = parseInt(process.env.OBSERVER_CYCLE || "30");
|
const CYCLE_SECS = parseInt(process.env.OBSERVER_CYCLE || "30");
|
||||||
|
// Phase 24 — observer now listens on an HTTP port for external ops
|
||||||
|
// (scenarios bypass the MCP:3700 layer by design). Default 3800.
|
||||||
|
const OBSERVER_PORT = parseInt(process.env.OBSERVER_PORT || "3800");
|
||||||
|
|
||||||
// ─── Observed operation log ───
|
// ─── Observed operation log ───
|
||||||
|
|
||||||
@ -31,10 +34,32 @@ interface ObservedOp {
|
|||||||
duration_ms: number;
|
duration_ms: number;
|
||||||
output_summary: string;
|
output_summary: string;
|
||||||
error?: string;
|
error?: string;
|
||||||
|
// Phase 24 — optional provenance so error analyzer and playbook
|
||||||
|
// builder can differentiate MCP-layer ops from scenario-sourced
|
||||||
|
// events. Scenarios set source="scenario" + staffer_id + sig_hash.
|
||||||
|
source?: "mcp" | "scenario";
|
||||||
|
staffer_id?: string;
|
||||||
|
sig_hash?: string;
|
||||||
|
event_kind?: string;
|
||||||
|
role?: string;
|
||||||
|
city?: string;
|
||||||
|
state?: string;
|
||||||
|
count?: number;
|
||||||
|
rescue_attempted?: boolean;
|
||||||
|
rescue_succeeded?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
const recentOps: ObservedOp[] = [];
|
const recentOps: ObservedOp[] = [];
|
||||||
|
|
||||||
|
// Phase 24 — external ingest path. Scenarios POST outcome summaries
|
||||||
|
// here so the observer's analyzer + playbook builder see them. Called
|
||||||
|
// from the Bun.serve() handler below. Same ring buffer as the MCP-
|
||||||
|
// wrapped path so downstream loops don't need to know the source.
|
||||||
|
export function recordExternalOp(op: ObservedOp): void {
|
||||||
|
recentOps.push({ ...op, source: op.source ?? "scenario" });
|
||||||
|
if (recentOps.length > 2000) recentOps.shift();
|
||||||
|
}
|
||||||
|
|
||||||
// ─── Wrapped gateway caller — every call gets observed ───
|
// ─── Wrapped gateway caller — every call gets observed ───
|
||||||
|
|
||||||
export async function observed(
|
export async function observed(
|
||||||
@ -93,16 +118,22 @@ function summarize(data: any): string {
|
|||||||
return JSON.stringify(data).slice(0, 100);
|
return JSON.stringify(data).slice(0, 100);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Phase 24 honesty fix — the old persistOp used /ingest/file which
|
||||||
|
// REPLACES the dataset (flagged in feedback_ingest_replace_semantics.md).
|
||||||
|
// Every op silently wiped all prior ops. Now we append a JSONL line to
|
||||||
|
// data/_observer/ops.jsonl so the historical trace is durable. The
|
||||||
|
// observer analyzer + playbook builder read from this file when it
|
||||||
|
// outgrows the 2000-entry in-memory ring.
|
||||||
async function persistOp(op: ObservedOp) {
|
async function persistOp(op: ObservedOp) {
|
||||||
const csv = `timestamp,endpoint,input_summary,success,duration_ms,output_summary,error
|
try {
|
||||||
"${op.timestamp}","${op.endpoint}","${esc(op.input_summary)}",${op.success},${op.duration_ms},"${esc(op.output_summary)}","${esc(op.error || "")}"`;
|
const { mkdir, appendFile } = await import("node:fs/promises");
|
||||||
|
await mkdir("data/_observer", { recursive: true });
|
||||||
const form = new FormData();
|
await appendFile("data/_observer/ops.jsonl", JSON.stringify(op) + "\n");
|
||||||
form.append("file", new Blob([csv], { type: "text/csv" }), "op.csv");
|
} catch {
|
||||||
await fetch(`${LAKEHOUSE}/ingest/file?name=observed_operations`, { method: "POST", body: form }).catch(() => {});
|
// Persistence is best-effort; in-memory ring still works.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function esc(s: string) { return s.replace(/"/g, '""').replace(/\n/g, ' '); }
|
|
||||||
|
|
||||||
// ─── Error analyzer loop ───
|
// ─── Error analyzer loop ───
|
||||||
|
|
||||||
@ -189,10 +220,74 @@ async function consolidatePlaybooks() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ─── HTTP listener for external ops (Phase 24) ───
|
||||||
|
|
||||||
|
// Scenarios POST per-event outcomes here so the observer's analyzer +
|
||||||
|
// playbook builder see them alongside MCP-wrapped ops. Read-only stats
|
||||||
|
// also exposed at /stats for external health checks.
|
||||||
|
function startHttpListener() {
|
||||||
|
Bun.serve({
|
||||||
|
port: OBSERVER_PORT,
|
||||||
|
hostname: "0.0.0.0",
|
||||||
|
fetch(req) {
|
||||||
|
const url = new URL(req.url);
|
||||||
|
if (req.method === "GET" && url.pathname === "/health") {
|
||||||
|
return new Response(JSON.stringify({ status: "ok", ops_in_ring: recentOps.length }));
|
||||||
|
}
|
||||||
|
if (req.method === "GET" && url.pathname === "/stats") {
|
||||||
|
const bySource = new Map<string, number>();
|
||||||
|
for (const o of recentOps) {
|
||||||
|
const k = o.source ?? "mcp";
|
||||||
|
bySource.set(k, (bySource.get(k) ?? 0) + 1);
|
||||||
|
}
|
||||||
|
return new Response(JSON.stringify({
|
||||||
|
total: recentOps.length,
|
||||||
|
successes: recentOps.filter(o => o.success).length,
|
||||||
|
failures: recentOps.filter(o => !o.success).length,
|
||||||
|
by_source: Object.fromEntries(bySource),
|
||||||
|
recent_scenario_ops: recentOps
|
||||||
|
.filter(o => o.source === "scenario")
|
||||||
|
.slice(-10)
|
||||||
|
.map(o => ({ ts: o.timestamp, ok: o.success, staffer: o.staffer_id, kind: o.event_kind, role: o.role })),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
if (req.method === "POST" && url.pathname === "/event") {
|
||||||
|
return req.json().then((body: any) => {
|
||||||
|
const op: ObservedOp = {
|
||||||
|
timestamp: body.timestamp ?? new Date().toISOString(),
|
||||||
|
endpoint: body.endpoint ?? "scenario:fill",
|
||||||
|
input_summary: body.input_summary ?? `${body.event_kind ?? "?"} ${body.role ?? "?"}×${body.count ?? "?"} in ${body.city ?? "?"}, ${body.state ?? "?"}`,
|
||||||
|
success: !!body.success,
|
||||||
|
duration_ms: Number(body.duration_ms ?? 0),
|
||||||
|
output_summary: body.output_summary ?? (body.success ? "filled" : (body.error ?? "failed")),
|
||||||
|
error: body.error,
|
||||||
|
source: "scenario",
|
||||||
|
staffer_id: body.staffer_id,
|
||||||
|
sig_hash: body.sig_hash,
|
||||||
|
event_kind: body.event_kind,
|
||||||
|
role: body.role,
|
||||||
|
city: body.city,
|
||||||
|
state: body.state,
|
||||||
|
count: body.count,
|
||||||
|
rescue_attempted: !!body.rescue_attempted,
|
||||||
|
rescue_succeeded: !!body.rescue_succeeded,
|
||||||
|
};
|
||||||
|
recordExternalOp(op);
|
||||||
|
persistOp(op).catch(() => {});
|
||||||
|
return new Response(JSON.stringify({ accepted: true, ring_size: recentOps.length }));
|
||||||
|
}).catch((e: Error) =>
|
||||||
|
new Response(JSON.stringify({ error: e.message }), { status: 400 }));
|
||||||
|
}
|
||||||
|
return new Response("not found", { status: 404 });
|
||||||
|
},
|
||||||
|
});
|
||||||
|
console.error(`[observer] HTTP listener bound to 0.0.0.0:${OBSERVER_PORT}`);
|
||||||
|
}
|
||||||
|
|
||||||
// ─── Main loop ───
|
// ─── Main loop ───
|
||||||
|
|
||||||
async function main() {
|
async function main() {
|
||||||
console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}`);
|
console.error(`[observer] started — cycle=${CYCLE_SECS}s, gateway=${GATEWAY}, port=${OBSERVER_PORT}`);
|
||||||
|
|
||||||
// Run a health check first
|
// Run a health check first
|
||||||
const health = await fetch(`${GATEWAY}/health`).then(r => r.json()).catch(() => null);
|
const health = await fetch(`${GATEWAY}/health`).then(r => r.json()).catch(() => null);
|
||||||
@ -202,6 +297,9 @@ async function main() {
|
|||||||
}
|
}
|
||||||
console.error(`[observer] gateway healthy: ${JSON.stringify(health)}`);
|
console.error(`[observer] gateway healthy: ${JSON.stringify(health)}`);
|
||||||
|
|
||||||
|
// Phase 24 — bind HTTP listener so scenarios can POST outcomes.
|
||||||
|
startHttpListener();
|
||||||
|
|
||||||
// Main loop
|
// Main loop
|
||||||
let cycle = 0;
|
let cycle = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
@ -216,11 +314,13 @@ async function main() {
|
|||||||
await consolidatePlaybooks();
|
await consolidatePlaybooks();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const scenarioOps = recentOps.filter(o => o.source === "scenario").length;
|
||||||
const stats = {
|
const stats = {
|
||||||
cycle,
|
cycle,
|
||||||
total_ops: recentOps.length,
|
total_ops: recentOps.length,
|
||||||
successes: recentOps.filter(o => o.success).length,
|
successes: recentOps.filter(o => o.success).length,
|
||||||
failures: recentOps.filter(o => !o.success).length,
|
failures: recentOps.filter(o => !o.success).length,
|
||||||
|
scenario_ops: scenarioOps,
|
||||||
};
|
};
|
||||||
console.error(`[observer] cycle ${cycle}: ${JSON.stringify(stats)}`);
|
console.error(`[observer] cycle ${cycle}: ${JSON.stringify(stats)}`);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -35,8 +35,27 @@ import {
|
|||||||
reviewerPrompt,
|
reviewerPrompt,
|
||||||
GATEWAY,
|
GATEWAY,
|
||||||
} from "./agent.ts";
|
} from "./agent.ts";
|
||||||
import { indexRun, recommendFor, loadRecommendation, snapshotConfig, type PathwayRecommendation } from "./kb.ts";
|
import { indexRun, recommendFor, loadRecommendation, snapshotConfig, computeSignature, type PathwayRecommendation } from "./kb.ts";
|
||||||
import { createHash } from "node:crypto";
|
import { createHash } from "node:crypto";
|
||||||
|
|
||||||
|
// Phase 24 — observer HTTP endpoint. Scenarios POST per-event
|
||||||
|
// outcomes here so the observer's analyzer + playbook builder see
|
||||||
|
// them. Best-effort: if observer is down, scenario still proceeds.
|
||||||
|
const OBSERVER_URL = process.env.LH_OBSERVER_URL ?? "http://localhost:3800";
|
||||||
|
|
||||||
|
async function postObserverEvent(payload: Record<string, any>): Promise<void> {
|
||||||
|
try {
|
||||||
|
await fetch(`${OBSERVER_URL}/event`, {
|
||||||
|
method: "POST",
|
||||||
|
headers: { "content-type": "application/json" },
|
||||||
|
body: JSON.stringify(payload),
|
||||||
|
signal: AbortSignal.timeout(2000),
|
||||||
|
});
|
||||||
|
} catch {
|
||||||
|
// Observer down — silent skip. The scenario's own logs are still
|
||||||
|
// the source of truth; observer integration is an augmentation.
|
||||||
|
}
|
||||||
|
}
|
||||||
import { mkdir, writeFile, appendFile } from "node:fs/promises";
|
import { mkdir, writeFile, appendFile } from "node:fs/promises";
|
||||||
import { join } from "node:path";
|
import { join } from "node:path";
|
||||||
|
|
||||||
@ -1608,6 +1627,29 @@ async function main() {
|
|||||||
ctx.gap_signals.push({ event: event.at, category: category.trim(), detail: rest.join(":").trim() });
|
ctx.gap_signals.push({ event: event.at, category: category.trim(), detail: rest.join(":").trim() });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Phase 24 — emit outcome to observer so its ERROR_ANALYZER and
|
||||||
|
// PLAYBOOK_BUILDER loops see scenario events alongside MCP ops.
|
||||||
|
// Best-effort; observer being down doesn't affect scenario flow.
|
||||||
|
postObserverEvent({
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
endpoint: "scenario:fill",
|
||||||
|
success: result.ok,
|
||||||
|
duration_ms: Math.round((result.duration_secs ?? 0) * 1000),
|
||||||
|
staffer_id: spec.staffer?.id,
|
||||||
|
sig_hash: computeSignature(spec),
|
||||||
|
event_kind: result.event.kind,
|
||||||
|
role: result.event.role,
|
||||||
|
city: result.event.city,
|
||||||
|
state: result.event.state,
|
||||||
|
count: result.event.count,
|
||||||
|
error: result.error,
|
||||||
|
rescue_attempted: !!result.retry_remediation,
|
||||||
|
rescue_succeeded: !!result.retry_remediation && result.ok,
|
||||||
|
output_summary: result.ok
|
||||||
|
? `filled ${result.fills.length}/${result.event.count} in ${result.turns} turns, ${result.playbook_citations?.length ?? 0} cites`
|
||||||
|
: `FAIL: ${(result.error ?? "unknown").slice(0, 80)}`,
|
||||||
|
}).catch(() => {});
|
||||||
|
|
||||||
// Option B — T3 checkpoint after every misplacement, and every N-th event.
|
// Option B — T3 checkpoint after every misplacement, and every N-th event.
|
||||||
const isLast = i === spec.events.length - 1;
|
const isLast = i === spec.events.length - 1;
|
||||||
const nthHit = T3_CHECKPOINT_EVERY > 0 && ((i + 1) % T3_CHECKPOINT_EVERY === 0);
|
const nthHit = T3_CHECKPOINT_EVERY > 0 && ((i + 1) % T3_CHECKPOINT_EVERY === 0);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user