parseAction now strips stray `)` before `}` and trailing commas —
qwen2.5 emits those regularly on tool_call outputs; soft-fix beats
retry-loops. hybrid_search no longer hard-requires `question`; defaults
to "qualified available workers" when the model drops it (mistral's
most common failure mode on complex events).
Kept original TOOL_CATALOG shape (args examples only, not full
action envelopes). The verbose few-shot version from the prior
iteration confused mistral into wrapping propose_done as tool_call.
Scenario V7 result: expansion (5 Forklift Ops) and emergency
(4 Loaders) — previously-failing complex events — now seal reliably.
Pool sizes: 687 and 380 from 500K corpus. Patterns endpoint produces
real operator-actionable signals:
expansion: "recurring certifications: Forklift (40%), OSHA-10 (40%)
· recurring skills: mill (40%) · archetype mostly: leader
· reliability median 0.83"
Baseline + recurring are now flaky (inverted trade-off, pure
model-reliability variance).
363 lines
15 KiB
TypeScript
363 lines
15 KiB
TypeScript
// Shared runtime for one agent. An agent is a role (executor or reviewer),
// a model name, and a conversation the orchestrator hands it. The agent
// produces ONE structured Action per turn; the orchestrator applies tool
// calls and feeds results back.
//
// Fail-fast: every HTTP error, parse error, and Ollama error throws. The
// orchestrator catches at the top and exits non-zero with the full log.
|
||
|
||
/** Base URL of the gateway that serves the tool-call, hybrid-search and SQL endpoints. */
export const GATEWAY = "http://localhost:3100";
|
||
/** Base URL of the sidecar that serves model generation (`/generate`). */
export const SIDECAR = "http://localhost:3200";
|
||
|
||
// --- Shared types ---
|
||
|
||
/** The two agent roles: the executor acts on the task, the reviewer critiques each turn. */
export type Role =
  | "executor"
  | "reviewer";
|
||
|
||
/** Description of one fill task handed to the agents. */
export interface TaskSpec {
  id: string;
  /** Human-readable operation, e.g. "fill: Welder x2 in Columbus, OH". */
  operation: string;
  /** Role to fill, e.g. "Welder". */
  target_role: string;
  /** Number of workers required, e.g. 2. */
  target_count: number;
  /** Target city, e.g. "Columbus". */
  target_city: string;
  /** Target state code, e.g. "OH". */
  target_state: string;
  /** Optional nudge such as "hybrid search"; the agent is free to ignore it. */
  approach_hint?: string;
}
|
||
|
||
/** One record in the shared log that both agents read. */
export interface LogEntry {
  /** Turn number; rendered as `t<turn>` when the log is shown in a prompt. */
  turn: number;
  /** Which agent role the entry is attributed to. */
  role: Role;
  /** Model name associated with the entry. */
  model: string;
  /** NOTE(review): presumably a timestamp string — confirm against the orchestrator. */
  at: string;
  /** Discriminator describing what `content` holds. */
  kind:
    | "plan"
    | "tool_call"
    | "tool_result"
    | "critique"
    | "propose_done"
    | "consensus_done"
    | "error";
  /** Payload; its shape depends on `kind` and is not enforced by the type system. */
  content: any;
}
|
||
|
||
/**
 * What an agent returns on a single turn.
 *
 * The shape is deliberately strict so it can be enforced at parse time
 * instead of prompt-engineering around malformed JSON.
 */
export type Action =
  | {
      kind: "tool_call";
      tool: string;
      args: Record<string, any>;
      rationale: string;
    }
  | {
      kind: "propose_done";
      fills: Fill[];
      rationale: string;
    }
  | {
      kind: "critique";
      verdict: "continue" | "drift" | "approve_done";
      notes: string;
    }
  | {
      kind: "plan";
      steps: string[];
    };
|
||
|
||
/** One proposed worker inside a `propose_done` action. */
export interface Fill {
  /** Identifier of the proposed candidate. */
  candidate_id: string;
  /** Candidate's display name. */
  name: string;
  /** Why this candidate was chosen. */
  reason: string;
}
|
||
|
||
// --- HTTP helpers (fail-fast) ---
|
||
|
||
/**
 * Minimal JSON-over-HTTP helper (fail-fast).
 *
 * @param method HTTP verb.
 * @param url    Absolute URL to call.
 * @param body   Optional payload; JSON-encoded when truthy, otherwise no body is sent.
 * @returns The response body parsed as JSON, typed as `T` (unchecked).
 * @throws Error carrying method, URL, status and response text on any non-2xx reply.
 */
