Session infrastructure: OpenRouter + tree-split reducer + observer→LLM Team + scrum_applier #11

Merged
profit merged 118 commits from scrum/auto-apply-19814 into main 2026-04-27 15:55:24 +00:00
3 changed files with 405 additions and 2 deletions
Showing only changes of commit 86f63a083d - Show all commits

View File

@ -12,8 +12,13 @@
[[task_class]]
name = "scrum_review"
preferred_mode = "codereview"
fallback_modes = ["consensus", "ladder"]
# `codereview_lakehouse` is the codebase-specific enrichment runner —
# bundles defined/imported symbols, pathway-memory bug fingerprints,
# and relevance-filtered matrix chunks into ONE precise prompt so the
# model gets it right the first call. The generic `codereview` mode
# from LLM Team is still the network fallback if execute fails.
preferred_mode = "codereview_lakehouse"
fallback_modes = ["codereview", "consensus", "ladder"]
default_model = "qwen3-coder:480b"
matrix_corpus = "distilled_procedural_v20260423102847"

View File

@ -86,6 +86,7 @@ pub fn router(state: V1State) -> Router {
.route("/context", get(truth::context))
.route("/mode", post(mode::route))
.route("/mode/list", get(mode::list))
.route("/mode/execute", post(mode::execute))
.with_state(state)
}

View File

