/*
 * NOTE(review): the text below is commit-log / file-listing residue that was
 * pasted at the top of this file as bare prose. Wrapped in a comment so the
 * file parses; the content is preserved verbatim for provenance.
 *
 * Accumulated since a6f12e2 (Phase 21 Rust port + Phase 27 versioning): - Phase 36: embed_semaphore on VectorState (permits=1) serializes seed embed calls — prevents sidecar socket collisions under concurrent /seed stress load - Phase 31+: run_stress.ts 6-task diverse stress scaffolding; run_e2e_rated.ts + orchestrator.ts tightening - Catalog dedupe cleanup: 16 duplicate manifests removed; canonical candidates.parquet (10.5MB -> 76KB) + placements.parquet (1.2MB -> 11KB) regenerated post-dedupe; fresh manifests for active datasets - vectord: harness EvalSet refinements (+181), agent portfolio rotation + ingest triggers (+158), autotune + rag adjustments - catalogd/storaged/ingestd/mcp-server: misc tightening - docs: Phase 28-36 PRD entries + DECISIONS ADR additions; control-plane pivot banner added to top of docs/PRD.md (pointing at docs/CONTROL_PLANE_PRD.md which lands in next commit) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
 *
 * 409 lines / 18 KiB / TypeScript
 */
// Two-agent x two-tasks parallel real-world test with per-playbook rating.
|
|
//
|
|
// Spawns two independent (executor, reviewer) pairs concurrently, each
|
|
// driving a different staffing fill against the live substrate. After
|
|
// each pair seals a playbook, verifies the fill against workers_500k,
|
|
// confirms the seed reached playbook_memory, and re-runs the same query
|
|
// with use_playbook_memory=true to prove the boost fires.
|
|
//
|
|
// Errors fail fast — any HTTP error, parse error, or rating failure is
|
|
// rethrown so bun exits non-zero. Run with:
|
|
//
|
|
// bun run tests/multi-agent/run_e2e_rated.ts
|
|
//
|
|
// VRAM note: both pairs call the same two Ollama models (the executor and
// reviewer models declared in EXECUTOR_MODEL / REVIEWER_MODEL below).
// Ollama queues at the model level, so "parallel" is concurrent
// orchestration, not concurrent inference — the loops interleave on the
// shared models. That's intentional: it stresses the same realistic
// path two staffing coordinators would hit if they both opened the app
// at 8am.
|
|
|
|
import {
|
|
type LogEntry,
|
|
type TaskSpec,
|
|
type Action,
|
|
type Fill,
|
|
GATEWAY,
|
|
generate,
|
|
parseAction,
|
|
executorPrompt,
|
|
reviewerPrompt,
|
|
sqlQuery,
|
|
callTool,
|
|
} from "./agent.ts";
|
|
|
|
// Ollama model tags for the two agent roles.
// NOTE(review): the file-header comment mentions "mistral + qwen2.5" but the
// tags here are qwen3.5/qwen3 — the header looks stale; confirm intent.
const EXECUTOR_MODEL = "qwen3.5:latest";
const REVIEWER_MODEL = "qwen3:latest";
// Hard cap on executor/reviewer turns before declaring "no consensus".
const MAX_TURNS = 12;
// Shared abort threshold for BOTH consecutive reviewer "drift" verdicts and
// consecutive malformed tool calls (see runOrchestrator).
const MAX_CONSECUTIVE_DRIFTS = 3;
// Vector index used by the boost-verification hybrid re-query.
const INDEX_NAME = "workers_500k_v1";
|
|
|
|
// Outcome of one (executor, reviewer) orchestration run. Always returned —
// failures are reported via ok=false + error, never thrown to the caller.
interface RunResult {
  task: TaskSpec;        // the staffing fill this run attempted
  ok: boolean;           // true iff a consensus playbook was sealed
  turns: number;         // turns consumed (≤ MAX_TURNS)
  duration_secs: number; // wall-clock seconds, rounded
  fills: Fill[];         // sealed fills (empty on failure)
  log: LogEntry[];       // full conversation + tool transcript
  approach: string;      // executor's stated approach, later seeded into playbook_memory
  error?: string;        // present iff ok === false
}
|
|
|
|
// ────────────────────────── orchestrator (function form) ──────────────────────────

