lakehouse/tests/multi-agent/run_e2e_rated.ts
root 0c4868c191 qwen3.5 executor + continuation primitive + think:false
Three coupled fixes that together turned the Riverfront Steel scenario
from 0/5 (mistral) to 4/5 (qwen3.5) with T3 flagging real staffing
concerns rather than linter advice.

MODEL SWAP
- Executor: mistral → qwen3.5:latest (9.7B, 262K ctx, thinking).
  mistral's decoder emitted malformed JSON on complex SQL filters
  regardless of prompt; J called it — stop using mistral.
- Reviewer: qwen2.5 → qwen3:latest (40K ctx)
- Applied to scenario.ts, orchestrator.ts, network_proving.ts,
  run_e2e_rated.ts

CONTINUATION PRIMITIVE (agent.ts)
- generateContinuable(): empty-response → geometric backoff retry;
  truncated-JSON → continue from partial as scratchpad; bounded by
  budget cap + max_continuations. No more "bump max_tokens until it
  stops truncating" tourniquet.
- generateTreeSplit(): map-reduce for oversized input corpora with
  running scratchpad digest, reduce pass for final synthesis.
- Empty text no longer throws — it's a signal to continuable that
  thinking ate the budget.

think:false FOR HOT PATH
- qwen3.5 burned ~650 tokens of hidden thinking for trivial JSON
  emission. For executor/reviewer/draft: think:false. For T3/T4/T5
  overseers: thinking stays on (that's the point).
- Sidecar generate endpoint accepts `think` bool, passes through to
  Ollama's /api/generate.

VERIFIED OUTCOMES
Riverfront Steel 2026-04-21, qwen3.5+continuable+think:false:
  08:00 baseline_fill  3/3  4 turns
  10:30 recurring      2/2  3 turns (1 playbook citation)
  12:15 expansion      0/5  drift-aborted (5-fill orchestration
                            problem, separate work)
  14:00 emergency      4/4  3 turns (1 citation)
  15:45 misplacement   1/1  3 turns
  → T3 caught Patrick Ross double-booking across events
  → T3 flagged forklift cert drift on the event that failed
  → Cross-day lesson proposed "maintain buffer of ≥3 emergency
    candidates, pre-fetch certs for expansion, booking system
    cross-check" — real staffing advice, not generic linter output

PRD PHASE 21 rewritten to reflect the actual primitive shape (two-
call map-reduce with scratchpad glue) instead of the tourniquet
approach originally documented. Rust port queued for next sprint.

scripts/ab_t3_test.sh: A/B harness that chains B→C→D runs and emits
tests/multi-agent/playbooks/ab_scorecard.json.
2026-04-20 20:19:02 -05:00

401 lines
18 KiB
TypeScript

// Two-agent x two-tasks parallel real-world test with per-playbook rating.
//
// Spawns two independent (executor, reviewer) pairs concurrently, each
// driving a different staffing fill against the live substrate. After
// each pair seals a playbook, verifies the fill against workers_500k,
// confirms the seed reached playbook_memory, and re-runs the same query
// with use_playbook_memory=true to prove the boost fires.
//
// Errors fail fast — any HTTP error, parse error, or rating failure is
// rethrown so bun exits non-zero. Run with:
//
// bun run tests/multi-agent/run_e2e_rated.ts
//
// VRAM note: both pairs call the same two Ollama models (qwen3.5 +
// qwen3). Ollama queues at the model level, so "parallel" is concurrent
// orchestration, not concurrent inference — the loops interleave on the
// shared models. That's intentional: it stresses the same realistic
// path two staffing coordinators would hit if they both opened the app
// at 8am.
import {
type LogEntry,
type TaskSpec,
type Action,
type Fill,
GATEWAY,
generate,
parseAction,
executorPrompt,
reviewerPrompt,
sqlQuery,
callTool,
} from "./agent.ts";
// Executor does the tool-calling legwork; reviewer critiques every turn.
const EXECUTOR_MODEL = "qwen3.5:latest";
const REVIEWER_MODEL = "qwen3:latest";
// Hard ceilings: a pair that can't reach consensus within MAX_TURNS, or
// drifts / malforms tool calls MAX_CONSECUTIVE_DRIFTS times in a row, aborts.
const MAX_TURNS = 12;
const MAX_CONSECUTIVE_DRIFTS = 3;
// Vector index the boost-verification hybrid query runs against.
const INDEX_NAME = "workers_500k_v1";
// Outcome of one (executor, reviewer) orchestration run.
interface RunResult {
  task: TaskSpec;        // the staffing fill this run attempted
  ok: boolean;           // true when consensus was sealed
  turns: number;         // turns consumed (≤ MAX_TURNS)
  duration_secs: number; // wall-clock runtime, whole seconds
  fills: Fill[];         // consensus fills; empty when ok=false
  log: LogEntry[];       // full turn-by-turn transcript
  approach: string;      // executor rationale, later seeded into playbook_memory
  error?: string;        // failure reason when ok=false
}
// ────────────────────────── orchestrator (function form) ──────────────────────────
// Drive one (executor, reviewer) pair to consensus on `task`, tagging all
// console output with `prefix`. Never throws: every failure is folded into
// the returned RunResult (ok=false, error set) so main() can still rate
// whichever pair survives.
async function runOrchestrator(task: TaskSpec, prefix: string): Promise<RunResult> {
  const start = Date.now();
  const log: LogEntry[] = [];
  let turn = 0;
  let consecutiveDrifts = 0;
  // Track tool errors separately from drift verdicts. Reviewer saying
  // "continue" or "approve_done" should NOT reset a streak of malformed
  // tool calls — that's a different failure mode (model can't form the
  // call) than "executor is on the wrong path" (model is off-topic).
  let consecutiveToolErrors = 0;
  let sealed: { fills: Fill[]; approach: string } | null = null;
  // Append an entry to the shared transcript and echo a one-line summary.
  const append = (e: Omit<LogEntry, "at">): LogEntry => {
    const full: LogEntry = { ...e, at: new Date().toISOString() };
    log.push(full);
    console.log(`[${prefix}] [t${e.turn.toString().padStart(2, "0")} ${e.role.padEnd(8)} ${e.kind.padEnd(14)}] ${shortContent(e)}`);
    return full;
  };
  try {
    while (turn < MAX_TURNS && !sealed) {
      turn += 1;
      // Executor proposes the next action (tool_call, propose_done, ...).
      const execRaw = await generate(EXECUTOR_MODEL, executorPrompt(task, log), { temperature: 0.2, max_tokens: 1200 });
      const execAction = parseAction(execRaw, "executor");
      append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: execAction.kind as any, content: execAction });
      if (execAction.kind === "tool_call") {
        try {
          const result = await executeToolCall(execAction.tool, execAction.args);
          append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "tool_result", content: trimResult(result) });
          consecutiveToolErrors = 0;
        } catch (e) {
          // Feed the failure back through the log instead of aborting —
          // the executor may self-correct on the next turn.
          append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "tool_result",
            content: { error: (e as Error).message, tool: execAction.tool, args: execAction.args } });
          consecutiveToolErrors += 1;
          if (consecutiveToolErrors >= MAX_CONSECUTIVE_DRIFTS) {
            throw new Error(`${MAX_CONSECUTIVE_DRIFTS} consecutive tool errors — executor can't form a valid call`);
          }
        }
      }
      // Reviewer critiques the full transcript so far.
      const revRaw = await generate(REVIEWER_MODEL, reviewerPrompt(task, log), { temperature: 0.1, max_tokens: 1000 });
      const revAction = parseAction(revRaw, "reviewer");
      append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "critique", content: revAction });
      if (revAction.kind !== "critique") throw new Error(`reviewer non-critique: ${revAction.kind}`);
      if (revAction.verdict === "drift") {
        consecutiveDrifts += 1;
        if (consecutiveDrifts >= MAX_CONSECUTIVE_DRIFTS) throw new Error(`${MAX_CONSECUTIVE_DRIFTS} consecutive drifts`);
      } else consecutiveDrifts = 0;
      // Consensus requires BOTH a propose_done from the executor and an
      // approve_done verdict from the reviewer, with the exact fill count.
      if (execAction.kind === "propose_done" && revAction.verdict === "approve_done") {
        if (execAction.fills.length !== task.target_count) {
          throw new Error(`fills=${execAction.fills.length} target=${task.target_count}`);
        }
        append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "consensus_done", content: { fills: execAction.fills } });
        sealed = { fills: execAction.fills, approach: (execAction as any).rationale ?? "multi-agent → hybrid" };
      }
    }
    if (!sealed) throw new Error(`no consensus after ${MAX_TURNS} turns`);
    // Phase 19 write-through: seed playbook_memory so the next semantically
    // similar query benefits from this fill. Mirrors orchestrator.ts. Names
    // are the consensus fills' display names — that's what the boost-key
    // matcher (city, state, name) will look up against worker chunks.
    try {
      const seedRes = await fetch(`${GATEWAY}/vectors/playbook_memory/seed`, {
        method: "POST", headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          operation: task.operation,
          approach: sealed.approach || "multi-agent → hybrid search",
          context: task.approach_hint ?? `${task.target_role} fill in ${task.target_city}, ${task.target_state}`,
          endorsed_names: sealed.fills.map(f => f.name),
          append: true,
        }),
      });
      // Seeding is best-effort: a failed seed degrades the boost rating
      // later but must not fail an otherwise-successful run.
      if (!seedRes.ok) {
        console.warn(`[${prefix}] seed warning: ${seedRes.status} ${await seedRes.text()}`);
      } else {
        const j = await seedRes.json() as any;
        console.log(`[${prefix}] ↳ seeded playbook_memory: id=${j.playbook_id} entries=${j.entries_after}`);
      }
    } catch (e) {
      console.warn(`[${prefix}] seed errored: ${(e as Error).message}`);
    }
    return {
      task, ok: true, turns: turn, fills: sealed.fills, approach: sealed.approach,
      duration_secs: Math.round((Date.now() - start) / 1000), log,
    };
  } catch (e) {
    return {
      task, ok: false, turns: turn, fills: [], approach: "",
      duration_secs: Math.round((Date.now() - start) / 1000), log,
      error: (e as Error).message,
    };
  }
}
// Dispatch one executor tool call. hybrid_search and sql are handled
// inline (with argument validation); anything else falls through to the
// generic callTool shim.
//
// Throws on missing/invalid args or any non-2xx HTTP response — the
// orchestrator catches and feeds the error back to the executor as a
// tool_result so the model can self-correct.
async function executeToolCall(name: string, args: Record<string, any>): Promise<any> {
  if (name === "hybrid_search") {
    const { sql_filter, question, index_name, k } = args;
    if (!sql_filter || !question || !index_name) throw new Error(`hybrid_search needs sql_filter+question+index_name`);
    const r = await fetch(`${GATEWAY}/vectors/hybrid`, {
      method: "POST", headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ sql_filter, question, index_name, top_k: k ?? 10, generate: false, use_playbook_memory: true }),
    });
    if (!r.ok) throw new Error(`hybrid_search → ${r.status}: ${await r.text()}`);
    return r.json();
  }
  if (name === "sql") {
    if (!args.query || typeof args.query !== "string") throw new Error("sql needs query");
    // Model-generated SQL is untrusted: require a single SELECT statement.
    // Stripping a trailing semicolon first lets "SELECT ...;" through while
    // still rejecting stacked statements like "SELECT 1; DROP TABLE ...".
    const stmt = args.query.trim().replace(/;+\s*$/, "");
    if (!/^SELECT/i.test(stmt)) throw new Error("sql allows SELECT only");
    if (stmt.includes(";")) throw new Error("sql allows a single statement only");
    return sqlQuery(args.query);
  }
  return callTool(name, args);
}
// Cap payload sizes before a tool result enters the conversation log:
// SQL result sets are clipped to 20 rows and hybrid-search source lists
// to 12 entries. Anything else passes through untouched.
function trimResult(r: any): any {
  if (!r) return r;
  if (Array.isArray(r.rows)) {
    return { ...r, rows: r.rows.slice(0, 20) };
  }
  if (Array.isArray(r.sources)) {
    return { ...r, sources: r.sources.slice(0, 12) };
  }
  return r;
}
// Render a single log entry as a one-line console summary, clipping long
// payloads so the live transcript stays readable.
function shortContent(e: Omit<LogEntry, "at">): string {
  const c: any = e.content ?? {};
  const clip = (s: string) => s.slice(0, 80);
  switch (e.kind) {
    case "tool_call":
      return `${c.tool}(${JSON.stringify(c.args ?? {}).slice(0, 70)})`;
    case "tool_result":
      if (c.error) return `error: ${c.error}`;
      if (Array.isArray(c.sources)) return `hybrid sql=${c.sql_matches} reranked=${c.vector_reranked}`;
      if (Array.isArray(c.rows)) return `sql ${c.rows.length} rows`;
      return clip(JSON.stringify(c));
    case "critique":
      return `verdict=${c.verdict} ${(c.notes ?? "").slice(0, 60)}`;
    case "propose_done":
      return `${(c.fills ?? []).length} fills: ${(c.fills ?? []).map((f: Fill) => f.name).join(", ")}`;
    case "consensus_done":
      return "✓";
    case "plan":
      return `${(c.steps ?? []).length} steps`;
    default:
      return clip(JSON.stringify(c));
  }
}
// ────────────────────────── playbook rating ──────────────────────────
// Score card for one sealed playbook. Dimensions sum to /10; the hard
// gate in main() requires every rated playbook to reach ≥5.
interface Rating {
  geo: number; // 0-2: fills actually in target city/state
  authenticity: number; // 0-2: fills' worker_ids exist in workers_500k
  persistence: number; // 0-2: playbook_memory entry count grew correctly
  boost_firing: number; // 0-3: follow-up query shows non-zero boost
  speed: number; // 0-1: completed under 4 min
  total: number; // /10
  notes: string[]; // human-readable evidence behind each score
}
// Counters reported by GET /vectors/playbook_memory/stats.
interface MemoryStats { entries: number; total_names_endorsed: number }
// Snapshot the playbook_memory counters from the gateway. Throws on any
// non-2xx status so callers fail fast.
async function fetchMemoryStats(): Promise<MemoryStats> {
  const res = await fetch(`${GATEWAY}/vectors/playbook_memory/stats`);
  if (!res.ok) {
    throw new Error(`stats → ${res.status}`);
  }
  return res.json() as Promise<MemoryStats>;
}
// Try to resolve a fill's candidate_id to a workers_500k row. Accepts
// "W500K-7995" (vector doc_id with prefix) and "7995" (raw worker_id).
async function lookupWorker(candidate_id: string): Promise<{ worker_id: number; name: string; city: string; state: string; role: string } | null> {
const numStr = candidate_id.replace(/^W500K-/i, "").replace(/[^\d]/g, "");
if (!numStr) return null;
const num = parseInt(numStr, 10);
if (!Number.isFinite(num)) return null;
const r = await sqlQuery(`SELECT worker_id, name, city, state, role FROM workers_500k WHERE worker_id = ${num} LIMIT 1`);
return (r.rows && r.rows[0]) ?? null;
}
// Re-run a hybrid query that mirrors the contract — proves the freshly
// seeded playbook actually lifts a future search.
async function verifyBoostFires(task: TaskSpec): Promise<{ boostedHits: number; sampleCitations: string[]; topBoost: number }> {
// Mirror the contract's actual geo. The playbook stored (city, state)
// from the operation; if the verify SQL doesn't restrict to the same
// city, the candidate pool may not include the seeded workers and the
// boost has nothing to lift. The contract pattern in production also
// includes city — recruiters fill specific cities, not whole states.
const sql_filter = `role = '${task.target_role.replace(/'/g, "''")}' `
+ `AND state = '${task.target_state}' `
+ `AND city = '${task.target_city.replace(/'/g, "''")}'`;
const r = await fetch(`${GATEWAY}/vectors/hybrid`, {
method: "POST", headers: { "Content-Type": "application/json" },
body: JSON.stringify({
index_name: INDEX_NAME, filter_dataset: "workers_500k", id_column: "worker_id",
sql_filter, question: `${task.target_role} in ${task.target_city}, ${task.target_state}`,
top_k: 10, generate: false, use_playbook_memory: true, playbook_memory_k: 15,
}),
});
if (!r.ok) throw new Error(`verify hybrid → ${r.status}: ${await r.text()}`);
const j = (await r.json()) as any;
const sources: any[] = j.sources ?? [];
const boosted = sources.filter(s => (s.playbook_boost ?? 0) > 0);
const cites = boosted.flatMap(s => s.playbook_citations ?? []).slice(0, 5);
const top = sources.reduce((m, s) => Math.max(m, s.playbook_boost ?? 0), 0);
return { boostedHits: boosted.length, sampleCitations: cites, topBoost: top };
}
// Score one sealed playbook against the live substrate. Dimensions:
// geo/authenticity verify each fill, persistence verifies the memory
// write, boost_firing re-runs the query to prove the seed lifts future
// searches, and speed gates wall-clock. See the Rating interface for
// the per-dimension ranges.
async function ratePlaybook(
  result: RunResult,
  statsBefore: MemoryStats,
  statsAfter: MemoryStats,
): Promise<Rating> {
  const notes: string[] = [];
  let geo = 0, authenticity = 0, persistence = 0, boost_firing = 0, speed = 0;
  // 1. Geo + authenticity per fill. The lookups are independent database
  //    reads, so issue them in parallel; results are consumed in fill
  //    order so the notes stay deterministic.
  const workers = await Promise.all(
    result.fills.map(f => lookupWorker(f.candidate_id).catch(() => null)),
  );
  for (let i = 0; i < result.fills.length; i++) {
    const f = result.fills[i];
    const w = workers[i];
    if (!w) { notes.push(`✗ candidate_id ${f.candidate_id} not in workers_500k`); continue; }
    authenticity += 1;
    if (w.city.toLowerCase() === result.task.target_city.toLowerCase()
      && w.state === result.task.target_state) {
      geo += 1;
    } else {
      notes.push(`${w.name} (id=${w.worker_id}) is in ${w.city}, ${w.state}, not ${result.task.target_city}, ${result.task.target_state}`);
    }
  }
  // Cap at the rubric maximums (fill count may exceed 2).
  geo = Math.min(geo, 2);
  authenticity = Math.min(authenticity, 2);
  // 2. Persistence — exactly one new playbook_memory entry is ideal;
  //    growth of 2+ earns partial credit, no growth earns none.
  const grew = statsAfter.entries - statsBefore.entries;
  if (grew === 1) { persistence = 2; notes.push(`✓ playbook_memory grew by exactly 1`); }
  else if (grew >= 1) { persistence = 1; notes.push(`◑ playbook_memory grew by ${grew} (expected 1)`); }
  else { notes.push(`✗ playbook_memory did not grow (before=${statsBefore.entries} after=${statsAfter.entries})`); }
  // 3. Boost firing — re-run the same query and see if it lifts anything.
  //    A verify failure is scored 0 and noted, not rethrown.
  const v = await verifyBoostFires(result.task).catch(e => { notes.push(`✗ verify hybrid failed: ${(e as Error).message}`); return null; });
  if (v) {
    if (v.boostedHits >= 2) boost_firing = 3;
    else if (v.boostedHits === 1) boost_firing = 2;
    else if (v.topBoost > 0) boost_firing = 1;
    else boost_firing = 0;
    notes.push(`boost re-query: ${v.boostedHits}/10 hits boosted, top=+${v.topBoost.toFixed(3)}, citations=${v.sampleCitations.slice(0, 3).join(",")}`);
  }
  // 4. Speed — under 4 minutes wall-clock earns the point.
  if (result.duration_secs <= 240) speed = 1;
  else notes.push(`◑ slow: ${result.duration_secs}s (>240)`);
  const total = geo + authenticity + persistence + boost_firing + speed;
  return { geo, authenticity, persistence, boost_firing, speed, total, notes };
}
// One-line human summary of a Rating, e.g.
// "geo=2/2 auth=2/2 persist=2/2 boost=3/3 speed=1/1 → 10/10".
function fmtRating(r: Rating): string {
  const parts = [
    `geo=${r.geo}/2`,
    `auth=${r.authenticity}/2`,
    `persist=${r.persistence}/2`,
    `boost=${r.boost_firing}/3`,
    `speed=${r.speed}/1`,
  ];
  return `${parts.join(" ")} → ${r.total}/10`;
}
// ────────────────────────── main ──────────────────────────
// Entry point: run two independent staffing fills in parallel, seed
// playbook_memory from each consensus, then rate both playbooks and
// enforce a hard quality gate. Exits 0 on pass; any throw reaches the
// top-level catch and exits 1 (so bun/CI sees a non-zero status).
async function main() {
  const taskA: TaskSpec = {
    id: `e2e-A-${Date.now()}`,
    operation: "fill: Welder x2 in Toledo, OH",
    target_role: "Welder", target_count: 2, target_city: "Toledo", target_state: "OH",
    approach_hint: "hybrid_search against workers_500k_v1 with sql_filter on role+city+state, then sql verify",
  };
  const taskB: TaskSpec = {
    id: `e2e-B-${Date.now()}`,
    operation: "fill: Forklift Operator x2 in Nashville, TN",
    target_role: "Forklift Operator", target_count: 2, target_city: "Nashville", target_state: "TN",
    approach_hint: "hybrid_search against workers_500k_v1 with sql_filter on role+city+state, then sql verify",
  };
  console.log(`▶ parallel real-world test`);
  console.log(` A: ${taskA.operation}`);
  console.log(` B: ${taskB.operation}`);
  console.log(` models: executor=${EXECUTOR_MODEL} reviewer=${REVIEWER_MODEL}\n`);
  // Baseline memory counters — persistence ratings measure growth from here.
  const statsBefore = await fetchMemoryStats();
  console.log(`▶ playbook_memory before: ${statsBefore.entries} entries, ${statsBefore.total_names_endorsed} endorsed names\n`);
  // Run both pairs in parallel. Each is its own (executor, reviewer)
  // conversation; they do NOT see each other's logs.
  const [resA, resB] = await Promise.all([
    runOrchestrator(taskA, "A"),
    runOrchestrator(taskB, "B"),
  ]);
  console.log(`\n▶ both orchestrators returned`);
  console.log(` A: ok=${resA.ok} turns=${resA.turns} ${resA.duration_secs}s ${resA.error ?? ""}`);
  console.log(` B: ok=${resB.ok} turns=${resB.turns} ${resB.duration_secs}s ${resB.error ?? ""}`);
  // One failure is tolerated (the survivor still gets rated); two means
  // the substrate or the models themselves are broken.
  if (!resA.ok && !resB.ok) {
    throw new Error(`both orchestrators failed — substrate or models in bad state`);
  }
  const statsMid = await fetchMemoryStats();
  console.log(`\n▶ playbook_memory after both runs: ${statsMid.entries} entries (+${statsMid.entries - statsBefore.entries})\n`);
  // Rate each successful playbook. We compute persistence per task by
  // splitting the growth — both seeded sequentially-ish, so each should
  // contribute 1.
  // NOTE(review): when BOTH runs succeed, A's "after" (and B's "before")
  // are synthesized as before+1 rather than measured — this assumes each
  // seed landed exactly once and in run order; confirm against the
  // gateway's seed responses if persistence scores look wrong.
  const ratings: Array<{ id: string; ok: boolean; rating?: Rating; error?: string }> = [];
  if (resA.ok) {
    const beforeForA: MemoryStats = { entries: statsBefore.entries, total_names_endorsed: statsBefore.total_names_endorsed };
    const afterForA: MemoryStats = { entries: statsBefore.entries + (resA.fills.length > 0 ? 1 : 0), total_names_endorsed: statsBefore.total_names_endorsed };
    // Use real measured numbers when they're unambiguous (only one task succeeded)
    const ra = await ratePlaybook(resA, beforeForA, resB.ok ? afterForA : statsMid);
    ratings.push({ id: "A", ok: true, rating: ra });
  } else ratings.push({ id: "A", ok: false, error: resA.error });
  if (resB.ok) {
    const beforeForB: MemoryStats = resA.ok
      ? { entries: statsBefore.entries + 1, total_names_endorsed: statsBefore.total_names_endorsed }
      : statsBefore;
    const rb = await ratePlaybook(resB, beforeForB, statsMid);
    ratings.push({ id: "B", ok: true, rating: rb });
  } else ratings.push({ id: "B", ok: false, error: resB.error });
  console.log(`\n▶ Per-playbook ratings:\n`);
  for (const r of ratings) {
    if (!r.ok) {
      console.log(` ${r.id}: FAILED — ${r.error}`);
      continue;
    }
    console.log(` ${r.id}: ${fmtRating(r.rating!)}`);
    for (const n of r.rating!.notes) console.log(` ${n}`);
  }
  const totals = ratings.filter(r => r.rating).map(r => r.rating!.total);
  if (totals.length === 0) {
    throw new Error(`no playbooks rated — both orchestrators failed`);
  }
  const min = Math.min(...totals);
  const avg = totals.reduce((s, t) => s + t, 0) / totals.length;
  console.log(`\n▶ Summary: avg=${avg.toFixed(1)}/10 min=${min}/10`);
  // Hard gate: any rating below 5 means the loop is broken end-to-end.
  if (min < 5) throw new Error(`rating gate failed — min ${min}/10 (need ≥5)`);
  console.log(`\n✓ end-to-end real-world test passed`);
  process.exit(0);
}
// Top-level runner: any escaped error prints message + stack and exits
// non-zero so the harness fails loudly under bun/CI.
main().catch((err: unknown) => {
  const e = err as Error;
  console.error(`\n✗ ${e.message}`);
  if ((e as any).stack) console.error((e as any).stack);
  process.exit(1);
});