async function http<T>(method: string, url: string, body?: any): Promise<T> {
  const init = {
    method,
    headers: { "Content-Type": "application/json" },
    body: body ? JSON.stringify(body) : undefined,
  };
  const response = await fetch(url, init);
  if (response.ok) {
    return (await response.json()) as T;
  }
  // Surface the server's own explanation in the thrown error.
  const detail = await response.text();
  throw new Error(`${method} ${url} → ${response.status}: ${detail}`);
}
|
||
|
||
/**
 * Agent name sent with every tool call. Tool calls land in the Phase 12
 * audit log keyed by this name, which keeps them distinguishable from
 * human-driven calls (agent == "operator" or similar) so post-hoc queries
 * can separate multi-agent runs.
 */
export const TOOL_AGENT_ID = "multi-agent-test";
|
||
|
||
/**
 * Invoke a named gateway tool on behalf of this agent.
 *
 * @param tool Tool name; becomes part of the request path.
 * @param args Tool arguments, sent as `params`.
 * @returns Whatever JSON the gateway responds with.
 * @throws Error on any non-2xx response (propagated from `http`).
 */
export async function callTool(tool: string, args: Record<string, any>): Promise<any> {
  const endpoint = `${GATEWAY}/tools/${tool}/call`;
  const payload = { params: args, agent: TOOL_AGENT_ID };
  return http("POST", endpoint, payload);
}
|
||
|
||
/**
 * POST a hybrid (SQL filter + question) search to the gateway.
 *
 * @param sql_filter SQL filter string forwarded as-is.
 * @param question   Free-text question forwarded as-is.
 * @param k          Result count forwarded as `k`; defaults to 10.
 * @returns The gateway's JSON response.
 */
export async function hybridSearch(sql_filter: string, question: string, k = 10): Promise<any> {
  const payload = { sql_filter, question, k };
  return http("POST", `${GATEWAY}/vectors/hybrid`, payload);
}
|
||
|
||
/**
 * Run a SQL statement through the gateway, requesting JSON output.
 *
 * @param sql SQL text forwarded as-is.
 * @returns The gateway's JSON response.
 */
export async function sqlQuery(sql: string): Promise<any> {
  const payload = { sql, format: "json" };
  return http("POST", `${GATEWAY}/query/sql`, payload);
}
|
||
|
||
/**
 * Ask the sidecar to generate text with the given model.
 *
 * Ollama's default keep_alive (5 min) keeps the model warm between turns
 * on its own, so no keep_alive value is passed through.
 *
 * @param model  Model name.
 * @param prompt Prompt text.
 * @param opts   Optional `max_tokens` (default 800), `temperature`
 *               (default 0.3) and `system` prompt (sent only when truthy).
 * @returns The non-empty `text` field of the sidecar response.
 * @throws Error when `text` is missing, empty, or not a string.
 */
