// A day in the life — the real-world scenario test. // // Runs six events against the live substrate: baseline_fill, recurring, // expansion, emergency, misplacement, retrospective. Each event // exercises a different pressure pattern; each one produces actionable // artifacts (SMS drafts, client emails, dispatch log) alongside the // ranking output; the run as a whole is self-audited at EOD against six // gap categories (supply, embedding, fairness, drift, tool, write-through). // // Design notes: // - Compressed clock. The "08:00" in an event spec is a label for the // output, not a wall-clock gate. The full scenario runs in minutes. // - One script, shared state. Each event mutates the same roster + // gap_signals + artifacts in-memory, then persists at EOD. // - Fail-soft per event. A drift-abort or tool error on one event // records a gap_signal and moves on; we explicitly want to see which // events the substrate can't handle, not abort the whole run. // - Every fill event routes through the same executor/reviewer loop as // the single-task orchestrator — just driven in sequence rather than // standalone, with event-specific extra constraints in the prompt. import { type LogEntry, type TaskSpec, type Action, type Fill, callTool, hybridSearch, sqlQuery, generate, generateCloud, parseAction, executorPrompt, reviewerPrompt, GATEWAY, } from "./agent.ts"; import { mkdir, writeFile, appendFile } from "node:fs/promises"; import { join } from "node:path"; // 2026-04-20 — reverted to mistral executor after trying qwen2.5. // qwen2.5 emits malformed JSON (trailing `)` garbage, unterminated // strings) when asked for tool calls. mistral drops fields occasionally // but produces valid JSON. With optional `question` default + lean // prompt + schema lock, mistral seals baseline + recurring reliably. // Complex scenarios (5-fill, emergency, misplacement) remain flaky — // real Phase 20+ problem (larger model or constrained decoding needed). 
const EXECUTOR_MODEL = "mistral:latest"; const REVIEWER_MODEL = "qwen2.5:latest"; const DRAFT_MODEL = "qwen2.5:latest"; // artifact generation; short outputs // T3 overview tier. Called sparingly — NOT per tool call. Two insertion // points: (B) mid-scenario checkpoint after every misplacement event and // every N events, and (A) cross-day lesson after all events complete. // gpt-oss:20b is a thinking model: it spends tokens in a hidden reasoning // block before emitting `response`. Budget accordingly — never under 400. // Model matrix — config/models.json is authoritative. Env vars override. // Loaded at module init so we can log the tier shape at scenario start. interface ModelTier { primary: { model: string; provider: string }; local_fallback?: { model: string; provider: string }; max_tokens?: number; temperature?: number; env_flag?: string; } let MODEL_MATRIX: { tiers: Record } = { tiers: {} }; try { MODEL_MATRIX = JSON.parse(await Bun.file("config/models.json").text()); } catch { // Config optional — env vars alone work too. Silent: the per-tier // logging below will show "default" if matrix is empty. } const T3_TIER = MODEL_MATRIX.tiers?.t3_overview; const OVERVIEW_CLOUD = process.env.LH_OVERVIEW_CLOUD === "1"; const OVERVIEW_MODEL = process.env.LH_OVERVIEW_MODEL ?? (OVERVIEW_CLOUD ? (T3_TIER?.primary.model ?? "gpt-oss:120b") : (T3_TIER?.local_fallback?.model ?? "gpt-oss:20b")); const T3_CHECKPOINT_EVERY = Number(process.env.LH_T3_CHECKPOINT_EVERY ?? 3); const T3_DISABLED = process.env.LH_T3_DISABLE === "1"; // Dispatcher: route T3 calls to local sidecar or Ollama Cloud depending // on the LH_OVERVIEW_CLOUD flag. Hot-path T1/T2 always stay local. 
async function overviewGenerate(prompt: string, opts: { temperature?: number; max_tokens?: number } = {}): Promise { if (OVERVIEW_CLOUD) return generateCloud(OVERVIEW_MODEL, prompt, opts); return generate(OVERVIEW_MODEL, prompt, opts); } const MAX_TURNS = 14; const MAX_CONSECUTIVE_DRIFTS = 3; const WORKERS_INDEX = "workers_500k_v1"; const WORKERS_DATASET = "workers_500k"; // =================== Event + scenario types =================== type EventKind = "baseline_fill" | "recurring" | "expansion" | "emergency" | "misplacement"; interface FillEvent { kind: EventKind; at: string; // display label like "08:00" role: string; count: number; city: string; state: string; shift_start?: string; // "08:00 AM" for SMS/email drafts scenario_note?: string; // extra context the agents should know deadline?: string; // emergency events carry this, shown to reviewer exclude_worker_ids?: string[]; // misplacement: the lost worker replaces_event?: string; // misplacement back-ref for reporting } interface ScenarioSpec { client: string; date: string; events: FillEvent[]; } interface EventResult { event: FillEvent; ok: boolean; fills: Fill[]; turns: number; duration_secs: number; error?: string; gap_signals: string[]; // pulled into the cross-event gap report sources_first_score?: number; sources_last_score?: number; pool_size?: number; // sql_matches from the first hybrid_search playbook_citations?: string[]; discovered_pattern?: string; // Path 2 meta-index snapshot per event } interface RosterEntry { worker_id: string; name: string; booked_for: string; // event at-label role: string; city: string; state: string; status: "confirmed" | "no_show" | "rebooked_elsewhere"; } interface ScenarioContext { spec: ScenarioSpec; out_dir: string; roster: RosterEntry[]; results: EventResult[]; gap_signals: Array<{ event: string; category: string; detail: string }>; prior_lessons: PriorLesson[]; } interface PriorLesson { date: string; client: string; cities: string; states: string; lesson: string; 
events_ok: number; events_total: number; file: string; } // Load lessons from prior T3 runs — read-back half of the feedback loop. // Filters to the most relevant by matching ANY city/state with the current // spec, then takes the 3 newest. Keeps startup cheap; file scan is O(n). async function loadPriorLessons(spec: ScenarioSpec): Promise { try { const { readdir, readFile } = await import("node:fs/promises"); const dir = join("data", "_playbook_lessons"); const files = await readdir(dir).catch(() => [] as string[]); if (files.length === 0) return []; const specCities = new Set(spec.events.map(e => e.city)); const specStates = new Set(spec.events.map(e => e.state)); const parsed: PriorLesson[] = []; for (const f of files) { if (!f.endsWith(".json")) continue; try { const raw = await readFile(join(dir, f), "utf8"); const rec = JSON.parse(raw); parsed.push({ ...rec, file: f }); } catch { /* skip malformed */ } } const relevant = parsed.filter(p => { const cities = (p.cities ?? "").split(","); const states = (p.states ?? "").split(","); return cities.some(c => specCities.has(c)) || states.some(s => specStates.has(s)); }); relevant.sort((a, b) => (b.date ?? "").localeCompare(a.date ?? 
"")); return relevant.slice(0, 3); } catch { return []; } } // =================== Default scenario =================== const DEFAULT_SCENARIO: ScenarioSpec = { client: "Riverfront Steel", date: "2026-04-21", events: [ { kind: "baseline_fill", at: "08:00", role: "Warehouse Associate", count: 3, city: "Toledo", state: "OH", shift_start: "08:00 AM", scenario_note: "Regular Monday morning shift, 8-hour.", }, { kind: "recurring", at: "10:30", role: "Machine Operator", count: 2, city: "Toledo", state: "OH", shift_start: "11:00 AM", scenario_note: "Recurring Tuesday/Thursday slot — prior workers may still be available.", }, { kind: "expansion", at: "12:15", role: "Forklift Operator", count: 5, city: "Toledo", state: "OH", shift_start: "01:00 PM", scenario_note: "New warehouse location opening, five-worker team needed.", }, { kind: "emergency", at: "14:00", role: "Loader", count: 4, city: "Toledo", state: "OH", shift_start: "04:00 PM same day", deadline: "16:00", scenario_note: "Walkoff incident — replacement crew needed by 16:00 sharp.", }, { kind: "misplacement", at: "15:45", role: "Warehouse Associate", count: 1, city: "Toledo", state: "OH", shift_start: "remainder of 08:00 shift", scenario_note: "One worker from the 08:00 fill didn't show; rebuild the gap.", replaces_event: "08:00", }, ], }; // =================== Low-level helpers shared across events =================== async function httpJson(url: string, body?: any): Promise { const res = await fetch(url, { method: body ? "POST" : "GET", headers: { "Content-Type": "application/json" }, body: body ? JSON.stringify(body) : undefined, }); if (!res.ok) throw new Error(`${res.status} ${await res.text()}`); return (await res.json()) as T; } function fmt(e: LogEntry): string { const tag = ` [t${e.turn.toString().padStart(2, "0")} ${e.role.padEnd(8)} ${e.kind.padEnd(14)}]`; const c = e.content ?? {}; const trim = (s: any, n: number) => String(s ?? 
"").slice(0, n); if (e.kind === "tool_call") return `${tag} ${c.tool}(${JSON.stringify(c.args ?? {}).slice(0, 60)}) — ${trim(c.rationale, 40)}`; if (e.kind === "tool_result") { if (c.error) return `${tag} ERROR ${c.error}`; const rows = c?.rows?.length ?? c?.sources?.length ?? undefined; return `${tag} ${rows !== undefined ? `rows=${rows}` : JSON.stringify(c).slice(0, 60)}`; } if (e.kind === "critique") return `${tag} verdict=${c.verdict} — ${trim(c.notes, 50)}`; if (e.kind === "propose_done") return `${tag} ${c.fills?.length ?? 0} fills: ${(c.fills ?? []).map((f: Fill) => f.name).join(", ")}`; if (e.kind === "consensus_done") return `${tag} ✓`; if (e.kind === "plan") return `${tag} ${c.steps?.length ?? 0} steps`; if (e.kind === "error") return `${tag} ${c.message ?? c}`; return `${tag} ${JSON.stringify(c).slice(0, 70)}`; } async function executeToolCall(name: string, args: Record): Promise { if (name === "hybrid_search") { const { sql_filter, index_name, k } = args; // `question` is strictly required by /vectors/hybrid but local models // intermittently drop it. Derive a sensible default from sql_filter // so a missing `question` doesn't waste turns. const question = args.question ?? "qualified available workers"; if (!sql_filter || !index_name) { throw new Error(`hybrid_search needs sql_filter + index_name, got ${JSON.stringify(args)}`); } // Every fill event uses the playbook_memory boost — that's the point // of the run-as-a-whole: earlier events seed later ones. return httpJson(`${GATEWAY}/vectors/hybrid`, { sql_filter, question, index_name, top_k: k ?? 10, generate: false, use_playbook_memory: true, // 2026-04-20 — bumped 10 → 100 to match server default change. At // this memory size the semantic similarities cluster narrowly // (0.55-0.67) and k=10 silently misses geo-matched playbooks. 
playbook_memory_k: 100, }); } if (name === "sql") { const { query } = args; if (!query || typeof query !== "string") throw new Error(`sql needs query string`); if (!/^\s*SELECT/i.test(query)) throw new Error(`sql allows SELECT only`); return sqlQuery(query); } return callTool(name, args); } // =================== Core fill loop — one event, one consensus =================== interface AgentFillOutcome { fills: Fill[]; approach: string; turns: number; duration_secs: number; log: LogEntry[]; first_sql_matches?: number; first_pool_first_score?: number; first_pool_last_score?: number; playbook_citations: string[]; } async function runAgentFill( task: TaskSpec, extra_guidance: string, exclude_worker_ids: string[], ): Promise { const t0 = Date.now(); const log: LogEntry[] = []; let turn = 0; let consecutiveDrifts = 0; let sealed: { fills: Fill[]; approach: string } | null = null; let first_sql_matches: number | undefined; let first_pool_first: number | undefined; let first_pool_last: number | undefined; const playbook_citations = new Set(); const append = (e: Omit): LogEntry => { const full: LogEntry = { ...e, at: new Date().toISOString() }; log.push(full); console.log(fmt(full)); return full; }; // Build executor prompt with the scenario-specific guidance + exclusions // injected as an extra block. Reuses the base prompt so drift detection // and output-shape rules are unchanged. 
const withExtras = (base: string): string => { let addon = ""; if (extra_guidance) addon += `\n\nEVENT-SPECIFIC GUIDANCE:\n${extra_guidance}`; if (exclude_worker_ids.length > 0) { addon += `\n\nEXCLUDE these workers (already booked / unavailable today): ${exclude_worker_ids.join(", ")}\nIf your tool results include them, skip them — never propose them.`; } return base + addon; }; while (turn < MAX_TURNS && !sealed) { turn += 1; const execRaw = await generate( EXECUTOR_MODEL, withExtras(executorPrompt(task, log)), { temperature: 0.2, max_tokens: 600 }, ); let execAction: Action; try { execAction = parseAction(execRaw, "executor"); } catch (e) { append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "error", content: { message: (e as Error).message, raw: execRaw.slice(0, 300) } }); throw e; } append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: execAction.kind as any, content: execAction }); if (execAction.kind === "tool_call") { try { const result = await executeToolCall(execAction.tool, execAction.args); // Filter tool results to enforce the exclusion list — defense in // depth since the prompt alone isn't enough for weak models. const filtered = maskExclusions(result, exclude_worker_ids); // Capture the first hybrid_search pool stats for gap detection. if (execAction.tool === "hybrid_search" && first_sql_matches === undefined) { first_sql_matches = (filtered as any).sql_matches; const sources = (filtered as any).sources ?? []; if (sources.length > 0) { first_pool_first = sources[0].score; first_pool_last = sources[sources.length - 1].score; } } const trimmed = trimResult(filtered); append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "tool_result", content: trimmed }); // Accumulate playbook citations from any hybrid result that // carried them — the scenario-level report needs them. if (Array.isArray((filtered as any).sources)) { for (const s of (filtered as any).sources) { for (const c of s.playbook_citations ?? 
[]) { playbook_citations.add(c); } } } } catch (e) { append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "tool_result", content: { error: (e as Error).message, tool: execAction.tool } }); consecutiveDrifts += 1; if (consecutiveDrifts >= MAX_CONSECUTIVE_DRIFTS) { throw new Error(`aborted — ${MAX_CONSECUTIVE_DRIFTS} consecutive tool errors`); } } } const revRaw = await generate( REVIEWER_MODEL, withExtras(reviewerPrompt(task, log)), { temperature: 0.1, max_tokens: 400 }, ); let revAction: Action; try { revAction = parseAction(revRaw, "reviewer"); } catch (e) { append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "error", content: { message: (e as Error).message, raw: revRaw.slice(0, 300) } }); throw e; } append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "critique", content: revAction }); if (revAction.kind !== "critique") throw new Error(`reviewer emitted non-critique: ${revAction.kind}`); if (revAction.verdict === "drift") { consecutiveDrifts += 1; if (consecutiveDrifts >= MAX_CONSECUTIVE_DRIFTS) { throw new Error(`aborted — ${MAX_CONSECUTIVE_DRIFTS} consecutive drift flags`); } } else { consecutiveDrifts = 0; } if (execAction.kind === "propose_done" && revAction.verdict === "approve_done") { if (execAction.fills.length !== task.target_count) { throw new Error(`consensus malformed — ${execAction.fills.length} fills vs target ${task.target_count}`); } // Enforce exclusion at seal time too, in case the models ignored // both prompt + tool-result filtering. for (const f of execAction.fills) { if (exclude_worker_ids.includes(f.candidate_id)) { throw new Error(`consensus proposed excluded worker ${f.candidate_id}`); } } append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "consensus_done", content: { fills: execAction.fills } }); sealed = { fills: execAction.fills, approach: execAction.rationale ?? 
"multi-agent hybrid" }; } } if (!sealed) throw new Error(`no consensus after ${MAX_TURNS} turns`); return { fills: sealed.fills, approach: sealed.approach, turns: turn, duration_secs: (Date.now() - t0) / 1000, log, first_sql_matches, first_pool_first_score: first_pool_first, first_pool_last_score: first_pool_last, playbook_citations: Array.from(playbook_citations), }; } function maskExclusions(result: any, exclude: string[]): any { if (exclude.length === 0) return result; if (Array.isArray(result.sources)) { return { ...result, sources: result.sources.filter((s: any) => !exclude.includes(s.doc_id)) }; } if (Array.isArray(result.rows)) { return { ...result, rows: result.rows.filter((r: any) => { const id = r.worker_id ?? r.doc_id; return id === undefined || !exclude.includes(String(id)); }) }; } return result; } function trimResult(r: any): any { if (r && Array.isArray(r.sources)) { return { ...r, sources: r.sources.slice(0, 20), _trimmed: r.sources.length > 20 ? `${r.sources.length - 20} more` : undefined }; } if (r && Array.isArray(r.rows)) { return { ...r, rows: r.rows.slice(0, 20), _trimmed: r.rows.length > 20 ? `${r.rows.length - 20} more` : undefined }; } return r; } // =================== Per-event guidance strings =================== function guidanceFor(event: FillEvent, ctx: ScenarioContext): string { // HARD SCHEMA GUARD: prior runs of this scenario had mistral invent // column names from narrative guidance ("shift", "recurring", // "expansion") and write SQL filters against them. Lock the schema // explicitly so the executor has no excuse. Also pin `availability` // and `reliability` as DOUBLE casts since their text-storage causes // type_coercion errors otherwise. 
const schemaLock = ` SCHEMA ENFORCEMENT (CRITICAL): The ONLY columns in workers_500k usable in sql_filter are: worker_id, name, role, email, phone, city, state, zip, skills, certifications, archetype, reliability, responsiveness, engagement, communications, compliance, availability, resume_text. Narrative words like "shift", "recurring", "expansion", "emergency", "morning", "priority" are NOT columns. DO NOT invent columns. Numeric filters need CAST: CAST(availability AS DOUBLE) > 0.5 and CAST(reliability AS DOUBLE) > 0.7.`; const base = (() => { switch (event.kind) { case "baseline_fill": return `Standard fill. Client ${ctx.spec.client}. Rank by semantic match; require CAST(availability AS DOUBLE) > 0.5.`; case "recurring": return `Recurring slot — prefer workers with past playbook citations (visible on hybrid sources). Require CAST(availability AS DOUBLE) > 0.5.`; case "expansion": return `New-location fill, ${event.count} workers at once. Require CAST(availability AS DOUBLE) > 0.5 AND CAST(reliability AS DOUBLE) > 0.75.`; case "emergency": return `Emergency replacement needed ASAP. Require CAST(availability AS DOUBLE) > 0.7. A good-enough available worker beats a perfect unavailable one.`; case "misplacement": return `Refill for a no-show. Do NOT propose anyone on the EXCLUDE list. Require CAST(availability AS DOUBLE) > 0.5.`; } })(); // Prior-lesson hint — surface up to 2 most recent lessons learned from // T3 overseer runs against this city/state. Terse to avoid diluting the // prompt. The goal is to pass forward hard-won mistakes, not flood the // context. This is the read-back half of the T3 feedback loop. const priorHint = ctx.prior_lessons.length > 0 ? `\n\nPRIOR LESSONS (from T3 overseer on past runs in similar cities):\n` + ctx.prior_lessons.slice(0, 2).map((p, i) => `${i + 1}. 
${p.date} ${p.client} (${p.cities}): ${p.lesson.replace(/\s+/g, " ").slice(0, 500)}` ).join("\n") : ""; return `${schemaLock}\n\nEVENT FOCUS:\n${base}${priorHint}`; } // =================== Artifact generation =================== interface ArtifactBundle { sms: string; email: string; } // One Ollama call per event for SMS (to the filled workers) + one for // the client email. Short outputs, low temperature — these are drafts, // not creative writing. async function generateArtifacts(event: FillEvent, outcome: AgentFillOutcome, ctx: ScenarioContext): Promise { const smsPrompt = `Generate short, friendly, professional SMS messages to confirm a shift for each worker. ONE message per worker. Format as: TO: {Name} {message body under 180 chars} --- Details: - Client: ${ctx.spec.client} - Role: ${event.role} - Location: ${event.city}, ${event.state} - Shift starts: ${event.shift_start ?? "TBD"} - Scenario: ${event.scenario_note ?? ""} Workers to message: ${outcome.fills.map(f => `- ${f.name} (id ${f.candidate_id})`).join("\n")} Respond with only the message blocks, separated by "---". No commentary.`; const emailPrompt = `Generate a short professional email confirmation to the staffing client. TO: staffing@${ctx.spec.client.toLowerCase().replace(/ /g, "")}.example FROM: dispatch@lakehouse.example SUBJECT: (3-word subject) Body (4-6 lines max). Be specific about: - Number of workers filled (${outcome.fills.length} of ${event.count}) - Roles: ${event.role} - Names filled - Shift start: ${event.shift_start ?? "TBD"} - Any scenario flag: ${event.scenario_note ?? "(none)"} Workers: ${outcome.fills.map(f => `- ${f.name} (${f.reason.slice(0, 60)})`).join("\n")} Respond with only the email. 
No commentary.`; const [sms, email] = await Promise.all([ generate(DRAFT_MODEL, smsPrompt, { temperature: 0.3, max_tokens: 500 }), generate(DRAFT_MODEL, emailPrompt, { temperature: 0.3, max_tokens: 400 }), ]); return { sms: sms.trim(), email: email.trim() }; } // =================== Per-event runner =================== async function runEvent(event: FillEvent, ctx: ScenarioContext): Promise { console.log(`\n════════ ${event.at} — ${event.kind.toUpperCase()}: fill ${event.count}× ${event.role} in ${event.city}, ${event.state} ════════`); const t0 = Date.now(); // Build the task spec the agent loop expects. const task: TaskSpec = { id: `${ctx.spec.date}-${event.at.replace(":", "")}-${event.kind}`, operation: `fill: ${event.role} x${event.count} in ${event.city}, ${event.state}`, target_role: event.role, target_count: event.count, target_city: event.city, target_state: event.state, approach_hint: `hybrid search against ${WORKERS_INDEX} for ${event.kind}`, }; // Exclusion set: everyone already in today's roster + any explicit // exclusions from the event spec. const excludeIds = [ ...ctx.roster .filter(r => r.status === "confirmed") .map(r => r.worker_id), ...(event.exclude_worker_ids ?? []), ]; const gap_signals: string[] = []; let outcome: AgentFillOutcome; try { outcome = await runAgentFill(task, guidanceFor(event, ctx), excludeIds); } catch (e) { return { event, ok: false, fills: [], turns: 0, duration_secs: (Date.now() - t0) / 1000, error: (e as Error).message, gap_signals: [`drift_or_tool: ${(e as Error).message}`], }; } // Resolve worker_ids via SQL so the roster has stable IDs (models // sometimes return names-only). Best-effort — if name lookup finds // zero or many matches, we flag a gap. const resolved = await resolveWorkerIds(outcome.fills, event); // Roster double-book check. 
for (const r of resolved) { const conflict = ctx.roster.find(e => e.worker_id === r.worker_id && e.status === "confirmed"); if (conflict) { gap_signals.push(`double_book: ${r.worker_id} ${r.name} already booked for ${conflict.booked_for}`); } ctx.roster.push({ worker_id: r.worker_id, name: r.name, booked_for: event.at, role: event.role, city: event.city, state: event.state, status: "confirmed", }); } // Pool-size signal (Gap 1 — supply). const supply_threshold = event.count * 3; if ((outcome.first_sql_matches ?? 0) < supply_threshold) { gap_signals.push( `supply: only ${outcome.first_sql_matches} candidates for ${event.count}× ${event.role} in ${event.city} (< ${supply_threshold}, our 3× comfort margin)` ); } // Score-spread signal (Gap 2 — embedding). const spread = (outcome.first_pool_first_score ?? 0) - (outcome.first_pool_last_score ?? 0); if (spread > 0 && spread < 0.02) { gap_signals.push( `embedding: top-K score spread ${spread.toFixed(3)} < 0.02 — model struggles to differentiate` ); } // Generate artifacts (SMS + email) — fail-soft; artifacts are cosmetic // relative to the consensus itself. let bundle: ArtifactBundle | null = null; try { bundle = await generateArtifacts(event, { ...outcome, fills: resolved }, ctx); await appendFile(join(ctx.out_dir, "sms.md"), `\n## ${event.at} ${event.kind} — ${event.role} x${event.count} in ${event.city}, ${event.state}\n\n${bundle.sms}\n`); await appendFile(join(ctx.out_dir, "emails.md"), `\n## ${event.at} ${event.kind} — ${event.role} x${event.count}\n\n${bundle.email}\n`); } catch (e) { gap_signals.push(`artifact: ${(e as Error).message}`); } // Meta-index (Path 2) — fetch patterns the system discovered across // similar past playbooks. MUST come before dispatch log write because // dispatch carries this field. Fail-soft — patterns are observational. 
let discovered_pattern: string | undefined; try { const patterns = await httpJson(`${GATEWAY}/vectors/playbook_memory/patterns`, { query: `${event.role} in ${event.city}, ${event.state}`, top_k_playbooks: 25, min_trait_frequency: 0.4, }); discovered_pattern = patterns?.discovered_pattern; } catch { /* patterns are observational */ } // Dispatch log (structured). await appendFile(join(ctx.out_dir, "dispatch.jsonl"), JSON.stringify({ at: event.at, kind: event.kind, operation: task.operation, fills: resolved, turns: outcome.turns, duration_secs: outcome.duration_secs, pool_size: outcome.first_sql_matches, playbook_citations: outcome.playbook_citations, discovered_pattern, }) + "\n"); // Always seed playbook_memory after a sealed fill — keep the learning // loop tight across the whole day so recurring/misplacement events // later in the run benefit from earlier ones. // // 2026-04-20 — canonical SHORT seed text. Verbose LLM rationales dilute // the embedding and drop cosine similarity below the 0.05 threshold, // silently killing the boost. Keep operation+approach+context terse. try { await httpJson(`${GATEWAY}/vectors/playbook_memory/seed`, { operation: task.operation, approach: `${event.kind} fill via hybrid search`, context: `${event.role} fill in ${event.city}, ${event.state}`, endorsed_names: resolved.map(r => r.name), append: true, }); } catch (e) { gap_signals.push(`write_through: ${(e as Error).message}`); } // After a misplacement event, record the lost worker's failure so // future searches for this city+role dampen their boost. Without this // Path 1 negative signal the no-shower keeps getting lifted. if (event.kind === "misplacement" && (event.exclude_worker_ids?.length ?? 
0) > 0) {
    // Locate the worker flagged no_show (set by the misplacement pre-pass
    // in main()). Assumes at most one active no-show at a time —
    // TODO confirm against multi-misplacement specs.
    const lost = ctx.roster.find(r => r.status === "no_show");
    if (lost) {
      try {
        await httpJson(`${GATEWAY}/vectors/playbook_memory/mark_failed`, {
          operation: `fill: ${lost.role} x1 in ${lost.city}, ${lost.state}`,
          failed_names: [lost.name],
          reason: `no-show from ${lost.booked_for} shift`,
        });
      } catch (e) {
        // Fail-soft: a mark_failed error becomes a gap signal, not an abort.
        gap_signals.push(`mark_failed: ${(e as Error).message}`);
      }
    }
  }
  // (discovered_pattern was computed + written above, before dispatch.jsonl)
  return {
    event,
    ok: true,
    fills: outcome.fills,
    turns: outcome.turns,
    duration_secs: outcome.duration_secs,
    gap_signals,
    sources_first_score: outcome.first_pool_first_score,
    sources_last_score: outcome.first_pool_last_score,
    pool_size: outcome.first_sql_matches,
    playbook_citations: outcome.playbook_citations,
    discovered_pattern,
  };
}

