lakehouse/tests/multi-agent/orchestrator.ts
root 25b7e6c3a7 Phase 19 wiring + Path 1/2 work + chain integrity fixes
Backend:
- crates/vectord/src/playbook_memory.rs (new): Phase 19 in-memory boost
  store with seed/rebuild/snapshot, plus temporal decay (e^-age/30 per
  playbook), persist_to_sql endpoint backing successful_playbooks_live,
  and discover_patterns endpoint for meta-index pattern aggregation
  (recurring certs/skills/archetype/reliability across similar past fills).
- DEFAULT_TOP_K_PLAYBOOKS bumped 5 → 25; the old default silently missed
  most boosts once memory held more than 5 entries.
- service.rs: new routes /vectors/playbook_memory/{seed,rebuild,stats,
  persist_sql,patterns}.

Bun staffing co-pilot (mcp-server/):
- /search, /match, /verify, /proof, /simulation/run, MCP tools all
  forward use_playbook_memory:true and playbook_memory_k:25 to the
  hybrid endpoint. Boost was previously dark across the entire app.
- /log no longer POSTs to /ingest/file — that endpoint REPLACES the
  dataset's object list, so single-row CSV writes were wiping all prior
  rows in successful_playbooks (sp_rows went 33→1 in one /log call).
  /log now seeds playbook_memory with canonical short text and calls
  /persist_sql to keep successful_playbooks_live in sync.
- /simulation/run cumulative end-of-week CSV write removed for the same
  reason. Per-day per-contract /seed (added in this session) is the
  accumulating feedback path now.
- search.html addWorkerInsight renders a green "Endorsed · N playbooks"
  chip with playbook citations when boost > 0.