export async function generate(model: string, prompt: string, opts: {
  max_tokens?: number;
  temperature?: number;
  system?: string;
} = {}): Promise<string> {
  const payload: Record<string, any> = {
    model,
    prompt,
    temperature: opts.temperature ?? 0.3,
    max_tokens: opts.max_tokens ?? 800,
    ...(opts.system ? { system: opts.system } : {}),
  };
  const reply = await http<any>("POST", `${SIDECAR}/generate`, payload);
  const text = reply.text ?? "";
  if (typeof text === "string" && text) {
    return text;
  }
  throw new Error(`generate returned empty text from ${model}: ${JSON.stringify(reply).slice(0, 200)}`);
}
|
||
|
||
// --- Prompt construction ---
|
||
|
||
// Tool reference embedded verbatim into the executor prompt. Lists each
// tool's signature with an args-only example (not a full action envelope),
// followed by the rules the model must follow when calling them.
const TOOL_CATALOG = `
Available tools (each takes a JSON "args" object):

- hybrid_search(sql_filter: string, question: string, index_name: string, k?: number)
→ Narrow workers via SQL WHERE clause, then rank by semantic match.
→ Canonical production tool for fill tasks. Always use this FIRST.
→ Example args:
{"index_name":"workers_500k_v1",
"sql_filter":"role = 'Forklift Operator' AND city = 'Toledo' AND state = 'OH' AND CAST(availability AS DOUBLE) > 0.5",
"question":"reliable forklift operator Toledo",
"k":10}

- sql(query: string)
→ Raw read-only SELECT. Use for verification (confirm a worker exists,
check city/role/availability) after hybrid_search surfaces candidates.
→ Schema of workers_500k: worker_id, name, role, email, phone, city,
state, zip, skills, certifications, archetype, reliability,
responsiveness, engagement, communications, compliance, availability,
resume_text.
→ Example args:
{"query":"SELECT worker_id, name, role, city, state FROM workers_500k WHERE worker_id = 40123"}

Rules:
- hybrid_search returns sources[] each with {doc_id, chunk_text, score,
sql_verified, playbook_boost, playbook_citations}.
- **ID mapping:** vector doc_ids look like "W500K-7995" (prefix + number).
The SQL worker_id is an INTEGER. Use WHERE worker_id = 7995 directly.
- Names are NOT unique. Always identify by worker_id.
- availability and reliability are stored as text; ALWAYS cast as
DOUBLE in filters: CAST(availability AS DOUBLE) > 0.5.
- Narrative words from the guidance ("shift", "recurring", "expansion",
"emergency") are NOT columns. Only use columns listed above.
- Return EXACTLY ONE JSON object per turn. No markdown fences, no prose.
`;
|
||
|
||
/**
 * Produce a compact, per-kind one-line summary of a log entry so agents see
 * the substance of each prior turn instead of a wall of raw JSON.
 * hybrid_search results especially need this — raw JSON buries `sources[]`
 * past any reasonable truncation.
 *
 * Tolerant by design: `content` is untyped model/tool output, so a
 * non-array `steps`/`fills`, a textual `score`, a null source entry, or an
 * object-valued `error` must degrade gracefully rather than throw while a
 * prompt is being built.
 *
 * @param e Log entry to summarize.
 * @returns A single-line, length-bounded description of the entry.
 */
function summarizeEntry(e: LogEntry): string {
  const c = e.content ?? {};

  // Treat anything that is not an array as "no items".
  const asArray = (v: unknown): any[] => (Array.isArray(v) ? v : []);

  // Scores may arrive as numbers or numeric strings; anything else renders as 0.00.
  const fmtScore = (raw: unknown): string => {
    if (typeof raw === "number") return raw.toFixed(2);
    const parsed = Number(raw ?? 0);
    return (Number.isFinite(parsed) ? parsed : 0).toFixed(2);
  };

  switch (e.kind) {
    case "plan":
      return `PLAN: ${asArray(c.steps).map((s: string, i: number) => `${i + 1}.${s}`).join(" ")}`;
    case "tool_call":
      return `TOOL_CALL ${c.tool}(${JSON.stringify(c.args ?? {}).slice(0, 250)})${c.rationale ? ` — ${c.rationale}` : ""}`;
    case "tool_result": {
      if (c.error) {
        // Plain objects would otherwise render as "[object Object]".
        const msg = typeof c.error === "object" && !(c.error instanceof Error)
          ? JSON.stringify(c.error)
          : String(c.error);
        return `TOOL_RESULT error: ${msg}`;
      }
      // hybrid_search response
      if (Array.isArray(c.sources)) {
        const head = c.sources.slice(0, 5).map((raw: any) => {
          const s = raw ?? {};
          return `${s.doc_id}${s.sql_verified ? "✓" : ""} score=${fmtScore(s.score)}: ${String(s.chunk_text ?? "").slice(0, 80)}`;
        }).join(" | ");
        const more = c.sources.length > 5 ? ` +${c.sources.length - 5} more` : "";
        return `TOOL_RESULT hybrid: sql_matches=${c.sql_matches} vector_reranked=${c.vector_reranked} sources=[${head}${more}]`;
      }
      // sql response
      if (Array.isArray(c.rows)) {
        const head = c.rows.slice(0, 5).map((r: any) => JSON.stringify(r)).join(" | ");
        const more = c.rows.length > 5 ? ` +${c.rows.length - 5} more` : "";
        return `TOOL_RESULT sql: ${c.rows.length} rows${c.rows.length > 0 ? ` — ${head}${more}` : ""}`;
      }
      // fallback: unknown tool payload, truncated raw JSON
      return `TOOL_RESULT ${JSON.stringify(c).slice(0, 250)}`;
    }
    case "critique":
      return `CRITIQUE verdict=${c.verdict} notes: ${String(c.notes ?? "").slice(0, 200)}`;
    case "propose_done":
      return `PROPOSE_DONE fills=[${asArray(c.fills).map((f: Fill) => `${f?.candidate_id}:${f?.name}`).join(", ")}] rationale: ${String(c.rationale ?? "").slice(0, 120)}`;
    case "consensus_done":
      return `CONSENSUS ✓`;
    case "error":
      return `ERROR ${c.message ?? JSON.stringify(c)}`;
    default:
      // Unknown kind at runtime: fall back to truncated raw JSON.
      return JSON.stringify(c).slice(0, 200);
  }
}
|
||
|
||
/**
 * Render the tail of the shared log for inclusion in a prompt.
 *
 * @param log Full shared log.
 * @returns "(no turns yet)" for an empty log; otherwise one
 *          `[t<turn> <role>] <summary>` line for each of the last 12 entries.
 */
