root e4ae5b646e T3 overview tier — mid-day checkpoints + cross-day lesson
Hot path (T1/T2) stays mistral + qwen2.5. The new T3 tier runs a
thinking model SPARINGLY — after every misplacement, every N-th event
(default N=3), and once post-scenario for the cross-day lesson.

- agent.ts: generateCloud() for Ollama Cloud (gpt-oss:120b etc). Uses
  the same /api/generate shape; thinking field is discarded.
- scenario.ts: runOverviewCheckpoint + runCrossDayLesson. Outputs land
  in checkpoints.jsonl and lesson.md. Lesson also seeds playbook_memory
  under operation "cross-day-lesson-{date}" — future runs pick it up
  through the existing similarity boost.
- Env knobs: LH_OVERVIEW_CLOUD=1 routes T3 to cloud, LH_OVERVIEW_MODEL
  overrides (default gpt-oss:20b local, gpt-oss:120b cloud),
  LH_T3_CHECKPOINT_EVERY controls cadence, LH_T3_DISABLE=1 turns it off.

Why this shape: prior feedback_phase19_seed_text.md warned that verbose
seeds dilute the embedding and silently kill the boost. T3's rich prose
goes to lesson.md; the embedded "approach" + "context" stay terse.

Verified end-to-end: local 20b checkpoint 10.9s, lesson 4.0s; cloud
120b lesson 3.7s. Cloud output is both faster AND more specific than
local (sequenced, tactical, logging advice included).
2026-04-20 19:21:45 -05:00