// Drive one (executor, reviewer) pair to consensus on `task`.
//
// Returns a RunResult in ALL cases — every error is caught and surfaced via
// `ok: false` + `error`, so parallel callers (Promise.all in main) never see
// a rejection. `prefix` tags each console line so interleaved A/B output
// stays readable.
async function runOrchestrator(task: TaskSpec, prefix: string): Promise<RunResult> {
  const start = Date.now();
  const log: LogEntry[] = [];
  let turn = 0;
  let consecutiveDrifts = 0;
  // Track tool errors separately from drift verdicts. Reviewer saying
  // "continue" or "approve_done" should NOT reset a streak of malformed
  // tool calls — that's a different failure mode (model can't form the
  // call) than "executor is on the wrong path" (model is off-topic).
  let consecutiveToolErrors = 0;
  let sealed: { fills: Fill[]; approach: string } | null = null;

  // Append an entry to the transcript (stamped with wall-clock time) and
  // echo a one-line summary to the console.
  const append = (e: Omit<LogEntry, "at">): LogEntry => {
    const full: LogEntry = { ...e, at: new Date().toISOString() };
    log.push(full);
    console.log(`[${prefix}] [t${e.turn.toString().padStart(2, "0")} ${e.role.padEnd(8)} ${e.kind.padEnd(14)}] ${shortContent(e)}`);
    return full;
  };

  try {
    while (turn < MAX_TURNS && !sealed) {
      turn += 1;

      // Executor proposes the next action (tool call or propose_done).
      const execRaw = await generate(EXECUTOR_MODEL, executorPrompt(task, log), { temperature: 0.2, max_tokens: 1200, think: false });
      const execAction = parseAction(execRaw, "executor");
      append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: execAction.kind as any, content: execAction });

      if (execAction.kind === "tool_call") {
        try {
          const result = await executeToolCall(execAction.tool, execAction.args);
          append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "tool_result", content: trimResult(result) });
          consecutiveToolErrors = 0;
        } catch (e) {
          // Tool failures are logged INTO the transcript (so the models can
          // see and self-correct) rather than aborting immediately; only a
          // streak of them is fatal.
          append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "tool_result",
            content: { error: (e as Error).message, tool: execAction.tool, args: execAction.args } });
          consecutiveToolErrors += 1;
          if (consecutiveToolErrors >= MAX_CONSECUTIVE_DRIFTS) {
            throw new Error(`${MAX_CONSECUTIVE_DRIFTS} consecutive tool errors — executor can't form a valid call`);
          }
        }
      }

      // Reviewer critiques the executor's latest move (sees the full log).
      const revRaw = await generate(REVIEWER_MODEL, reviewerPrompt(task, log), { temperature: 0.1, max_tokens: 1000, think: false });
      const revAction = parseAction(revRaw, "reviewer");
      append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "critique", content: revAction });

      if (revAction.kind !== "critique") throw new Error(`reviewer non-critique: ${revAction.kind}`);
      if (revAction.verdict === "drift") {
        consecutiveDrifts += 1;
        if (consecutiveDrifts >= MAX_CONSECUTIVE_DRIFTS) throw new Error(`${MAX_CONSECUTIVE_DRIFTS} consecutive drifts`);
      } else consecutiveDrifts = 0;

      // Consensus: executor proposed done AND reviewer approved in the same
      // turn; the fill count must match the contract exactly.
      if (execAction.kind === "propose_done" && revAction.verdict === "approve_done") {
        if (execAction.fills.length !== task.target_count) {
          throw new Error(`fills=${execAction.fills.length} target=${task.target_count}`);
        }
        append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "consensus_done", content: { fills: execAction.fills } });
        sealed = { fills: execAction.fills, approach: (execAction as any).rationale ?? "multi-agent → hybrid" };
      }
    }

    if (!sealed) throw new Error(`no consensus after ${MAX_TURNS} turns`);

    return {
      task, ok: true, turns: turn, fills: sealed.fills, approach: sealed.approach,
      duration_secs: Math.round((Date.now() - start) / 1000), log,
    };
  } catch (e) {
    // Failure path: same result shape with ok=false, empty fills, and the
    // error message — turns/duration/log reflect whatever happened so far.
    return {
      task, ok: false, turns: turn, fills: [], approach: "",
      duration_secs: Math.round((Date.now() - start) / 1000), log,
      error: (e as Error).message,
    };
  }
}
|
|
|
|
async function executeToolCall(name: string, args: Record<string, any>): Promise<any> {
|
|
if (name === "hybrid_search") {
|
|
const { sql_filter, question, index_name, k } = args;
|
|
if (!sql_filter || !question || !index_name) throw new Error(`hybrid_search needs sql_filter+question+index_name`);
|
|
const r = await fetch(`${GATEWAY}/vectors/hybrid`, {
|
|
method: "POST", headers: { "Content-Type": "application/json" },
|
|
body: JSON.stringify({ sql_filter, question, index_name, top_k: k ?? 10, generate: false, use_playbook_memory: true }),
|
|
});
|
|
if (!r.ok) throw new Error(`hybrid_search → ${r.status}: ${await r.text()}`);
|
|
return r.json();
|
|
}
|
|
if (name === "sql") {
|
|
if (!args.query || typeof args.query !== "string") throw new Error("sql needs query");
|
|
if (!/^\s*SELECT/i.test(args.query)) throw new Error("sql allows SELECT only");
|
|
return sqlQuery(args.query);
|
|
}
|
|
return callTool(name, args);
|
|
}
|
|
|
|
function trimResult(r: any): any {
|
|
if (r && Array.isArray(r.rows)) return { ...r, rows: r.rows.slice(0, 20) };
|
|
if (r && Array.isArray(r.sources)) return { ...r, sources: r.sources.slice(0, 12) };
|
|
return r;
|
|
}
|
|
|
|
function shortContent(e: Omit<LogEntry, "at">): string {
|
|
const c: any = e.content ?? {};
|
|
if (e.kind === "tool_call") return `${c.tool}(${JSON.stringify(c.args ?? {}).slice(0, 70)})`;
|
|
if (e.kind === "tool_result") {
|
|
if (c.error) return `error: ${c.error}`;
|
|
if (Array.isArray(c.sources)) return `hybrid sql=${c.sql_matches} reranked=${c.vector_reranked}`;
|
|
if (Array.isArray(c.rows)) return `sql ${c.rows.length} rows`;
|
|
return JSON.stringify(c).slice(0, 80);
|
|
}
|
|
if (e.kind === "critique") return `verdict=${c.verdict} ${(c.notes ?? "").slice(0, 60)}`;
|
|
if (e.kind === "propose_done") return `${(c.fills ?? []).length} fills: ${(c.fills ?? []).map((f: Fill) => f.name).join(", ")}`;
|
|
if (e.kind === "consensus_done") return "✓";
|
|
if (e.kind === "plan") return `${(c.steps ?? []).length} steps`;
|
|
return JSON.stringify(c).slice(0, 80);
|
|
}
|
|
|
|
// ────────────────────────── playbook rating ──────────────────────────