function renderLogForPrompt(log: LogEntry[]): string {
  if (log.length === 0) {
    return "(no turns yet)";
  }
  const lines: string[] = [];
  for (const entry of log.slice(-12)) {
    lines.push(`[t${entry.turn} ${entry.role}] ${summarizeEntry(entry)}`);
  }
  return lines.join("\n");
}
|
||
|
||
/**
 * Crawl the log for every tool_result carrying `sources[]` and collect the
 * worker names + ids seen so far. LLMs routinely "forget" earlier turns once
 * the conversation grows, so this running ledger is surfaced in the prompt
 * as orchestrator-maintained state — the executor just reads it.
 *
 * Each source's `chunk_text` is expected to look like
 * "Name — Role in City, ST. …"; entries that do not fit that shape, or that
 * lack a doc_id, are skipped. The first occurrence of a doc_id wins.
 *
 * @param log Full shared log.
 * @returns Unique candidates in first-seen order.
 */
function candidatesSeen(log: LogEntry[]): Array<{ doc_id: string; name: string; city: string; state: string }> {
  const seen = new Map<string, { doc_id: string; name: string; city: string; state: string }>();
  for (const e of log) {
    if (e.kind !== "tool_result") continue;
    const sources = (e.content as any)?.sources;
    if (!Array.isArray(sources)) continue;
    for (const s of sources) {
      // Tool output is untyped; skip null / primitive entries instead of crashing.
      if (!s || typeof s !== "object") continue;
      // chunk_text shape "Name — Role in City, ST. …"
      const t = String(s.chunk_text ?? "");
      const [namePart, rest] = t.split("—", 2);
      if (!namePart || !rest) continue;
      const loc = rest.split(" in ")[1] ?? "";
      const [city, stateRaw] = loc.split(",", 2);
      // Keep only the leading letters. Stripping with /[^A-Za-z].*/ is not
      // enough: "." does not match newlines, so multi-line chunk_text would
      // leak its later lines into the state.
      const state = /^[A-Za-z]+/.exec((stateRaw ?? "").trim())?.[0] ?? "";
      if (!s.doc_id || !namePart.trim() || !city?.trim() || !state) continue;
      if (!seen.has(s.doc_id)) {
        seen.set(s.doc_id, {
          doc_id: s.doc_id,
          name: namePart.trim(),
          city: city.trim(),
          state,
        });
      }
    }
  }
  return Array.from(seen.values());
}
|
||
|
||
/**
 * Build the full prompt for the executor agent's next turn.
 *
 * The prompt contains the task, the tool catalog, the orchestrator-tracked
 * ledger of candidates surfaced so far, the recent shared log, and the
 * exact JSON shapes the executor may answer with.
 *
 * @param task Task being worked on.
 * @param log  Shared log so far.
 * @returns Prompt text.
 */
