lakehouse/tests/multi-agent/run_e2e_rated.ts
commit 5b1fcf6d27 Phase 28-36 body of work
Accumulated since a6f12e2 (Phase 21 Rust port + Phase 27 versioning):

- Phase 36: embed_semaphore on VectorState (permits=1) serializes
  seed embed calls — prevents sidecar socket collisions under
  concurrent /seed stress load
- Phase 31+: run_stress.ts 6-task diverse stress scaffolding;
  run_e2e_rated.ts + orchestrator.ts tightening
- Catalog dedupe cleanup: 16 duplicate manifests removed; canonical
  candidates.parquet (10.5MB -> 76KB) + placements.parquet (1.2MB ->
  11KB) regenerated post-dedupe; fresh manifests for active datasets
- vectord: harness EvalSet refinements (+181), agent portfolio
  rotation + ingest triggers (+158), autotune + rag adjustments
- catalogd/storaged/ingestd/mcp-server: misc tightening
- docs: Phase 28-36 PRD entries + DECISIONS ADR additions;
  control-plane pivot banner added to top of docs/PRD.md (pointing
  at docs/CONTROL_PLANE_PRD.md which lands in next commit)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 02:41:15 -05:00

409 lines
18 KiB
TypeScript

// Two-agent x two-tasks parallel real-world test with per-playbook rating.
//
// Spawns two independent (executor, reviewer) pairs concurrently, each
// driving a different staffing fill against the live substrate. After
// each pair seals a playbook, verifies the fill against workers_500k,
// confirms the seed reached playbook_memory, and re-runs the same query
// with use_playbook_memory=true to prove the boost fires.
//
// Errors fail fast — any HTTP error, parse error, or rating failure is
// rethrown so bun exits non-zero. Run with:
//
// bun run tests/multi-agent/run_e2e_rated.ts
//
// VRAM note: both pairs call the same two Ollama models (the executor and
// reviewer tags declared below). Ollama queues at the model level, so "parallel" is concurrent
// orchestration, not concurrent inference — the loops interleave on the
// shared models. That's intentional: it stresses the same realistic
// path two staffing coordinators would hit if they both opened the app
// at 8am.
import {
type LogEntry,
type TaskSpec,
type Action,
type Fill,
GATEWAY,
generate,
parseAction,
executorPrompt,
reviewerPrompt,
sqlQuery,
callTool,
} from "./agent.ts";
// Ollama model tags for the two agent roles.
const EXECUTOR_MODEL = "qwen3.5:latest";
const REVIEWER_MODEL = "qwen3:latest";
// Hard ceiling on executor/reviewer round-trips before declaring no consensus.
const MAX_TURNS = 12;
// Shared abort threshold for both "consecutive drift verdicts" and
// "consecutive malformed tool calls".
const MAX_CONSECUTIVE_DRIFTS = 3;
// Vector index the verification hybrid query runs against.
const INDEX_NAME = "workers_500k_v1";
// Outcome of one (executor, reviewer) pair run — success or failure, with
// the full transcript attached either way so failures are inspectable.
interface RunResult {
  task: TaskSpec;        // the staffing fill this pair was driving
  ok: boolean;           // true iff consensus was sealed within MAX_TURNS
  turns: number;         // round-trips consumed
  duration_secs: number; // wall-clock runtime, rounded to whole seconds
  fills: Fill[];         // sealed fills (empty when ok=false)
  log: LogEntry[];       // full conversation transcript
  approach: string;      // executor's rationale for the sealed approach
  error?: string;        // failure message when ok=false
}
// ────────────────────────── orchestrator (function form) ──────────────────────────
/**
 * Drives one (executor, reviewer) agent pair against `task` until the pair
 * reaches consensus (executor "propose_done" + reviewer "approve_done") or
 * an abort condition trips. Never throws: every failure is folded into the
 * returned RunResult so two pairs can run under Promise.all and the caller
 * can inspect both outcomes.
 *
 * @param task   staffing fill the pair must complete
 * @param prefix console-log tag ("A"/"B") distinguishing parallel runs
 */