// Scorecard for one sealed playbook; the five axes sum to `total` (max 10).
interface Rating {
  geo: number; // 0-2: fills actually in target city/state
  authenticity: number; // 0-2: fills' worker_ids exist in workers_500k
  persistence: number; // 0-2: playbook_memory entry count grew correctly
  boost_firing: number; // 0-3: follow-up query shows non-zero boost
  speed: number; // 0-1: completed under 4 min
  total: number; // /10
  notes: string[]; // human-readable evidence behind each score
}
|
|
|
|
// Response shape of GET /vectors/playbook_memory/stats.
interface MemoryStats { entries: number; total_names_endorsed: number }
|
|
|
|
async function fetchMemoryStats(): Promise<MemoryStats> {
|
|
const r = await fetch(`${GATEWAY}/vectors/playbook_memory/stats`);
|
|
if (!r.ok) throw new Error(`stats → ${r.status}`);
|
|
return r.json() as Promise<MemoryStats>;
|
|
}
|
|
|
|
// Try to resolve a fill's candidate_id to a workers_500k row. Accepts
|
|
// "W500K-7995" (vector doc_id with prefix) and "7995" (raw worker_id).
|
|
async function lookupWorker(candidate_id: string): Promise<{ worker_id: number; name: string; city: string; state: string; role: string } | null> {
|
|
const numStr = candidate_id.replace(/^W500K-/i, "").replace(/[^\d]/g, "");
|
|
if (!numStr) return null;
|
|
const num = parseInt(numStr, 10);
|
|
if (!Number.isFinite(num)) return null;
|
|
const r = await sqlQuery(`SELECT worker_id, name, city, state, role FROM workers_500k WHERE worker_id = ${num} LIMIT 1`);
|
|
return (r.rows && r.rows[0]) ?? null;
|
|
}
|
|
|
|
// Re-run a hybrid query that mirrors the contract — proves the freshly
|
|
// seeded playbook actually lifts a future search.
|
|
async function verifyBoostFires(task: TaskSpec): Promise<{ boostedHits: number; sampleCitations: string[]; topBoost: number }> {
|
|
// Mirror the contract's actual geo. The playbook stored (city, state)
|
|
// from the operation; if the verify SQL doesn't restrict to the same
|
|
// city, the candidate pool may not include the seeded workers and the
|
|
// boost has nothing to lift. The contract pattern in production also
|
|
// includes city — recruiters fill specific cities, not whole states.
|
|
const sql_filter = `role = '${task.target_role.replace(/'/g, "''")}' `
|
|
+ `AND state = '${task.target_state}' `
|
|
+ `AND city = '${task.target_city.replace(/'/g, "''")}'`;
|
|
const r = await fetch(`${GATEWAY}/vectors/hybrid`, {
|
|
method: "POST", headers: { "Content-Type": "application/json" },
|
|
body: JSON.stringify({
|
|
index_name: INDEX_NAME, filter_dataset: "workers_500k", id_column: "worker_id",
|
|
sql_filter, question: `${task.target_role} in ${task.target_city}, ${task.target_state}`,
|
|
top_k: 10, generate: false, use_playbook_memory: true, playbook_memory_k: 15,
|
|
}),
|
|
});
|
|
if (!r.ok) throw new Error(`verify hybrid → ${r.status}: ${await r.text()}`);
|
|
const j = (await r.json()) as any;
|
|
const sources: any[] = j.sources ?? [];
|
|
const boosted = sources.filter(s => (s.playbook_boost ?? 0) > 0);
|
|
const cites = boosted.flatMap(s => s.playbook_citations ?? []).slice(0, 5);
|
|
const top = sources.reduce((m, s) => Math.max(m, s.playbook_boost ?? 0), 0);
|
|
return { boostedHits: boosted.length, sampleCitations: cites, topBoost: top };
|
|
}
|
|
|
|
async function ratePlaybook(
|
|
result: RunResult,
|
|
statsBefore: MemoryStats,
|
|
statsAfter: MemoryStats,
|
|
): Promise<Rating> {
|
|
const notes: string[] = [];
|
|
let geo = 0, authenticity = 0, persistence = 0, boost_firing = 0, speed = 0;
|
|
|
|
// 1. Geo + authenticity per fill
|
|
for (const f of result.fills) {
|
|
const w = await lookupWorker(f.candidate_id).catch(() => null);
|
|
if (!w) { notes.push(`✗ candidate_id ${f.candidate_id} not in workers_500k`); continue; }
|
|
authenticity += 1;
|
|
if (w.city.toLowerCase() === result.task.target_city.toLowerCase()
|
|
&& w.state === result.task.target_state) {
|
|
geo += 1;
|
|
} else {
|
|
notes.push(`◑ ${w.name} (id=${w.worker_id}) is in ${w.city}, ${w.state}, not ${result.task.target_city}, ${result.task.target_state}`);
|
|
}
|
|
}
|
|
geo = Math.min(geo, 2);
|
|
authenticity = Math.min(authenticity, 2);
|
|
|
|
// 2. Persistence
|
|
const grew = statsAfter.entries - statsBefore.entries;
|
|
if (grew === 1) { persistence = 2; notes.push(`✓ playbook_memory grew by exactly 1`); }
|
|
else if (grew >= 1) { persistence = 1; notes.push(`◑ playbook_memory grew by ${grew} (expected 1)`); }
|
|
else { notes.push(`✗ playbook_memory did not grow (before=${statsBefore.entries} after=${statsAfter.entries})`); }
|
|
|
|
// 3. Boost firing — re-run the same query and see if it lifts anything
|
|
const v = await verifyBoostFires(result.task).catch(e => { notes.push(`✗ verify hybrid failed: ${(e as Error).message}`); return null; });
|
|
if (v) {
|
|
if (v.boostedHits >= 2) boost_firing = 3;
|
|
else if (v.boostedHits === 1) boost_firing = 2;
|
|
else if (v.topBoost > 0) boost_firing = 1;
|
|
else boost_firing = 0;
|
|
notes.push(`boost re-query: ${v.boostedHits}/10 hits boosted, top=+${v.topBoost.toFixed(3)}, citations=${v.sampleCitations.slice(0, 3).join(",")}`);
|
|
}
|
|
|
|
// 4. Speed
|
|
if (result.duration_secs <= 240) speed = 1;
|
|
else notes.push(`◑ slow: ${result.duration_secs}s (>240)`);
|
|
|
|
const total = geo + authenticity + persistence + boost_firing + speed;
|
|
return { geo, authenticity, persistence, boost_firing, speed, total, notes };
|
|
}
|
|
|
|
function fmtRating(r: Rating): string {
|
|
return `geo=${r.geo}/2 auth=${r.authenticity}/2 persist=${r.persistence}/2 boost=${r.boost_firing}/3 speed=${r.speed}/1 → ${r.total}/10`;
|
|
}
|
|
|
|
// ────────────────────────── main ──────────────────────────

