root d77622fc6b distillation: fix 7 grounding bugs found by Kimi audit
Kimi For Coding (api.kimi.com, kimi-for-coding) ran a forensic audit on
distillation v1.0.0 with full file content. 7/7 flags verified real on
grep. Substrate now matches what v1.0.0 claimed: deterministic, no
schema bypasses, Rust tests compile.

Fixes:
- mode.rs:1035,1042  matrix_corpus Some/None -> vec![..]/vec![]; cargo
                     check --tests now compiles (was silently broken;
                     only bun tests were running)
- scorer.ts:30       SCORER_VERSION env override removed - identical
                     input now produces identical version stamp, not
                     env-dependent drift
- transforms.ts:181  auto_apply wall-clock fallback (new Date()) ->
                     deterministic recorded_at fallback
- replay.ts:378      recorded_run_id Date.now() -> sha256(recorded_at);
                     replay rows now reproducible given recorded_at
- receipts.ts:454,495  input_hash_match hardcoded true was misleading
                       telemetry; bumped DRIFT_REPORT_SCHEMA_VERSION 1->2,
                       field is now boolean|null with honest null when
                       not computed at this layer
- score_runs.ts:89-100,159  dedup keyed only on sig_hash made
                            scorer-version bumps invisible. Composite
                            sig_hash:scorer_version forces re-scoring
- export_sft.ts:126  (ev as any).contractor bypass emitted "<contractor>"
                     placeholder for every contract_analyses SFT row.
                     Added typed EvidenceRecord.metadata bucket;
                     transforms.ts populates metadata.contractor;
                     exporter reads typed value

Verification (all green):
  cargo check -p gateway --tests   compiles
  bun test tests/distillation/     145 pass / 0 fail
  bun acceptance                   22/22 invariants
  bun audit-full                   16/16 required checks

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 05:34:31 -05:00

