//! KB context loader — reads recent signal from `data/_kb/*.jsonl` for //! a given sig_hash + task_class and returns a compact summary. //! //! This is the "pipe to the overviewer" from the 2026-04-23 session: //! the overseer tier (T3, gpt-oss:120b) consumes this context before //! generating a correction, so its suggestions are informed by //! historical cost / latency / outcome / prior-correction patterns //! across ALL profiles that have run this task class — not just the //! single current loop. //! //! Hot-swap profiles read the SAME pool. When a profile activates and //! starts iterating, its KB context is the shared surface — one //! profile's learning becomes every profile's starting point. //! //! Best-effort throughout: missing files, corrupt rows, empty //! directories all produce an empty KbContext. The overseer works //! fine with no history; we just can't seed it then. use serde::Serialize; use std::path::Path; use tokio::io::AsyncBufReadExt; /// Compact summary returned to the overseer. Bounded size — recent /// outcomes + corrections plus rolled-up rates. Goal is to fit in a /// prompt without eating the overseer's context budget. #[derive(Debug, Clone, Default, Serialize)] pub struct KbContext { pub sig_hash: String, pub task_class: String, pub recent_outcomes: Vec, pub recent_corrections: Vec, pub success_rate: Option, pub avg_turns: Option, pub avg_latency_ms: Option, pub total_observed: u32, } #[derive(Debug, Clone, Serialize)] pub struct OutcomeSummary { pub created_at: String, pub ok: bool, pub polarity: String, pub turns: u32, pub latency_ms: u64, pub total_tokens: u64, pub error: Option, } #[derive(Debug, Clone, Serialize)] pub struct CorrectionSummary { pub created_at: String, pub reason: String, pub correction_preview: String, // first 300 chars pub applied_at_turn: u32, } const OUTCOMES_PATH: &str = "data/_kb/outcomes.jsonl"; const CORRECTIONS_PATH: &str = "data/_kb/overseer_corrections.jsonl"; const RECENT_OUTCOME_LIMIT: usize = 5; const RECENT_CORRECTION_LIMIT: usize = 3; const AGGREGATE_WINDOW: usize = 50; impl KbContext { /// Build context from the default KB paths. pub async fn load_for(sig_hash: &str, task_class: &str) -> Self { Self::load_from( sig_hash, task_class, Path::new(OUTCOMES_PATH), Path::new(CORRECTIONS_PATH), ).await } /// Path-taking variant — tests inject tmp files without touching /// the real KB directory (same pattern as append_outcomes_row_at). pub async fn load_from( sig_hash: &str, task_class: &str, outcomes_path: &Path, corrections_path: &Path, ) -> Self { let mut ctx = KbContext { sig_hash: sig_hash.to_string(), task_class: task_class.to_string(), ..Default::default() }; // Scan outcomes — matches on sig_hash primary, task_class // secondary (so different geos for the same task_class still // contribute to aggregate rates even though they won't make // the top-5 recent). The bounded window keeps scan cost // linear in file size — we're reading tail only. let outcome_rows = tail_matching( outcomes_path, AGGREGATE_WINDOW * 4, |row| { let row_sig = row.get("sig_hash").and_then(|v| v.as_str()).unwrap_or(""); let row_tc = row.get("task_class").and_then(|v| v.as_str()).unwrap_or(""); row_sig == sig_hash || row_tc == task_class }, ).await; // Recent outcomes: exact sig_hash match first (strongest // signal), then task_class fallback up to the limit. let mut exact: Vec = Vec::new(); let mut loose: Vec = Vec::new(); for row in &outcome_rows { let row_sig = row.get("sig_hash").and_then(|v| v.as_str()).unwrap_or(""); let summary = summarize_outcome(row); if row_sig == sig_hash { exact.push(summary); } else { loose.push(summary); } } ctx.recent_outcomes = exact.into_iter().rev().take(RECENT_OUTCOME_LIMIT).collect(); if ctx.recent_outcomes.len() < RECENT_OUTCOME_LIMIT { let need = RECENT_OUTCOME_LIMIT - ctx.recent_outcomes.len(); ctx.recent_outcomes.extend(loose.into_iter().rev().take(need)); } // Aggregate rates across the full matched window (both // sig_hash and task_class matches — gives a stable rate even // on sparse sig_hash history). let window = outcome_rows.iter().rev().take(AGGREGATE_WINDOW); let mut ok_count = 0u32; let mut total = 0u32; let mut turn_sum = 0u32; let mut latency_sum = 0u64; for row in window { total += 1; if row.get("ok").and_then(|v| v.as_bool()).unwrap_or(false) { ok_count += 1; } turn_sum += row.get("turns").and_then(|v| v.as_u64()).unwrap_or(0) as u32; latency_sum += row.get("usage") .and_then(|u| u.get("latency_ms")) .and_then(|v| v.as_u64()).unwrap_or(0); } if total > 0 { ctx.total_observed = total; ctx.success_rate = Some(ok_count as f64 / total as f64); ctx.avg_turns = Some(turn_sum as f64 / total as f64); ctx.avg_latency_ms = Some(latency_sum / total as u64); } // Overseer corrections. Prefer sig_hash match; fall back to // task_class. The overseer reading its OWN prior corrections // is the main point — if the last 3 attempts produced // corrections X, Y, Z, the new correction should acknowledge // those patterns rather than suggest X for the fourth time. let correction_rows = tail_matching( corrections_path, RECENT_CORRECTION_LIMIT * 4, |row| { let row_sig = row.get("sig_hash").and_then(|v| v.as_str()).unwrap_or(""); let row_tc = row.get("task_class").and_then(|v| v.as_str()).unwrap_or(""); row_sig == sig_hash || row_tc == task_class }, ).await; let mut c_exact: Vec = Vec::new(); let mut c_loose: Vec = Vec::new(); for row in &correction_rows { let row_sig = row.get("sig_hash").and_then(|v| v.as_str()).unwrap_or(""); let summary = summarize_correction(row); if row_sig == sig_hash { c_exact.push(summary); } else { c_loose.push(summary); } } ctx.recent_corrections = c_exact.into_iter().rev().take(RECENT_CORRECTION_LIMIT).collect(); if ctx.recent_corrections.len() < RECENT_CORRECTION_LIMIT { let need = RECENT_CORRECTION_LIMIT - ctx.recent_corrections.len(); ctx.recent_corrections.extend(c_loose.into_iter().rev().take(need)); } ctx } /// Compact string form for the overseer prompt. Deterministic /// ordering + bounded length so prompt caching stays stable /// across iterations on the same task. pub fn to_prompt_section(&self) -> String { let mut s = String::new(); s.push_str("## Knowledge Base Context\n"); if let (Some(rate), Some(turns), Some(lat)) = (self.success_rate, self.avg_turns, self.avg_latency_ms) { s.push_str(&format!( "Across {} prior similar runs: success_rate={:.1}%, avg_turns={:.1}, avg_latency_ms={}\n", self.total_observed, rate * 100.0, turns, lat, )); } else { s.push_str("No prior similar runs recorded.\n"); } if !self.recent_outcomes.is_empty() { s.push_str(&format!("\nRecent {} outcomes:\n", self.recent_outcomes.len())); for o in &self.recent_outcomes { let err = o.error.as_deref().map(|e| format!(" — {}", truncate(e, 80))).unwrap_or_default(); s.push_str(&format!( " [{}] ok={} turns={} tokens={} lat={}ms{}\n", &o.created_at[..19.min(o.created_at.len())], o.ok, o.turns, o.total_tokens, o.latency_ms, err, )); } } if !self.recent_corrections.is_empty() { s.push_str(&format!("\nRecent {} overseer corrections (yours — don't repeat):\n", self.recent_corrections.len())); for c in &self.recent_corrections { s.push_str(&format!( " [{}] turn={} reason={} correction={}\n", &c.created_at[..19.min(c.created_at.len())], c.applied_at_turn, truncate(&c.reason, 40), truncate(&c.correction_preview, 200), )); } } s } } fn summarize_outcome(row: &serde_json::Value) -> OutcomeSummary { OutcomeSummary { created_at: row.get("created_at").and_then(|v| v.as_str()).unwrap_or("").to_string(), ok: row.get("ok").and_then(|v| v.as_bool()).unwrap_or(false), polarity: row.get("polarity").and_then(|v| v.as_str()).unwrap_or("").to_string(), turns: row.get("turns").and_then(|v| v.as_u64()).unwrap_or(0) as u32, latency_ms: row.get("usage").and_then(|u| u.get("latency_ms")) .and_then(|v| v.as_u64()).unwrap_or(0), total_tokens: row.get("usage").and_then(|u| u.get("total_tokens")) .and_then(|v| v.as_u64()).unwrap_or(0), error: row.get("error").and_then(|v| v.as_str()).map(String::from), } } fn summarize_correction(row: &serde_json::Value) -> CorrectionSummary { let preview = row.get("correction").and_then(|v| v.as_str()).unwrap_or(""); CorrectionSummary { created_at: row.get("created_at").and_then(|v| v.as_str()).unwrap_or("").to_string(), reason: row.get("reason").and_then(|v| v.as_str()).unwrap_or("").to_string(), correction_preview: truncate(preview, 300), applied_at_turn: row.get("applied_at_turn").and_then(|v| v.as_u64()).unwrap_or(0) as u32, } } fn truncate(s: &str, n: usize) -> String { if s.len() <= n { s.to_string() } else { format!("{}…", &s[..n]) } } /// Read a JSONL file from the tail, returning at most `limit` rows /// that match `filter`. Missing file returns empty. Corrupt lines are /// skipped. Limit is honored from the tail — a full-file scan with an /// in-memory ring would be wasteful for large outcomes histories, but /// we cap at reading the whole file and filtering post-hoc for now /// (reverse-seek line iteration is a real engineering task and the /// file is bounded by ingest rate; revisit when it bites). async fn tail_matching( path: &Path, limit: usize, filter: F, ) -> Vec where F: Fn(&serde_json::Value) -> bool, { let Ok(file) = tokio::fs::File::open(path).await else { return Vec::new(); }; let reader = tokio::io::BufReader::new(file); let mut lines = reader.lines(); let mut matches: Vec = Vec::new(); while let Ok(Some(line)) = lines.next_line().await { let Ok(v) = serde_json::from_str::(&line) else { continue }; if filter(&v) { matches.push(v); if matches.len() > limit { // Keep the most-recent window only — drop from the // front as we go rather than buffering everything. matches.remove(0); } } } matches } #[cfg(test)] mod tests { use super::*; use tokio::io::AsyncWriteExt; async fn write_fixture(path: &Path, rows: Vec) { if let Some(dir) = path.parent() { tokio::fs::create_dir_all(dir).await.unwrap(); } let mut f = tokio::fs::OpenOptions::new() .create(true).write(true).truncate(true).open(path).await.unwrap(); for r in rows { let mut line = serde_json::to_string(&r).unwrap(); line.push('\n'); f.write_all(line.as_bytes()).await.unwrap(); } } fn tmp_path(name: &str) -> std::path::PathBuf { let nanos = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos(); std::env::temp_dir().join(format!("lh_kb_ctx_{}_{}_{}", std::process::id(), nanos, name)) } #[tokio::test] async fn empty_files_produce_empty_context() { let op = tmp_path("outcomes.jsonl"); let cp = tmp_path("corrections.jsonl"); let ctx = KbContext::load_from("sig123", "staffing.fill", &op, &cp).await; assert!(ctx.recent_outcomes.is_empty()); assert!(ctx.recent_corrections.is_empty()); assert!(ctx.success_rate.is_none()); assert_eq!(ctx.total_observed, 0); } #[tokio::test] async fn exact_sig_hash_matches_take_priority() { let op = tmp_path("outcomes.jsonl"); let cp = tmp_path("corrections.jsonl"); write_fixture(&op, vec![ // Other sig_hash, same task_class — loose match serde_json::json!({ "sig_hash": "other", "task_class": "staffing.fill", "ok": false, "polarity": "failure_pattern", "turns": 1, "usage": {"latency_ms": 1000, "total_tokens": 100}, "created_at": "2026-04-22T10:00:00Z", }), // Exact sig_hash — should lead serde_json::json!({ "sig_hash": "sig123", "task_class": "staffing.fill", "ok": true, "polarity": "success_confirmation", "turns": 3, "usage": {"latency_ms": 2000, "total_tokens": 500}, "created_at": "2026-04-23T10:00:00Z", }), ]).await; write_fixture(&cp, vec![]).await; let ctx = KbContext::load_from("sig123", "staffing.fill", &op, &cp).await; assert_eq!(ctx.recent_outcomes.len(), 2); assert_eq!(ctx.recent_outcomes[0].created_at, "2026-04-23T10:00:00Z"); assert_eq!(ctx.recent_outcomes[0].ok, true); assert_eq!(ctx.total_observed, 2); assert!((ctx.success_rate.unwrap() - 0.5).abs() < 0.001); } #[tokio::test] async fn corrupt_rows_are_skipped() { let op = tmp_path("outcomes.jsonl"); let cp = tmp_path("corrections.jsonl"); // Mix valid + invalid — invalid should be silently skipped. if let Some(dir) = op.parent() { tokio::fs::create_dir_all(dir).await.unwrap(); } tokio::fs::write(&op, "not json\n{\"sig_hash\":\"sig1\",\"task_class\":\"tc\",\"ok\":true,\"turns\":1,\"usage\":{}}\ngarbage\n").await.unwrap(); write_fixture(&cp, vec![]).await; let ctx = KbContext::load_from("sig1", "tc", &op, &cp).await; assert_eq!(ctx.recent_outcomes.len(), 1); } #[tokio::test] async fn corrections_preview_is_truncated() { let op = tmp_path("outcomes.jsonl"); let cp = tmp_path("corrections.jsonl"); let long = "x".repeat(500); write_fixture(&op, vec![]).await; write_fixture(&cp, vec![serde_json::json!({ "sig_hash": "sig1", "task_class": "tc", "reason": "abort", "correction": long, "applied_at_turn": 3, "created_at": "2026-04-23T10:00:00Z", })]).await; let ctx = KbContext::load_from("sig1", "tc", &op, &cp).await; assert_eq!(ctx.recent_corrections.len(), 1); // 300-char cap + 3-byte UTF-8 ellipsis character = 303-byte worst case. assert!(ctx.recent_corrections[0].correction_preview.len() <= 303); } #[test] fn prompt_section_is_stable_for_empty_context() { let ctx = KbContext::default(); let s = ctx.to_prompt_section(); assert!(s.contains("No prior similar runs recorded")); } #[test] fn prompt_section_reports_aggregate_rates() { let ctx = KbContext { total_observed: 10, success_rate: Some(0.7), avg_turns: Some(4.2), avg_latency_ms: Some(45000), ..Default::default() }; let s = ctx.to_prompt_section(); assert!(s.contains("success_rate=70.0%")); assert!(s.contains("avg_turns=4.2")); assert!(s.contains("avg_latency_ms=45000")); } }