1131 lines
44 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// A day in the life — the real-world scenario test.
//
// Runs six events against the live substrate: baseline_fill, recurring,
// expansion, emergency, misplacement, retrospective. Each event
// exercises a different pressure pattern; each one produces actionable
// artifacts (SMS drafts, client emails, dispatch log) alongside the
// ranking output; the run as a whole is self-audited at EOD against six
// gap categories (supply, embedding, fairness, drift, tool, write-through).
//
// Design notes:
// - Compressed clock. The "08:00" in an event spec is a label for the
// output, not a wall-clock gate. The full scenario runs in minutes.
// - One script, shared state. Each event mutates the same roster +
// gap_signals + artifacts in-memory, then persists at EOD.
// - Fail-soft per event. A drift-abort or tool error on one event
// records a gap_signal and moves on; we explicitly want to see which
// events the substrate can't handle, not abort the whole run.
// - Every fill event routes through the same executor/reviewer loop as
// the single-task orchestrator — just driven in sequence rather than
// standalone, with event-specific extra constraints in the prompt.
import {
type LogEntry,
type TaskSpec,
type Action,
type Fill,
callTool,
hybridSearch,
sqlQuery,
generate,
generateCloud,
parseAction,
executorPrompt,
reviewerPrompt,
GATEWAY,
} from "./agent.ts";
import { mkdir, writeFile, appendFile } from "node:fs/promises";
import { join } from "node:path";
// 2026-04-20 — reverted to mistral executor after trying qwen2.5.
// qwen2.5 emits malformed JSON (trailing `)` garbage, unterminated
// strings) when asked for tool calls. mistral drops fields occasionally
// but produces valid JSON. With optional `question` default + lean
// prompt + schema lock, mistral seals baseline + recurring reliably.
// Complex scenarios (5-fill, emergency, misplacement) remain flaky —
// real Phase 20+ problem (larger model or constrained decoding needed).
const EXECUTOR_MODEL = "mistral:latest";
const REVIEWER_MODEL = "qwen2.5:latest";
const DRAFT_MODEL = "qwen2.5:latest"; // artifact generation; short outputs
// T3 overview tier. Called sparingly — NOT per tool call. Two insertion
// points: (B) mid-scenario checkpoint after every misplacement event and
// every N events, and (A) cross-day lesson after all events complete.
// gpt-oss:20b is a thinking model: it spends tokens in a hidden reasoning
// block before emitting `response`. Budget accordingly — never under 400.
const OVERVIEW_CLOUD = process.env.LH_OVERVIEW_CLOUD === "1";
const OVERVIEW_MODEL = process.env.LH_OVERVIEW_MODEL ?? (OVERVIEW_CLOUD ? "gpt-oss:120b" : "gpt-oss:20b");
// FIX: Number() on a malformed env value ("", "abc") yields NaN, and a
// NaN (or zero/negative) cadence can never match an every-N check, so a
// typo in LH_T3_CHECKPOINT_EVERY would silently disable the periodic
// checkpoint. Sanitize to a positive integer, defaulting to 3.
const T3_CHECKPOINT_EVERY = (() => {
  const raw = Number(process.env.LH_T3_CHECKPOINT_EVERY ?? 3);
  return Number.isFinite(raw) && raw >= 1 ? Math.floor(raw) : 3;
})();
const T3_DISABLED = process.env.LH_T3_DISABLE === "1";
// Dispatcher: route T3 calls to local sidecar or Ollama Cloud depending
// on the LH_OVERVIEW_CLOUD flag. Hot-path T1/T2 always stay local.
async function overviewGenerate(prompt: string, opts: { temperature?: number; max_tokens?: number } = {}): Promise<string> {
  if (OVERVIEW_CLOUD) return generateCloud(OVERVIEW_MODEL, prompt, opts);
  return generate(OVERVIEW_MODEL, prompt, opts);
}
const MAX_TURNS = 14; // hard cap on executor/reviewer rounds per event
const MAX_CONSECUTIVE_DRIFTS = 3; // abort after this many consecutive drift/tool-error strikes
const WORKERS_INDEX = "workers_500k_v1"; // vector index name
const WORKERS_DATASET = "workers_500k"; // SQL table name
// =================== Event + scenario types ===================
// The five pressure patterns one day can contain (the sixth event kind
// from the header notes, retrospective, is the EOD report itself).
type EventKind = "baseline_fill" | "recurring" | "expansion" | "emergency" | "misplacement";
// One staffing request within the day. `at` is only a display label —
// the scenario runs on a compressed clock (see header notes).
interface FillEvent {
kind: EventKind;
at: string; // display label like "08:00"
role: string;
count: number;
city: string;
state: string;
shift_start?: string; // "08:00 AM" for SMS/email drafts
scenario_note?: string; // extra context the agents should know
deadline?: string; // emergency events carry this, shown to reviewer
exclude_worker_ids?: string[]; // misplacement: the lost worker
replaces_event?: string; // misplacement back-ref for reporting
}
// A full day: client + date + ordered event list.
interface ScenarioSpec {
client: string;
date: string;
events: FillEvent[];
}
// Outcome of one event, success or failure. Fail-soft: errors land in
// `error` + `gap_signals` instead of aborting the scenario.
interface EventResult {
event: FillEvent;
ok: boolean;
fills: Fill[];
turns: number;
duration_secs: number;
error?: string;
gap_signals: string[]; // pulled into the cross-event gap report
sources_first_score?: number;
sources_last_score?: number;
pool_size?: number; // sql_matches from the first hybrid_search
playbook_citations?: string[];
discovered_pattern?: string; // Path 2 meta-index snapshot per event
}
// One booked worker on the day's shared in-memory roster.
interface RosterEntry {
worker_id: string;
name: string;
booked_for: string; // event at-label
role: string;
city: string;
state: string;
status: "confirmed" | "no_show" | "rebooked_elsewhere";
}
// Mutable state shared by every event in the run ("one script, shared
// state" — see header notes); persisted at EOD.
interface ScenarioContext {
spec: ScenarioSpec;
out_dir: string;
roster: RosterEntry[];
results: EventResult[];
gap_signals: Array<{ event: string; category: string; detail: string }>;
}
// =================== Default scenario ===================
// The canonical day: five fill events against one client in one city,
// so playbook seeds from earlier events boost later searches.
// NOTE(review): the misplacement event sets neither exclude_worker_ids
// nor a no_show roster entry here — presumably the driver injects them
// before that event runs. TODO confirm against the scenario runner.
const DEFAULT_SCENARIO: ScenarioSpec = {
client: "Riverfront Steel",
date: "2026-04-21",
events: [
{
kind: "baseline_fill",
at: "08:00",
role: "Warehouse Associate",
count: 3,
city: "Toledo",
state: "OH",
shift_start: "08:00 AM",
scenario_note: "Regular Monday morning shift, 8-hour.",
},
{
kind: "recurring",
at: "10:30",
role: "Machine Operator",
count: 2,
city: "Toledo",
state: "OH",
shift_start: "11:00 AM",
scenario_note: "Recurring Tuesday/Thursday slot — prior workers may still be available.",
},
{
kind: "expansion",
at: "12:15",
role: "Forklift Operator",
count: 5,
city: "Toledo",
state: "OH",
shift_start: "01:00 PM",
scenario_note: "New warehouse location opening, five-worker team needed.",
},
{
kind: "emergency",
at: "14:00",
role: "Loader",
count: 4,
city: "Toledo",
state: "OH",
shift_start: "04:00 PM same day",
deadline: "16:00",
scenario_note: "Walkoff incident — replacement crew needed by 16:00 sharp.",
},
{
kind: "misplacement",
at: "15:45",
role: "Warehouse Associate",
count: 1,
city: "Toledo",
state: "OH",
shift_start: "remainder of 08:00 shift",
scenario_note: "One worker from the 08:00 fill didn't show; rebuild the gap.",
replaces_event: "08:00",
},
],
};
// =================== Low-level helpers shared across events ===================
// Minimal JSON-over-HTTP helper: POST when a (truthy) body is supplied,
// plain GET otherwise. Any non-2xx status throws with the response text
// attached so gap_signals capture the server's actual complaint.
async function httpJson<T>(url: string, body?: any): Promise<T> {
  const init = body
    ? { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(body) }
    : { method: "GET", headers: { "Content-Type": "application/json" } };
  const res = await fetch(url, init);
  if (res.ok) return (await res.json()) as T;
  throw new Error(`${res.status} ${await res.text()}`);
}
// Render one LogEntry as a single console trace line. Layout:
// " [tNN role     kind          ] <payload>" — turn zero-padded to 2,
// role padded to 8, kind padded to 14, payload truncated per-kind so
// the live trace stays scannable.
function fmt(e: LogEntry): string {
  const head = ` [t${e.turn.toString().padStart(2, "0")} ${e.role.padEnd(8)} ${e.kind.padEnd(14)}]`;
  const body = e.content ?? {};
  const clip = (v: any, max: number) => String(v ?? "").slice(0, max);
  switch (e.kind) {
    case "tool_call":
      return `${head} ${body.tool}(${JSON.stringify(body.args ?? {}).slice(0, 60)}) — ${clip(body.rationale, 40)}`;
    case "tool_result": {
      if (body.error) return `${head} ERROR ${body.error}`;
      const n = body?.rows?.length ?? body?.sources?.length ?? undefined;
      return n !== undefined ? `${head} rows=${n}` : `${head} ${JSON.stringify(body).slice(0, 60)}`;
    }
    case "critique":
      // NOTE(review): verdict and notes print run-together
      // ("verdict=driftlooks off") — possibly a lost delimiter. Kept
      // as-is to preserve behavior.
      return `${head} verdict=${body.verdict}${clip(body.notes, 50)}`;
    case "propose_done":
      return `${head} ${body.fills?.length ?? 0} fills: ${(body.fills ?? []).map((f: Fill) => f.name).join(", ")}`;
    case "consensus_done":
      return `${head}`;
    case "plan":
      return `${head} ${body.steps?.length ?? 0} steps`;
    case "error":
      return `${head} ${body.message ?? body}`;
    default:
      return `${head} ${JSON.stringify(body).slice(0, 70)}`;
  }
}
// Dispatch one model-issued tool call. hybrid_search and sql get
// normalization + validation here; anything else falls through to the
// generic callTool. Throws on malformed args — the caller counts that
// toward the consecutive-drift abort.
async function executeToolCall(name: string, args: Record<string, any>): Promise<any> {
  if (name === "hybrid_search") {
    const { sql_filter, index_name, k } = args;
    // `question` is strictly required by /vectors/hybrid but local models
    // intermittently drop it. Derive a sensible default from sql_filter
    // so a missing `question` doesn't waste turns.
    const question = args.question ?? "qualified available workers";
    if (!sql_filter || !index_name) {
      throw new Error(`hybrid_search needs sql_filter + index_name, got ${JSON.stringify(args)}`);
    }
    // Every fill event uses the playbook_memory boost — that's the point
    // of the run-as-a-whole: earlier events seed later ones.
    return httpJson(`${GATEWAY}/vectors/hybrid`, {
      sql_filter, question, index_name,
      top_k: k ?? 10, generate: false,
      use_playbook_memory: true,
      // 2026-04-20 — bumped 10 → 100 to match server default change. At
      // this memory size the semantic similarities cluster narrowly
      // (0.55-0.67) and k=10 silently misses geo-matched playbooks.
      playbook_memory_k: 100,
    });
  }
  if (name === "sql") {
    const { query } = args;
    if (!query || typeof query !== "string") throw new Error(`sql needs query string`);
    // Model-generated SQL is untrusted. FIX: a SELECT-prefix check alone
    // still lets a second statement through ("SELECT 1; DROP TABLE x"),
    // so strip a single trailing semicolon (models often emit one) and
    // reject any remaining statement separator. (Limitation: a semicolon
    // inside a string literal is also rejected — acceptable for these
    // generated filters.)
    const cleaned = query.trim().replace(/;+\s*$/, "");
    if (!/^SELECT/i.test(cleaned)) throw new Error(`sql allows SELECT only`);
    if (cleaned.includes(";")) throw new Error(`sql allows a single SELECT statement only`);
    return sqlQuery(cleaned);
  }
  return callTool(name, args);
}
// =================== Core fill loop — one event, one consensus ===================
// What runAgentFill hands back to runEvent: the sealed fills plus
// telemetry captured along the way.
interface AgentFillOutcome {
fills: Fill[];
approach: string; // executor's rationale at seal time (or a default)
turns: number; // executor/reviewer rounds actually run
duration_secs: number;
log: LogEntry[]; // full transcript (tool calls, critiques, errors)
first_sql_matches?: number; // sql_matches from the FIRST hybrid_search
first_pool_first_score?: number; // top source score of that first pool
first_pool_last_score?: number; // bottom source score of that first pool
playbook_citations: string[]; // union across all hybrid results seen
}
// Run the executor/reviewer loop for one fill event until consensus
// (executor propose_done + reviewer approve_done) or a hard limit trips
// (MAX_TURNS, MAX_CONSECUTIVE_DRIFTS, parse failure, malformed seal).
// Throws on any abort — runEvent converts that into a failed
// EventResult rather than killing the scenario. Side effect: streams
// each log entry to the console via fmt(). Pool stats are captured from
// the FIRST hybrid_search only, for the supply/embedding gap signals.
async function runAgentFill(
task: TaskSpec,
extra_guidance: string,
exclude_worker_ids: string[],
): Promise<AgentFillOutcome> {
const t0 = Date.now();
const log: LogEntry[] = [];
let turn = 0;
// One shared strike counter for BOTH tool errors and reviewer drift
// verdicts; any non-drift critique resets it.
let consecutiveDrifts = 0;
let sealed: { fills: Fill[]; approach: string } | null = null;
let first_sql_matches: number | undefined;
let first_pool_first: number | undefined;
let first_pool_last: number | undefined;
const playbook_citations = new Set<string>();
// Append to the transcript and echo it to the console in one step.
const append = (e: Omit<LogEntry, "at">): LogEntry => {
const full: LogEntry = { ...e, at: new Date().toISOString() };
log.push(full);
console.log(fmt(full));
return full;
};
// Build executor prompt with the scenario-specific guidance + exclusions
// injected as an extra block. Reuses the base prompt so drift detection
// and output-shape rules are unchanged.
const withExtras = (base: string): string => {
let addon = "";
if (extra_guidance) addon += `\n\nEVENT-SPECIFIC GUIDANCE:\n${extra_guidance}`;
if (exclude_worker_ids.length > 0) {
addon += `\n\nEXCLUDE these workers (already booked / unavailable today): ${exclude_worker_ids.join(", ")}\nIf your tool results include them, skip them — never propose them.`;
}
return base + addon;
};
while (turn < MAX_TURNS && !sealed) {
turn += 1;
// --- Executor half-turn: propose a tool call or a done-proposal. ---
const execRaw = await generate(
EXECUTOR_MODEL,
withExtras(executorPrompt(task, log)),
{ temperature: 0.2, max_tokens: 600 },
);
let execAction: Action;
try {
execAction = parseAction(execRaw, "executor");
} catch (e) {
// Unparseable executor output is fatal for this event (fail-soft at
// the runEvent level); log the raw head for post-mortem.
append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "error",
content: { message: (e as Error).message, raw: execRaw.slice(0, 300) } });
throw e;
}
append({ turn, role: "executor", model: EXECUTOR_MODEL,
kind: execAction.kind as any, content: execAction });
if (execAction.kind === "tool_call") {
try {
const result = await executeToolCall(execAction.tool, execAction.args);
// Filter tool results to enforce the exclusion list — defense in
// depth since the prompt alone isn't enough for weak models.
const filtered = maskExclusions(result, exclude_worker_ids);
// Capture the first hybrid_search pool stats for gap detection.
if (execAction.tool === "hybrid_search" && first_sql_matches === undefined) {
first_sql_matches = (filtered as any).sql_matches;
const sources = (filtered as any).sources ?? [];
if (sources.length > 0) {
first_pool_first = sources[0].score;
first_pool_last = sources[sources.length - 1].score;
}
}
// Trim to 20 entries before it enters the prompt transcript.
const trimmed = trimResult(filtered);
append({ turn, role: "executor", model: EXECUTOR_MODEL,
kind: "tool_result", content: trimmed });
// Accumulate playbook citations from any hybrid result that
// carried them — the scenario-level report needs them.
if (Array.isArray((filtered as any).sources)) {
for (const s of (filtered as any).sources) {
for (const c of s.playbook_citations ?? []) {
playbook_citations.add(c);
}
}
}
} catch (e) {
// Tool failure is recorded as a tool_result so the models see it,
// and counts one strike toward the consecutive-drift abort.
append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "tool_result",
content: { error: (e as Error).message, tool: execAction.tool } });
consecutiveDrifts += 1;
if (consecutiveDrifts >= MAX_CONSECUTIVE_DRIFTS) {
throw new Error(`aborted — ${MAX_CONSECUTIVE_DRIFTS} consecutive tool errors`);
}
}
}
// --- Reviewer half-turn: critique the transcript so far. ---
const revRaw = await generate(
REVIEWER_MODEL,
withExtras(reviewerPrompt(task, log)),
{ temperature: 0.1, max_tokens: 400 },
);
let revAction: Action;
try {
revAction = parseAction(revRaw, "reviewer");
} catch (e) {
append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "error",
content: { message: (e as Error).message, raw: revRaw.slice(0, 300) } });
throw e;
}
append({ turn, role: "reviewer", model: REVIEWER_MODEL,
kind: "critique", content: revAction });
if (revAction.kind !== "critique") throw new Error(`reviewer emitted non-critique: ${revAction.kind}`);
if (revAction.verdict === "drift") {
consecutiveDrifts += 1;
if (consecutiveDrifts >= MAX_CONSECUTIVE_DRIFTS) {
throw new Error(`aborted — ${MAX_CONSECUTIVE_DRIFTS} consecutive drift flags`);
}
} else {
consecutiveDrifts = 0;
}
// --- Seal: both sides agree AND the fill count matches the target. ---
if (execAction.kind === "propose_done" && revAction.verdict === "approve_done") {
if (execAction.fills.length !== task.target_count) {
throw new Error(`consensus malformed — ${execAction.fills.length} fills vs target ${task.target_count}`);
}
// Enforce exclusion at seal time too, in case the models ignored
// both prompt + tool-result filtering.
for (const f of execAction.fills) {
if (exclude_worker_ids.includes(f.candidate_id)) {
throw new Error(`consensus proposed excluded worker ${f.candidate_id}`);
}
}
append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "consensus_done",
content: { fills: execAction.fills } });
sealed = { fills: execAction.fills, approach: execAction.rationale ?? "multi-agent hybrid" };
}
}
if (!sealed) throw new Error(`no consensus after ${MAX_TURNS} turns`);
return {
fills: sealed.fills,
approach: sealed.approach,
turns: turn,
duration_secs: (Date.now() - t0) / 1000,
log,
first_sql_matches,
first_pool_first_score: first_pool_first,
first_pool_last_score: first_pool_last,
playbook_citations: Array.from(playbook_citations),
};
}
// Drop excluded workers from a tool result before the models see it.
// Handles the two result shapes we get back: hybrid results (`sources`,
// keyed by doc_id) and SQL results (`rows`, keyed by worker_id or
// doc_id). Rows with no recognizable id are kept; any other shape
// passes through untouched.
function maskExclusions(result: any, exclude: string[]): any {
  if (exclude.length === 0) return result;
  if (Array.isArray(result.sources)) {
    const kept = result.sources.filter((s: any) => !exclude.includes(s.doc_id));
    return { ...result, sources: kept };
  }
  if (Array.isArray(result.rows)) {
    const kept = result.rows.filter((r: any) => {
      const id = r.worker_id ?? r.doc_id;
      if (id === undefined) return true;
      return !exclude.includes(String(id));
    });
    return { ...result, rows: kept };
  }
  return result;
}
// Cap tool-result arrays at 20 entries before they enter the prompt
// transcript, noting the drop count in `_trimmed`. Note: when nothing
// is dropped the `_trimmed` key is still present (value undefined) —
// JSON.stringify elides it downstream.
function trimResult(r: any): any {
  const cap = (arr: any[], key: string) => ({
    ...r,
    [key]: arr.slice(0, 20),
    _trimmed: arr.length > 20 ? `${arr.length - 20} more` : undefined,
  });
  if (r && Array.isArray(r.sources)) return cap(r.sources, "sources");
  if (r && Array.isArray(r.rows)) return cap(r.rows, "rows");
  return r;
}
// =================== Per-event guidance strings ===================
function guidanceFor(event: FillEvent, ctx: ScenarioContext): string {
// HARD SCHEMA GUARD: prior runs of this scenario had mistral invent
// column names from narrative guidance ("shift", "recurring",
// "expansion") and write SQL filters against them. Lock the schema
// explicitly so the executor has no excuse. Also pin `availability`
// and `reliability` as DOUBLE casts since their text-storage causes
// type_coercion errors otherwise.
const schemaLock = `
SCHEMA ENFORCEMENT (CRITICAL):
The ONLY columns in workers_500k usable in sql_filter are:
worker_id, name, role, email, phone, city, state, zip,
skills, certifications, archetype, reliability, responsiveness,
engagement, communications, compliance, availability, resume_text.
Narrative words like "shift", "recurring", "expansion", "emergency",
"morning", "priority" are NOT columns. DO NOT invent columns.
Numeric filters need CAST: CAST(availability AS DOUBLE) > 0.5 and
CAST(reliability AS DOUBLE) > 0.7.`;
const base = (() => {
switch (event.kind) {
case "baseline_fill":
return `Standard fill. Client ${ctx.spec.client}. Rank by semantic match; require CAST(availability AS DOUBLE) > 0.5.`;
case "recurring":
return `Recurring slot — prefer workers with past playbook citations (visible on hybrid sources). Require CAST(availability AS DOUBLE) > 0.5.`;
case "expansion":
return `New-location fill, ${event.count} workers at once. Require CAST(availability AS DOUBLE) > 0.5 AND CAST(reliability AS DOUBLE) > 0.75.`;
case "emergency":
return `Emergency replacement needed ASAP. Require CAST(availability AS DOUBLE) > 0.7. A good-enough available worker beats a perfect unavailable one.`;
case "misplacement":
return `Refill for a no-show. Do NOT propose anyone on the EXCLUDE list. Require CAST(availability AS DOUBLE) > 0.5.`;
}
})();
return `${schemaLock}\n\nEVENT FOCUS:\n${base}`;
}
// =================== Artifact generation ===================
// The two drafts produced per successful fill event.
interface ArtifactBundle {
sms: string; // SMS confirmation blocks, one per filled worker
email: string; // client-facing confirmation email
}
// One Ollama call per event for SMS (to the filled workers) + one for
// the client email. Short outputs, low temperature — these are drafts,
// not creative writing.
// Both calls run in parallel via Promise.all; any rejection propagates
// to runEvent, which records an `artifact:` gap signal (fail-soft).
async function generateArtifacts(event: FillEvent, outcome: AgentFillOutcome, ctx: ScenarioContext): Promise<ArtifactBundle> {
const smsPrompt = `Generate short, friendly, professional SMS messages to confirm a shift for each worker. ONE message per worker. Format as:
TO: {Name}
{message body under 180 chars}
---
Details:
- Client: ${ctx.spec.client}
- Role: ${event.role}
- Location: ${event.city}, ${event.state}
- Shift starts: ${event.shift_start ?? "TBD"}
- Scenario: ${event.scenario_note ?? ""}
Workers to message:
${outcome.fills.map(f => `- ${f.name} (id ${f.candidate_id})`).join("\n")}
Respond with only the message blocks, separated by "---". No commentary.`;
const emailPrompt = `Generate a short professional email confirmation to the staffing client.
TO: staffing@${ctx.spec.client.toLowerCase().replace(/ /g, "")}.example
FROM: dispatch@lakehouse.example
SUBJECT: (3-word subject)
Body (4-6 lines max). Be specific about:
- Number of workers filled (${outcome.fills.length} of ${event.count})
- Roles: ${event.role}
- Names filled
- Shift start: ${event.shift_start ?? "TBD"}
- Any scenario flag: ${event.scenario_note ?? "(none)"}
Workers:
${outcome.fills.map(f => `- ${f.name} (${f.reason.slice(0, 60)})`).join("\n")}
Respond with only the email. No commentary.`;
const [sms, email] = await Promise.all([
generate(DRAFT_MODEL, smsPrompt, { temperature: 0.3, max_tokens: 500 }),
generate(DRAFT_MODEL, emailPrompt, { temperature: 0.3, max_tokens: 400 }),
]);
return { sms: sms.trim(), email: email.trim() };
}
// =================== Per-event runner ===================
// Run one event end-to-end: agent fill loop → roster bookkeeping →
// gap-signal detection → artifacts → meta-index fetch → dispatch log →
// playbook seeding (+ mark_failed after a misplacement). Fail-soft: an
// agent-loop failure returns ok:false with the error captured; every
// later step only appends a gap signal on failure.
async function runEvent(event: FillEvent, ctx: ScenarioContext): Promise<EventResult> {
// NOTE(review): `${event.at}${event.kind...}` prints run-together
// ("08:00BASELINE_FILL") — looks like a lost separator. Cosmetic only.
console.log(`\n════════ ${event.at}${event.kind.toUpperCase()}: fill ${event.count}× ${event.role} in ${event.city}, ${event.state} ════════`);
const t0 = Date.now();
// Build the task spec the agent loop expects.
const task: TaskSpec = {
id: `${ctx.spec.date}-${event.at.replace(":", "")}-${event.kind}`,
operation: `fill: ${event.role} x${event.count} in ${event.city}, ${event.state}`,
target_role: event.role,
target_count: event.count,
target_city: event.city,
target_state: event.state,
approach_hint: `hybrid search against ${WORKERS_INDEX} for ${event.kind}`,
};
// Exclusion set: everyone already in today's roster + any explicit
// exclusions from the event spec.
const excludeIds = [
...ctx.roster
.filter(r => r.status === "confirmed")
.map(r => r.worker_id),
...(event.exclude_worker_ids ?? []),
];
const gap_signals: string[] = [];
let outcome: AgentFillOutcome;
try {
outcome = await runAgentFill(task, guidanceFor(event, ctx), excludeIds);
} catch (e) {
// Fail-soft: the loop aborted (drift, tool errors, no consensus).
// Record the failure and let the scenario continue with later events.
return {
event,
ok: false,
fills: [],
turns: 0,
duration_secs: (Date.now() - t0) / 1000,
error: (e as Error).message,
gap_signals: [`drift_or_tool: ${(e as Error).message}`],
};
}
// Resolve worker_ids via SQL so the roster has stable IDs (models
// sometimes return names-only). Best-effort — if name lookup finds
// zero or many matches, we flag a gap.
const resolved = await resolveWorkerIds(outcome.fills, event);
// Roster double-book check.
for (const r of resolved) {
const conflict = ctx.roster.find(e => e.worker_id === r.worker_id && e.status === "confirmed");
if (conflict) {
gap_signals.push(`double_book: ${r.worker_id} ${r.name} already booked for ${conflict.booked_for}`);
}
ctx.roster.push({
worker_id: r.worker_id,
name: r.name,
booked_for: event.at,
role: event.role,
city: event.city,
state: event.state,
status: "confirmed",
});
}
// Pool-size signal (Gap 1 — supply).
const supply_threshold = event.count * 3;
if ((outcome.first_sql_matches ?? 0) < supply_threshold) {
gap_signals.push(
`supply: only ${outcome.first_sql_matches} candidates for ${event.count}× ${event.role} in ${event.city} (< ${supply_threshold}, our 3× comfort margin)`
);
}
// Score-spread signal (Gap 2 — embedding). Only fires when a spread
// was actually observed (> 0) but is too narrow to rank on.
const spread = (outcome.first_pool_first_score ?? 0) - (outcome.first_pool_last_score ?? 0);
if (spread > 0 && spread < 0.02) {
gap_signals.push(
`embedding: top-K score spread ${spread.toFixed(3)} < 0.02 — model struggles to differentiate`
);
}
// Generate artifacts (SMS + email) — fail-soft; artifacts are cosmetic
// relative to the consensus itself.
let bundle: ArtifactBundle | null = null;
try {
bundle = await generateArtifacts(event, { ...outcome, fills: resolved }, ctx);
// NOTE(review): `${event.kind}${event.role}` in these headings runs
// together ("baseline_fillWarehouse Associate") — likely a lost
// separator. Cosmetic; affects sms.md/emails.md headings only.
await appendFile(join(ctx.out_dir, "sms.md"),
`\n## ${event.at} ${event.kind}${event.role} x${event.count} in ${event.city}, ${event.state}\n\n${bundle.sms}\n`);
await appendFile(join(ctx.out_dir, "emails.md"),
`\n## ${event.at} ${event.kind}${event.role} x${event.count}\n\n${bundle.email}\n`);
} catch (e) {
gap_signals.push(`artifact: ${(e as Error).message}`);
}
// Meta-index (Path 2) — fetch patterns the system discovered across
// similar past playbooks. MUST come before dispatch log write because
// dispatch carries this field. Fail-soft — patterns are observational.
let discovered_pattern: string | undefined;
try {
const patterns = await httpJson<any>(`${GATEWAY}/vectors/playbook_memory/patterns`, {
query: `${event.role} in ${event.city}, ${event.state}`,
top_k_playbooks: 25,
min_trait_frequency: 0.4,
});
discovered_pattern = patterns?.discovered_pattern;
} catch { /* patterns are observational */ }
// Dispatch log (structured).
await appendFile(join(ctx.out_dir, "dispatch.jsonl"),
JSON.stringify({
at: event.at,
kind: event.kind,
operation: task.operation,
fills: resolved,
turns: outcome.turns,
duration_secs: outcome.duration_secs,
pool_size: outcome.first_sql_matches,
playbook_citations: outcome.playbook_citations,
discovered_pattern,
}) + "\n");
// Always seed playbook_memory after a sealed fill — keep the learning
// loop tight across the whole day so recurring/misplacement events
// later in the run benefit from earlier ones.
//
// 2026-04-20 — canonical SHORT seed text. Verbose LLM rationales dilute
// the embedding and drop cosine similarity below the 0.05 threshold,
// silently killing the boost. Keep operation+approach+context terse.
try {
await httpJson(`${GATEWAY}/vectors/playbook_memory/seed`, {
operation: task.operation,
approach: `${event.kind} fill via hybrid search`,
context: `${event.role} fill in ${event.city}, ${event.state}`,
endorsed_names: resolved.map(r => r.name),
append: true,
});
} catch (e) {
gap_signals.push(`write_through: ${(e as Error).message}`);
}
// After a misplacement event, record the lost worker's failure so
// future searches for this city+role dampen their boost. Without this
// Path 1 negative signal the no-shower keeps getting lifted.
// NOTE(review): nothing in this file sets roster status "no_show" —
// presumably the scenario driver marks it before this event runs.
// TODO confirm; otherwise mark_failed never fires.
if (event.kind === "misplacement" && (event.exclude_worker_ids?.length ?? 0) > 0) {
const lost = ctx.roster.find(r => r.status === "no_show");
if (lost) {
try {
await httpJson(`${GATEWAY}/vectors/playbook_memory/mark_failed`, {
operation: `fill: ${lost.role} x1 in ${lost.city}, ${lost.state}`,
failed_names: [lost.name],
reason: `no-show from ${lost.booked_for} shift`,
});
} catch (e) {
gap_signals.push(`mark_failed: ${(e as Error).message}`);
}
}
}
// (discovered_pattern was computed + written above, before dispatch.jsonl)
// NOTE(review): this result carries the pre-resolution `outcome.fills`
// while roster + dispatch.jsonl used `resolved` — confirm which one the
// EOD report is meant to see.
return {
event,
ok: true,
fills: outcome.fills,
turns: outcome.turns,
duration_secs: outcome.duration_secs,
gap_signals,
sources_first_score: outcome.first_pool_first_score,
sources_last_score: outcome.first_pool_last_score,
pool_size: outcome.first_sql_matches,
playbook_citations: outcome.playbook_citations,
discovered_pattern,
};
}
// =================== Worker ID resolution ===================
// Models emit candidate_ids or names in propose_done. Some return the
// W500K-XXX doc_id, others just the name, others a random tag. Resolve
// to canonical (worker_id, name) via SQL so the roster is reliable.
// Lookups run sequentially and fail-soft: a miss or query error keeps
// the original fill so the gap report can flag it later.
async function resolveWorkerIds(fills: Fill[], event: FillEvent): Promise<Fill[]> {
  const esc = (s: string) => s.replace(/'/g, "''");
  const resolved: Fill[] = [];
  for (const fill of fills) {
    // Already canonical (W500K-NNN): accept unchanged.
    if (/^W500K-\d+$/.test(fill.candidate_id)) {
      resolved.push(fill);
      continue;
    }
    // Bare integer: promote to the canonical prefixed form.
    if (/^\d+$/.test(fill.candidate_id)) {
      resolved.push({ ...fill, candidate_id: `W500K-${fill.candidate_id}` });
      continue;
    }
    // Anything else: best-effort lookup by (name, city, state), first
    // match wins.
    const q = `SELECT worker_id FROM ${WORKERS_DATASET} WHERE name = '${esc(fill.name)}' AND city = '${esc(event.city)}' AND state = '${esc(event.state)}' LIMIT 1`;
    try {
      const r = await sqlQuery(q);
      resolved.push(r.rows && r.rows.length > 0
        ? { ...fill, candidate_id: `W500K-${r.rows[0].worker_id}` }
        : fill);
    } catch {
      resolved.push(fill);
    }
  }
  return resolved;
}
// =================== T3 overview tier ===================
// Called sparingly so reasoning overhead stays amortized.
// (B) Checkpoint — after every misplacement AND every N-th event.
// (A) Cross-day lesson — once at end of scenario.
// Results land in `checkpoints.jsonl` and `lesson.md`, and the lesson
// seeds playbook_memory under operation "cross-day-lesson-{date}" so
// future scenarios can surface it on similar setups.
// One record per T3 checkpoint call.
interface OverviewCheckpoint {
after_event: string; // event.at label
event_kind: EventKind;
ok: boolean; // false when the model errored or RISK/HINT failed to parse
model: string; // which overview model served this call
duration_secs: number; // wall-clock time of the T3 call alone
hint: string; // T3's "what to do differently next time"
risk: string; // T3's named risk flag
}
/**
 * T3 mid-scenario checkpoint. Digests the last few events plus the one
 * that just finished, then asks the overview model for exactly one risk
 * flag and one actionable hint for the NEXT event. Returns null when T3
 * is disabled; on model failure returns a sentinel record (ok: false)
 * carrying the error in `risk`, so a T3 outage never aborts the run.
 */
async function runOverviewCheckpoint(
  event: FillEvent,
  result: EventResult,
  prior: EventResult[],
): Promise<OverviewCheckpoint | null> {
  if (T3_DISABLED) return null;
  const start = Date.now();
  // Last three events only — keep the prompt small so the thinking
  // model's token budget goes to reasoning, not re-reading history.
  // FIX: the status used to be concatenated directly onto the count
  // ("Loader×43/4 filled"); insert a separator so the digest is readable.
  const priorSummary = prior.slice(-3).map(p =>
    `- ${p.event.at} ${p.event.kind} ${p.event.role}×${p.event.count} — ${p.ok ? p.fills.length + "/" + p.event.count + " filled" : "FAIL"}; pool=${p.pool_size ?? "?"}; cites=${p.playbook_citations?.length ?? 0}`
  ).join("\n");
  const thisOne = `This event: ${event.at} ${event.kind} ${event.role}×${event.count} in ${event.city}, ${event.state}. `
    + `Outcome: ${result.ok ? "filled " + result.fills.length + "/" + event.count : "FAILED: " + (result.error ?? "unknown")}. `
    + `Pool size: ${result.pool_size ?? "n/a"}. Turns: ${result.turns}. Playbook citations: ${result.playbook_citations?.length ?? 0}. `
    + `Gap signals: ${result.gap_signals.join("; ") || "none"}.`;
  const prompt = `You are the overview reviewer for a staffing coordinator agent system. A mid-day checkpoint has been triggered.
Recent events (most recent last):
${priorSummary || "(no prior events)"}
${thisOne}
Your job: emit ONE risk flag (≤6 words) and ONE actionable hint (≤25 words) for the NEXT event. Be concrete: name the role, city, or worker class if relevant. Do not restate what happened. Think step by step, then output strictly as:
RISK: <flag>
HINT: <hint>`;
  let text = "";
  try {
    // 600-token budget: thinking models burn hidden reasoning tokens
    // before emitting `response` — never budget under 400.
    text = await overviewGenerate(prompt, { temperature: 0.2, max_tokens: 600 });
  } catch (e) {
    // Fail-soft sentinel: the error surfaces in checkpoints.jsonl via
    // `risk` without aborting the scenario.
    return {
      after_event: event.at,
      event_kind: event.kind,
      ok: false,
      model: OVERVIEW_MODEL,
      duration_secs: (Date.now() - start) / 1000,
      hint: "(T3 unavailable)",
      risk: (e as Error).message.slice(0, 80),
    };
  }
  const riskMatch = text.match(/RISK:\s*(.+)/i);
  const hintMatch = text.match(/HINT:\s*(.+)/i);
  // ok only when BOTH lines parsed; otherwise fall back to the raw text
  // as the hint so nothing the model said is silently dropped.
  return {
    after_event: event.at,
    event_kind: event.kind,
    ok: Boolean(riskMatch && hintMatch),
    model: OVERVIEW_MODEL,
    duration_secs: (Date.now() - start) / 1000,
    risk: (riskMatch?.[1] ?? "(unparsed)").trim().slice(0, 120),
    hint: (hintMatch?.[1] ?? text).trim().slice(0, 400),
  };
}
/**
 * T3 cross-day lesson. Runs once after all events: digests the day's
 * results + checkpoints and asks the overview model for one short,
 * actionable lesson aimed at tomorrow's run. Returns null when T3 is
 * disabled; on model failure returns a parenthesized placeholder so
 * lesson.md still records what happened.
 */
async function runCrossDayLesson(ctx: ScenarioContext, checkpoints: OverviewCheckpoint[]): Promise<string | null> {
  if (T3_DISABLED) return null;
  // FIX: the fill status used to run straight into the state code
  // ("Toledo,OH3 filled"); insert a separator so the digest is readable.
  const eventDigest = ctx.results.map(r =>
    `- ${r.event.at} ${r.event.kind} ${r.event.role}×${r.event.count} ${r.event.city},${r.event.state} — ${r.ok ? r.fills.length + " filled" : "FAIL"}; pool=${r.pool_size ?? "?"}; turns=${r.turns}; cites=${r.playbook_citations?.length ?? 0}; gaps=${r.gap_signals.length}`
  ).join("\n");
  const checkpointDigest = checkpoints.length > 0
    ? checkpoints.map(c => `- after ${c.after_event} (${c.event_kind}): risk="${c.risk}" hint="${c.hint}"`).join("\n")
    : "(no mid-day checkpoints)";
  const prompt = `You are the end-of-day lesson writer for a staffing coordinator agent system. The day is done. Distill it.
Client: ${ctx.spec.client} Date: ${ctx.spec.date}
Events that ran:
${eventDigest}
Mid-day checkpoints:
${checkpointDigest}
Your job: write ONE actionable lesson for future runs that face similar setups. Target audience: the agent tomorrow. Keep the lesson to 3-5 sentences. No filler, no restating. Think step by step about what pattern repeated, what to pre-fetch, or what to avoid — then write the lesson as plain prose.
LESSON:`;
  try {
    // 900-token budget: thinking models spend hidden reasoning tokens
    // before the visible lesson text.
    const text = await overviewGenerate(prompt, { temperature: 0.2, max_tokens: 900 });
    // Accept either "LESSON: …" or bare prose — some models drop the label.
    const m = text.match(/LESSON:\s*([\s\S]+)/i);
    return (m ? m[1] : text).trim();
  } catch (e) {
    return `(T3 lesson unavailable: ${(e as Error).message})`;
  }
}
// =================== EOD gap report ===================
/**
 * Write the end-of-day retrospective (`report.md`) for a completed run.
 *
 * Assembles, in order: a per-event summary table, the final roster, gap
 * signals grouped by category (augmented here with a cross-event fairness
 * check and a playbook_memory write-through audit), a "workers touched"
 * audit so every worker the system decided on is visible, the per-event
 * discovered-pattern meta-index, and a short narrative summary.
 *
 * Side effects: one best-effort GET against the gateway for
 * playbook_memory stats (errors swallowed) and one write of report.md
 * into ctx.out_dir.
 */
async function writeRetrospective(ctx: ScenarioContext): Promise<void> {
// Accumulate the report as markdown lines; joined once at the end.
const lines: string[] = [];
lines.push(`# Scenario retrospective — ${ctx.spec.client}, ${ctx.spec.date}`);
lines.push("");
lines.push(`Executor: \`${EXECUTOR_MODEL}\` Reviewer: \`${REVIEWER_MODEL}\` Draft: \`${DRAFT_MODEL}\` Overview(T3): \`${T3_DISABLED ? "disabled" : OVERVIEW_MODEL}\``);
lines.push("");
// --- Per-event summary ---
lines.push("## Events");
lines.push("");
lines.push("| At | Kind | Role / Count | Pool | Fills | Turns | Dur(s) | Cites | Gaps |");
lines.push("|---|---|---|---|---|---|---|---|---|");
for (const r of ctx.results) {
// ✓/✗ marks whether the event reached consensus (r.ok).
const status = r.ok ? "✓" : "✗";
lines.push(
`| ${r.event.at} | ${r.event.kind} | ${r.event.role} × ${r.event.count} | ${r.pool_size ?? "-"} | ${status} ${r.fills.length} | ${r.turns} | ${r.duration_secs.toFixed(1)} | ${r.playbook_citations?.length ?? 0} | ${r.gap_signals.length} |`
);
}
lines.push("");
// --- Roster ---
lines.push("## Final roster");
lines.push("");
lines.push("| Worker | Booked | Role | City, ST | Status |");
lines.push("|---|---|---|---|---|");
for (const e of ctx.roster) {
lines.push(`| ${e.worker_id} ${e.name} | ${e.booked_for} | ${e.role} | ${e.city}, ${e.state} | ${e.status} |`);
}
lines.push("");
// --- Gap analysis by category ---
// Group the per-event gap signals by category; cross-event categories
// (fairness, write_through_audit) are appended below.
const bycat: Record<string, string[]> = {};
for (const g of ctx.gap_signals) {
if (!bycat[g.category]) bycat[g.category] = [];
bycat[g.category].push(`**${g.event}** — ${g.detail}`);
}
// Add cross-event categories computed here:
// Gap 3 — fairness (Gini-lite on roster)
// Count confirmed bookings per worker; anyone booked more than once
// today is flagged as a potential fairness concern.
const bookedIds = ctx.roster.filter(r => r.status === "confirmed").map(r => r.worker_id);
const counts = new Map<string, number>();
for (const id of bookedIds) counts.set(id, (counts.get(id) ?? 0) + 1);
const multis = [...counts.entries()].filter(([_, n]) => n > 1);
if (multis.length > 0) {
bycat["fairness"] = bycat["fairness"] ?? [];
for (const [id, n] of multis) {
const name = ctx.roster.find(r => r.worker_id === id)?.name ?? id;
bycat["fairness"].push(`_cross-event_ — ${name} (${id}) booked ${n} times today`);
}
}
// Gap 5 — tool errors already captured per-event via gap_signals.
// Gap 6 — write-through coverage: compare # events vs # new playbook_memory entries.
// Best-effort: a stats endpoint failure must not sink the report.
try {
const stats = await httpJson<any>(`${GATEWAY}/vectors/playbook_memory/stats`);
bycat["write_through_audit"] = bycat["write_through_audit"] ?? [];
bycat["write_through_audit"].push(`_post-run_ — playbook_memory has ${stats.entries} entries (ran ${ctx.results.length} events, expected ≥ ${ctx.results.filter(r => r.ok).length} new entries from this run)`);
} catch { /* non-fatal */ }
lines.push("## Gap signals");
lines.push("");
if (Object.keys(bycat).length === 0) {
lines.push("_None surfaced — either everything worked or detection is under-tuned._");
} else {
for (const [cat, items] of Object.entries(bycat)) {
lines.push(`### ${cat}`);
for (const item of items) lines.push(`- ${item}`);
lines.push("");
}
}
// --- Workers-touched audit (don't leave anyone out) ---
// Pull every worker that surfaced as a hit across all events — booked
// or excluded or rejected — so we can show the full population the
// system considered, not just the ones that made the cut. J's ask:
// "iterations and decisions that are made don't leave anyone out."
const touched = new Map<string, { name: string; events: string[]; outcome: string }>();
// Every fill across every event starts as "booked"…
for (const r of ctx.results) {
for (const f of r.fills) {
const key = f.candidate_id;
if (!touched.has(key)) touched.set(key, { name: f.name, events: [], outcome: "booked" });
touched.get(key)!.events.push(`${r.event.at} ${r.event.kind}`);
}
}
// …then roster no-shows override: either flip an already-touched worker
// to "booked-then-no_show" or record a fresh "no_show" entry.
for (const r of ctx.roster) {
if (r.status === "no_show") {
const t = touched.get(r.worker_id);
if (t) t.outcome = "booked-then-no_show";
else touched.set(r.worker_id, { name: r.name, events: [r.booked_for], outcome: "no_show" });
}
}
lines.push("## Workers touched across the week");
lines.push("");
lines.push(`${touched.size} distinct workers made it through to a decision. Every one is accounted for below — `
+ `no-shows flagged, rebookings noted, everyone visible.`);
lines.push("");
lines.push("| Worker ID | Name | Events | Outcome |");
lines.push("|---|---|---|---|");
for (const [id, t] of touched) {
lines.push(`| ${id} | ${t.name} | ${t.events.join(" + ")} | ${t.outcome} |`);
}
lines.push("");
// --- Discovered patterns evolution across events ---
lines.push("## Discovered patterns (meta-index)");
lines.push("");
lines.push("What the system identified across semantically-similar past fills as each event ran:");
lines.push("");
for (const r of ctx.results) {
// NOTE(review): discovered_pattern is read via `as any` — presumably set
// by runEvent outside this view; confirm and add it to the result type.
const dp = (r as any).discovered_pattern ?? "—";
lines.push(`- **${r.event.at} ${r.event.kind}** (${r.event.role}): ${dp}`);
}
lines.push("");
// --- Narrative summary ---
lines.push("## Narrative");
lines.push("");
lines.push(`- ${ctx.results.filter(r => r.ok).length}/${ctx.results.length} events reached consensus.`);
lines.push(`- Final roster: ${ctx.roster.length} bookings across ${new Set(ctx.roster.map(r => r.worker_id)).size} distinct workers.`);
lines.push(`- Workers touched (booked, failed, or otherwise decided): ${touched.size}.`);
const totalCites = ctx.results.reduce((a, r) => a + (r.playbook_citations?.length ?? 0), 0);
lines.push(`- Playbook citations across the day: ${totalCites} (proof the feedback loop fired across events).`);
const droppedEvents = ctx.results.filter(r => !r.ok);
if (droppedEvents.length > 0) {
lines.push(`- Dropped events: ${droppedEvents.map(r => r.event.at + " " + r.event.kind).join(", ")}.`);
}
await writeFile(join(ctx.out_dir, "report.md"), lines.join("\n"));
console.log(`\n✓ report → ${join(ctx.out_dir, "report.md")}`);
}
// =================== Main driver ===================
/**
 * Scenario driver entry point.
 *
 * Loads a ScenarioSpec from argv[2] (JSON file) or falls back to
 * DEFAULT_SCENARIO, creates a timestamped output directory, runs each
 * event sequentially, fires T3 checkpoints per the Option B cadence,
 * then writes the cross-day lesson (Option A), seeds it into
 * playbook_memory, and writes the retrospective.
 *
 * Exit codes: 0 when every event reached consensus, 2 on partial
 * failure (crashes are handled by the top-level .catch below, exit 1).
 */
async function main() {
const specPath = process.argv[2];
const spec: ScenarioSpec = specPath
? JSON.parse(await Bun.file(specPath).text())
: DEFAULT_SCENARIO;
// ISO timestamp with ":" and "." replaced by "-", truncated to seconds —
// filesystem-safe and sortable, e.g. 2026-04-20T19-21-45.
const stamp = new Date().toISOString().replace(/[:.]/g, "-").slice(0, 19);
const out_dir = join("tests/multi-agent/playbooks", `scenario-${stamp}`);
await mkdir(out_dir, { recursive: true });
const ctx: ScenarioContext = {
spec,
out_dir,
roster: [],
results: [],
gap_signals: [],
};
// Initialize output files
await writeFile(join(out_dir, "sms.md"), `# SMS drafts — ${spec.client}, ${spec.date}\n`);
await writeFile(join(out_dir, "emails.md"), `# Client emails — ${spec.client}, ${spec.date}\n`);
await writeFile(join(out_dir, "dispatch.jsonl"), "");
await writeFile(join(out_dir, "checkpoints.jsonl"), "");
const checkpoints: OverviewCheckpoint[] = [];
console.log(`▶ scenario: ${spec.client}, ${spec.date}, ${spec.events.length} events`);
console.log(`▶ models: exec=${EXECUTOR_MODEL} review=${REVIEWER_MODEL} overview=${T3_DISABLED ? "disabled" : OVERVIEW_MODEL + (OVERVIEW_CLOUD ? " (cloud)" : "")}`);
console.log(`▶ out: ${out_dir}\n`);
for (let i = 0; i < spec.events.length; i++) {
const event = spec.events[i];
// Expand misplacement-style exclusions from the current roster: it
// wants to replace a worker from a prior event, so grab everyone
// booked at that at-label and add as exclusions.
if (event.kind === "misplacement" && event.replaces_event) {
const priorBooked = ctx.roster
.filter(r => r.booked_for === event.replaces_event && r.status === "confirmed")
.map(r => r.worker_id);
if (priorBooked.length > 0) {
// Pick one arbitrarily to mark as no_show — in a real system the
// external signal would pick. For the test, first one works.
const lost = priorBooked[0];
const lostEntry = ctx.roster.find(r => r.worker_id === lost);
if (lostEntry) {
lostEntry.status = "no_show";
console.log(` (misplacement: marking ${lost} ${lostEntry.name} as no-show)`);
}
// Exclude all prior bookings so the refill doesn't pick anyone
// already scheduled for today.
event.exclude_worker_ids = priorBooked;
}
}
const result = await runEvent(event, ctx);
ctx.results.push(result);
// Gap signals arrive as "category: detail" strings; split on the first
// colon (detail keeps any further colons).
for (const s of result.gap_signals) {
const [category, ...rest] = s.split(":");
ctx.gap_signals.push({ event: event.at, category: category.trim(), detail: rest.join(":").trim() });
}
// Option B — T3 checkpoint after every misplacement, and every N-th event.
// Also fires on the last event so the day always ends with a checkpoint.
const isLast = i === spec.events.length - 1;
const nthHit = T3_CHECKPOINT_EVERY > 0 && ((i + 1) % T3_CHECKPOINT_EVERY === 0);
const shouldCheckpoint = !T3_DISABLED && (event.kind === "misplacement" || nthHit || isLast);
if (shouldCheckpoint) {
// result was already pushed above, so slice(0, -1) = prior events only.
const cp = await runOverviewCheckpoint(event, result, ctx.results.slice(0, -1));
if (cp) {
checkpoints.push(cp);
await appendFile(join(out_dir, "checkpoints.jsonl"), JSON.stringify(cp) + "\n");
console.log(` T3 checkpoint (${cp.duration_secs.toFixed(1)}s): risk="${cp.risk}" hint="${cp.hint.slice(0, 80)}${cp.hint.length > 80 ? "…" : ""}"`);
}
}
// Small breather to not hammer Ollama on back-to-back runs.
await new Promise(r => setTimeout(r, 500));
}
// Persist structured state for forensics.
await writeFile(join(out_dir, "roster.json"), JSON.stringify(ctx.roster, null, 2));
await writeFile(join(out_dir, "results.json"), JSON.stringify(ctx.results, null, 2));
// Option A — T3 cross-day lesson. One final call distills the whole run.
// Saved to lesson.md and also seeded into playbook_memory so tomorrow's
// agent can retrieve it on similar setups.
if (!T3_DISABLED) {
console.log(`\n▶ T3 cross-day lesson via ${OVERVIEW_MODEL}`);
const tLesson = Date.now();
const lesson = await runCrossDayLesson(ctx, checkpoints);
const lessonSecs = ((Date.now() - tLesson) / 1000).toFixed(1);
if (lesson) {
await writeFile(
join(out_dir, "lesson.md"),
`# Cross-day lesson — ${ctx.spec.client}, ${ctx.spec.date}\n\n`
+ `_Generated by \`${OVERVIEW_MODEL}\` in ${lessonSecs}s. `
+ `Based on ${ctx.results.length} events + ${checkpoints.length} mid-day checkpoints._\n\n`
+ lesson + "\n"
);
console.log(`✓ lesson (${lessonSecs}s) → ${join(out_dir, "lesson.md")}`);
// Seed the lesson into playbook_memory for future retrieval. Keep
// the embedded `approach` + `context` terse per feedback_phase19_seed_text.md;
// the rich prose lives in lesson.md and a separate `rationale` field.
try {
const kinds = [...new Set(ctx.spec.events.map(e => e.kind))].join("+");
const cities = [...new Set(ctx.spec.events.map(e => e.city))].slice(0, 3).join(",");
await fetch(`${GATEWAY}/vectors/playbook_memory/seed`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
operation: `cross-day-lesson-${ctx.spec.date}`,
approach: `${kinds} day in ${cities}`,
context: `${ctx.spec.client} ${ctx.spec.date}`,
rationale: lesson.slice(0, 2000),
endorsed_names: [],
append: true,
}),
});
} catch (e) {
// Seeding is best-effort; a gateway hiccup must not fail the run.
console.log(` (lesson seed skipped: ${(e as Error).message})`);
}
}
}
await writeRetrospective(ctx);
// Exit 2 signals partial failure to callers/CI; 0 means a clean day.
const okCount = ctx.results.filter(r => r.ok).length;
if (okCount < ctx.results.length) {
console.log(`\n⚠ ${okCount}/${ctx.results.length} events succeeded. See ${out_dir}/report.md for gaps.`);
process.exit(2);
}
console.log(`\n✓ ${okCount}/${ctx.results.length} events succeeded. See ${out_dir}/report.md.`);
process.exit(0);
}
// Top-level entry: surface a crash loudly (message + stack) and exit non-zero.
main().catch((err: unknown) => {
  const crash = err as Error;
  console.error(`\n✗ scenario driver crashed: ${crash.message}`);
  console.error(crash.stack);
  process.exit(1);
});