1077 lines
43 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Mode router — task_class → mode + model recommendation.
//!
//! HANDOVER §queued (2026-04-25): "Mode router — port LLM Team multi-model
//! patterns. Pick the right TOOL/MODE for each task class via the matrix,
//! not cascade through models."
//!
//! Two-stage architecture:
//!
//! 1. **Decision** (`POST /v1/mode`) — given `{task_class, prompt}`,
//! consult `config/modes.toml` + (future) pathway memory and return
//! `{mode, model, decision_trace}`. Pure recommendation; no execution.
//!
//! 2. **Execution** (`POST /v1/mode/execute`) — given `{mode, prompt, ...}`,
//! proxy to LLM Team UI (`localhost:5000/api/run`) which has all 25
//! mode runners implemented. As Rust-native runners land in this
//! crate, they short-circuit before the proxy.
//!
//! The split lets us A/B-test the routing logic (decision-only) without
//! committing to running every recommendation. It also keeps the pure
//! decision function simple enough to unit-test exhaustively.
use axum::{Json, extract::State, http::StatusCode, response::IntoResponse};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::OnceLock;
use super::V1State;
/// Every mode name `/v1/mode` may return and `force_mode` may request.
///
/// Validated against the LLM Team /api/run handler at
/// /root/llm_team_ui.py:10581. Kept in sync manually — adding a mode
/// here without adding it upstream returns 400 from the proxy.
///
/// Modes prefixed with the codebase name (e.g. `codereview_lakehouse`)
/// are NATIVE Rust enrichment runners — they don't proxy to LLM Team,
/// they compose the lakehouse's own context primitives (pathway memory,
/// relevance filter, matrix corpora) into a one-shot prompt for the
/// recommended model. Native modes are listed alongside upstream ones
/// so the router can pick either without callers caring.
const VALID_MODES: &[&str] = &[
// Upstream LLM Team modes — executed by proxying to /api/run.
"brainstorm", "pipeline", "debate", "validator", "roundrobin",
"redteam", "consensus", "codereview", "ladder", "tournament",
"evolution", "blindassembly", "staircase", "drift", "mesh",
"hallucination", "timeloop", "research", "eval", "extract",
"refine", "adaptive", "deep_analysis", "distill",
// Native runners (not in LLM Team — handled by /v1/mode/execute).
// Each is a parameterized preset of EnrichmentFlags below — designed
// as a deliberate experiment so we can read the matrix and identify
// which signals are doing real work vs adding latency for nothing.
// NOTE: this list must stay in sync with is_native_mode() below.
"codereview_lakehouse", // all enrichment on (ceiling)
"codereview_null", // raw file + generic prompt (baseline)
"codereview_isolation", // file + pathway only (no matrix)
"codereview_matrix_only", // file + matrix only (no pathway)
"codereview_playbook_only", // pathway only, NO file content (lossy ceiling)
"staffing_inference_lakehouse", // staffing-domain composer (Pass 4)
"pr_audit", // PR-wide claim-vs-diff verifier (auditor)
];
/// Whether a mode is handled natively in this gateway vs proxied to
/// LLM Team. Drives /v1/mode/execute dispatch.
///
/// Must agree with the native-runner section of `VALID_MODES` above —
/// a mode listed there but missing here would fall through to the
/// (unimplemented) proxy path and return 501.
fn is_native_mode(mode: &str) -> bool {
    // Keep the roster as data so adding a runner is a one-line change.
    const NATIVE_MODES: [&str; 7] = [
        "codereview_lakehouse",
        "codereview_null",
        "codereview_isolation",
        "codereview_matrix_only",
        "codereview_playbook_only",
        "staffing_inference_lakehouse",
        "pr_audit",
    ];
    NATIVE_MODES.contains(&mode)
}
/// Per-mode enrichment knobs — each native mode is a preset over these
/// flags. Exists so the runner code is one path (less drift between
/// modes) and the comparison harness can read which signals fired.
#[derive(Debug, Clone, Copy, Serialize)]
pub struct EnrichmentFlags {
/// Include the focus file's full text in the prompt.
pub include_file_content: bool,
/// Include pathway-memory bug fingerprints for the file area.
pub include_bug_fingerprints: bool,
/// Query the matrix corpora and include retrieved chunks.
pub include_matrix_chunks: bool,
/// Run retrieved chunks through the relevance filter before use.
pub use_relevance_filter: bool,
/// Which system-prompt persona frames the request.
pub framing: ReviewerFraming,
}
/// System-prompt persona selected per mode; maps to one of the
/// FRAMING_* constants via `framing_text`.
#[derive(Debug, Clone, Copy, Serialize)]
pub enum ReviewerFraming {
Adversarial, // forensic, ranked findings + verdict (lakehouse default)
Generic, // "review this" — no codebase priors (null baseline)
Staffing, // staffing-domain coordinator framing (Pass 4)
PrAudit, // PR-wide claim verification — JSON-shaped {claim_verdicts}
}
/// Resolve the enrichment preset for a native mode. Unknown modes get
/// the full-enrichment preset (same as `codereview_lakehouse`).
fn flags_for_mode(mode: &str) -> EnrichmentFlags {
    // Ceiling preset: every signal on, adversarial framing. Used
    // verbatim for codereview_lakehouse and as the base the other
    // presets subtract from (EnrichmentFlags is Copy, so struct-update
    // syntax below reuses it freely).
    let all_on = EnrichmentFlags {
        include_file_content: true,
        include_bug_fingerprints: true,
        include_matrix_chunks: true,
        use_relevance_filter: true,
        framing: ReviewerFraming::Adversarial,
    };
    match mode {
        // Baseline: raw file, generic prompt, no codebase priors.
        "codereview_null" => EnrichmentFlags {
            include_bug_fingerprints: false,
            include_matrix_chunks: false,
            use_relevance_filter: false,
            framing: ReviewerFraming::Generic,
            ..all_on
        },
        // File + pathway memory only — no matrix retrieval.
        "codereview_isolation" => EnrichmentFlags {
            include_matrix_chunks: false,
            use_relevance_filter: false,
            ..all_on
        },
        // File + matrix only — no pathway memory.
        "codereview_matrix_only" => EnrichmentFlags {
            include_bug_fingerprints: false,
            ..all_on
        },
        // Lossy on purpose: no file content, pathway memory alone.
        // Measures the pathway-only ceiling.
        "codereview_playbook_only" => EnrichmentFlags {
            include_file_content: false,
            include_matrix_chunks: false,
            use_relevance_filter: false,
            ..all_on
        },
        // Staffing reuses the same composer architecture but with
        // domain-specific framing. file_content here = the request
        // payload (e.g. "fill 2 welders in Toledo OH"), bug_fingerprints
        // surface prior playbook patterns from this geo+role, matrix
        // pulls candidate workers + city/state demand chunks.
        "staffing_inference_lakehouse" => EnrichmentFlags {
            framing: ReviewerFraming::Staffing,
            ..all_on
        },
        // PR-wide claim verification. file_content = the diff text
        // (or curated scratchpad for huge PRs — auditor handles the
        // tree-split BEFORE calling). bug_fingerprints surface
        // prior PR-level patterns. matrix corpus pulls
        // lakehouse_answers_v1 — prior accepted scrum reviews +
        // observer escalations — so the reviewer sees how similar
        // claims were resolved before. relevance filter on to drop
        // adjacency pollution from the answer corpus.
        "pr_audit" => EnrichmentFlags {
            framing: ReviewerFraming::PrAudit,
            ..all_on
        },
        // Default (codereview_lakehouse): everything on.
        _ => all_on,
    }
}
/// One `[[task_class]]` row from `config/modes.toml`: routing
/// preferences for a single task class.
#[derive(Clone, Debug, Deserialize)]
pub struct TaskClassEntry {
/// Task-class identifier matched against `RouteRequest.task_class`.
pub name: String,
/// First-choice mode; must appear in VALID_MODES to be selected.
pub preferred_mode: String,
/// Ordered alternatives tried when preferred_mode isn't valid.
#[serde(default)]
pub fallback_modes: Vec<String>,
/// Model recommended alongside the mode.
pub default_model: String,
/// One or more corpora the mode runner queries (top-k per corpus,
/// merged by score before the relevance filter). Accepts a single
/// string or an array in modes.toml — `deserialize_string_or_vec`
/// handles both shapes for backward compat.
#[serde(default, deserialize_with = "deserialize_string_or_vec")]
pub matrix_corpus: Vec<String>,
}
/// Accept `key = "x"` or `key = ["x", "y"]` in TOML/JSON. Empty string or
/// missing field → empty vec.
///
/// # Errors
/// Rejects arrays containing non-string elements and any other value
/// shape (numbers, tables, …) with a descriptive message.
fn deserialize_string_or_vec<'de, D>(d: D) -> Result<Vec<String>, D::Error>
where D: serde::Deserializer<'de> {
    use serde::de::Error;
    // Deserialize into a self-describing serde_json::Value first so one
    // code path handles both the string and array shapes regardless of
    // whether the source format is TOML or JSON.
    //
    // Fix: Value::deserialize already yields D::Error — the previous
    // `.map_err(D::Error::custom)` re-wrapped the error into itself,
    // flattening it to a display string for no benefit. `?` propagates
    // the original error untouched.
    let v = serde_json::Value::deserialize(d)?;
    match v {
        // Null and "" both normalize to "no corpus configured".
        serde_json::Value::Null => Ok(vec![]),
        serde_json::Value::String(s) if s.is_empty() => Ok(vec![]),
        serde_json::Value::String(s) => Ok(vec![s]),
        serde_json::Value::Array(a) => a
            .into_iter()
            .map(|x| x.as_str().map(String::from)
                .ok_or_else(|| D::Error::custom("matrix_corpus array must contain strings")))
            .collect(),
        other => Err(D::Error::custom(format!("matrix_corpus must be string or array, got {other:?}"))),
    }
}
/// The `[default]` table from modes.toml — routing used when a
/// task_class has no entry of its own.
#[derive(Clone, Debug, Deserialize)]
pub struct DefaultEntry {
/// Mode returned for unmatched task classes.
pub preferred_mode: String,
/// Ordered alternatives (informational in the default path).
#[serde(default)]
pub fallback_modes: Vec<String>,
/// Model recommended for unmatched task classes.
pub default_model: String,
}
/// Root of `config/modes.toml`: the task-class registry plus the
/// catch-all default entry.
#[derive(Clone, Debug, Deserialize)]
pub struct ModeRouterConfig {
/// `[[task_class]]` rows (TOML array-of-tables, hence the rename).
#[serde(default, rename = "task_class")]
pub task_classes: Vec<TaskClassEntry>,
/// `[default]` table — required; fallback_config() supplies one
/// when the file is missing or malformed.
pub default: DefaultEntry,
}
impl ModeRouterConfig {
    /// Return the registry entry whose `name` equals `task_class`,
    /// or `None` when the class isn't configured. Linear scan — the
    /// registry is small and loaded once per process.
    pub fn lookup(&self, task_class: &str) -> Option<&TaskClassEntry> {
        for entry in &self.task_classes {
            if entry.name == task_class {
                return Some(entry);
            }
        }
        None
    }
}
/// Process-global config cache. Loaded on first request from
/// `config/modes.toml` (or `LH_MODES_CONFIG`). If parsing fails the
/// router falls back to a hard-coded default so a malformed config can
/// never take the gateway down.
static CONFIG: OnceLock<ModeRouterConfig> = OnceLock::new();
/// Load-once accessor for the router config. Never fails: read or
/// parse errors are logged and replaced by `fallback_config()`.
fn load_config() -> &'static ModeRouterConfig {
    CONFIG.get_or_init(|| {
        // Path is overridable for tests and non-standard deploys.
        let path = std::env::var("LH_MODES_CONFIG")
            .unwrap_or_else(|_| "config/modes.toml".to_string());
        // Early-return on read failure so the parse step below stays flat.
        let raw = match std::fs::read_to_string(&path) {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!(target: "v1::mode", "read {} failed ({}), using built-in default", path, e);
                return fallback_config();
            }
        };
        match toml::from_str::<ModeRouterConfig>(&raw) {
            Ok(c) => {
                tracing::info!(target: "v1::mode", "loaded {} task classes from {}", c.task_classes.len(), path);
                c
            }
            Err(e) => {
                tracing::warn!(target: "v1::mode", "parse {} failed ({}), using built-in default", path, e);
                fallback_config()
            }
        }
    })
}
/// Built-in last-resort registry used when modes.toml is missing or
/// malformed: no task classes, pipeline-by-default on the local model.
fn fallback_config() -> ModeRouterConfig {
    let default = DefaultEntry {
        preferred_mode: "pipeline".into(),
        fallback_modes: vec!["consensus".into(), "ladder".into()],
        default_model: "qwen3.5:latest".into(),
    };
    ModeRouterConfig {
        task_classes: Vec::new(),
        default,
    }
}
/// Body of `POST /v1/mode`.
#[derive(Deserialize, Debug)]
pub struct RouteRequest {
/// Task class to route; looked up in the modes.toml registry.
pub task_class: String,
/// Reserved for future matrix-informed routing (cosine against
/// matrix_corpus + pathway memory). Currently parsed but unused by
/// the decision logic — kept on the API so callers can land their
/// integration without waiting on the matrix-signal hookup.
#[serde(default)]
#[allow(dead_code)]
pub prompt: Option<String>,
/// Caller-supplied override. When set, the router honors it (with a
/// validation check against VALID_MODES) and skips the matrix
/// signal — useful for testing a specific mode in isolation.
#[serde(default)]
pub force_mode: Option<String>,
}
/// Reasoning trail attached to every RouteDecision — lets callers and
/// dashboards see WHY a mode was chosen, not just which.
#[derive(Serialize, Debug)]
pub struct DecisionTrace {
/// Whether the task_class had a registry entry.
pub task_class_matched: bool,
pub source: &'static str, // "config" | "default" | "force_mode"
/// Fallback modes configured for the matched entry (empty for
/// force_mode / default paths).
pub fallbacks: Vec<String>,
/// Corpora the matched entry would query (empty when unmatched).
pub matrix_corpus: Vec<String>,
/// Human-readable decision steps, in order.
pub notes: Vec<String>,
}
/// Response of `POST /v1/mode`: the recommendation plus its trace.
/// Pure recommendation — nothing is executed.
#[derive(Serialize, Debug)]
pub struct RouteDecision {
/// Chosen mode (always a member of VALID_MODES).
pub mode: String,
/// Recommended model for that mode.
pub model: String,
/// Why this mode/model pair was picked.
pub decision: DecisionTrace,
}
/// `POST /v1/mode` — pure recommendation. Returns a `RouteDecision`
/// with the chosen mode + model + reasoning trail. Caller is then
/// responsible for invoking the mode (either via `/v1/mode/execute`
/// proxy or directly against the LLM Team `/api/run`).
///
/// Decision order: force_mode (validated) → config entry (preferred,
/// then fallbacks, 422 if none valid) → global default.
///
/// # Errors
/// 400 for an unknown `force_mode`; 422 when a matched task class has
/// no valid mode at all (broken config).
pub async fn route(
State(_state): State<V1State>,
Json(req): Json<RouteRequest>,
) -> impl IntoResponse {
    let cfg = load_config();
    let mut notes = Vec::new();
    // Single registry lookup shared by the force_mode short-circuit and
    // the normal path (previously looked up twice in the force branch
    // and again below). Option<&T> is Copy, so reuse is free.
    let tc = cfg.lookup(&req.task_class);
    // force_mode short-circuits everything else but still validates.
    if let Some(forced) = req.force_mode.as_deref() {
        if !VALID_MODES.contains(&forced) {
            return Err((
                StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": format!("Unknown mode: {}", forced),
                    "valid_modes": VALID_MODES,
                })),
            ));
        }
        // Model still comes from the task class (or the global default)
        // even when the mode itself is forced.
        let model = tc
            .map(|t| t.default_model.clone())
            .unwrap_or_else(|| cfg.default.default_model.clone());
        notes.push("force_mode override accepted".into());
        return Ok(Json(RouteDecision {
            mode: forced.to_string(),
            model,
            decision: DecisionTrace {
                task_class_matched: tc.is_some(),
                source: "force_mode",
                fallbacks: vec![],
                matrix_corpus: vec![],
                notes,
            },
        }));
    }
    // Lookup task class; fall through to default if absent.
    if let Some(tc) = tc {
        notes.push(format!(
            "task_class '{}' matched, preferred mode '{}'",
            tc.name, tc.preferred_mode
        ));
        if !VALID_MODES.contains(&tc.preferred_mode.as_str()) {
            notes.push(format!(
                "preferred '{}' not in VALID_MODES — falling through to first valid fallback",
                tc.preferred_mode
            ));
            // First valid fallback wins, in configured order.
            for fb in &tc.fallback_modes {
                if VALID_MODES.contains(&fb.as_str()) {
                    notes.push(format!("fallback '{}' selected", fb));
                    return Ok(Json(RouteDecision {
                        mode: fb.clone(),
                        model: tc.default_model.clone(),
                        decision: DecisionTrace {
                            task_class_matched: true,
                            source: "config",
                            fallbacks: tc.fallback_modes.clone(),
                            matrix_corpus: tc.matrix_corpus.clone(),
                            notes,
                        },
                    }));
                }
            }
            // No fallback was valid either — return 422 so the caller
            // knows the config is broken for this task class.
            return Err((
                StatusCode::UNPROCESSABLE_ENTITY,
                Json(serde_json::json!({
                    "error": format!(
                        "task_class '{}' has no valid mode (preferred='{}', fallbacks={:?})",
                        req.task_class, tc.preferred_mode, tc.fallback_modes
                    ),
                    "valid_modes": VALID_MODES,
                })),
            ));
        }
        // Preferred mode is valid — the common case.
        return Ok(Json(RouteDecision {
            mode: tc.preferred_mode.clone(),
            model: tc.default_model.clone(),
            decision: DecisionTrace {
                task_class_matched: true,
                source: "config",
                fallbacks: tc.fallback_modes.clone(),
                matrix_corpus: tc.matrix_corpus.clone(),
                notes,
            },
        }));
    }
    // Unmatched task class → global default entry.
    notes.push(format!(
        "task_class '{}' not in config, using default",
        req.task_class
    ));
    Ok(Json(RouteDecision {
        mode: cfg.default.preferred_mode.clone(),
        model: cfg.default.default_model.clone(),
        decision: DecisionTrace {
            task_class_matched: false,
            source: "default",
            fallbacks: cfg.default.fallback_modes.clone(),
            matrix_corpus: vec![],
            notes,
        },
    }))
}
/// `GET /v1/mode/list` — operator-facing introspection. Returns the
/// current registry table + valid modes so a UI can render the matrix
/// without re-parsing the TOML.
pub async fn list(State(_state): State<V1State>) -> impl IntoResponse {
    let cfg = load_config();
    // Flatten each registry entry into the JSON shape the UI expects.
    let mut task_map: HashMap<&str, serde_json::Value> = HashMap::new();
    for t in &cfg.task_classes {
        let entry = serde_json::json!({
            "preferred_mode": t.preferred_mode,
            "fallback_modes": t.fallback_modes,
            "default_model": t.default_model,
            "matrix_corpus": t.matrix_corpus,
        });
        task_map.insert(t.name.as_str(), entry);
    }
    Json(serde_json::json!({
        "task_classes": task_map,
        "default": {
            "preferred_mode": cfg.default.preferred_mode,
            "fallback_modes": cfg.default.fallback_modes,
            "default_model": cfg.default.default_model,
        },
        "valid_modes": VALID_MODES,
    }))
}
// ─── Native runner: codereview_lakehouse ───
//
// Enrichment composer for the lakehouse-specific code review mode.
// Pulls every context primitive the gateway exposes — focus file
// content, pathway-memory bug fingerprints, matrix corpus chunks
// (post relevance filter) — bundles them into ONE prompt designed
// for one-shot success against qwen3-coder:480b. The whole point of
// the mode is that the model gets it right the first time because
// the prompt was molded for THIS file in THIS codebase.
//
// Network composition only — no Rust port of the relevance scorer.
// Every primitive is already an HTTP endpoint; the runner just stitches.
/// Body of `POST /v1/mode/execute` — inputs and per-call overrides for
/// the native enrichment runners.
#[derive(Deserialize, Debug)]
pub struct ExecuteRequest {
/// Task class used to resolve mode/model/corpus from modes.toml.
pub task_class: String,
/// Focus file path — used for prompt framing, pathway-memory lookup,
/// and (when file_content is absent) disk reads.
pub file_path: String,
/// If absent, the runner reads the file from disk relative to the
/// gateway working directory. Useful for test harnesses that don't
/// want to rely on filesystem state.
#[serde(default)]
pub file_content: Option<String>,
/// Override the resolved mode — same semantics as RouteRequest.
#[serde(default)]
pub force_mode: Option<String>,
/// Override the resolved model. Defaults to the task_class's
/// default_model from modes.toml.
#[serde(default)]
pub force_model: Option<String>,
/// Reserved for ad-hoc questions about the file. If omitted, the
/// runner uses its built-in forensic-review framing.
#[serde(default)]
pub user_question: Option<String>,
/// Override the matrix corpus (or corpora) the runner queries.
/// Accepts a single string or array — same semantics as
/// modes.toml's `matrix_corpus`. Empty/missing → use the task
/// class default. Multi-corpus path: top-k retrieved from each,
/// merged and re-sorted by score before the relevance filter.
#[serde(default, deserialize_with = "deserialize_string_or_vec")]
pub force_matrix_corpus: Vec<String>,
/// Override the relevance filter threshold (default 0.3). Setting
/// to 0 keeps every chunk; raising rejects more aggressively. Used
/// to find the threshold sweet spot per task class.
#[serde(default)]
pub force_relevance_threshold: Option<f64>,
/// Override the LLM temperature (default 0.1). Used by Pass 3
/// variance testing to measure run-to-run stability.
#[serde(default)]
pub force_temperature: Option<f64>,
}
/// Telemetry on which enrichment signals actually fired for a run —
/// returned in ExecuteResponse so the comparison harness can attribute
/// outcome differences to inputs.
#[derive(Serialize, Debug, Default)]
pub struct EnrichmentSources {
/// Byte length of the focus file content.
pub focus_file_bytes: usize,
/// Pathway-memory fingerprints included in the prompt.
pub bug_fingerprints_count: usize,
/// Matrix chunks surviving the relevance filter (or all raw chunks
/// when the filter is off).
pub matrix_chunks_kept: usize,
/// Matrix chunks rejected by the relevance filter.
pub matrix_chunks_dropped: usize,
/// Corpora actually queried for this run.
pub matrix_corpus: Vec<String>,
/// True only when the relevance service answered successfully.
pub relevance_filter_used: bool,
/// Set when the model-aware downgrade fires — records the mode the
/// caller was originally routed to before is_weak_model() flipped
/// it. None means no downgrade happened.
#[serde(skip_serializing_if = "Option::is_none")]
pub downgraded_from: Option<String>,
/// Non-fatal enrichment failures (endpoint errors, HTTP statuses).
pub enrichment_warnings: Vec<String>,
/// Which enrichment knobs the runner used for this mode. Lets
/// the comparison aggregator group runs by signal-set.
pub flags: Option<EnrichmentFlags>,
}
/// Response of `POST /v1/mode/execute` for native runners.
#[derive(Serialize, Debug)]
pub struct ExecuteResponse {
/// Mode actually executed (after any downgrade).
pub mode: String,
/// Model the enriched prompt was sent to.
pub model: String,
/// Echo of the request's task_class.
pub task_class: String,
/// Length of the assembled prompt, in bytes.
pub enriched_prompt_chars: usize,
/// First ~800 chars of the assembled prompt, for inspection.
pub enriched_prompt_preview: String,
/// Which enrichment signals fired (telemetry).
pub sources: EnrichmentSources,
/// The model's raw response text.
pub response: String,
/// Wall-clock time for the whole execute call.
pub latency_ms: u64,
}
/// System prompt for ReviewerFraming::Adversarial (lakehouse default).
const FRAMING_ADVERSARIAL: &str = "You are an adversarial code reviewer for the Lakehouse codebase \
(Rust + DataFusion + Parquet + object storage). Audit the focus file forensically. \
Output a markdown report with: (1) one-line verdict (pass | needs_patch | fail), (2) ranked \
findings table with file:line, evidence, severity, confidence percent, (3) concrete patch \
suggestions, (4) PRD/ADR refs where applicable. Be precise — assume nothing works until \
proven. Do NOT hedge.";
/// System prompt for ReviewerFraming::Generic (null baseline — no
/// codebase priors).
const FRAMING_GENERIC: &str = "You are a code reviewer. Read the file below and produce a \
markdown review with findings.";
/// System prompt for ReviewerFraming::Staffing (Pass 4 domain composer).
const FRAMING_STAFFING: &str = "You are a senior staffing coordinator for a light-industrial \
labor agency. You receive a fill request (role × count × city × deadline) and have access \
to historical playbook patterns from prior fills in this geo, plus a corpus of candidate \
workers + demand signals. Produce a markdown plan with: (1) one-line verdict (fillable | \
contingent | unfillable), (2) ranked candidate list with name, city, role, distance, prior \
fill citations from the playbook, (3) risks (double-booking, eligibility gaps, geo stretch) \
with severity + confidence percent, (4) playbook reference IDs you used. Be precise — only \
recommend candidates whose names appear in the matrix data; do NOT fabricate workers.";
/// System prompt for ReviewerFraming::PrAudit — demands a strict JSON
/// object so the auditor can machine-parse claim verdicts.
const FRAMING_PR_AUDIT: &str = "You are an adversarial PR claim verifier for the Lakehouse \
codebase (Rust + DataFusion + Parquet + object storage). Caller passes ship-claims from a PR \
description and the unified diff (or a curated scratchpad of it for huge PRs). Your job: for \
each claim, decide whether the diff actually backs it. Be ruthless — claim-diff divergence \
is the failure mode this auditor exists to prevent. Output ONLY a single JSON object with \
this exact shape:\n\
{\n\
\"claim_verdicts\": [\n\
{\"claim_idx\": <integer matching the input list>, \"backed\": <true|false>, \"evidence\": \"<one-line citation: file:line or `quote`, max 240 chars>\"}\n\
],\n\
\"unflagged_gaps\": [\"<one-line description of substantive code change in diff that no claim covers>\"]\n\
}\n\
No markdown, no preamble, no explanation outside the JSON. Every input claim must appear in \
claim_verdicts exactly once. Lean toward backed=false when in doubt — false positives waste \
human time but false negatives ship broken claims.";
/// Map a `ReviewerFraming` variant to its system-prompt constant.
fn framing_text(f: ReviewerFraming) -> &'static str {
    // Exhaustive on purpose — adding a variant forces a prompt choice.
    match f {
        ReviewerFraming::Generic => FRAMING_GENERIC,
        ReviewerFraming::Adversarial => FRAMING_ADVERSARIAL,
        ReviewerFraming::PrAudit => FRAMING_PR_AUDIT,
        ReviewerFraming::Staffing => FRAMING_STAFFING,
    }
}
/// Strong-model heuristic for the model-aware enrichment downgrade.
///
/// Pass 5 variance test (2026-04-26, see docs/MODE_RUNNER_TUNING_PLAN.md)
/// proved that on `x-ai/grok-4.1-fast`, composing matrix corpora into the
/// `codereview_lakehouse` prompt LOST 5/5 head-to-head reps against the
/// matrix-free `codereview_isolation` mode. Strong models have enough
/// native capacity that bug fingerprints + adversarial framing + file
/// content carry them; matrix chunks displace depth-of-analysis.
///
/// We default to "strong" (downgrade matrix off) because most production
/// traffic uses paid models. The explicit `weak` predicate keeps the
/// list small and easy to extend — anything matching `:free` (OpenRouter
/// free tier) or the local last-resort qwen3.5 stays on the full
/// `codereview_lakehouse` path where matrix demonstrably helped during
/// the 2026-04-26 free-tier bake-off.
fn is_weak_model(model: &str) -> bool {
    // OpenRouter free-tier naming: suffix `:free`, or `:free/` when a
    // tag precedes a sub-path.
    let free_tier = model.ends_with(":free") || model.contains(":free/");
    // Local last-resort rung from the scrum ladder. Other local models
    // can be added here as we test them.
    let local_last_resort = model == "qwen3.5:latest" || model == "qwen3:latest";
    free_tier || local_last_resort
}
pub async fn execute(
State(_state): State<V1State>,
Json(req): Json<ExecuteRequest>,
) -> impl IntoResponse {
let cfg = load_config();
let t0 = std::time::Instant::now();
// Resolve mode + model (mirrors /v1/mode logic).
let tc = cfg.lookup(&req.task_class);
let mode = req
.force_mode
.clone()
.or_else(|| tc.map(|t| t.preferred_mode.clone()))
.unwrap_or_else(|| cfg.default.preferred_mode.clone());
let model = req
.force_model
.clone()
.or_else(|| tc.map(|t| t.default_model.clone()))
.unwrap_or_else(|| cfg.default.default_model.clone());
let matrix_corpus: Vec<String> = tc
.map(|t| t.matrix_corpus.clone())
.unwrap_or_default();
// Model-aware enrichment downgrade (2026-04-26 pass 5 finding).
// If a caller resolves `codereview_lakehouse` against a strong
// model, downgrade to `codereview_isolation` so we don't pollute
// the prompt with matrix chunks the model would do better without.
// `LH_FORCE_FULL_ENRICHMENT=1` bypasses for diagnostic runs.
// `force_mode` from the caller is treated as opt-in to the chosen
// mode and skips the downgrade — experiments need to inspect exact
// mode behavior on whatever model they pass.
let force_full = std::env::var("LH_FORCE_FULL_ENRICHMENT")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
let downgraded_from = if mode == "codereview_lakehouse"
&& req.force_mode.is_none()
&& !force_full
&& !is_weak_model(&model)
{
tracing::info!(
target: "v1::mode",
"downgrade codereview_lakehouse -> codereview_isolation for strong model {}",
model
);
Some(mode.clone())
} else {
None
};
let mode = if downgraded_from.is_some() {
"codereview_isolation".to_string()
} else {
mode
};
if !is_native_mode(&mode) {
// Native execute is the only path implemented; LLM-Team proxy
// is queued behind this. Surface a clear 501 so callers know.
return Err((
StatusCode::NOT_IMPLEMENTED,
Json(serde_json::json!({
"error": format!(
"mode '{}' has no native runner — proxy to /api/run not yet wired",
mode
),
"hint": "use force_mode=codereview_lakehouse, or call LLM Team /api/run directly until proxy lands",
})),
));
}
// Caller can override the matrix corpus per-call (Pass 2 corpus
// tightening). Empty force_matrix_corpus falls back to modes.toml.
let matrix_corpus: Vec<String> = if req.force_matrix_corpus.is_empty() {
matrix_corpus
} else {
req.force_matrix_corpus.clone()
};
let flags = flags_for_mode(&mode);
let mut sources = EnrichmentSources {
matrix_corpus: matrix_corpus.clone(),
flags: Some(flags),
downgraded_from: downgraded_from.clone(),
..Default::default()
};
// Step 1: focus file content (always read — even modes that don't
// include it in the prompt may need it for citation/sources).
let file_content = match req.file_content.clone() {
Some(c) => c,
None => match std::fs::read_to_string(&req.file_path) {
Ok(c) => c,
Err(e) => {
return Err((
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": format!("read {} failed: {}", req.file_path, e),
})),
));
}
},
};
sources.focus_file_bytes = file_content.len();
// Local HTTP client for composing internal calls. Short timeout
// because every endpoint is on localhost; the LLM call uses its
// own longer timeout further down.
let client = match reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(8))
.build()
{
Ok(c) => c,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("client build: {e}")})),
));
}
};
// Step 2: pathway memory bug fingerprints for this file area.
let mut bug_preamble = String::new();
if flags.include_bug_fingerprints {
let body = serde_json::json!({
"task_class": req.task_class,
"file_path": req.file_path,
"signal_class": null,
"limit": 10,
});
match client
.post("http://localhost:3100/vectors/pathway/bug_fingerprints")
.json(&body)
.send()
.await
{
Ok(r) if r.status().is_success() => {
if let Ok(j) = r.json::<serde_json::Value>().await {
let fps = j.get("fingerprints").and_then(|v| v.as_array()).cloned().unwrap_or_default();
sources.bug_fingerprints_count = fps.len();
if !fps.is_empty() {
bug_preamble.push_str(
"📚 PATHWAY MEMORY — BUGS PREVIOUSLY FOUND IN THIS FILE AREA:\n",
);
for fp in &fps {
let pk = fp.get("pattern_key").and_then(|v| v.as_str()).unwrap_or("?");
let occ = fp.get("occurrences").and_then(|v| v.as_u64()).unwrap_or(0);
let ex = fp.get("example").and_then(|v| v.as_str()).unwrap_or("");
bug_preamble.push_str(&format!(
"{} (×{}) e.g. `{}`\n",
pk, occ, ex
));
}
bug_preamble.push_str("Watch for these patterns recurring.\n\n");
}
}
}
Ok(r) => sources
.enrichment_warnings
.push(format!("bug_fingerprints HTTP {}", r.status())),
Err(e) => sources
.enrichment_warnings
.push(format!("bug_fingerprints err: {e}")),
}
}
// Step 3: matrix corpus search. Multi-corpus path: query top_k from
// each, merge, re-sort by score, take top 8 overall before the
// relevance filter — orthogonal corpora (e.g. arch + symbols) get
// composed without one swamping the other on chunk count alone.
let mut raw_chunks: Vec<serde_json::Value> = vec![];
if flags.include_matrix_chunks && !matrix_corpus.is_empty() {
let query_str = format!(
"{} {}\n{}",
req.task_class,
req.file_path,
&file_content[..file_content.len().min(500)]
);
let per_corpus_k = if matrix_corpus.len() == 1 { 8 } else { 6 };
for corpus in &matrix_corpus {
let body = serde_json::json!({
"index_name": corpus,
"query": query_str,
"top_k": per_corpus_k,
});
match client
.post("http://localhost:3100/vectors/search")
.json(&body)
.send()
.await
{
Ok(r) if r.status().is_success() => {
if let Ok(j) = r.json::<serde_json::Value>().await {
if let Some(arr) = j.get("results").and_then(|v| v.as_array()) {
for mut c in arr.iter().cloned() {
// Tag the corpus origin on each chunk so
// dropped/kept telemetry can attribute
// signal back to its source corpus.
if let serde_json::Value::Object(ref mut obj) = c {
obj.insert(
"corpus".to_string(),
serde_json::Value::String(corpus.clone()),
);
}
raw_chunks.push(c);
}
}
}
}
Ok(r) => sources
.enrichment_warnings
.push(format!("matrix_search[{}] HTTP {}", corpus, r.status())),
Err(e) => sources
.enrichment_warnings
.push(format!("matrix_search[{}] err: {e}", corpus)),
}
}
// Sort merged chunks by score desc and take the global top 8.
raw_chunks.sort_by(|a, b| {
let sa = a.get("score").and_then(|v| v.as_f64()).unwrap_or(0.0);
let sb = b.get("score").and_then(|v| v.as_f64()).unwrap_or(0.0);
sb.partial_cmp(&sa).unwrap_or(std::cmp::Ordering::Equal)
});
raw_chunks.truncate(8);
}
// Step 4: relevance filter — drop adjacency pollution.
let kept_chunks: Vec<serde_json::Value> = if flags.use_relevance_filter && !raw_chunks.is_empty() {
let chunks_for_filter: Vec<serde_json::Value> = raw_chunks
.iter()
.map(|c| {
serde_json::json!({
"source": c.get("source").cloned().unwrap_or_default(),
"doc_id": c.get("doc_id").cloned().unwrap_or_default(),
"text": c.get("chunk_text").or_else(|| c.get("text")).cloned().unwrap_or_default(),
"score": c.get("score").cloned().unwrap_or(serde_json::json!(0.0)),
})
})
.collect();
let body = serde_json::json!({
"focus_file": { "path": req.file_path, "content": file_content },
"chunks": chunks_for_filter,
"threshold": req.force_relevance_threshold.unwrap_or(0.3),
});
match client
.post("http://localhost:3800/relevance")
.json(&body)
.send()
.await
{
Ok(r) if r.status().is_success() => {
sources.relevance_filter_used = true;
if let Ok(j) = r.json::<serde_json::Value>().await {
let kept = j.get("kept").and_then(|v| v.as_array()).cloned().unwrap_or_default();
let dropped = j.get("dropped").and_then(|v| v.as_array()).cloned().unwrap_or_default();
sources.matrix_chunks_kept = kept.len();
sources.matrix_chunks_dropped = dropped.len();
kept
} else {
raw_chunks
}
}
_ => {
sources
.enrichment_warnings
.push("relevance filter unreachable, using raw chunks".to_string());
raw_chunks
}
}
} else if !flags.use_relevance_filter && !raw_chunks.is_empty() {
// Take raw matrix chunks unfiltered — `codereview_matrix_only`
// turns the filter off intentionally to measure how much
// pollution the filter is actually catching.
sources.matrix_chunks_kept = raw_chunks.len();
raw_chunks.clone()
} else {
vec![]
};
// Step 5: assemble the prompt — strictly per-flag so we don't
// leak signals across modes.
let mut user_prompt = String::new();
if flags.include_bug_fingerprints {
user_prompt.push_str(&bug_preamble);
}
if flags.include_matrix_chunks && !kept_chunks.is_empty() {
user_prompt.push_str("📁 RELATED CONTEXT (matrix chunks):\n");
for c in &kept_chunks {
// Prefer doc_id for the tag — corpus builders encode origin
// in doc_id (e.g. `adr:017`, `phase:19`) so the reviewer sees
// useful provenance instead of a generic source label.
let tag = c.get("doc_id").and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.or_else(|| c.get("source").and_then(|v| v.as_str()))
.unwrap_or("?");
let txt = c.get("text").or_else(|| c.get("chunk_text"))
.and_then(|v| v.as_str()).unwrap_or("");
user_prompt.push_str(&format!(" [{}] {}\n", tag, &txt[..txt.len().min(280)]));
}
user_prompt.push_str("\n");
}
if flags.include_file_content {
user_prompt.push_str(&format!("FILE: {}\n```rust\n{}\n```\n", req.file_path, file_content));
} else {
// Lossy mode — playbook_only intentionally omits file content
// to measure how much value pathway memory carries on its own.
user_prompt.push_str(&format!(
"FILE PATH (content omitted): {}\nFile size: {} bytes\n",
req.file_path, file_content.len()
));
}
if let Some(q) = &req.user_question {
user_prompt.push_str(&format!("\nQUESTION: {}\n", q));
} else {
user_prompt.push_str("\nProduce the review now.\n");
}
let enriched_chars = user_prompt.len();
let preview: String = user_prompt.chars().take(800).collect();
// Step 6: ONE call to /v1/chat. The whole point of the mode is
// that this single call gets it right because the prompt was
// molded for THIS file. No retry ladder.
//
// Provider selection mirrors routing.toml's broad strokes — Phase 40
// routing engine isn't auto-wired into /v1/chat yet, so the runner
// hints explicitly. Cloud-only models (kimi*, qwen3-coder*,
// deepseek*, mistral-large*, gpt-oss:120b, qwen3.5:397b) → cloud;
// smaller local-resident models → local ollama default.
let provider_hint = if model.contains('/') || model.contains(":free") {
// OpenRouter convention: vendor/model[:tag] (e.g.
// "openai/gpt-oss-120b:free", "google/gemma-3-27b-it:free").
"openrouter"
} else if model.starts_with("kimi-")
|| model.starts_with("qwen3-coder")
|| model.starts_with("deepseek-v")
|| model.starts_with("mistral-large")
|| model == "gpt-oss:120b"
|| model == "qwen3.5:397b"
{
"ollama_cloud"
} else {
"ollama"
};
let chat_body = serde_json::json!({
"model": model,
"provider": provider_hint,
"messages": [
{ "role": "system", "content": framing_text(flags.framing) },
{ "role": "user", "content": user_prompt },
],
"temperature": req.force_temperature.unwrap_or(0.1),
"max_tokens": 4096,
});
let chat_client = match reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(180))
.build()
{
Ok(c) => c,
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": format!("chat client build: {e}")})),
));
}
};
let response_text = match chat_client
.post("http://localhost:3100/v1/chat")
.json(&chat_body)
.send()
.await
{
Ok(r) if r.status().is_success() => match r.json::<serde_json::Value>().await {
Ok(j) => j
.get("choices")
.and_then(|c| c.as_array())
.and_then(|a| a.first())
.and_then(|c| c.get("message"))
.and_then(|m| m.get("content"))
.and_then(|s| s.as_str())
.unwrap_or("")
.to_string(),
Err(e) => {
return Err((
StatusCode::BAD_GATEWAY,
Json(serde_json::json!({"error": format!("/v1/chat parse: {e}")})),
));
}
},
Ok(r) => {
let status = r.status();
let body = r.text().await.unwrap_or_default();
return Err((
status,
Json(serde_json::json!({"error": "/v1/chat upstream error", "body": body})),
));
}
Err(e) => {
return Err((
StatusCode::BAD_GATEWAY,
Json(serde_json::json!({"error": format!("/v1/chat send: {e}")})),
));
}
};
let resp = ExecuteResponse {
mode: mode.clone(),
model: model.clone(),
task_class: req.task_class.clone(),
enriched_prompt_chars: enriched_chars,
enriched_prompt_preview: preview,
sources,
response: response_text,
latency_ms: t0.elapsed().as_millis() as u64,
};
// Append to mode_experiments.jsonl so the comparison aggregator
// can read the matrix later. Best-effort — write failure must not
// fail the request. Skips if LH_MODE_LOG_OFF=1.
if std::env::var("LH_MODE_LOG_OFF").as_deref() != Ok("1") {
let log_path = std::env::var("LH_MODE_LOG_PATH")
.unwrap_or_else(|_| "data/_kb/mode_experiments.jsonl".to_string());
let row = serde_json::json!({
"ts": chrono::Utc::now().to_rfc3339(),
"mode": resp.mode,
"model": resp.model,
"task_class": resp.task_class,
"file_path": req.file_path,
"enriched_prompt_chars": resp.enriched_prompt_chars,
"response_chars": resp.response.len(),
"latency_ms": resp.latency_ms,
"sources": resp.sources,
"response": resp.response,
});
if let Some(parent) = std::path::Path::new(&log_path).parent() {
let _ = std::fs::create_dir_all(parent);
}
if let Ok(mut f) = std::fs::OpenOptions::new().create(true).append(true).open(&log_path) {
use std::io::Write;
let _ = writeln!(f, "{}", row);
}
}
Ok(Json(resp))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a two-entry router config for the tests below: one healthy
    /// task class ("scrum_review") and one whose preferred mode is
    /// deliberately invalid ("broken") so fallback resolution can be
    /// exercised.
    fn cfg_for_test() -> ModeRouterConfig {
        // Both entries share the same single-element fallback list, so a
        // small constructor closure keeps only the varying fields inline.
        let entry = |name: &str, preferred: &str, model: &str, corpus: Vec<String>| TaskClassEntry {
            name: name.into(),
            preferred_mode: preferred.into(),
            fallback_modes: vec!["consensus".into()],
            default_model: model.into(),
            matrix_corpus: corpus,
        };
        ModeRouterConfig {
            task_classes: vec![
                entry(
                    "scrum_review",
                    "codereview",
                    "qwen3-coder:480b",
                    vec!["distilled_procedural_v1".into()],
                ),
                entry("broken", "nonsense_mode", "x", vec![]),
            ],
            default: DefaultEntry {
                preferred_mode: "pipeline".into(),
                fallback_modes: vec![],
                default_model: "qwen3.5:latest".into(),
            },
        }
    }

    /// `lookup` must return the entry whose name matches exactly, and
    /// `None` for anything not configured.
    #[test]
    fn lookup_finds_matching_task_class() {
        let cfg = cfg_for_test();
        let hit = cfg.lookup("scrum_review").expect("scrum_review is configured");
        assert_eq!(hit.preferred_mode, "codereview");
        assert!(cfg.lookup("unknown").is_none());
    }

    /// Sanity-check the static mode registry: runners the router actually
    /// dispatches to are present, and a made-up name is not.
    #[test]
    fn valid_modes_contains_known_runners() {
        for known in ["extract", "codereview", "deep_analysis"] {
            assert!(VALID_MODES.contains(&known));
        }
        assert!(!VALID_MODES.contains(&"made_up"));
    }

    /// When the preferred mode is invalid, the first fallback must be a
    /// mode the router knows how to run.
    #[test]
    fn fallback_path_is_well_defined() {
        let cfg = cfg_for_test();
        let tc = cfg.lookup("broken").expect("broken is configured");
        let preferred_is_valid = VALID_MODES.contains(&tc.preferred_mode.as_str());
        assert!(!preferred_is_valid);
        let first_fallback = tc.fallback_modes[0].as_str();
        assert!(VALID_MODES.contains(&first_fallback));
    }
}