async function runOrchestrator(task: TaskSpec, prefix: string): Promise<RunResult> {
  const start = Date.now();
  const log: LogEntry[] = [];
  let turn = 0;
  let consecutiveDrifts = 0;
  // Track tool errors separately from drift verdicts. Reviewer saying
  // "continue" or "approve_done" should NOT reset a streak of malformed
  // tool calls — that's a different failure mode (model can't form the
  // call) than "executor is on the wrong path" (model is off-topic).
  let consecutiveToolErrors = 0;
  let sealed: { fills: Fill[]; approach: string } | null = null;
  // Append an entry to the shared transcript and mirror it to the console.
  const append = (e: Omit<LogEntry, "at">): LogEntry => {
    const full: LogEntry = { ...e, at: new Date().toISOString() };
    log.push(full);
    console.log(`[${prefix}] [t${e.turn.toString().padStart(2, "0")} ${e.role.padEnd(8)} ${e.kind.padEnd(14)}] ${shortContent(e)}`);
    return full;
  };
  try {
    while (turn < MAX_TURNS && !sealed) {
      turn += 1;
      // Executor proposes the next action given the full transcript so far.
      const execRaw = await generate(EXECUTOR_MODEL, executorPrompt(task, log), { temperature: 0.2, max_tokens: 1200, think: false });
      const execAction = parseAction(execRaw, "executor");
      append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: execAction.kind as any, content: execAction });
      if (execAction.kind === "tool_call") {
        try {
          const result = await executeToolCall(execAction.tool, execAction.args);
          append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "tool_result", content: trimResult(result) });
          consecutiveToolErrors = 0;
        } catch (e) {
          // Tool failures are logged into the transcript (so the executor can
          // self-correct next turn) rather than aborting immediately.
          append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "tool_result",
            content: { error: (e as Error).message, tool: execAction.tool, args: execAction.args } });
          consecutiveToolErrors += 1;
          if (consecutiveToolErrors >= MAX_CONSECUTIVE_DRIFTS) {
            throw new Error(`${MAX_CONSECUTIVE_DRIFTS} consecutive tool errors — executor can't form a valid call`);
          }
        }
      }
      // Reviewer critiques the executor's latest move against the task.
      const revRaw = await generate(REVIEWER_MODEL, reviewerPrompt(task, log), { temperature: 0.1, max_tokens: 1000, think: false });
      const revAction = parseAction(revRaw, "reviewer");
      append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "critique", content: revAction });
      if (revAction.kind !== "critique") throw new Error(`reviewer non-critique: ${revAction.kind}`);
      if (revAction.verdict === "drift") {
        consecutiveDrifts += 1;
        if (consecutiveDrifts >= MAX_CONSECUTIVE_DRIFTS) throw new Error(`${MAX_CONSECUTIVE_DRIFTS} consecutive drifts`);
      } else consecutiveDrifts = 0;
      // Consensus: executor proposed done AND reviewer approved it. The fill
      // count must match the task's target exactly or the run fails fast.
      if (execAction.kind === "propose_done" && revAction.verdict === "approve_done") {
        if (execAction.fills.length !== task.target_count) {
          throw new Error(`fills=${execAction.fills.length} target=${task.target_count}`);
        }
        append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "consensus_done", content: { fills: execAction.fills } });
        sealed = { fills: execAction.fills, approach: (execAction as any).rationale ?? "multi-agent → hybrid" };
      }
    }
    if (!sealed) throw new Error(`no consensus after ${MAX_TURNS} turns`);
    return {
      task, ok: true, turns: turn, fills: sealed.fills, approach: sealed.approach,
      duration_secs: Math.round((Date.now() - start) / 1000), log,
    };
  } catch (e) {
    // Convert any failure into a RunResult so one crashed pair doesn't take
    // down its sibling running in Promise.all.
    return {
      task, ok: false, turns: turn, fills: [], approach: "",
      duration_secs: Math.round((Date.now() - start) / 1000), log,
      error: (e as Error).message,
    };
  }
}
/**
 * Dispatches one executor tool call. "hybrid_search" and "sql" are handled
 * here with argument validation; any other tool name falls through to the
 * generic gateway `callTool`.
 *
 * @throws on missing/invalid args, non-SELECT or multi-statement SQL, or a
 *         non-2xx hybrid response — the orchestrator logs these into the
 *         transcript so the executor can self-correct.
 */