// End-to-end flow:
//   1. run two (executor, reviewer) pairs in parallel against the live
//      substrate (shared models, separate transcripts),
//   2. seed each sealed playbook into playbook_memory sequentially,
//   3. rate every playbook (geo/auth/persistence/boost/speed),
//   4. enforce a hard gate — any rating below 5/10 throws (non-zero exit).
async function main() {
  // Two distinct staffing contracts so the pairs don't collide on data.
  const taskA: TaskSpec = {
    id: `e2e-A-${Date.now()}`,
    operation: "fill: Welder x2 in Toledo, OH",
    target_role: "Welder", target_count: 2, target_city: "Toledo", target_state: "OH",
    approach_hint: "hybrid_search against workers_500k_v1 with sql_filter on role+city+state, then sql verify",
  };
  const taskB: TaskSpec = {
    id: `e2e-B-${Date.now()}`,
    operation: "fill: Forklift Operator x2 in Nashville, TN",
    target_role: "Forklift Operator", target_count: 2, target_city: "Nashville", target_state: "TN",
    approach_hint: "hybrid_search against workers_500k_v1 with sql_filter on role+city+state, then sql verify",
  };

  console.log(`▶ parallel real-world test`);
  console.log(`  A: ${taskA.operation}`);
  console.log(`  B: ${taskB.operation}`);
  console.log(`  models: executor=${EXECUTOR_MODEL} reviewer=${REVIEWER_MODEL}\n`);

  // Baseline stats — all persistence scoring is measured against this.
  const statsBefore = await fetchMemoryStats();
  console.log(`▶ playbook_memory before: ${statsBefore.entries} entries, ${statsBefore.total_names_endorsed} endorsed names\n`);

  // Run both pairs in parallel. Each is its own (executor, reviewer)
  // conversation; they do NOT see each other's logs.
  const [resA, resB] = await Promise.all([
    runOrchestrator(taskA, "A"),
    runOrchestrator(taskB, "B"),
  ]);

  console.log(`\n▶ both orchestrators returned`);
  console.log(`  A: ok=${resA.ok} turns=${resA.turns} ${resA.duration_secs}s ${resA.error ?? ""}`);
  console.log(`  B: ok=${resB.ok} turns=${resB.turns} ${resB.duration_secs}s ${resB.error ?? ""}`);

  // One failed pair is tolerated (the survivor still gets rated); both
  // failing means the substrate or the models are in a bad state.
  if (!resA.ok && !resB.ok) {
    throw new Error(`both orchestrators failed — substrate or models in bad state`);
  }

  // Sequential seed after parallel runs — avoids concurrent embed socket collision.
  // Up to 3 attempts with linear backoff; seed failures are warnings, not
  // fatal — the persistence rating will surface a playbook that never landed.
  async function seedTask(res: RunResult, prefix: string) {
    if (!res.ok || res.fills.length === 0) return;
    for (let attempt = 0; attempt < 3; attempt++) {
      try {
        const r = await fetch(`${GATEWAY}/vectors/playbook_memory/seed`, {
          method: "POST", headers: { "Content-Type": "application/json" },
          body: JSON.stringify({
            operation: res.task.operation,
            approach: res.approach || "multi-agent → hybrid search",
            context: res.task.approach_hint ?? `${res.task.target_role} fill in ${res.task.target_city}, ${res.task.target_state}`,
            endorsed_names: res.fills.map(f => f.name),
            append: true,
          }),
        });
        if (r.ok) {
          const j = await r.json() as any;
          console.log(`[${prefix}] ↳ seeded playbook_memory: id=${j.outcome?.playbook_id ?? j.playbook_id} entries=${j.entries_after}`);
        } else {
          // Non-2xx is not retried — only thrown (network-level) errors are.
          console.warn(`[${prefix}] seed warning: ${r.status} ${await r.text()}`);
        }
        return;
      } catch (e) {
        if (attempt === 2) { console.warn(`[${prefix}] seed errored: ${(e as Error).message}`); return; }
        await Bun.sleep(1000 * (attempt + 1));
      }
    }
  }

  await seedTask(resA, "A");
  await Bun.sleep(5000);
  await seedTask(resB, "B");

  const statsMid = await fetchMemoryStats();
  console.log(`\n▶ playbook_memory after both runs: ${statsMid.entries} entries (+${statsMid.entries - statsBefore.entries})\n`);

  // Rate each successful playbook. We compute persistence per task by
  // splitting the growth — both seeded sequentially-ish, so each should
  // contribute 1.
  // NOTE(review): when BOTH runs succeed, afterForA/beforeForB below are
  // synthesized (assume A's seed added exactly 1 entry), not measured. If
  // A's seed silently failed or double-wrote, B's persistence score is
  // mis-attributed — confirm against the seed log lines above.
  const ratings: Array<{ id: string; ok: boolean; rating?: Rating; error?: string }> = [];

  if (resA.ok) {
    const beforeForA: MemoryStats = { entries: statsBefore.entries, total_names_endorsed: statsBefore.total_names_endorsed };
    const afterForA: MemoryStats = { entries: statsBefore.entries + (resA.fills.length > 0 ? 1 : 0), total_names_endorsed: statsBefore.total_names_endorsed };
    // Use real measured numbers when they're unambiguous (only one task succeeded)
    const ra = await ratePlaybook(resA, beforeForA, resB.ok ? afterForA : statsMid);
    ratings.push({ id: "A", ok: true, rating: ra });
  } else ratings.push({ id: "A", ok: false, error: resA.error });

  if (resB.ok) {
    const beforeForB: MemoryStats = resA.ok
      ? { entries: statsBefore.entries + 1, total_names_endorsed: statsBefore.total_names_endorsed }
      : statsBefore;
    const rb = await ratePlaybook(resB, beforeForB, statsMid);
    ratings.push({ id: "B", ok: true, rating: rb });
  } else ratings.push({ id: "B", ok: false, error: resB.error });

  console.log(`\n▶ Per-playbook ratings:\n`);
  for (const r of ratings) {
    if (!r.ok) {
      console.log(`  ${r.id}: FAILED — ${r.error}`);
      continue;
    }
    console.log(`  ${r.id}: ${fmtRating(r.rating!)}`);
    for (const n of r.rating!.notes) console.log(`    ${n}`);
  }

  const totals = ratings.filter(r => r.rating).map(r => r.rating!.total);
  if (totals.length === 0) {
    throw new Error(`no playbooks rated — both orchestrators failed`);
  }
  const min = Math.min(...totals);
  const avg = totals.reduce((s, t) => s + t, 0) / totals.length;
  console.log(`\n▶ Summary: avg=${avg.toFixed(1)}/10 min=${min}/10`);

  // Hard gate: any rating below 5 means the loop is broken end-to-end.
  if (min < 5) throw new Error(`rating gate failed — min ${min}/10 (need ≥5)`);

  console.log(`\n✓ end-to-end real-world test passed`);
  process.exit(0);
}
|
|
|
|
main().catch(e => {
|
|
console.error(`\n✗ ${(e as Error).message}`);
|
|
if ((e as any).stack) console.error((e as any).stack);
|
|
process.exit(1);
|
|
});
|