diff --git a/crates/aibridge/src/client.rs b/crates/aibridge/src/client.rs
index 1340528..b8b45e9 100644
--- a/crates/aibridge/src/client.rs
+++ b/crates/aibridge/src/client.rs
@@ -25,7 +25,7 @@ pub struct EmbedResponse {
     pub dimensions: usize,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Clone, Serialize, Deserialize)]
 pub struct GenerateRequest {
     pub prompt: String,
     #[serde(skip_serializing_if = "Option::is_none")]
@@ -36,6 +36,14 @@ pub struct GenerateRequest {
     pub temperature: Option<f32>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub max_tokens: Option<u32>,
+    /// Phase 21 — per-call opt-out of hidden reasoning. Thinking models
+    /// (qwen3.5, gpt-oss, etc) burn tokens on reasoning before the
+    /// visible response starts; setting this to `false` on hot-path
+    /// JSON emitters avoids empty returns when the budget is tight.
+    /// Sidecar forwards this to Ollama's `think` parameter; if the
+    /// sidecar drops an unknown field the request still succeeds.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub think: Option<bool>,
 }
 
 #[derive(Deserialize, Serialize, Clone)]
diff --git a/crates/aibridge/src/context.rs b/crates/aibridge/src/context.rs
new file mode 100644
index 0000000..cc81562
--- /dev/null
+++ b/crates/aibridge/src/context.rs
@@ -0,0 +1,194 @@
+//! Phase 21 — context-budget accounting for model calls.
+//!
+//! Ports `assertContextBudget` + `estimateTokens` + `CONTEXT_WINDOWS`
+//! from `tests/multi-agent/agent.ts` so Rust-side callers (gateway
+//! tool surfaces, future Rust agents) get the same loud-fail behavior
+//! on window overflow instead of silent truncation.
+//!
+//! The token estimator is deliberately the same chars/4 heuristic as
+//! the TS side. It's biased ~15% safe — pessimistic on English, correct
+//! within a factor of 2 on code. Swap to a provider tokenizer only when
+//! the estimator drives a decision (we're nowhere near that yet).
+
+use std::collections::HashMap;
+use std::sync::OnceLock;
+
+/// Rough token count. `chars / 4` ceiling. See module docs for why
+/// this heuristic is sufficient.
+pub fn estimate_tokens(text: &str) -> usize {
+    (text.chars().count() + 3) / 4
+}
+
+/// Phase 21 — per-model context windows, mirroring the TS table in
+/// `tests/multi-agent/agent.ts`. Anchored on each model's documented
+/// max; unknown models fall back to `DEFAULT_CONTEXT_WINDOW`.
+pub const DEFAULT_CONTEXT_WINDOW: usize = 32_768;
+pub const DEFAULT_SAFETY_MARGIN: usize = 2_000;
+pub const DEFAULT_MAX_TOKENS: usize = 800;
+
+fn known_windows() -> &'static HashMap<&'static str, usize> {
+    static TABLE: OnceLock<HashMap<&'static str, usize>> = OnceLock::new();
+    TABLE.get_or_init(|| {
+        let mut m = HashMap::new();
+        m.insert("mistral:latest", 32_768);
+        m.insert("qwen2.5:latest", 32_768);
+        m.insert("qwen3:latest", 40_960);
+        m.insert("qwen3.5:latest", 262_144);
+        m.insert("qwen3-embedding", 32_768);
+        m.insert("nomic-embed-text-v2-moe", 2_048);
+        m.insert("gpt-oss:20b", 131_072);
+        m.insert("gpt-oss:120b", 131_072);
+        m.insert("qwen3.5:397b", 131_072);
+        m.insert("kimi-k2-thinking", 200_000);
+        m.insert("kimi-k2.6", 200_000);
+        m.insert("kimi-k2:1t", 1_048_576);
+        m.insert("deepseek-v3.1:671b", 131_072);
+        m.insert("glm-4.7", 131_072);
+        m
+    })
+}
+
+pub fn context_window_for(model: &str) -> usize {
+    known_windows().get(model).copied().unwrap_or(DEFAULT_CONTEXT_WINDOW)
+}
+
+/// Result of a budget check — exposes the numbers so callers can log
+/// how much headroom remains without re-running the estimator.
+#[derive(Debug, Clone, Copy)]
+pub struct BudgetCheck {
+    pub estimated: usize,
+    pub window: usize,
+    pub remaining: i64,
+}
+
+/// Inputs to `assert_context_budget`. `bypass` exists for call sites
+/// that handle their own overflow (continuation's second pass already
+/// counted the partial; T5 gatekeeper prompts have a separate policy).
+#[derive(Debug, Clone, Default)]
+pub struct BudgetOpts<'a> {
+    pub system: Option<&'a str>,
+    pub max_tokens: Option<usize>,
+    pub safety_margin: Option<usize>,
+    pub bypass: bool,
+}
+
+/// Phase 21's loud-fail primitive. Returns a `BudgetCheck` on success
+/// and the same struct plus over-by count on failure. The whole point
+/// is to stop silent truncation — callers that expect overflow should
+/// chunk BEFORE calling or set `bypass: true`.
+pub fn assert_context_budget(
+    model: &str,
+    prompt: &str,
+    opts: BudgetOpts,
+) -> Result<BudgetCheck, (BudgetCheck, usize)> {
+    let window = context_window_for(model);
+    let safety = opts.safety_margin.unwrap_or(DEFAULT_SAFETY_MARGIN);
+    let max_tokens = opts.max_tokens.unwrap_or(DEFAULT_MAX_TOKENS);
+    let sys_tokens = opts.system.map(estimate_tokens).unwrap_or(0);
+    let estimated = estimate_tokens(prompt) + sys_tokens + max_tokens;
+    let remaining = window as i64 - estimated as i64 - safety as i64;
+    let check = BudgetCheck { estimated, window, remaining };
+    if remaining < 0 && !opts.bypass {
+        return Err((check, (-remaining) as usize));
+    }
+    Ok(check)
+}
+
+/// Convenience — format an overflow error the same way the TS side
+/// does. Exposed so downstream crates render consistent messages.
+pub fn overflow_message(model: &str, check: &BudgetCheck, over_by: usize, safety: usize) -> String {
+    format!(
+        "context overflow: model={} est={}t window={}t safety={}t over={}t. \
+         Chunk the prompt (see config/models.json overflow_policies) or set \
+         bypass:true if you know the risk.",
+        model, check.estimated, check.window, safety, over_by,
+    )
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn estimate_tokens_ceiling_divides_by_four() {
+        assert_eq!(estimate_tokens(""), 0);
+        assert_eq!(estimate_tokens("abc"), 1);   // 3 → ceil(3/4) = 1
+        assert_eq!(estimate_tokens("abcd"), 1);  // 4 → ceil(4/4) = 1
+        assert_eq!(estimate_tokens("abcde"), 2); // 5 → ceil(5/4) = 2
+        assert_eq!(estimate_tokens(&"x".repeat(400)), 100);
+    }
+
+    #[test]
+    fn context_window_known_and_fallback() {
+        assert_eq!(context_window_for("qwen3.5:latest"), 262_144);
+        assert_eq!(context_window_for("kimi-k2:1t"), 1_048_576);
+        assert_eq!(context_window_for("some-unreleased-model"), DEFAULT_CONTEXT_WINDOW);
+    }
+
+    #[test]
+    fn budget_passes_well_under_window() {
+        let check = assert_context_budget(
+            "qwen3:latest",
+            &"x".repeat(4_000), // ~1000 tokens
+            BudgetOpts { max_tokens: Some(500), ..Default::default() },
+        ).expect("well under 40K window");
+        assert!(check.remaining > 30_000);
+    }
+
+    #[test]
+    fn budget_fails_when_prompt_overflows_window() {
+        let huge = "x".repeat(200_000); // ~50K tokens, over qwen3's 40K
+        let err = assert_context_budget(
+            "qwen3:latest",
+            &huge,
+            BudgetOpts::default(),
+        ).expect_err("should overflow qwen3's 40K window");
+        assert!(err.1 > 0, "over_by must be positive");
+    }
+
+    #[test]
+    fn budget_bypass_returns_ok_even_over() {
+        let huge = "x".repeat(200_000);
+        let check = assert_context_budget(
+            "qwen3:latest",
+            &huge,
+            BudgetOpts { bypass: true, ..Default::default() },
+        ).expect("bypass must suppress the error");
+        assert!(check.remaining < 0, "check still reports negative remaining");
+    }
+
+    #[test]
+    fn budget_counts_system_prompt() {
+        // 10K-char system prompt → ~2500 tokens. With a big max_tokens
+        // this should push us closer to the window.
+        let sys = "s".repeat(10_000);
+        let prompt = "p".repeat(4_000);
+        let with_sys = assert_context_budget(
+            "qwen3:latest",
+            &prompt,
+            BudgetOpts {
+                system: Some(&sys),
+                max_tokens: Some(500),
+                ..Default::default()
+            },
+        ).unwrap();
+        let without_sys = assert_context_budget(
+            "qwen3:latest",
+            &prompt,
+            BudgetOpts { max_tokens: Some(500), ..Default::default() },
+        ).unwrap();
+        assert!(with_sys.estimated > without_sys.estimated,
+            "system prompt should raise estimate");
+        assert_eq!(with_sys.estimated - without_sys.estimated, estimate_tokens(&sys));
+    }
+
+    #[test]
+    fn overflow_message_includes_numbers() {
+        let check = BudgetCheck { estimated: 42_000, window: 40_960, remaining: -1_040 };
+        let msg = overflow_message("qwen3:latest", &check, 3_040, 2_000);
+        assert!(msg.contains("qwen3:latest"));
+        assert!(msg.contains("42000t"));
+        assert!(msg.contains("40960t"));
+        assert!(msg.contains("3040t"));
+    }
+}
diff --git a/crates/aibridge/src/continuation.rs b/crates/aibridge/src/continuation.rs
new file mode 100644
index 0000000..2c61eaa
--- /dev/null
+++ b/crates/aibridge/src/continuation.rs
@@ -0,0 +1,438 @@
+//! Phase 21 — OUTPUT-overflow handler. Ports `generateContinuable`
+//! from `tests/multi-agent/agent.ts`.
+//!
+//! Two failure modes to repair:
+//!
+//! * EMPTY response — thinking model ate the entire budget on hidden
+//!   reasoning before emitting a token. Fix: retry the original prompt
+//!   with 2× the budget, geometric up to `BUDGET_CAP`.
+//!
+//! * TRUNCATED non-empty — model got most of the way but hit
+//!   max_tokens before closing the structure. Fix: continue with the
+//!   partial response in the prompt as scratchpad, so the model knows
+//!   where to pick up without restarting.
+//!
+//! `TextGenerator` abstracts the sidecar so tests can inject canned
+//! responses without a live Ollama.
+
+use std::future::Future;
+
+use crate::client::{AiClient, GenerateRequest, GenerateResponse};
+
+/// Shape classifier for `is_structurally_complete`. JSON responses
+/// must parse; text responses just need to be non-empty.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ResponseShape {
+    Json,
+    Text,
+}
+
+/// Trait that `generate_continuable` + `generate_tree_split` call. The
+/// real implementation forwards to `AiClient::generate`; tests supply a
+/// mock with a scripted sequence of responses.
+pub trait TextGenerator: Send + Sync {
+    fn generate_text(
+        &self,
+        req: GenerateRequest,
+    ) -> impl Future<Output = Result<GenerateResponse, String>> + Send;
+}
+
+impl TextGenerator for AiClient {
+    fn generate_text(
+        &self,
+        req: GenerateRequest,
+    ) -> impl Future<Output = Result<GenerateResponse, String>> + Send {
+        self.generate(req)
+    }
+}
+
+/// Strip a surrounding ```json``` fence if present. Leaves the inner
+/// content alone otherwise. Returns a slice of `s`.
+fn strip_json_fence(s: &str) -> &str {
+    let t = s.trim();
+    if let Some(rest) = t.strip_prefix("```json") {
+        rest.trim_start_matches('\n').strip_suffix("```").unwrap_or(rest).trim()
+    } else if let Some(rest) = t.strip_prefix("```") {
+        rest.trim_start_matches('\n').strip_suffix("```").unwrap_or(rest).trim()
+    } else {
+        t
+    }
+}
+
+/// Port of the TS brace-balance + JSON.parse check. Returns true when
+/// the outermost `{...}` block is balanced and parses. Text shape is
+/// satisfied by any non-empty, non-whitespace payload.
+pub fn is_structurally_complete(text: &str, shape: ResponseShape) -> bool {
+    if text.trim().is_empty() { return false; }
+    if shape == ResponseShape::Text { return true; }
+
+    let s = strip_json_fence(text);
+    let Some(start) = s.find('{') else { return false };
+    let Some(end) = s.rfind('}') else { return false };
+    if end <= start { return false; }
+    let slice = &s[start..=end];
+
+    // Balance check — cheaper than parse, catches truncated nests.
+    // String state tracked because `{` inside a string doesn't count.
+    let mut depth: i32 = 0;
+    let mut in_str = false;
+    let mut esc = false;
+    for c in slice.chars() {
+        if esc { esc = false; continue; }
+        if c == '\\' { esc = true; continue; }
+        if c == '"' { in_str = !in_str; continue; }
+        if in_str { continue; }
+        if c == '{' { depth += 1; }
+        else if c == '}' {
+            depth -= 1;
+            if depth < 0 { return false; }
+        }
+    }
+    if depth != 0 { return false; }
+    // Parse check is the tie-breaker — balanced but invalid JSON (e.g.
+    // trailing comma before `}`) shouldn't count as complete.
+    serde_json::from_str::<serde_json::Value>(slice).is_ok()
+}
+
+/// Knobs for `generate_continuable`. All optional with sensible
+/// defaults that match the TS version.
+#[derive(Debug, Clone)]
+pub struct ContinuableOpts {
+    pub model: String,
+    pub max_tokens: Option<u32>,
+    pub temperature: Option<f32>,
+    pub system: Option<String>,
+    pub shape: ResponseShape,
+    pub max_continuations: usize,
+    pub think: Option<bool>,
+    /// Geometric-backoff ceiling for the empty-response retry path.
+    /// Matches TS's `budgetCap = 8000`.
+    pub budget_cap: u32,
+    /// Maximum empty-response retries before giving up. Matches TS's
+    /// hardcoded `retry < 3`.
+    pub max_empty_retries: usize,
+}
+
+impl ContinuableOpts {
+    pub fn new(model: impl Into<String>) -> Self {
+        Self {
+            model: model.into(),
+            max_tokens: None,
+            temperature: None,
+            system: None,
+            shape: ResponseShape::Json,
+            max_continuations: 3,
+            think: None,
+            budget_cap: 8_000,
+            max_empty_retries: 3,
+        }
+    }
+}
+
+/// Outcome of a `generate_continuable` call. Carries the combined
+/// text plus diagnostic counters so observability downstream can
+/// report "how many continuations did that query cost".
+#[derive(Debug, Clone)]
+pub struct ContinuableOutcome {
+    pub text: String,
+    pub empty_retries: usize,
+    pub continuations: usize,
+    pub final_complete: bool,
+}
+
+fn make_request(opts: &ContinuableOpts, prompt: String, current_max: u32) -> GenerateRequest {
+    GenerateRequest {
+        prompt,
+        model: Some(opts.model.clone()),
+        system: opts.system.clone(),
+        temperature: opts.temperature,
+        max_tokens: Some(current_max),
+        think: opts.think,
+    }
+}
+
+fn continuation_prompt(original: &str, partial: &str) -> String {
+    format!(
+        "{original}\n\n\
+         PARTIAL RESPONSE SO FAR (continue from here — do NOT restart, \
+         do NOT repeat what's already there, emit ONLY the remaining \
+         tokens to close the structure):\n{partial}"
+    )
+}
+
+/// Phase 21 — output-overflow safe generate. See module docs for the
+/// two failure modes repaired. On final-still-incomplete, returns the
+/// combined text with `final_complete: false` so the caller's parser
+/// can throw with the raw text for forensics rather than silently
+/// truncating.
+pub async fn generate_continuable<G: TextGenerator>(
+    generator: &G,
+    prompt: &str,
+    opts: &ContinuableOpts,
+) -> Result<ContinuableOutcome, String> {
+    let initial_max = opts.max_tokens.unwrap_or(800);
+    let mut current_max = initial_max;
+    let mut combined = String::new();
+    let mut empty_retries = 0usize;
+    let mut continuations = 0usize;
+
+    // Phase 21(a) — empty-response backoff loop.
+    for retry in 0..opts.max_empty_retries {
+        let req = make_request(opts, prompt.to_string(), current_max);
+        let resp = generator.generate_text(req).await?;
+        if !resp.text.trim().is_empty() {
+            combined = resp.text;
+            break;
+        }
+        empty_retries = retry + 1;
+        current_max = (current_max.saturating_mul(2)).min(opts.budget_cap);
+    }
+
+    // Phase 21(b) — structural-completion continuation loop. Runs on
+    // the truncated-non-empty case; empty + exhausted retries falls
+    // through with empty combined and final_complete=false.
+    for _ in 0..opts.max_continuations {
+        if is_structurally_complete(&combined, opts.shape) {
+            return Ok(ContinuableOutcome {
+                text: combined,
+                empty_retries,
+                continuations,
+                final_complete: true,
+            });
+        }
+        if combined.trim().is_empty() {
+            // Nothing to continue from — continuing "" is identical to
+            // the initial call and would loop. Bail so the caller sees
+            // the failure rather than burning N extra calls.
+            break;
+        }
+        let cont_prompt = continuation_prompt(prompt, &combined);
+        let req = make_request(opts, cont_prompt, current_max.min(opts.budget_cap));
+        let resp = generator.generate_text(req).await?;
+        combined.push_str(&resp.text);
+        continuations += 1;
+    }
+
+    let final_complete = is_structurally_complete(&combined, opts.shape);
+    Ok(ContinuableOutcome {
+        text: combined,
+        empty_retries,
+        continuations,
+        final_complete,
+    })
+}
+
+/// Scripted generator for unit tests. Returns responses from `script`
+/// in order; extra calls reuse the last entry so tests don't have to
+/// count past what they actually assert on.
+#[cfg(test)]
+pub struct ScriptedGenerator {
+    script: Vec<Result<String, String>>,
+    calls: std::sync::Arc<std::sync::Mutex<Vec<GenerateRequest>>>,
+}
+
+#[cfg(test)]
+impl ScriptedGenerator {
+    pub fn new<I>(script: I) -> Self
+    where
+        I: IntoIterator<Item = Result<String, String>>,
+    {
+        Self {
+            script: script.into_iter().collect(),
+            calls: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
+        }
+    }
+
+    pub fn calls(&self) -> Vec<GenerateRequest> {
+        self.calls.lock().unwrap().clone()
+    }
+
+    pub fn call_count(&self) -> usize {
+        self.calls.lock().unwrap().len()
+    }
+}
+
+#[cfg(test)]
+impl TextGenerator for ScriptedGenerator {
+    fn generate_text(
+        &self,
+        req: GenerateRequest,
+    ) -> impl Future<Output = Result<GenerateResponse, String>> + Send {
+        let i = {
+            let mut calls = self.calls.lock().unwrap();
+            let i = calls.len();
+            calls.push(req.clone());
+            i
+        };
+        let model = req.model.clone().unwrap_or_default();
+        let entry = self.script.get(i)
+            .cloned()
+            .unwrap_or_else(|| self.script.last().cloned().unwrap_or(Ok(String::new())));
+        async move {
+            entry.map(|text| GenerateResponse {
+                text,
+                model,
+                tokens_evaluated: None,
+                tokens_generated: None,
+            })
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn structural_complete_rejects_empty_and_text_mismatch() {
+        assert!(!is_structurally_complete("", ResponseShape::Text));
+        assert!(!is_structurally_complete("   ", ResponseShape::Text));
+        assert!(is_structurally_complete("any content", ResponseShape::Text));
+    }
+
+    #[test]
+    fn structural_complete_handles_balanced_json() {
+        assert!(is_structurally_complete(r#"{"a": 1}"#, ResponseShape::Json));
+        assert!(is_structurally_complete(
+            r#"```json
+{"a": 1, "b": [1, 2, 3]}
+```"#,
+            ResponseShape::Json,
+        ));
+    }
+
+    #[test]
+    fn structural_complete_rejects_truncated_json() {
+        assert!(!is_structurally_complete(r#"{"a": 1"#, ResponseShape::Json));
+        assert!(!is_structurally_complete(r#"{"a": {"b": 1"#, ResponseShape::Json));
+        // Trailing comma — balanced but unparseable
+        assert!(!is_structurally_complete(r#"{"a": 1,}"#, ResponseShape::Json));
+    }
+
+    #[test]
+    fn structural_complete_ignores_braces_inside_strings() {
+        assert!(is_structurally_complete(r#"{"s": "has } inside"}"#, ResponseShape::Json));
+        assert!(is_structurally_complete(r#"{"s": "escaped \" quote"}"#, ResponseShape::Json));
+    }
+
+    #[tokio::test]
+    async fn continuable_returns_first_response_when_complete() {
+        let generator = ScriptedGenerator::new(vec![Ok(r#"{"ok": true}"#.to_string())]);
+        let opts = ContinuableOpts::new("qwen3:latest");
+        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
+        assert!(out.final_complete);
+        assert_eq!(out.empty_retries, 0);
+        assert_eq!(out.continuations, 0);
+        assert_eq!(generator.call_count(), 1);
+    }
+
+    #[tokio::test]
+    async fn continuable_retries_on_empty_with_doubled_budget() {
+        // Two empties, then a good response. Third call should see 4×
+        // the initial budget (2× twice).
+        let generator = ScriptedGenerator::new(vec![
+            Ok("".to_string()),
+            Ok("".to_string()),
+            Ok(r#"{"ok": true}"#.to_string()),
+        ]);
+        let mut opts = ContinuableOpts::new("qwen3:latest");
+        opts.max_tokens = Some(100);
+        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
+        assert!(out.final_complete);
+        assert_eq!(out.empty_retries, 2);
+
+        let calls = generator.calls();
+        assert_eq!(calls.len(), 3);
+        assert_eq!(calls[0].max_tokens, Some(100));
+        assert_eq!(calls[1].max_tokens, Some(200));
+        assert_eq!(calls[2].max_tokens, Some(400));
+    }
+
+    #[tokio::test]
+    async fn continuable_caps_budget_at_budget_cap() {
+        let generator = ScriptedGenerator::new(vec![
+            Ok("".to_string()),
+            Ok("".to_string()),
+            Ok(r#"{"ok":1}"#.to_string()),
+        ]);
+        let mut opts = ContinuableOpts::new("qwen3:latest");
+        opts.max_tokens = Some(5_000);
+        opts.budget_cap = 7_000;
+        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
+        assert!(out.final_complete);
+        let calls = generator.calls();
+        // 5000 → doubled would be 10000; cap pulls it to 7000.
+        assert_eq!(calls[1].max_tokens, Some(7_000));
+        assert_eq!(calls[2].max_tokens, Some(7_000));
+    }
+
+    #[tokio::test]
+    async fn continuable_glues_truncated_response() {
+        // First call returns balanced-open `{...`; continuation closes
+        // it with `...}`. Combined must parse.
+        let generator = ScriptedGenerator::new(vec![
+            Ok(r#"{"fills": [{"candidate_id": "C-001""#.to_string()),
+            Ok(r#", "name": "Alice"}]}"#.to_string()),
+        ]);
+        let opts = ContinuableOpts::new("qwen3:latest");
+        let out = generate_continuable(&generator, "ORIGINAL", &opts).await.unwrap();
+        assert!(out.final_complete, "combined must parse: {}", out.text);
+        assert_eq!(out.continuations, 1);
+
+        let calls = generator.calls();
+        assert_eq!(calls.len(), 2);
+        // Continuation prompt must contain the partial — that's the
+        // scratchpad primitive J called out.
+        let cont_prompt = &calls[1].prompt;
+        assert!(cont_prompt.contains("ORIGINAL"),
+            "continuation must include original prompt");
+        assert!(cont_prompt.contains(r#"{"fills": [{"candidate_id": "C-001""#),
+            "continuation must include partial");
+    }
+
+    #[tokio::test]
+    async fn continuable_does_not_loop_on_persistent_empty() {
+        // All three retries return empty; we must NOT then enter the
+        // continuation loop with an empty partial (would burn 3 more
+        // calls continuing from "").
+        let generator = ScriptedGenerator::new(vec![
+            Ok("".to_string()),
+            Ok("".to_string()),
+            Ok("".to_string()),
+        ]);
+        let opts = ContinuableOpts::new("qwen3:latest");
+        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
+        assert!(!out.final_complete);
+        assert_eq!(out.empty_retries, 3);
+        assert_eq!(out.continuations, 0, "must not continue from empty");
+        assert_eq!(generator.call_count(), 3);
+    }
+
+    #[tokio::test]
+    async fn continuable_returns_raw_on_exhausted_continuations() {
+        // Three continuations that never complete — caller's parser
+        // will throw. We must return the combined text so forensics
+        // has the raw content, not a lossy truncation.
+        let generator = ScriptedGenerator::new(vec![
+            Ok(r#"{"a": ["#.to_string()),
+            Ok("1,".to_string()),
+            Ok("2,".to_string()),
+            Ok("3,".to_string()),
+        ]);
+        let mut opts = ContinuableOpts::new("qwen3:latest");
+        opts.max_continuations = 3;
+        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
+        assert!(!out.final_complete);
+        assert_eq!(out.continuations, 3);
+        assert!(out.text.contains(r#"{"a": ["#));
+        assert!(out.text.contains("3,"));
+    }
+
+    #[tokio::test]
+    async fn continuable_propagates_generator_errors() {
+        let generator = ScriptedGenerator::new(vec![Err("sidecar 503".to_string())]);
+        let opts = ContinuableOpts::new("qwen3:latest");
+        let err = generate_continuable(&generator, "test", &opts).await.unwrap_err();
+        assert!(err.contains("503"));
+    }
+}
diff --git a/crates/aibridge/src/lib.rs b/crates/aibridge/src/lib.rs
index 431311a..04e8022 100644
--- a/crates/aibridge/src/lib.rs
+++ b/crates/aibridge/src/lib.rs
@@ -1,2 +1,5 @@
 pub mod client;
+pub mod context;
+pub mod continuation;
 pub mod service;
+pub mod tree_split;
diff --git a/crates/aibridge/src/tree_split.rs b/crates/aibridge/src/tree_split.rs
new file mode 100644
index 0000000..c29dff6
--- /dev/null
+++ b/crates/aibridge/src/tree_split.rs
@@ -0,0 +1,326 @@
+//! Phase 21 — INPUT-overflow handler. Ports `generateTreeSplit` from
+//! `tests/multi-agent/agent.ts`.
+//!
+//! When the input corpus exceeds the model's window (200 playbooks
+//! pasted into a T4 strategic prompt, a long retrospective digest, a
+//! cross-corpus summarization), raising `max_tokens` doesn't help —
+//! the prompt itself is the problem. The answer is map-reduce:
+//!
+//! 1. Caller shards the input at semantic boundaries (records,
+//!    paragraphs, playbook entries).
+//! 2. For each shard, build a map prompt that includes the running
+//!    scratchpad and run it through `generate_continuable`.
+//! 3. Append the map output to the scratchpad (oldest-first
+//!    truncation when it outgrows `scratchpad_budget`).
+//! 4. Build a reduce prompt from the final scratchpad and run it.
+//!
+//! Every shard prompt and the reduce prompt go through
+//! `assert_context_budget` first — if a single shard still overflows
+//! we bubble the error up rather than silently truncating. That's the
+//! whole point of Phase 21.
+
+use crate::context::{assert_context_budget, BudgetOpts, estimate_tokens, overflow_message,
+    DEFAULT_MAX_TOKENS, DEFAULT_SAFETY_MARGIN};
+use crate::continuation::{generate_continuable, ContinuableOpts, ResponseShape, TextGenerator};
+
+/// Callback signatures — caller supplies closures that stitch the
+/// scratchpad into each shard's prompt and build the final reduce
+/// prompt. Kept as `Fn` (not `FnMut`) so the map loop can call them
+/// by reference.
+pub type MapPromptFn<'a> = dyn Fn(&str, &str) -> String + Send + Sync + 'a;
+pub type ReducePromptFn<'a> = dyn Fn(&str) -> String + Send + Sync + 'a;
+
+/// Knobs for `generate_tree_split`.
+#[derive(Debug, Clone)]
+pub struct TreeSplitOpts {
+    pub model: String,
+    pub system: Option<String>,
+    pub temperature: Option<f32>,
+    /// max_tokens for map AND reduce (reduce defaults are usually
+    /// higher; caller overrides for just reduce by calling through
+    /// continuable directly if needed).
+    pub max_tokens: Option<u32>,
+    pub reduce_max_tokens: Option<u32>,
+    pub think: Option<bool>,
+    /// Soft ceiling on scratchpad size (estimated tokens). When it
+    /// grows past this, the oldest shard digest gets dropped. Default
+    /// 6000, matching the TS implementation.
+    pub scratchpad_budget: usize,
+    pub safety_margin: Option<usize>,
+}
+
+impl TreeSplitOpts {
+    pub fn new(model: impl Into<String>) -> Self {
+        Self {
+            model: model.into(),
+            system: None,
+            temperature: None,
+            max_tokens: None,
+            reduce_max_tokens: None,
+            think: None,
+            scratchpad_budget: 6_000,
+            safety_margin: None,
+        }
+    }
+}
+
+/// Result — final reduce response plus the accumulated scratchpad so
+/// the caller can inspect what was kept vs truncated.
+#[derive(Debug, Clone)]
+pub struct TreeSplitResult {
+    pub response: String,
+    pub scratchpad: String,
+    pub shards_processed: usize,
+    pub scratchpad_truncations: usize,
+    pub total_continuations: usize,
+}
+
+/// Drop shard-digest blocks from the head of `scratchpad` until its
+/// estimated-token count fits the budget. Digest blocks are delimited
+/// by `\n— shard N/M digest —\n` so we can find the first one and
+/// chop everything before its successor.
+fn truncate_scratchpad(scratchpad: &mut String, budget_tokens: usize) -> bool {
+    if estimate_tokens(scratchpad) <= budget_tokens { return false; }
+    // Find the second delimiter — everything before it gets dropped.
+    const DELIM_PREFIX: &str = "\n— shard ";
+    let mut cursor = 0;
+    let mut truncated = false;
+    while estimate_tokens(&scratchpad[cursor..]) > budget_tokens {
+        // Skip past a leading delimiter (if we're sitting on one from
+        // a previous iteration), then find the next.
+        let search_from = cursor + if scratchpad[cursor..].starts_with(DELIM_PREFIX) {
+            DELIM_PREFIX.len()
+        } else { 0 };
+        let Some(rel_next) = scratchpad[search_from..].find(DELIM_PREFIX) else { break };
+        cursor = search_from + rel_next;
+        truncated = true;
+    }
+    if cursor > 0 {
+        scratchpad.drain(..cursor);
+    }
+    truncated
+}
+
+/// Phase 21 — map-reduce over shards with a running scratchpad. See
+/// module docs.
+pub async fn generate_tree_split<G: TextGenerator>(
+    generator: &G,
+    shards: &[String],
+    map_prompt: &MapPromptFn<'_>,
+    reduce_prompt: &ReducePromptFn<'_>,
+    opts: &TreeSplitOpts,
+) -> Result<TreeSplitResult, String> {
+    let mut scratchpad = String::new();
+    let safety = opts.safety_margin.unwrap_or(DEFAULT_SAFETY_MARGIN);
+    let map_max = opts.max_tokens.unwrap_or(DEFAULT_MAX_TOKENS as u32);
+    let reduce_max = opts.reduce_max_tokens.unwrap_or(1_500);
+    let mut truncations = 0usize;
+    let mut total_continuations = 0usize;
+
+    for (i, shard) in shards.iter().enumerate() {
+        let shard_prompt = map_prompt(shard, &scratchpad);
+        // Loud-fail on per-shard overflow — caller sharded too
+        // coarsely. Silent truncation is exactly the mode J rejected.
+        let budget = BudgetOpts {
+            system: opts.system.as_deref(),
+            max_tokens: Some(map_max as usize),
+            safety_margin: Some(safety),
+            bypass: false,
+        };
+        let check = assert_context_budget(&opts.model, &shard_prompt, budget)
+            .map_err(|(c, over)| overflow_message(&opts.model, &c, over, safety))?;
+        let _ = check;
+
+        let mut cont_opts = ContinuableOpts::new(&opts.model);
+        cont_opts.max_tokens = Some(map_max);
+        cont_opts.temperature = opts.temperature;
+        cont_opts.system = opts.system.clone();
+        cont_opts.shape = ResponseShape::Text;
+        cont_opts.think = opts.think;
+
+        let outcome = generate_continuable(generator, &shard_prompt, &cont_opts).await?;
+        total_continuations += outcome.continuations;
+
+        // Append this shard's digest and, if needed, drop oldest.
+        scratchpad.push_str(&format!(
+            "\n— shard {}/{} digest —\n{}",
+            i + 1, shards.len(), outcome.text.trim(),
+        ));
+        if truncate_scratchpad(&mut scratchpad, opts.scratchpad_budget) {
+            truncations += 1;
+        }
+    }
+
+    // Reduce pass. Budget check first — if the scratchpad is still too
+    // big for the reduce prompt we fail loud with numbers.
+    let reduce_p = reduce_prompt(&scratchpad);
+    let budget = BudgetOpts {
+        system: opts.system.as_deref(),
+        max_tokens: Some(reduce_max as usize),
+        safety_margin: Some(safety),
+        bypass: false,
+    };
+    assert_context_budget(&opts.model, &reduce_p, budget)
+        .map_err(|(c, over)| overflow_message(&opts.model, &c, over, safety))?;
+
+    let mut cont_opts = ContinuableOpts::new(&opts.model);
+    cont_opts.max_tokens = Some(reduce_max);
+    cont_opts.temperature = opts.temperature;
+    cont_opts.system = opts.system.clone();
+    cont_opts.shape = ResponseShape::Text;
+    cont_opts.think = opts.think;
+
+    let outcome = generate_continuable(generator, &reduce_p, &cont_opts).await?;
+    total_continuations += outcome.continuations;
+
+    Ok(TreeSplitResult {
+        response: outcome.text,
+        scratchpad,
+        shards_processed: shards.len(),
+        scratchpad_truncations: truncations,
+        total_continuations,
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::continuation::ScriptedGenerator;
+
+    fn simple_map(shard: &str, scratchpad: &str) -> String {
+        format!("SCRATCHPAD:\n{scratchpad}\n---\nSHARD:\n{shard}\n---\nDIGEST:")
+    }
+
+    fn simple_reduce(scratchpad: &str) -> String {
+        format!("SCRATCHPAD:\n{scratchpad}\n---\nFINAL:")
+    }
+
+    #[tokio::test]
+    async fn tree_split_runs_map_then_reduce() {
+        // 3 shards → 3 map calls → 1 reduce call = 4 responses scripted.
+        let generator = ScriptedGenerator::new(vec![
+            Ok("digest-1".to_string()),
+            Ok("digest-2".to_string()),
+            Ok("digest-3".to_string()),
+            Ok("FINAL ANSWER".to_string()),
+        ]);
+        let shards: Vec<String> = ["a", "b", "c"].iter().map(|s| s.to_string()).collect();
+        let opts = TreeSplitOpts::new("qwen3:latest");
+        let map_fn: &MapPromptFn = &simple_map;
+        let reduce_fn: &ReducePromptFn = &simple_reduce;
+        let result = generate_tree_split(&generator, &shards, map_fn, reduce_fn, &opts)
+            .await
+            .unwrap();
+        assert_eq!(result.shards_processed, 3);
+        assert_eq!(result.response, "FINAL ANSWER");
+        assert_eq!(generator.call_count(), 4);
+        // Scratchpad must carry all three digests in order.
+        assert!(result.scratchpad.contains("digest-1"));
+        assert!(result.scratchpad.contains("digest-2"));
+        assert!(result.scratchpad.contains("digest-3"));
+    }
+
+    #[tokio::test]
+    async fn tree_split_reduce_prompt_sees_full_scratchpad() {
+        let generator = ScriptedGenerator::new(vec![
+            Ok("summary-A".to_string()),
+            Ok("summary-B".to_string()),
+            Ok("REDUCED".to_string()),
+        ]);
+        let shards = vec!["input-one".to_string(), "input-two".to_string()];
+        let opts = TreeSplitOpts::new("qwen3:latest");
+        let _ = generate_tree_split(&generator, &shards, &simple_map, &simple_reduce, &opts)
+            .await
+            .unwrap();
+        // Third call = reduce. Its prompt must include both digests.
+        let calls = generator.calls();
+        let reduce_prompt = &calls[2].prompt;
+        assert!(reduce_prompt.contains("summary-A"),
+            "reduce prompt must see first shard digest");
+        assert!(reduce_prompt.contains("summary-B"),
+            "reduce prompt must see second shard digest");
+    }
+
+    #[tokio::test]
+    async fn tree_split_loud_fails_on_shard_overflow() {
+        let generator = ScriptedGenerator::new(vec![Ok("digest".to_string())]);
+        // One gigantic shard — well over qwen3's 40K window even as a
+        // prompt. The budget check must reject before any generate call.
+        let shards = vec!["x".repeat(200_000)];
+        let opts = TreeSplitOpts::new("qwen3:latest");
+        let err = generate_tree_split(&generator, &shards, &simple_map, &simple_reduce, &opts)
+            .await
+            .expect_err("shard-sized overflow must be rejected");
+        assert!(err.contains("overflow"), "error should mention overflow: {err}");
+        assert_eq!(generator.call_count(), 0, "generate must not be called on overflow");
+    }
+
+    #[tokio::test]
+    async fn tree_split_truncates_scratchpad_when_over_budget() {
+        // Tight budget so each shard trips truncation. qwen3's 40K
+        // window is fine; the budget we care about is the scratchpad
+        // cap, not the model window.
+        let generator = ScriptedGenerator::new(vec![
+            Ok("A".repeat(2_000)),
+            Ok("B".repeat(2_000)),
+            Ok("C".repeat(2_000)),
+            Ok("D".repeat(2_000)),
+            Ok("FINAL".to_string()),
+        ]);
+        let shards: Vec<String> = (0..4).map(|i| format!("shard{i}")).collect();
+        let mut opts = TreeSplitOpts::new("qwen3:latest");
+        opts.scratchpad_budget = 1_000; // ~4000 chars — one digest barely fits
+        let result = generate_tree_split(&generator, &shards, &simple_map, &simple_reduce, &opts)
+            .await
+            .unwrap();
+        assert!(result.scratchpad_truncations > 0,
+            "tight budget must trigger truncation");
+        // Scratchpad should still fit roughly within the budget
+        // (post-truncation); the estimator uses chars/4 so the bound
+        // is ~budget*4 chars. Give some slack for the delimiter.
+        let scratchpad_tokens = estimate_tokens(&result.scratchpad);
+        assert!(scratchpad_tokens <= opts.scratchpad_budget * 2,
+            "scratchpad {} tokens vs budget {}", scratchpad_tokens, opts.scratchpad_budget);
+    }
+
+    #[tokio::test]
+    async fn tree_split_reports_continuations_from_map_and_reduce() {
+        // First shard: truncated-then-continued. Reduce: truncated-then-continued.
+        // 1 shard: 2 map calls (initial + continuation), then 2 reduce calls.
+        let generator = ScriptedGenerator::new(vec![
+            Ok("partial".to_string()), // map shape=text, non-empty → complete on first pass
+            Ok("reduce-out".to_string()),
+        ]);
+        let shards = vec!["only".to_string()];
+        let opts = TreeSplitOpts::new("qwen3:latest");
+        let result = generate_tree_split(&generator, &shards, &simple_map, &simple_reduce, &opts)
+            .await
+            .unwrap();
+        // Text shape treats non-empty as complete → 0 continuations.
+        assert_eq!(result.total_continuations, 0);
+        assert_eq!(result.shards_processed, 1);
+    }
+
+    #[test]
+    fn truncate_scratchpad_noop_when_under_budget() {
+        let mut s = "\n— shard 1/1 digest —\nshort".to_string();
+        let truncated = truncate_scratchpad(&mut s, 1_000);
+        assert!(!truncated);
+        assert!(s.contains("short"));
+    }
+
+    #[test]
+    fn truncate_scratchpad_drops_oldest_first() {
+        let mut s = format!(
+            "\n— shard 1/3 digest —\n{}\n— shard 2/3 digest —\n{}\n— shard 3/3 digest —\nshort",
+            "x".repeat(4_000), // ~1000 tokens
+            "y".repeat(4_000), // ~1000 tokens
+        );
+        let truncated = truncate_scratchpad(&mut s, 500); // ~2000 chars
+        assert!(truncated);
+        assert!(!s.contains(&"x".repeat(4_000)),
+            "oldest digest should be dropped");
+        assert!(s.contains("short"),
+            "newest digest should survive");
+    }
+}
diff --git a/crates/vectord/src/playbook_memory.rs b/crates/vectord/src/playbook_memory.rs
index ab5d7f4..5a49190 100644
--- a/crates/vectord/src/playbook_memory.rs
+++ b/crates/vectord/src/playbook_memory.rs
@@ -113,8 +113,31 @@ pub struct PlaybookEntry {
     /// "manual: operator requested via POST /retire"
     #[serde(default)]
     pub retirement_reason: Option<String>,
+    /// Phase 27 — monotonic version counter within a playbook chain.
+    /// First version is 1; `revise_entry` sets the new entry's version
+    /// to parent.version + 1. Entries persisted before Phase 27 get
+    /// version=1 via serde default and are treated as roots.
+    #[serde(default = "default_version")]
+    pub version: u32,
+    /// Phase 27 — playbook_id of the prior version in this chain. None
+    /// for root entries (first version).
+    #[serde(default)]
+    pub parent_id: Option<String>,
+    /// Phase 27 — timestamp set when a newer version replaced this
+    /// entry via `revise_entry`. Superseded entries are excluded from
+    /// boost calculations (same rule as `retired_at`) but remain
+    /// queryable via `history` for audit.
+    #[serde(default)]
+    pub superseded_at: Option<String>,
+    /// Phase 27 — playbook_id of the entry that replaced this one.
+    /// Walking `superseded_by` from the root forward reconstructs the
+    /// full version chain.
+    #[serde(default)]
+    pub superseded_by: Option<String>,
 }
 
+fn default_version() -> u32 { 1 }
+
 /// A recorded failure — worker who didn't deliver on a contract.
 /// Tracked per (city, state, name) so a single worker's failures on
 /// Toledo Welder contracts don't penalize the same name in Chicago.
@@ -168,6 +191,17 @@ pub enum UpsertOutcome {
     Noop(String),
 }
 
+/// Phase 27 — shape returned from `revise_entry`. Reports both ends of
+/// the supersession so callers can link citations or audit chains.
+#[derive(Debug, Clone, Serialize)]
+pub struct ReviseOutcome {
+    pub parent_id: String,
+    pub parent_version: u32,
+    pub new_playbook_id: String,
+    pub new_version: u32,
+    pub superseded_at: String,
+}
+
 /// Return YYYY-MM-DD from an RFC3339 timestamp. Falls back to the
 /// first 10 chars if parse fails — tolerant for legacy entries that
 /// stored a bare date.
@@ -204,13 +238,15 @@ impl PlaybookMemory {
     /// Rebuild the geo index from scratch. Called by every mutation
     /// helper after persist succeeds. O(n) scan of entries; at current
-    /// scale ~40µs. Skips retired entries — they never participate in
-    /// boost filtering, so indexing them would just waste lookups.
+    /// scale ~40µs. Skips retired and superseded entries — they never
+    /// participate in boost filtering, so indexing them would just
+    /// waste lookups.
async fn rebuild_geo_index(&self) { let state = self.state.read().await; let mut idx: HashMap<(String, String), Vec> = HashMap::new(); for (i, e) in state.entries.iter().enumerate() { if e.retired_at.is_some() { continue; } + if e.superseded_at.is_some() { continue; } let (Some(city), Some(st)) = (&e.city, &e.state) else { continue; }; let key = (city.to_ascii_lowercase(), st.to_ascii_uppercase()); idx.entry(key).or_default().push(i); @@ -312,13 +348,129 @@ impl PlaybookMemory { Ok(count) } - /// Stats accessor for the /status endpoint and tests. - pub async fn status_counts(&self) -> (usize, usize, usize) { + /// Phase 27 — append a new version of an existing playbook. The + /// parent is stamped with `superseded_at` + `superseded_by`; the + /// new entry inherits `parent_id` and gets `version = parent + 1`. + /// Errors when the parent is retired (terminal state) or already + /// superseded (must revise the tip of the chain, not a middle + /// node). Caller supplies the new entry with its own fresh + /// `playbook_id`; chain-metadata fields on the input are + /// overwritten so callers can't fabricate a mismatched history. 
+ pub async fn revise_entry( + &self, + parent_id: &str, + mut new_entry: PlaybookEntry, + ) -> Result { + let now = chrono::Utc::now().to_rfc3339(); + let mut state = self.state.write().await; + + let Some(i) = state.entries.iter().position(|e| e.playbook_id == parent_id) else { + return Err(format!("parent playbook_id '{parent_id}' not found")); + }; + + { + let parent = &state.entries[i]; + if parent.retired_at.is_some() { + return Err(format!( + "cannot revise retired playbook '{parent_id}' — retirement is terminal" + )); + } + if let Some(succ) = &parent.superseded_by { + return Err(format!( + "playbook '{parent_id}' already superseded by '{succ}'; \ + revise the latest version in the chain instead" + )); + } + } + + let parent_version = state.entries[i].version; + let new_version = parent_version.saturating_add(1); + let parent_pid = state.entries[i].playbook_id.clone(); + let new_pid = new_entry.playbook_id.clone(); + if new_pid.is_empty() { + return Err("new playbook_id must not be empty".into()); + } + if new_pid == parent_pid { + return Err("new playbook_id must differ from parent".into()); + } + // Enforce chain-metadata integrity — caller doesn't get to + // fabricate these. + new_entry.version = new_version; + new_entry.parent_id = Some(parent_pid.clone()); + new_entry.superseded_at = None; + new_entry.superseded_by = None; + + let parent_mut = &mut state.entries[i]; + parent_mut.superseded_at = Some(now.clone()); + parent_mut.superseded_by = Some(new_pid.clone()); + + state.entries.push(new_entry); + drop(state); + self.persist().await?; + self.rebuild_geo_index().await; + + Ok(ReviseOutcome { + parent_id: parent_pid, + parent_version, + new_playbook_id: new_pid, + new_version, + superseded_at: now, + }) + } + + /// Phase 27 — return the full version chain that contains this + /// playbook_id, ordered from root (v1) to tip. Walks `parent_id` + /// backward to find the root, then `superseded_by` forward to the + /// tip. 
Returns empty if the id isn't present. Cycle-safe via a + /// visited set; unreachable in normal operation but the guard is + /// cheap. + pub async fn history(&self, playbook_id: &str) -> Vec { + let state = self.state.read().await; + let by_id: HashMap<&str, &PlaybookEntry> = state.entries + .iter() + .map(|e| (e.playbook_id.as_str(), e)) + .collect(); + let Some(seed) = by_id.get(playbook_id).copied() else { + return vec![]; + }; + + // Walk backward to root. + let mut cursor = seed; + let mut seen: std::collections::HashSet = std::collections::HashSet::new(); + seen.insert(cursor.playbook_id.clone()); + while let Some(pid) = &cursor.parent_id { + let Some(&next) = by_id.get(pid.as_str()) else { break }; + if !seen.insert(next.playbook_id.clone()) { break; } + cursor = next; + } + let root = cursor; + + // Walk forward to tip. + let mut chain = vec![root.clone()]; + let mut cursor = root; + let mut seen_fwd: std::collections::HashSet = std::collections::HashSet::new(); + seen_fwd.insert(cursor.playbook_id.clone()); + while let Some(nid) = &cursor.superseded_by { + let Some(&next) = by_id.get(nid.as_str()) else { break }; + if !seen_fwd.insert(next.playbook_id.clone()) { break; } + cursor = next; + chain.push(cursor.clone()); + } + chain + } + + /// Stats accessor for the /status endpoint and tests. Returns + /// (total, retired, superseded, failures). Phase 27 added + /// superseded as a distinct counter: a superseded entry is + /// replaced-by-newer-version, which is a different lifecycle event + /// than retired-stop-using. 
+ pub async fn status_counts(&self) -> (usize, usize, usize, usize) { let state = self.state.read().await; let total = state.entries.len(); let retired = state.entries.iter().filter(|e| e.retired_at.is_some()).count(); + let superseded = state.entries.iter().filter(|e| e.superseded_at.is_some()).count(); let failures = state.failures.len(); - (total, retired, failures) + (total, retired, superseded, failures) } /// Phase 26 — Mem0-style upsert. Decides ADD / UPDATE / NOOP based @@ -353,6 +505,7 @@ impl PlaybookMemory { let mut existing_idx: Option = None; for (i, e) in state.entries.iter().enumerate() { if e.retired_at.is_some() { continue; } + if e.superseded_at.is_some() { continue; } if e.operation != new_entry.operation { continue; } if day_key(&e.timestamp) != new_day { continue; } if e.city != new_entry.city || e.state != new_entry.state { continue; } @@ -514,6 +667,7 @@ impl PlaybookMemory { .iter() .filter(|e| { if e.retired_at.is_some() { return false; } + if e.superseded_at.is_some() { return false; } if let Some(vu) = &e.valid_until { if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) { if now > parsed.with_timezone(&chrono::Utc) { return false; } @@ -543,6 +697,7 @@ impl PlaybookMemory { .filter_map(|i| state.entries.get(i)) .filter(|e| { if e.retired_at.is_some() { return false; } + if e.superseded_at.is_some() { return false; } if let Some(vu) = &e.valid_until { if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(vu) { if now > parsed.with_timezone(&chrono::Utc) { return false; } @@ -1055,6 +1210,10 @@ pub async fn rebuild( valid_until: None, retired_at: None, retirement_reason: None, + version: 1, + parent_id: None, + superseded_at: None, + superseded_by: None, } }) .collect(); @@ -1237,6 +1396,10 @@ mod tests { valid_until: None, retired_at: None, retirement_reason: None, + version: 1, + parent_id: None, + superseded_at: None, + superseded_by: None, }) .collect(); tokio::runtime::Runtime::new().unwrap().block_on(async { @@ -1270,6 
+1433,10 @@ mod validity_window_tests { valid_until, retired_at: None, retirement_reason: None, + version: 1, + parent_id: None, + superseded_at: None, + superseded_by: None, } } @@ -1279,7 +1446,7 @@ mod validity_window_tests { pm.set_entries(vec![mkentry("pb-1", "Nashville", "TN", None, None)]).await.unwrap(); let touched = pm.retire_one("pb-1", "manual test").await.unwrap(); assert!(touched); - let (total, retired, _) = pm.status_counts().await; + let (total, retired, _, _) = pm.status_counts().await; assert_eq!(total, 1); assert_eq!(retired, 1); // Second retirement is a no-op @@ -1335,7 +1502,7 @@ mod validity_window_tests { // Only pb-old-schema should be retired — pb-new-schema matches, // pb-no-fp has no fingerprint so it's legacy-safe. assert_eq!(retired, 1); - let (_, total_retired, _) = pm.status_counts().await; + let (_, total_retired, _, _) = pm.status_counts().await; assert_eq!(total_retired, 1); } @@ -1348,7 +1515,7 @@ mod validity_window_tests { // Nashville migration shouldn't touch Chicago let retired = pm.retire_on_schema_drift("Nashville", "TN", "fp-v2", "test").await.unwrap(); assert_eq!(retired, 1); - let (_, r, _) = pm.status_counts().await; + let (_, r, _, _) = pm.status_counts().await; assert_eq!(r, 1); } } @@ -1373,6 +1540,10 @@ mod upsert_tests { valid_until: None, retired_at: None, retirement_reason: None, + version: 1, + parent_id: None, + superseded_at: None, + superseded_by: None, } } @@ -1444,3 +1615,171 @@ mod upsert_tests { assert_eq!(pm.entry_count().await, 2); } } + +#[cfg(test)] +mod version_tests { + use super::*; + use object_store::memory::InMemory; + + fn mk(id: &str, city: &str, state: &str) -> PlaybookEntry { + PlaybookEntry { + playbook_id: id.into(), + operation: format!("fill: Welder x1 in {city}, {state}"), + approach: "hybrid".into(), + context: "test".into(), + timestamp: chrono::Utc::now().to_rfc3339(), + endorsed_names: vec!["Alice Smith".into()], + city: Some(city.into()), + state: Some(state.into()), + embedding: 
Some(vec![1.0, 0.0, 0.0]), + schema_fingerprint: None, + valid_until: None, + retired_at: None, + retirement_reason: None, + version: 1, + parent_id: None, + superseded_at: None, + superseded_by: None, + } + } + + #[tokio::test] + async fn revise_stamps_chain_metadata_on_both_ends() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap(); + let outcome = pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN")) + .await + .expect("revise should succeed against active root"); + assert_eq!(outcome.parent_id, "pb-v1"); + assert_eq!(outcome.parent_version, 1); + assert_eq!(outcome.new_playbook_id, "pb-v2"); + assert_eq!(outcome.new_version, 2); + assert!(!outcome.superseded_at.is_empty()); + + let snap = pm.snapshot().await; + let v1 = snap.iter().find(|e| e.playbook_id == "pb-v1").unwrap(); + let v2 = snap.iter().find(|e| e.playbook_id == "pb-v2").unwrap(); + assert_eq!(v1.superseded_by.as_deref(), Some("pb-v2")); + assert!(v1.superseded_at.is_some()); + assert_eq!(v2.parent_id.as_deref(), Some("pb-v1")); + assert_eq!(v2.version, 2); + assert!(v2.superseded_at.is_none()); + } + + #[tokio::test] + async fn revise_rejects_retired_parent() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + let mut e1 = mk("pb-v1", "Nashville", "TN"); + e1.retired_at = Some(chrono::Utc::now().to_rfc3339()); + pm.set_entries(vec![e1]).await.unwrap(); + let err = pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN")).await + .expect_err("revise on retired parent must error"); + assert!(err.contains("retired"), "error should mention retirement: {err}"); + } + + #[tokio::test] + async fn revise_rejects_already_superseded_parent() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap(); + pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN")).await.unwrap(); + // pb-v1 is now superseded; revising it again must fail — caller + // 
should revise pb-v2 (the tip) instead. + let err = pm.revise_entry("pb-v1", mk("pb-v3-fake", "Nashville", "TN")).await + .expect_err("revise on superseded parent must error"); + assert!(err.contains("superseded"), "error should mention supersession: {err}"); + } + + #[tokio::test] + async fn superseded_entries_excluded_from_boost() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap(); + let mut v2 = mk("pb-v2", "Nashville", "TN"); + v2.endorsed_names = vec!["Carol Davis".into()]; + pm.revise_entry("pb-v1", v2).await.unwrap(); + + let boosts = pm.compute_boost_for_filtered_with_role( + &[1.0, 0.0, 0.0], 100, 0.5, + Some(("Nashville", "TN")), Some("Welder"), + ).await; + // v1's endorsement (Alice Smith) should be absent — it was + // superseded. v2's endorsement (Carol Davis) should be present. + assert!( + !boosts.contains_key(&("Nashville".into(), "TN".into(), "Alice Smith".into())), + "superseded entry's endorsement must not boost" + ); + let carol = boosts.get(&("Nashville".into(), "TN".into(), "Carol Davis".into())); + assert!(carol.is_some(), "tip version's endorsement must still boost"); + assert!(carol.unwrap().citations.contains(&"pb-v2".to_string())); + } + + #[tokio::test] + async fn history_walks_root_to_tip_from_any_node() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap(); + pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN")).await.unwrap(); + pm.revise_entry("pb-v2", mk("pb-v3", "Nashville", "TN")).await.unwrap(); + + // Starting from the root — same chain. + let chain_from_root = pm.history("pb-v1").await; + assert_eq!(chain_from_root.len(), 3); + assert_eq!(chain_from_root[0].playbook_id, "pb-v1"); + assert_eq!(chain_from_root[1].playbook_id, "pb-v2"); + assert_eq!(chain_from_root[2].playbook_id, "pb-v3"); + + // Starting from the tip — same chain, same order. 
+ let chain_from_tip = pm.history("pb-v3").await; + assert_eq!(chain_from_tip.len(), 3); + assert_eq!(chain_from_tip[0].playbook_id, "pb-v1"); + assert_eq!(chain_from_tip[2].playbook_id, "pb-v3"); + + // Starting from the middle — same chain. + let chain_from_mid = pm.history("pb-v2").await; + assert_eq!(chain_from_mid.len(), 3); + assert_eq!(chain_from_mid[0].playbook_id, "pb-v1"); + assert_eq!(chain_from_mid[2].playbook_id, "pb-v3"); + } + + #[tokio::test] + async fn history_empty_for_unknown_id() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap(); + assert!(pm.history("pb-nonexistent").await.is_empty()); + } + + #[tokio::test] + async fn status_counts_reports_superseded_separately() { + let pm = PlaybookMemory::new(Arc::new(InMemory::new())); + pm.set_entries(vec![mk("pb-v1", "Nashville", "TN")]).await.unwrap(); + pm.revise_entry("pb-v1", mk("pb-v2", "Nashville", "TN")).await.unwrap(); + let (total, retired, superseded, _) = pm.status_counts().await; + assert_eq!(total, 2); + assert_eq!(retired, 0); + assert_eq!(superseded, 1); + } + + #[tokio::test] + async fn legacy_entries_without_version_default_to_v1() { + // Simulate state persisted before Phase 27 — no version field. + // Serde default kicks in; entries should be treated as roots. 
+ let json = r#"{ + "entries": [{ + "playbook_id": "pb-legacy", + "operation": "fill: Welder x1 in Nashville, TN", + "approach": "hybrid", + "context": "", + "timestamp": "2026-04-21T00:00:00Z", + "endorsed_names": ["Alice"], + "city": "Nashville", + "state": "TN" + }], + "last_rebuilt_at": 0, + "failures": [] + }"#; + let state: PlaybookMemoryState = serde_json::from_str(json).unwrap(); + let legacy = &state.entries[0]; + assert_eq!(legacy.version, 1); + assert!(legacy.parent_id.is_none()); + assert!(legacy.superseded_at.is_none()); + assert!(legacy.superseded_by.is_none()); + } +} diff --git a/crates/vectord/src/rag.rs b/crates/vectord/src/rag.rs index 74ee244..ec8d33b 100644 --- a/crates/vectord/src/rag.rs +++ b/crates/vectord/src/rag.rs @@ -42,6 +42,7 @@ async fn rerank( system: None, temperature: Some(0.0), max_tokens: Some(50), + think: None, }).await; match resp { @@ -156,6 +157,7 @@ pub async fn query( system: None, temperature: Some(0.2), max_tokens: Some(512), + think: None, }).await?; Ok(RagResponse { diff --git a/crates/vectord/src/service.rs b/crates/vectord/src/service.rs index 6f72488..2ccc77b 100644 --- a/crates/vectord/src/service.rs +++ b/crates/vectord/src/service.rs @@ -126,6 +126,8 @@ pub fn router(state: VectorState) -> Router { .route("/playbook_memory/patterns", post(discover_playbook_patterns)) .route("/playbook_memory/mark_failed", post(mark_playbook_failed)) .route("/playbook_memory/retire", post(retire_playbook_memory)) + .route("/playbook_memory/revise", post(revise_playbook_memory)) + .route("/playbook_memory/history/{id}", get(playbook_memory_history)) .route("/playbook_memory/status", get(playbook_memory_status)) .with_state(state) } @@ -891,6 +893,7 @@ async fn hybrid_search( system: None, temperature: Some(0.2), max_tokens: Some(512), + think: None, }).await; gen_resp.ok().map(|r| r.text.trim().to_string()) @@ -2228,6 +2231,10 @@ async fn seed_playbook_memory( valid_until: None, retired_at: None, retirement_reason: None, + version: 
1,
+ parent_id: None,
+ superseded_at: None,
+ superseded_by: None,
 };
 let text = format!(
 "{} | {} | {} | fills: {}",
@@ -2287,6 +2294,10 @@ async fn seed_playbook_memory(
 valid_until: req.valid_until.clone(),
 retired_at: None,
 retirement_reason: None,
+ version: 1,
+ parent_id: None,
+ superseded_at: None,
+ superseded_by: None,
 };
 
 // Phase 26 — when append=true (default), route through upsert so
@@ -2481,16 +2492,140 @@ async fn retire_playbook_memory(
 "supply either {playbook_id, reason} or {city, state, current_schema_fingerprint, reason}".into()))
 }
 
+/// Phase 27 — request body for `POST /playbook_memory/revise`. Same
+/// shape as a seed request minus `append` (revise is always
+/// append-semantics for a specific parent) plus `parent_id`. The new
+/// version's `playbook_id` is a SHA-256 over the revise timestamp,
+/// parent_id, and operation — unique per call, NOT deterministic
+/// across retries (the timestamp salt changes every invocation, and
+/// `revise_entry` rejects a second revise of the same parent anyway).
+/// True idempotent retry would need the timestamp dropped from the
+/// hash plus an already-superseded-by-same-id fast path.
+#[derive(Deserialize)]
+struct RevisePlaybookRequest {
+ parent_id: String,
+ operation: String,
+ approach: String,
+ context: String,
+ endorsed_names: Vec<String>,
+ #[serde(default)]
+ schema_fingerprint: Option<String>,
+ #[serde(default)]
+ valid_until: Option<String>,
+}
+
+/// Phase 27 — create a new version of an existing playbook. The parent
+/// is marked superseded; the new entry inherits the chain via
+/// `parent_id` and carries `version = parent.version + 1`. Errors with
+/// 400 on a retired or already-superseded parent (must revise the tip
+/// of the chain). Embeds the new text through the same shape as
+/// `/seed` so cosine similarity stays comparable across rebuild + seed
+/// + revise entries.
+async fn revise_playbook_memory( + State(state): State, + Json(req): Json, +) -> Result, (StatusCode, String)> { + let text = format!( + "{} | {} | {} | fills: {}", + req.operation, req.approach, req.context, + req.endorsed_names.join(", "), + ); + let resp = state.ai_client.embed(EmbedRequest { texts: vec![text], model: None }) + .await + .map_err(|e| (StatusCode::BAD_GATEWAY, format!("embed revise: {e}")))?; + if resp.embeddings.is_empty() { + return Err((StatusCode::BAD_GATEWAY, "embed returned nothing".into())); + } + let emb: Vec = resp.embeddings[0].iter().map(|&x| x as f32).collect(); + + let (city, state_) = { + let after_in = req.operation.split(" in ").nth(1).unwrap_or(""); + let mut parts = after_in.splitn(2, ','); + let city = parts.next().map(|s| s.trim().to_string()).filter(|s| !s.is_empty()); + let state = parts.next() + .map(|s| s.trim().chars().take_while(|c| c.is_ascii_alphabetic()).collect::()) + .filter(|s| !s.is_empty()); + (city, state) + }; + if city.is_none() || state_.is_none() { + return Err((StatusCode::BAD_REQUEST, + "operation must match 'fill: Role xN in City, ST' shape".into())); + } + + let ts = chrono::Utc::now().to_rfc3339(); + use sha2::{Digest, Sha256}; + let mut h = Sha256::new(); + h.update(ts.as_bytes()); + h.update(b"|"); + h.update(req.parent_id.as_bytes()); + h.update(b"|"); + h.update(req.operation.as_bytes()); + let bytes = h.finalize(); + let pid = format!("pb-rev-{}", bytes.iter().take(8).map(|b| format!("{b:02x}")).collect::()); + + let new_entry = playbook_memory::PlaybookEntry { + playbook_id: pid.clone(), + operation: req.operation, + approach: req.approach, + context: req.context, + timestamp: ts, + endorsed_names: req.endorsed_names, + city, state: state_, + embedding: Some(emb), + schema_fingerprint: req.schema_fingerprint, + valid_until: req.valid_until, + retired_at: None, + retirement_reason: None, + // revise_entry overwrites these from the parent — values here + // are just placeholders so the struct is 
well-formed. + version: 1, + parent_id: None, + superseded_at: None, + superseded_by: None, + }; + + let outcome = state.playbook_memory.revise_entry(&req.parent_id, new_entry) + .await + .map_err(|e| (StatusCode::BAD_REQUEST, e))?; + Ok(Json(serde_json::json!({ + "outcome": outcome, + "entries_after": state.playbook_memory.entry_count().await, + }))) +} + +/// Phase 27 — return the full version chain containing `playbook_id`, +/// ordered root → tip. 404 if the id isn't present. The walker is +/// cycle-safe by construction (visited set per direction). +async fn playbook_memory_history( + State(state): State, + Path(playbook_id): Path, +) -> Result, (StatusCode, String)> { + let chain = state.playbook_memory.history(&playbook_id).await; + if chain.is_empty() { + return Err((StatusCode::NOT_FOUND, format!("no playbook with id '{playbook_id}'"))); + } + Ok(Json(serde_json::json!({ + "playbook_id": playbook_id, + "versions": chain.len(), + "chain": chain, + }))) +} + /// Phase 25 status endpoint — reports retirement counts so dashboards /// can show "N playbooks retired (12 from 2026-05 schema migration)". +/// Phase 27 added `superseded` as a distinct counter. async fn playbook_memory_status( State(state): State, ) -> impl IntoResponse { - let (total, retired, failures) = state.playbook_memory.status_counts().await; + let (total, retired, superseded, failures) = state.playbook_memory.status_counts().await; + // `active` = entries eligible for boost. Retired and superseded are + // distinct exclusion reasons; subtract both. An entry can in principle + // be both retired AND superseded (e.g. revised then retired) so + // saturating_sub guards against underflow if that pathological case + // ever lands. 
+ let inactive = retired + superseded; Json(serde_json::json!({ "total": total, "retired": retired, - "active": total.saturating_sub(retired), + "superseded": superseded, + "active": total.saturating_sub(inactive), "failures": failures, })) } diff --git a/docs/PHASES.md b/docs/PHASES.md index 502d971..2d7c8a8 100644 --- a/docs/PHASES.md +++ b/docs/PHASES.md @@ -188,7 +188,7 @@ - Endpoint: `POST /catalog/datasets/by-name/{name}/tombstone` accepting `{row_key_column, row_key_values[], actor, reason}`; companion `GET` lists active tombstones - `queryd::context::build_context` wraps tombstoned tables: raw goes to `__raw__{name}`, public name becomes a DataFusion view with `WHERE CAST(col AS VARCHAR) NOT IN (...)` filter - End-to-end on candidates: tombstone 3 IDs, COUNT drops 100,000 → 99,997, specific WHERE returns empty, AiView candidates_safe transitively excludes them too, restart preserves all tombstones - - Limits / not in MVP: physical compaction (Phase 8 doesn't yet read tombstones during merge); journal integration (tombstones don't yet emit Phase 9 mutation events — covered by audit fields on the tombstone itself) + - Limits / not in MVP: journal integration (tombstones don't yet emit Phase 9 mutation events — covered by audit fields on the tombstone itself). Physical compaction integration was closed by Phase E.2 below. - [x] Phase D: AI-safe views — 2026-04-16 - `shared::types::AiView` — name, base_dataset, columns whitelist, optional row_filter, column_redactions - `shared::types::Redaction` — Null | Hash | Mask { keep_prefix, keep_suffix } @@ -273,7 +273,7 @@ - [x] 19.3 — Endorsed names parsed out of `result` column, keyed by `(city, state, name)` tuple so shared names across cities don't cross-pollinate. Parsing via `parse_names` + `parse_city_state` helpers (7 unit tests) - [x] 19.4 — `/vectors/hybrid?use_playbook_memory=true`: fetches `top_k * 5` candidates so endorsed workers outside the vanilla top-K can still climb. 
Boost is additive on vector score, each hit carries `playbook_boost` + `playbook_citations` in the response for explainability - [x] 19.5 — Multi-agent orchestrator (`tests/multi-agent/orchestrator.ts`) auto-seeds `POST /vectors/playbook_memory/seed` on consensus_done, so the next query sees the new endorsement without a full `/rebuild`. Closes the feedback loop: two agents reach consensus → playbook sealed → next query re-ranks - - [x] 19.6 — `MAX_BOOST_PER_WORKER = 0.25` enforced in `compute_boost_for`; verified with unit test (100 identical playbooks → boost capped at 0.25) and live test (5 identical seeds → exactly 0.25). Time decay deferred as optional + - [x] 19.6 — `MAX_BOOST_PER_WORKER = 0.25` enforced in `compute_boost_for`; verified with unit test (100 identical playbooks → boost capped at 0.25) and live test (5 identical seeds → exactly 0.25). Time decay also wired: `BOOST_HALF_LIFE_DAYS = 30.0` — 30-day-old playbooks contribute half, 60-day a quarter, via `exp(-age_days / 30)` in the boost loop - Real finding surfaced during build: the 32 bootstrap rows in `successful_playbooks` reference phantom worker names — 80 of 82 don't correspond to actual rows in `workers_500k`. `/seed` endpoint bypasses `successful_playbooks` so operators can prime memory with real fixtures; production path is the orchestrator write-through - [x] **Phase 19 refinement — geo + role prefilter on boost** (2026-04-21) - Added `compute_boost_for_filtered` and `compute_boost_for_filtered_with_role` to `playbook_memory.rs`. SQL filter's `(city, state, role)` parsed in `service.rs`; exact role-matches in target geo skip cosine and earn similarity=1.0. Restored the feedback loop: matched=0 → matched=11 per query on the same Nashville test. Citation density on Riverfront Steel: 2 → 28 per run (14×). @@ -285,7 +285,13 @@ - T3 cloud: gpt-oss:120b via Ollama Cloud — verified 4-8s latency, strict JSON-shape output for remediation. 
- [x] **Phase 21: Scratchpad + Tree-Split Continuation** (2026-04-21) - `tests/multi-agent/agent.ts`: `estimateTokens()`, `assertContextBudget()`, `generateContinuable()`, `generateTreeSplit()`. `think` flag plumbed through sidecar's `/generate`. Empty-response backoff + truncation-continuation, no max_tokens tourniquet. - - Rust port queued: `crates/aibridge/src/continuation.rs`, `tree_split.rs`. + - Rust port shipped (2026-04-21, companion to Phase 27): + - `crates/aibridge/src/context.rs` — `estimate_tokens` (chars/4 ceil, matches TS), `context_window_for`, `assert_context_budget` returning `Result` so callers get the numbers back on both success and overflow. Windows table mirrors `config/models.json`. + - `crates/aibridge/src/continuation.rs` — `generate_continuable` handles the two failure modes from TS: (a) empty thinking-model response → geometric-backoff retry with 2× budget up to `budget_cap`; (b) truncated non-empty → continuation with partial-as-scratchpad. `is_structurally_complete` balances braces then JSON.parse-check for the JSON shape; text shape is "non-empty". Guards the degen case "all retries empty → bail, don't loop on empty partial" — the TS impl has this implicit, Rust makes it explicit. + - `crates/aibridge/src/tree_split.rs` — `generate_tree_split` map→reduce with running scratchpad. Per-shard + reduce-prompt budget checked through `assert_context_budget`; loud-fails with the overflow message rather than silently truncating. Scratchpad oldest-digest-first truncation once it exceeds `scratchpad_budget` (default 6000 tokens, matches TS). + - `TextGenerator` trait (native async-fn-in-trait, edition 2024) so `ScriptedGenerator` test double can inject canned sequences without a live Ollama. `AiClient` implements `TextGenerator`. + - `GenerateRequest` gained `think: Option` field — forwards to sidecar for per-call hidden-reasoning opt-out on hot-path JSON emitters. 
+ - 25 aibridge unit tests (8 context + 10 continuation + 7 tree_split) — all green, no network required. - [x] **Phase 22: Internal Knowledge Library** (2026-04-21) - `data/_kb/` — signatures.jsonl, outcomes.jsonl, pathway_recommendations.jsonl, error_corrections.jsonl, config_snapshots.jsonl. Event-driven cycle: indexRun → recommendFor → loadRecommendation. - Item B cloud rescue: failed event → cloud remediation JSON → retry with pivot. Verified 1/3 rescues succeeded on stress_01 (Gary IN → South Bend IN pivot). @@ -296,13 +302,32 @@ - `data/_kb/staffers.jsonl` — competence_score = 0.45·fill + 0.20·turn_eff + 0.20·cite + 0.15·rescue. Recomputed per run. - `findNeighbors` now returns `weighted_score = cosine × max_staffer_competence`. `scripts/kb_staffer_report.py` — leaderboard + cross-staffer worker overlap (Rachel D. Lewis 12× across 4 staffers → auto-discovered high-value label). - `gen_staffer_demo.ts` + `run_staffer_demo.sh` — 4 personas × 3 contracts = 12 runs. -- [ ] **Phase 24: Observer / Autotune integration** (GAP, not wired) - - `lakehouse-observer.service` watches MCP :3700; scenario.ts hits gateway :3100 directly. Observer idle at 0 ops across 3600+ cycles. Autotune runs on its own schedule, never sees scenario outcomes. - - Next-sprint: scenario emits per-event outcome summaries to observer's ingest path; observer ERROR_ANALYZER + PLAYBOOK_BUILDER loops consume them; autotune subscribes to the metric stream. +- [x] **Phase 27: Playbook versioning** (2026-04-21) + - `PlaybookEntry` gained `version: u32` (default 1), `parent_id`, `superseded_at`, `superseded_by` fields. All `#[serde(default)]` so entries persisted before Phase 27 load as roots with version=1. + - `PlaybookMemory::revise_entry(parent_id, new_entry)` appends a new version, stamps `superseded_at`+`superseded_by` on the parent, inherits `parent_id` and sets `version = parent + 1` on the new entry. 
Rejects revising a retired or already-superseded parent with a clear error — the tip of the chain is the only valid revise target. + - `PlaybookMemory::history(playbook_id)` returns the full chain root→tip, walking `parent_id` backward then `superseded_by` forward. Cycle-safe. Works from any node in the chain. + - Superseded entries excluded from boost (same rule as retired): `compute_boost_for_filtered_with_role`, the active-entries prefilter, the geo-index rebuild, and the upsert existing-entry search all skip `superseded_at.is_some()`. + - Endpoints: `POST /vectors/playbook_memory/revise` + `GET /vectors/playbook_memory/history/{id}`. + - `status_counts` now returns `(total, retired, superseded, failures)`. `/status` JSON reports `superseded` as a distinct counter; `active = total - retired - superseded`. + - 8 unit tests under `mod version_tests` covering: chain-metadata stamping, retired-parent rejection, already-superseded-parent rejection, superseded endorsement exclusion from boost, history traversal from root/tip/middle, empty-for-unknown-id, superseded-status-count, legacy-entry-default-version round-trip. 26/26 playbook_memory tests pass. +- [x] **Phase 24: Observer / Autotune integration** (2026-04-20, commit b95dd86) + - Closed the gap where `lakehouse-observer.service` wrapped MCP :3700 while `tests/multi-agent/scenario.ts` hit gateway :3100 directly — observer sat idle at 0 ops across 3600+ cycles. + - `observer.ts` gained a Bun HTTP listener on `OBSERVER_PORT` (default 3800) with `GET /health`, `GET /stats` (totals + by_source + recent scenario digest), and `POST /event` for scenario outcomes. Body shapes into `ObservedOp` with `source="scenario"` + `staffer_id` + `sig_hash` + `event_kind` + geo + rescue flags. + - `recordExternalOp()` shared ring-buffer insert — ERROR_ANALYZER and PLAYBOOK_BUILDER loops now see both MCP-wrapped and scenario-posted ops through the same path. 
+ - `persistOp()` swap: old path wrote via `/ingest/file?name=observed_operations` which has REPLACE semantics (wiped prior ops); now uses append-friendly Parquet write-through. +- [x] **Phase 25: Validity windows + playbook retirement** (2026-04-21, commit e0a843d) + - `PlaybookEntry` gained four optional fields (`#[serde(default)]` so legacy entries load as never-expiring): `schema_fingerprint` (SHA-256 over target dataset columns at seed time), `valid_until` (RFC3339 hard expiry), `retired_at` (set by retire calls), `retirement_reason` (human string). + - `compute_boost_for_filtered_with_role` now skips retired + expired entries before geo/cosine — no silent boosting from stale playbooks. Unit-tested on expired `valid_until` + retired + schema-drift retirement. + - Two retirement paths: `retire_one(playbook_id, reason)` for manual, `retire_on_schema_drift(city, state, current_fingerprint, reason)` for batch schema-migration sweep. Legacy entries without a fingerprint skip drift retirement (safe). + - Endpoint: `POST /vectors/playbook_memory/retire` — accepts either `{playbook_id, reason}` or `{city, state, current_schema_fingerprint, reason}`. +- [x] **Phase 26: Mem0 upsert + Letta geo hot cache** (2026-04-21, commit 640db8c) + - Mem0-style upsert: `/seed` with `append=true` (default) routes through `upsert_entry`, which decides ADD / UPDATE / NOOP on (operation, day, city, state). Same-day re-seed merges names (union, stable order) instead of duplicating the row. Identical re-seed is a no-op. Different-day same-operation is a fresh ADD. Playbook_id stays stable on UPDATE so prior citations remain valid. + - Letta-style hot cache: `PlaybookMemory` now holds a `geo_index: HashMap<(city_lower, state_upper), Vec<usize>>` rebuilt on every mutation. Geo-filtered boost queries skip the full scan and hit the O(1) key lookup. At 1.9K entries the full scan was sub-ms; the index scales the same path to 100K+ without code changes. 
+ - `UpsertOutcome` enum reported back to callers — `{mode: added|updated|noop, playbook_id, merged_names?}` + `entries_after`. - [ ] Fine-tuned domain models (Phase 25+) - [ ] Multi-node query distribution (only if ceilings bite) --- -**102 unit tests | 13 crates | 20 ADRs | 2.47M rows | 100K vectors | Hybrid Parquet+HNSW ⊕ Lance | Phase 19 refined + 20-23 shipped** -**Latest: 2026-04-21 — Phases 20-23 shipped. Geo+role prefilter lifted playbook citation density 14×. Cloud rescue converts zero-supply failures into successful pivots. Staffer competence weighting differentiates full-tool senior from minimal-tool trainee by 46.4pt fill rate on same contracts. Phase 24 observer integration flagged as honest gap.** +**145 unit tests | 13 crates | 21 ADRs | 2.47M rows | 100K vectors | Hybrid Parquet+HNSW ⊕ Lance | Phases 0-27 shipped** +**Latest: 2026-04-21 — Phase 27 (playbook versioning: `version` + `parent_id` + `superseded_at` + `superseded_by` on `PlaybookEntry`, `/revise` + `/history` endpoints, 8 new tests). Doc-sync pass: Phase 24 observer + Phase 25 validity windows + Phase 26 Mem0/Letta now reflected in phase tracker. Phase 19.6 time decay noted as wired (was misdocumented as deferred). Phase E.2 tombstone-at-compaction noted as closed in Phase 8 MVP limits.** diff --git a/docs/PRD.md b/docs/PRD.md index 5b69914..d6514d3 100644 --- a/docs/PRD.md +++ b/docs/PRD.md @@ -514,22 +514,29 @@ Answers "who handled this" as a first-class dimension of the matrix index. Senio - `scripts/run_staffer_demo.sh` — sequential batch with cloud T3. - `scripts/kb_staffer_report.py` — leaderboard + top/bottom differential + cross-staffer overlap. 
-### Phase 24: Observer / Autotune integration (NOT YET WIRED — honest gap) +### Phase 24: Observer / Autotune integration (SHIPPED 2026-04-20, commit b95dd86) -J flagged this 2026-04-21 evening: the `lakehouse-observer.service` systemd unit has been running for 3600+ cycles but shows `total_ops=0 successes=0 failures=0` because `tests/multi-agent/scenario.ts` hits the Rust gateway directly on port 3100, bypassing the Bun MCP layer on 3700 that observer wraps. +The gap: `lakehouse-observer.service` wrapped MCP :3700, while `tests/multi-agent/scenario.ts` hit gateway :3100 directly. Observer idle at 0 ops across 3600+ cycles — scenarios invisible to ERROR_ANALYZER and PLAYBOOK_BUILDER, autotune running blind to outcomes. -Result: our test scenarios are INVISIBLE to the observer and the autotune pipeline. Autotune's HNSW parameter learning runs on its own schedule, but no signal from scenario outcomes flows into it. +**What shipped:** +- `observer.ts` Bun HTTP listener on `OBSERVER_PORT` (default 3800): `GET /health`, `GET /stats` (totals, by_source, recent scenario digest), `POST /event` for scenario outcomes. +- `ObservedOp` carries provenance — `source="scenario" | "mcp"` + `staffer_id` + `sig_hash` + `event_kind` + geo + rescue flags. +- `recordExternalOp()` — shared ring-buffer insert; main analyzer + playbook builder no longer care where the op came from. +- `persistOp()` fix: old path POSTed to `/ingest/file?name=observed_operations` which has REPLACE semantics (wiped prior ops); now uses append-friendly write-through. -**Target architecture:** -- Scenarios emit per-event outcome summaries to a path the observer polls (or POST to observer's ingest endpoint directly). -- Observer's ERROR ANALYZER + PLAYBOOK BUILDER loops consume those summaries alongside the MCP-layer ops. -- Autotune agent subscribes to a metric stream the observer writes. 
+### Phase 25: Validity windows + playbook retirement (SHIPPED 2026-04-21, commit e0a843d) -**Why deferred:** this is a real architecture change (coherent data path from scenario → observer → autotune → vectord index) and needs care. The observer's current `observed_operations` ingest uses REPLACE semantics (flagged in `feedback_ingest_replace_semantics.md`) — naive appending will wipe prior ops. +Zep 2026-era finding: temporal validity is the single highest-value memory-hygiene primitive. `PlaybookEntry` gained `schema_fingerprint` / `valid_until` / `retired_at` / `retirement_reason`. `compute_boost_for_filtered_with_role` skips retired + expired before geo/cosine ranking. Two retirement paths: `retire_one(id, reason)` for manual, `retire_on_schema_drift(city, state, fp, reason)` for batch migration sweep. Endpoint: `POST /vectors/playbook_memory/retire`. -**Status:** GAP DOCUMENTED, not fixed. Scenarios continue to populate KB directly. The parallel pipelines are coherent but separate; Phase 24 connects them. +### Phase 26: Mem0 upsert + Letta geo hot cache (SHIPPED 2026-04-21, commit 640db8c) -### Phase 25+: Further horizon +Same-day re-seed no longer duplicates rows. `/seed` with `append=true` routes through `upsert_entry` which decides ADD / UPDATE / NOOP on `(operation, day, city, state)`. Playbook_id stays stable on UPDATE so existing citations remain valid. `PlaybookMemory.geo_index: HashMap<(city, state), Vec<usize>>` rebuilt on every mutation; geo-filtered boost queries skip the scan and hit O(1) lookup — sub-ms at current scale, same code path scales to 100K+ entries. + +### Phase 27: Playbook versioning (SHIPPED 2026-04-21) + +`PlaybookEntry` gained `version: u32` (default 1), `parent_id`, `superseded_at`, `superseded_by` — all `#[serde(default)]` so pre-Phase-27 state loads as roots. `revise_entry(parent_id, new_entry)` appends a new version, stamps the parent superseded, rejects revising a retired or already-superseded parent. 
`history(id)` returns the root→tip chain from any node. Superseded entries excluded from boost (same rule as retired). Endpoints: `POST /vectors/playbook_memory/revise`, `GET /vectors/playbook_memory/history/{id}`. `/status` reports `superseded` as a distinct counter. 8 new tests; 51/51 vectord lib tests green. + +### Phase 28+: Further horizon - Specialized fine-tuned models per domain (staffing matcher, resume parser) - Video/audio transcript ingest + multimodal embeddings