export function executorPrompt(task: TaskSpec, log: LogEntry[]): string {
  const { operation, target_count, target_role, target_city, target_state, approach_hint } = task;

  const recentTurns = renderLogForPrompt(log);
  const ledger = candidatesSeen(log);
  const ledgerBlock = ledger.length > 0
    ? ledger.map(c => ` - ${c.doc_id} ${c.name} (${c.city}, ${c.state})`).join("\n")
    : "(no candidates surfaced yet — start with hybrid_search)";
  // Left as an empty line when no hint was supplied.
  const hintLine = approach_hint ? `HINT: ${approach_hint}` : "";

  return `You are the EXECUTOR agent. Your job is to complete this task:

OPERATION: ${operation}
TARGET: ${target_count} × ${target_role} in ${target_city}, ${target_state}
${hintLine}

The REVIEWER agent is watching every turn. They will flag drift. Stay on target.

${TOOL_CATALOG}

CANDIDATES SURFACED SO FAR (orchestrator-tracked, do not forget these):
${ledgerBlock}

SHARED LOG (recent turns):
${recentTurns}

Your next action MUST be a JSON object matching one of these shapes:
{"kind":"plan","steps":["short step 1","short step 2",...]}
— use on turn 1 to outline your approach. Steps must be concrete.
{"kind":"tool_call","tool":"...","args":{...},"rationale":"why"}
— call a tool and see its result next turn.
{"kind":"propose_done","fills":[{"candidate_id":"...","name":"First Last","reason":"why them"}],"rationale":"..."}
— propose you've met the target. fills MUST have EXACTLY ${target_count} entries — count twice before emitting.

Strategy tip: once "CANDIDATES SURFACED SO FAR" has ≥ ${target_count} entries in ${target_city}, ${target_state} matching ${target_role}, verify ONE via the sql tool (to satisfy the reviewer's SQL-verification criterion) and then propose_done with the top ${target_count}. Don't keep re-searching.

Respond with ONLY the JSON object. No markdown fences, no prose.`;
}
|
||
|
||
/**
 * Build the full prompt for the reviewer agent's next turn.
 *
 * When the executor's most recent log entry is a propose_done, a hard rule
 * is appended that forbids "continue": the reviewer must vote up or down
 * this turn, otherwise the orchestrator would stall forever.
 *
 * @param task Task being worked on.
 * @param log  Shared log so far.
 * @returns Prompt text.
 */
export function reviewerPrompt(task: TaskSpec, log: LogEntry[]): string {
  const { operation, target_count, target_role, target_city, target_state } = task;
  const recentTurns = renderLogForPrompt(log);

  // Most recent entry attributed to the executor, if any.
  const executorEntries = log.filter(e => e.role === "executor");
  const lastExec = executorEntries[executorEntries.length - 1];
  const awaitingApproval = lastExec?.kind === "propose_done";

  // The wider prompt still describes all three verdicts; this closing rule
  // narrows them when a proposal is pending.
  const hardRule = awaitingApproval
    ? `

HARD RULE: The executor's most recent action was propose_done. On this turn you CANNOT return "continue" — it would stall the task. Choose approve_done (proposal is valid by the 3 criteria above) or drift (it fails one; state which in notes).`
    : "";

  return `You are the REVIEWER agent. The EXECUTOR is trying to complete this task:

OPERATION: ${operation}
TARGET: ${target_count} × ${target_role} in ${target_city}, ${target_state}

Your job: catch drift. Agents often wander from the actual objective. Specifically watch for:
- Proposing candidates who aren't in ${target_city}, ${target_state}.
- Proposing candidates who don't have ${target_role} skill.
- Proposing fewer or more than ${target_count} fills.
- Irrelevant tool calls (e.g. revenue_by_client when the task is a fill).

Available tools (for reference, but YOU don't call them):
- hybrid_search(sql_filter, question, index_name, k) — production fill path
- sql(query) — read-only SELECT for verification

SHARED LOG (recent turns):
${recentTurns}

Your next action MUST be a JSON object:
{"kind":"critique","verdict":"continue" | "drift" | "approve_done","notes":"..."}

- "continue" → executor is on a reasonable path, let them keep going.
- "drift" → executor is off-track; notes MUST tell them how to redirect.
- "approve_done" → executor's propose_done meets the criteria. Seal it.

APPROVAL CRITERIA (use these only for propose_done):
1. Exactly ${target_count} fills.
2. Each fill's name appears in a prior tool_result from ${target_city}, ${target_state} matching role "${target_role}".
3. Executor has SQL-verified at least one of the fills (any prior sql tool_result with that worker).
If 1–3 all hold, return approve_done. Do not demand further verification.
${hardRule}

Respond with ONLY the JSON object.`;
}
|
||
|
||
/**
 * Parse an agent's raw response into an Action, or throw.
 *
 * Steps:
 *  1. Strip a leading/trailing ``` fence if the model added one.
 *  2. Take the span from the first `{` to the last `}`.
 *  3. Parse it strictly. Only if that fails, apply the cheap repairs for
 *     known model mistakes — a stray `)` before a closing brace (qwen2.5
 *     does this on tool_call) and trailing commas — and parse again.
 *     Repairing only on failure guarantees that valid JSON whose string
 *     values happen to contain ")}" or ",}" is never rewritten.
 *  4. Validate the shape for the given role.
 *
 * @param raw  Raw model output.
 * @param role Which agent produced it; decides which action kinds are accepted.
 * @returns A validated Action. Reviewer replies that put the verdict
 *          directly in `kind` are normalized into a `critique` action.
 * @throws Error when no JSON object is found, the JSON cannot be parsed
 *         even after repair, or the shape does not match the role.
 */
