ITEM 1 — k CAP + REASON FIELD The hybrid_search default k was hard-coded to 10. For multi-fill events (5× expansion, 4× emergency) that's pool=10 → propose 5-of-10, half the candidates become the answer with no room for rejection. Executor prompt now instructs k to scale with target_count: k = max(count*5, 20), cap 80. Default helper bumped 10 → 20. Fill.reason dropped from required to optional. Nothing downstream ever consumed it — resolveWorkerIds, sealSale, retrospective all use candidate_id and name. Models loved to write 100-150 char justifications per fill; on 4+ fills that blew the JSON budget before the structure closed. Test 1 run result after this change: FIRST EVER 5/5 on the Riverfront Steel scenario, 13 total turns across 5 events. The event that failed last run (emergency 4×Loader with truncated reason-field continuation) now clears in 2 turns. Progression: mistral baseline: 0/5 qwen3.5 + continuation + think:false: 4/5 qwen3.5 + k=20 + no-reason: 5/5 ✓ ITEM 2 — SCENARIO GENERATOR (NOT YET TESTED E2E) tests/multi-agent/gen_scenarios.ts emits N deterministic ScenarioSpecs with varied clients (15 companies), cities (20 Midwest cities known to exist in workers_500k), role mixes (14 industrial staffing roles, weighted realistic), and event sequences. Each gets a unique sig_hash so the KB populates with distinct neighbor signatures. scripts/run_kb_batch.sh runs all generated specs sequentially against scenario.ts, logs per-scenario outcomes, and reports KB state at the end. Each run takes ~2-4min; 20-30 scenarios = 1-2hr unattended. Next: test the generator+batch on a small N (3-5) to verify KB populates correctly and pathway recommendations start getting neighbor signal instead of cold-starts. Then item 3 (Rust re-weighting of hybrid_search by playbook_memory success).
699 lines
29 KiB
TypeScript
699 lines
29 KiB
TypeScript
// Shared runtime for one agent. An agent is a role (executor or reviewer),
// a model name, and a conversation the orchestrator hands it. The agent
// produces ONE structured Action per turn; the orchestrator applies tool
// calls and feeds results back.
//
// Fail-fast: every HTTP error, parse error, and Ollama error throws. The
// orchestrator catches at the top and exits non-zero with the full log.

// Local service endpoints: GATEWAY fronts the tool/SQL/vector surface,
// SIDECAR fronts the local /generate endpoint used by generate().
export const GATEWAY = "http://localhost:3100";
export const SIDECAR = "http://localhost:3200";