// =================== Worker ID resolution ===================
// Models emit candidate_ids or names in propose_done. Some return the
// W500K-XXX doc_id, others just the name, others a random tag. Resolve
// to canonical (worker_id, name) via SQL so the roster is reliable.
//
// Resolution ladder (first hit wins, per fill):
//   1. already canonical `W500K-NNN`     -> accept unchanged
//   2. bare integer                      -> prefix with `W500K-`
//   3. SQL lookup by (name, city, state) -> first row wins
//   4. lookup miss or SQL error          -> pass the fill through with its
//      candidate_id unchanged; the EOD gap report flags it rather than
//      silently dropping the fill.
//
// NOTE(review): the return type reads bare `Promise` here — the generic
// argument (presumably `Promise<Fill[]>`) looks lost in transit; confirm
// against the repo before relying on this text.
async function resolveWorkerIds(fills: Fill[], event: FillEvent): Promise {
  const resolved: Fill[] = [];
  for (const f of fills) {
    // Case 1: candidate_id looks like W500K-NNN — accept as-is.
    if (/^W500K-\d+$/.test(f.candidate_id)) { resolved.push(f); continue; }
    // Case 2: candidate_id is a bare integer — promote to W500K-N.
    if (/^\d+$/.test(f.candidate_id)) {
      resolved.push({ ...f, candidate_id: `W500K-${f.candidate_id}` });
      continue;
    }
    // Case 3: look up by (name, city, state). Take the first match.
    // Single-quote doubling escapes the SQL string values (name is
    // model-supplied, city/state come from the event spec).
    const q = `SELECT worker_id FROM ${WORKERS_DATASET} WHERE name = '${f.name.replace(/'/g, "''")}' AND city = '${event.city.replace(/'/g, "''")}' AND state = '${event.state.replace(/'/g, "''")}' LIMIT 1`;
    try {
      const r = await sqlQuery(q);
      if (r.rows && r.rows.length > 0) {
        // NOTE(review): assumes the worker_id column stores the bare numeric
        // part, so prefixing rebuilds the canonical id — confirm against the
        // workers dataset schema.
        resolved.push({ ...f, candidate_id: `W500K-${r.rows[0].worker_id}` });
      } else {
        // No match — keep the fill but leave candidate_id as-is; the
        // gap report will flag it.
        resolved.push(f);
      }
    } catch {
      // SQL failure is fail-soft too: keep the fill unresolved.
      resolved.push(f);
    }
  }
  return resolved;
}