Internal Dioxus UI (crates/ui/):
- Dashboard phase list rewritten through Phase 19 (was stuck at "Phase
  16: File Watcher" / "Phase 17: DB Connector" — both wrong).
- Removed fabricated "27ms" stat label.
- Ask tab examples + SQL default replaced with real staffing prompts
  against candidates/clients/job_orders (was referencing nonexistent
  employees/products/events).
- New Playbook tab exposes /vectors/playbook_memory/{stats,rebuild} and
  side-by-side hybrid search (boost OFF vs ON) with citations.

Tests (tests/multi-agent/):
- run_e2e_rated.ts: parallel two-agent (mistral + qwen2.5) build phase
  + verifier rating (geo, auth, persist, boost, speed → /10).
- network_proving.ts: continuous build → verify → repeat with
  staffing-recruiter profile hot-swap; geo-discrimination check.
- chain_of_custody.ts: single recruiter operation traced through every
  layer (Bun /search, direct /vectors/hybrid parity, /log, SQL,
  playbook_memory growth, profile activation, post-op boost lift).
2026-04-20 06:21:13 -05:00

303 lines
13 KiB
TypeScript

// Two-agent orchestrator. Both agents run as concurrent async loops
// coordinated through a shared in-memory log; one turn of executor then
// one turn of reviewer, interleaved until consensus_done, drift-cycle
// blown, or hard turn cap. On success writes a playbook JSON; on failure
// exits non-zero with the full log for inspection.
//
// Fail-fast: every caught error is appended to the log AND rethrown, so
// the orchestrator top-level catches, dumps, and exits with code 1. The
// test harness reads the exit code to decide if the substrate is healthy.
// Node built-ins first, then local modules.
import { readFileSync } from "node:fs";
import { mkdir, writeFile } from "node:fs/promises";
import { join } from "node:path";
import {
  type LogEntry,
  type TaskSpec,
  type Action,
  type Fill,
  callTool,
  hybridSearch,
  sqlQuery,
  generate,
  parseAction,
  executorPrompt,
  reviewerPrompt,
  GATEWAY,
} from "./agent.ts";
// Model assignments: mistral drives (executor), qwen2.5 audits (reviewer).
const EXECUTOR_MODEL = "mistral:latest";
const REVIEWER_MODEL = "qwen2.5:latest";
const MAX_TURNS = 12; // executor turns; reviewer gets one per
const MAX_CONSECUTIVE_DRIFTS = 3; // drift-cycle blown → give up
// Default task, used when no argv[2] override is given (see
// `parseTaskFromArg` for the k:v shorthand and JSON-file forms).
// NOTE(review): earlier comments here disagreed on whether this fill is
// already in successful_playbooks ("fresh fill" vs "lifted from the
// production pattern") — confirm against the live playbook list. Either
// way, Toledo, OH has 342 welders in workers_500k so supply is ample; the
// test exercises collaboration and drift correction, not
// needle-in-haystack retrieval.
const DEFAULT_TASK: TaskSpec = {
id: `task-${Date.now()}`,
operation: "fill: Welder x2 in Toledo, OH",
target_role: "Welder",
target_count: 2,
target_city: "Toledo",
target_state: "OH",
approach_hint: "hybrid search against workers_500k_v1, narrow by role+city+state+availability, rank semantically",
};
/**
 * Resolve the task to run. Priority:
 *   1. argv[2] ending in ".json" → parsed as a TaskSpec file.
 *   2. argv[2] as "role:Welder count:2 city:Columbus state:OH" tokens —
 *      ad-hoc tasks without standing up a JSON file. Anything more
 *      complex, feed it a JSON path.
 *   3. No argv[2] → DEFAULT_TASK.
 * Throws with a readable message when the k:v form omits a required field
 * or has a non-numeric count — previously those flowed through silently as
 * the string "undefined" (role/city/state) or NaN (count).
 */
function parseTaskFromArg(): TaskSpec {
  const arg = process.argv[2];
  if (!arg) return DEFAULT_TASK;
  if (arg.endsWith(".json")) {
    // Was `require("node:fs")` — a CommonJS call in an otherwise ESM file;
    // use the top-level import instead so ES-module emit works.
    return JSON.parse(readFileSync(arg, "utf-8"));
  }
  // "key:value" tokens; values may themselves contain ':' (rejoined below).
  const kv: Record<string, string> = {};
  for (const token of arg.split(/\s+/)) {
    const [k, ...v] = token.split(":");
    kv[k] = v.join(":");
  }
  // Fail fast on malformed ad-hoc tasks instead of building a garbage spec.
  for (const required of ["role", "count", "city", "state"]) {
    if (!kv[required]) throw new Error(`task arg missing "${required}:" — got: ${arg}`);
  }
  const count = Number(kv.count);
  if (!Number.isInteger(count) || count <= 0) {
    throw new Error(`task arg count must be a positive integer, got: ${kv.count}`);
  }
  return {
    id: `task-${Date.now()}`,
    operation: `fill: ${kv.role} x${count} in ${kv.city}, ${kv.state}`,
    target_role: kv.role,
    target_count: count,
    target_city: kv.city,
    target_state: kv.state,
    approach_hint: kv.hint ?? "hybrid search",
  };
}
// Defensive one-line formatter for log entries so a human tailing stdout
// can follow the run without pulling the JSONL file. Models sometimes omit
// optional fields (rationale, notes), so every access is guarded.
function fmt(e: LogEntry): string {
  const head = `[t${e.turn.toString().padStart(2, "0")} ${e.role.padEnd(8)} ${e.kind.padEnd(14)}]`;
  const body = e.content ?? {};
  const clip = (value: any, max: number) => String(value ?? "").slice(0, max);
  switch (e.kind) {
    case "tool_call": {
      const argsJson = JSON.stringify(body.args ?? {}).slice(0, 80);
      return `${head} ${body.tool}(${argsJson}) — ${clip(body.rationale, 60)}`;
    }
    case "tool_result": {
      const rowCount = body?.rows?.length ?? body?.sources?.length ?? undefined;
      if (rowCount !== undefined) return `${head} rows=${rowCount}`;
      return `${head} ${JSON.stringify(body).slice(0, 80)}`;
    }
    case "critique":
      return `${head} verdict=${body.verdict}${clip(body.notes, 80)}`;
    case "propose_done": {
      const names = (body.fills ?? []).map((f: Fill) => f.name).join(", ");
      return `${head} ${body.fills?.length ?? 0} fills: ${names}`;
    }
    case "consensus_done":
      return `${head}`;
    case "plan": {
      const preview = (body.steps ?? []).slice(0, 2).join(" / ");
      return `${head} ${body.steps?.length ?? 0} steps: ${preview}`;
    }
    case "error":
      return `${head} ${body.message ?? body}`;
    default:
      return `${head} ${JSON.stringify(body).slice(0, 100)}`;
  }
}
// Execute one tool call. The tool catalog in the prompt lists both the
// registered Phase 12 tools AND a pseudo-tool "hybrid_search" for the
// /vectors/hybrid endpoint — unify here so the executor doesn't need to
// know which surface a capability lives on.
async function executeToolCall(name: string, args: Record<string, any>): Promise<any> {
if (name === "hybrid_search") {
const { sql_filter, question, index_name, k } = args;
if (!sql_filter || !question || !index_name) {
throw new Error(`hybrid_search needs sql_filter + question + index_name, got ${JSON.stringify(args)}`);
}
// Pass through to /vectors/hybrid. id_column defaults to worker_id
// server-side, which is what workers_500k uses.
const body: any = { sql_filter, question, index_name, top_k: k ?? 10, generate: false };
return (await (await fetch("http://localhost:3100/vectors/hybrid", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
})).json());
}
if (name === "sql") {
const { query } = args;
if (!query || typeof query !== "string") throw new Error(`sql needs query (string), got ${JSON.stringify(args)}`);
if (!/^\s*SELECT/i.test(query)) throw new Error(`sql tool allows SELECT only: ${query}`);
return sqlQuery(query);
}
// Fall through to Phase 12 registry for any other named tool.
return callTool(name, args);
}
// Top-level orchestration loop: alternate one executor turn and one reviewer
// turn until consensus_done, the drift counter trips, or MAX_TURNS runs out.
// On success writes a playbook JSON and seeds playbook_memory; on failure
// dumps the full log as <task.id>-FAILED.json and exits 1.
//
// NOTE(review): parseTaskFromArg() runs before the try block — a malformed
// .json task argument rejects main()'s promise rather than hitting the
// catch/dump path below, so no -FAILED.json is written in that case.
async function main() {
const task = parseTaskFromArg();
// Shared append-only log; both agents' prompts are rebuilt from it each turn.
const log: LogEntry[] = [];
let turn = 0;
// Consecutive drift/tool-error streak — reset by any non-drift critique.
let consecutiveDrifts = 0;
// Set exactly once, when executor proposes done AND reviewer approves.
let sealed: { fills: Fill[]; approach: string } | null = null;
// Append a timestamped entry and echo its one-line summary to stdout.
const append = (e: Omit<LogEntry, "at">): LogEntry => {
const full: LogEntry = { ...e, at: new Date().toISOString() };
log.push(full);
console.log(fmt(full));
return full;
};
console.log(`▶ task: ${task.operation}`);
console.log(`▶ executor=${EXECUTOR_MODEL} reviewer=${REVIEWER_MODEL}`);
console.log();
try {
while (turn < MAX_TURNS && !sealed) {
turn += 1;
// --- EXECUTOR TURN ---
const execRaw = await generate(EXECUTOR_MODEL, executorPrompt(task, log), {
temperature: 0.2,
max_tokens: 600,
});
let execAction: Action;
try {
execAction = parseAction(execRaw, "executor");
} catch (e) {
// Fail-fast: log the unparseable output (truncated), then rethrow so
// the top-level catch dumps the full log and exits 1.
append({ turn, role: "executor", model: EXECUTOR_MODEL, kind: "error",
content: { message: (e as Error).message, raw: execRaw.slice(0, 400) } });
throw e;
}
append({ turn, role: "executor", model: EXECUTOR_MODEL,
kind: execAction.kind as any, content: execAction });
// If tool_call, execute and feed result back into the log. Tool
// validation / server errors come back as a tool_result with an
// `error` field — the executor reads its own error on the next turn
// and self-corrects (e.g. "oh, I forgot the `question` argument").
// This is softer than hard-failing the orchestrator: the whole
// point of two-agent collaboration is letting agents learn from
// immediate feedback instead of crashing the run.
if (execAction.kind === "tool_call") {
try {
const result = await executeToolCall(execAction.tool, execAction.args);
// Cap row payloads so the log (and the next prompt) stays small.
const trimmed = trimResult(result);
append({ turn, role: "executor", model: EXECUTOR_MODEL,
kind: "tool_result", content: trimmed });
} catch (e) {
append({ turn, role: "executor", model: EXECUTOR_MODEL,
kind: "tool_result",
content: { error: (e as Error).message, tool: execAction.tool, args: execAction.args } });
// Count as a soft drift — if the executor keeps throwing tool
// errors, consecutiveDrifts still trips the abort.
consecutiveDrifts += 1;
if (consecutiveDrifts >= MAX_CONSECUTIVE_DRIFTS) {
throw new Error(`aborting — ${MAX_CONSECUTIVE_DRIFTS} consecutive tool errors, executor can't self-correct`);
}
}
}
// --- REVIEWER TURN ---
const revRaw = await generate(REVIEWER_MODEL, reviewerPrompt(task, log), {
temperature: 0.1,
max_tokens: 400,
});
let revAction: Action;
try {
revAction = parseAction(revRaw, "reviewer");
} catch (e) {
append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "error",
content: { message: (e as Error).message, raw: revRaw.slice(0, 400) } });
throw e;
}
append({ turn, role: "reviewer", model: REVIEWER_MODEL,
kind: "critique", content: revAction });
// The reviewer's contract is critique-only; anything else is a protocol
// violation worth killing the run over.
if (revAction.kind !== "critique") throw new Error(`reviewer emitted non-critique: ${revAction.kind}`);
if (revAction.verdict === "drift") {
consecutiveDrifts += 1;
if (consecutiveDrifts >= MAX_CONSECUTIVE_DRIFTS) {
throw new Error(`aborting — ${MAX_CONSECUTIVE_DRIFTS} consecutive drift flags, executor can't self-correct`);
}
} else {
// Any non-drift verdict breaks the streak.
consecutiveDrifts = 0;
}
// Consensus: executor proposed done AND reviewer approved.
if (execAction.kind === "propose_done" && revAction.verdict === "approve_done") {
// Both agents agreed, but the fill count is non-negotiable.
if (execAction.fills.length !== task.target_count) {
throw new Error(`consensus malformed — ${execAction.fills.length} fills vs target ${task.target_count}`);
}
append({ turn, role: "reviewer", model: REVIEWER_MODEL, kind: "consensus_done",
content: { fills: execAction.fills } });
sealed = { fills: execAction.fills, approach: execAction.rationale };
}
}
if (!sealed) throw new Error(`no consensus after ${MAX_TURNS} turns — task incomplete`);
// Write playbook entry matching the successful_playbooks schema.
const playbook = {
timestamp: new Date().toISOString(),
operation: task.operation,
approach: sealed.approach,
result: `${sealed.fills.length}/${task.target_count} filled → ${sealed.fills.map(f => f.name).join(", ")}`,
context: `executor=${EXECUTOR_MODEL} reviewer=${REVIEWER_MODEL} turns=${turn}`,
task,
fills: sealed.fills,
log,
};
await mkdir("./tests/multi-agent/playbooks", { recursive: true });
const path = join("./tests/multi-agent/playbooks", `${task.id}.json`);
await writeFile(path, JSON.stringify(playbook, null, 2));
console.log(`\n✓ playbook written: ${path}`);
console.log(` ${playbook.result}`);
// Phase 19.5: write-through to playbook_memory. The sealed fills are
// the endorsement; next semantically-similar query will surface them
// higher. /seed bypasses the successful_playbooks ingest round-trip
// — when that ingest path ships, this block should switch to append
// + rebuild instead.
try {
// Seed context is what the embedding model actually sees alongside
// the operation — so it has to carry task-semantic content (role,
// city, scenario) rather than orchestrator bookkeeping. We stash
// the bookkeeping in the full playbook JSON instead (see playbook
// object above) where operators can grep it without it polluting
// the ranking signal.
// NOTE(review): `??` fires only when approach_hint is null/undefined —
// the k:v and default task paths always set it, so the fallback string
// is reachable only via a .json task file that omits the field.
const seedContext = task.approach_hint
?? `${task.target_role} fill in ${task.target_city}, ${task.target_state}`;
const seedRes = await fetch(`${GATEWAY}/vectors/playbook_memory/seed`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
operation: task.operation,
approach: sealed.approach || "multi-agent → hybrid search",
context: seedContext,
endorsed_names: sealed.fills.map(f => f.name),
append: true,
}),
});
if (seedRes.ok) {
const j = await seedRes.json() as any;
console.log(` ↳ playbook_memory seeded: id=${j.playbook_id} entries=${j.entries_after}`);
} else {
console.warn(` ↳ playbook_memory seed failed: ${seedRes.status} ${await seedRes.text()}`);
}
} catch (e) {
// Seeding is best-effort: the playbook file on disk is already written,
// so a memory-store hiccup must not fail the whole run.
console.warn(` ↳ playbook_memory seed errored: ${(e as Error).message}`);
}
process.exit(0);
} catch (e) {
console.error(`\n✗ ${(e as Error).message}`);
// Still persist the log for inspection.
await mkdir("./tests/multi-agent/playbooks", { recursive: true });
const path = join("./tests/multi-agent/playbooks", `${task.id}-FAILED.json`);
await writeFile(path, JSON.stringify({ task, error: (e as Error).message, log }, null, 2));
console.error(` log dumped: ${path}`);
// Exit 1 is the contract the test harness reads (see file header).
process.exit(1);
}
}
// Cap tabular tool results at 20 rows before they enter the log, recording
// how many rows were dropped; anything without a rows array passes through
// untouched.
function trimResult(r: any): any {
  const rows = (r && Array.isArray(r.rows)) ? r.rows : null;
  if (rows === null) return r;
  const dropped = rows.length - 20;
  return {
    ...r,
    rows: rows.slice(0, 20),
    _trimmed: dropped > 0 ? `${dropped} more rows` : undefined,
  };
}
main();