root 25b7e6c3a7 Phase 19 wiring + Path 1/2 work + chain integrity fixes
Backend:
- crates/vectord/src/playbook_memory.rs (new): Phase 19 in-memory boost
  store with seed/rebuild/snapshot, plus temporal decay (e^-age/30 per
  playbook), persist_to_sql endpoint backing successful_playbooks_live,
  and discover_patterns endpoint for meta-index pattern aggregation
  (recurring certs/skills/archetype/reliability across similar past fills).
- DEFAULT_TOP_K_PLAYBOOKS bumped 5 → 25; old default silently missed
  most boosts when memory had > 25 entries.
- service.rs: new routes /vectors/playbook_memory/{seed,rebuild,stats,
  persist_sql,patterns}.
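The temporal decay above can be sketched in a few lines. This is a minimal illustration of the e^-age/30 rule from the bullet, not the actual Rust implementation; the `PlaybookEntry` shape and `decayedBoost` name are assumptions:

```typescript
// Illustrative shape only — the real store lives in playbook_memory.rs.
interface PlaybookEntry { text: string; createdAt: number; baseBoost: number; }

// Decay weight per the e^-age/30 rule, with age measured in days.
function decayedBoost(entry: PlaybookEntry, now = Date.now()): number {
  const ageDays = (now - entry.createdAt) / 86_400_000; // ms per day
  return entry.baseBoost * Math.exp(-ageDays / 30);
}
```

A 30-day-old playbook keeps about 37% (e^-1) of its boost; a fresh one keeps all of it.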

Bun staffing co-pilot (mcp-server/):
- /search, /match, /verify, /proof, /simulation/run, MCP tools all
  forward use_playbook_memory:true and playbook_memory_k:25 to the
  hybrid endpoint. Boost was previously dark across the entire app.
- /log no longer POSTs to /ingest/file — that endpoint REPLACES the
  dataset's object list, so single-row CSV writes were wiping all prior
  rows in successful_playbooks (sp_rows went 33→1 in one /log call).
  /log now seeds playbook_memory with canonical short text and calls
  /persist_sql to keep successful_playbooks_live in sync.
- /simulation/run cumulative end-of-week CSV write removed for the same
  reason. Per-day per-contract /seed (added in this session) is the
  accumulating feedback path now.
- search.html addWorkerInsight renders a green "Endorsed · N playbooks"
  chip with playbook citations when boost > 0.
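The flag forwarding above amounts to attaching two fields to every hybrid request body. A minimal sketch — the field names match the /vectors/hybrid call shape used elsewhere in the repo, but the helper itself (`boostedHybridBody`) is hypothetical:

```typescript
// Sketch of the request body the co-pilot routes now send to /vectors/hybrid.
function boostedHybridBody(question: string, sqlFilter: string, indexName: string) {
  return {
    question,
    sql_filter: sqlFilter,
    index_name: indexName,
    top_k: 10,              // illustrative; routes choose their own k
    generate: false,
    use_playbook_memory: true, // the flag that was never forwarded before this change
    playbook_memory_k: 25,     // matches the new DEFAULT_TOP_K_PLAYBOOKS
  };
}
```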

Internal Dioxus UI (crates/ui/):
- Dashboard phase list rewritten through Phase 19 (was stuck at "Phase
  16: File Watcher" / "Phase 17: DB Connector" — both wrong).
- Removed fabricated "27ms" stat label.
- Ask tab examples + SQL default replaced with real staffing prompts
  against candidates/clients/job_orders (was referencing nonexistent
  employees/products/events).
- New Playbook tab exposes /vectors/playbook_memory/{stats,rebuild} and
  side-by-side hybrid search (boost OFF vs ON) with citations.

Tests (tests/multi-agent/):
- run_e2e_rated.ts: parallel two-agent (mistral + qwen2.5) build phase
  + verifier rating (geo, auth, persist, boost, speed → /10).
- network_proving.ts: continuous build → verify → repeat with
  staffing-recruiter profile hot-swap; geo-discrimination check.
- chain_of_custody.ts: single recruiter operation traced through every
  layer (Bun /search, direct /vectors/hybrid parity, /log, SQL,
  playbook_memory growth, profile activation, post-op boost lift).
2026-04-20 06:21:13 -05:00


// A day in the life — the real-world scenario test.
//
// Runs five fill events against the live substrate (baseline_fill,
// recurring, expansion, emergency, misplacement), then an EOD
// retrospective. Each event
// exercises a different pressure pattern; each one produces actionable
// artifacts (SMS drafts, client emails, dispatch log) alongside the
// ranking output; the run as a whole is self-audited at EOD against six
// gap categories (supply, embedding, fairness, drift, tool, write-through).
//
// Design notes:
// - Compressed clock. The "08:00" in an event spec is a label for the
// output, not a wall-clock gate. The full scenario runs in minutes.
// - One script, shared state. Each event mutates the same roster +
// gap_signals + artifacts in-memory, then persists at EOD.
// - Fail-soft per event. A drift-abort or tool error on one event
// records a gap_signal and moves on; we explicitly want to see which
// events the substrate can't handle, not abort the whole run.
// - Every fill event routes through the same executor/reviewer loop as
// the single-task orchestrator — just driven in sequence rather than
// standalone, with event-specific extra constraints in the prompt.
import {
type LogEntry,
type TaskSpec,
type Action,
type Fill,
callTool,
hybridSearch,
sqlQuery,
generate,
parseAction,
executorPrompt,
reviewerPrompt,
GATEWAY,
} from "./agent.ts";
import { mkdir, writeFile, appendFile } from "node:fs/promises";
import { join } from "node:path";
const EXECUTOR_MODEL = "mistral:latest";
const REVIEWER_MODEL = "qwen2.5:latest";
const DRAFT_MODEL = "qwen2.5:latest"; // artifact generation; short outputs
const MAX_TURNS = 14;
const MAX_CONSECUTIVE_DRIFTS = 3;
const WORKERS_INDEX = "workers_500k_v1";
const WORKERS_DATASET = "workers_500k";
// =================== Event + scenario types ===================
type EventKind = "baseline_fill" | "recurring" | "expansion" | "emergency" | "misplacement";
interface FillEvent {
kind: EventKind;
at: string; // display label like "08:00"
role: string;
count: number;
city: string;
state: string;
shift_start?: string; // "08:00 AM" for SMS/email drafts
scenario_note?: string; // extra context the agents should know
deadline?: string; // emergency events carry this, shown to reviewer
exclude_worker_ids?: string[]; // misplacement: the lost worker
replaces_event?: string; // misplacement back-ref for reporting
}
interface ScenarioSpec {
client: string;
date: string;
events: FillEvent[];
}
interface EventResult {
event: FillEvent;
ok: boolean;
fills: Fill[];
turns: number;
duration_secs: number;
error?: string;
gap_signals: string[]; // pulled into the cross-event gap report
sources_first_score?: number;
sources_last_score?: number;
pool_size?: number; // sql_matches from the first hybrid_search
playbook_citations?: string[];
}
interface RosterEntry {
worker_id: string;
name: string;
booked_for: string; // event at-label
role: string;
city: string;
state: string;
status: "confirmed" | "no_show" | "rebooked_elsewhere";
}
interface ScenarioContext {
spec: ScenarioSpec;
out_dir: string;
roster: RosterEntry[];
results: EventResult[];
gap_signals: Array<{ event: string; category: string; detail: string }>;
}
// =================== Default scenario ===================
const DEFAULT_SCENARIO: ScenarioSpec = {
client: "Riverfront Steel",
date: "2026-04-21",
events: [
{
kind: "baseline_fill",
at: "08:00",
role: "Warehouse Associate",
count: 3,
city: "Toledo",
state: "OH",
shift_start: "08:00 AM",
scenario_note: "Regular Monday morning shift, 8-hour.",
},
{
kind: "recurring",
at: "10:30",
role: "Machine Operator",
count: 2,
city: "Toledo",
state: "OH",
shift_start: "11:00 AM",
scenario_note: "Recurring Tuesday/Thursday slot — prior workers may still be available.",
},
{
kind: "expansion",
at: "12:15",
role: "Forklift Operator",
count: 5,
city: "Toledo",
state: "OH",
shift_start: "01:00 PM",
scenario_note: "New warehouse location opening, five-worker team needed.",
},
{
kind: "emergency",
at: "14:00",
role: "Loader",
count: 4,
city: "Toledo",
state: "OH",
shift_start: "04:00 PM same day",
deadline: "16:00",
scenario_note: "Walkoff incident — replacement crew needed by 16:00 sharp.",
},
{
kind: "misplacement",
at: "15:45",
role: "Warehouse Associate",
count: 1,
city: "Toledo",
state: "OH",
shift_start: "remainder of 08:00 shift",
scenario_note: "One worker from the 08:00 fill didn't show; rebuild the gap.",
replaces_event: "08:00",
},
],
};
// =================== Low-level helpers shared across events ===================
async function httpJson<T>(url: string, body?: any): Promise<T> {
const res = await fetch(url, {
method: body ? "POST" : "GET",
headers: { "Content-Type": "application/json" },
body: body ? JSON.stringify(body) : undefined,
});
if (!res.ok) throw new Error(`${res.status} ${await res.text()}`);
return (await res.json()) as T;
}
function fmt(e: LogEntry): string {
const tag = ` [t${e.turn.toString().padStart(2, "0")} ${e.role.padEnd(8)} ${e.kind.padEnd(14)}]`;
const c = e.content ?? {};
const trim = (s: any, n: number) => String(s ?? "").slice(0, n);
if (e.kind === "tool_call") return `${tag} ${c.tool}(${JSON.stringify(c.args ?? {}).slice(0, 60)}) — ${trim(c.rationale, 40)}`;
if (e.kind === "tool_result") {
if (c.error) return `${tag} ERROR ${c.error}`;
const rows = c?.rows?.length ?? c?.sources?.length ?? undefined;
return `${tag} ${rows !== undefined ? `rows=${rows}` : JSON.stringify(c).slice(0, 60)}`;
}
if (e.kind === "critique") return `${tag} verdict=${c.verdict} ${trim(c.notes, 50)}`;
if (e.kind === "propose_done") return `${tag} ${c.fills?.length ?? 0} fills: ${(c.fills ?? []).map((f: Fill) => f.name).join(", ")}`;
if (e.kind === "consensus_done") return `${tag}`;
if (e.kind === "plan") return `${tag} ${c.steps?.length ?? 0} steps`;
if (e.kind === "error") return `${tag} ${c.message ?? c}`;
return `${tag} ${JSON.stringify(c).slice(0, 70)}`;
}
async function executeToolCall(name: string, args: Record<string, any>): Promise<any> {
if (name === "hybrid_search") {
const { sql_filter, question, index_name, k } = args;
if (!sql_filter || !question || !index_name) {
throw new Error(`hybrid_search needs sql_filter + question + index_name, got ${JSON.stringify(args)}`);
}
// Every fill event uses the playbook_memory boost — that's the point
// of the run-as-a-whole: earlier events seed later ones.
return httpJson(`${GATEWAY}/vectors/hybrid`, {
sql_filter, question, index_name,
top_k: k ?? 10, generate: false,
use_playbook_memory: true,
playbook_memory_k: 10,
});
}
if (name === "sql") {
const { query } = args;
if (!query || typeof query !== "string") throw new Error(`sql needs query string`);
if (!/^\s*SELECT/i.test(query)) throw new Error(`sql allows SELECT only`);
return sqlQuery(query);
}
return callTool(name, args);
}
// =================== Core fill loop — one event, one consensus ===================
interface AgentFillOutcome {
fills: Fill[];
approach: string;
turns: number;
duration_secs: number;
log: LogEntry[];
first_sql_matches?: number;
first_pool_first_score?: number;
first_pool_last_score?: number;
playbook_citations: string[];
}
async function runAgentFill(
task: TaskSpec,
extra_guidance: string,
exclude_worker_ids: string[],
): Promise<AgentFillOutcome> {
const t0 = Date.now();
const log: LogEntry[] = [];
let turn = 0;
let consecutiveDrifts = 0;
let sealed: { fills: Fill[]; approach: string } | null = null;
let first_sql_matches: number | undefined;
let first_pool_first: number | undefined;
let first_pool_last: number | undefined;
const playbook_citations = new Set<string>();
const append = (e: Omit<LogEntry, "at">): LogEntry => {
const full: LogEntry = { ...e, at: new Date().toISOString() };
log.push(full);
console.log(fmt(full));
return full;
};
// Build executor prompt with the scenario-specific guidance + exclusions
// injected as an extra block. Reuses the base prompt so drift detection
// and output-shape rules are unchanged.
const withExtras = (base: string): string => {
let addon = "";
if (extra_guidance) addon += `\n\nEVENT-SPECIFIC GUIDANCE:\n${extra_guidance}`;
if (exclude_worker_ids.length > 0) {
addon += `\n\nEXCLUDE these workers (already booked / unavailable today): ${exclude_worker_ids.join(", ")}\nIf your tool results include them, skip them — never propose them.`;
}
return base + addon;
};
while (turn < MAX_TURNS && !sealed) {
turn += 1;
const execRaw = await generate(
EXECUTOR_MODEL,
withExtras(executorPrompt(task, log)),
{ temperature: 0.2, max_tokens: 600 },
);
let execAction: Action;
try {
execAction = parseAction(execRaw, "executor");
} catch (e) {
append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "error",
content: { message: (e as Error).message, raw: execRaw.slice(0, 300) } });
throw e;
}
append({ turn, role: "executor", model: EXECUTOR_MODEL,
kind: execAction.kind as any, content: execAction });
if (execAction.kind === "tool_call") {
try {
const result = await executeToolCall(execAction.tool, execAction.args);
// Filter tool results to enforce the exclusion list — defense in
// depth since the prompt alone isn't enough for weak models.
const filtered = maskExclusions(result, exclude_worker_ids);
// Capture the first hybrid_search pool stats for gap detection.
if (execAction.tool === "hybrid_search" && first_sql_matches === undefined) {
first_sql_matches = (filtered as any).sql_matches;
const sources = (filtered as any).sources ?? [];
if (sources.length > 0) {
first_pool_first = sources[0].score;
first_pool_last = sources[sources.length - 1].score;
}
}
const trimmed = trimResult(filtered);
append({ turn, role: "executor", model: EXECUTOR_MODEL,
kind: "tool_result", content: trimmed });
// Accumulate playbook citations from any hybrid result that
// carried them — the scenario-level report needs them.
if (Array.isArray((filtered as any).sources)) {
for (const s of (filtered as any).sources) {
for (const c of s.playbook_citations ?? []) {
playbook_citations.add(c);
}
}
}
} catch (e) {
append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "tool_result",
content: { error: (e as Error).message, tool: execAction.tool } });
consecutiveDrifts += 1;
if (consecutiveDrifts >= MAX_CONSECUTIVE_DRIFTS) {
throw new Error(`aborted — ${MAX_CONSECUTIVE_DRIFTS} consecutive tool errors`);
}
}
}
const revRaw = await generate(
REVIEWER_MODEL,
withExtras(reviewerPrompt(task, log)),
{ temperature: 0.1, max_tokens: 400 },
);
let revAction: Action;
try {
revAction = parseAction(revRaw, "reviewer");
} catch (e) {
append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "error",
content: { message: (e as Error).message, raw: revRaw.slice(0, 300) } });
throw e;
}
append({ turn, role: "reviewer", model: REVIEWER_MODEL,
kind: "critique", content: revAction });
if (revAction.kind !== "critique") throw new Error(`reviewer emitted non-critique: ${revAction.kind}`);
if (revAction.verdict === "drift") {
consecutiveDrifts += 1;
if (consecutiveDrifts >= MAX_CONSECUTIVE_DRIFTS) {
throw new Error(`aborted — ${MAX_CONSECUTIVE_DRIFTS} consecutive drift flags`);
}
} else {
consecutiveDrifts = 0;
}
if (execAction.kind === "propose_done" && revAction.verdict === "approve_done") {
if (execAction.fills.length !== task.target_count) {
throw new Error(`consensus malformed — ${execAction.fills.length} fills vs target ${task.target_count}`);
}
// Enforce exclusion at seal time too, in case the models ignored
// both prompt + tool-result filtering.
for (const f of execAction.fills) {
if (exclude_worker_ids.includes(f.candidate_id)) {
throw new Error(`consensus proposed excluded worker ${f.candidate_id}`);
}
}
append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "consensus_done",
content: { fills: execAction.fills } });
sealed = { fills: execAction.fills, approach: execAction.rationale ?? "multi-agent hybrid" };
}
}
if (!sealed) throw new Error(`no consensus after ${MAX_TURNS} turns`);
return {
fills: sealed.fills,
approach: sealed.approach,
turns: turn,
duration_secs: (Date.now() - t0) / 1000,
log,
first_sql_matches,
first_pool_first_score: first_pool_first,
first_pool_last_score: first_pool_last,
playbook_citations: Array.from(playbook_citations),
};
}
function maskExclusions(result: any, exclude: string[]): any {
if (exclude.length === 0 || !result || typeof result !== "object") return result;
if (Array.isArray(result.sources)) {
return { ...result, sources: result.sources.filter((s: any) => !exclude.includes(s.doc_id)) };
}
if (Array.isArray(result.rows)) {
return { ...result, rows: result.rows.filter((r: any) => {
const id = r.worker_id ?? r.doc_id;
return id === undefined || !exclude.includes(String(id));
}) };
}
return result;
}
function trimResult(r: any): any {
if (r && Array.isArray(r.sources)) {
return { ...r, sources: r.sources.slice(0, 20), _trimmed: r.sources.length > 20 ? `${r.sources.length - 20} more` : undefined };
}
if (r && Array.isArray(r.rows)) {
return { ...r, rows: r.rows.slice(0, 20), _trimmed: r.rows.length > 20 ? `${r.rows.length - 20} more` : undefined };
}
return r;
}
// =================== Per-event guidance strings ===================
function guidanceFor(event: FillEvent, ctx: ScenarioContext): string {
switch (event.kind) {
case "baseline_fill":
return `Standard Monday fill. Client ${ctx.spec.client}. Shift starts ${event.shift_start ?? "at start time"}. Take the top candidates by semantic match and availability.`;
case "recurring":
return `RECURRING slot — ${ctx.spec.client} runs this shift every Tues/Thurs. If playbook_memory surfaces candidates endorsed by past similar fills (you'll see 'cites' on hybrid sources), those are the preferred workers. Shift starts ${event.shift_start ?? "at start time"}.`;
case "expansion":
return `EXPANSION at ${ctx.spec.client}. New location, ${event.count}-worker team needed at once — search broadly and prefer workers with team/collaboration signals (engagement, communications scores). Shift starts ${event.shift_start ?? "at start time"}.`;
case "emergency":
return `EMERGENCY walkoff — ${ctx.spec.client} needs ${event.count} ${event.role}s BY ${event.deadline ?? "end of day"}. Prioritize availability over perfect skill match. A good-enough worker who can report today beats a perfect worker who can't.`;
case "misplacement":
return `MISPLACEMENT refill. A worker from the 08:00 shift no-showed. You must replace them WITHOUT proposing the same worker or anyone already booked today (see EXCLUDE list). Shift is ${event.shift_start ?? "in progress"} so speed matters.`;
}
}
// =================== Artifact generation ===================
interface ArtifactBundle {
sms: string;
email: string;
}
// One Ollama call per event for SMS (to the filled workers) + one for
// the client email. Short outputs, low temperature — these are drafts,
// not creative writing.
async function generateArtifacts(event: FillEvent, outcome: AgentFillOutcome, ctx: ScenarioContext): Promise<ArtifactBundle> {
const smsPrompt = `Generate short, friendly, professional SMS messages to confirm a shift for each worker. ONE message per worker. Format as:
TO: {Name}
{message body under 180 chars}
---
Details:
- Client: ${ctx.spec.client}
- Role: ${event.role}
- Location: ${event.city}, ${event.state}
- Shift starts: ${event.shift_start ?? "TBD"}
- Scenario: ${event.scenario_note ?? ""}
Workers to message:
${outcome.fills.map(f => `- ${f.name} (id ${f.candidate_id})`).join("\n")}
Respond with only the message blocks, separated by "---". No commentary.`;
const emailPrompt = `Generate a short professional email confirmation to the staffing client.
TO: staffing@${ctx.spec.client.toLowerCase().replace(/ /g, "")}.example
FROM: dispatch@lakehouse.example
SUBJECT: (3-word subject)
Body (4-6 lines max). Be specific about:
- Number of workers filled (${outcome.fills.length} of ${event.count})
- Roles: ${event.role}
- Names filled
- Shift start: ${event.shift_start ?? "TBD"}
- Any scenario flag: ${event.scenario_note ?? "(none)"}
Workers:
${outcome.fills.map(f => `- ${f.name} (${f.reason.slice(0, 60)})`).join("\n")}
Respond with only the email. No commentary.`;
const [sms, email] = await Promise.all([
generate(DRAFT_MODEL, smsPrompt, { temperature: 0.3, max_tokens: 500 }),
generate(DRAFT_MODEL, emailPrompt, { temperature: 0.3, max_tokens: 400 }),
]);
return { sms: sms.trim(), email: email.trim() };
}
// =================== Per-event runner ===================
async function runEvent(event: FillEvent, ctx: ScenarioContext): Promise<EventResult> {
console.log(`\n════════ ${event.at} ${event.kind.toUpperCase()}: fill ${event.count}× ${event.role} in ${event.city}, ${event.state} ════════`);
const t0 = Date.now();
// Build the task spec the agent loop expects.
const task: TaskSpec = {
id: `${ctx.spec.date}-${event.at.replace(":", "")}-${event.kind}`,
operation: `fill: ${event.role} x${event.count} in ${event.city}, ${event.state}`,
target_role: event.role,
target_count: event.count,
target_city: event.city,
target_state: event.state,
approach_hint: `hybrid search against ${WORKERS_INDEX} for ${event.kind}`,
};
// Exclusion set: everyone already in today's roster + any explicit
// exclusions from the event spec.
const excludeIds = [
...ctx.roster
.filter(r => r.status === "confirmed")
.map(r => r.worker_id),
...(event.exclude_worker_ids ?? []),
];
const gap_signals: string[] = [];
let outcome: AgentFillOutcome;
try {
outcome = await runAgentFill(task, guidanceFor(event, ctx), excludeIds);
} catch (e) {
return {
event,
ok: false,
fills: [],
turns: 0,
duration_secs: (Date.now() - t0) / 1000,
error: (e as Error).message,
gap_signals: [`drift_or_tool: ${(e as Error).message}`],
};
}
// Resolve worker_ids via SQL so the roster has stable IDs (models
// sometimes return names-only). Best-effort — if name lookup finds
// zero or many matches, we flag a gap.
const resolved = await resolveWorkerIds(outcome.fills, event);
// Roster double-book check.
for (const r of resolved) {
const conflict = ctx.roster.find(e => e.worker_id === r.worker_id && e.status === "confirmed");
if (conflict) {
gap_signals.push(`double_book: ${r.worker_id} ${r.name} already booked for ${conflict.booked_for}`);
}
ctx.roster.push({
worker_id: r.worker_id,
name: r.name,
booked_for: event.at,
role: event.role,
city: event.city,
state: event.state,
status: "confirmed",
});
}
// Pool-size signal (Gap 1 — supply).
const supply_threshold = event.count * 3;
if ((outcome.first_sql_matches ?? 0) < supply_threshold) {
gap_signals.push(
`supply: only ${outcome.first_sql_matches ?? 0} candidates for ${event.count}× ${event.role} in ${event.city} (< ${supply_threshold}, our 3× comfort margin)`
);
}
// Score-spread signal (Gap 2 — embedding).
const spread = (outcome.first_pool_first_score ?? 0) - (outcome.first_pool_last_score ?? 0);
if (spread > 0 && spread < 0.02) {
gap_signals.push(
`embedding: top-K score spread ${spread.toFixed(3)} < 0.02 — model struggles to differentiate`
);
}
// Generate artifacts (SMS + email) — fail-soft; artifacts are cosmetic
// relative to the consensus itself.
let bundle: ArtifactBundle | null = null;
try {
bundle = await generateArtifacts(event, { ...outcome, fills: resolved }, ctx);
await appendFile(join(ctx.out_dir, "sms.md"),
`\n## ${event.at} ${event.kind}: ${event.role} x${event.count} in ${event.city}, ${event.state}\n\n${bundle.sms}\n`);
await appendFile(join(ctx.out_dir, "emails.md"),
`\n## ${event.at} ${event.kind}: ${event.role} x${event.count}\n\n${bundle.email}\n`);
} catch (e) {
gap_signals.push(`artifact: ${(e as Error).message}`);
}
// Dispatch log (structured).
await appendFile(join(ctx.out_dir, "dispatch.jsonl"),
JSON.stringify({
at: event.at,
kind: event.kind,
operation: task.operation,
fills: resolved,
turns: outcome.turns,
duration_secs: outcome.duration_secs,
pool_size: outcome.first_sql_matches,
playbook_citations: outcome.playbook_citations,
}) + "\n");
// Always seed playbook_memory after a sealed fill — keep the learning
// loop tight across the whole day so recurring/misplacement events
// later in the run benefit from earlier ones.
try {
await httpJson(`${GATEWAY}/vectors/playbook_memory/seed`, {
operation: task.operation,
approach: outcome.approach || `${event.kind} → hybrid search`,
context: `client=${ctx.spec.client} scenario=${event.kind} shift=${event.shift_start ?? "tbd"}`,
endorsed_names: resolved.map(r => r.name),
append: true,
});
} catch (e) {
gap_signals.push(`write_through: ${(e as Error).message}`);
}
return {
event,
ok: true,
fills: outcome.fills,
turns: outcome.turns,
duration_secs: outcome.duration_secs,
gap_signals,
sources_first_score: outcome.first_pool_first_score,
sources_last_score: outcome.first_pool_last_score,
pool_size: outcome.first_sql_matches,
playbook_citations: outcome.playbook_citations,
};
}
// =================== Worker ID resolution ===================
// Models emit candidate_ids or names in propose_done. Some return the
// W500K-XXX doc_id, others just the name, others a random tag. Resolve
// to canonical (worker_id, name) via SQL so the roster is reliable.
async function resolveWorkerIds(fills: Fill[], event: FillEvent): Promise<Fill[]> {
const resolved: Fill[] = [];
for (const f of fills) {
// Case 1: candidate_id looks like W500K-NNN — accept as-is.
if (/^W500K-\d+$/.test(f.candidate_id)) {
resolved.push(f);
continue;
}
// Case 2: candidate_id is a bare integer — promote to W500K-N.
if (/^\d+$/.test(f.candidate_id)) {
resolved.push({ ...f, candidate_id: `W500K-${f.candidate_id}` });
continue;
}
// Case 3: look up by (name, city, state). Take the first match.
const q = `SELECT worker_id FROM ${WORKERS_DATASET} WHERE name = '${f.name.replace(/'/g, "''")}' AND city = '${event.city.replace(/'/g, "''")}' AND state = '${event.state.replace(/'/g, "''")}' LIMIT 1`;
try {
const r = await sqlQuery(q);
if (r.rows && r.rows.length > 0) {
resolved.push({ ...f, candidate_id: `W500K-${r.rows[0].worker_id}` });
} else {
// No match — keep the fill but leave candidate_id as-is; the
// gap report will flag it.
resolved.push(f);
}
} catch {
resolved.push(f);
}
}
return resolved;
}
// =================== EOD gap report ===================
async function writeRetrospective(ctx: ScenarioContext): Promise<void> {
const lines: string[] = [];
lines.push(`# Scenario retrospective — ${ctx.spec.client}, ${ctx.spec.date}`);
lines.push("");
lines.push(`Executor: \`${EXECUTOR_MODEL}\` Reviewer: \`${REVIEWER_MODEL}\` Draft: \`${DRAFT_MODEL}\``);
lines.push("");
// --- Per-event summary ---
lines.push("## Events");
lines.push("");
lines.push("| At | Kind | Role / Count | Pool | Fills | Turns | Dur(s) | Cites | Gaps |");
lines.push("|---|---|---|---|---|---|---|---|---|");
for (const r of ctx.results) {
const status = r.ok ? "✓" : "✗";
lines.push(
`| ${r.event.at} | ${r.event.kind} | ${r.event.role} × ${r.event.count} | ${r.pool_size ?? "-"} | ${status} ${r.fills.length} | ${r.turns} | ${r.duration_secs.toFixed(1)} | ${r.playbook_citations?.length ?? 0} | ${r.gap_signals.length} |`
);
}
lines.push("");
// --- Roster ---
lines.push("## Final roster");
lines.push("");
lines.push("| Worker | Booked | Role | City, ST | Status |");
lines.push("|---|---|---|---|---|");
for (const e of ctx.roster) {
lines.push(`| ${e.worker_id} ${e.name} | ${e.booked_for} | ${e.role} | ${e.city}, ${e.state} | ${e.status} |`);
}
lines.push("");
// --- Gap analysis by category ---
const bycat: Record<string, string[]> = {};
for (const g of ctx.gap_signals) {
if (!bycat[g.category]) bycat[g.category] = [];
bycat[g.category].push(`**${g.event}** — ${g.detail}`);
}
// Add cross-event categories computed here:
// Gap 3 — fairness (Gini-lite on roster)
const bookedIds = ctx.roster.filter(r => r.status === "confirmed").map(r => r.worker_id);
const counts = new Map<string, number>();
for (const id of bookedIds) counts.set(id, (counts.get(id) ?? 0) + 1);
const multis = [...counts.entries()].filter(([_, n]) => n > 1);
if (multis.length > 0) {
bycat["fairness"] = bycat["fairness"] ?? [];
for (const [id, n] of multis) {
const name = ctx.roster.find(r => r.worker_id === id)?.name ?? id;
bycat["fairness"].push(`_cross-event_ — ${name} (${id}) booked ${n} times today`);
}
}
// Gap 5 — tool errors already captured per-event via gap_signals.
// Gap 6 — write-through coverage: compare # events vs # new playbook_memory entries.
try {
const stats = await httpJson<any>(`${GATEWAY}/vectors/playbook_memory/stats`);
bycat["write_through_audit"] = bycat["write_through_audit"] ?? [];
bycat["write_through_audit"].push(`_post-run_ — playbook_memory has ${stats.entries} entries (ran ${ctx.results.length} events, expected ≥ ${ctx.results.filter(r => r.ok).length} new entries from this run)`);
} catch { /* non-fatal */ }
lines.push("## Gap signals");
lines.push("");
if (Object.keys(bycat).length === 0) {
lines.push("_None surfaced — either everything worked or detection is under-tuned._");
} else {
for (const [cat, items] of Object.entries(bycat)) {
lines.push(`### ${cat}`);
for (const item of items) lines.push(`- ${item}`);
lines.push("");
}
}
// --- Narrative summary ---
lines.push("## Narrative");
lines.push("");
lines.push(`- ${ctx.results.filter(r => r.ok).length}/${ctx.results.length} events reached consensus.`);
lines.push(`- Final roster: ${ctx.roster.length} bookings across ${new Set(ctx.roster.map(r => r.worker_id)).size} distinct workers.`);
const totalCites = ctx.results.reduce((a, r) => a + (r.playbook_citations?.length ?? 0), 0);
lines.push(`- Playbook citations across the day: ${totalCites} (proof the feedback loop fired across events).`);
const droppedEvents = ctx.results.filter(r => !r.ok);
if (droppedEvents.length > 0) {
lines.push(`- Dropped events: ${droppedEvents.map(r => r.event.at + " " + r.event.kind).join(", ")}.`);
}
await writeFile(join(ctx.out_dir, "report.md"), lines.join("\n"));
console.log(`\n✓ report → ${join(ctx.out_dir, "report.md")}`);
}
// =================== Main driver ===================
async function main() {
const specPath = process.argv[2];
const spec: ScenarioSpec = specPath
? JSON.parse(await Bun.file(specPath).text())
: DEFAULT_SCENARIO;
const stamp = new Date().toISOString().replace(/[:.]/g, "-").slice(0, 19);
const out_dir = join("tests/multi-agent/playbooks", `scenario-${stamp}`);
await mkdir(out_dir, { recursive: true });
const ctx: ScenarioContext = {
spec,
out_dir,
roster: [],
results: [],
gap_signals: [],
};
// Initialize output files
await writeFile(join(out_dir, "sms.md"), `# SMS drafts — ${spec.client}, ${spec.date}\n`);
await writeFile(join(out_dir, "emails.md"), `# Client emails — ${spec.client}, ${spec.date}\n`);
await writeFile(join(out_dir, "dispatch.jsonl"), "");
console.log(`▶ scenario: ${spec.client}, ${spec.date}, ${spec.events.length} events`);
console.log(`▶ out: ${out_dir}\n`);
for (const event of spec.events) {
// Expand misplacement-style exclusions from the current roster: it
// wants to replace a worker from a prior event, so grab everyone
// booked at that at-label and add as exclusions.
if (event.kind === "misplacement" && event.replaces_event) {
const priorBooked = ctx.roster
.filter(r => r.booked_for === event.replaces_event && r.status === "confirmed")
.map(r => r.worker_id);
if (priorBooked.length > 0) {
// Pick one arbitrarily to mark as no_show — in a real system the
// external signal would pick. For the test, first one works.
const lost = priorBooked[0];
const lostEntry = ctx.roster.find(r => r.worker_id === lost);
if (lostEntry) {
lostEntry.status = "no_show";
console.log(` (misplacement: marking ${lost} ${lostEntry.name} as no-show)`);
}
// Exclude all prior bookings so the refill doesn't pick anyone
// already scheduled for today.
event.exclude_worker_ids = priorBooked;
}
}
const result = await runEvent(event, ctx);
ctx.results.push(result);
for (const s of result.gap_signals) {
const [category, ...rest] = s.split(":");
ctx.gap_signals.push({ event: event.at, category: category.trim(), detail: rest.join(":").trim() });
}
// Small breather to not hammer Ollama on back-to-back runs.
await new Promise(r => setTimeout(r, 500));
}
// Persist structured state for forensics.
await writeFile(join(out_dir, "roster.json"), JSON.stringify(ctx.roster, null, 2));
await writeFile(join(out_dir, "results.json"), JSON.stringify(ctx.results, null, 2));
await writeRetrospective(ctx);
const okCount = ctx.results.filter(r => r.ok).length;
if (okCount < ctx.results.length) {
console.log(`\n⚠ ${okCount}/${ctx.results.length} events succeeded. See ${out_dir}/report.md for gaps.`);
process.exit(2);
}
console.log(`\n✓ ${okCount}/${ctx.results.length} events succeeded. See ${out_dir}/report.md.`);
process.exit(0);
}
main().catch(e => {
console.error(`\n✗ scenario driver crashed: ${(e as Error).message}`);
console.error((e as Error).stack);
process.exit(1);
});