export function parseAction(raw: string, role: Role): Action {
  // Models sometimes wrap JSON in ```json fences; strip them.
  let s = raw.trim();
  if (s.startsWith("```")) {
    s = s.replace(/^```(?:json)?\n?/, "").replace(/```$/, "").trim();
  }

  // Outermost {...} span: first "{" through last "}".
  const start = s.indexOf("{");
  const end = s.lastIndexOf("}");
  if (start < 0 || end <= start) {
    throw new Error(`no JSON object in ${role} response: ${raw.slice(0, 300)}`);
  }
  const json = s.slice(start, end + 1);

  let obj: any;
  try {
    obj = JSON.parse(json);
  } catch {
    // Soft-tolerate common model mistakes, fixing only the cheapest
    // unambiguous ones.
    const repaired = json
      .replace(/\)\s*\}/g, "}") // "...)}" → "...}"
      .replace(/,(\s*[}\]])/g, "$1"); // trailing comma before } or ]
    try {
      obj = JSON.parse(repaired);
    } catch (e) {
      throw new Error(`invalid JSON from ${role}: ${(e as Error).message} | raw: ${repaired.slice(0, 300)}`);
    }
  }

  if (role === "executor") {
    if (obj.kind === "plan" && Array.isArray(obj.steps)) return obj as Action;
    // `typeof null === "object"` and arrays are objects too; neither is a
    // usable args record.
    const argsOk = obj.args !== null && typeof obj.args === "object" && !Array.isArray(obj.args);
    if (obj.kind === "tool_call" && typeof obj.tool === "string" && argsOk) return obj as Action;
    if (obj.kind === "propose_done" && Array.isArray(obj.fills)) return obj as Action;
    // The kind may look recognizable while the shape does not match; raise
    // a clean error so the orchestrator's soft-fail path doesn't swallow it.
    throw new Error(`executor returned unexpected shape: ${JSON.stringify(obj).slice(0, 200)}`);
  }

  // Reviewer. Some models (qwen2.5, mistral) emit the verdict AS the `kind`
  // field instead of nesting it under a "critique" wrapper. Accept both —
  // the semantic content is identical, and rejecting would stall the
  // orchestrator on a cosmetic schema miss.
  const verdicts = ["continue", "drift", "approve_done"];
  if (obj.kind === "critique" && verdicts.includes(obj.verdict)) {
    return obj as Action;
  }
  if (verdicts.includes(obj.kind)) {
    return { kind: "critique", verdict: obj.kind, notes: obj.notes ?? "" } as Action;
  }
  throw new Error(`reviewer returned unexpected shape: ${JSON.stringify(obj).slice(0, 200)}`);
}
|