lakehouse/crates/aibridge/src/continuation.rs
root 454da15301
auditor + aibridge: 6 fixes from Opus 4.7 self-audit on PR #11
The kimi_architect auditor on commit 00c8408 ran with auto-promotion
to claude-opus-4-7 (diff > 100k chars) and produced 10 grounded
findings: 1 BLOCK, 6 WARN, 3 INFO. This commit lands 6 of them; 3
are skipped (false positives, or out-of-scope cleanup deferred).

LANDED:

1. kimi_architect.ts:144  empty-parse cache poisoning. When parseFindings
   returns 0 findings (markdown shape changed, prompt too big, regex
   missed every block), the verdict was still persisted with empty
   findings, and the 24h TTL cache short-circuited every subsequent
   audit with a useless "0 findings" hit. Fix: only persist when
   findings.length > 0; metrics still appended unconditionally.
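
   The guard itself is one line; a Rust-flavored sketch of the shape
   (the `Verdict` type and `should_persist` name are illustrative, not
   the actual kimi_architect.ts API):

   ```rust
   /// Illustrative verdict record; the real TS shape differs.
   struct Verdict {
       findings: Vec<String>,
   }

   /// True when the verdict should be written to the 24h TTL cache.
   /// An empty parse (markdown drift, oversized prompt, regex miss)
   /// must NOT be persisted, or the cache replays "0 findings" for a
   /// full day.
   fn should_persist(verdict: &Verdict) -> bool {
       !verdict.findings.is_empty()
   }
   ```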

2. kimi_architect.ts:122  outage negative-cache. When callKimi throws
   (network error, gateway 502, rate limit), we returned skipFinding
   but didn't note the outage anywhere. Every audit cycle within the
   24h TTL hammered the dead upstream. Fix: write a sentinel file
   `<verdict>.outage` on failure with 10-min TTL; future calls within
   that window short-circuit immediately.
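
   The TTL check reduces to comparing the sentinel's mtime against now;
   a minimal Rust sketch (names illustrative — the real logic lives in
   kimi_architect.ts):

   ```rust
   use std::time::{Duration, SystemTime};

   const OUTAGE_TTL: Duration = Duration::from_secs(10 * 60);

   /// Given the mtime of a `<verdict>.outage` sentinel, decide whether
   /// the next upstream call should be skipped.
   fn within_outage_window(sentinel_mtime: SystemTime, now: SystemTime) -> bool {
       now.duration_since(sentinel_mtime)
           .map(|age| age < OUTAGE_TTL)
           // Clock skew put the sentinel in the future: still treat the
           // outage as fresh rather than hammering the upstream.
           .unwrap_or(true)
   }
   ```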

3. kimi_architect.ts:331  mkdir(join(p, "..")) -> dirname(p). The
   "/.." idiom resolved correctly via Node path normalization but
   was non-idiomatic and breaks if the path ever has trailing dots.
   Both Haiku and Opus self-audits flagged it.

4. inference.ts:202  N=3 consensus latency double/triple-count.
   `totalLatencyMs += run.latency_ms` summed across THREE parallel
   `Promise.all` calls — wall-clock is bounded by the slowest, not
   the sum. Renamed to `maxLatencyMs` using `Math.max`. Telemetry now
   reports actual wall-clock instead of 3x reality.
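
   The invariant, as a sketch in Rust (names illustrative):

   ```rust
   /// Wall-clock latency of a parallel fan-out is bounded by the
   /// slowest run, not the sum of all runs. Summing per-run latencies
   /// across a Promise.all (or join_all) over-reports by up to Nx.
   fn wall_clock_ms(run_latencies: &[u64]) -> u64 {
       run_latencies.iter().copied().max().unwrap_or(0)
   }
   ```

   For parallel runs of 120 ms, 340 ms, and 95 ms, the wall clock is
   340 ms; the old sum reported 555 ms.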

5. continuation.rs:198,199,230,231  i64/u64 -> u32 saturating cast.
   `resp.tokens_evaluated as u32` truncates bits when source > u32::MAX
   instead of saturating. Fix: u32::try_from(...).unwrap_or(u32::MAX)
   wraps the cast in a real saturate. Applied to both the empty-retry
   loop and the structural-completion continuation loop.
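
   The difference between the two casts, extracted (mirrors the fix at
   the lines above):

   ```rust
   /// `v as u32` keeps only the low 32 bits, so 2^32 comes back as 0;
   /// `try_from(...).unwrap_or(u32::MAX)` pins out-of-range values to
   /// the ceiling instead.
   fn saturate_u32(v: u64) -> u32 {
       u32::try_from(v).unwrap_or(u32::MAX)
   }
   ```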

SKIPPED:

- BLOCK at Cargo.lock:8911 "validator-not-in-workspace" — confabulation.
  The diff Opus saw was truncated mid-line; validator IS in
  Cargo.toml workspace members. Real-world MAX_DIFF_CHARS=180k
  edge case to watch as we feed more big diffs.
- WARN at kimi_architect.ts:248 regex absolute-path edge case — minor,
  doesn't affect grounding rate observed so far.
- INFO at inference.ts:606 "dead reconstruction loop" — Opus misread.
  The Promise.all worker fills `summaries[]`; the second loop builds
  a sequential `scratchpad` string from those. Two distinct
  operations, not redundant.

Verification:
  bun build auditor/checks/{kimi_architect,inference}.ts   compiles
  cargo check -p aibridge                                  green
  cargo build --release -p gateway                          green
  systemctl restart lakehouse.service lakehouse-auditor.service  active

Next audit cycle (~90s after push) will run on the new diff and
exercise the negative-cache + dirname + maxLatencyMs paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 07:10:43 -05:00


//! Phase 21 — OUTPUT-overflow handler. Ports `generateContinuable`
//! from `tests/multi-agent/agent.ts`.
//!
//! Two failure modes to repair:
//!
//! * EMPTY response — the thinking model ate the entire budget on
//!   hidden reasoning before emitting a token. Fix: retry the original
//!   prompt with 2× the budget, geometric up to `budget_cap`.
//!
//! * TRUNCATED non-empty — the model got most of the way but hit
//!   max_tokens before closing the structure. Fix: continue with the
//!   partial response in the prompt as a scratchpad, so the model
//!   knows where to pick up without restarting.
//!
//! `TextGenerator` abstracts the sidecar so tests can inject canned
//! responses without a live Ollama.
use std::future::Future;

use crate::client::{AiClient, GenerateRequest, GenerateResponse};

/// Shape classifier for `is_structurally_complete`. JSON responses
/// must parse; text responses just need to be non-empty.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResponseShape {
    Json,
    Text,
}

/// Trait that `generate_continuable` + `generate_tree_split` call. The
/// real implementation forwards to `AiClient::generate`; tests supply a
/// mock with a scripted sequence of responses.
pub trait TextGenerator: Send + Sync {
    fn generate_text(
        &self,
        req: GenerateRequest,
    ) -> impl Future<Output = Result<GenerateResponse, String>> + Send;
}

impl TextGenerator for AiClient {
    fn generate_text(
        &self,
        req: GenerateRequest,
    ) -> impl Future<Output = Result<GenerateResponse, String>> + Send {
        self.generate(req)
    }
}

/// Strip a surrounding ```json``` fence if present. Leaves the inner
/// content alone otherwise. Returns a slice of `s`.
fn strip_json_fence(s: &str) -> &str {
    let t = s.trim();
    if let Some(rest) = t.strip_prefix("```json") {
        rest.trim_start_matches('\n').strip_suffix("```").unwrap_or(rest).trim()
    } else if let Some(rest) = t.strip_prefix("```") {
        rest.trim_start_matches('\n').strip_suffix("```").unwrap_or(rest).trim()
    } else {
        t
    }
}
/// Port of the TS brace-balance + JSON.parse check. Returns true when
/// the outermost `{...}` block is balanced and parses. Text shape is
/// satisfied by any non-empty, non-whitespace payload.
pub fn is_structurally_complete(text: &str, shape: ResponseShape) -> bool {
    if text.trim().is_empty() {
        return false;
    }
    if shape == ResponseShape::Text {
        return true;
    }
    let s = strip_json_fence(text);
    let Some(start) = s.find('{') else { return false };
    let Some(end) = s.rfind('}') else { return false };
    if end <= start {
        return false;
    }
    let slice = &s[start..=end];
    // Balance check — cheaper than parse, catches truncated nests.
    // String state tracked because `{` inside a string doesn't count.
    let mut depth: i32 = 0;
    let mut in_str = false;
    let mut esc = false;
    for c in slice.chars() {
        if esc { esc = false; continue; }
        if c == '\\' { esc = true; continue; }
        if c == '"' { in_str = !in_str; continue; }
        if in_str { continue; }
        if c == '{' {
            depth += 1;
        } else if c == '}' {
            depth -= 1;
            if depth < 0 { return false; }
        }
    }
    if depth != 0 {
        return false;
    }
    // Parse check is the tie-breaker — balanced but invalid JSON (e.g.
    // trailing comma before `}`) shouldn't count as complete.
    serde_json::from_str::<serde_json::Value>(slice).is_ok()
}
/// Knobs for `generate_continuable`. All optional with sensible
/// defaults that match the TS version.
#[derive(Debug, Clone)]
pub struct ContinuableOpts {
    pub model: String,
    pub max_tokens: Option<u32>,
    pub temperature: Option<f64>,
    pub system: Option<String>,
    pub shape: ResponseShape,
    pub max_continuations: usize,
    pub think: Option<bool>,
    /// Geometric-backoff ceiling for the empty-response retry path.
    /// Matches TS's `budgetCap = 8000`.
    pub budget_cap: u32,
    /// Maximum empty-response retries before giving up. Matches TS's
    /// hardcoded `retry < 3`.
    pub max_empty_retries: usize,
}

impl ContinuableOpts {
    pub fn new(model: impl Into<String>) -> Self {
        Self {
            model: model.into(),
            max_tokens: None,
            temperature: None,
            system: None,
            shape: ResponseShape::Json,
            max_continuations: 3,
            think: None,
            budget_cap: 8_000,
            max_empty_retries: 3,
        }
    }
}
/// Outcome of a `generate_continuable` call. Carries the combined
/// text plus diagnostic counters so observability downstream can
/// report "how many continuations did that query cost".
#[derive(Debug, Clone)]
pub struct ContinuableOutcome {
    pub text: String,
    pub empty_retries: usize,
    pub continuations: usize,
    pub final_complete: bool,
    /// Sum of `prompt_tokens` across every generator call made to
    /// produce this outcome — including empty retries and continuations.
    /// Lets callers (gateway execution loop, observability) stamp
    /// accurate per-task usage without second-guessing the retry fan-out.
    pub prompt_tokens: u32,
    /// Sum of `completion_tokens` across every generator call.
    pub completion_tokens: u32,
    /// Total number of generator calls. `1 + empty_retries +
    /// continuations` in the normal case; the field is explicit so
    /// callers don't have to re-derive it.
    pub calls: u32,
}

fn make_request(opts: &ContinuableOpts, prompt: String, current_max: u32) -> GenerateRequest {
    GenerateRequest {
        prompt,
        model: Some(opts.model.clone()),
        system: opts.system.clone(),
        temperature: opts.temperature,
        max_tokens: Some(current_max),
        think: opts.think,
    }
}

fn continuation_prompt(original: &str, partial: &str) -> String {
    format!(
        "{original}\n\n\
         PARTIAL RESPONSE SO FAR (continue from here — do NOT restart, \
         do NOT repeat what's already there, emit ONLY the remaining \
         tokens to close the structure):\n{partial}"
    )
}
/// Phase 21 — output-overflow safe generate. See module docs for the
/// two failure modes repaired. On final-still-incomplete, returns the
/// combined text with `final_complete: false` so the caller's parser
/// can throw with the raw text for forensics rather than silently
/// truncating.
pub async fn generate_continuable<G: TextGenerator>(
    generator: &G,
    prompt: &str,
    opts: &ContinuableOpts,
) -> Result<ContinuableOutcome, String> {
    let initial_max = opts.max_tokens.unwrap_or(800);
    let mut current_max = initial_max;
    let mut combined = String::new();
    let mut empty_retries = 0usize;
    let mut continuations = 0usize;
    let mut prompt_tokens: u32 = 0;
    let mut completion_tokens: u32 = 0;
    let mut calls: u32 = 0;

    // Phase 21(a) — empty-response backoff loop.
    for retry in 0..opts.max_empty_retries {
        let req = make_request(opts, prompt.to_string(), current_max);
        let resp = generator.generate_text(req).await?;
        calls += 1;
        // u32::try_from saturates at u32::MAX instead of silently
        // truncating bits when tokens_evaluated/_generated comes back
        // as a u64 > 4 billion. Caught 2026-04-27 by Opus self-audit.
        prompt_tokens = prompt_tokens
            .saturating_add(u32::try_from(resp.tokens_evaluated.unwrap_or(0)).unwrap_or(u32::MAX));
        completion_tokens = completion_tokens
            .saturating_add(u32::try_from(resp.tokens_generated.unwrap_or(0)).unwrap_or(u32::MAX));
        if !resp.text.trim().is_empty() {
            combined = resp.text;
            break;
        }
        empty_retries = retry + 1;
        current_max = (current_max.saturating_mul(2)).min(opts.budget_cap);
    }

    // Phase 21(b) — structural-completion continuation loop.
    for _ in 0..opts.max_continuations {
        if is_structurally_complete(&combined, opts.shape) {
            return Ok(ContinuableOutcome {
                text: combined,
                empty_retries,
                continuations,
                final_complete: true,
                prompt_tokens,
                completion_tokens,
                calls,
            });
        }
        if combined.trim().is_empty() {
            // Nothing to continue from — continuing "" is identical to
            // the initial call and would loop.
            break;
        }
        let cont_prompt = continuation_prompt(prompt, &combined);
        let req = make_request(opts, cont_prompt, current_max.min(opts.budget_cap));
        let resp = generator.generate_text(req).await?;
        calls += 1;
        prompt_tokens = prompt_tokens
            .saturating_add(u32::try_from(resp.tokens_evaluated.unwrap_or(0)).unwrap_or(u32::MAX));
        completion_tokens = completion_tokens
            .saturating_add(u32::try_from(resp.tokens_generated.unwrap_or(0)).unwrap_or(u32::MAX));
        combined.push_str(&resp.text);
        continuations += 1;
    }

    let final_complete = is_structurally_complete(&combined, opts.shape);
    Ok(ContinuableOutcome {
        text: combined,
        empty_retries,
        continuations,
        final_complete,
        prompt_tokens,
        completion_tokens,
        calls,
    })
}
/// Scripted generator for unit tests. Returns responses from `script`
/// in order; extra calls reuse the last entry so tests don't have to
/// count past what they actually assert on.
#[cfg(test)]
pub struct ScriptedGenerator {
    script: Vec<Result<String, String>>,
    calls: std::sync::Arc<std::sync::Mutex<Vec<GenerateRequest>>>,
}

#[cfg(test)]
impl ScriptedGenerator {
    pub fn new<I>(script: I) -> Self
    where
        I: IntoIterator<Item = Result<String, String>>,
    {
        Self {
            script: script.into_iter().collect(),
            calls: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
        }
    }

    pub fn calls(&self) -> Vec<GenerateRequest> {
        self.calls.lock().unwrap().clone()
    }

    pub fn call_count(&self) -> usize {
        self.calls.lock().unwrap().len()
    }
}

#[cfg(test)]
impl TextGenerator for ScriptedGenerator {
    fn generate_text(
        &self,
        req: GenerateRequest,
    ) -> impl Future<Output = Result<GenerateResponse, String>> + Send {
        let i = {
            let mut calls = self.calls.lock().unwrap();
            let i = calls.len();
            calls.push(req.clone());
            i
        };
        let model = req.model.clone().unwrap_or_default();
        let entry = self
            .script
            .get(i)
            .cloned()
            .unwrap_or_else(|| self.script.last().cloned().unwrap_or(Ok(String::new())));
        async move {
            entry.map(|text| GenerateResponse {
                text,
                model,
                tokens_evaluated: None,
                tokens_generated: None,
            })
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn structural_complete_rejects_empty_and_text_mismatch() {
        assert!(!is_structurally_complete("", ResponseShape::Text));
        assert!(!is_structurally_complete(" ", ResponseShape::Text));
        assert!(is_structurally_complete("any content", ResponseShape::Text));
    }

    #[test]
    fn structural_complete_handles_balanced_json() {
        assert!(is_structurally_complete(r#"{"a": 1}"#, ResponseShape::Json));
        assert!(is_structurally_complete(
            r#"```json
{"a": 1, "b": [1, 2, 3]}
```"#,
            ResponseShape::Json,
        ));
    }

    #[test]
    fn structural_complete_rejects_truncated_json() {
        assert!(!is_structurally_complete(r#"{"a": 1"#, ResponseShape::Json));
        assert!(!is_structurally_complete(r#"{"a": {"b": 1"#, ResponseShape::Json));
        // Trailing comma — balanced but unparseable
        assert!(!is_structurally_complete(r#"{"a": 1,}"#, ResponseShape::Json));
    }

    #[test]
    fn structural_complete_ignores_braces_inside_strings() {
        assert!(is_structurally_complete(r#"{"s": "has } inside"}"#, ResponseShape::Json));
        assert!(is_structurally_complete(r#"{"s": "escaped \" quote"}"#, ResponseShape::Json));
    }

    #[tokio::test]
    async fn continuable_returns_first_response_when_complete() {
        let generator = ScriptedGenerator::new(vec![Ok(r#"{"ok": true}"#.to_string())]);
        let opts = ContinuableOpts::new("qwen3:latest");
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(out.final_complete);
        assert_eq!(out.empty_retries, 0);
        assert_eq!(out.continuations, 0);
        assert_eq!(generator.call_count(), 1);
    }

    #[tokio::test]
    async fn continuable_retries_on_empty_with_doubled_budget() {
        // Two empties, then a good response. Third call should see 4×
        // the initial budget (2× twice).
        let generator = ScriptedGenerator::new(vec![
            Ok("".to_string()),
            Ok("".to_string()),
            Ok(r#"{"ok": true}"#.to_string()),
        ]);
        let mut opts = ContinuableOpts::new("qwen3:latest");
        opts.max_tokens = Some(100);
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(out.final_complete);
        assert_eq!(out.empty_retries, 2);
        let calls = generator.calls();
        assert_eq!(calls.len(), 3);
        assert_eq!(calls[0].max_tokens, Some(100));
        assert_eq!(calls[1].max_tokens, Some(200));
        assert_eq!(calls[2].max_tokens, Some(400));
    }

    #[tokio::test]
    async fn continuable_caps_budget_at_budget_cap() {
        let generator = ScriptedGenerator::new(vec![
            Ok("".to_string()),
            Ok("".to_string()),
            Ok(r#"{"ok":1}"#.to_string()),
        ]);
        let mut opts = ContinuableOpts::new("qwen3:latest");
        opts.max_tokens = Some(5_000);
        opts.budget_cap = 7_000;
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(out.final_complete);
        let calls = generator.calls();
        // 5000 → doubled would be 10000; cap pulls it to 7000.
        assert_eq!(calls[1].max_tokens, Some(7_000));
        assert_eq!(calls[2].max_tokens, Some(7_000));
    }

    #[tokio::test]
    async fn continuable_glues_truncated_response() {
        // First call returns balanced-open `{...`; continuation closes
        // it with `...}`. Combined must parse.
        let generator = ScriptedGenerator::new(vec![
            Ok(r#"{"fills": [{"candidate_id": "C-001""#.to_string()),
            Ok(r#", "name": "Alice"}]}"#.to_string()),
        ]);
        let opts = ContinuableOpts::new("qwen3:latest");
        let out = generate_continuable(&generator, "ORIGINAL", &opts).await.unwrap();
        assert!(out.final_complete, "combined must parse: {}", out.text);
        assert_eq!(out.continuations, 1);
        let calls = generator.calls();
        assert_eq!(calls.len(), 2);
        // Continuation prompt must contain the partial — that's the
        // scratchpad primitive J called out.
        let cont_prompt = &calls[1].prompt;
        assert!(
            cont_prompt.contains("ORIGINAL"),
            "continuation must include original prompt"
        );
        assert!(
            cont_prompt.contains(r#"{"fills": [{"candidate_id": "C-001""#),
            "continuation must include partial"
        );
    }

    #[tokio::test]
    async fn continuable_does_not_loop_on_persistent_empty() {
        // All three retries return empty; we must NOT then enter the
        // continuation loop with an empty partial (would burn 3 more
        // calls continuing from "").
        let generator = ScriptedGenerator::new(vec![
            Ok("".to_string()),
            Ok("".to_string()),
            Ok("".to_string()),
        ]);
        let opts = ContinuableOpts::new("qwen3:latest");
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(!out.final_complete);
        assert_eq!(out.empty_retries, 3);
        assert_eq!(out.continuations, 0, "must not continue from empty");
        assert_eq!(generator.call_count(), 3);
    }

    #[tokio::test]
    async fn continuable_returns_raw_on_exhausted_continuations() {
        // Three continuations that never complete — caller's parser
        // will throw. We must return the combined text so forensics
        // has the raw content, not a lossy truncation.
        let generator = ScriptedGenerator::new(vec![
            Ok(r#"{"a": ["#.to_string()),
            Ok("1,".to_string()),
            Ok("2,".to_string()),
            Ok("3,".to_string()),
        ]);
        let mut opts = ContinuableOpts::new("qwen3:latest");
        opts.max_continuations = 3;
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(!out.final_complete);
        assert_eq!(out.continuations, 3);
        assert!(out.text.contains(r#"{"a": ["#));
        assert!(out.text.contains("3,"));
    }

    #[tokio::test]
    async fn continuable_propagates_generator_errors() {
        let generator = ScriptedGenerator::new(vec![Err("sidecar 503".to_string())]);
        let opts = ContinuableOpts::new("qwen3:latest");
        let err = generate_continuable(&generator, "test", &opts).await.unwrap_err();
        assert!(err.contains("503"));
    }
}
}