//! `ExecutionLoop` — the Rust port of `tests/multi-agent/orchestrator.ts`. //! //! Incremental port (2026-04-23). Pieces in order of landing: //! 1. ✅ Playbook-boost context retrieval //! 2. ✅ Executor turn via the shared ollama::chat path //! 3. ✅ Reviewer turn + critique parse (this commit) //! 4. ⬜ Tool-call dispatch — hybrid_search / sql / Phase-12 tools (orchestrator.ts:101-124) //! 5. ✅ Consensus detection + drift counter (this commit) //! 6. ⬜ Truth-layer gate (Phase 42 — refuse before burning tokens) //! 7. ⬜ Validator call (Phase 43 stub) //! 8. ⬜ Cloud escalation on repeat failure (T3 gpt-oss:120b) //! 9. ⬜ Playbook seal + /vectors/playbook_memory/seed (orchestrator.ts:255-293) //! 10. ⬜ KB write-through: outcomes + facts (Phase 22) pub mod kb_context; use serde::{Deserialize, Serialize}; use crate::v1::{respond::RespondRequest, V1State}; use kb_context::KbContext; const DEFAULT_EXECUTOR_MODEL: &str = "qwen3.5:latest"; const DEFAULT_REVIEWER_MODEL: &str = "qwen3:latest"; const DEFAULT_MAX_TURNS: u32 = 12; /// Matches orchestrator.ts:31. Three consecutive drift flags OR tool /// errors aborts the loop — the executor isn't self-correcting. const MAX_CONSECUTIVE_DRIFTS: u32 = 3; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct LogEntry { pub turn: u32, pub role: String, pub model: String, pub kind: String, pub content: serde_json::Value, pub at: String, } impl LogEntry { fn new(turn: u32, role: &str, model: &str, kind: &str, content: serde_json::Value) -> Self { Self { turn, role: role.to_string(), model: model.to_string(), kind: kind.to_string(), content, at: chrono::Utc::now().to_rfc3339(), } } } /// Action = what an agent returns on one turn. PORT FROM agent.ts:312. /// Strict-shape enum so the executor/reviewer can't wedge the loop /// with ambiguous output — either it parses, or `parse_action` throws /// and the orchestrator appends an error turn. 
#[derive(Serialize, Deserialize, Clone, Debug)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum Action { Plan { steps: Vec }, ToolCall { tool: String, args: serde_json::Value, #[serde(default)] rationale: String }, ProposeDone { fills: Vec, #[serde(default)] rationale: String }, Critique { verdict: Verdict, #[serde(default)] notes: String }, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[serde(rename_all = "snake_case")] pub enum Verdict { Continue, Drift, ApproveDone, } #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Fill { pub candidate_id: String, pub name: String, /// Optional — legacy models still emit it. agent.ts:321 rationale. #[serde(default, skip_serializing_if = "Option::is_none")] pub reason: Option, } pub enum RespondOutcome { Ok { artifact: serde_json::Value, log: Vec }, Failed { reason: String, log: Vec }, // Constructed by the truth-gate check in run_inner (step 6, 2026-04-24). Blocked { reason: String, log: Vec }, } impl RespondOutcome { pub fn artifact(&self) -> serde_json::Value { match self { Self::Ok { artifact, .. } => artifact.clone(), _ => serde_json::Value::Null, } } pub fn into_log(self) -> Vec { match self { Self::Ok { log, .. } | Self::Failed { log, .. } | Self::Blocked { log, .. } => log, } } } pub struct ExecutionLoop { state: V1State, req: RespondRequest, log: Vec, turns_used: u32, stats: LoopStats, /// Phase 20 budget — at most one T3 overseer call per loop /// invocation. Cloud calls cost real money and the whole point is /// "hyperfocus local + one strategic cloud nudge", not a cloud /// retry loop. See docs/CONTROL_PLANE_PRD.md §4.3. overseer_called: bool, } /// Per-invocation usage accumulator. Separate from the gateway-wide /// `V1State.usage` (which is lifetime-across-all-requests) so the /// outcomes row can stamp this-task tokens/latency without subtracting /// two snapshots. 
#[derive(Default, Clone, Serialize)] pub struct LoopStats { pub requests: u64, pub prompt_tokens: u64, pub completion_tokens: u64, pub total_tokens: u64, pub latency_ms: u64, } impl ExecutionLoop { pub fn new(state: V1State, req: RespondRequest) -> Self { Self { state, req, log: Vec::new(), turns_used: 0, stats: LoopStats::default(), overseer_called: false, } } pub fn turns_used(&self) -> u32 { self.turns_used } pub async fn run(&mut self) -> Result { let outcome = self.run_inner().await?; Ok(self.finalize(outcome).await) } async fn run_inner(&mut self) -> Result { let executor_model = self.req.executor_model .as_deref().unwrap_or(DEFAULT_EXECUTOR_MODEL).to_string(); let reviewer_model = self.req.reviewer_model .as_deref().unwrap_or(DEFAULT_REVIEWER_MODEL).to_string(); let max_turns = self.req.max_turns.unwrap_or(DEFAULT_MAX_TURNS); // --- (6) TRUTH GATE — Phase 42 wiring (2026-04-24) --- // Evaluate truth rules for the request's task_class against a // ctx built from the spec. Any rule whose condition holds AND // whose action is Reject/Block short-circuits to Blocked before // the executor loop runs. Mirrors queryd/service.rs SQL gate. 
let truth_store = truth::default_truth_store(); for outcome in truth_store.evaluate(&self.req.task_class, &self.req.spec) { if !outcome.passed { continue; } if let truth::RuleAction::Reject { message } | truth::RuleAction::Block { message } = &outcome.action { let reason = format!("truth rule {} blocked: {message}", outcome.rule_id); self.append(LogEntry::new(0, "system", "truth", "block", serde_json::json!({ "rule_id": outcome.rule_id, "reason": reason.clone() }))); return Ok(RespondOutcome::Blocked { reason, log: self.log.clone() }); } } // --- (1) PLAYBOOK BOOST --- let boost = self.fetch_playbook_boost(&self.req.operation).await.unwrap_or_default(); if !boost.is_empty() { self.append(LogEntry::new( 0, "system", "playbook_memory", "boost_loaded", serde_json::json!({ "count": boost.len(), "preview": boost.iter().take(3).collect::>() }), )); } let mut consecutive_drifts: u32 = 0; // --- MAIN TURN LOOP --- for turn in 1..=max_turns { self.turns_used = turn; // --- (2) EXECUTOR TURN --- let executor_prompt = build_executor_prompt(&self.req, &boost, &self.log); let executor_raw = self.chat_once(&executor_model, &executor_prompt, 0.2, false).await?; let exec_action = match parse_action(&executor_raw, Role::Executor) { Ok(a) => a, Err(e) => { self.append(LogEntry::new( turn, "executor", &executor_model, "error", serde_json::json!({ "message": e, "raw": truncate(&executor_raw, 400) }), )); return Ok(RespondOutcome::Failed { reason: format!("executor parse failure on turn {turn}: {e}"), log: std::mem::take(&mut self.log), }); } }; self.append(LogEntry::new( turn, "executor", &executor_model, action_kind(&exec_action), action_content(&exec_action), )); // --- (4) TOOL DISPATCH — PORT FROM orchestrator.ts:101-124 --- // Soft-fail: a tool error is a log entry, not a loop abort. // The executor reads its own error next turn and self-corrects // (orchestrator.ts:169-189). Only MAX_CONSECUTIVE_DRIFTS tool // errors in a row → hard abort. 
if let Action::ToolCall { tool, args, .. } = &exec_action { match self.dispatch_tool(tool, args).await { Ok(result) => { let trimmed = trim_result(&result); self.append(LogEntry::new( turn, "executor", &executor_model, "tool_result", trimmed, )); } Err(e) => { self.append(LogEntry::new( turn, "executor", &executor_model, "tool_result", serde_json::json!({ "error": e, "tool": tool, "args": args }), )); consecutive_drifts += 1; if consecutive_drifts >= MAX_CONSECUTIVE_DRIFTS { return Ok(RespondOutcome::Failed { reason: format!( "aborting — {MAX_CONSECUTIVE_DRIFTS} consecutive tool errors, executor can't self-correct" ), log: std::mem::take(&mut self.log), }); } } } } // --- (3) REVIEWER TURN --- let reviewer_prompt = build_reviewer_prompt(&self.req, &self.log); let reviewer_raw = self.chat_once(&reviewer_model, &reviewer_prompt, 0.1, false).await?; let rev_action = match parse_action(&reviewer_raw, Role::Reviewer) { Ok(a) => a, Err(e) => { self.append(LogEntry::new( turn, "reviewer", &reviewer_model, "error", serde_json::json!({ "message": e, "raw": truncate(&reviewer_raw, 400) }), )); return Ok(RespondOutcome::Failed { reason: format!("reviewer parse failure on turn {turn}: {e}"), log: std::mem::take(&mut self.log), }); } }; self.append(LogEntry::new( turn, "reviewer", &reviewer_model, "critique", action_content(&rev_action), )); let verdict = match &rev_action { Action::Critique { verdict, .. } => verdict.clone(), _ => { return Ok(RespondOutcome::Failed { reason: format!("reviewer emitted non-critique on turn {turn}"), log: std::mem::take(&mut self.log), }); } }; // --- (5) CONSENSUS DETECTION + DRIFT COUNTER --- if verdict == Verdict::Drift { consecutive_drifts += 1; // --- (8) OVERSEER ESCALATION --- // One chance before abort: when the local loop is // about to give up, call the T3 overseer with the KB // context (what worked / didn't on this task class // historically) + the recent log tail. 
The overseer // emits a correction which feeds back into the next // executor turn. Only fires once per loop to honor // Phase 20 "1-3 calls/scenario" budget. if consecutive_drifts == MAX_CONSECUTIVE_DRIFTS.saturating_sub(1) && !self.overseer_called { if let Err(e) = self.escalate_to_overseer(turn, "drift_approaching_abort").await { tracing::warn!("overseer escalation failed: {e}"); } // Reset so the executor gets one clean turn with // the correction in context before we re-evaluate. consecutive_drifts = 0; } else if consecutive_drifts >= MAX_CONSECUTIVE_DRIFTS { return Ok(RespondOutcome::Failed { reason: format!( "aborting — {MAX_CONSECUTIVE_DRIFTS} consecutive drift flags, executor can't self-correct (overseer_called={})", self.overseer_called, ), log: std::mem::take(&mut self.log), }); } } else { consecutive_drifts = 0; } if let (Action::ProposeDone { fills, rationale }, Verdict::ApproveDone) = (&exec_action, &verdict) { let target_count = spec_target_count(&self.req.spec); if target_count > 0 && fills.len() as u64 != target_count { return Ok(RespondOutcome::Failed { reason: format!( "consensus malformed — {} fills vs target {}", fills.len(), target_count ), log: std::mem::take(&mut self.log), }); } self.append(LogEntry::new( turn, "reviewer", &reviewer_model, "consensus_done", serde_json::json!({ "fills": fills }), )); // Seal + write-through runs in `finalize` after this // returns — outcomes row + playbook_memory seed with // retries + stats stamping all land there. 
let artifact = serde_json::json!({ "fills": fills, "approach": rationale, "turns": turn, }); return Ok(RespondOutcome::Ok { artifact, log: std::mem::take(&mut self.log), }); } } Ok(RespondOutcome::Failed { reason: format!("no consensus after {max_turns} turns — task incomplete"), log: std::mem::take(&mut self.log), }) } fn append(&mut self, e: LogEntry) { tracing::debug!(turn = e.turn, role = %e.role, kind = %e.kind, "execution_loop"); self.log.push(e); } /// Dispatch: model name prefix → provider. /// Local path uses Phase 21 `generate_continuable` (auto-continuation, /// retry on empty thinking-model response). Cloud path hits /// Ollama Cloud directly — no continuation since cloud budgets are /// generous and Phase 21's Rust port is local-only. Truncation on /// cloud surfaces as a parse failure in the loop; that's fail-fast /// and a real signal (we want to know when cloud didn't finish). async fn chat_once( &mut self, model: &str, prompt: &str, temperature: f64, think: bool, ) -> Result { let is_cloud = is_cloud_model(model); let provider = if is_cloud { "ollama_cloud" } else { "ollama" }; let start_time = chrono::Utc::now(); let started = std::time::Instant::now(); let (text, prompt_tokens, completion_tokens, calls) = if is_cloud { let key = self.state.ollama_cloud_key.as_deref().ok_or_else(|| { format!("cloud model {model} requested but OLLAMA_CLOUD_KEY not configured") })?; use crate::v1::{ChatRequest, Message}; // Cloud path: retry up to 3× on empty response. gpt-oss:* // models sometimes return empty after internal reasoning // — this is the cloud-side analog of Phase 21's empty- // response backoff, inlined since generate_continuable is // local-only. 
let mut text = String::new(); let mut tokens_p = 0u32; let mut tokens_c = 0u32; let mut attempts = 0u32; for attempt in 0..3 { attempts = attempt + 1; let req = ChatRequest { model: model.to_string(), messages: vec![Message { role: "user".into(), content: prompt.to_string() }], temperature: Some(temperature), max_tokens: None, stream: Some(false), think: Some(think), provider: Some("ollama_cloud".into()), }; let resp = crate::v1::ollama_cloud::chat(key, &req).await .map_err(|e| format!("ollama_cloud: {e}"))?; tokens_p = tokens_p.saturating_add(resp.usage.prompt_tokens); tokens_c = tokens_c.saturating_add(resp.usage.completion_tokens); let t = resp.choices.into_iter().next() .map(|c| c.message.content).unwrap_or_default(); if !t.trim().is_empty() { text = t; break; } tracing::warn!(model = %model, attempt, "cloud returned empty, retrying"); } (text, tokens_p, tokens_c, attempts) } else { use aibridge::continuation::{generate_continuable, ContinuableOpts, ResponseShape}; let mut opts = ContinuableOpts::new(model); opts.temperature = Some(temperature); opts.think = Some(think); opts.shape = ResponseShape::Json; let outcome = generate_continuable(&self.state.ai_client, prompt, &opts).await?; if outcome.empty_retries > 0 || outcome.continuations > 0 || !outcome.final_complete { tracing::info!( model = %model, empty_retries = outcome.empty_retries, continuations = outcome.continuations, final_complete = outcome.final_complete, calls = outcome.calls, "execution_loop.chat_once: continuation telemetry" ); } (outcome.text, outcome.prompt_tokens, outcome.completion_tokens, outcome.calls) }; let elapsed_ms = started.elapsed().as_millis() as u64; let end_time = chrono::Utc::now(); // Langfuse trace — uniform across local + cloud, provider tag // lets the bridge / observer differentiate downstream. 
if let Some(lf) = &self.state.langfuse { use crate::v1::{langfuse_trace::ChatTrace, Message}; lf.emit_chat(ChatTrace { provider: provider.to_string(), model: model.to_string(), input: vec![Message { role: "user".into(), content: prompt.to_string() }], output: text.clone(), prompt_tokens, completion_tokens, temperature: Some(temperature), max_tokens: None, think: Some(think), start_time: start_time.to_rfc3339(), end_time: end_time.to_rfc3339(), latency_ms: elapsed_ms, }); } // Per-task stats (stamps the outcomes row) + gateway-wide // /v1/usage counters. Both updated uniformly; the by_provider // split lets operators see the local/cloud mix per task. let total_tokens = (prompt_tokens + completion_tokens) as u64; self.stats.requests = self.stats.requests.saturating_add(calls as u64); self.stats.prompt_tokens = self.stats.prompt_tokens.saturating_add(prompt_tokens as u64); self.stats.completion_tokens = self.stats.completion_tokens.saturating_add(completion_tokens as u64); self.stats.total_tokens = self.stats.total_tokens.saturating_add(total_tokens); self.stats.latency_ms += elapsed_ms; { let mut u = self.state.usage.write().await; u.requests = u.requests.saturating_add(calls as u64); u.prompt_tokens = u.prompt_tokens.saturating_add(prompt_tokens as u64); u.completion_tokens = u.completion_tokens.saturating_add(completion_tokens as u64); u.total_tokens = u.total_tokens.saturating_add(total_tokens); let pu = u.by_provider.entry(provider.to_string()).or_default(); pu.requests = pu.requests.saturating_add(calls as u64); pu.prompt_tokens = pu.prompt_tokens.saturating_add(prompt_tokens as u64); pu.completion_tokens = pu.completion_tokens.saturating_add(completion_tokens as u64); pu.total_tokens = pu.total_tokens.saturating_add(total_tokens); } Ok(text) } /// Final step for every terminal path — write the outcomes row (with /// the full indicator set stamped) and, on success, seed the playbook /// back into memory so the next similar task hits the fast path. 
/// The write-through is what closes the 0→85% compounding loop. /// /// Both writes are best-effort: KB-write failure emits a warn but /// doesn't convert an Ok into a Failed. The caller's response should /// reflect what the loop actually accomplished, not whether the log /// sink was reachable. async fn finalize(&mut self, mut outcome: RespondOutcome) -> RespondOutcome { // PORT FROM orchestrator.ts:251-293. On consensus, write-through // to playbook_memory so the next semantically-similar query // surfaces the endorsed names. let seed_outcome = if let RespondOutcome::Ok { artifact, .. } = &outcome { match self.seed_playbook_memory(artifact).await { Ok(v) => Some(v), Err(e) => { tracing::warn!("playbook_memory seed failed: {e}"); Some(serde_json::json!({ "error": e })) } } } else { None }; // Append the outcomes row — polarity derived from the variant, // indicators stamped from loop state. schema_version=2 flags // this as a per-task row (distinct from the scenario-level rows // already in outcomes.jsonl). let outcomes_row = build_outcomes_row( &self.req, &self.stats, self.turns_used, self.overseer_called, &outcome, seed_outcome.clone(), ); if let Err(e) = append_outcomes_row(&outcomes_row).await { tracing::warn!("outcomes.jsonl append failed: {e}"); } // Enrich the response artifact with the seed + usage info so // the API caller can see compounding state without a second call. if let RespondOutcome::Ok { artifact, .. } = &mut outcome { if let Some(obj) = artifact.as_object_mut() { if let Some(seed) = seed_outcome { obj.insert("playbook_seed".into(), seed); } obj.insert("usage".into(), serde_json::to_value(&self.stats).unwrap_or_default()); obj.insert("sig_hash".into(), serde_json::Value::String(sig_hash(&self.req))); } } outcome } /// PORT FROM orchestrator.ts:255-293. Three retries with geometric /// backoff. 
`append: true` routes through Phase 26 upsert semantics /// (ADD/UPDATE/NOOP on operation+day+city+state), so a re-seal of /// the same fill on the same day merges names instead of duplicating. async fn seed_playbook_memory( &self, artifact: &serde_json::Value, ) -> Result { let fills = artifact.get("fills").and_then(|v| v.as_array()) .ok_or_else(|| "artifact missing fills".to_string())?; let endorsed_names: Vec = fills.iter() .filter_map(|f| f.get("name").and_then(|v| v.as_str()).map(String::from)) .collect(); if endorsed_names.is_empty() { return Err("no endorsed_names to seed".into()); } // Seed context is what the embedding model sees — carry // task-semantic content (role, city, scenario) not orchestrator // bookkeeping. Falls back to approach_hint, then to a built // string from spec. Matches orchestrator.ts:262-263. let approach = artifact.get("approach").and_then(|v| v.as_str()) .filter(|s| !s.is_empty()) .unwrap_or("multi-agent → hybrid search") .to_string(); let context = seed_context(&self.req); let body = serde_json::json!({ "operation": self.req.operation, "approach": approach, "context": context, "endorsed_names": endorsed_names, "append": true, }); let client = reqwest::Client::new(); let mut last_err = String::new(); for attempt in 0..3u32 { match client.post("http://127.0.0.1:3100/vectors/playbook_memory/seed") .json(&body).send().await { Ok(resp) => { let status = resp.status(); let text = resp.text().await.unwrap_or_default(); if status.is_success() { let j: serde_json::Value = serde_json::from_str(&text) .unwrap_or(serde_json::json!({ "raw": text })); return Ok(j); } last_err = format!("{}: {}", status, truncate(&text, 200)); } Err(e) => last_err = format!("transport: {e}"), } // Geometric backoff: 1s, 2s, 3s (matches orchestrator.ts:281). tokio::time::sleep(std::time::Duration::from_secs(attempt as u64 + 1)).await; } Err(format!("after 3 attempts: {last_err}")) } /// Phase 20 step (8) — T3 overseer escalation. 
/// /// When the local executor/reviewer loop can't self-correct, call /// the cloud overseer (`gpt-oss:120b` via Ollama Cloud) with (a) /// the KB context — recent outcomes + prior corrections for this /// sig_hash + task_class, across every profile that has run it — /// and (b) the recent log tail. Its output is appended as a /// `system` role turn so the next executor generation sees it, /// AND written to `data/_kb/overseer_corrections.jsonl` so every /// future profile activation reads from the same learning pool. /// /// This is the "pipe to the overviewer" piece from 2026-04-23 — /// the overseer is now a first-class KB consumer AND producer, not /// a one-shot correction oracle. async fn escalate_to_overseer(&mut self, turn: u32, reason: &str) -> Result<(), String> { let Some(cloud_key) = self.state.ollama_cloud_key.clone() else { return Err("OLLAMA_CLOUD_KEY not configured — skipping escalation".into()); }; let kb = KbContext::load_for(&sig_hash(&self.req), &self.req.task_class).await; let prompt = build_overseer_prompt(&self.req, &kb, &self.log, reason); let started = std::time::Instant::now(); let start_time = chrono::Utc::now(); let chat_req = crate::v1::ChatRequest { model: "gpt-oss:120b".to_string(), messages: vec![crate::v1::Message { role: "user".into(), content: prompt.clone(), }], temperature: Some(0.1), max_tokens: None, stream: Some(false), think: Some(true), // overseer KEEPS thinking (Phase 20 rule) provider: Some("ollama_cloud".into()), }; let resp = crate::v1::ollama_cloud::chat(&cloud_key, &chat_req).await .map_err(|e| format!("ollama_cloud: {e}"))?; let latency_ms = started.elapsed().as_millis() as u64; let end_time = chrono::Utc::now(); let correction_text = resp.choices.into_iter().next() .map(|c| c.message.content).unwrap_or_default(); // Stamp per-task stats — cloud call counts against the same // usage counter so `/v1/usage` shows cloud token spend too. 
self.stats.requests = self.stats.requests.saturating_add(1); self.stats.prompt_tokens = self.stats.prompt_tokens.saturating_add(resp.usage.prompt_tokens as u64); self.stats.completion_tokens = self.stats.completion_tokens.saturating_add(resp.usage.completion_tokens as u64); self.stats.total_tokens = self.stats.total_tokens.saturating_add(resp.usage.total_tokens as u64); self.stats.latency_ms = self.stats.latency_ms.saturating_add(latency_ms); // Langfuse trace for the overseer call (same pipe that feeds // the observer/KB, so this correction's cost lands in the KB // too — closing the loop). if let Some(lf) = &self.state.langfuse { use crate::v1::langfuse_trace::ChatTrace; lf.emit_chat(ChatTrace { provider: "ollama_cloud".into(), model: "gpt-oss:120b".into(), input: vec![crate::v1::Message { role: "user".into(), content: prompt.clone() }], output: correction_text.clone(), prompt_tokens: resp.usage.prompt_tokens, completion_tokens: resp.usage.completion_tokens, temperature: Some(0.1), max_tokens: None, think: Some(true), start_time: start_time.to_rfc3339(), end_time: end_time.to_rfc3339(), latency_ms, }); } // Append to the transcript so the next executor turn sees it. self.append(LogEntry::new( turn, "system", "gpt-oss:120b", "overseer_correction", serde_json::json!({ "reason": reason, "correction": correction_text, "kb_context_summary": { "total_observed": kb.total_observed, "success_rate": kb.success_rate, "prior_corrections": kb.recent_corrections.len(), }, }), )); // Write to the KB — read by KbContext::load_for on every // subsequent escalation, AND by any profile that iterates on // this task class later. 
let row = serde_json::json!({ "schema_version": 2, "source_service": "v1.respond.overseer", "sig_hash": sig_hash(&self.req), "task_class": self.req.task_class, "operation": self.req.operation, "reason": reason, "model": "gpt-oss:120b", "correction": correction_text, "applied_at_turn": turn, "kb_context_used": kb, "usage": { "prompt_tokens": resp.usage.prompt_tokens, "completion_tokens": resp.usage.completion_tokens, "total_tokens": resp.usage.total_tokens, "latency_ms": latency_ms, }, "created_at": chrono::Utc::now().to_rfc3339(), }); if let Err(e) = append_corrections_row(&row).await { tracing::warn!("overseer_corrections.jsonl append failed: {e}"); } self.overseer_called = true; Ok(()) } async fn fetch_playbook_boost(&self, operation: &str) -> Result, ()> { let body = serde_json::json!({ "operation": operation, "top_k": 5 }); let client = reqwest::Client::new(); let resp = client .post("http://127.0.0.1:3100/vectors/playbook_memory/search") .json(&body) .send().await.map_err(|_| ())?; if !resp.status().is_success() { return Ok(Vec::new()); } let j: serde_json::Value = resp.json().await.map_err(|_| ())?; Ok(j.get("boosts").and_then(|v| v.as_array()).cloned().unwrap_or_default()) } /// PORT FROM orchestrator.ts:101-124 + agent.ts:348-364. /// Three tool surfaces unified behind one dispatcher: /// - `hybrid_search` → `POST /vectors/hybrid` (pseudo-tool, not in /// the Phase 12 registry — lives in vectord) /// - `sql` → `POST /query/sql` with a SELECT-only guard /// - anything else → `POST /tools/{name}/call` via the Phase 12 /// registry (permissions, audit, validation all happen there) /// /// Loopback HTTP on 127.0.0.1:3100 on purpose: mirrors the TS /// behavior exactly (every call goes through the same middleware, /// auth, audit, CORS path), and lets us swap to in-process routing /// later without changing the dispatch contract. 
async fn dispatch_tool(
        &self,
        tool: &str,
        args: &serde_json::Value,
    ) -> Result<serde_json::Value, String> {
        let client = reqwest::Client::new();
        match tool {
            "hybrid_search" => {
                let sql_filter = args
                    .get("sql_filter")
                    .and_then(|v| v.as_str())
                    .ok_or_else(|| "hybrid_search needs sql_filter (string)".to_string())?;
                let question = args
                    .get("question")
                    .and_then(|v| v.as_str())
                    .ok_or_else(|| "hybrid_search needs question (string)".to_string())?;
                let index_name = args
                    .get("index_name")
                    .and_then(|v| v.as_str())
                    .ok_or_else(|| "hybrid_search needs index_name (string)".to_string())?;
                // Accept either `top_k` or `k` from the model — same
                // tolerance as orchestrator.ts. Default 10.
                let top_k = args
                    .get("top_k")
                    .or_else(|| args.get("k"))
                    .and_then(|v| v.as_u64())
                    .unwrap_or(10);
                let body = serde_json::json!({
                    "sql_filter": sql_filter,
                    "question": question,
                    "index_name": index_name,
                    "top_k": top_k,
                    "generate": false,
                });
                let resp = client
                    .post("http://127.0.0.1:3100/vectors/hybrid")
                    .json(&body)
                    .send()
                    .await
                    .map_err(|e| format!("hybrid_search transport: {e}"))?;
                parse_tool_response(resp).await
            }
            "sql" => {
                let query = args
                    .get("query")
                    .and_then(|v| v.as_str())
                    .ok_or_else(|| "sql needs query (string)".to_string())?;
                // SELECT-only guard mirroring orchestrator.ts:119. The
                // tool is read-only; any mutation needs the Phase 12
                // registry + its permission + audit flow, not the
                // unchecked raw sql surface.
                // NOTE(review): this is a prefix check only — a stacked
                // statement such as `SELECT 1; DROP …` passes it. Confirm
                // that /query/sql rejects multi-statement input.
                if !query.trim_start().to_ascii_uppercase().starts_with("SELECT") {
                    return Err(format!(
                        "sql tool allows SELECT only: {}",
                        truncate(query, 120)
                    ));
                }
                let body = serde_json::json!({ "sql": query, "format": "json" });
                let resp = client
                    .post("http://127.0.0.1:3100/query/sql")
                    .json(&body)
                    .send()
                    .await
                    .map_err(|e| format!("sql transport: {e}"))?;
                parse_tool_response(resp).await
            }
            other => {
                // The tool name is model output and is spliced into the
                // URL path below. Refuse anything that could re-route the
                // request (path separators, dot segments, query/fragment
                // markers, percent-escapes, whitespace/control chars).
                let reroutes = other.is_empty()
                    || other == "."
                    || other == ".."
                    || other.chars().any(|c| {
                        c.is_whitespace()
                            || c.is_control()
                            || matches!(c, '/' | '\\' | '?' | '#' | '%')
                    });
                if reroutes {
                    return Err(format!("invalid tool name: {}", truncate(other, 80)));
                }
                // Phase 12 registry — any registered staffing tool lands here.
                // Body shape matches agent.ts::callTool (POST /tools/{name}/call
                // with {params, agent}).
                let url = format!("http://127.0.0.1:3100/tools/{}/call", other);
                let body = serde_json::json!({
                    "params": args,
                    "agent": "v1.respond",
                });
                let resp = client
                    .post(&url)
                    .json(&body)
                    .send()
                    .await
                    .map_err(|e| format!("{other} transport: {e}"))?;
                parse_tool_response(resp).await
            }
        }
    }
}