async function executeToolCall(name: string, args: Record<string, any>): Promise<any> {
  if (name === "hybrid_search") {
    const { sql_filter, question, index_name, k } = args;
    if (!sql_filter || !question || !index_name) throw new Error(`hybrid_search needs sql_filter+question+index_name`);
    const r = await fetch(`${GATEWAY}/vectors/hybrid`, {
      method: "POST", headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ sql_filter, question, index_name, top_k: k ?? 10, generate: false, use_playbook_memory: true }),
    });
    if (!r.ok) throw new Error(`hybrid_search → ${r.status}: ${await r.text()}`);
    return r.json();
  }
  if (name === "sql") {
    if (!args.query || typeof args.query !== "string") throw new Error("sql needs query");
    // Fix: the old check was /^\s*SELECT/i, which (a) matched any token
    // merely *starting* with "SELECT" (e.g. "SELECTION"), and (b) let
    // stacked statements like "SELECT 1; DELETE FROM t" through. Require a
    // word boundary and reject interior semicolons; a single trailing
    // semicolon is still tolerated.
    const query = args.query.trim().replace(/;\s*$/, "");
    if (!/^SELECT\b/i.test(query)) throw new Error("sql allows SELECT only");
    if (query.includes(";")) throw new Error("sql allows a single statement only");
    return sqlQuery(query);
  }
  return callTool(name, args);
}
// Cap bulky tool results before they enter the transcript: SQL results
// keep at most 20 rows, hybrid-search results at most 12 sources, and
// anything else passes through untouched. A result carrying both arrays
// only has its rows trimmed (rows take precedence).
function trimResult(r: any): any {
  if (!r) return r;
  if (Array.isArray(r.rows)) return { ...r, rows: r.rows.slice(0, 20) };
  if (Array.isArray(r.sources)) return { ...r, sources: r.sources.slice(0, 12) };
  return r;
}
// One-line console summary of a transcript entry, dispatched on its kind.
function shortContent(e: Omit<LogEntry, "at">): string {
  const c: any = e.content ?? {};
  switch (e.kind) {
    case "tool_call":
      return `${c.tool}(${JSON.stringify(c.args ?? {}).slice(0, 70)})`;
    case "tool_result": {
      if (c.error) return `error: ${c.error}`;
      if (Array.isArray(c.sources)) return `hybrid sql=${c.sql_matches} reranked=${c.vector_reranked}`;
      if (Array.isArray(c.rows)) return `sql ${c.rows.length} rows`;
      return JSON.stringify(c).slice(0, 80);
    }
    case "critique":
      return `verdict=${c.verdict} ${(c.notes ?? "").slice(0, 60)}`;
    case "propose_done": {
      const fills = c.fills ?? [];
      return `${fills.length} fills: ${fills.map((f: Fill) => f.name).join(", ")}`;
    }
    case "consensus_done":
      return "✓";
    case "plan":
      return `${(c.steps ?? []).length} steps`;
    default:
      return JSON.stringify(c).slice(0, 80);
  }
}
// ────────────────────────── playbook rating ──────────────────────────
// Score card for one sealed playbook; the rubric totals 10 points and
// main() hard-gates on a minimum of 5.
interface Rating {
  geo: number;          // 0-2: fills actually in target city/state
  authenticity: number; // 0-2: fills' worker_ids exist in workers_500k
  persistence: number;  // 0-2: playbook_memory entry count grew correctly
  boost_firing: number; // 0-3: follow-up query shows non-zero boost
  speed: number;        // 0-1: completed under 4 min
  total: number;        // /10
  notes: string[];      // human-readable evidence for each score
}
// Shape of GET /vectors/playbook_memory/stats responses.
interface MemoryStats { entries: number; total_names_endorsed: number }
// Snapshot the playbook_memory store's size from the gateway; throws on a
// non-2xx response so callers fail fast.
async function fetchMemoryStats(): Promise<MemoryStats> {
  const resp = await fetch(`${GATEWAY}/vectors/playbook_memory/stats`);
  if (!resp.ok) throw new Error(`stats → ${resp.status}`);
  return (await resp.json()) as MemoryStats;
}
// Try to resolve a fill's candidate_id to a workers_500k row. Accepts
// "W500K-7995" (vector doc_id with prefix) and "7995" (raw worker_id).
async function lookupWorker(candidate_id: string): Promise<{ worker_id: number; name: string; city: string; state: string; role: string } | null> {
  // Strip the vector doc prefix and any non-digit noise before parsing.
  const digits = candidate_id.replace(/^W500K-/i, "").replace(/[^\d]/g, "");
  if (!digits) return null;
  const workerId = parseInt(digits, 10);
  if (!Number.isFinite(workerId)) return null;
  // workerId is a validated integer, so interpolating it is injection-safe.
  const res = await sqlQuery(`SELECT worker_id, name, city, state, role FROM workers_500k WHERE worker_id = ${workerId} LIMIT 1`);
  return (res.rows && res.rows[0]) ?? null;
}
// Re-run a hybrid query that mirrors the contract — proves the freshly
// seeded playbook actually lifts a future search.
async function verifyBoostFires(task: TaskSpec): Promise<{ boostedHits: number; sampleCitations: string[]; topBoost: number }> {
  // Mirror the contract's actual geo. The playbook stored (city, state)
  // from the operation; without the same city restriction the candidate
  // pool may not include the seeded workers, leaving the boost nothing to
  // lift. The production contract pattern also includes city — recruiters
  // fill specific cities, not whole states.
  const esc = (s: string) => s.replace(/'/g, "''");
  const sql_filter = `role = '${esc(task.target_role)}' AND state = '${task.target_state}' AND city = '${esc(task.target_city)}'`;
  const resp = await fetch(`${GATEWAY}/vectors/hybrid`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      index_name: INDEX_NAME, filter_dataset: "workers_500k", id_column: "worker_id",
      sql_filter, question: `${task.target_role} in ${task.target_city}, ${task.target_state}`,
      top_k: 10, generate: false, use_playbook_memory: true, playbook_memory_k: 15,
    }),
  });
  if (!resp.ok) throw new Error(`verify hybrid → ${resp.status}: ${await resp.text()}`);
  const payload = (await resp.json()) as any;
  const sources: any[] = payload.sources ?? [];
  let topBoost = 0;
  const boosted: any[] = [];
  for (const s of sources) {
    const b = s.playbook_boost ?? 0;
    if (b > topBoost) topBoost = b;
    if (b > 0) boosted.push(s);
  }
  const sampleCitations = boosted.flatMap(s => s.playbook_citations ?? []).slice(0, 5);
  return { boostedHits: boosted.length, sampleCitations, topBoost };
}
/**
 * Scores one sealed playbook against the 10-point rubric: geo (0-2),
 * authenticity (0-2), persistence (0-2), boost firing (0-3), speed (0-1).
 * Evidence for every score lands in `notes`.
 */