// Ollama Cloud — used for the T3 overview tier when LH_OVERVIEW_CLOUD=1.
// Same /api/generate surface as local Ollama; just needs the bearer key.
// Default base and key are read from env so secrets never land in git.
export const OLLAMA_CLOUD_URL = process.env.OLLAMA_CLOUD_URL ?? "https://ollama.com";
// Empty string means "no cloud access" — generateCloud() throws if unset.
export const OLLAMA_CLOUD_KEY = process.env.OLLAMA_CLOUD_KEY ?? "";
|
||
|
||
// Rough token estimator — chars/4 biased safe by ~15%. Swap to a real
|
||
// tokenizer (tiktoken or provider endpoint) once Phase 21 lands. Good
|
||
// enough to stop the "context silently truncated" failure mode today.
|
||
export function estimateTokens(text: string): number {
|
||
return Math.ceil(text.length / 4);
|
||
}
|
||
|
||
// ============================================================
|
||
// Scratchpad + tree-split continuation
|
||
// ============================================================
|
||
//
|
||
// Core problem: when a prompt OR response would exceed a model's window
|
||
// — e.g. 200 playbooks pasted in, or a long propose_done payload — we
|
||
// cannot just raise max_tokens forever. That's the tourniquet approach
|
||
// J explicitly rejected. The right answer is: split into sub-calls
|
||
// glued by a persistent scratchpad, so no context is ever lost.
|
||
//
|
||
// Two primitives:
|
||
//
|
||
// 1. generateContinuable(): for OUTPUT overflow. If the model returns
|
||
// a structurally-incomplete JSON (unmatched braces, truncated
|
||
// mid-value), auto-continue with the partial as scratchpad until
|
||
// the JSON parses or max_continuations is hit.
|
||
//
|
||
// 2. generateTreeSplit(): for INPUT overflow. If prompt + system +
|
||
// max_tokens exceeds window, shard the input at semantic boundaries,
|
||
// run each shard through the model with a running scratchpad digest,
|
||
// then run a final reduce pass to combine. This is map-reduce with
|
||
// glue.
|
||
//
|
||
// Both are tier-agnostic — they wrap generate() and generateCloud().
|
||
|
||
// Check structural completeness of a response. For non-JSON responses
|
||
// (lesson text, checkpoint hints) we treat "non-empty" as complete.
|
||
// For JSON-shaped responses (executor/reviewer actions) we check that
|
||
// braces match AND that JSON.parse succeeds on the first {...} block.
|
||
function isStructurallyComplete(text: string, shape: "json" | "text"): boolean {
|
||
if (!text || !text.trim()) return false;
|
||
if (shape === "text") return true;
|
||
// Strip ``` fences
|
||
let s = text.trim();
|
||
if (s.startsWith("```")) s = s.replace(/^```(?:json)?\n?/, "").replace(/```$/, "").trim();
|
||
const start = s.indexOf("{");
|
||
const end = s.lastIndexOf("}");
|
||
if (start < 0 || end <= start) return false;
|
||
const slice = s.slice(start, end + 1);
|
||
// Balance check — cheaper than JSON.parse and catches truncated nests
|
||
let depth = 0, inStr = false, esc = false;
|
||
for (const c of slice) {
|
||
if (esc) { esc = false; continue; }
|
||
if (c === "\\") { esc = true; continue; }
|
||
if (c === '"') { inStr = !inStr; continue; }
|
||
if (inStr) continue;
|
||
if (c === "{") depth++;
|
||
else if (c === "}") depth--;
|
||
}
|
||
if (depth !== 0) return false;
|
||
try { JSON.parse(slice); return true; } catch { return false; }
|
||
}
|
||
|
||
// Continue a truncated response. We do NOT ask the model to start over —
|
||
// we ask it to continue from exactly where it stopped. The partial goes
|
||
// in as scratchpad so it knows what's already committed.
|
||
async function continueResponse(
|
||
model: string,
|
||
originalPrompt: string,
|
||
partial: string,
|
||
opts: { max_tokens: number; temperature: number; system?: string; cloud: boolean; think?: boolean },
|
||
): Promise<string> {
|
||
const continuationPrompt = `${originalPrompt}
|
||
|
||
PARTIAL RESPONSE SO FAR (continue from here — do NOT restart, do NOT repeat what's already there, emit ONLY the remaining tokens to close the structure):
|
||
${partial}`;
|
||
const fn = opts.cloud ? generateCloud : generate;
|
||
const rest = await fn(model, continuationPrompt, {
|
||
max_tokens: opts.max_tokens,
|
||
temperature: opts.temperature,
|
||
system: opts.system,
|
||
bypass_budget: true, // the caller already checked; continuation doesn't double-count
|
||
think: opts.think,
|
||
});
|
||
return partial + rest;
|
||
}
|
||
|
||
// Output-overflow handler. Handles two distinct failure modes:
// (a) EMPTY response — thinking model ate the whole budget. Retry the
//     ORIGINAL prompt with 2x the budget (geometric backoff up to cap).
// (b) TRUNCATED non-empty — model got most of the way there but hit
//     max_tokens before closing. Continue with the partial as
//     scratchpad.
// The two modes need different repair: (a) needs more budget, not
// continuation from ""; (b) needs scratchpad-glued continuation.
//
// opts.shape             — "json" (default): complete = balanced, parseable
//                          {...}; "text": complete = non-empty.
// opts.max_continuations — cap on scratchpad continuations (default 3).
// opts.on_continuation   — observer hook (attempt number, combined length
//                          so far); called with length 0 on empty-retries.
// Returns the combined text; may still be incomplete after all retries
// (see the last-resort comment at the bottom).
export async function generateContinuable(
  model: string,
  prompt: string,
  opts: {
    max_tokens?: number;
    temperature?: number;
    system?: string;
    shape?: "json" | "text";
    max_continuations?: number;
    cloud?: boolean;
    think?: boolean;
    on_continuation?: (n: number, combined_len: number) => void;
  } = {},
): Promise<string> {
  const shape = opts.shape ?? "json";
  const initialMax = opts.max_tokens ?? 800;
  const maxConts = opts.max_continuations ?? 3;
  const cloud = opts.cloud ?? false;
  const budgetCap = 8000; // don't geometric-backoff forever
  const fn = cloud ? generateCloud : generate;

  let combined = "";
  let currentMax = initialMax;

  // Initial call + empty-response backoff loop.
  for (let retry = 0; retry < 3; retry++) {
    const out = await fn(model, prompt, {
      max_tokens: currentMax,
      temperature: opts.temperature,
      system: opts.system,
      think: opts.think,
    });
    if (out.trim().length > 0) { combined = out; break; }
    // Empty — thinking model ate the budget. Double it and retry.
    if (opts.on_continuation) opts.on_continuation(retry + 1, 0);
    currentMax = Math.min(currentMax * 2, budgetCap);
  }
  // NOTE(review): if all 3 attempts come back empty, combined is still ""
  // and the loop below runs continueResponse from an empty partial — the
  // exact repair mode (a)'s comment says is wrong. Confirm intended.

  // Structural completion loop (continuation from partial).
  for (let i = 0; i < maxConts; i++) {
    if (isStructurallyComplete(combined, shape)) return combined;
    if (opts.on_continuation) opts.on_continuation(i + 1, combined.length);
    combined = await continueResponse(model, prompt, combined, {
      max_tokens: Math.min(currentMax, budgetCap),
      temperature: opts.temperature ?? 0.3,
      system: opts.system,
      cloud,
      think: opts.think,
    });
  }
  // Last-resort: return combined even if incomplete; caller's parser
  // will throw with the raw text for forensics rather than silently
  // truncating.
  return combined;
}
|
||
|
||
// Input-overflow handler. When the input corpus exceeds the window,
// shard → map → reduce with a running scratchpad digest.
//
// shards: array of input chunks the caller already split at semantic
//   boundaries (paragraphs, records, playbook entries).
// map_prompt: fn taking (shard, running_scratchpad) → prompt for a
//   single map call. Must fit within window.
// reduce_prompt: fn taking (combined_scratchpad) → final prompt. Also
//   must fit window; if the scratchpad itself overflows, this triggers
//   a recursive tree-split.
//
// Result: the reduce call's response.
export async function generateTreeSplit(
  model: string,
  shards: string[],
  map_prompt: (shard: string, scratchpad: string) => string,
  reduce_prompt: (scratchpad: string) => string,
  opts: {
    max_tokens?: number;
    temperature?: number;
    system?: string;
    cloud?: boolean;
    on_shard?: (i: number, total: number) => void; // 1-based progress hook
    scratchpad_budget?: number; // approx tokens; ×4 below converts to chars
  } = {},
): Promise<{ response: string; scratchpad: string; shards_processed: number }> {
  const cloud = opts.cloud ?? false;
  const scratchpadBudget = opts.scratchpad_budget ?? 6000;
  let scratchpad = "";

  for (let i = 0; i < shards.length; i++) {
    if (opts.on_shard) opts.on_shard(i + 1, shards.length);
    const shardPrompt = map_prompt(shards[i], scratchpad);
    // If the per-shard prompt alone exceeds window, the caller sharded
    // too coarsely — bubble up rather than silently truncating.
    assertContextBudget(model, shardPrompt, {
      system: opts.system,
      max_tokens: opts.max_tokens,
      bypass: false,
    });
    const shardOut = await generateContinuable(model, shardPrompt, {
      max_tokens: opts.max_tokens ?? 800,
      temperature: opts.temperature,
      system: opts.system,
      shape: "text", // digests are prose, not JSON
      cloud,
    });
    // Append to scratchpad; truncate oldest if over budget.
    // slice(-N) keeps the NEWEST N chars, so the oldest digests fall off
    // first; budget is tokens, hence the ×4 chars-per-token conversion.
    scratchpad = (scratchpad + `\n— shard ${i + 1}/${shards.length} digest —\n` + shardOut.trim()).slice(-scratchpadBudget * 4);
  }

  // Final reduce pass over the accumulated digest. Same loud budget
  // check: an oversized scratchpad is a caller error, not ours to hide.
  const reducePrompt = reduce_prompt(scratchpad);
  assertContextBudget(model, reducePrompt, {
    system: opts.system,
    max_tokens: opts.max_tokens,
    bypass: false,
  });
  const response = await generateContinuable(model, reducePrompt, {
    max_tokens: opts.max_tokens ?? 1500,
    temperature: opts.temperature,
    system: opts.system,
    shape: "text",
    cloud,
  });
  return { response, scratchpad, shards_processed: shards.length };
}
|
||
|
||
// Known context windows — matches crates/../config/models.json. Kept in
// code as a fallback so the test harness doesn't crash if the config is
// missing. Production path should read from models.json.
export const CONTEXT_WINDOWS: Record<string, number> = {
  "mistral:latest": 32768,
  "qwen2.5:latest": 32768,
  "qwen3:latest": 40960,
  "qwen3.5:latest": 262144, // local 9.7B — new executor
  "qwen3-embedding": 32768, // local embedding model
  "nomic-embed-text-v2-moe": 2048, // local embedding, MoE
  "gpt-oss:20b": 131072,
  "gpt-oss:120b": 131072,
  "qwen3.5:397b": 131072,
  "kimi-k2-thinking": 200000,
  "kimi-k2.6": 200000, // cloud — new T4 candidate
  "kimi-k2:1t": 1048576,
  "deepseek-v3.1:671b": 131072,
  "glm-4.7": 131072,
};
// Window assumed for any model missing from the table above.
const DEFAULT_CONTEXT_WINDOW = 32768;
// Tokens held back on top of the estimate to absorb estimator error.
const DEFAULT_SAFETY_MARGIN = 2000;
|
||
|
||
// Fail LOUDLY if a prompt would blow the model's context. The whole
|
||
// point of Phase 21 is to stop silent truncation — so we throw with the
|
||
// numbers. Callers that expect to handle overflow should chunk BEFORE
|
||
// calling; they can also set bypass: true to opt out (T5 gatekeeper
|
||
// handles its own overflow policy).
|
||
export function assertContextBudget(
|
||
model: string,
|
||
prompt: string,
|
||
opts: { system?: string; max_tokens?: number; safety_margin?: number; bypass?: boolean } = {}
|
||
): { estimated: number; window: number; remaining: number } {
|
||
const window = CONTEXT_WINDOWS[model] ?? DEFAULT_CONTEXT_WINDOW;
|
||
const safety = opts.safety_margin ?? DEFAULT_SAFETY_MARGIN;
|
||
const estimated = estimateTokens(prompt) + estimateTokens(opts.system ?? "") + (opts.max_tokens ?? 800);
|
||
const remaining = window - estimated - safety;
|
||
if (remaining < 0 && !opts.bypass) {
|
||
throw new Error(
|
||
`context overflow: model=${model} est=${estimated}t window=${window}t safety=${safety}t over=${-remaining}t. ` +
|
||
`Chunk the prompt (see config/models.json overflow_policies) or set bypass:true if you know the risk.`
|
||
);
|
||
}
|
||
return { estimated, window, remaining };
|
||
}
|
||
|
||
// --- Shared types ---