@ -29,14 +29,29 @@ use super::V1State;
/// Validated against the LLM Team /api/run handler at
/// /root/llm_team_ui.py:10581. Kept in sync manually — adding a mode
/// here without adding it upstream returns 400 from the proxy.
///
/// Modes prefixed with the codebase name (e.g. `codereview_lakehouse`)
/// are NATIVE Rust enrichment runners — they don't proxy to LLM Team,
/// they compose the lakehouse's own context primitives (pathway memory,
/// relevance filter, matrix corpora) into a one-shot prompt for the
/// recommended model. Native modes are listed alongside upstream ones
/// so the router can pick either without callers caring.
const VALID_MODES: &[&str] = &[
    // Modes served by the upstream LLM Team handler.
    "brainstorm",
    "pipeline",
    "debate",
    "validator",
    "roundrobin",
    "redteam",
    "consensus",
    "codereview",
    "ladder",
    "tournament",
    "evolution",
    "blindassembly",
    "staircase",
    "drift",
    "mesh",
    "hallucination",
    "timeloop",
    "research",
    "eval",
    "extract",
    "refine",
    "adaptive",
    "deep_analysis",
    "distill",
    // Native runners (not in LLM Team — handled by /v1/mode/execute):
    "codereview_lakehouse",
];
/// Returns `true` when `mode` is executed by a runner inside this gateway
/// rather than being handed to LLM Team. `/v1/mode/execute` uses this to
/// decide how to dispatch; `codereview_lakehouse` is currently the only
/// native mode.
fn is_native_mode(mode: &str) -> bool {
    mode == "codereview_lakehouse"
}
#[derive(Clone, Debug, Deserialize)]
pub struct TaskClassEntry {
pub name: String,
@ -283,6 +298,388 @@ pub async fn list(State(_state): State<V1State>) -> impl IntoResponse {
}))
}
// ─── Native runner: codereview_lakehouse ───
//
// Enrichment composer for the lakehouse-specific code review mode.
// Pulls every context primitive the gateway exposes — focus file
// content, pathway-memory bug fingerprints, matrix corpus chunks
// (post relevance filter) — bundles them into ONE prompt designed
// for one-shot success against qwen3-coder:480b. The whole point of
// the mode is that the model gets it right the first time because
// the prompt was molded for THIS file in THIS codebase.
//
// Network composition only — no Rust port of the relevance scorer.
// Every primitive is already an HTTP endpoint; the runner just stitches.
/// JSON request body accepted by the [`execute`] handler.
///
/// `task_class` and `file_path` are required. Every `Option` field is
/// marked `#[serde(default)]`, so it may be left out of the JSON and
/// then deserializes to `None`.
#[derive(Deserialize, Debug)]
pub struct ExecuteRequest {
/// Task class name. Looked up in the loaded mode config to resolve the
/// mode, model and matrix corpus, and forwarded to the enrichment
/// endpoints as part of their queries.
pub task_class: String,
/// Path of the focus file. Forwarded to the enrichment endpoints and
/// printed in the prompt's `FILE:` header; only read from disk when
/// `file_content` is absent.
pub file_path: String,
/// If absent, the runner reads the file from disk relative to the
/// gateway working directory. Useful for test harnesses that don't
/// want to rely on filesystem state.
#[serde(default)]
pub file_content: Option<String>,
/// Override the resolved mode — same semantics as RouteRequest.
/// Takes precedence over the task class's `preferred_mode`; a mode
/// without a native runner makes the handler answer 501.
#[serde(default)]
pub force_mode: Option<String>,
/// Override the resolved model. Defaults to the task_class's
/// default_model from modes.toml.
#[serde(default)]
pub force_model: Option<String>,
/// Ad-hoc question about the file. When present it is appended to the
/// prompt as a `QUESTION:` line; when omitted the prompt instead ends
/// with the built-in "Produce the forensic review now." instruction.
#[serde(default)]
pub user_question: Option<String>,
}
/// Bookkeeping about which context sources fed the enriched prompt.
/// Filled in by [`execute`] and returned to the caller inside
/// [`ExecuteResponse`]. All counters start at their `Default` (zero /
/// `false` / empty) and are only updated by the step that succeeds.
#[derive(Serialize, Debug, Default)]
pub struct EnrichmentSources {
/// Byte length (`String::len`) of the focus file content.
pub focus_file_bytes: usize,
/// Number of entries in the `fingerprints` array returned by the
/// bug-fingerprint endpoint.
pub bug_fingerprints_count: usize,
/// Length of the `kept` array in the relevance filter's response.
/// Stays 0 when no relevance response was parsed, even if raw search
/// chunks were used as a fallback.
pub matrix_chunks_kept: usize,
/// Length of the `dropped` array in the relevance filter's response.
pub matrix_chunks_dropped: usize,
/// Matrix corpus configured for the resolved task class, if any.
pub matrix_corpus: Option<String>,
/// Set to `true` once the relevance endpoint answers with a success
/// status.
pub relevance_filter_used: bool,
/// Human-readable notes about enrichment steps that failed. These
/// failures are non-fatal: the handler records them here and carries on.
pub enrichment_warnings: Vec<String>,
}
/// JSON body returned by [`execute`] on success.
#[derive(Serialize, Debug)]
pub struct ExecuteResponse {
/// Mode that was resolved for this request (after any `force_mode`).
pub mode: String,
/// Model that was resolved for this request (after any `force_model`).
pub model: String,
/// Task class echoed back from the request.
pub task_class: String,
/// Size of the assembled user prompt. NOTE: despite the name this is
/// `String::len`, i.e. a UTF-8 byte count, not a character count. The
/// system framing message is not included.
pub enriched_prompt_chars: usize,
/// First 800 characters of the assembled user prompt.
pub enriched_prompt_preview: String,
/// Which enrichment sources contributed, plus non-fatal warnings.
pub sources: EnrichmentSources,
/// Text taken from `choices[0].message.content` of the chat reply;
/// empty when that path is missing from the reply.
pub response: String,
/// Wall-clock milliseconds measured inside the handler, starting right
/// after the config is loaded.
pub latency_ms: u64,
}
/// System message sent with every `codereview_lakehouse` chat call. It
/// fixes the reviewer persona and the shape of the report the model must
/// produce (verdict, ranked findings, patches, PRD/ADR references).
const REVIEWER_FRAMING: &str = concat!(
    "You are an adversarial code reviewer for the Lakehouse codebase ",
    "(Rust + DataFusion + Parquet + object storage). ",
    "Audit the focus file forensically. ",
    "Output a markdown report with: ",
    "(1) one-line verdict (pass | needs_patch | fail), ",
    "(2) ranked findings table with file:line, evidence, severity, confidence percent, ",
    "(3) concrete patch suggestions, ",
    "(4) PRD/ADR refs where applicable. ",
    "Be precise assume nothing works until proven. ",
    "Do NOT hedge."
);
/// Handler for `POST /mode/execute`: runs the native `codereview_lakehouse`
/// enrichment pipeline and returns the model's review.
///
/// Pipeline:
/// 1. Resolve mode, model and matrix corpus from the task class
///    (`force_mode` / `force_model` win over the config).
/// 2. Load the focus file — inline `file_content` if given, otherwise
///    from disk at `file_path`.
/// 3. Fetch pathway-memory bug fingerprints (best-effort).
/// 4. Search the matrix corpus, if one is configured (best-effort).
/// 5. Run the relevance filter over the search hits; on any failure the
///    raw hits are used instead.
/// 6. Assemble the prompt and make exactly one call to `/v1/chat`.
///
/// Failures in steps 3–5 never abort the request; they are recorded in
/// `sources.enrichment_warnings`.
///
/// # Errors
/// - `501 Not Implemented` when the resolved mode has no native runner.
/// - `400 Bad Request` when the focus file cannot be read from disk.
/// - `500 Internal Server Error` when an HTTP client cannot be built.
/// - `502 Bad Gateway` when `/v1/chat` cannot be reached or its reply is
///   not valid JSON. A non-success status from `/v1/chat` is passed
///   through together with the upstream body.
pub async fn execute(
    State(_state): State<V1State>,
    Json(req): Json<ExecuteRequest>,
) -> impl IntoResponse {
    let cfg = load_config();
    let t0 = std::time::Instant::now();

    // Resolve mode + model (mirrors /v1/mode logic).
    let tc = cfg.lookup(&req.task_class);
    let mode = req
        .force_mode
        .clone()
        .or_else(|| tc.map(|t| t.preferred_mode.clone()))
        .unwrap_or_else(|| cfg.default.preferred_mode.clone());
    let model = req
        .force_model
        .clone()
        .or_else(|| tc.map(|t| t.default_model.clone()))
        .unwrap_or_else(|| cfg.default.default_model.clone());
    let matrix_corpus = tc.and_then(|t| t.matrix_corpus.clone());

    if !is_native_mode(&mode) {
        // Native execute is the only path implemented; LLM-Team proxy
        // is queued behind this. Surface a clear 501 so callers know.
        return Err((
            StatusCode::NOT_IMPLEMENTED,
            Json(serde_json::json!({
                "error": format!(
                    "mode '{}' has no native runner — proxy to /api/run not yet wired",
                    mode
                ),
                "hint": "use force_mode=codereview_lakehouse, or call LLM Team /api/run directly until proxy lands",
            })),
        ));
    }

    let mut sources = EnrichmentSources {
        matrix_corpus: matrix_corpus.clone(),
        ..Default::default()
    };

    // Step 1: focus file content.
    // NOTE(review): `file_path` comes straight from the request body and is
    // read without any root/allow-list restriction, and the read is a
    // blocking call inside an async handler. Confirm this endpoint is only
    // reachable by trusted callers, or constrain the path.
    let file_content = match req.file_content.clone() {
        Some(inline) => inline,
        None => match std::fs::read_to_string(&req.file_path) {
            Ok(from_disk) => from_disk,
            Err(e) => {
                return Err((
                    StatusCode::BAD_REQUEST,
                    Json(serde_json::json!({
                        "error": format!("read {} failed: {}", req.file_path, e),
                    })),
                ));
            }
        },
    };
    sources.focus_file_bytes = file_content.len();

    // Local HTTP client for composing internal calls. Short timeout
    // because every endpoint is on localhost; the LLM call uses its
    // own longer timeout further down.
    let client = match reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(8))
        .build()
    {
        Ok(c) => c,
        Err(e) => {
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({"error": format!("client build: {e}")})),
            ));
        }
    };

    // Step 2: pathway memory bug fingerprints for this file area.
    let mut bug_preamble = String::new();
    let fingerprint_query = serde_json::json!({
        "task_class": req.task_class,
        "file_path": req.file_path,
        "signal_class": null,
        "limit": 10,
    });
    match client
        .post("http://localhost:3100/vectors/pathway/bug_fingerprints")
        .json(&fingerprint_query)
        .send()
        .await
    {
        Ok(r) if r.status().is_success() => match r.json::<serde_json::Value>().await {
            Ok(j) => {
                let fps = j
                    .get("fingerprints")
                    .and_then(|v| v.as_array())
                    .cloned()
                    .unwrap_or_default();
                sources.bug_fingerprints_count = fps.len();
                if !fps.is_empty() {
                    bug_preamble.push_str(
                        "📚 PATHWAY MEMORY — BUGS PREVIOUSLY FOUND IN THIS FILE AREA:\n",
                    );
                    for fp in &fps {
                        let pk = fp.get("pattern_key").and_then(|v| v.as_str()).unwrap_or("?");
                        let occ = fp.get("occurrences").and_then(|v| v.as_u64()).unwrap_or(0);
                        let ex = fp.get("example").and_then(|v| v.as_str()).unwrap_or("");
                        bug_preamble.push_str(&format!(
                            " • {} (×{}) e.g. `{}`\n",
                            pk, occ, ex
                        ));
                    }
                    bug_preamble.push_str("Watch for these patterns recurring.\n\n");
                }
            }
            // A malformed body is treated like any other enrichment failure:
            // recorded, not fatal.
            Err(e) => sources
                .enrichment_warnings
                .push(format!("bug_fingerprints parse: {e}")),
        },
        Ok(r) => sources
            .enrichment_warnings
            .push(format!("bug_fingerprints HTTP {}", r.status())),
        Err(e) => sources
            .enrichment_warnings
            .push(format!("bug_fingerprints err: {e}")),
    }

    // Step 3: matrix corpus search (if configured for this task class).
    let mut raw_chunks: Vec<serde_json::Value> = Vec::new();
    if let Some(corpus) = &matrix_corpus {
        // The query carries a short head of the file. The cut must land on a
        // UTF-8 char boundary — slicing at a raw byte offset panics when the
        // offset falls inside a multi-byte character.
        let query = format!(
            "{} {}\n{}",
            req.task_class,
            req.file_path,
            truncate_on_char_boundary(&file_content, 500)
        );
        let search_body = serde_json::json!({
            "index_name": corpus,
            "query": query,
            "top_k": 8,
        });
        match client
            .post("http://localhost:3100/vectors/search")
            .json(&search_body)
            .send()
            .await
        {
            Ok(r) if r.status().is_success() => match r.json::<serde_json::Value>().await {
                Ok(j) => {
                    raw_chunks = j
                        .get("results")
                        .and_then(|v| v.as_array())
                        .cloned()
                        .unwrap_or_default();
                }
                Err(e) => sources
                    .enrichment_warnings
                    .push(format!("matrix_search parse: {e}")),
            },
            Ok(r) => sources
                .enrichment_warnings
                .push(format!("matrix_search HTTP {}", r.status())),
            Err(e) => sources
                .enrichment_warnings
                .push(format!("matrix_search err: {e}")),
        }
    }

    // Step 4: relevance filter — drop adjacency pollution.
    let kept_chunks: Vec<serde_json::Value> = if raw_chunks.is_empty() {
        Vec::new()
    } else {
        // Normalize the search hits to the shape sent to the filter; the
        // chunk text may arrive under either `chunk_text` or `text`.
        let chunks_for_filter: Vec<serde_json::Value> = raw_chunks
            .iter()
            .map(|c| {
                serde_json::json!({
                    "source": c.get("source").cloned().unwrap_or_default(),
                    "doc_id": c.get("doc_id").cloned().unwrap_or_default(),
                    "text": c.get("chunk_text").or_else(|| c.get("text")).cloned().unwrap_or_default(),
                    "score": c.get("score").cloned().unwrap_or(serde_json::json!(0.0)),
                })
            })
            .collect();
        let filter_body = serde_json::json!({
            "focus_file": { "path": req.file_path, "content": file_content },
            "chunks": chunks_for_filter,
            "threshold": 0.3,
        });
        match client
            .post("http://localhost:3800/relevance")
            .json(&filter_body)
            .send()
            .await
        {
            Ok(r) if r.status().is_success() => {
                sources.relevance_filter_used = true;
                match r.json::<serde_json::Value>().await {
                    Ok(j) => {
                        let kept = j
                            .get("kept")
                            .and_then(|v| v.as_array())
                            .cloned()
                            .unwrap_or_default();
                        let dropped_count = j
                            .get("dropped")
                            .and_then(|v| v.as_array())
                            .map_or(0, |d| d.len());
                        sources.matrix_chunks_kept = kept.len();
                        sources.matrix_chunks_dropped = dropped_count;
                        kept
                    }
                    Err(e) => {
                        sources
                            .enrichment_warnings
                            .push(format!("relevance parse: {e}, using raw chunks"));
                        raw_chunks
                    }
                }
            }
            _ => {
                sources
                    .enrichment_warnings
                    .push("relevance filter unreachable, using raw chunks".to_string());
                raw_chunks
            }
        }
    };

    // Step 5: assemble the prompt.
    let mut user_prompt = String::new();
    user_prompt.push_str(&bug_preamble);
    if !kept_chunks.is_empty() {
        user_prompt.push_str("📁 RELATED CONTEXT (relevance-filtered from matrix):\n");
        for c in &kept_chunks {
            let src = c.get("source").and_then(|v| v.as_str()).unwrap_or("?");
            // Filtered chunks carry `text`; raw fallback chunks may only have
            // `chunk_text`, so accept either instead of emitting an empty line.
            let txt = c
                .get("text")
                .or_else(|| c.get("chunk_text"))
                .and_then(|v| v.as_str())
                .unwrap_or("");
            // Char-boundary-safe cut: chunk text is arbitrary UTF-8.
            let snippet = truncate_on_char_boundary(txt, 280);
            user_prompt.push_str(&format!(" [{}] {}\n", src, snippet));
        }
        user_prompt.push('\n');
    }
    user_prompt.push_str(&format!("FILE: {}\n```rust\n{}\n```\n", req.file_path, file_content));
    if let Some(q) = &req.user_question {
        user_prompt.push_str(&format!("\nQUESTION: {}\n", q));
    } else {
        user_prompt.push_str("\nProduce the forensic review now.\n");
    }
    // Byte length of the prompt (kept as-is for response compatibility).
    let enriched_chars = user_prompt.len();
    let preview: String = user_prompt.chars().take(800).collect();

    // Step 6: ONE call to /v1/chat. The whole point of the mode is
    // that this single call gets it right because the prompt was
    // molded for THIS file. No retry ladder.
    let provider_hint = provider_hint_for_model(&model);
    let chat_body = serde_json::json!({
        "model": model,
        "provider": provider_hint,
        "messages": [
            { "role": "system", "content": REVIEWER_FRAMING },
            { "role": "user", "content": user_prompt },
        ],
        "temperature": 0.1,
        "max_tokens": 4096,
    });
    let chat_client = match reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(180))
        .build()
    {
        Ok(c) => c,
        Err(e) => {
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({"error": format!("chat client build: {e}")})),
            ));
        }
    };
    let response_text = match chat_client
        .post("http://localhost:3100/v1/chat")
        .json(&chat_body)
        .send()
        .await
    {
        Ok(r) if r.status().is_success() => match r.json::<serde_json::Value>().await {
            Ok(j) => j
                .get("choices")
                .and_then(|c| c.as_array())
                .and_then(|a| a.first())
                .and_then(|c| c.get("message"))
                .and_then(|m| m.get("content"))
                .and_then(|s| s.as_str())
                .unwrap_or("")
                .to_string(),
            Err(e) => {
                return Err((
                    StatusCode::BAD_GATEWAY,
                    Json(serde_json::json!({"error": format!("/v1/chat parse: {e}")})),
                ));
            }
        },
        Ok(r) => {
            // Pass the upstream status through so callers can tell a chat
            // failure apart from a gateway failure.
            let status = r.status();
            let body = r.text().await.unwrap_or_default();
            return Err((
                status,
                Json(serde_json::json!({"error": "/v1/chat upstream error", "body": body})),
            ));
        }
        Err(e) => {
            return Err((
                StatusCode::BAD_GATEWAY,
                Json(serde_json::json!({"error": format!("/v1/chat send: {e}")})),
            ));
        }
    };

    Ok(Json(ExecuteResponse {
        mode,
        model,
        task_class: req.task_class,
        enriched_prompt_chars: enriched_chars,
        enriched_prompt_preview: preview,
        sources,
        response: response_text,
        latency_ms: t0.elapsed().as_millis() as u64,
    }))
}