// =================== T3 overview tier ===================
// Called sparingly so reasoning overhead stays amortized.
// (B) Checkpoint — after every misplacement AND every N-th event.
// (A) Cross-day lesson — once at end of scenario.
// Results land in `checkpoints.jsonl` and `lesson.md`, and the lesson
// seeds playbook_memory under operation "cross-day-lesson-{date}" so
// future scenarios can surface it on similar setups.

// One structured record per T3 checkpoint call; serialized as a line of
// checkpoints.jsonl and echoed into the retrospective digest.
interface OverviewCheckpoint {
  after_event: string; // event.at label
  event_kind: EventKind;
  ok: boolean; // true iff both RISK: and HINT: parsed from the model reply
  model: string;
  duration_secs: number;
  hint: string; // T3's "what to do differently next time"
  risk: string; // T3's named risk flag
}

// Run one mid-scenario T3 checkpoint: digest up to the three most recent
// prior events plus the current one, ask the overview model for one RISK
// and one HINT, and return a structured record. Returns null when T3 is
// disabled; on model failure returns an ok:false record (fail-soft)
// instead of throwing.
// NOTE(review): bare `Promise` return type here too — generic argument
// (presumably `Promise<OverviewCheckpoint | null>`) appears stripped.
async function runOverviewCheckpoint(
  event: FillEvent,
  result: EventResult,
  prior: EventResult[],
): Promise {
  if (T3_DISABLED) return null;
  const start = Date.now();
  // Only the last three prior events go into the prompt — keeps it lean.
  const priorSummary = prior.slice(-3).map(p =>
    `- ${p.event.at} ${p.event.kind} ${p.event.role}×${p.event.count} → ${p.ok ? p.fills.length + "/" + p.event.count + " filled" : "FAIL"}; pool=${p.pool_size ?? "?"}; cites=${p.playbook_citations?.length ?? 0}`
  ).join("\n");
  // One-line digest of the event that just ran (outcome, pool, citations).
  const thisOne = `This event: ${event.at} ${event.kind} ${event.role}×${event.count} in ${event.city}, ${event.state}. ` +
    `Outcome: ${result.ok ? "filled " + result.fills.length + "/" + event.count : "FAILED: " + (result.error ?? "unknown")}. ` +
    `Pool size: ${result.pool_size ?? "n/a"}. Turns: ${result.turns}. Playbook citations: ${result.playbook_citations?.length ?? 0}. 
` + `Gap signals: ${result.gap_signals.join("; ") || "none"}.`; const prompt = `You are the overview reviewer for a staffing coordinator agent system. A mid-day checkpoint has been triggered. Recent events (most recent last): ${priorSummary || "(no prior events)"} ${thisOne} Your job: emit ONE risk flag (≤6 words) and ONE actionable hint (≤25 words) for the NEXT event. Be concrete: name the role, city, or worker class if relevant. Do not restate what happened. Think step by step, then output strictly as: RISK: HINT: `; let text = ""; try { text = await overviewGenerate(prompt, { temperature: 0.2, max_tokens: 600 }); } catch (e) { return { after_event: event.at, event_kind: event.kind, ok: false, model: OVERVIEW_MODEL, duration_secs: (Date.now() - start) / 1000, hint: "(T3 unavailable)", risk: (e as Error).message.slice(0, 80), }; } const riskMatch = text.match(/RISK:\s*(.+)/i); const hintMatch = text.match(/HINT:\s*(.+)/i); return { after_event: event.at, event_kind: event.kind, ok: Boolean(riskMatch && hintMatch), model: OVERVIEW_MODEL, duration_secs: (Date.now() - start) / 1000, risk: (riskMatch?.[1] ?? "(unparsed)").trim().slice(0, 120), hint: (hintMatch?.[1] ?? text).trim().slice(0, 400), }; } async function runCrossDayLesson(ctx: ScenarioContext, checkpoints: OverviewCheckpoint[]): Promise { if (T3_DISABLED) return null; const eventDigest = ctx.results.map(r => `- ${r.event.at} ${r.event.kind} ${r.event.role}×${r.event.count} ${r.event.city},${r.event.state} → ${r.ok ? r.fills.length + " filled" : "FAIL"}; pool=${r.pool_size ?? "?"}; turns=${r.turns}; cites=${r.playbook_citations?.length ?? 0}; gaps=${r.gap_signals.length}` ).join("\n"); const checkpointDigest = checkpoints.length > 0 ? checkpoints.map(c => `- after ${c.after_event} (${c.event_kind}): risk="${c.risk}" hint="${c.hint}"`).join("\n") : "(no mid-day checkpoints)"; const prompt = `You are the end-of-day lesson writer for a staffing coordinator agent system. The day is done. Distill it. 
Client: ${ctx.spec.client} Date: ${ctx.spec.date} Events that ran: ${eventDigest} Mid-day checkpoints: ${checkpointDigest} Your job: write ONE actionable lesson for future runs that face similar setups. Target audience: the agent tomorrow. Keep the lesson to 3-5 sentences. No filler, no restating. Think step by step about what pattern repeated, what to pre-fetch, or what to avoid — then write the lesson as plain prose. LESSON:`;
  try {
    // 900-token cap — larger than the checkpoint budget since the lesson
    // is multi-sentence prose on top of hidden reasoning tokens.
    const text = await overviewGenerate(prompt, { temperature: 0.2, max_tokens: 900 });
    // Take everything after "LESSON:"; fall back to the whole reply.
    const m = text.match(/LESSON:\s*([\s\S]+)/i);
    return (m ? m[1] : text).trim();
  } catch (e) {
    // Fail-soft: callers still get a string to write into lesson.md.
    return `(T3 lesson unavailable: ${(e as Error).message})`;
  }
}