/// Read a tool response body into JSON, or surface the status + text
/// as an error. Keeps the `error` path structurally identical whether
/// the transport fails (caller handles), the server 5xx's (here), or
/// the tool returns a 200 with an `{"error":"..."}` payload (caller
/// surfaces to the executor as normal tool_result content).
async fn parse_tool_response(resp: reqwest::Response) -> Result<serde_json::Value, String> {
    let status = resp.status();
    let text = resp.text().await.map_err(|e| format!("body read: {e}"))?;
    if !status.is_success() {
        return Err(format!("{}: {}", status, truncate(&text, 300)));
    }
    serde_json::from_str(&text)
        .map_err(|e| format!("non-JSON response: {e} | body: {}", truncate(&text, 200)))
}

/// Text handed to the playbook seed as `context`.
///
/// Precedence: the spec's `approach_hint`; else a "role fill in city"
/// phrase built from the target fields; else the operation verbatim.
fn seed_context(req: &RespondRequest) -> String {
    let hint = spec_field_str(&req.spec, "approach_hint");
    if !hint.is_empty() {
        return hint.to_string();
    }
    let role = spec_field_str(&req.spec, "target_role");
    let city = spec_field_str(&req.spec, "target_city");
    let state = spec_field_str(&req.spec, "target_state");
    if !role.is_empty() && !city.is_empty() {
        // Omit the separator when there is no state, so the context does
        // not end in a dangling ", ".
        return if state.is_empty() {
            format!("{role} fill in {city}")
        } else {
            format!("{role} fill in {city}, {state}")
        };
    }
    // Non-staffing task class — use the operation verbatim. The
    // embedding surface still works; it just has less geo signal.
    req.operation.clone()
}