/// Returns the longest prefix of `s` that is at most `max_bytes` bytes long
/// and ends on a UTF-8 character boundary, so slicing can never panic.
fn truncate_on_char_boundary(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    let mut end = max_bytes;
    // Index 0 is always a char boundary, so this loop terminates.
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}

/// Picks the `provider` hint sent to `/v1/chat` for a model name.
///
/// Provider selection mirrors routing.toml's broad strokes — Phase 40
/// routing engine isn't auto-wired into /v1/chat yet, so the runner
/// hints explicitly. Names in OpenRouter form (`vendor/model[:tag]`, or
/// carrying a `:free` tag) go to `openrouter`; the known cloud-only model
/// families go to `ollama_cloud`; everything else uses the local `ollama`
/// default.
fn provider_hint_for_model(model: &str) -> &'static str {
    const CLOUD_PREFIXES: [&str; 4] = ["kimi-", "qwen3-coder", "deepseek-v", "mistral-large"];

    if model.contains('/') || model.contains(":free") {
        // OpenRouter convention, e.g. "openai/gpt-oss-120b:free",
        // "google/gemma-3-27b-it:free".
        "openrouter"
    } else if CLOUD_PREFIXES.iter().any(|prefix| model.starts_with(*prefix))
        || matches!(model, "gpt-oss:120b" | "qwen3.5:397b")
    {
        "ollama_cloud"
    } else {
        "ollama"
    }
}
#[cfg(test)]
mod tests {
use super::*;