v1/mode: model-aware enrichment downgrade + 3 corpora + variance harness
Pass 5 (5 reps × 4 conditions × 1 file on grok-4.1-fast) showed that composing
matrix corpora is anti-additive on strong models — composed lakehouse_arch
+ symbols LOST 5/5 head-to-head vs codereview_isolation (Δ −1.8 grounded
findings, p=0.031). Default flips to isolation; matrix path now auto-
downgrades when the resolved model is strong.

Mode runner:
- matrix_corpus is Vec<String> (string OR array via deserialize_string_or_vec)
- top_k=6 from each corpus, merge by score, take top 8 globally
- chunk tag prefers doc_id over source so the reviewer sees [adr:009] rather than [lakehouse_arch]
- is_weak_model() gate auto-downgrades codereview_lakehouse → codereview_isolation
  for strong models (default-strong; weak = :free suffix or local last-resort)
- LH_FORCE_FULL_ENRICHMENT=1 bypasses for diagnostic runs
- EnrichmentSources.downgraded_from records when the gate fires

Three corpora indexed via /vectors/index (5849 chunks total):
- lakehouse_arch_v1 — ADRs + phases + PRD + scrum spec (93 docs, 2119 chunks)
- scrum_findings_v1 — past scrum_reviews.jsonl (168 docs, 1260 chunks; EXCLUDED
  from defaults — 24% out-of-bounds line citations from cross-file drift)
- lakehouse_symbols_v1 — regex-extracted pub items + /// docs (656 docs, 2470 chunks)

Experiment infra:
- scripts/build_*_corpus.ts — re-runnable when source content changes
- scripts/mode_pass5_variance_paid.ts — N reps × M conditions on one file
- scripts/mode_pass5_summarize.ts — mean ± σ + head-to-head, parser handles
  numbered + path-with-line + path-with-symbol finding tables
- scripts/mode_compare.ts — groups by mode|corpus when sweeps span corpora
- scripts/mode_experiment.ts — default model bumped to x-ai/grok-4.1-fast,
  --corpus flag for per-call override

Decisions + open follow-ups: docs/MODE_RUNNER_TUNING_PLAN.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 17:29:17 -05:00


//! Mode router — task_class → mode + model recommendation.
//!
//! HANDOVER §queued (2026-04-25): "Mode router — port LLM Team multi-model
//! patterns. Pick the right TOOL/MODE for each task class via the matrix,
//! not cascade through models."
//!
//! Two-stage architecture:
//!
//! 1. **Decision** (`POST /v1/mode`) — given `{task_class, prompt}`,
//!    consult `config/modes.toml` + (future) pathway memory and return
//!    `{mode, model, decision_trace}`. Pure recommendation; no execution.
//!
//! 2. **Execution** (`POST /v1/mode/execute`) — given `{mode, prompt, ...}`,
//!    proxy to LLM Team UI (`localhost:5000/api/run`) which has all 25
//!    mode runners implemented. As Rust-native runners land in this
//!    crate, they short-circuit before the proxy.
//!
//! The split lets us A/B-test the routing logic (decision-only) without
//! committing to running every recommendation. It also keeps the pure
//! decision function simple enough to unit-test exhaustively.
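//!
//! A decision round-trip has roughly this shape (illustrative values; the
//! real task classes, models, and corpora come from `config/modes.toml`):
//!
//! ```json
//! // request
//! { "task_class": "code_review", "prompt": "audit src/v1/mode.rs" }
//! // response
//! { "mode": "codereview_lakehouse", "model": "qwen3-coder:480b",
//!   "decision": { "task_class_matched": true, "source": "config",
//!                 "fallbacks": [], "matrix_corpus": ["lakehouse_arch_v1"],
//!                 "notes": ["task_class 'code_review' matched, preferred mode 'codereview_lakehouse'"] } }
//! ```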
use axum::{Json, extract::State, http::StatusCode, response::IntoResponse};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::OnceLock;
use super::V1State;
/// Validated against the LLM Team /api/run handler at
/// /root/llm_team_ui.py:10581. Kept in sync manually — adding a mode
/// here without adding it upstream returns 400 from the proxy.
///
/// Modes prefixed with the codebase name (e.g. `codereview_lakehouse`)
/// are NATIVE Rust enrichment runners — they don't proxy to LLM Team,
/// they compose the lakehouse's own context primitives (pathway memory,
/// relevance filter, matrix corpora) into a one-shot prompt for the
/// recommended model. Native modes are listed alongside upstream ones
/// so the router can pick either without callers caring.
const VALID_MODES: &[&str] = &[
"brainstorm", "pipeline", "debate", "validator", "roundrobin",
"redteam", "consensus", "codereview", "ladder", "tournament",
"evolution", "blindassembly", "staircase", "drift", "mesh",
"hallucination", "timeloop", "research", "eval", "extract",
"refine", "adaptive", "deep_analysis", "distill",
// Native runners (not in LLM Team — handled by /v1/mode/execute).
// Each is a parameterized preset of EnrichmentFlags below — designed
// as a deliberate experiment so we can read the matrix and identify
// which signals are doing real work vs adding latency for nothing.
"codereview_lakehouse", // all enrichment on (ceiling)
"codereview_null", // raw file + generic prompt (baseline)
"codereview_isolation", // file + pathway only (no matrix)
"codereview_matrix_only", // file + matrix only (no pathway)
"codereview_playbook_only", // pathway only, NO file content (lossy ceiling)
"staffing_inference_lakehouse", // staffing-domain composer (Pass 4)
];
/// Whether a mode is handled natively in this gateway vs proxied to
/// LLM Team. Drives /v1/mode/execute dispatch.
fn is_native_mode(mode: &str) -> bool {
matches!(
mode,
"codereview_lakehouse"
| "codereview_null"
| "codereview_isolation"
| "codereview_matrix_only"
| "codereview_playbook_only"
| "staffing_inference_lakehouse"
)
}
/// Per-mode enrichment knobs — each native mode is a preset over these
/// flags. Exists so the runner code is one path (less drift between
/// modes) and the comparison harness can read which signals fired.
#[derive(Debug, Clone, Copy, Serialize)]
pub struct EnrichmentFlags {
pub include_file_content: bool,
pub include_bug_fingerprints: bool,
pub include_matrix_chunks: bool,
pub use_relevance_filter: bool,
pub framing: ReviewerFraming,
}
#[derive(Debug, Clone, Copy, Serialize)]
pub enum ReviewerFraming {
Adversarial, // forensic, ranked findings + verdict (lakehouse default)
Generic, // "review this" — no codebase priors (null baseline)
Staffing, // staffing-domain coordinator framing (Pass 4)
}
fn flags_for_mode(mode: &str) -> EnrichmentFlags {
match mode {
"codereview_null" => EnrichmentFlags {
include_file_content: true,
include_bug_fingerprints: false,
include_matrix_chunks: false,
use_relevance_filter: false,
framing: ReviewerFraming::Generic,
},
"codereview_isolation" => EnrichmentFlags {
include_file_content: true,
include_bug_fingerprints: true,
include_matrix_chunks: false,
use_relevance_filter: false,
framing: ReviewerFraming::Adversarial,
},
"codereview_matrix_only" => EnrichmentFlags {
include_file_content: true,
include_bug_fingerprints: false,
include_matrix_chunks: true,
use_relevance_filter: true,
framing: ReviewerFraming::Adversarial,
},
"codereview_playbook_only" => EnrichmentFlags {
include_file_content: false, // lossy on purpose — measures pathway-alone ceiling
include_bug_fingerprints: true,
include_matrix_chunks: false,
use_relevance_filter: false,
framing: ReviewerFraming::Adversarial,
},
"staffing_inference_lakehouse" => EnrichmentFlags {
// Staffing reuses the same composer architecture but with
// domain-specific framing. file_content here = the request
// payload (e.g. "fill 2 welders in Toledo OH"), bug_fingerprints
// surface prior playbook patterns from this geo+role, matrix
// pulls candidate workers + city/state demand chunks.
include_file_content: true,
include_bug_fingerprints: true,
include_matrix_chunks: true,
use_relevance_filter: true,
framing: ReviewerFraming::Staffing,
},
// Default (codereview_lakehouse): everything on.
_ => EnrichmentFlags {
include_file_content: true,
include_bug_fingerprints: true,
include_matrix_chunks: true,
use_relevance_filter: true,
framing: ReviewerFraming::Adversarial,
},
}
}
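// Preset matrix at a glance (derived from the arms above; file =
// include_file_content, fp = bug_fingerprints, mx = matrix_chunks,
// rf = relevance_filter):
//
//   mode                          file  fp  mx  rf  framing
//   codereview_null                ✓    ·   ·   ·   Generic
//   codereview_isolation           ✓    ✓   ·   ·   Adversarial
//   codereview_matrix_only         ✓    ·   ✓   ✓   Adversarial
//   codereview_playbook_only       ·    ✓   ·   ·   Adversarial
//   staffing_inference_lakehouse   ✓    ✓   ✓   ✓   Staffing
//   codereview_lakehouse           ✓    ✓   ✓   ✓   Adversarial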
#[derive(Clone, Debug, Deserialize)]
pub struct TaskClassEntry {
pub name: String,
pub preferred_mode: String,
#[serde(default)]
pub fallback_modes: Vec<String>,
pub default_model: String,
/// One or more corpora the mode runner queries (top-k per corpus,
/// merged by score before the relevance filter). Accepts a single
/// string or an array in modes.toml — `deserialize_string_or_vec`
/// handles both shapes for backward compat.
#[serde(default, deserialize_with = "deserialize_string_or_vec")]
pub matrix_corpus: Vec<String>,
}
/// Accept `key = "x"` or `key = ["x", "y"]` in TOML/JSON. Empty string or
/// missing field → empty vec.
fn deserialize_string_or_vec<'de, D>(d: D) -> Result<Vec<String>, D::Error>
where D: serde::Deserializer<'de> {
use serde::de::Error;
let v = serde_json::Value::deserialize(d)?;
match v {
serde_json::Value::Null => Ok(vec![]),
serde_json::Value::String(s) if s.is_empty() => Ok(vec![]),
serde_json::Value::String(s) => Ok(vec![s]),
serde_json::Value::Array(a) => a
.into_iter()
.map(|x| x.as_str().map(String::from)
.ok_or_else(|| D::Error::custom("matrix_corpus array must contain strings")))
.collect(),
other => Err(D::Error::custom(format!("matrix_corpus must be string or array, got {other:?}"))),
}
}
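// Both shapes are legal in modes.toml (illustrative entry; the real
// task-class table lives in config/modes.toml):
//
//   [[task_class]]
//   name = "code_review"
//   preferred_mode = "codereview_lakehouse"
//   default_model = "qwen3-coder:480b"
//   matrix_corpus = "lakehouse_arch_v1"                               # single string
//   # matrix_corpus = ["lakehouse_arch_v1", "lakehouse_symbols_v1"]   # or an array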
#[derive(Clone, Debug, Deserialize)]
pub struct DefaultEntry {
pub preferred_mode: String,
#[serde(default)]
pub fallback_modes: Vec<String>,
pub default_model: String,
}
#[derive(Clone, Debug, Deserialize)]
pub struct ModeRouterConfig {
#[serde(default, rename = "task_class")]
pub task_classes: Vec<TaskClassEntry>,
pub default: DefaultEntry,
}
impl ModeRouterConfig {
pub fn lookup(&self, task_class: &str) -> Option<&TaskClassEntry> {
self.task_classes.iter().find(|t| t.name == task_class)
}
}
/// Process-global config cache. Loaded on first request from
/// `config/modes.toml` (or `LH_MODES_CONFIG`). If parsing fails the
/// router falls back to a hard-coded default so a malformed config can
/// never take the gateway down.
static CONFIG: OnceLock<ModeRouterConfig> = OnceLock::new();
fn load_config() -> &'static ModeRouterConfig {
CONFIG.get_or_init(|| {
let path = std::env::var("LH_MODES_CONFIG")
.unwrap_or_else(|_| "config/modes.toml".to_string());
match std::fs::read_to_string(&path) {
Ok(s) => match toml::from_str::<ModeRouterConfig>(&s) {
Ok(c) => {
tracing::info!(target: "v1::mode", "loaded {} task classes from {}", c.task_classes.len(), path);
c
}
Err(e) => {
tracing::warn!(target: "v1::mode", "parse {} failed ({}), using built-in default", path, e);
fallback_config()
}
},
Err(e) => {
tracing::warn!(target: "v1::mode", "read {} failed ({}), using built-in default", path, e);
fallback_config()
}
}
})
}
fn fallback_config() -> ModeRouterConfig {
ModeRouterConfig {
task_classes: vec![],
default: DefaultEntry {
preferred_mode: "pipeline".into(),
fallback_modes: vec!["consensus".into(), "ladder".into()],
default_model: "qwen3.5:latest".into(),
},
}
}
#[derive(Deserialize, Debug)]
pub struct RouteRequest {
pub task_class: String,
/// Reserved for future matrix-informed routing (cosine against
/// matrix_corpus + pathway memory). Currently parsed but unused by
/// the decision logic — kept on the API so callers can land their
/// integration without waiting on the matrix-signal hookup.
#[serde(default)]
#[allow(dead_code)]
pub prompt: Option<String>,
/// Caller-supplied override. When set, the router honors it (with a
/// validation check against VALID_MODES) and skips the matrix
/// signal — useful for testing a specific mode in isolation.
#[serde(default)]
pub force_mode: Option<String>,
}
#[derive(Serialize, Debug)]
pub struct DecisionTrace {
pub task_class_matched: bool,
pub source: &'static str, // "config" | "default" | "force_mode"
pub fallbacks: Vec<String>,
pub matrix_corpus: Vec<String>,
pub notes: Vec<String>,
}
#[derive(Serialize, Debug)]
pub struct RouteDecision {
pub mode: String,
pub model: String,
pub decision: DecisionTrace,
}
/// `POST /v1/mode` — pure recommendation. Returns a `RouteDecision`
/// with the chosen mode + model + reasoning trail. Caller is then
/// responsible for invoking the mode (either via `/v1/mode/execute`
/// proxy or directly against the LLM Team `/api/run`).
pub async fn route(
State(_state): State<V1State>,
Json(req): Json<RouteRequest>,
) -> impl IntoResponse {
let cfg = load_config();
let mut notes = Vec::new();
// force_mode short-circuits everything else but still validates.
if let Some(forced) = req.force_mode.as_deref() {
if !VALID_MODES.contains(&forced) {
return Err((
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": format!("Unknown mode: {}", forced),
"valid_modes": VALID_MODES,
})),
));
}
let model = cfg
.lookup(&req.task_class)
.map(|t| t.default_model.clone())
.unwrap_or_else(|| cfg.default.default_model.clone());
notes.push("force_mode override accepted".into());
return Ok(Json(RouteDecision {
mode: forced.to_string(),
model,
decision: DecisionTrace {
task_class_matched: cfg.lookup(&req.task_class).is_some(),
source: "force_mode",
fallbacks: vec![],
matrix_corpus: vec![],
notes,
},
}));
}
// Lookup task class; fall through to default if absent.
if let Some(tc) = cfg.lookup(&req.task_class) {
notes.push(format!(
"task_class '{}' matched, preferred mode '{}'",
tc.name, tc.preferred_mode
));
if !VALID_MODES.contains(&tc.preferred_mode.as_str()) {
notes.push(format!(
"preferred '{}' not in VALID_MODES — falling through to first valid fallback",
tc.preferred_mode
));
for fb in &tc.fallback_modes {
if VALID_MODES.contains(&fb.as_str()) {
notes.push(format!("fallback '{}' selected", fb));
return Ok(Json(RouteDecision {
mode: fb.clone(),
model: tc.default_model.clone(),
decision: DecisionTrace {
task_class_matched: true,
source: "config",
fallbacks: tc.fallback_modes.clone(),
matrix_corpus: tc.matrix_corpus.clone(),
notes,
},
}));
}
}
// No fallback was valid either — return 422 so the caller
// knows the config is broken for this task class.
return Err((
StatusCode::UNPROCESSABLE_ENTITY,
Json(serde_json::json!({
"error": format!(
"task_class '{}' has no valid mode (preferred='{}', fallbacks={:?})",
req.task_class, tc.preferred_mode, tc.fallback_modes
),
"valid_modes": VALID_MODES,
})),
));
}
return Ok(Json(RouteDecision {
mode: tc.preferred_mode.clone(),
model: tc.default_model.clone(),
decision: DecisionTrace {
task_class_matched: true,
source: "config",
fallbacks: tc.fallback_modes.clone(),
matrix_corpus: tc.matrix_corpus.clone(),
notes,
},
}));
}
notes.push(format!(
"task_class '{}' not in config, using default",
req.task_class
));
Ok(Json(RouteDecision {
mode: cfg.default.preferred_mode.clone(),
model: cfg.default.default_model.clone(),
decision: DecisionTrace {
task_class_matched: false,
source: "default",
fallbacks: cfg.default.fallback_modes.clone(),
matrix_corpus: vec![],
notes,
},
}))
}
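// The fallback scan inside `route` above, condensed into a pure helper
// for illustration (hypothetical name; `route` inlines this loop):
// pick the first configured fallback that is actually a known mode.
#[allow(dead_code)]
fn first_valid_fallback<'a>(fallbacks: &'a [String], valid: &[&str]) -> Option<&'a str> {
    fallbacks.iter().map(|s| s.as_str()).find(|f| valid.contains(f))
}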
/// `GET /v1/mode/list` — operator-facing introspection. Returns the
/// current registry table + valid modes so a UI can render the matrix
/// without re-parsing the TOML.
pub async fn list(State(_state): State<V1State>) -> impl IntoResponse {
let cfg = load_config();
let task_map: HashMap<&str, serde_json::Value> = cfg
.task_classes
.iter()
.map(|t| {
(
t.name.as_str(),
serde_json::json!({
"preferred_mode": t.preferred_mode,
"fallback_modes": t.fallback_modes,
"default_model": t.default_model,
"matrix_corpus": t.matrix_corpus,
}),
)
})
.collect();
Json(serde_json::json!({
"task_classes": task_map,
"default": {
"preferred_mode": cfg.default.preferred_mode,
"fallback_modes": cfg.default.fallback_modes,
"default_model": cfg.default.default_model,
},
"valid_modes": VALID_MODES,
}))
}
// ─── Native runner: codereview_lakehouse ───
//
// Enrichment composer for the lakehouse-specific code review mode.
// Pulls every context primitive the gateway exposes — focus file
// content, pathway-memory bug fingerprints, matrix corpus chunks
// (post relevance filter) — bundles them into ONE prompt designed
// for one-shot success against qwen3-coder:480b. The whole point of
// the mode is that the model gets it right the first time because
// the prompt was molded for THIS file in THIS codebase.
//
// Network composition only — no Rust port of the relevance scorer.
// Every primitive is already an HTTP endpoint; the runner just stitches.
#[derive(Deserialize, Debug)]
pub struct ExecuteRequest {
pub task_class: String,
pub file_path: String,
/// If absent, the runner reads the file from disk relative to the
/// gateway working directory. Useful for test harnesses that don't
/// want to rely on filesystem state.
#[serde(default)]
pub file_content: Option<String>,
/// Override the resolved mode — same semantics as RouteRequest.
#[serde(default)]
pub force_mode: Option<String>,
/// Override the resolved model. Defaults to the task_class's
/// default_model from modes.toml.
#[serde(default)]
pub force_model: Option<String>,
/// Reserved for ad-hoc questions about the file. If omitted, the
/// runner uses its built-in forensic-review framing.
#[serde(default)]
pub user_question: Option<String>,
/// Override the matrix corpus (or corpora) the runner queries.
/// Accepts a single string or array — same semantics as
/// modes.toml's `matrix_corpus`. Empty/missing → use the task
/// class default. Multi-corpus path: top-k retrieved from each,
/// merged and re-sorted by score before the relevance filter.
#[serde(default, deserialize_with = "deserialize_string_or_vec")]
pub force_matrix_corpus: Vec<String>,
/// Override the relevance filter threshold (default 0.3). Setting
/// to 0 keeps every chunk; raising rejects more aggressively. Used
/// to find the threshold sweet spot per task class.
#[serde(default)]
pub force_relevance_threshold: Option<f64>,
/// Override the LLM temperature (default 0.1). Used by Pass 3
/// variance testing to measure run-to-run stability.
#[serde(default)]
pub force_temperature: Option<f64>,
}
#[derive(Serialize, Debug, Default)]
pub struct EnrichmentSources {
pub focus_file_bytes: usize,
pub bug_fingerprints_count: usize,
pub matrix_chunks_kept: usize,
pub matrix_chunks_dropped: usize,
pub matrix_corpus: Vec<String>,
pub relevance_filter_used: bool,
/// Set when the model-aware downgrade fires — records the mode the
/// caller was originally routed to before is_weak_model() flipped
/// it. None means no downgrade happened.
#[serde(skip_serializing_if = "Option::is_none")]
pub downgraded_from: Option<String>,
pub enrichment_warnings: Vec<String>,
/// Which enrichment knobs the runner used for this mode. Lets
/// the comparison aggregator group runs by signal-set.
pub flags: Option<EnrichmentFlags>,
}
#[derive(Serialize, Debug)]
pub struct ExecuteResponse {
pub mode: String,
pub model: String,
pub task_class: String,
pub enriched_prompt_chars: usize,
pub enriched_prompt_preview: String,
pub sources: EnrichmentSources,
pub response: String,
pub latency_ms: u64,
}
const FRAMING_ADVERSARIAL: &str = "You are an adversarial code reviewer for the Lakehouse codebase \
(Rust + DataFusion + Parquet + object storage). Audit the focus file forensically. \
Output a markdown report with: (1) one-line verdict (pass | needs_patch | fail), (2) ranked \
findings table with file:line, evidence, severity, confidence percent, (3) concrete patch \
suggestions, (4) PRD/ADR refs where applicable. Be precise — assume nothing works until \
proven. Do NOT hedge.";
const FRAMING_GENERIC: &str = "You are a code reviewer. Read the file below and produce a \
markdown review with findings.";
const FRAMING_STAFFING: &str = "You are a senior staffing coordinator for a light-industrial \
labor agency. You receive a fill request (role × count × city × deadline) and have access \
to historical playbook patterns from prior fills in this geo, plus a corpus of candidate \
workers + demand signals. Produce a markdown plan with: (1) one-line verdict (fillable | \
contingent | unfillable), (2) ranked candidate list with name, city, role, distance, prior \
fill citations from the playbook, (3) risks (double-booking, eligibility gaps, geo stretch) \
with severity + confidence percent, (4) playbook reference IDs you used. Be precise — only \
recommend candidates whose names appear in the matrix data; do NOT fabricate workers.";
fn framing_text(f: ReviewerFraming) -> &'static str {
match f {
ReviewerFraming::Adversarial => FRAMING_ADVERSARIAL,
ReviewerFraming::Generic => FRAMING_GENERIC,
ReviewerFraming::Staffing => FRAMING_STAFFING,
}
}
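// The multi-corpus merge policy used in Step 3 of `execute` below,
// condensed into a pure helper for illustration (hypothetical name;
// `execute` inlines the same sort + truncate over serde_json values):
// sort merged (tag, score) chunks by score descending, with a
// total-order fallback for NaN, then keep the global top `keep`.
#[allow(dead_code)]
fn merge_top_chunks(mut chunks: Vec<(String, f64)>, keep: usize) -> Vec<(String, f64)> {
    chunks.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
    chunks.truncate(keep);
    chunks
}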
/// Strong-model heuristic for the model-aware enrichment downgrade.
///
/// Pass 5 variance test (2026-04-26, see docs/MODE_RUNNER_TUNING_PLAN.md)
/// proved that on `x-ai/grok-4.1-fast`, composing matrix corpora into the
/// `codereview_lakehouse` prompt LOST 5/5 head-to-head reps against the
/// matrix-free `codereview_isolation` mode. Strong models have enough
/// native capacity that bug fingerprints + adversarial framing + file
/// content carry them; matrix chunks displace depth-of-analysis.
///
/// We default to "strong" (downgrade matrix off) because most production
/// traffic uses paid models. The explicit `weak` predicate keeps the
/// list small and easy to extend — anything matching `:free` (OpenRouter
/// free tier) or the local last-resort qwen3.5 stays on the full
/// `codereview_lakehouse` path where matrix demonstrably helped during
/// the 2026-04-26 free-tier bake-off.
fn is_weak_model(model: &str) -> bool {
if model.ends_with(":free") || model.contains(":free/") {
return true;
}
// Local last-resort rung from the scrum ladder. Other local models
// can be added here as we test them.
matches!(model, "qwen3.5:latest" | "qwen3:latest")
}
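// The model-aware downgrade gate in `execute` below, condensed into a
// pure predicate (hypothetical helper for illustration; `execute`
// inlines this): full matrix enrichment survives only for weak models,
// an explicit caller force_mode, or the LH_FORCE_FULL_ENRICHMENT
// escape hatch.
#[allow(dead_code)]
fn should_downgrade(mode: &str, caller_forced_mode: bool, force_full_env: bool, model_is_weak: bool) -> bool {
    mode == "codereview_lakehouse" && !caller_forced_mode && !force_full_env && !model_is_weak
}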
pub async fn execute(
State(_state): State<V1State>,
Json(req): Json<ExecuteRequest>,
) -> impl IntoResponse {
let cfg = load_config();
let t0 = std::time::Instant::now();
// Resolve mode + model (mirrors /v1/mode logic).
let tc = cfg.lookup(&req.task_class);
let mode = req
.force_mode
.clone()
.or_else(|| tc.map(|t| t.preferred_mode.clone()))
.unwrap_or_else(|| cfg.default.preferred_mode.clone());
let model = req
.force_model
.clone()
.or_else(|| tc.map(|t| t.default_model.clone()))
.unwrap_or_else(|| cfg.default.default_model.clone());
let matrix_corpus: Vec<String> = tc
.map(|t| t.matrix_corpus.clone())
.unwrap_or_default();
// Model-aware enrichment downgrade (2026-04-26 pass 5 finding).
// If a caller resolves `codereview_lakehouse` against a strong
// model, downgrade to `codereview_isolation` so we don't pollute
// the prompt with matrix chunks the model would do better without.
// `LH_FORCE_FULL_ENRICHMENT=1` bypasses for diagnostic runs.
// `force_mode` from the caller is treated as opt-in to the chosen
// mode and skips the downgrade — experiments need to inspect exact
// mode behavior on whatever model they pass.
let force_full = std::env::var("LH_FORCE_FULL_ENRICHMENT")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
let downgraded_from = if mode == "codereview_lakehouse"
&& req.force_mode.is_none()
&& !force_full
&& !is_weak_model(&model)
{
tracing::info!(
target: "v1::mode",
"downgrade codereview_lakehouse -> codereview_isolation for strong model {}",
model
);
Some(mode.clone())
} else {
None
};
let mode = if downgraded_from.is_some() {
"codereview_isolation".to_string()
} else {
mode
};
if !is_native_mode(&mode) {
// Native execute is the only path implemented; LLM-Team proxy
// is queued behind this. Surface a clear 501 so callers know.
return Err((
StatusCode::NOT_IMPLEMENTED,
Json(serde_json::json!({
"error": format!(
"mode '{}' has no native runner — proxy to /api/run not yet wired",
mode
),
"hint": "use force_mode=codereview_lakehouse, or call LLM Team /api/run directly until proxy lands",
})),
));
}
// Caller can override the matrix corpus per-call (Pass 2 corpus
// tightening). Empty force_matrix_corpus falls back to modes.toml.
let matrix_corpus: Vec<String> = if req.force_matrix_corpus.is_empty() {
matrix_corpus
} else {
req.force_matrix_corpus.clone()
};
let flags = flags_for_mode(&mode);
let mut sources = EnrichmentSources {
matrix_corpus: matrix_corpus.clone(),
flags: Some(flags),
downgraded_from: downgraded_from.clone(),
..Default::default()
};
// Step 1: focus file content (always read — even modes that don't
// include it in the prompt may need it for citation/sources).
let file_content = match req.file_content.clone() {
Some(c) => c,
None => match std::fs::read_to_string(&req.file_path) {
Ok(c) => c,
Err(e) => {
return Err((
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": format!("read {} failed: {}", req.file_path, e),
})),
));
}
},
};
sources.focus_file_bytes = file_content.len();
// Local HTTP client for composing internal calls. Short timeout
// because every endpoint is on localhost; the LLM call uses its
// own longer timeout further down.
let client = match reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(8))
.build()
{
Ok(c) => c,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("client build: {e}")})),
));
}
};
// Step 2: pathway memory bug fingerprints for this file area.
let mut bug_preamble = String::new();
if flags.include_bug_fingerprints {
let body = serde_json::json!({
"task_class": req.task_class,
"file_path": req.file_path,
"signal_class": null,
"limit": 10,
});
match client
.post("http://localhost:3100/vectors/pathway/bug_fingerprints")
.json(&body)
.send()
.await
{
Ok(r) if r.status().is_success() => {
if let Ok(j) = r.json::<serde_json::Value>().await {
let fps = j.get("fingerprints").and_then(|v| v.as_array()).cloned().unwrap_or_default();
sources.bug_fingerprints_count = fps.len();
if !fps.is_empty() {
bug_preamble.push_str(
"📚 PATHWAY MEMORY — BUGS PREVIOUSLY FOUND IN THIS FILE AREA:\n",
);
for fp in &fps {
let pk = fp.get("pattern_key").and_then(|v| v.as_str()).unwrap_or("?");
let occ = fp.get("occurrences").and_then(|v| v.as_u64()).unwrap_or(0);
let ex = fp.get("example").and_then(|v| v.as_str()).unwrap_or("");
bug_preamble.push_str(&format!(
"{} (×{}) e.g. `{}`\n",
pk, occ, ex
));
}
bug_preamble.push_str("Watch for these patterns recurring.\n\n");
}
}
}
Ok(r) => sources
.enrichment_warnings
.push(format!("bug_fingerprints HTTP {}", r.status())),
Err(e) => sources
.enrichment_warnings
.push(format!("bug_fingerprints err: {e}")),
}
}
// Step 3: matrix corpus search. Multi-corpus path: query top_k from
// each, merge, re-sort by score, take top 8 overall before the
// relevance filter — orthogonal corpora (e.g. arch + symbols) get
// composed without one swamping the other on chunk count alone.
let mut raw_chunks: Vec<serde_json::Value> = vec![];
if flags.include_matrix_chunks && !matrix_corpus.is_empty() {
// Truncate on a char boundary: byte slicing can panic mid-UTF-8.
let head: String = file_content.chars().take(500).collect();
let query_str = format!("{} {}\n{}", req.task_class, req.file_path, head);
let per_corpus_k = if matrix_corpus.len() == 1 { 8 } else { 6 };
for corpus in &matrix_corpus {
let body = serde_json::json!({
"index_name": corpus,
"query": query_str,
"top_k": per_corpus_k,
});
match client
.post("http://localhost:3100/vectors/search")
.json(&body)
.send()
.await
{
Ok(r) if r.status().is_success() => {
if let Ok(j) = r.json::<serde_json::Value>().await {
if let Some(arr) = j.get("results").and_then(|v| v.as_array()) {
for mut c in arr.iter().cloned() {
// Tag the corpus origin on each chunk so
// dropped/kept telemetry can attribute
// signal back to its source corpus.
if let serde_json::Value::Object(ref mut obj) = c {
obj.insert(
"corpus".to_string(),
serde_json::Value::String(corpus.clone()),
);
}
raw_chunks.push(c);
}
}
}
}
Ok(r) => sources
.enrichment_warnings
.push(format!("matrix_search[{}] HTTP {}", corpus, r.status())),
Err(e) => sources
.enrichment_warnings
.push(format!("matrix_search[{}] err: {e}", corpus)),
}
}
// Sort merged chunks by score desc and take the global top 8.
raw_chunks.sort_by(|a, b| {
let sa = a.get("score").and_then(|v| v.as_f64()).unwrap_or(0.0);
let sb = b.get("score").and_then(|v| v.as_f64()).unwrap_or(0.0);
sb.partial_cmp(&sa).unwrap_or(std::cmp::Ordering::Equal)
});
raw_chunks.truncate(8);
}
// Step 4: relevance filter — drop adjacency pollution.
let kept_chunks: Vec<serde_json::Value> = if flags.use_relevance_filter && !raw_chunks.is_empty() {
let chunks_for_filter: Vec<serde_json::Value> = raw_chunks
.iter()
.map(|c| {
serde_json::json!({
"source": c.get("source").cloned().unwrap_or_default(),
"doc_id": c.get("doc_id").cloned().unwrap_or_default(),
"text": c.get("chunk_text").or_else(|| c.get("text")).cloned().unwrap_or_default(),
"score": c.get("score").cloned().unwrap_or(serde_json::json!(0.0)),
})
})
.collect();
let body = serde_json::json!({
"focus_file": { "path": req.file_path, "content": file_content },
"chunks": chunks_for_filter,
"threshold": req.force_relevance_threshold.unwrap_or(0.3),
});
match client
.post("http://localhost:3800/relevance")
.json(&body)
.send()
.await
{
Ok(r) if r.status().is_success() => {
sources.relevance_filter_used = true;
if let Ok(j) = r.json::<serde_json::Value>().await {
let kept = j.get("kept").and_then(|v| v.as_array()).cloned().unwrap_or_default();
let dropped = j.get("dropped").and_then(|v| v.as_array()).cloned().unwrap_or_default();
sources.matrix_chunks_kept = kept.len();
sources.matrix_chunks_dropped = dropped.len();
kept
} else {
raw_chunks
}
}
_ => {
sources
.enrichment_warnings
.push("relevance filter unreachable, using raw chunks".to_string());
raw_chunks
}
}
} else if !flags.use_relevance_filter && !raw_chunks.is_empty() {
// Take raw matrix chunks unfiltered — `codereview_matrix_only`
// turns the filter off intentionally to measure how much
// pollution the filter is actually catching.
sources.matrix_chunks_kept = raw_chunks.len();
raw_chunks.clone()
} else {
vec![]
};
// Step 5: assemble the prompt — strictly per-flag so we don't
// leak signals across modes.
let mut user_prompt = String::new();
if flags.include_bug_fingerprints {
user_prompt.push_str(&bug_preamble);
}
if flags.include_matrix_chunks && !kept_chunks.is_empty() {
user_prompt.push_str("📁 RELATED CONTEXT (matrix chunks):\n");
for c in &kept_chunks {
// Prefer doc_id for the tag — corpus builders encode origin
// in doc_id (e.g. `adr:017`, `phase:19`) so the reviewer sees
// useful provenance instead of a generic source label.
let tag = c.get("doc_id").and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.or_else(|| c.get("source").and_then(|v| v.as_str()))
.unwrap_or("?");
let txt = c.get("text").or_else(|| c.get("chunk_text"))
.and_then(|v| v.as_str()).unwrap_or("");
// Truncate on a char boundary: byte slicing can panic mid-UTF-8.
let snippet: String = txt.chars().take(280).collect();
user_prompt.push_str(&format!(" [{}] {}\n", tag, snippet));
}
user_prompt.push_str("\n");
}
if flags.include_file_content {
user_prompt.push_str(&format!("FILE: {}\n```rust\n{}\n```\n", req.file_path, file_content));
} else {
// Lossy mode — playbook_only intentionally omits file content
// to measure how much value pathway memory carries on its own.
user_prompt.push_str(&format!(
"FILE PATH (content omitted): {}\nFile size: {} bytes\n",
req.file_path, file_content.len()
));
}
if let Some(q) = &req.user_question {
user_prompt.push_str(&format!("\nQUESTION: {}\n", q));
} else {
user_prompt.push_str("\nProduce the review now.\n");
}
let enriched_chars = user_prompt.len();
let preview: String = user_prompt.chars().take(800).collect();
// Step 6: ONE call to /v1/chat. The whole point of the mode is
// that this single call gets it right because the prompt was
// molded for THIS file. No retry ladder.
//
// Provider selection mirrors routing.toml's broad strokes — Phase 40
// routing engine isn't auto-wired into /v1/chat yet, so the runner
// hints explicitly. Cloud-only models (kimi*, qwen3-coder*,
// deepseek*, mistral-large*, gpt-oss:120b, qwen3.5:397b) → cloud;
// smaller local-resident models → local ollama default.
let provider_hint = if model.contains('/') || model.contains(":free") {
// OpenRouter convention: vendor/model[:tag] (e.g.
// "openai/gpt-oss-120b:free", "google/gemma-3-27b-it:free").
"openrouter"
} else if model.starts_with("kimi-")
|| model.starts_with("qwen3-coder")
|| model.starts_with("deepseek-v")
|| model.starts_with("mistral-large")
|| model == "gpt-oss:120b"
|| model == "qwen3.5:397b"
{
"ollama_cloud"
} else {
"ollama"
};
let chat_body = serde_json::json!({
"model": model,
"provider": provider_hint,
"messages": [
{ "role": "system", "content": framing_text(flags.framing) },
{ "role": "user", "content": user_prompt },
],
"temperature": req.force_temperature.unwrap_or(0.1),
"max_tokens": 4096,
});
let chat_client = match reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(180))
.build()
{
Ok(c) => c,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("chat client build: {e}")})),
));
}
};
let response_text = match chat_client
.post("http://localhost:3100/v1/chat")
.json(&chat_body)
.send()
.await
{
Ok(r) if r.status().is_success() => match r.json::<serde_json::Value>().await {
Ok(j) => j
.get("choices")
.and_then(|c| c.as_array())
.and_then(|a| a.first())
.and_then(|c| c.get("message"))
.and_then(|m| m.get("content"))
.and_then(|s| s.as_str())
.unwrap_or("")
.to_string(),
Err(e) => {
return Err((
StatusCode::BAD_GATEWAY,
Json(serde_json::json!({"error": format!("/v1/chat parse: {e}")})),
));
}
},
Ok(r) => {
let status = r.status();
let body = r.text().await.unwrap_or_default();
return Err((
status,
Json(serde_json::json!({"error": "/v1/chat upstream error", "body": body})),
));
}
Err(e) => {
return Err((
StatusCode::BAD_GATEWAY,
Json(serde_json::json!({"error": format!("/v1/chat send: {e}")})),
));
}
};
let resp = ExecuteResponse {
mode: mode.clone(),
model: model.clone(),
task_class: req.task_class.clone(),
enriched_prompt_chars: enriched_chars,
enriched_prompt_preview: preview,
sources,
response: response_text,
latency_ms: t0.elapsed().as_millis() as u64,
};
// Append to mode_experiments.jsonl so the comparison aggregator
// can read the matrix later. Best-effort — write failure must not
// fail the request. Skips if LH_MODE_LOG_OFF=1.
if std::env::var("LH_MODE_LOG_OFF").as_deref() != Ok("1") {
let log_path = std::env::var("LH_MODE_LOG_PATH")
.unwrap_or_else(|_| "data/_kb/mode_experiments.jsonl".to_string());
let row = serde_json::json!({
"ts": chrono::Utc::now().to_rfc3339(),
"mode": resp.mode,
"model": resp.model,
"task_class": resp.task_class,
"file_path": req.file_path,
"enriched_prompt_chars": resp.enriched_prompt_chars,
"response_chars": resp.response.len(),
"latency_ms": resp.latency_ms,
"sources": resp.sources,
"response": resp.response,
});
if let Some(parent) = std::path::Path::new(&log_path).parent() {
let _ = std::fs::create_dir_all(parent);
}
if let Ok(mut f) = std::fs::OpenOptions::new().create(true).append(true).open(&log_path) {
use std::io::Write;
let _ = writeln!(f, "{}", row);
}
}
Ok(Json(resp))
}
#[cfg(test)]
mod tests {
use super::*;
fn cfg_for_test() -> ModeRouterConfig {
ModeRouterConfig {
task_classes: vec![
TaskClassEntry {
name: "scrum_review".into(),
preferred_mode: "codereview".into(),
fallback_modes: vec!["consensus".into()],
default_model: "qwen3-coder:480b".into(),
matrix_corpus: Some("distilled_procedural_v1".into()),
},
TaskClassEntry {
name: "broken".into(),
preferred_mode: "nonsense_mode".into(),
fallback_modes: vec!["consensus".into()],
default_model: "x".into(),
matrix_corpus: None,
},
],
default: DefaultEntry {
preferred_mode: "pipeline".into(),
fallback_modes: vec![],
default_model: "qwen3.5:latest".into(),
},
}
}
#[test]
fn lookup_finds_matching_task_class() {
let cfg = cfg_for_test();
assert_eq!(cfg.lookup("scrum_review").unwrap().preferred_mode, "codereview");
assert!(cfg.lookup("unknown").is_none());
}
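// Hedged sketch: the UTF-8-safe way to cap chunk text at a byte
// budget (280 here, matching the prompt-assembly cap) before it goes
// into the enriched prompt. A raw byte slice at the cap would panic
// mid-character; backing off to a char boundary does not.
#[test]
fn chunk_truncation_backs_off_to_char_boundary() {
let txt = "€".repeat(100); // U+20AC is 3 bytes; boundaries at multiples of 3
let mut end = txt.len().min(280);
while !txt.is_char_boundary(end) {
end -= 1;
}
assert_eq!(end, 279); // 280 falls mid-char, so we back off one byte
assert_eq!(txt[..end].chars().count(), 93);
}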
#[test]
fn valid_modes_contains_known_runners() {
assert!(VALID_MODES.contains(&"extract"));
assert!(VALID_MODES.contains(&"codereview"));
assert!(VALID_MODES.contains(&"deep_analysis"));
assert!(!VALID_MODES.contains(&"made_up"));
}
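// Invariant sketch for the mode_experiments.jsonl append in the
// handler above: the comparison aggregator reads the log
// line-by-line, so a row must serialize with no raw newlines even
// when the model response contains them. serde_json's compact
// Display escapes embedded newlines, which is what makes the
// one-row-per-line format safe.
#[test]
fn mode_log_row_serializes_to_a_single_line() {
let row = serde_json::json!({
"mode": "codereview_isolation",
"response": "finding one\nfinding two",
});
assert!(!row.to_string().contains('\n'));
}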
#[test]
fn fallback_path_is_well_defined() {
let cfg = cfg_for_test();
let tc = cfg.lookup("broken").unwrap();
// Preferred is invalid; first valid fallback should be 'consensus'.
assert!(!VALID_MODES.contains(&tc.preferred_mode.as_str()));
assert!(VALID_MODES.contains(&tc.fallback_modes[0].as_str()));
}
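// Executable documentation of the inline provider-hint convention in
// the execute handler: slash or ":free" means OpenRouter, a known
// cloud-only model name means ollama_cloud, anything else falls back
// to local ollama. This is a local mirror for illustration, not the
// routing source of truth; update both if the rules change.
fn provider_hint_for(model: &str) -> &'static str {
if model.contains('/') || model.contains(":free") {
"openrouter"
} else if model.starts_with("kimi-")
|| model.starts_with("qwen3-coder")
|| model.starts_with("deepseek-v")
|| model.starts_with("mistral-large")
|| model == "gpt-oss:120b"
|| model == "qwen3.5:397b"
{
"ollama_cloud"
} else {
"ollama"
}
}
#[test]
fn provider_hint_convention_examples() {
assert_eq!(provider_hint_for("x-ai/grok-4.1-fast"), "openrouter");
assert_eq!(provider_hint_for("openai/gpt-oss-120b:free"), "openrouter");
assert_eq!(provider_hint_for("qwen3-coder:480b"), "ollama_cloud");
assert_eq!(provider_hint_for("deepseek-v3"), "ollama_cloud");
assert_eq!(provider_hint_for("qwen3.5:latest"), "ollama");
}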
}