// Which seat an agent occupies in the two-agent loop.
export type Role = "executor" | "reviewer";

// One fill task handed to the executor by the orchestrator.
export interface TaskSpec {
  id: string;
  operation: string; // "fill: Welder x2 in Columbus, OH"
  target_role: string; // "Welder"
  target_count: number; // 2
  target_city: string; // "Columbus"
  target_state: string; // "OH"
  approach_hint?: string; // e.g. "hybrid search"; agent is free to ignore
}

// One entry in the shared, orchestrator-owned turn log. summarizeEntry()
// renders these for prompts; candidatesSeen() mines tool_result entries.
export interface LogEntry {
  turn: number;
  role: Role;
  model: string;
  at: string; // presumably a timestamp string; never read in this file
  kind:
    | "plan"
    | "tool_call"
    | "tool_result"
    | "critique"
    | "propose_done"
    | "consensus_done"
    | "note"
    | "error";
  content: any;
}

// Action = what an agent returns on one turn. Strict shape so we can
// enforce it at parse time rather than prompt-engineer around malformed
// JSON.
export type Action =
  | { kind: "tool_call"; tool: string; args: Record<string, any>; rationale: string }
  | { kind: "propose_done"; fills: Fill[]; rationale: string }
  | { kind: "critique"; verdict: "continue" | "drift" | "approve_done"; notes: string }
  | { kind: "plan"; steps: string[] };