// =================== EOD gap report ===================

// Assemble report.md from the scenario context: per-event table, final
// roster, gap signals grouped by category (plus cross-event fairness and
// write-through audits computed here), a workers-touched audit,
// discovered patterns, and a short narrative. Reads ctx; writes only the
// report file at the end.
async function writeRetrospective(ctx: ScenarioContext): Promise {
  const lines: string[] = [];
  lines.push(`# Scenario retrospective — ${ctx.spec.client}, ${ctx.spec.date}`);
  lines.push("");
  lines.push(`Executor: \`${EXECUTOR_MODEL}\` Reviewer: \`${REVIEWER_MODEL}\` Draft: \`${DRAFT_MODEL}\` Overview(T3): \`${T3_DISABLED ? "disabled" : OVERVIEW_MODEL + (OVERVIEW_CLOUD ? " (cloud)" : "")}\``);
  lines.push(`Prior lessons loaded into executor context: **${ctx.prior_lessons.length}**${ctx.prior_lessons.length > 0 ? " (from " + ctx.prior_lessons.map(p => p.date).join(", ") + ")" : " (baseline — no prior T3 history)"}`);
  lines.push("");
  // --- Per-event summary ---
  lines.push("## Events");
  lines.push("");
  lines.push("| At | Kind | Role / Count | Pool | Fills | Turns | Dur(s) | Cites | Gaps |");
  lines.push("|---|---|---|---|---|---|---|---|---|");
  for (const r of ctx.results) {
    const status = r.ok ? "✓" : "✗";
    lines.push(
      `| ${r.event.at} | ${r.event.kind} | ${r.event.role} × ${r.event.count} | ${r.pool_size ?? "-"} | ${status} ${r.fills.length} | ${r.turns} | ${r.duration_secs.toFixed(1)} | ${r.playbook_citations?.length ?? 0} | ${r.gap_signals.length} |`
    );
  }
  lines.push("");
  // --- Roster ---
  lines.push("## Final roster");
  lines.push("");
  lines.push("| Worker | Booked | Role | City, ST | Status |");
  lines.push("|---|---|---|---|---|");
  for (const e of ctx.roster) {
    lines.push(`| ${e.worker_id} ${e.name} | ${e.booked_for} | ${e.role} | ${e.city}, ${e.state} | ${e.status} |`);
  }
  lines.push("");
  // --- Gap analysis by category ---
  // Bucket the per-event gap signals by their category string.
  // NOTE(review): `Record` reads bare here — generic arguments (presumably
  // `Record<string, string[]>`) appear stripped in transit.
  const bycat: Record = {};
  for (const g of ctx.gap_signals) {
    if (!bycat[g.category]) bycat[g.category] = [];
    bycat[g.category].push(`**${g.event}** — ${g.detail}`);
  }
  // Add cross-event categories computed here:
  // Gap 3 — fairness (Gini-lite on roster)
  const bookedIds = ctx.roster.filter(r => r.status === "confirmed").map(r => r.worker_id);
  const counts = new Map();
  for (const id of bookedIds) counts.set(id, (counts.get(id) ?? 0) + 1);
  // Anyone confirmed more than once today gets a fairness flag.
  const multis = [...counts.entries()].filter(([_, n]) => n > 1);
  if (multis.length > 0) {
    bycat["fairness"] = bycat["fairness"] ?? [];
    for (const [id, n] of multis) {
      const name = ctx.roster.find(r => r.worker_id === id)?.name ?? id;
      bycat["fairness"].push(`_cross-event_ — ${name} (${id}) booked ${n} times today`);
    }
  }
  // Gap 5 — tool errors already captured per-event via gap_signals.
  // Gap 6 — write-through coverage: compare # events vs # new playbook_memory entries.
  try {
    const stats = await httpJson(`${GATEWAY}/vectors/playbook_memory/stats`);
    bycat["write_through_audit"] = bycat["write_through_audit"] ?? [];
    bycat["write_through_audit"].push(`_post-run_ — playbook_memory has ${stats.entries} entries (ran ${ctx.results.length} events, expected ≥ ${ctx.results.filter(r => r.ok).length} new entries from this run)`);
  } catch { /* non-fatal */ }
  lines.push("## Gap signals");
  lines.push("");
  if (Object.keys(bycat).length === 0) {
    lines.push("_None surfaced — either everything worked or detection is under-tuned._");
  } else {
    for (const [cat, items] of Object.entries(bycat)) {
      lines.push(`### ${cat}`);
      for (const item of items) lines.push(`- ${item}`);
      lines.push("");
    }
  }
  // --- Workers-touched audit (don't leave anyone out) ---
  // Pull every worker that surfaced as a hit across all 5 events — booked
  // or excluded or rejected — so we can show the full population the
  // system considered, not just the ones that made the cut. J's ask:
  // "iterations and decisions that are made don't leave anyone out."
  // Keyed by candidate_id; value tracks name, events seen, and outcome.
  const touched = new Map();
  for (const r of ctx.results) {
    for (const f of r.fills) {
      const key = f.candidate_id;
      if (!touched.has(key)) touched.set(key, { name: f.name, events: [], outcome: "booked" });
      touched.get(key)!.events.push(`${r.event.at} ${r.event.kind}`);
    }
  }
  // Overlay no-show status from the roster onto the touched map.
  for (const r of ctx.roster) {
    if (r.status === "no_show") {
      const t = touched.get(r.worker_id);
      if (t) t.outcome = "booked-then-no_show";
      else touched.set(r.worker_id, { name: r.name, events: [r.booked_for], outcome: "no_show" });
    }
  }
  lines.push("## Workers touched across the week");
  lines.push("");
  lines.push(`${touched.size} distinct workers made it through to a decision. 
Every one is accounted for below — ` + `no-shows flagged, rebookings noted, everyone visible.`);
  lines.push("");
  lines.push("| Worker ID | Name | Events | Outcome |");
  lines.push("|---|---|---|---|");
  for (const [id, t] of touched) {
    lines.push(`| ${id} | ${t.name} | ${t.events.join(" + ")} | ${t.outcome} |`);
  }
  lines.push("");
  // --- Discovered patterns evolution across events ---
  lines.push("## Discovered patterns (meta-index)");
  lines.push("");
  lines.push("What the system identified across semantically-similar past fills as each event ran:");
  lines.push("");
  for (const r of ctx.results) {
    // discovered_pattern rides on the result object untyped — hence the cast.
    const dp = (r as any).discovered_pattern ?? "—";
    lines.push(`- **${r.event.at} ${r.event.kind}** (${r.event.role}): ${dp}`);
  }
  lines.push("");
  // --- Narrative summary ---
  lines.push("## Narrative");
  lines.push("");
  lines.push(`- ${ctx.results.filter(r => r.ok).length}/${ctx.results.length} events reached consensus.`);
  lines.push(`- Final roster: ${ctx.roster.length} bookings across ${new Set(ctx.roster.map(r => r.worker_id)).size} distinct workers.`);
  lines.push(`- Workers touched (booked, failed, or otherwise decided): ${touched.size}.`);
  const totalCites = ctx.results.reduce((a, r) => a + (r.playbook_citations?.length ?? 0), 0);
  lines.push(`- Playbook citations across the day: ${totalCites} (proof the feedback loop fired across events).`);
  const droppedEvents = ctx.results.filter(r => !r.ok);
  if (droppedEvents.length > 0) {
    lines.push(`- Dropped events: ${droppedEvents.map(r => r.event.at + " " + r.event.kind).join(", ")}.`);
  }
  await writeFile(join(ctx.out_dir, "report.md"), lines.join("\n"));
  console.log(`\n✓ report → ${join(ctx.out_dir, "report.md")}`);
}