async function ratePlaybook(
  result: RunResult,
  statsBefore: MemoryStats,
  statsAfter: MemoryStats,
): Promise<Rating> {
  const notes: string[] = [];
  // 1. Geo + authenticity: each fill must resolve to a real workers_500k
  //    row (authenticity) located in the contract's city/state (geo).
  let geo = 0;
  let authenticity = 0;
  for (const fill of result.fills) {
    const worker = await lookupWorker(fill.candidate_id).catch(() => null);
    if (!worker) {
      notes.push(`✗ candidate_id ${fill.candidate_id} not in workers_500k`);
      continue;
    }
    authenticity += 1;
    const cityMatches = worker.city.toLowerCase() === result.task.target_city.toLowerCase();
    if (cityMatches && worker.state === result.task.target_state) {
      geo += 1;
    } else {
      notes.push(`${worker.name} (id=${worker.worker_id}) is in ${worker.city}, ${worker.state}, not ${result.task.target_city}, ${result.task.target_state}`);
    }
  }
  geo = Math.min(geo, 2);
  authenticity = Math.min(authenticity, 2);
  // 2. Persistence: seeding should add exactly one playbook_memory entry.
  let persistence = 0;
  const grew = statsAfter.entries - statsBefore.entries;
  if (grew === 1) {
    persistence = 2;
    notes.push(`✓ playbook_memory grew by exactly 1`);
  } else if (grew >= 1) {
    persistence = 1;
    notes.push(`◑ playbook_memory grew by ${grew} (expected 1)`);
  } else {
    notes.push(`✗ playbook_memory did not grow (before=${statsBefore.entries} after=${statsAfter.entries})`);
  }
  // 3. Boost firing — re-run the mirrored query and see if it lifts anything.
  let boost_firing = 0;
  const verify = await verifyBoostFires(result.task).catch(e => {
    notes.push(`✗ verify hybrid failed: ${(e as Error).message}`);
    return null;
  });
  if (verify) {
    if (verify.boostedHits >= 2) boost_firing = 3;
    else if (verify.boostedHits === 1) boost_firing = 2;
    else if (verify.topBoost > 0) boost_firing = 1;
    notes.push(`boost re-query: ${verify.boostedHits}/10 hits boosted, top=+${verify.topBoost.toFixed(3)}, citations=${verify.sampleCitations.slice(0, 3).join(",")}`);
  }
  // 4. Speed: the whole run must finish inside four minutes.
  let speed = 0;
  if (result.duration_secs <= 240) speed = 1;
  else notes.push(`◑ slow: ${result.duration_secs}s (>240)`);
  return {
    geo, authenticity, persistence, boost_firing, speed,
    total: geo + authenticity + persistence + boost_firing + speed,
    notes,
  };
}
// Render a Rating as the compact one-line rubric breakdown.
function fmtRating(r: Rating): string {
  const parts = [
    `geo=${r.geo}/2`,
    `auth=${r.authenticity}/2`,
    `persist=${r.persistence}/2`,
    `boost=${r.boost_firing}/3`,
    `speed=${r.speed}/1`,
  ];
  return `${parts.join(" ")} → ${r.total}/10`;
}
// ────────────────────────── main ──────────────────────────
/**
 * Entry point: runs two independent (executor, reviewer) pairs in parallel,
 * seeds each sealed playbook into playbook_memory sequentially, then rates
 * each playbook against the 10-point rubric with a hard min-score gate.
 */