// One proposed worker in a propose_done action.
export interface Fill {
  candidate_id: string;
  name: string;
  reason?: string; // optional — the schema used to require it; nothing
                   // downstream consumed it, and qwen3.5 would emit
                   // 100-150 char justifications per fill that blew
                   // the JSON budget on 4+ fills. Kept optional so
                   // models that still emit it don't break parsing.
}
|
||
|
||
// --- HTTP helpers (fail-fast) ---
|
||
|
||
async function http<T>(method: string, url: string, body?: any): Promise<T> {
|
||
const res = await fetch(url, {
|
||
method,
|
||
headers: { "Content-Type": "application/json" },
|
||
body: body ? JSON.stringify(body) : undefined,
|
||
});
|
||
if (!res.ok) {
|
||
const text = await res.text();
|
||
throw new Error(`${method} ${url} → ${res.status}: ${text}`);
|
||
}
|
||
return (await res.json()) as T;
|
||
}
|
||
|
||
// Tool calls land in the Phase 12 audit log keyed by this agent name.
// Distinguishable from human-driven calls (agent=="operator" or similar)
// so post-hoc queries can separate multi-agent runs.
export const TOOL_AGENT_ID = "multi-agent-test";

// Invoke a gateway tool by name. Args pass through verbatim as `params`;
// HTTP failures throw via http() with the status and response body.
export async function callTool(tool: string, args: Record<string, any>): Promise<any> {
  return http("POST", `${GATEWAY}/tools/${tool}/call`, {
    params: args,
    agent: TOOL_AGENT_ID,
  });
}
|
||
|
||
// Default k=20 is a floor, not a ceiling — executor prompt instructs
// models to scale k to 5× target_count (cap 80) so multi-fill events
// get a meaningfully deep pool to rank within.
export async function hybridSearch(sql_filter: string, question: string, k = 20): Promise<any> {
  return http("POST", `${GATEWAY}/vectors/hybrid`, { sql_filter, question, k });
}