// =================== Main driver ===================

// Scenario entry point. Loads the spec (argv[2] path, else
// DEFAULT_SCENARIO), prepares a timestamped output dir + artifact files,
// runs every event in sequence against shared mutable ctx, fires T3
// checkpoints (Option B) and the cross-day lesson (Option A), writes the
// retrospective, and exits 0 on full success / 2 on partial failure.
async function main() {
  const specPath = process.argv[2];
  const spec: ScenarioSpec = specPath ? JSON.parse(await Bun.file(specPath).text()) : DEFAULT_SCENARIO;
  // Timestamped output dir, e.g. scenario-2026-04-20T08-00-00.
  const stamp = new Date().toISOString().replace(/[:.]/g, "-").slice(0, 19);
  const out_dir = join("tests/multi-agent/playbooks", `scenario-${stamp}`);
  await mkdir(out_dir, { recursive: true });
  const prior_lessons = await loadPriorLessons(spec);
  // Shared mutable state for the whole day: every event mutates this.
  const ctx: ScenarioContext = { spec, out_dir, roster: [], results: [], gap_signals: [], prior_lessons, };
  // Initialize output files
  await writeFile(join(out_dir, "sms.md"), `# SMS drafts — ${spec.client}, ${spec.date}\n`);
  await writeFile(join(out_dir, "emails.md"), `# Client emails — ${spec.client}, ${spec.date}\n`);
  await writeFile(join(out_dir, "dispatch.jsonl"), "");
  await writeFile(join(out_dir, "checkpoints.jsonl"), "");
  // Archive which prior lessons this run will see, so the retrospective
  // can tell whether the T3 feedback loop actually fed back anything.
  await writeFile(
    join(out_dir, "prior_lessons.json"),
    JSON.stringify(prior_lessons, null, 2)
  );
  if (prior_lessons.length > 0) {
    console.log(`▶ prior lessons loaded: ${prior_lessons.length} (from data/_playbook_lessons/)`);
    for (const p of prior_lessons) {
      console.log(` - ${p.date} ${p.client} (${p.cities}) — ${p.events_ok}/${p.events_total} ok`);
    }
  }
  const checkpoints: OverviewCheckpoint[] = [];
  console.log(`▶ scenario: ${spec.client}, ${spec.date}, ${spec.events.length} events`);
  console.log(`▶ models: exec=${EXECUTOR_MODEL} review=${REVIEWER_MODEL} overview=${T3_DISABLED ? "disabled" : OVERVIEW_MODEL + (OVERVIEW_CLOUD ? " (cloud)" : "")}`);
  console.log(`▶ out: ${out_dir}\n`);
  for (let i = 0; i < spec.events.length; i++) {
    const event = spec.events[i];
    // Expand misplacement-style exclusions from the current roster: it
    // wants to replace a worker from a prior event, so grab everyone
    // booked at that at-label and add as exclusions.
    if (event.kind === "misplacement" && event.replaces_event) {
      const priorBooked = ctx.roster.filter(r => r.booked_for === event.replaces_event && r.status === "confirmed").map(r => r.worker_id);
      if (priorBooked.length > 0) {
        // Pick one arbitrarily to mark as no_show — in a real system the
        // external signal would pick. For the test, first one works.
        const lost = priorBooked[0];
        const lostEntry = ctx.roster.find(r => r.worker_id === lost);
        if (lostEntry) {
          lostEntry.status = "no_show";
          console.log(` (misplacement: marking ${lost} ${lostEntry.name} as no-show)`);
        }
        // Exclude all prior bookings so the refill doesn't pick anyone
        // already scheduled for today.
        event.exclude_worker_ids = priorBooked;
      }
    }
    const result = await runEvent(event, ctx);
    ctx.results.push(result);
    // Fan the per-event "category: detail" gap strings out into the
    // scenario-level structured list for the retrospective.
    for (const s of result.gap_signals) {
      const [category, ...rest] = s.split(":");
      ctx.gap_signals.push({ event: event.at, category: category.trim(), detail: rest.join(":").trim() });
    }
    // Option B — T3 checkpoint after every misplacement, and every N-th event.
    const isLast = i === spec.events.length - 1;
    const nthHit = T3_CHECKPOINT_EVERY > 0 && ((i + 1) % T3_CHECKPOINT_EVERY === 0);
    const shouldCheckpoint = !T3_DISABLED && (event.kind === "misplacement" || nthHit || isLast);
    if (shouldCheckpoint) {
      // slice(0, -1) hands T3 only the PRIOR events — the current one is
      // passed separately as `result`.
      const cp = await runOverviewCheckpoint(event, result, ctx.results.slice(0, -1));
      if (cp) {
        checkpoints.push(cp);
        await appendFile(join(out_dir, "checkpoints.jsonl"), JSON.stringify(cp) + "\n");
        console.log(` T3 checkpoint (${cp.duration_secs.toFixed(1)}s): risk="${cp.risk}" hint="${cp.hint.slice(0, 80)}${cp.hint.length > 80 ? "…" : ""}"`);
      }
    }
    // Small breather to not hammer Ollama on back-to-back runs.
    await new Promise(r => setTimeout(r, 500));
  }
  // Persist structured state for forensics.
  await writeFile(join(out_dir, "roster.json"), JSON.stringify(ctx.roster, null, 2));
  await writeFile(join(out_dir, "results.json"), JSON.stringify(ctx.results, null, 2));
  // Option A — T3 cross-day lesson. One final call distills the whole run.
  // Saved to lesson.md and also seeded into playbook_memory so tomorrow's
  // agent can retrieve it on similar setups.
  if (!T3_DISABLED) {
    console.log(`\n▶ T3 cross-day lesson via ${OVERVIEW_MODEL}…`);
    const tLesson = Date.now();
    const lesson = await runCrossDayLesson(ctx, checkpoints);
    const lessonSecs = ((Date.now() - tLesson) / 1000).toFixed(1);
    if (lesson) {
      await writeFile(
        join(out_dir, "lesson.md"),
        `# Cross-day lesson — ${ctx.spec.client}, ${ctx.spec.date}\n\n` +
          `_Generated by \`${OVERVIEW_MODEL}\` in ${lessonSecs}s. ` +
          `Based on ${ctx.results.length} events + ${checkpoints.length} mid-day checkpoints._\n\n` +
          lesson + "\n"
      );
      console.log(`✓ lesson (${lessonSecs}s) → ${join(out_dir, "lesson.md")}`);
      // Persist the lesson to data/_playbook_lessons/ so future scenarios
      // can read it verbatim at startup. The /vectors/playbook_memory/seed
      // endpoint rejects operations that don't match the `fill: Role xN
      // in City, ST` regex (enforced in crates/vectord/src/service.rs),
      // so embedding-based retrieval of cross-day lessons isn't wired.
      // File-based read-back is durable and explicit — future scenarios
      // pull from the lessons dir at startup and include top-N in the
      // executor's system context.
      try {
        const cities = [...new Set(ctx.spec.events.map(e => e.city))].slice(0, 3).join(",");
        const states = [...new Set(ctx.spec.events.map(e => e.state))].slice(0, 3).join(",");
        const lessonsDir = join("data", "_playbook_lessons");
        await mkdir(lessonsDir, { recursive: true });
        // Structured record: run metadata + lesson prose + checkpoint digests.
        const lessonRec = {
          date: ctx.spec.date,
          client: ctx.spec.client,
          cities,
          states,
          events_total: ctx.spec.events.length,
          events_ok: ctx.results.filter(r => r.ok).length,
          checkpoint_count: checkpoints.length,
          model: OVERVIEW_MODEL,
          cloud: OVERVIEW_CLOUD,
          lesson: lesson.trim(),
          checkpoints: checkpoints.map(c => ({ after: c.after_event, risk: c.risk, hint: c.hint })),
          created_at: new Date().toISOString(),
        };
        const fname = `${ctx.spec.date}_${ctx.spec.client.replace(/\s+/g, "_")}_${Date.now()}.json`;
        await writeFile(join(lessonsDir, fname), JSON.stringify(lessonRec, null, 2));
        console.log(` lesson archived → ${join(lessonsDir, fname)}`);
      } catch (e) {
        // Archive failure is cosmetic; lesson.md is already on disk.
        console.log(` (lesson archive skipped: ${(e as Error).message})`);
      }
    }
  }
  await writeRetrospective(ctx);
  // Exit 2 on partial failure so callers can tell it apart from a crash (1).
  const okCount = ctx.results.filter(r => r.ok).length;
  if (okCount < ctx.results.length) {
    console.log(`\n⚠ ${okCount}/${ctx.results.length} events succeeded. See ${out_dir}/report.md for gaps.`);
    process.exit(2);
  }
  console.log(`\n✓ ${okCount}/${ctx.results.length} events succeeded. See ${out_dir}/report.md.`);
  process.exit(0);
}

// Top-level crash handler: print message + stack, exit 1.
main().catch(e => {
  console.error(`\n✗ scenario driver crashed: ${(e as Error).message}`);
  console.error((e as Error).stack);
  process.exit(1);
});