async function main() {
  const taskA: TaskSpec = {
    id: `e2e-A-${Date.now()}`,
    operation: "fill: Welder x2 in Toledo, OH",
    target_role: "Welder", target_count: 2, target_city: "Toledo", target_state: "OH",
    approach_hint: "hybrid_search against workers_500k_v1 with sql_filter on role+city+state, then sql verify",
  };
  const taskB: TaskSpec = {
    id: `e2e-B-${Date.now()}`,
    operation: "fill: Forklift Operator x2 in Nashville, TN",
    target_role: "Forklift Operator", target_count: 2, target_city: "Nashville", target_state: "TN",
    approach_hint: "hybrid_search against workers_500k_v1 with sql_filter on role+city+state, then sql verify",
  };
  console.log(`▶ parallel real-world test`);
  console.log(` A: ${taskA.operation}`);
  console.log(` B: ${taskB.operation}`);
  console.log(` models: executor=${EXECUTOR_MODEL} reviewer=${REVIEWER_MODEL}\n`);
  // Baseline memory size — persistence scoring measures growth from here.
  const statsBefore = await fetchMemoryStats();
  console.log(`▶ playbook_memory before: ${statsBefore.entries} entries, ${statsBefore.total_names_endorsed} endorsed names\n`);
  // Run both pairs in parallel. Each is its own (executor, reviewer)
  // conversation; they do NOT see each other's logs.
  const [resA, resB] = await Promise.all([
    runOrchestrator(taskA, "A"),
    runOrchestrator(taskB, "B"),
  ]);
  console.log(`\n▶ both orchestrators returned`);
  console.log(` A: ok=${resA.ok} turns=${resA.turns} ${resA.duration_secs}s ${resA.error ?? ""}`);
  console.log(` B: ok=${resB.ok} turns=${resB.turns} ${resB.duration_secs}s ${resB.error ?? ""}`);
  if (!resA.ok && !resB.ok) {
    throw new Error(`both orchestrators failed — substrate or models in bad state`);
  }
  // Sequential seed after parallel runs — avoids concurrent embed socket
  // collision. Thrown fetch errors are retried up to 3 times with linear
  // backoff; HTTP-level (non-2xx) failures are warned about and NOT retried.
  async function seedTask(res: RunResult, prefix: string) {
    if (!res.ok || res.fills.length === 0) return;
    for (let attempt = 0; attempt < 3; attempt++) {
      try {
        const r = await fetch(`${GATEWAY}/vectors/playbook_memory/seed`, {
          method: "POST", headers: { "Content-Type": "application/json" },
          body: JSON.stringify({
            operation: res.task.operation,
            approach: res.approach || "multi-agent → hybrid search",
            context: res.task.approach_hint ?? `${res.task.target_role} fill in ${res.task.target_city}, ${res.task.target_state}`,
            endorsed_names: res.fills.map(f => f.name),
            append: true,
          }),
        });
        if (r.ok) {
          const j = await r.json() as any;
          console.log(`[${prefix}] ↳ seeded playbook_memory: id=${j.outcome?.playbook_id ?? j.playbook_id} entries=${j.entries_after}`);
        } else {
          console.warn(`[${prefix}] seed warning: ${r.status} ${await r.text()}`);
        }
        return;
      } catch (e) {
        if (attempt === 2) { console.warn(`[${prefix}] seed errored: ${(e as Error).message}`); return; }
        await Bun.sleep(1000 * (attempt + 1));
      }
    }
  }
  await seedTask(resA, "A");
  await Bun.sleep(5000);
  await seedTask(resB, "B");
  const statsMid = await fetchMemoryStats();
  console.log(`\n▶ playbook_memory after both runs: ${statsMid.entries} entries (+${statsMid.entries - statsBefore.entries})\n`);
  // Rate each successful playbook. We compute persistence per task by
  // splitting the growth — both seeded sequentially-ish, so each should
  // contribute 1.
  const ratings: Array<{ id: string; ok: boolean; rating?: Rating; error?: string }> = [];
  if (resA.ok) {
    // Synthetic before/after for A assumes A's seed added exactly one entry;
    // the real measured pair is only unambiguous when B failed.
    const beforeForA: MemoryStats = { entries: statsBefore.entries, total_names_endorsed: statsBefore.total_names_endorsed };
    const afterForA: MemoryStats = { entries: statsBefore.entries + (resA.fills.length > 0 ? 1 : 0), total_names_endorsed: statsBefore.total_names_endorsed };
    // Use real measured numbers when they're unambiguous (only one task succeeded)
    const ra = await ratePlaybook(resA, beforeForA, resB.ok ? afterForA : statsMid);
    ratings.push({ id: "A", ok: true, rating: ra });
  } else ratings.push({ id: "A", ok: false, error: resA.error });
  if (resB.ok) {
    // B's baseline shifts up by one when A also seeded an entry.
    const beforeForB: MemoryStats = resA.ok
      ? { entries: statsBefore.entries + 1, total_names_endorsed: statsBefore.total_names_endorsed }
      : statsBefore;
    const rb = await ratePlaybook(resB, beforeForB, statsMid);
    ratings.push({ id: "B", ok: true, rating: rb });
  } else ratings.push({ id: "B", ok: false, error: resB.error });
  console.log(`\n▶ Per-playbook ratings:\n`);
  for (const r of ratings) {
    if (!r.ok) {
      console.log(` ${r.id}: FAILED — ${r.error}`);
      continue;
    }
    console.log(` ${r.id}: ${fmtRating(r.rating!)}`);
    for (const n of r.rating!.notes) console.log(` ${n}`);
  }
  const totals = ratings.filter(r => r.rating).map(r => r.rating!.total);
  if (totals.length === 0) {
    throw new Error(`no playbooks rated — both orchestrators failed`);
  }
  const min = Math.min(...totals);
  const avg = totals.reduce((s, t) => s + t, 0) / totals.length;
  console.log(`\n▶ Summary: avg=${avg.toFixed(1)}/10 min=${min}/10`);
  // Hard gate: any rating below 5 means the loop is broken end-to-end.
  if (min < 5) throw new Error(`rating gate failed — min ${min}/10 (need ≥5)`);
  console.log(`\n✓ end-to-end real-world test passed`);
  process.exit(0);
}
// Top-level entry: any rejection surfaces as a non-zero exit for CI.
main().catch((err: unknown) => {
  console.error(`\n✗ ${(err as Error).message}`);
  const stack = (err as any).stack;
  if (stack) console.error(stack);
  process.exit(1);
});