lakehouse/crates/aibridge/src/continuation.rs
commit a6f12e2609 Phase 21 Rust port + Phase 27 playbook versioning + doc-sync
Phase 21 — Rust port of scratchpad + tree-split primitives (companion to
the 2026-04-21 TS shipment). New crates/aibridge modules:

  context.rs       — estimate_tokens (chars/4 ceil), context_window_for,
                     assert_context_budget returning a BudgetCheck with
                     numeric diagnostics on both success and overflow.
                     Windows table mirrors config/models.json.
  continuation.rs  — generate_continuable<G: TextGenerator>. Handles the
                     two failure modes: empty-response from thinking
                     models (geometric 2x budget backoff up to budget_cap)
                     and truncated-non-empty (continuation with partial
                     as scratchpad). is_structurally_complete balances
                     braces then JSON.parse-checks. Guards the degen case
                     "all retries empty, don't loop on empty partial".
  tree_split.rs    — generate_tree_split map->reduce with running
                     scratchpad. Per-shard + reduce-prompt go through
                     assert_context_budget first; loud-fails rather than
                     silently truncating. Oldest-digest-first scratchpad
                     truncation at scratchpad_budget (default 6000 t).
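
The chars/4 estimator in context.rs can be sketched in a few lines. This is a hedged sketch, not the shipped implementation: the real `estimate_tokens` signature, and whether it counts bytes or chars, are assumptions; only "chars/4, ceil" comes from the note above.

```rust
/// Sketch of the chars/4 token estimator (ceil). Hypothetical
/// standalone version; the real one lives in context.rs.
pub fn estimate_tokens(text: &str) -> usize {
    // Integer ceiling division: (n + 3) / 4 == ceil(n / 4).
    (text.chars().count() + 3) / 4
}
```

An empty string estimates 0 tokens; a 5-char string rounds up to 2.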

TextGenerator trait (native async-fn-in-trait, edition 2024). AiClient
implements it; ScriptedGenerator test double lets tests inject canned
sequences without a live Ollama.

GenerateRequest gained think: Option<bool> — forwards to sidecar for
per-call hidden-reasoning opt-out on hot-path JSON emitters. Three
existing callsites updated (rag.rs x2, service.rs hybrid answer).

Phase 27 — Playbook versioning. PlaybookEntry gained four optional
fields (all #[serde(default)] so pre-Phase-27 state loads as roots):

  version           u32, default 1
  parent_id         Option<String>, previous version's playbook_id
  superseded_at     Option<String>, set when newer version replaces
  superseded_by     Option<String>, the playbook_id that replaced

New methods:

  revise_entry(parent_id, new_entry) — appends new version, stamps
    superseded_at+superseded_by on parent, inherits parent_id and sets
    version = parent + 1 on the new entry. Rejects revising a retired
    or already-superseded parent (tip-of-chain is the only valid
    revise target).
  history(playbook_id) — returns full chain root->tip from any node.
    Walks parent_id back to root, then superseded_by forward to tip.
    Cycle-safe.
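
The revise semantics above can be sketched against a minimal in-memory store. These are hypothetical standalone types; the real PlaybookEntry lives in vectord and carries more fields, and the real method signature may differ.

```rust
#[derive(Clone, Debug)]
pub struct PlaybookEntry {
    pub playbook_id: String,
    pub version: u32,                  // default 1 for pre-Phase-27 roots
    pub parent_id: Option<String>,     // previous version's playbook_id
    pub superseded_at: Option<String>, // stamped when a newer version lands
    pub superseded_by: Option<String>, // playbook_id of the replacement
}

/// Sketch of revise_entry: only the tip of a chain may be revised.
pub fn revise(
    entries: &mut Vec<PlaybookEntry>,
    parent_id: &str,
    new_id: &str,
    now: &str,
) -> Result<(), String> {
    let idx = entries
        .iter()
        .position(|e| e.playbook_id == parent_id)
        .ok_or_else(|| "parent not found".to_string())?;
    if entries[idx].superseded_by.is_some() {
        return Err("parent already superseded; revise the tip".to_string());
    }
    entries[idx].superseded_at = Some(now.to_string());
    entries[idx].superseded_by = Some(new_id.to_string());
    let version = entries[idx].version + 1;
    entries.push(PlaybookEntry {
        playbook_id: new_id.to_string(),
        version,
        parent_id: Some(parent_id.to_string()),
        superseded_at: None,
        superseded_by: None,
    });
    Ok(())
}
```

Revising a superseded parent fails loudly, which is what forces callers to walk to the tip before revising.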

Superseded entries excluded from boost (same rule as retired): filter
in compute_boost_for_filtered_with_role (both active-entries prefilter
and geo-filtered path), rebuild_geo_index, and upsert_entry's existing-
idx search. status_counts returns (total, retired, superseded, failures);
/status JSON reports active = total - retired - superseded.

Endpoints:
  POST /vectors/playbook_memory/revise
  GET  /vectors/playbook_memory/history/{id}

Doc-sync — PHASES.md + PRD.md drifted from git after Phases 24-26
shipped. Fixes applied:

  - Phase 24 marked shipped (commit b95dd86) with detail of observer
    HTTP ingest + scenario outcome streaming. PRD "NOT YET WIRED"
    rewritten to reflect shipped state.
  - Phase 25 (validity windows, commit e0a843d) added to PHASES +
    PRD.
  - Phase 26 (Mem0 upsert + Letta hot cache, commit 640db8c) added.
  - Phase 27 entry added to both docs.
  - Phase 19.6 time decay corrected: was documented as "deferred",
    actually wired via BOOST_HALF_LIFE_DAYS = 30.0 in playbook_memory.rs.
  - Phase E/Phase 8 tombstone-at-compaction limit note updated —
    Phase E.2 closed it.

Tests: 8 new version_tests in vectord (chain-metadata stamping,
retired/superseded parent rejection, boost exclusion, history from
root/tip/middle, legacy default round-trip, status counts). 25 new
aibridge tests (context/continuation/tree_split). Workspace total
145 green (was 120).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 17:40:49 -05:00

//! Phase 21 — OUTPUT-overflow handler. Ports `generateContinuable`
//! from `tests/multi-agent/agent.ts`.
//!
//! Two failure modes to repair:
//!
//! * EMPTY response — thinking model ate the entire budget on hidden
//! reasoning before emitting a token. Fix: retry the original prompt
//! with 2× the budget, geometric up to `BUDGET_CAP`.
//!
//! * TRUNCATED non-empty — model got most of the way but hit
//! max_tokens before closing the structure. Fix: continue with the
//! partial response in the prompt as scratchpad, so the model knows
//! where to pick up without restarting.
//!
//! `TextGenerator` abstracts the sidecar so tests can inject canned
//! responses without a live Ollama.
use std::future::Future;

use crate::client::{AiClient, GenerateRequest, GenerateResponse};

/// Shape classifier for `is_structurally_complete`. JSON responses
/// must parse; text responses just need to be non-empty.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResponseShape {
    Json,
    Text,
}

/// Trait that `generate_continuable` + `generate_tree_split` call. The
/// real implementation forwards to `AiClient::generate`; tests supply a
/// mock with a scripted sequence of responses.
pub trait TextGenerator: Send + Sync {
    fn generate_text(
        &self,
        req: GenerateRequest,
    ) -> impl Future<Output = Result<GenerateResponse, String>> + Send;
}

impl TextGenerator for AiClient {
    fn generate_text(
        &self,
        req: GenerateRequest,
    ) -> impl Future<Output = Result<GenerateResponse, String>> + Send {
        self.generate(req)
    }
}

/// Strip a surrounding ```json``` fence if present. Leaves the inner
/// content alone otherwise. Returns a slice of `s`.
fn strip_json_fence(s: &str) -> &str {
    let t = s.trim();
    if let Some(rest) = t.strip_prefix("```json") {
        rest.trim_start_matches('\n').strip_suffix("```").unwrap_or(rest).trim()
    } else if let Some(rest) = t.strip_prefix("```") {
        rest.trim_start_matches('\n').strip_suffix("```").unwrap_or(rest).trim()
    } else {
        t
    }
}

/// Port of the TS brace-balance + JSON.parse check. Returns true when
/// the outermost `{...}` block is balanced and parses. Text shape is
/// satisfied by any non-empty, non-whitespace payload.
pub fn is_structurally_complete(text: &str, shape: ResponseShape) -> bool {
    if text.trim().is_empty() {
        return false;
    }
    if shape == ResponseShape::Text {
        return true;
    }
    let s = strip_json_fence(text);
    let Some(start) = s.find('{') else { return false };
    let Some(end) = s.rfind('}') else { return false };
    if end <= start {
        return false;
    }
    let slice = &s[start..=end];
    // Balance check — cheaper than parse, catches truncated nests.
    // String state tracked because `{` inside a string doesn't count.
    let mut depth: i32 = 0;
    let mut in_str = false;
    let mut esc = false;
    for c in slice.chars() {
        if esc { esc = false; continue; }
        if c == '\\' { esc = true; continue; }
        if c == '"' { in_str = !in_str; continue; }
        if in_str { continue; }
        if c == '{' {
            depth += 1;
        } else if c == '}' {
            depth -= 1;
            if depth < 0 {
                return false;
            }
        }
    }
    if depth != 0 {
        return false;
    }
    // Parse check is the tie-breaker — balanced but invalid JSON (e.g.
    // trailing comma before `}`) shouldn't count as complete.
    serde_json::from_str::<serde_json::Value>(slice).is_ok()
}

/// Knobs for `generate_continuable`. All optional with sensible
/// defaults that match the TS version.
#[derive(Debug, Clone)]
pub struct ContinuableOpts {
    pub model: String,
    pub max_tokens: Option<u32>,
    pub temperature: Option<f64>,
    pub system: Option<String>,
    pub shape: ResponseShape,
    pub max_continuations: usize,
    pub think: Option<bool>,
    /// Geometric-backoff ceiling for the empty-response retry path.
    /// Matches TS's `budgetCap = 8000`.
    pub budget_cap: u32,
    /// Maximum empty-response retries before giving up. Matches TS's
    /// hardcoded `retry < 3`.
    pub max_empty_retries: usize,
}

impl ContinuableOpts {
    pub fn new(model: impl Into<String>) -> Self {
        Self {
            model: model.into(),
            max_tokens: None,
            temperature: None,
            system: None,
            shape: ResponseShape::Json,
            max_continuations: 3,
            think: None,
            budget_cap: 8_000,
            max_empty_retries: 3,
        }
    }
}

/// Outcome of a `generate_continuable` call. Carries the combined
/// text plus diagnostic counters so observability downstream can
/// report "how many continuations did that query cost".
#[derive(Debug, Clone)]
pub struct ContinuableOutcome {
    pub text: String,
    pub empty_retries: usize,
    pub continuations: usize,
    pub final_complete: bool,
}

fn make_request(opts: &ContinuableOpts, prompt: String, current_max: u32) -> GenerateRequest {
    GenerateRequest {
        prompt,
        model: Some(opts.model.clone()),
        system: opts.system.clone(),
        temperature: opts.temperature,
        max_tokens: Some(current_max),
        think: opts.think,
    }
}

fn continuation_prompt(original: &str, partial: &str) -> String {
    format!(
        "{original}\n\n\
         PARTIAL RESPONSE SO FAR (continue from here — do NOT restart, \
         do NOT repeat what's already there, emit ONLY the remaining \
         tokens to close the structure):\n{partial}"
    )
}

/// Phase 21 — output-overflow safe generate. See module docs for the
/// two failure modes repaired. On final-still-incomplete, returns the
/// combined text with `final_complete: false` so the caller's parser
/// can throw with the raw text for forensics rather than silently
/// truncating.
pub async fn generate_continuable<G: TextGenerator>(
    generator: &G,
    prompt: &str,
    opts: &ContinuableOpts,
) -> Result<ContinuableOutcome, String> {
    let initial_max = opts.max_tokens.unwrap_or(800);
    let mut current_max = initial_max;
    let mut combined = String::new();
    let mut empty_retries = 0usize;
    let mut continuations = 0usize;

    // Phase 21(a) — empty-response backoff loop.
    for retry in 0..opts.max_empty_retries {
        let req = make_request(opts, prompt.to_string(), current_max);
        let resp = generator.generate_text(req).await?;
        if !resp.text.trim().is_empty() {
            combined = resp.text;
            break;
        }
        empty_retries = retry + 1;
        current_max = (current_max.saturating_mul(2)).min(opts.budget_cap);
    }

    // Phase 21(b) — structural-completion continuation loop. Runs on
    // the truncated-non-empty case; empty + exhausted retries falls
    // through with empty combined and final_complete=false.
    for _ in 0..opts.max_continuations {
        if is_structurally_complete(&combined, opts.shape) {
            return Ok(ContinuableOutcome {
                text: combined,
                empty_retries,
                continuations,
                final_complete: true,
            });
        }
        if combined.trim().is_empty() {
            // Nothing to continue from — continuing "" is identical to
            // the initial call and would loop. Bail so the caller sees
            // the failure rather than burning N extra calls.
            break;
        }
        let cont_prompt = continuation_prompt(prompt, &combined);
        let req = make_request(opts, cont_prompt, current_max.min(opts.budget_cap));
        let resp = generator.generate_text(req).await?;
        combined.push_str(&resp.text);
        continuations += 1;
    }

    let final_complete = is_structurally_complete(&combined, opts.shape);
    Ok(ContinuableOutcome {
        text: combined,
        empty_retries,
        continuations,
        final_complete,
    })
}

/// Scripted generator for unit tests. Returns responses from `script`
/// in order; extra calls reuse the last entry so tests don't have to
/// count past what they actually assert on.
#[cfg(test)]
pub struct ScriptedGenerator {
    script: Vec<Result<String, String>>,
    calls: std::sync::Arc<std::sync::Mutex<Vec<GenerateRequest>>>,
}

#[cfg(test)]
impl ScriptedGenerator {
    pub fn new<I>(script: I) -> Self
    where
        I: IntoIterator<Item = Result<String, String>>,
    {
        Self {
            script: script.into_iter().collect(),
            calls: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
        }
    }

    pub fn calls(&self) -> Vec<GenerateRequest> {
        self.calls.lock().unwrap().clone()
    }

    pub fn call_count(&self) -> usize {
        self.calls.lock().unwrap().len()
    }
}

#[cfg(test)]
impl TextGenerator for ScriptedGenerator {
    fn generate_text(
        &self,
        req: GenerateRequest,
    ) -> impl Future<Output = Result<GenerateResponse, String>> + Send {
        let i = {
            let mut calls = self.calls.lock().unwrap();
            let i = calls.len();
            calls.push(req.clone());
            i
        };
        let model = req.model.clone().unwrap_or_default();
        let entry = self
            .script
            .get(i)
            .cloned()
            .unwrap_or_else(|| self.script.last().cloned().unwrap_or(Ok(String::new())));
        async move {
            entry.map(|text| GenerateResponse {
                text,
                model,
                tokens_evaluated: None,
                tokens_generated: None,
            })
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn structural_complete_rejects_empty_and_text_mismatch() {
        assert!(!is_structurally_complete("", ResponseShape::Text));
        assert!(!is_structurally_complete(" ", ResponseShape::Text));
        assert!(is_structurally_complete("any content", ResponseShape::Text));
    }

    #[test]
    fn structural_complete_handles_balanced_json() {
        assert!(is_structurally_complete(r#"{"a": 1}"#, ResponseShape::Json));
        assert!(is_structurally_complete(
            r#"```json
{"a": 1, "b": [1, 2, 3]}
```"#,
            ResponseShape::Json,
        ));
    }

    #[test]
    fn structural_complete_rejects_truncated_json() {
        assert!(!is_structurally_complete(r#"{"a": 1"#, ResponseShape::Json));
        assert!(!is_structurally_complete(r#"{"a": {"b": 1"#, ResponseShape::Json));
        // Trailing comma — balanced but unparseable
        assert!(!is_structurally_complete(r#"{"a": 1,}"#, ResponseShape::Json));
    }

    #[test]
    fn structural_complete_ignores_braces_inside_strings() {
        assert!(is_structurally_complete(r#"{"s": "has } inside"}"#, ResponseShape::Json));
        assert!(is_structurally_complete(r#"{"s": "escaped \" quote"}"#, ResponseShape::Json));
    }

    #[tokio::test]
    async fn continuable_returns_first_response_when_complete() {
        let generator = ScriptedGenerator::new(vec![Ok(r#"{"ok": true}"#.to_string())]);
        let opts = ContinuableOpts::new("qwen3:latest");
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(out.final_complete);
        assert_eq!(out.empty_retries, 0);
        assert_eq!(out.continuations, 0);
        assert_eq!(generator.call_count(), 1);
    }

    #[tokio::test]
    async fn continuable_retries_on_empty_with_doubled_budget() {
        // Two empties, then a good response. Third call should see 4×
        // the initial budget (2× twice).
        let generator = ScriptedGenerator::new(vec![
            Ok("".to_string()),
            Ok("".to_string()),
            Ok(r#"{"ok": true}"#.to_string()),
        ]);
        let mut opts = ContinuableOpts::new("qwen3:latest");
        opts.max_tokens = Some(100);
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(out.final_complete);
        assert_eq!(out.empty_retries, 2);
        let calls = generator.calls();
        assert_eq!(calls.len(), 3);
        assert_eq!(calls[0].max_tokens, Some(100));
        assert_eq!(calls[1].max_tokens, Some(200));
        assert_eq!(calls[2].max_tokens, Some(400));
    }
    #[tokio::test]
    async fn continuable_caps_budget_at_budget_cap() {
        let generator = ScriptedGenerator::new(vec![
            Ok("".to_string()),
            Ok("".to_string()),
            Ok(r#"{"ok":1}"#.to_string()),
        ]);
        let mut opts = ContinuableOpts::new("qwen3:latest");
        opts.max_tokens = Some(5_000);
        opts.budget_cap = 7_000;
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(out.final_complete);
        let calls = generator.calls();
        // 5000 → doubled would be 10000; cap pulls it to 7000.
        assert_eq!(calls[1].max_tokens, Some(7_000));
        assert_eq!(calls[2].max_tokens, Some(7_000));
    }

    #[tokio::test]
    async fn continuable_glues_truncated_response() {
        // First call returns balanced-open `{...`; continuation closes
        // it with `...}`. Combined must parse.
        let generator = ScriptedGenerator::new(vec![
            Ok(r#"{"fills": [{"candidate_id": "C-001""#.to_string()),
            Ok(r#", "name": "Alice"}]}"#.to_string()),
        ]);
        let opts = ContinuableOpts::new("qwen3:latest");
        let out = generate_continuable(&generator, "ORIGINAL", &opts).await.unwrap();
        assert!(out.final_complete, "combined must parse: {}", out.text);
        assert_eq!(out.continuations, 1);
        let calls = generator.calls();
        assert_eq!(calls.len(), 2);
        // Continuation prompt must contain the partial — that's the
        // scratchpad primitive J called out.
        let cont_prompt = &calls[1].prompt;
        assert!(
            cont_prompt.contains("ORIGINAL"),
            "continuation must include original prompt"
        );
        assert!(
            cont_prompt.contains(r#"{"fills": [{"candidate_id": "C-001""#),
            "continuation must include partial"
        );
    }

    #[tokio::test]
    async fn continuable_does_not_loop_on_persistent_empty() {
        // All three retries return empty; we must NOT then enter the
        // continuation loop with an empty partial (would burn 3 more
        // calls continuing from "").
        let generator = ScriptedGenerator::new(vec![
            Ok("".to_string()),
            Ok("".to_string()),
            Ok("".to_string()),
        ]);
        let opts = ContinuableOpts::new("qwen3:latest");
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(!out.final_complete);
        assert_eq!(out.empty_retries, 3);
        assert_eq!(out.continuations, 0, "must not continue from empty");
        assert_eq!(generator.call_count(), 3);
    }

    #[tokio::test]
    async fn continuable_returns_raw_on_exhausted_continuations() {
        // Three continuations that never complete — caller's parser
        // will throw. We must return the combined text so forensics
        // has the raw content, not a lossy truncation.
        let generator = ScriptedGenerator::new(vec![
            Ok(r#"{"a": ["#.to_string()),
            Ok("1,".to_string()),
            Ok("2,".to_string()),
            Ok("3,".to_string()),
        ]);
        let mut opts = ContinuableOpts::new("qwen3:latest");
        opts.max_continuations = 3;
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(!out.final_complete);
        assert_eq!(out.continuations, 3);
        assert!(out.text.contains(r#"{"a": ["#));
        assert!(out.text.contains("3,"));
    }

    #[tokio::test]
    async fn continuable_propagates_generator_errors() {
        let generator = ScriptedGenerator::new(vec![Err("sidecar 503".to_string())]);
        let opts = ContinuableOpts::new("qwen3:latest");
        let err = generate_continuable(&generator, "test", &opts).await.unwrap_err();
        assert!(err.contains("503"));
    }
}