diff --git a/config/modes.toml b/config/modes.toml
index 42145f3..e021a2f 100644
--- a/config/modes.toml
+++ b/config/modes.toml
@@ -12,8 +12,13 @@
 
 [[task_class]]
 name = "scrum_review"
-preferred_mode = "codereview"
-fallback_modes = ["consensus", "ladder"]
+# `codereview_lakehouse` is the codebase-specific enrichment runner —
+# it bundles focus-file content, pathway-memory bug fingerprints, and
+# relevance-filtered matrix chunks into ONE precise prompt so the
+# model gets it right on the first call. The generic `codereview` mode
+# from LLM Team remains the network fallback if execute fails.
+preferred_mode = "codereview_lakehouse"
+fallback_modes = ["codereview", "consensus", "ladder"]
 default_model = "qwen3-coder:480b"
 matrix_corpus = "distilled_procedural_v20260423102847"
 
diff --git a/crates/gateway/src/v1/mod.rs b/crates/gateway/src/v1/mod.rs
index 482c79e..b477d86 100644
--- a/crates/gateway/src/v1/mod.rs
+++ b/crates/gateway/src/v1/mod.rs
@@ -86,6 +86,7 @@ pub fn router(state: V1State) -> Router {
         .route("/context", get(truth::context))
         .route("/mode", post(mode::route))
         .route("/mode/list", get(mode::list))
+        .route("/mode/execute", post(mode::execute))
         .with_state(state)
 }
 
diff --git a/crates/gateway/src/v1/mode.rs b/crates/gateway/src/v1/mode.rs
index 834de12..294ccd7 100644
--- a/crates/gateway/src/v1/mode.rs
+++ b/crates/gateway/src/v1/mode.rs
@@ -29,14 +29,29 @@ use super::V1State;
 /// Validated against the LLM Team /api/run handler at
 /// /root/llm_team_ui.py:10581. Kept in sync manually — adding a mode
 /// here without adding it upstream returns 400 from the proxy.
+///
+/// Modes prefixed with the codebase name (e.g. `codereview_lakehouse`)
+/// are NATIVE Rust enrichment runners: they don't proxy to LLM Team,
+/// they compose the lakehouse's own context primitives (pathway memory,
+/// relevance filter, matrix corpora) into a one-shot prompt for the
+/// recommended model. Native modes are listed alongside upstream ones
+/// so the router can pick either without callers caring.
 const VALID_MODES: &[&str] = &[
     "brainstorm", "pipeline", "debate", "validator", "roundrobin",
     "redteam", "consensus", "codereview", "ladder", "tournament",
     "evolution", "blindassembly", "staircase", "drift", "mesh",
     "hallucination", "timeloop", "research", "eval", "extract",
     "refine", "adaptive", "deep_analysis", "distill",
+    // Native runners (not in LLM Team — handled by /v1/mode/execute):
+    "codereview_lakehouse",
 ];
+
+/// Whether a mode is handled natively in this gateway vs proxied to
+/// LLM Team. Drives /v1/mode/execute dispatch.
+fn is_native_mode(mode: &str) -> bool {
+    matches!(mode, "codereview_lakehouse")
+}
 
 #[derive(Clone, Debug, Deserialize)]
 pub struct TaskClassEntry {
     pub name: String,
@@ -283,6 +298,388 @@ pub async fn list(State(_state): State<V1State>) -> impl IntoResponse {
     }))
 }
 
+// ─── Native runner: codereview_lakehouse ───
+//
+// Enrichment composer for the lakehouse-specific code review mode.
+// Pulls every context primitive the gateway exposes — focus file
+// content, pathway-memory bug fingerprints, matrix corpus chunks
+// (post relevance filter) — and bundles them into ONE prompt designed
+// for one-shot success against qwen3-coder:480b. The whole point of
+// the mode is that the model gets it right the first time because
+// the prompt was molded for THIS file in THIS codebase.
+//
+// Network composition only — no Rust port of the relevance scorer.
+// Every primitive is already an HTTP endpoint; the runner just stitches.
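+//
+// Illustrative call shape (values here are hypothetical; ExecuteRequest
+// below is the authoritative field list):
+//
+//   POST /v1/mode/execute
+//   { "task_class": "scrum_review",
+//     "file_path": "crates/lakehouse/src/compaction.rs" }
+//
+// A 200 returns ExecuteResponse: the review text plus per-source
+// enrichment counters (see EnrichmentSources).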
+
+#[derive(Deserialize, Debug)]
+pub struct ExecuteRequest {
+    pub task_class: String,
+    pub file_path: String,
+    /// If absent, the runner reads the file from disk relative to the
+    /// gateway working directory. Useful for test harnesses that don't
+    /// want to rely on filesystem state.
+    #[serde(default)]
+    pub file_content: Option<String>,
+    /// Override the resolved mode — same semantics as RouteRequest.
+    #[serde(default)]
+    pub force_mode: Option<String>,
+    /// Override the resolved model. Defaults to the task_class's
+    /// default_model from modes.toml.
+    #[serde(default)]
+    pub force_model: Option<String>,
+    /// Reserved for ad-hoc questions about the file. If omitted, the
+    /// runner uses its built-in forensic-review framing.
+    #[serde(default)]
+    pub user_question: Option<String>,
+}
+
+#[derive(Serialize, Debug, Default)]
+pub struct EnrichmentSources {
+    pub focus_file_bytes: usize,
+    pub bug_fingerprints_count: usize,
+    pub matrix_chunks_kept: usize,
+    pub matrix_chunks_dropped: usize,
+    pub matrix_corpus: Option<String>,
+    pub relevance_filter_used: bool,
+    pub enrichment_warnings: Vec<String>,
+}
+
+#[derive(Serialize, Debug)]
+pub struct ExecuteResponse {
+    pub mode: String,
+    pub model: String,
+    pub task_class: String,
+    pub enriched_prompt_chars: usize,
+    pub enriched_prompt_preview: String,
+    pub sources: EnrichmentSources,
+    pub response: String,
+    pub latency_ms: u64,
+}
+
+const REVIEWER_FRAMING: &str = "You are an adversarial code reviewer for the Lakehouse codebase \
+(Rust + DataFusion + Parquet + object storage). Audit the focus file forensically. \
+Output a markdown report with: (1) one-line verdict (pass | needs_patch | fail), (2) ranked \
+findings table with file:line, evidence, severity, confidence percent, (3) concrete patch \
+suggestions, (4) PRD/ADR refs where applicable. Be precise — assume nothing works until \
+proven. Do NOT hedge.";
+
+pub async fn execute(
+    State(_state): State<V1State>,
+    Json(req): Json<ExecuteRequest>,
+) -> impl IntoResponse {
+    let cfg = load_config();
+    let t0 = std::time::Instant::now();
+
+    // Resolve mode + model (mirrors /v1/mode logic).
+    let tc = cfg.lookup(&req.task_class);
+    let mode = req
+        .force_mode
+        .clone()
+        .or_else(|| tc.map(|t| t.preferred_mode.clone()))
+        .unwrap_or_else(|| cfg.default.preferred_mode.clone());
+    let model = req
+        .force_model
+        .clone()
+        .or_else(|| tc.map(|t| t.default_model.clone()))
+        .unwrap_or_else(|| cfg.default.default_model.clone());
+    let matrix_corpus = tc.and_then(|t| t.matrix_corpus.clone());
+
+    if !is_native_mode(&mode) {
+        // Native execute is the only path implemented; the LLM Team proxy
+        // is queued behind this. Surface a clear 501 so callers know.
+        return Err((
+            StatusCode::NOT_IMPLEMENTED,
+            Json(serde_json::json!({
+                "error": format!(
+                    "mode '{}' has no native runner — proxy to /api/run not yet wired",
+                    mode
+                ),
+                "hint": "use force_mode=codereview_lakehouse, or call LLM Team /api/run directly until proxy lands",
+            })),
+        ));
+    }
+
+    let mut sources = EnrichmentSources {
+        matrix_corpus: matrix_corpus.clone(),
+        ..Default::default()
+    };
+
+    // Step 1: focus file content.
+    let file_content = match req.file_content.clone() {
+        Some(c) => c,
+        None => match std::fs::read_to_string(&req.file_path) {
+            Ok(c) => c,
+            Err(e) => {
+                return Err((
+                    StatusCode::BAD_REQUEST,
+                    Json(serde_json::json!({
+                        "error": format!("read {} failed: {}", req.file_path, e),
+                    })),
+                ));
+            }
+        },
+    };
+    sources.focus_file_bytes = file_content.len();
+
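+    // Assumed response shapes for the two internal vector endpoints used
+    // below, inferred from the field accesses rather than a pinned schema:
+    //   POST /vectors/pathway/bug_fingerprints
+    //     → { "fingerprints": [ { "pattern_key", "occurrences", "example" } ] }
+    //   POST /vectors/search
+    //     → { "results": [ { "source", "doc_id", "chunk_text" | "text", "score" } ] }
+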
+    // Local HTTP client for composing internal calls. Short timeout
+    // because every endpoint is on localhost; the LLM call uses its
+    // own longer timeout further down.
+    let client = match reqwest::Client::builder()
+        .timeout(std::time::Duration::from_secs(8))
+        .build()
+    {
+        Ok(c) => c,
+        Err(e) => {
+            return Err((
+                StatusCode::INTERNAL_SERVER_ERROR,
+                Json(serde_json::json!({"error": format!("client build: {e}")})),
+            ));
+        }
+    };
+
+    // Step 2: pathway memory bug fingerprints for this file area.
+    let mut bug_preamble = String::new();
+    {
+        let body = serde_json::json!({
+            "task_class": req.task_class,
+            "file_path": req.file_path,
+            "signal_class": null,
+            "limit": 10,
+        });
+        match client
+            .post("http://localhost:3100/vectors/pathway/bug_fingerprints")
+            .json(&body)
+            .send()
+            .await
+        {
+            Ok(r) if r.status().is_success() => {
+                if let Ok(j) = r.json::<serde_json::Value>().await {
+                    let fps = j.get("fingerprints").and_then(|v| v.as_array()).cloned().unwrap_or_default();
+                    sources.bug_fingerprints_count = fps.len();
+                    if !fps.is_empty() {
+                        bug_preamble.push_str(
+                            "📚 PATHWAY MEMORY — BUGS PREVIOUSLY FOUND IN THIS FILE AREA:\n",
+                        );
+                        for fp in &fps {
+                            let pk = fp.get("pattern_key").and_then(|v| v.as_str()).unwrap_or("?");
+                            let occ = fp.get("occurrences").and_then(|v| v.as_u64()).unwrap_or(0);
+                            let ex = fp.get("example").and_then(|v| v.as_str()).unwrap_or("");
+                            bug_preamble.push_str(&format!(
+                                " • {} (×{}) e.g. `{}`\n",
+                                pk, occ, ex
+                            ));
+                        }
+                        bug_preamble.push_str("Watch for these patterns recurring.\n\n");
+                    }
+                }
+            }
+            Ok(r) => sources
+                .enrichment_warnings
+                .push(format!("bug_fingerprints HTTP {}", r.status())),
+            Err(e) => sources
+                .enrichment_warnings
+                .push(format!("bug_fingerprints err: {e}")),
+        }
+    }
+
+    // Step 3: matrix corpus search (if configured for this task class).
+    let mut raw_chunks: Vec<serde_json::Value> = vec![];
+    if let Some(corpus) = &matrix_corpus {
+        // Char-truncate the query head — byte slicing can split a UTF-8
+        // character and panic.
+        let head: String = file_content.chars().take(500).collect();
+        let body = serde_json::json!({
+            "index_name": corpus,
+            "query": format!("{} {}\n{}", req.task_class, req.file_path, head),
+            "top_k": 8,
+        });
+        match client
+            .post("http://localhost:3100/vectors/search")
+            .json(&body)
+            .send()
+            .await
+        {
+            Ok(r) if r.status().is_success() => {
+                if let Ok(j) = r.json::<serde_json::Value>().await {
+                    raw_chunks = j
+                        .get("results")
+                        .and_then(|v| v.as_array())
+                        .cloned()
+                        .unwrap_or_default();
+                }
+            }
+            Ok(r) => sources
+                .enrichment_warnings
+                .push(format!("matrix_search HTTP {}", r.status())),
+            Err(e) => sources
+                .enrichment_warnings
+                .push(format!("matrix_search err: {e}")),
+        }
+    }
+
+    // Step 4: relevance filter — drop adjacency pollution.
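+    // Assumed /relevance contract, mirroring the parsing below rather
+    // than a documented schema: request { focus_file, chunks, threshold }
+    // → response { "kept": [...], "dropped": [...] }, chunks echoed back
+    // in the same shape.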
+    let kept_chunks: Vec<serde_json::Value> = if !raw_chunks.is_empty() {
+        let chunks_for_filter: Vec<serde_json::Value> = raw_chunks
+            .iter()
+            .map(|c| {
+                serde_json::json!({
+                    "source": c.get("source").cloned().unwrap_or_default(),
+                    "doc_id": c.get("doc_id").cloned().unwrap_or_default(),
+                    "text": c.get("chunk_text").or_else(|| c.get("text")).cloned().unwrap_or_default(),
+                    "score": c.get("score").cloned().unwrap_or(serde_json::json!(0.0)),
+                })
+            })
+            .collect();
+        let body = serde_json::json!({
+            "focus_file": { "path": req.file_path, "content": file_content },
+            "chunks": chunks_for_filter,
+            "threshold": 0.3,
+        });
+        match client
+            .post("http://localhost:3800/relevance")
+            .json(&body)
+            .send()
+            .await
+        {
+            Ok(r) if r.status().is_success() => {
+                sources.relevance_filter_used = true;
+                if let Ok(j) = r.json::<serde_json::Value>().await {
+                    let kept = j.get("kept").and_then(|v| v.as_array()).cloned().unwrap_or_default();
+                    let dropped = j.get("dropped").and_then(|v| v.as_array()).cloned().unwrap_or_default();
+                    sources.matrix_chunks_kept = kept.len();
+                    sources.matrix_chunks_dropped = dropped.len();
+                    kept
+                } else {
+                    raw_chunks
+                }
+            }
+            _ => {
+                sources
+                    .enrichment_warnings
+                    .push("relevance filter unreachable, using raw chunks".to_string());
+                raw_chunks
+            }
+        }
+    } else {
+        vec![]
+    };
+
+    // Step 5: assemble the prompt.
+    let mut user_prompt = String::new();
+    user_prompt.push_str(&bug_preamble);
+    if !kept_chunks.is_empty() {
+        user_prompt.push_str("📁 RELATED CONTEXT (relevance-filtered from matrix):\n");
+        for c in &kept_chunks {
+            let src = c.get("source").and_then(|v| v.as_str()).unwrap_or("?");
+            // Fall back to `chunk_text`: the raw-chunk path (filter
+            // unreachable) still carries that field name.
+            let txt = c.get("text").or_else(|| c.get("chunk_text")).and_then(|v| v.as_str()).unwrap_or("");
+            // Char-truncate the snippet (byte slicing can split UTF-8).
+            let snippet: String = txt.chars().take(280).collect();
+            user_prompt.push_str(&format!(" [{}] {}\n", src, snippet));
+        }
+        user_prompt.push_str("\n");
+    }
+    user_prompt.push_str(&format!("FILE: {}\n```rust\n{}\n```\n", req.file_path, file_content));
+    if let Some(q) = &req.user_question {
+        user_prompt.push_str(&format!("\nQUESTION: {}\n", q));
+    } else {
+        user_prompt.push_str("\nProduce the forensic review now.\n");
+    }
+
+    // Count chars, not bytes — matches the `enriched_prompt_chars` field.
+    let enriched_chars = user_prompt.chars().count();
+    let preview: String = user_prompt.chars().take(800).collect();
+
+    // Step 6: ONE call to /v1/chat. The whole point of the mode is
+    // that this single call gets it right because the prompt was
+    // molded for THIS file. No retry ladder.
+    //
+    // Provider selection mirrors routing.toml's broad strokes — the
+    // Phase 40 routing engine isn't auto-wired into /v1/chat yet, so the
+    // runner hints explicitly. Cloud-only models (kimi*, qwen3-coder*,
+    // deepseek*, mistral-large*, gpt-oss:120b, qwen3.5:397b) → cloud;
+    // smaller local-resident models → local ollama default.
+    let provider_hint = if model.contains('/') || model.contains(":free") {
+        // OpenRouter convention: vendor/model[:tag] (e.g.
+        // "openai/gpt-oss-120b:free", "google/gemma-3-27b-it:free").
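+        // Worked examples of the three branches (model names are
+        // illustrative, not an exhaustive routing table):
+        //   "openai/gpt-oss-120b:free" → openrouter (this branch)
+        //   "qwen3-coder:480b"         → ollama_cloud
+        //   "llama3.1:8b"              → ollama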
+ "openrouter" + } else if model.starts_with("kimi-") + || model.starts_with("qwen3-coder") + || model.starts_with("deepseek-v") + || model.starts_with("mistral-large") + || model == "gpt-oss:120b" + || model == "qwen3.5:397b" + { + "ollama_cloud" + } else { + "ollama" + }; + let chat_body = serde_json::json!({ + "model": model, + "provider": provider_hint, + "messages": [ + { "role": "system", "content": REVIEWER_FRAMING }, + { "role": "user", "content": user_prompt }, + ], + "temperature": 0.1, + "max_tokens": 4096, + }); + let chat_client = match reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(180)) + .build() + { + Ok(c) => c, + Err(e) => { + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": format!("chat client build: {e}")})), + )); + } + }; + let response_text = match chat_client + .post("http://localhost:3100/v1/chat") + .json(&chat_body) + .send() + .await + { + Ok(r) if r.status().is_success() => match r.json::().await { + Ok(j) => j + .get("choices") + .and_then(|c| c.as_array()) + .and_then(|a| a.first()) + .and_then(|c| c.get("message")) + .and_then(|m| m.get("content")) + .and_then(|s| s.as_str()) + .unwrap_or("") + .to_string(), + Err(e) => { + return Err(( + StatusCode::BAD_GATEWAY, + Json(serde_json::json!({"error": format!("/v1/chat parse: {e}")})), + )); + } + }, + Ok(r) => { + let status = r.status(); + let body = r.text().await.unwrap_or_default(); + return Err(( + status, + Json(serde_json::json!({"error": "/v1/chat upstream error", "body": body})), + )); + } + Err(e) => { + return Err(( + StatusCode::BAD_GATEWAY, + Json(serde_json::json!({"error": format!("/v1/chat send: {e}")})), + )); + } + }; + + Ok(Json(ExecuteResponse { + mode, + model, + task_class: req.task_class, + enriched_prompt_chars: enriched_chars, + enriched_prompt_preview: preview, + sources, + response: response_text, + latency_ms: t0.elapsed().as_millis() as u64, + })) +} + #[cfg(test)] mod tests { use super::*;