/// Stable rollup key. PORT FROM the sig_hash usage in observer/kb.
/// DefaultHasher isn't cryptographic but is stable for a single
/// deployment and matches the 16-char hex format already in
/// outcomes.jsonl. Its algorithm is unspecified by std and may change
/// between Rust releases, so a toolchain upgrade can change the keys
/// already persisted. Swap to sha256 if cross-deployment stability is
/// needed.
fn sig_hash(req: &RespondRequest) -> String { use std::hash::{Hash, Hasher}; let mut h = std::collections::hash_map::DefaultHasher::new(); req.task_class.hash(&mut h); req.operation.hash(&mut h); spec_field_str(&req.spec, "target_role").hash(&mut h); spec_field_str(&req.spec, "target_city").hash(&mut h); spec_field_str(&req.spec, "target_state").hash(&mut h); format!("{:016x}", h.finish()) } /// Build the per-task outcomes row with every indicator the /// 2026-04-23 audit called out. schema_version=2 distinguishes /// per-task rows from the scenario-level rows already in the file. fn build_outcomes_row( req: &RespondRequest, stats: &LoopStats, turns_used: u32, overseer_called: bool, outcome: &RespondOutcome, seed_outcome: Option, ) -> serde_json::Value { let (ok, polarity, error) = match outcome { RespondOutcome::Ok { .. } => (true, "success_confirmation", serde_json::Value::Null), RespondOutcome::Failed { reason, .. } => (false, "failure_pattern", serde_json::Value::String(reason.clone())), RespondOutcome::Blocked { reason, .. } => (false, "truth_block", serde_json::Value::String(reason.clone())), }; let fills = match outcome { RespondOutcome::Ok { artifact, .. } => artifact.get("fills").cloned().unwrap_or(serde_json::Value::Null), _ => serde_json::Value::Null, }; // Correction effectiveness: if the overseer was called this loop, // the outcome tells us whether the correction helped. OK = it // worked, Failed/Blocked = it didn't. When overseer wasn't called, // these fields stay null so aggregators can filter cleanly. 
let correction_applied = overseer_called; let correction_effective = if overseer_called { serde_json::Value::Bool(ok) } else { serde_json::Value::Null }; serde_json::json!({ "schema_version": 2, "source_service": "v1.respond", "sig_hash": sig_hash(req), "task_class": req.task_class, "operation": req.operation, "ok": ok, "polarity": polarity, "iterations": turns_used, "turns": turns_used, "fills": fills, "models": { "executor": req.executor_model.clone().unwrap_or_else(|| DEFAULT_EXECUTOR_MODEL.to_string()), "reviewer": req.reviewer_model.clone().unwrap_or_else(|| DEFAULT_REVIEWER_MODEL.to_string()), }, "usage": stats, "provider": "ollama", "playbook_seed": seed_outcome, "truth_rule_citations": [], // Phase 42 gate hook — empty until wired "validator_report": null, // Phase 43 hook "correction_applied": correction_applied, "correction_effective": correction_effective, "error": error, "created_at": chrono::Utc::now().to_rfc3339(), }) } /// PORT FROM Phase 20's T3 overseer prompt shape. The overseer sees: /// - Task + spec /// - KB context (historical outcomes + prior corrections across /// every profile that ran this task class) /// - Recent log tail (last 12 turns) /// - Specific reason the local loop escalated /// It returns prose guidance the executor reads next turn. We do NOT /// ask it to emit a JSON action — the executor still owns the final /// shape. The overseer is a strategist, not a tool-caller. fn build_overseer_prompt( req: &RespondRequest, kb: &KbContext, log: &[LogEntry], reason: &str, ) -> String { let mut p = String::new(); p.push_str("You are the OVERSEER (T3 strategic tier). The local executor/reviewer loop has hit a wall and escalated to you for a strategic correction. 
You do not call tools; you read the record and tell the executor what to do differently on its next turn.\n\n"); p.push_str(&format!("## Task\n{}\n", req.operation)); p.push_str(&format!("Task class: {}\n", req.task_class)); if !req.spec.is_null() { p.push_str(&format!("Spec: {}\n", req.spec)); } p.push_str(&format!("\n## Reason for escalation\n{}\n\n", reason)); p.push_str(&kb.to_prompt_section()); p.push_str("\n## Recent log (last 12 turns, most recent last):\n"); let start = log.len().saturating_sub(12); for e in &log[start..] { let content = e.content.to_string(); p.push_str(&format!( " [t{:02} {} {}] {}\n", e.turn, e.role, e.kind, truncate(&content, 200), )); } p.push_str("\n## Your output\n"); p.push_str("Write 3-6 sentences of CONCRETE guidance the executor will read next turn. "); p.push_str("Reference what specifically went wrong, what to try instead, and what to AVOID "); p.push_str("(especially if it appears in the \"Recent overseer corrections\" above — don't repeat yourself). "); p.push_str("No JSON, no tool syntax — the executor will translate your guidance into action.\n"); p } async fn append_corrections_row(row: &serde_json::Value) -> Result<(), String> { append_outcomes_row_at( std::path::Path::new("data/_kb/overseer_corrections.jsonl"), row, ).await } /// Append one JSONL row to `data/_kb/outcomes.jsonl`. Creates the /// directory if missing. Same write shape as the TS pipeline; the /// Phase 24 observer fix taught us `/ingest/file` has REPLACE /// semantics, so this writes the JSONL directly — APPEND, not replace. async fn append_outcomes_row(row: &serde_json::Value) -> Result<(), String> { append_outcomes_row_at(std::path::Path::new("data/_kb/outcomes.jsonl"), row).await } /// Path-taking variant — lets tests write to a tmp path without /// mutating the process CWD (which isn't thread-safe under parallel /// test execution). 
async fn append_outcomes_row_at( path: &std::path::Path, row: &serde_json::Value, ) -> Result<(), String> { use tokio::io::AsyncWriteExt; if let Some(dir) = path.parent() { tokio::fs::create_dir_all(dir).await.map_err(|e| format!("mkdir: {e}"))?; } let mut line = serde_json::to_string(row).map_err(|e| format!("serialize: {e}"))?; line.push('\n'); let mut f = tokio::fs::OpenOptions::new() .create(true).append(true).open(path).await .map_err(|e| format!("open: {e}"))?; f.write_all(line.as_bytes()).await.map_err(|e| format!("write: {e}"))?; // Explicit flush + sync before drop. tokio::fs::File uses a // threadpool; plain drop doesn't guarantee the write is // durable by the time the next open sees the file, which // surfaced as a 3/8 flake on the back-to-back-append test. f.flush().await.map_err(|e| format!("flush: {e}"))?; Ok(()) } /// PORT FROM orchestrator.ts:306-311. Cap `rows` at 20 entries and /// annotate the truncation so the executor sees it on the next turn /// prompt — prevents a 1000-row hybrid_search result from wiping the /// context budget on a single tool call. fn trim_result(r: &serde_json::Value) -> serde_json::Value { if let Some(rows) = r.get("rows").and_then(|v| v.as_array()) { if rows.len() > 20 { let mut truncated = r.clone(); if let Some(obj) = truncated.as_object_mut() { obj.insert("rows".into(), serde_json::Value::Array(rows.iter().take(20).cloned().collect())); obj.insert("_trimmed".into(), serde_json::Value::String( format!("{} more rows", rows.len() - 20), )); } return truncated; } } r.clone() } // --- Parsing + prompt builders (PORT FROM agent.ts:566-698) --- #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum Role { Executor, Reviewer } /// PORT FROM agent.ts:650. Strip ```json fences, find the first {...} /// block, soft-fix the two common model mistakes: stray `)}`, trailing /// commas. 
Reviewer shape tolerance: bare `kind: "approve_done"` gets /// normalized to `{kind: "critique", verdict: "approve_done"}` — some /// models (qwen2.5) emit that way and the semantic content is identical. pub fn parse_action(raw: &str, role: Role) -> Result { let mut s = raw.trim().to_string(); if let Some(stripped) = s.strip_prefix("```json") { s = stripped.trim_start_matches('\n').to_string(); } else if let Some(stripped) = s.strip_prefix("```") { s = stripped.trim_start_matches('\n').to_string(); } if let Some(stripped) = s.strip_suffix("```") { s = stripped.trim_end().to_string(); } let start = s.find('{').ok_or_else(|| format!("no JSON object in {role:?} response: {}", truncate(raw, 300)))?; let end = s.rfind('}').ok_or_else(|| format!("no closing brace in {role:?} response: {}", truncate(raw, 300)))?; if end <= start { return Err(format!("no JSON object in {role:?} response: {}", truncate(raw, 300))); } // Soft-fix: stray ")}" (qwen2.5 tool_call quirk) + trailing commas. let mut json = s[start..=end].to_string(); json = json.replace(")}", "}"); json = fix_trailing_commas(&json); let obj: serde_json::Value = serde_json::from_str(&json) .map_err(|e| format!("invalid JSON from {role:?}: {e} | raw: {}", truncate(&json, 300)))?; let kind = obj.get("kind").and_then(|v| v.as_str()).unwrap_or("").to_string(); match role { Role::Executor => match kind.as_str() { "plan" | "tool_call" | "propose_done" => { serde_json::from_value(obj).map_err(|e| format!("executor shape mismatch: {e}")) } _ => Err(format!("executor returned unexpected shape: {}", truncate(&obj.to_string(), 200))), }, Role::Reviewer => { // Accept the wrapped shape: {kind:"critique", verdict:"continue"|...} if kind == "critique" { return serde_json::from_value(obj) .map_err(|e| format!("reviewer shape mismatch: {e}")); } // Accept the bare-verdict shape: {kind:"approve_done", notes:"..."} if matches!(kind.as_str(), "continue" | "drift" | "approve_done") { let verdict = match kind.as_str() { "continue" => 
Verdict::Continue, "drift" => Verdict::Drift, "approve_done" => Verdict::ApproveDone, _ => unreachable!(), }; let notes = obj.get("notes").and_then(|v| v.as_str()).unwrap_or("").to_string(); return Ok(Action::Critique { verdict, notes }); } Err(format!("reviewer returned unexpected shape: {}", truncate(&obj.to_string(), 200))) } } } /// Remove `,` immediately followed by `}` or `]` (with optional whitespace). /// Same intent as the TS regex `,(\s*[}\]])`. fn fix_trailing_commas(s: &str) -> String { let bytes = s.as_bytes(); let mut out = String::with_capacity(s.len()); let mut i = 0; while i < bytes.len() { if bytes[i] == b',' { let mut j = i + 1; while j < bytes.len() && bytes[j].is_ascii_whitespace() { j += 1; } if j < bytes.len() && (bytes[j] == b'}' || bytes[j] == b']') { // skip the comma i += 1; continue; } } out.push(bytes[i] as char); i += 1; } out } fn action_kind(a: &Action) -> &'static str { match a { Action::Plan { .. } => "plan", Action::ToolCall { .. } => "tool_call", Action::ProposeDone { .. } => "propose_done", Action::Critique { .. } => "critique", } } fn action_content(a: &Action) -> serde_json::Value { serde_json::to_value(a).unwrap_or(serde_json::Value::Null) } /// Returns true if the model name belongs to Ollama Cloud. Prefix-based /// so new cloud models are pickable by name without a config update — /// match the rough family prefixes Phase 20's matrix declares. /// `qwen3.5:397b` lives in the cloud; `qwen3.5:latest` is local — /// hence the `:3` suffix check rather than matching all of `qwen3.5:`. 
pub fn is_cloud_model(model: &str) -> bool { model.starts_with("gpt-oss:") || model.starts_with("qwen3-coder:") || model.starts_with("qwen3.5:3") || model.starts_with("kimi-") || model.starts_with("kimi/") } fn truncate(s: &str, n: usize) -> String { if s.len() <= n { s.to_string() } else { format!("{}…", &s[..n]) } } fn spec_field_str<'a>(spec: &'a serde_json::Value, key: &str) -> &'a str { spec.get(key).and_then(|v| v.as_str()).unwrap_or("") } fn spec_target_count(spec: &serde_json::Value) -> u64 { spec.get("target_count").and_then(|v| v.as_u64()).unwrap_or(0) } /// PORT FROM agent.ts:566. Same structural shape: operation + target + /// candidates-surfaced hint + recent log + ONE-JSON-action instruction. /// Staffing-specific fields degrade gracefully when spec is empty (non- /// staffing task classes still get a usable prompt, just without the /// target_role / target_count scaffolding). fn build_executor_prompt( req: &RespondRequest, boost: &[serde_json::Value], log: &[LogEntry], ) -> String { let target_role = spec_field_str(&req.spec, "target_role"); let target_count = spec_target_count(&req.spec); let target_city = spec_field_str(&req.spec, "target_city"); let target_state = spec_field_str(&req.spec, "target_state"); let approach_hint = spec_field_str(&req.spec, "approach_hint"); let mut p = String::new(); p.push_str("You are the EXECUTOR agent. Your job is to complete this task:\n\n"); p.push_str(&format!("OPERATION: {}\n", req.operation)); if target_count > 0 && !target_role.is_empty() { p.push_str(&format!( "TARGET: {target_count} × {target_role} in {target_city}, {target_state}\n" )); } else { p.push_str(&format!("TASK CLASS: {}\n", req.task_class)); if !req.spec.is_null() { p.push_str(&format!("SPEC: {}\n", req.spec)); } } if !approach_hint.is_empty() { p.push_str(&format!("HINT: {approach_hint}\n")); } p.push_str("\nThe REVIEWER agent is watching every turn. They will flag drift. 
Stay on target.\n\n"); if !boost.is_empty() { p.push_str("SIMILAR PAST PLAYBOOKS (reference, not prescription):\n"); for (i, b) in boost.iter().take(3).enumerate() { p.push_str(&format!(" {}. {}\n", i + 1, b)); } p.push('\n'); } // Orchestrator-tracked candidate memory (agent.ts:568). The log- // render cap chops tool_result content, so the executor can't // always see what earlier searches returned. This block is a // durable rollup — every candidate the loop has seen, formatted // for prompt reading. Critical for letting the executor reach // propose_done instead of re-searching. let seen = candidates_seen(log); p.push_str("CANDIDATES SURFACED SO FAR (orchestrator-tracked, do not forget):\n"); if seen.is_empty() { p.push_str(" (none yet — start with hybrid_search)\n"); } else { p.push_str(" # Use the name + city + state for sql verification (NOT doc_id — that's the vector-index key, not workers_500k.worker_id)\n"); for c in seen.iter().take(30) { p.push_str(&format!(" - name=\"{}\" city=\"{}\" state=\"{}\" (vector doc_id={})\n", c.name, c.city, c.state, c.doc_id)); } if seen.len() > 30 { p.push_str(&format!(" ... {} more surfaced\n", seen.len() - 30)); } } p.push('\n'); p.push_str("SHARED LOG (recent turns):\n"); p.push_str(&render_log_for_prompt(log, 8)); p.push('\n'); p.push_str("AVAILABLE TOOLS (use tool_call with these exact names — DO NOT invent others):\n"); p.push_str(" hybrid_search(sql_filter: string, question: string, index_name: string, k?: number)\n"); p.push_str(" SQL-narrow + vector-rerank. 
Use for: \"find candidates matching criteria X, ranked by semantic match to Y\".\n"); p.push_str(" For staffing fills, index_name is typically \"w500k_b18\" or \"w500k_b3\" (workers_500k).\n"); p.push_str(" Example: {\"tool\":\"hybrid_search\",\"args\":{\"sql_filter\":\"role='Welder' AND city='Toledo' AND state='OH'\",\"question\":\"reliable welders with OSHA certs\",\"index_name\":\"w500k_b18\",\"k\":10},\"rationale\":\"pull top 10 welder candidates in Toledo\"}\n"); p.push_str(" sql(query: string) — SELECT-only. Use for: verification queries before propose_done.\n"); p.push_str(" IMPORTANT: workers_500k.worker_id is an INTEGER internal key — NOT the doc_id from hybrid_search.\n"); p.push_str(" To verify a candidate from hybrid_search results, query by name+city+state (which ARE in the chunk_text you already received):\n"); p.push_str(" Example: {\"tool\":\"sql\",\"args\":{\"query\":\"SELECT worker_id, name, role FROM workers_500k WHERE name = 'Donna Hall' AND city = 'Columbus' AND state = 'OH' LIMIT 1\"},\"rationale\":\"confirm Donna Hall exists as a Warehouse Associate in Columbus\"}\n\n"); p.push_str("Your next action MUST be a JSON object matching one of these shapes:\n"); p.push_str("{\"kind\":\"plan\",\"steps\":[\"short step 1\",\"short step 2\"]}\n"); p.push_str("{\"kind\":\"tool_call\",\"tool\":\"...\",\"args\":{...},\"rationale\":\"why\"}\n"); if target_count > 0 { p.push_str(&format!( "{{\"kind\":\"propose_done\",\"fills\":[{{\"candidate_id\":\"...\",\"name\":\"First Last\"}}],\"rationale\":\"...\"}} — fills MUST have EXACTLY {target_count} entries.\n" )); } else { p.push_str("{\"kind\":\"propose_done\",\"fills\":[...],\"rationale\":\"...\"}\n"); } if target_count > 0 { p.push_str(&format!( "\nSTRATEGY: once prior tool_result rows contain ≥ {target_count} candidates in {target_city}, {target_state} matching role \"{target_role}\", STOP SEARCHING. Pick the top {target_count} by score, verify ONE via `sql` tool, then emit propose_done. 
Do NOT repeat hybrid_search if you already have enough candidates.\n" )); } p.push_str("\nRespond with ONLY the JSON object. No markdown fences, no prose.\n"); p } /// PORT FROM agent.ts:602. Reviewer prompt with the `awaitingApproval` /// hard rule: if the most recent executor action was propose_done, the /// reviewer cannot emit `continue` (would stall the loop). fn build_reviewer_prompt(req: &RespondRequest, log: &[LogEntry]) -> String { let target_role = spec_field_str(&req.spec, "target_role"); let target_count = spec_target_count(&req.spec); let target_city = spec_field_str(&req.spec, "target_city"); let target_state = spec_field_str(&req.spec, "target_state"); let last_executor_kind = log.iter().rev() .find(|e| e.role == "executor") .map(|e| e.kind.as_str()) .unwrap_or(""); let awaiting_approval = last_executor_kind == "propose_done"; let mut p = String::new(); p.push_str("You are the REVIEWER agent. The EXECUTOR is trying to complete this task:\n\n"); p.push_str(&format!("OPERATION: {}\n", req.operation)); if target_count > 0 && !target_role.is_empty() { p.push_str(&format!( "TARGET: {target_count} × {target_role} in {target_city}, {target_state}\n\n" )); } p.push_str("Your job: catch drift. Agents often wander from the actual objective. 
Specifically watch for:\n"); if target_count > 0 && !target_city.is_empty() { p.push_str(&format!("- Proposing candidates who aren't in {target_city}, {target_state}.\n")); p.push_str(&format!("- Proposing candidates who don't have {target_role} skill.\n")); p.push_str(&format!("- Proposing fewer or more than {target_count} fills.\n")); } else { p.push_str("- Drifting from the stated task class or spec.\n"); } p.push_str("- Irrelevant tool calls.\n\n"); p.push_str("SHARED LOG (recent turns):\n"); p.push_str(&render_log_for_prompt(log, 10)); p.push('\n'); p.push_str("Your next action MUST be a JSON object:\n"); p.push_str("{\"kind\":\"critique\",\"verdict\":\"continue\" | \"drift\" | \"approve_done\",\"notes\":\"...\"}\n\n"); p.push_str("- \"continue\" → executor is on a reasonable path, let them keep going.\n"); p.push_str("- \"drift\" → executor is off-track; notes MUST tell them how to redirect.\n"); p.push_str("- \"approve_done\" → executor's propose_done meets the criteria. Seal it.\n\n"); if target_count > 0 { p.push_str(&format!( "APPROVAL CRITERIA (use only for propose_done):\n\ 1. Exactly {target_count} fills.\n\ 2. Each fill's name appears in a prior tool_result from {target_city}, {target_state} matching role \"{target_role}\".\n\ 3. Executor has SQL-verified at least one fill.\n\ If 1-3 all hold, return approve_done.\n" )); } if awaiting_approval { p.push_str("\nHARD RULE: The executor's most recent action was propose_done. On this turn you CANNOT return \"continue\" — it would stall the task. Choose approve_done or drift (state which criterion failed in notes).\n"); } // Loop-detection: if the executor has tool_called ≥ 3 times since // the last propose_done without proposing, it's stuck in a search // loop. Reviewer rubber-stamping "continue" here is the failure // pattern the 2026-04-23 battery surfaced in phase α task 2 — // 12 turns, 0 proposes, 100% reviewer:continue. 
let stuck_tool_calls = tool_calls_since_last_propose(log); if stuck_tool_calls >= 3 { p.push_str(&format!( "\nLOOP DETECTION: The executor has called tools {stuck_tool_calls} times without proposing done. \ Look at the CANDIDATES SURFACED SO FAR (visible in executor's view): if there are already ≥ {} \ matching candidates in {target_city}, {target_state} for role \"{target_role}\", respond with \ verdict=\"drift\" and notes=\"You have enough candidates — pick the top {} by score and emit \ propose_done this turn. Stop re-searching.\"\n", target_count, target_count, )); } p.push_str("\nRespond with ONLY the JSON object.\n"); p } fn render_log_for_prompt(log: &[LogEntry], tail: usize) -> String { if log.is_empty() { return "(no prior turns)\n".into(); } let start = log.len().saturating_sub(tail); let mut s = String::new(); for e in &log[start..] { let content = e.content.to_string(); // tool_result is the executor's eyes — candidate data lives // there and a 160-char cap chops off every name/doc_id the // executor needs for propose_done. Keep these generous; cap // other kinds tighter since they're decision/status entries // and don't carry payload the executor will re-read. let cap = if e.kind == "tool_result" { 1200 } else { 200 }; s.push_str(&format!( " [t{:02} {} {}] {}\n", e.turn, e.role, e.kind, truncate(&content, cap) )); } s } /// Ports agent.ts:538 `candidatesSeen`. Walks tool_result entries, /// parses `sources[].chunk_text` for the staffing "Name — Role in /// City, ST" shape, dedupes by doc_id. Returns an orchestrator-tracked /// surface the executor prompt can show verbatim — stopping the /// executor from "forgetting" candidates when the log-render truncates. 
fn candidates_seen(log: &[LogEntry]) -> Vec { let mut out: Vec = Vec::new(); let mut seen_ids: std::collections::HashSet = std::collections::HashSet::new(); for e in log { if e.kind != "tool_result" { continue; } let Some(sources) = e.content.get("sources").and_then(|v| v.as_array()) else { continue }; for s in sources { let Some(doc_id) = s.get("doc_id").and_then(|v| v.as_str()) else { continue }; if seen_ids.contains(doc_id) { continue; } let chunk_text = s.get("chunk_text").and_then(|v| v.as_str()).unwrap_or(""); let Some((name_part, rest)) = chunk_text.split_once('—') else { continue }; let name = name_part.trim().to_string(); let loc = rest.split_once(" in ").map(|(_, r)| r).unwrap_or(""); let Some((city, state_raw)) = loc.split_once(',') else { continue }; let city = city.trim().to_string(); let state = state_raw .trim() .chars() .take_while(|c| c.is_alphabetic()) .collect::(); if name.is_empty() || city.is_empty() || state.is_empty() { continue; } seen_ids.insert(doc_id.to_string()); out.push(CandidateHint { doc_id: doc_id.to_string(), name, city, state, }); } } out } #[derive(Debug, Clone)] struct CandidateHint { doc_id: String, name: String, city: String, state: String, } /// Count executor tool_calls since the last propose_done (or since /// loop start if none). Used by the reviewer prompt to flag stuck /// search loops — if an executor has tool_called ≥ 3× without /// proposing, the reviewer should verdict:drift with a stop-searching /// note rather than rubber-stamping continue. 
fn tool_calls_since_last_propose(log: &[LogEntry]) -> u32 { let mut count = 0u32; for e in log.iter().rev() { if e.role != "executor" { continue; } if e.kind == "propose_done" { break; } if e.kind == "tool_call" { count += 1; } } count } #[cfg(test)] mod tests { use super::*; #[test] fn log_entry_serializes_to_orchestrator_shape() { let e = LogEntry::new(3, "executor", "qwen3.5:latest", "tool_call", serde_json::json!({"tool": "hybrid_search"})); let j = serde_json::to_value(&e).unwrap(); for k in ["turn", "role", "kind", "model", "content", "at"] { assert!(j.get(k).is_some(), "missing field: {k}"); } } #[test] fn outcome_into_log_is_lossless() { let e = LogEntry::new(1, "system", "m", "boost_loaded", serde_json::json!({})); let o = RespondOutcome::Failed { reason: "scaffold".into(), log: vec![e] }; assert_eq!(o.into_log().len(), 1); } #[test] fn parse_executor_plan() { let raw = r#"{"kind":"plan","steps":["hybrid_search","verify","propose_done"]}"#; let a = parse_action(raw, Role::Executor).unwrap(); match a { Action::Plan { steps } => assert_eq!(steps.len(), 3), _ => panic!("wrong variant"), } } #[test] fn parse_executor_tool_call_with_stray_paren() { // Mimics the qwen2.5 quirk where the model closes with ")}" — // agent.ts:666 has the same fix. PORT from TS test territory. let raw = r#"{"kind":"tool_call","tool":"sql","args":{"query":"SELECT 1"},"rationale":"verify")}"#; let a = parse_action(raw, Role::Executor).unwrap(); match a { Action::ToolCall { tool, .. } => assert_eq!(tool, "sql"), _ => panic!("wrong variant"), } } #[test] fn parse_executor_propose_done_with_fence() { let raw = "```json\n{\"kind\":\"propose_done\",\"fills\":[{\"candidate_id\":\"W-1\",\"name\":\"A B\"}],\"rationale\":\"ok\"}\n```"; let a = parse_action(raw, Role::Executor).unwrap(); match a { Action::ProposeDone { fills, .. 
} => { assert_eq!(fills.len(), 1); assert_eq!(fills[0].candidate_id, "W-1"); } _ => panic!("wrong variant"), } } #[test] fn parse_reviewer_wrapped_verdict() { let raw = r#"{"kind":"critique","verdict":"approve_done","notes":"ok"}"#; let a = parse_action(raw, Role::Reviewer).unwrap(); match a { Action::Critique { verdict, .. } => assert_eq!(verdict, Verdict::ApproveDone), _ => panic!("wrong variant"), } } #[test] fn parse_reviewer_bare_verdict_normalizes() { // agent.ts:690-694 — qwen2.5/mistral emit the verdict as `kind`. let raw = r#"{"kind":"drift","notes":"wrong city"}"#; let a = parse_action(raw, Role::Reviewer).unwrap(); match a { Action::Critique { verdict, notes } => { assert_eq!(verdict, Verdict::Drift); assert_eq!(notes, "wrong city"); } _ => panic!("wrong variant"), } } #[test] fn parse_reviewer_rejects_unknown_verdict() { let raw = r#"{"kind":"maybe","notes":"?"}"#; assert!(parse_action(raw, Role::Reviewer).is_err()); } #[test] fn parse_trailing_comma() { let raw = r#"{"kind":"plan","steps":["a","b",]}"#; assert!(parse_action(raw, Role::Executor).is_ok()); } #[test] fn parse_no_json_errors_cleanly() { let raw = "sorry I cannot comply"; let err = parse_action(raw, Role::Executor).unwrap_err(); assert!(err.contains("no JSON")); } #[test] fn candidates_seen_parses_sources() { let log = vec![ LogEntry::new(1, "executor", "m", "tool_result", serde_json::json!({ "sources": [ {"doc_id": "W-1", "chunk_text": "Alice Smith — Welder in Toledo, OH. 5 years experience."}, {"doc_id": "W-2", "chunk_text": "Bob Jones — Welder in Toledo, OH. Night shift."}, ] })), LogEntry::new(2, "reviewer", "m", "critique", serde_json::json!({ "verdict": "continue", "notes": "" })), LogEntry::new(3, "executor", "m", "tool_result", serde_json::json!({ "sources": [ {"doc_id": "W-2", "chunk_text": "Bob Jones — Welder in Toledo, OH. Night shift."}, {"doc_id": "W-3", "chunk_text": "Carol Davis — Welder in Toledo, OH. 
AWS certified."}, ] })), ]; let seen = candidates_seen(&log); assert_eq!(seen.len(), 3, "dedup by doc_id"); assert_eq!(seen[0].name, "Alice Smith"); assert_eq!(seen[0].city, "Toledo"); assert_eq!(seen[0].state, "OH"); assert_eq!(seen[2].name, "Carol Davis"); } #[test] fn candidates_seen_ignores_malformed() { let log = vec![ LogEntry::new(1, "executor", "m", "tool_result", serde_json::json!({ "sources": [ {"doc_id": "W-1", "chunk_text": "no dash here"}, {"doc_id": "W-2", "chunk_text": "Name — but no 'in' keyword"}, {"doc_id": "W-3"}, // no chunk_text ] })), ]; assert_eq!(candidates_seen(&log).len(), 0); } #[test] fn tool_calls_since_propose_counts_correctly() { let log = vec![ LogEntry::new(1, "executor", "m", "tool_call", serde_json::json!({})), LogEntry::new(2, "executor", "m", "tool_call", serde_json::json!({})), LogEntry::new(3, "executor", "m", "tool_call", serde_json::json!({})), ]; assert_eq!(tool_calls_since_last_propose(&log), 3); // propose_done resets the counter let log2 = vec![ LogEntry::new(1, "executor", "m", "tool_call", serde_json::json!({})), LogEntry::new(2, "executor", "m", "propose_done", serde_json::json!({})), LogEntry::new(3, "executor", "m", "tool_call", serde_json::json!({})), ]; assert_eq!(tool_calls_since_last_propose(&log2), 1); } #[test] fn executor_prompt_includes_surfaced_candidates() { let req = req_with_spec(serde_json::json!({ "target_role": "Welder", "target_count": 2, "target_city": "Toledo", "target_state": "OH" })); let log = vec![ LogEntry::new(1, "executor", "m", "tool_result", serde_json::json!({ "sources": [ {"doc_id": "W-1", "chunk_text": "Alice Smith — Welder in Toledo, OH."}, ] })), ]; let p = build_executor_prompt(&req, &[], &log); assert!(p.contains("CANDIDATES SURFACED SO FAR")); // Prompt format deliberately separates name from doc_id now — // the line reads `name="Alice Smith" ... (vector doc_id=W-1)` // so the executor prompt explicitly tells the model NOT to // conflate doc_id with workers_500k.worker_id. 
Assertion was // expecting the old concatenated format; update to match the // semantic contract (both tokens present, any order). assert!(p.contains("Alice Smith")); assert!(p.contains("W-1")); assert!(p.contains("Toledo")); } #[test] fn reviewer_prompt_flags_loop_after_three_tool_calls() { let req = req_with_spec(serde_json::json!({ "target_role": "Welder", "target_count": 2, "target_city": "Toledo", "target_state": "OH" })); let log = vec![ LogEntry::new(1, "executor", "m", "tool_call", serde_json::json!({})), LogEntry::new(2, "executor", "m", "tool_call", serde_json::json!({})), LogEntry::new(3, "executor", "m", "tool_call", serde_json::json!({})), ]; let p = build_reviewer_prompt(&req, &log); assert!(p.contains("LOOP DETECTION")); assert!(p.contains("Stop re-searching")); } #[test] fn reviewer_prompt_no_loop_clause_before_three_calls() { let req = req_with_spec(serde_json::json!({ "target_role": "Welder", "target_count": 2, "target_city": "Toledo", "target_state": "OH" })); let log = vec![ LogEntry::new(1, "executor", "m", "tool_call", serde_json::json!({})), ]; let p = build_reviewer_prompt(&req, &log); assert!(!p.contains("LOOP DETECTION")); } #[test] fn is_cloud_model_recognizes_cloud_prefixes() { assert!(is_cloud_model("gpt-oss:120b")); assert!(is_cloud_model("gpt-oss:20b")); assert!(is_cloud_model("qwen3-coder:480b")); assert!(is_cloud_model("qwen3.5:397b")); assert!(is_cloud_model("kimi-k2.5")); assert!(is_cloud_model("kimi/k2-thinking")); } #[test] fn is_cloud_model_rejects_local_prefixes() { assert!(!is_cloud_model("qwen3.5:latest")); assert!(!is_cloud_model("qwen3:latest")); assert!(!is_cloud_model("qwen2.5:latest")); assert!(!is_cloud_model("mistral")); assert!(!is_cloud_model("nomic-embed-text")); } #[test] fn spec_target_count_defaults_to_zero() { let spec = serde_json::json!({}); assert_eq!(spec_target_count(&spec), 0); } #[test] fn executor_prompt_includes_target_when_spec_has_it() { let req = RespondRequest { task_class: "staffing.fill".into(), 
operation: "fill: Welder x2 in Toledo, OH".into(), spec: serde_json::json!({ "target_role": "Welder", "target_count": 2, "target_city": "Toledo", "target_state": "OH" }), executor_model: None, reviewer_model: None, max_turns: None, }; let p = build_executor_prompt(&req, &[], &[]); assert!(p.contains("TARGET: 2 × Welder in Toledo, OH")); assert!(p.contains("EXACTLY 2 entries")); assert!(p.contains("hybrid_search"), "executor prompt must list hybrid_search in tool catalog"); assert!(p.contains("sql(query"), "executor prompt must list sql tool signature"); assert!(p.contains("DO NOT invent others"), "executor prompt must warn against tool-name invention"); } #[test] fn executor_prompt_degrades_without_spec() { let req = RespondRequest { task_class: "code.review".into(), operation: "review PR #42".into(), spec: serde_json::json!(null), executor_model: None, reviewer_model: None, max_turns: None, }; let p = build_executor_prompt(&req, &[], &[]); assert!(p.contains("TASK CLASS: code.review")); assert!(!p.contains("TARGET:")); } #[test] fn reviewer_prompt_adds_hard_rule_when_awaiting_approval() { let req = RespondRequest { task_class: "staffing.fill".into(), operation: "fill: Welder x2 in Toledo, OH".into(), spec: serde_json::json!({"target_count": 2}), executor_model: None, reviewer_model: None, max_turns: None, }; let log = vec![LogEntry::new(1, "executor", "m", "propose_done", serde_json::json!({}))]; let p = build_reviewer_prompt(&req, &log); assert!(p.contains("HARD RULE")); } fn req_with_spec(spec: serde_json::Value) -> RespondRequest { RespondRequest { task_class: "staffing.fill".into(), operation: "fill: Welder x2 in Toledo, OH".into(), spec, executor_model: None, reviewer_model: None, max_turns: None, } } fn sample_stats() -> LoopStats { LoopStats { requests: 8, prompt_tokens: 12345, completion_tokens: 2345, total_tokens: 14690, latency_ms: 42000, } } #[test] fn sig_hash_is_stable_for_same_inputs() { let spec = serde_json::json!({ "target_role": "Welder", 
"target_city": "Toledo", "target_state": "OH" }); let a = sig_hash(&req_with_spec(spec.clone())); let b = sig_hash(&req_with_spec(spec)); assert_eq!(a, b); assert_eq!(a.len(), 16); } #[test] fn sig_hash_differs_by_geo() { let a = sig_hash(&req_with_spec(serde_json::json!({ "target_role": "Welder", "target_city": "Toledo", "target_state": "OH" }))); let b = sig_hash(&req_with_spec(serde_json::json!({ "target_role": "Welder", "target_city": "Dayton", "target_state": "OH" }))); assert_ne!(a, b); } #[test] fn seed_context_uses_hint_when_present() { let req = req_with_spec(serde_json::json!({ "approach_hint": "hybrid search", "target_role": "Welder", "target_city": "Toledo" })); assert_eq!(seed_context(&req), "hybrid search"); } #[test] fn seed_context_falls_back_to_role_city_state() { let req = req_with_spec(serde_json::json!({ "target_role": "Welder", "target_city": "Toledo", "target_state": "OH" })); assert_eq!(seed_context(&req), "Welder fill in Toledo, OH"); } #[test] fn seed_context_falls_back_to_operation_for_non_staffing() { let req = req_with_spec(serde_json::json!({})); assert_eq!(seed_context(&req), "fill: Welder x2 in Toledo, OH"); } #[test] fn outcomes_row_stamps_full_indicator_set_on_success() { let req = req_with_spec(serde_json::json!({ "target_role": "Welder", "target_city": "Toledo", "target_state": "OH" })); let stats = sample_stats(); let outcome = RespondOutcome::Ok { artifact: serde_json::json!({"fills": [{"candidate_id": "W-1", "name": "A B"}]}), log: vec![], }; let seed = serde_json::json!({"outcome": {"mode": "added"}, "entries_after": 1337}); let row = build_outcomes_row(&req, &stats, 4, false, &outcome, Some(seed)); assert_eq!(row["schema_version"], 2); assert_eq!(row["source_service"], "v1.respond"); assert_eq!(row["task_class"], "staffing.fill"); assert_eq!(row["ok"], true); assert_eq!(row["polarity"], "success_confirmation"); assert_eq!(row["iterations"], 4); assert_eq!(row["turns"], 4); assert_eq!(row["usage"]["total_tokens"], 14690); 
assert_eq!(row["usage"]["requests"], 8); assert_eq!(row["models"]["executor"], "qwen3.5:latest"); assert_eq!(row["provider"], "ollama"); assert_eq!(row["playbook_seed"]["entries_after"], 1337); assert!(row["sig_hash"].as_str().unwrap().len() == 16); assert!(row["truth_rule_citations"].is_array()); } #[test] fn outcomes_row_stamps_failure_polarity() { let req = req_with_spec(serde_json::json!({})); let stats = sample_stats(); let outcome = RespondOutcome::Failed { reason: "3 consecutive drifts".into(), log: vec![], }; let row = build_outcomes_row(&req, &stats, 2, false, &outcome, None); assert_eq!(row["ok"], false); assert_eq!(row["polarity"], "failure_pattern"); assert_eq!(row["error"], "3 consecutive drifts"); assert_eq!(row["fills"], serde_json::Value::Null); assert!(row["playbook_seed"].is_null()); assert_eq!(row["correction_applied"], false); assert!(row["correction_effective"].is_null()); } #[test] fn outcomes_row_marks_correction_effective_when_overseer_called_and_ok() { let req = req_with_spec(serde_json::json!({})); let stats = sample_stats(); let outcome = RespondOutcome::Ok { artifact: serde_json::json!({"fills": []}), log: vec![], }; let row = build_outcomes_row(&req, &stats, 3, true, &outcome, None); assert_eq!(row["correction_applied"], true); assert_eq!(row["correction_effective"], true); } #[test] fn outcomes_row_marks_correction_ineffective_when_overseer_called_and_failed() { let req = req_with_spec(serde_json::json!({})); let stats = sample_stats(); let outcome = RespondOutcome::Failed { reason: "still drifting after overseer".into(), log: vec![], }; let row = build_outcomes_row(&req, &stats, 3, true, &outcome, None); assert_eq!(row["correction_applied"], true); assert_eq!(row["correction_effective"], false); } // Atomic counter + PID guarantees a unique path across parallel // test invocations. Nanos-only showed 1/5 flake under `cargo // test` because SystemTime can repeat across threads that run // within sub-ns of each other. 
static APPEND_TEST_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    /// Appending the same row twice must produce exactly two lines, each
    /// of which parses as JSON and still carries the marker field.
    ///
    /// The scratch directory is wiped before use and removed by a drop
    /// guard, so a panicking assertion cannot leak it and a leftover from
    /// an earlier run under a recycled PID cannot skew the line count.
    #[tokio::test]
    async fn append_outcomes_row_at_writes_valid_jsonl() {
        /// Deletes the wrapped directory when dropped (also during unwinding).
        struct ScratchDir(std::path::PathBuf);

        impl Drop for ScratchDir {
            fn drop(&mut self) {
                // Best effort: the directory may never have been created.
                let _ = std::fs::remove_dir_all(&self.0);
            }
        }

        let seq = APPEND_TEST_SEQ.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let tmpdir = std::env::temp_dir().join(format!(
            "lh_outcomes_{}_{}",
            std::process::id(),
            seq,
        ));
        // PID + counter is unique only within one test process. Remove any
        // stale directory before appending, since the assertion below
        // expects exactly two lines.
        let _ = std::fs::remove_dir_all(&tmpdir);
        let _cleanup = ScratchDir(tmpdir.clone());

        let path = tmpdir.join("outcomes.jsonl");
        let row = serde_json::json!({"schema_version": 2, "ok": true, "test": "marker"});

        // NOTE(review): the directory is never created here, so
        // `append_outcomes_row_at` presumably creates missing parents — confirm.
        append_outcomes_row_at(&path, &row).await.unwrap();
        append_outcomes_row_at(&path, &row).await.unwrap();

        let written = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<_> = written.lines().collect();
        assert_eq!(lines.len(), 2);
        for line in lines {
            let parsed: serde_json::Value = serde_json::from_str(line).unwrap();
            assert_eq!(parsed["test"], "marker");
        }
    }

    /// A `rows` array with three entries is returned at full length and
    /// without a `_trimmed` annotation.
    #[test]
    fn trim_result_leaves_small_arrays_alone() {
        let r = serde_json::json!({ "rows": [1, 2, 3] });
        let t = trim_result(&r);
        assert_eq!(t["rows"].as_array().unwrap().len(), 3);
        assert!(t.get("_trimmed").is_none());
    }

    /// A 100-entry `rows` array is cut to 20 entries, the remainder is
    /// reported under `_trimmed`, and unrelated fields are kept.
    #[test]
    fn trim_result_caps_at_20_and_annotates() {
        let rows: Vec<_> = (0..100).map(serde_json::Value::from).collect();
        let r = serde_json::json!({ "rows": rows, "other_field": "kept" });
        let t = trim_result(&r);
        assert_eq!(t["rows"].as_array().unwrap().len(), 20);
        assert_eq!(t["_trimmed"], "80 more rows");
        assert_eq!(t["other_field"], "kept");
    }

    /// A result without a `rows` key keeps its other fields unchanged.
    #[test]
    fn trim_result_passthrough_when_no_rows() {
        let r = serde_json::json!({ "answer": "42" });
        let t = trim_result(&r);
        assert_eq!(t["answer"], "42");
    }

    /// For this request and single-entry log, the reviewer prompt must not
    /// contain the literal `HARD RULE`.
    // NOTE(review): the case where the rule IS included is presumably
    // covered by a sibling test outside this view — confirm.
    #[test]
    fn reviewer_prompt_omits_hard_rule_otherwise() {
        let req = RespondRequest {
            task_class: "staffing.fill".into(),
            operation: "fill: Welder x2 in Toledo, OH".into(),
            spec: serde_json::json!({"target_count": 2}),
            executor_model: None,
            reviewer_model: None,
            max_turns: None,
        };
        let log = vec![LogEntry::new(1, "executor", "m", "tool_call", serde_json::json!({}))];
        let p = build_reviewer_prompt(&req, &log);
        assert!(!p.contains("HARD RULE"));
    }
}