// Raw SQL passthrough to the gateway, JSON row format. Intended for
// read-only SELECTs (per the tool catalog) — NOTE(review): not enforced
// here; confirm the gateway rejects writes.
export async function sqlQuery(sql: string): Promise<any> {
  return http("POST", `${GATEWAY}/query/sql`, { sql, format: "json" });
}
|
||
|
||
// Sidecar generate. Ollama's default keep_alive (5 min) keeps the model
|
||
// warm between turns on its own, so we don't need to pass it through.
|
||
export async function generate(model: string, prompt: string, opts: {
|
||
max_tokens?: number;
|
||
temperature?: number;
|
||
system?: string;
|
||
bypass_budget?: boolean;
|
||
think?: boolean;
|
||
} = {}): Promise<string> {
|
||
assertContextBudget(model, prompt, {
|
||
system: opts.system,
|
||
max_tokens: opts.max_tokens,
|
||
bypass: opts.bypass_budget,
|
||
});
|
||
const body: Record<string, any> = {
|
||
model,
|
||
prompt,
|
||
temperature: opts.temperature ?? 0.3,
|
||
max_tokens: opts.max_tokens ?? 800,
|
||
};
|
||
if (opts.system) body.system = opts.system;
|
||
if (opts.think !== undefined) body.think = opts.think;
|
||
const r = await http<any>("POST", `${SIDECAR}/generate`, body);
|
||
const text = typeof r.text === "string" ? r.text : "";
|
||
// Do NOT throw on empty. Thinking models (gpt-oss, qwen3.5) burn the
|
||
// max_tokens budget on hidden reasoning and emit "" when budget was
|
||
// too tight. generateContinuable detects empty + continues with more
|
||
// budget. Callers that expected non-empty can check themselves.
|
||
return text;
|
||
}
|
||
|
||
// Cloud generate — hits Ollama Cloud directly with the bearer key. Same
// /api/generate shape as local Ollama; `thinking` field (for gpt-oss:Nb)
// is discarded, only `response` is returned. Caller should budget
// num_predict ≥ 400 so thinking-model reasoning has room before the
// visible response starts.
export async function generateCloud(model: string, prompt: string, opts: {
  max_tokens?: number;
  temperature?: number;
  system?: string;
  bypass_budget?: boolean;
  think?: boolean;
} = {}): Promise<string> {
  // Fail fast on a missing key rather than surfacing a confusing 401.
  if (!OLLAMA_CLOUD_KEY) {
    throw new Error("OLLAMA_CLOUD_KEY not set; cannot reach Ollama Cloud");
  }
  // Same loud pre-flight as local generate().
  assertContextBudget(model, prompt, {
    system: opts.system,
    max_tokens: opts.max_tokens,
    bypass: opts.bypass_budget,
  });
  const body: Record<string, any> = {
    model,
    prompt,
    stream: false,
    options: {
      temperature: opts.temperature ?? 0.3,
      // Floor of 400 so thinking models have reasoning room (see above).
      num_predict: Math.max(opts.max_tokens ?? 800, 400),
    },
  };
  if (opts.system) body.system = opts.system;
  if (opts.think !== undefined) body.think = opts.think;
  const resp = await fetch(`${OLLAMA_CLOUD_URL}/api/generate`, {
    method: "POST",
    headers: {
      "Authorization": `Bearer ${OLLAMA_CLOUD_KEY}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify(body),
  });
  if (!resp.ok) {
    // .catch on the body read: keep the status even if the body is gone.
    throw new Error(`Ollama Cloud ${resp.status}: ${await resp.text().catch(() => "?")}`);
  }
  const data: any = await resp.json();
  const text = typeof data.response === "string" ? data.response : "";
  // Same non-throw policy as local generate() — empty text is a valid
  // signal that thinking ate the budget. Let generateContinuable retry.
  return text;
}
|
||
|
||
// --- Prompt construction ---

// Tool catalog injected verbatim into the executor prompt. This is a
// RUNTIME string the models read — edit wording with the same care as
// code; k-scaling guidance here pairs with hybridSearch's default k=20.
const TOOL_CATALOG = `
Available tools (each takes a JSON "args" object):

- hybrid_search(sql_filter: string, question: string, index_name: string, k?: number)
  → Narrow workers via SQL WHERE clause, then rank by semantic match.
  → Canonical production tool for fill tasks. Always use this FIRST.
  → Example args:
    {"index_name":"workers_500k_v1",
     "sql_filter":"role = 'Forklift Operator' AND city = 'Toledo' AND state = 'OH' AND CAST(availability AS DOUBLE) > 0.5",
     "question":"reliable forklift operator Toledo",
     "k":40}
  → k should scale with target_count: roughly 5× the number of fills
    needed, floor 20, cap 80. For 1-2 fills use k=20. For 5 fills use
    k=40. A deep pool lets the ranker discriminate across a larger
    candidate set; k=10 was too tight for multi-fill events.

- sql(query: string)
  → Raw read-only SELECT. Use for verification (confirm a worker exists,
    check city/role/availability) after hybrid_search surfaces candidates.
  → Schema of workers_500k: worker_id, name, role, email, phone, city,
    state, zip, skills, certifications, archetype, reliability,
    responsiveness, engagement, communications, compliance, availability,
    resume_text.
  → Example args:
    {"query":"SELECT worker_id, name, role, city, state FROM workers_500k WHERE worker_id = 40123"}

Rules:
- hybrid_search returns sources[] each with {doc_id, chunk_text, score,
  sql_verified, playbook_boost, playbook_citations}.
- **ID mapping:** vector doc_ids look like "W500K-7995" (prefix + number).
  The SQL worker_id is an INTEGER. Use WHERE worker_id = 7995 directly.
- Names are NOT unique. Always identify by worker_id.
- availability and reliability are stored as text; ALWAYS cast as
  DOUBLE in filters: CAST(availability AS DOUBLE) > 0.5.
- Narrative words from the guidance ("shift", "recurring", "expansion",
  "emergency") are NOT columns. Only use columns listed above.
- Return EXACTLY ONE JSON object per turn. No markdown fences, no prose.
`;
|
||
|
||
// Smart per-kind summary so agents see the substance of each prior turn
|
||
// without a raw-JSON wall of text. hybrid_search results especially need
|
||
// this — raw JSON buries sources[] past any reasonable 400-char truncation.
|
||
function summarizeEntry(e: LogEntry): string {
|
||
const c = e.content ?? {};
|
||
switch (e.kind) {
|
||
case "plan":
|
||
return `PLAN: ${(c.steps ?? []).map((s: string, i: number) => `${i + 1}.${s}`).join(" ")}`;
|
||
case "tool_call":
|
||
return `TOOL_CALL ${c.tool}(${JSON.stringify(c.args ?? {}).slice(0, 250)})${c.rationale ? ` — ${c.rationale}` : ""}`;
|
||
case "tool_result": {
|
||
if (c.error) return `TOOL_RESULT error: ${c.error}`;
|
||
// hybrid_search response
|
||
if (Array.isArray(c.sources)) {
|
||
const head = c.sources.slice(0, 5).map((s: any) =>
|
||
`${s.doc_id}${s.sql_verified ? "✓" : ""} score=${(s.score ?? 0).toFixed(2)}: ${String(s.chunk_text ?? "").slice(0, 80)}`
|
||
).join(" | ");
|
||
return `TOOL_RESULT hybrid: sql_matches=${c.sql_matches} vector_reranked=${c.vector_reranked} sources=[${head}${c.sources.length > 5 ? ` +${c.sources.length - 5} more` : ""}]`;
|
||
}
|
||
// sql response
|
||
if (Array.isArray(c.rows)) {
|
||
const head = c.rows.slice(0, 5).map((r: any) => JSON.stringify(r)).join(" | ");
|
||
return `TOOL_RESULT sql: ${c.rows.length} rows${c.rows.length > 0 ? ` — ${head}${c.rows.length > 5 ? ` +${c.rows.length - 5} more` : ""}` : ""}`;
|
||
}
|
||
// fallback
|
||
return `TOOL_RESULT ${JSON.stringify(c).slice(0, 250)}`;
|
||
}
|
||
case "critique":
|
||
return `CRITIQUE verdict=${c.verdict} notes: ${String(c.notes ?? "").slice(0, 200)}`;
|
||
case "propose_done":
|
||
return `PROPOSE_DONE fills=[${(c.fills ?? []).map((f: Fill) => `${f.candidate_id}:${f.name}`).join(", ")}] rationale: ${String(c.rationale ?? "").slice(0, 120)}`;
|
||
case "consensus_done":
|
||
return `CONSENSUS ✓`;
|
||
case "error":
|
||
return `ERROR ${c.message ?? JSON.stringify(c)}`;
|
||
}
|
||
return JSON.stringify(c).slice(0, 200);
|
||
}
|
||
|
||
function renderLogForPrompt(log: LogEntry[]): string {
|
||
if (log.length === 0) return "(no turns yet)";
|
||
return log.slice(-12).map(e =>
|
||
`[t${e.turn} ${e.role}] ${summarizeEntry(e)}`
|
||
).join("\n");
|
||
}
|
||
|
||
// Crawl the log for every hybrid_search tool_result and collect the
// worker names + ids seen so far. LLMs routinely "forget" earlier turns
// once the conversation grows, so we surface a running ledger in the
// prompt as orchestrator-maintained state. The executor doesn't have to
// track this itself — it just reads it.
function candidatesSeen(log: LogEntry[]): Array<{ doc_id: string; name: string; city: string; state: string }> {
  // Keyed by doc_id so repeat sightings across turns dedupe to the first.
  const seen = new Map<string, { doc_id: string; name: string; city: string; state: string }>();
  for (const e of log) {
    if (e.kind !== "tool_result") continue;
    const sources = (e.content as any)?.sources;
    if (!Array.isArray(sources)) continue;
    for (const s of sources) {
      // chunk_text shape "Name — Role in City, ST. …"
      const t = String(s.chunk_text ?? "");
      // NOTE(review): JS split(sep, 2) returns AT MOST 2 segments and
      // DROPS the remainder — any text after a second "—" is discarded.
      // Assumes no "—" occurs before the city/state; confirm chunk format.
      const [namePart, rest] = t.split("—", 2);
      if (!namePart || !rest) continue;
      // Location = text after the FIRST " in " — a role name containing
      // " in " would shift this; presumed not to occur in the corpus.
      const loc = rest.split(" in ")[1] ?? "";
      const [city, stateRaw] = loc.split(",", 2);
      // Keep only the leading letters of the state (drops ". …" tail).
      const state = (stateRaw ?? "").trim().replace(/[^A-Za-z].*/, "");
      // Skip any source whose text didn't parse into all four fields.
      if (!s.doc_id || !namePart.trim() || !city?.trim() || !state) continue;
      if (!seen.has(s.doc_id)) {
        seen.set(s.doc_id, {
          doc_id: s.doc_id,
          name: namePart.trim(),
          city: city.trim(),
          state,
        });
      }
    }
  }
  return Array.from(seen.values());
}
|
||
|
||
// Build the executor's full turn prompt: task spec, tool catalog,
// orchestrator-tracked candidate ledger, recent shared log, and the
// strict JSON action shapes the model must answer with. The template
// below is a RUNTIME string — edit wording with care.
export function executorPrompt(task: TaskSpec, log: LogEntry[]): string {
  const logStr = renderLogForPrompt(log);
  const seen = candidatesSeen(log);
  // Ledger of every candidate surfaced so far (see candidatesSeen).
  const seenBlock = seen.length === 0
    ? "(no candidates surfaced yet — start with hybrid_search)"
    : seen.map(s => ` - ${s.doc_id} ${s.name} (${s.city}, ${s.state})`).join("\n");

  return `You are the EXECUTOR agent. Your job is to complete this task:

OPERATION: ${task.operation}
TARGET: ${task.target_count} × ${task.target_role} in ${task.target_city}, ${task.target_state}
${task.approach_hint ? `HINT: ${task.approach_hint}` : ""}

The REVIEWER agent is watching every turn. They will flag drift. Stay on target.

${TOOL_CATALOG}

CANDIDATES SURFACED SO FAR (orchestrator-tracked, do not forget these):
${seenBlock}

SHARED LOG (recent turns):
${logStr}

Your next action MUST be a JSON object matching one of these shapes:
{"kind":"plan","steps":["short step 1","short step 2",...]}
  — use on turn 1 to outline your approach. Steps must be concrete.
{"kind":"tool_call","tool":"...","args":{...},"rationale":"why"}
  — call a tool and see its result next turn.
{"kind":"propose_done","fills":[{"candidate_id":"...","name":"First Last"}],"rationale":"..."}
  — propose you've met the target. fills MUST have EXACTLY ${task.target_count} entries — count twice before emitting. Each fill is ONLY {candidate_id, name} — no reason field, no scores, no commentary.

Strategy tip: once "CANDIDATES SURFACED SO FAR" has ≥ ${task.target_count} entries in ${task.target_city}, ${task.target_state} matching ${task.target_role}, verify ONE via the sql tool (to satisfy the reviewer's SQL-verification criterion) and then propose_done with the top ${task.target_count}. Don't keep re-searching.

Respond with ONLY the JSON object. No markdown fences, no prose.`;
}
|
||
|
||
// Build the reviewer's turn prompt: task spec, drift checklist, recent
// log, the critique JSON shape, and the 3-point approval rubric. The
// template below is a RUNTIME string — edit wording with care.
export function reviewerPrompt(task: TaskSpec, log: LogEntry[]): string {
  const logStr = renderLogForPrompt(log);

  // If the most recent executor action was propose_done, the reviewer
  // must commit to an up-or-down vote this turn — "continue" would stall
  // the orchestrator forever. The wider prompt still describes all three
  // verdicts, but we add a hard rule at the end that the model must obey.
  const lastExec = [...log].reverse().find(e => e.role === "executor");
  const awaitingApproval = lastExec?.kind === "propose_done";

  return `You are the REVIEWER agent. The EXECUTOR is trying to complete this task:

OPERATION: ${task.operation}
TARGET: ${task.target_count} × ${task.target_role} in ${task.target_city}, ${task.target_state}

Your job: catch drift. Agents often wander from the actual objective. Specifically watch for:
- Proposing candidates who aren't in ${task.target_city}, ${task.target_state}.
- Proposing candidates who don't have ${task.target_role} skill.
- Proposing fewer or more than ${task.target_count} fills.
- Irrelevant tool calls (e.g. revenue_by_client when the task is a fill).

Available tools (for reference, but YOU don't call them):
- hybrid_search(sql_filter, question, index_name, k) — production fill path
- sql(query) — read-only SELECT for verification

SHARED LOG (recent turns):
${logStr}

Your next action MUST be a JSON object:
{"kind":"critique","verdict":"continue" | "drift" | "approve_done","notes":"..."}

- "continue" → executor is on a reasonable path, let them keep going.
- "drift" → executor is off-track; notes MUST tell them how to redirect.
- "approve_done" → executor's propose_done meets the criteria. Seal it.

APPROVAL CRITERIA (use these only for propose_done):
1. Exactly ${task.target_count} fills.
2. Each fill's name appears in a prior tool_result from ${task.target_city}, ${task.target_state} matching role "${task.target_role}".
3. Executor has SQL-verified at least one of the fills (any prior sql tool_result with that worker).
If 1–3 all hold, return approve_done. Do not demand further verification.
${awaitingApproval ? `

HARD RULE: The executor's most recent action was propose_done. On this turn you CANNOT return "continue" — it would stall the task. Choose approve_done (proposal is valid by the 3 criteria above) or drift (it fails one; state which in notes).` : ""}

Respond with ONLY the JSON object.`;
}
|
||
|
||
// Parse an agent's response into an Action, or throw.
|
||
export function parseAction(raw: string, role: Role): Action {
|
||
// Models sometimes wrap JSON in ```json fences; strip them.
|
||
let s = raw.trim();
|
||
if (s.startsWith("```")) {
|
||
s = s.replace(/^```(?:json)?\n?/, "").replace(/```$/, "").trim();
|
||
}
|
||
// Find the first {...} block.
|
||
const start = s.indexOf("{");
|
||
const end = s.lastIndexOf("}");
|
||
if (start < 0 || end <= start) {
|
||
throw new Error(`no JSON object in ${role} response: ${raw.slice(0, 300)}`);
|
||
}
|
||
let json = s.slice(start, end + 1);
|
||
// Soft-tolerate common model mistakes: stray `)` before closing brace
|
||
// (qwen2.5 does this on tool_call), trailing commas, etc. Fix the
|
||
// cheapest ones that are unambiguous.
|
||
json = json.replace(/\)\s*\}/g, "}"); // "...)}" → "...}"
|
||
json = json.replace(/,(\s*[}\]])/g, "$1"); // trailing comma before } or ]
|
||
let obj: any;
|
||
try {
|
||
obj = JSON.parse(json);
|
||
} catch (e) {
|
||
throw new Error(`invalid JSON from ${role}: ${(e as Error).message} | raw: ${json.slice(0, 300)}`);
|
||
}
|
||
|
||
if (role === "executor") {
|
||
if (obj.kind === "plan" && Array.isArray(obj.steps)) return obj as Action;
|
||
if (obj.kind === "tool_call" && typeof obj.tool === "string" && typeof obj.args === "object") return obj as Action;
|
||
if (obj.kind === "propose_done" && Array.isArray(obj.fills)) return obj as Action;
|
||
// Tolerance: some model outputs put a stray closing paren or
|
||
// trailing garbage after the main object. If the kind looks
|
||
// recognizable but shape doesn't match, bubble a cleaner error so
|
||
// the orchestrator's soft-fail path doesn't swallow it.
|
||
throw new Error(`executor returned unexpected shape: ${JSON.stringify(obj).slice(0, 200)}`);
|
||
} else {
|
||
// Normalize: some models (qwen2.5, mistral) emit the verdict AS the
|
||
// `kind` field directly instead of nesting it under a "critique"
|
||
// wrapper. Accept both shapes rather than hard-failing — the
|
||
// semantic content is identical, and rejecting would stall the
|
||
// orchestrator on a cosmetic schema miss.
|
||
if (obj.kind === "critique" && ["continue", "drift", "approve_done"].includes(obj.verdict)) {
|
||
return obj as Action;
|
||
}
|
||
if (["continue", "drift", "approve_done"].includes(obj.kind)) {
|
||
return { kind: "critique", verdict: obj.kind, notes: obj.notes ?? "" } as Action;
|
||
}
|
||
throw new Error(`reviewer returned unexpected shape: ${JSON.stringify(obj).slice(0, 200)}`);
|
||
}
|
||
}
|