lakehouse/crates/aibridge/src/continuation.rs
root 21fd3b9c61
Scrum-driven fixes: P5-001 auth wired, P42-001 truth evaluator, P9-001 journal on ingest
Apply the highest-confidence findings from the Phase 0→42 forensic sweep
after four scrum-master iterations under the adversarial prompt. Each fix
is independently validated by a later scrum iteration scoring the same
file higher under the same bar.

Code changes
────────────
P5-001 — crates/gateway/src/auth.rs + main.rs
  api_key_auth was marked #[allow(dead_code)] and never wrapped around
  the router, so `[auth] enabled=true` logged a green message and
  enforced nothing. Now wired via from_fn_with_state, with constant-time
  header compare and /health exempted for LB probes.
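The comparison half of that fix can be sketched in plain Rust. The router wiring itself depends on axum's `from_fn_with_state` and is omitted here; `constant_time_eq` is an illustrative name, not necessarily the gateway's actual helper:

```rust
/// Constant-time equality for an API-key header compare. XOR-accumulates
/// every byte pair so a mismatch at byte 0 costs the same as one at the
/// end; only the key length is revealed by the early length check.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let mut diff: u8 = 0;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}

fn main() {
    assert!(constant_time_eq(b"secret-key", b"secret-key"));
    assert!(!constant_time_eq(b"secret-key", b"secret-kez"));
    assert!(!constant_time_eq(b"short", b"much-longer-key"));
}
```

A plain `==` on strings short-circuits at the first differing byte, which is the timing side channel the constant-time variant closes.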

P42-001 — crates/truth/src/lib.rs
  TruthStore::check() ignored RuleCondition entirely — signature looked
  like enforcement, body returned every action unconditionally. Added
  evaluate(task_class, ctx) that actually walks FieldEquals / FieldEmpty /
  FieldGreater / Always against a serde_json::Value via dot-path lookup.
  check() kept for back-compat. Tests 14 → 24 (10 new exercising real
  pass/fail semantics). serde_json moved to [dependencies].
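The evaluator shape described above can be sketched as follows. To stay dependency-free this uses a minimal stand-in `Value` enum instead of `serde_json::Value`, and the variant/field names are assumptions based on the commit text, not the real `truth` crate API:

```rust
use std::collections::HashMap;

/// Minimal stand-in for serde_json::Value; the real evaluator walks
/// serde_json::Value instead.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Num(f64),
    Str(String),
    Obj(HashMap<String, Value>),
}

/// Conditions mirroring the names in the commit message.
enum RuleCondition {
    Always,
    FieldEquals { path: String, expected: Value },
    FieldEmpty { path: String },
    FieldGreater { path: String, threshold: f64 },
}

/// Dot-path lookup: "a.b.c" descends nested objects, None on any miss.
fn lookup<'a>(ctx: &'a Value, path: &str) -> Option<&'a Value> {
    let mut cur = ctx;
    for seg in path.split('.') {
        match cur {
            Value::Obj(m) => cur = m.get(seg)?,
            _ => return None,
        }
    }
    Some(cur)
}

fn evaluate(cond: &RuleCondition, ctx: &Value) -> bool {
    match cond {
        RuleCondition::Always => true,
        RuleCondition::FieldEquals { path, expected } => lookup(ctx, path) == Some(expected),
        RuleCondition::FieldEmpty { path } => match lookup(ctx, path) {
            None | Some(Value::Null) => true,
            Some(Value::Str(s)) => s.is_empty(),
            _ => false,
        },
        RuleCondition::FieldGreater { path, threshold } => {
            matches!(lookup(ctx, path), Some(Value::Num(n)) if n > threshold)
        }
    }
}

/// Build a tiny context: { "user": { "age": 42.0, "name": "" } }.
fn sample_ctx() -> Value {
    let mut user = HashMap::new();
    user.insert("age".to_string(), Value::Num(42.0));
    user.insert("name".to_string(), Value::Str(String::new()));
    let mut root = HashMap::new();
    root.insert("user".to_string(), Value::Obj(user));
    Value::Obj(root)
}

fn main() {
    let ctx = sample_ctx();
    assert!(evaluate(&RuleCondition::Always, &ctx));
    assert!(evaluate(&RuleCondition::FieldGreater { path: "user.age".into(), threshold: 40.0 }, &ctx));
    assert!(evaluate(&RuleCondition::FieldEmpty { path: "user.name".into() }, &ctx));
    assert!(!evaluate(&RuleCondition::FieldEquals { path: "user.age".into(), expected: Value::Num(7.0) }, &ctx));
}
```

The point of the fix is exactly this: the condition is actually matched against the context, instead of every action being returned unconditionally.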

P9-001 (partial) — crates/ingestd/src/service.rs
  Added Option&lt;Journal&gt; to IngestState + a journal.record_ingest() call
  on /ingest/file success. Gateway wires it with `journal.clone()` before
  the /journal nest consumes the original. First-ever internal mutation
  journal event verified live (total_events_created 0→1 after probe).
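The clone-before-nest ordering is the subtle part. A dependency-free sketch (the type and function names here are hypothetical stand-ins, not the real ingestd/gateway types):

```rust
use std::sync::Arc;

// Hypothetical stand-ins — the real types live in ingestd and the gateway.
struct Journal;
struct IngestState {
    journal: Option<Arc<Journal>>,
}
struct JournalRouter {
    journal: Arc<Journal>,
}

/// Wire one shared journal into both consumers. The clone must happen
/// BEFORE the value is moved into the /journal router state; reversing
/// the two lines is a use-after-move and fails to compile.
fn wire(journal: Arc<Journal>) -> (IngestState, JournalRouter) {
    let ingest = IngestState { journal: Some(journal.clone()) }; // clone first
    let journal_router = JournalRouter { journal };              // then move
    (ingest, journal_router)
}

fn main() {
    let (ingest, router) = wire(Arc::new(Journal));
    // Both handles point at the same journal: strong count is 2.
    assert_eq!(Arc::strong_count(&router.journal), 2);
    assert!(ingest.journal.is_some());
}
```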

Iter-4 scrum scored these files higher under same prompt:
  ingestd/src/service.rs      3 → 6  (P9-001 visible)
  truth/src/lib.rs            3 → 4  (P42-001 visible)
  gateway/src/auth.rs         3 → 4  (P5-001 visible)
  gateway/src/execution_loop  4 → 6  (indirect)
  storaged/src/federation     3 → 4  (indirect)

Infrastructure additions
────────────────────────
 * tests/real-world/scrum_master_pipeline.ts
   - cloud-first ladder: kimi-k2:1t → deepseek-v3.1:671b → mistral-large-3:675b
     → gpt-oss:120b → devstral-2:123b → qwen3.5:397b (deep final thinker)
   - LH_SCRUM_FORENSIC env: injects SCRUM_FORENSIC_PROMPT.md as adversarial preamble
   - LH_SCRUM_PROPOSAL env: per-iter fix-wave doc override
   - Confidence extraction (markdown + JSON), schema v4 KB rows with:
     verdict, critical_failures_count, verified_components_count,
     missing_components_count, output_format, gradient_tier
   - Model trust profile written per file-accept to data/_kb/model_trust.jsonl
   - Fire-and-forget POST to observer /event so by_source.scrum appears in /stats

 * mcp-server/observer.ts — unchanged in shape, confirmed receiving scrum events

 * ui/ — new Visual Control Plane on :3950
   - Bun.serve with /data/{services,reviews,metrics,trust,overrides,findings,file,refactor_signals,search,logs/:svc,scrum_log}
   - Views: MAP (D3 graph, 5 overlays) / TRACE (per-file iter timeline) /
     TRAJECTORY (refactor signals + reverse index search) / METRICS (explainers
     with SOURCE + GOOD lines) / KB (card grid with tooltips) / CONSOLE (per-service
     journalctl tail, tabs for gateway/sidecar/observer/mcp/ctx7/auditor/langfuse)
   - tryFetch always attempts JSON.parse (fix for observer returning JSON without content-type)
   - renderNodeContext primitive-vs-object guard (fix for gateway /health string)

 * docs/SCRUM_FIX_WAVE.md     — iter-specific scope directing the scrum
 * docs/SCRUM_FORENSIC_PROMPT.md — adversarial audit prompt (verdict/critical/verified schema)
 * docs/SCRUM_LOOP_NOTES.md   — iteration observations + fix-next-loop queue
 * docs/SYSTEM_EVOLUTION_LAYERS.md — Layers 1-10 roadmap (trust profiling, execution DNA, drift sentinel, etc)

Measurements across iterations
──────────────────────────────
 iter 1 (soft prompt, gpt-oss:120b):   mean score 5.00/10
 iter 3 (forensic, kimi-k2:1t):        mean score 3.56/10 (−1.44 — bar raised)
 iter 4 (same bar, post fixes):        mean score 4.00/10 (+0.44 — fixes landed)

 Score movement iter3→iter4: ↑5 ↓1 =12
 21/21 first-attempt accept by kimi-k2:1t in iter 4
 20/21 emitted forensic JSON (richer signal than markdown)
 16 verified_components captured (proof-of-life, new metric)
 Permission Gradient distribution: 0 auto · 16 dry_run · 4 sim · 1 block

 Observer loop: by_source {scrum: 21, langfuse: 1985, phase24_audit: 1}
 v1/usage: 224 requests, 477K tokens, all tracked

Signal classes per file (iter 3 → iter 4):
 CONVERGING:  1 (ingestd/service.rs — fix clearly landed)
 LOOPING:     4 (catalogd/registry, main, queryd/service, vectord/index_registry)
 ORBITING:    1 (truth — novel findings surfacing as surface ones fix)
 PLATEAU:     9 (scores flat with high confidence — diminishing returns)
 MIXED:       6

Loop thesis status
──────────────────
A file's score rises only when the scrum confirms a real fix landed.
No false positives yet across 3 iterations. Fixes applied to 3 files all
raised their independent scores under the same adversarial prompt. Loop
is measurable, not hand-wavy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 02:25:43 -05:00


//! Phase 21 — OUTPUT-overflow handler. Ports `generateContinuable`
//! from `tests/multi-agent/agent.ts`.
//!
//! Two failure modes to repair:
//!
//! * EMPTY response — thinking model ate the entire budget on hidden
//!   reasoning before emitting a token. Fix: retry the original prompt
//!   with 2× the budget, geometric up to `BUDGET_CAP`.
//!
//! * TRUNCATED non-empty — model got most of the way but hit
//!   max_tokens before closing the structure. Fix: continue with the
//!   partial response in the prompt as scratchpad, so the model knows
//!   where to pick up without restarting.
//!
//! `TextGenerator` abstracts the sidecar so tests can inject canned
//! responses without a live Ollama.

use std::future::Future;

use crate::client::{AiClient, GenerateRequest, GenerateResponse};

/// Shape classifier for `is_structurally_complete`. JSON responses
/// must parse; text responses just need to be non-empty.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResponseShape {
    Json,
    Text,
}

/// Trait that `generate_continuable` + `generate_tree_split` call. The
/// real implementation forwards to `AiClient::generate`; tests supply a
/// mock with a scripted sequence of responses.
pub trait TextGenerator: Send + Sync {
    fn generate_text(
        &self,
        req: GenerateRequest,
    ) -> impl Future<Output = Result<GenerateResponse, String>> + Send;
}

impl TextGenerator for AiClient {
    fn generate_text(
        &self,
        req: GenerateRequest,
    ) -> impl Future<Output = Result<GenerateResponse, String>> + Send {
        self.generate(req)
    }
}

/// Strip a surrounding ```json``` fence if present. Leaves the inner
/// content alone otherwise. Returns a slice of `s`.
fn strip_json_fence(s: &str) -> &str {
    let t = s.trim();
    if let Some(rest) = t.strip_prefix("```json") {
        rest.trim_start_matches('\n').strip_suffix("```").unwrap_or(rest).trim()
    } else if let Some(rest) = t.strip_prefix("```") {
        rest.trim_start_matches('\n').strip_suffix("```").unwrap_or(rest).trim()
    } else {
        t
    }
}
/// Port of the TS brace-balance + JSON.parse check. Returns true when
/// the outermost `{...}` block is balanced and parses. Text shape is
/// satisfied by any non-empty, non-whitespace payload.
pub fn is_structurally_complete(text: &str, shape: ResponseShape) -> bool {
    if text.trim().is_empty() { return false; }
    if shape == ResponseShape::Text { return true; }
    let s = strip_json_fence(text);
    let Some(start) = s.find('{') else { return false };
    let Some(end) = s.rfind('}') else { return false };
    if end <= start { return false; }
    let slice = &s[start..=end];
    // Balance check — cheaper than parse, catches truncated nests.
    // String state tracked because `{` inside a string doesn't count.
    let mut depth: i32 = 0;
    let mut in_str = false;
    let mut esc = false;
    for c in slice.chars() {
        if esc { esc = false; continue; }
        if c == '\\' { esc = true; continue; }
        if c == '"' { in_str = !in_str; continue; }
        if in_str { continue; }
        if c == '{' { depth += 1; }
        else if c == '}' {
            depth -= 1;
            if depth < 0 { return false; }
        }
    }
    if depth != 0 { return false; }
    // Parse check is the tie-breaker — balanced but invalid JSON (e.g.
    // trailing comma before `}`) shouldn't count as complete.
    serde_json::from_str::<serde_json::Value>(slice).is_ok()
}
/// Knobs for `generate_continuable`. All optional with sensible
/// defaults that match the TS version.
#[derive(Debug, Clone)]
pub struct ContinuableOpts {
    pub model: String,
    pub max_tokens: Option<u32>,
    pub temperature: Option<f64>,
    pub system: Option<String>,
    pub shape: ResponseShape,
    pub max_continuations: usize,
    pub think: Option<bool>,
    /// Geometric-backoff ceiling for the empty-response retry path.
    /// Matches TS's `budgetCap = 8000`.
    pub budget_cap: u32,
    /// Maximum empty-response retries before giving up. Matches TS's
    /// hardcoded `retry < 3`.
    pub max_empty_retries: usize,
}

impl ContinuableOpts {
    pub fn new(model: impl Into<String>) -> Self {
        Self {
            model: model.into(),
            max_tokens: None,
            temperature: None,
            system: None,
            shape: ResponseShape::Json,
            max_continuations: 3,
            think: None,
            budget_cap: 8_000,
            max_empty_retries: 3,
        }
    }
}

/// Outcome of a `generate_continuable` call. Carries the combined
/// text plus diagnostic counters so observability downstream can
/// report "how many continuations did that query cost".
#[derive(Debug, Clone)]
pub struct ContinuableOutcome {
    pub text: String,
    pub empty_retries: usize,
    pub continuations: usize,
    pub final_complete: bool,
    /// Sum of `prompt_tokens` across every generator call made to
    /// produce this outcome — including empty retries and continuations.
    /// Lets callers (gateway execution loop, observability) stamp
    /// accurate per-task usage without second-guessing the retry fan-out.
    pub prompt_tokens: u32,
    /// Sum of `completion_tokens` across every generator call.
    pub completion_tokens: u32,
    /// Total number of generator calls. `1 + empty_retries +
    /// continuations` in the normal case; the field is explicit so
    /// callers don't have to re-derive it.
    pub calls: u32,
}

fn make_request(opts: &ContinuableOpts, prompt: String, current_max: u32) -> GenerateRequest {
    GenerateRequest {
        prompt,
        model: Some(opts.model.clone()),
        system: opts.system.clone(),
        temperature: opts.temperature,
        max_tokens: Some(current_max),
        think: opts.think,
    }
}

fn continuation_prompt(original: &str, partial: &str) -> String {
    format!(
        "{original}\n\n\
         PARTIAL RESPONSE SO FAR (continue from here — do NOT restart, \
         do NOT repeat what's already there, emit ONLY the remaining \
         tokens to close the structure):\n{partial}"
    )
}
/// Phase 21 — output-overflow safe generate. See module docs for the
/// two failure modes repaired. On final-still-incomplete, returns the
/// combined text with `final_complete: false` so the caller's parser
/// can throw with the raw text for forensics rather than silently
/// truncating.
pub async fn generate_continuable<G: TextGenerator>(
    generator: &G,
    prompt: &str,
    opts: &ContinuableOpts,
) -> Result<ContinuableOutcome, String> {
    let initial_max = opts.max_tokens.unwrap_or(800);
    let mut current_max = initial_max;
    let mut combined = String::new();
    let mut empty_retries = 0usize;
    let mut continuations = 0usize;
    let mut prompt_tokens: u32 = 0;
    let mut completion_tokens: u32 = 0;
    let mut calls: u32 = 0;

    // Phase 21(a) — empty-response backoff loop.
    for retry in 0..opts.max_empty_retries {
        let req = make_request(opts, prompt.to_string(), current_max);
        let resp = generator.generate_text(req).await?;
        calls += 1;
        prompt_tokens = prompt_tokens.saturating_add(resp.tokens_evaluated.unwrap_or(0) as u32);
        completion_tokens = completion_tokens.saturating_add(resp.tokens_generated.unwrap_or(0) as u32);
        if !resp.text.trim().is_empty() {
            combined = resp.text;
            break;
        }
        empty_retries = retry + 1;
        current_max = (current_max.saturating_mul(2)).min(opts.budget_cap);
    }

    // Phase 21(b) — structural-completion continuation loop.
    for _ in 0..opts.max_continuations {
        if is_structurally_complete(&combined, opts.shape) {
            return Ok(ContinuableOutcome {
                text: combined,
                empty_retries,
                continuations,
                final_complete: true,
                prompt_tokens,
                completion_tokens,
                calls,
            });
        }
        if combined.trim().is_empty() {
            // Nothing to continue from — continuing "" is identical to
            // the initial call and would loop.
            break;
        }
        let cont_prompt = continuation_prompt(prompt, &combined);
        let req = make_request(opts, cont_prompt, current_max.min(opts.budget_cap));
        let resp = generator.generate_text(req).await?;
        calls += 1;
        prompt_tokens = prompt_tokens.saturating_add(resp.tokens_evaluated.unwrap_or(0) as u32);
        completion_tokens = completion_tokens.saturating_add(resp.tokens_generated.unwrap_or(0) as u32);
        combined.push_str(&resp.text);
        continuations += 1;
    }

    let final_complete = is_structurally_complete(&combined, opts.shape);
    Ok(ContinuableOutcome {
        text: combined,
        empty_retries,
        continuations,
        final_complete,
        prompt_tokens,
        completion_tokens,
        calls,
    })
}
/// Scripted generator for unit tests. Returns responses from `script`
/// in order; extra calls reuse the last entry so tests don't have to
/// count past what they actually assert on.
#[cfg(test)]
pub struct ScriptedGenerator {
    script: Vec<Result<String, String>>,
    calls: std::sync::Arc<std::sync::Mutex<Vec<GenerateRequest>>>,
}

#[cfg(test)]
impl ScriptedGenerator {
    pub fn new<I>(script: I) -> Self
    where
        I: IntoIterator<Item = Result<String, String>>,
    {
        Self {
            script: script.into_iter().collect(),
            calls: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
        }
    }

    pub fn calls(&self) -> Vec<GenerateRequest> {
        self.calls.lock().unwrap().clone()
    }

    pub fn call_count(&self) -> usize {
        self.calls.lock().unwrap().len()
    }
}

#[cfg(test)]
impl TextGenerator for ScriptedGenerator {
    fn generate_text(
        &self,
        req: GenerateRequest,
    ) -> impl Future<Output = Result<GenerateResponse, String>> + Send {
        let i = {
            let mut calls = self.calls.lock().unwrap();
            let i = calls.len();
            calls.push(req.clone());
            i
        };
        let model = req.model.clone().unwrap_or_default();
        let entry = self.script.get(i)
            .cloned()
            .unwrap_or_else(|| self.script.last().cloned().unwrap_or(Ok(String::new())));
        async move {
            entry.map(|text| GenerateResponse {
                text,
                model,
                tokens_evaluated: None,
                tokens_generated: None,
            })
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn structural_complete_rejects_empty_and_text_mismatch() {
        assert!(!is_structurally_complete("", ResponseShape::Text));
        assert!(!is_structurally_complete(" ", ResponseShape::Text));
        assert!(is_structurally_complete("any content", ResponseShape::Text));
    }

    #[test]
    fn structural_complete_handles_balanced_json() {
        assert!(is_structurally_complete(r#"{"a": 1}"#, ResponseShape::Json));
        assert!(is_structurally_complete(
            r#"```json
{"a": 1, "b": [1, 2, 3]}
```"#,
            ResponseShape::Json,
        ));
    }

    #[test]
    fn structural_complete_rejects_truncated_json() {
        assert!(!is_structurally_complete(r#"{"a": 1"#, ResponseShape::Json));
        assert!(!is_structurally_complete(r#"{"a": {"b": 1"#, ResponseShape::Json));
        // Trailing comma — balanced but unparseable
        assert!(!is_structurally_complete(r#"{"a": 1,}"#, ResponseShape::Json));
    }

    #[test]
    fn structural_complete_ignores_braces_inside_strings() {
        assert!(is_structurally_complete(r#"{"s": "has } inside"}"#, ResponseShape::Json));
        assert!(is_structurally_complete(r#"{"s": "escaped \" quote"}"#, ResponseShape::Json));
    }

    #[tokio::test]
    async fn continuable_returns_first_response_when_complete() {
        let generator = ScriptedGenerator::new(vec![Ok(r#"{"ok": true}"#.to_string())]);
        let opts = ContinuableOpts::new("qwen3:latest");
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(out.final_complete);
        assert_eq!(out.empty_retries, 0);
        assert_eq!(out.continuations, 0);
        assert_eq!(generator.call_count(), 1);
    }

    #[tokio::test]
    async fn continuable_retries_on_empty_with_doubled_budget() {
        // Two empties, then a good response. Third call should see 4×
        // the initial budget (2× twice).
        let generator = ScriptedGenerator::new(vec![
            Ok("".to_string()),
            Ok("".to_string()),
            Ok(r#"{"ok": true}"#.to_string()),
        ]);
        let mut opts = ContinuableOpts::new("qwen3:latest");
        opts.max_tokens = Some(100);
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(out.final_complete);
        assert_eq!(out.empty_retries, 2);
        let calls = generator.calls();
        assert_eq!(calls.len(), 3);
        assert_eq!(calls[0].max_tokens, Some(100));
        assert_eq!(calls[1].max_tokens, Some(200));
        assert_eq!(calls[2].max_tokens, Some(400));
    }

    #[tokio::test]
    async fn continuable_caps_budget_at_budget_cap() {
        let generator = ScriptedGenerator::new(vec![
            Ok("".to_string()),
            Ok("".to_string()),
            Ok(r#"{"ok":1}"#.to_string()),
        ]);
        let mut opts = ContinuableOpts::new("qwen3:latest");
        opts.max_tokens = Some(5_000);
        opts.budget_cap = 7_000;
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(out.final_complete);
        let calls = generator.calls();
        // 5000 → doubled would be 10000; cap pulls it to 7000.
        assert_eq!(calls[1].max_tokens, Some(7_000));
        assert_eq!(calls[2].max_tokens, Some(7_000));
    }

    #[tokio::test]
    async fn continuable_glues_truncated_response() {
        // First call returns balanced-open `{...`; continuation closes
        // it with `...}`. Combined must parse.
        let generator = ScriptedGenerator::new(vec![
            Ok(r#"{"fills": [{"candidate_id": "C-001""#.to_string()),
            Ok(r#", "name": "Alice"}]}"#.to_string()),
        ]);
        let opts = ContinuableOpts::new("qwen3:latest");
        let out = generate_continuable(&generator, "ORIGINAL", &opts).await.unwrap();
        assert!(out.final_complete, "combined must parse: {}", out.text);
        assert_eq!(out.continuations, 1);
        let calls = generator.calls();
        assert_eq!(calls.len(), 2);
        // Continuation prompt must contain the partial — that's the
        // scratchpad primitive J called out.
        let cont_prompt = &calls[1].prompt;
        assert!(cont_prompt.contains("ORIGINAL"),
            "continuation must include original prompt");
        assert!(cont_prompt.contains(r#"{"fills": [{"candidate_id": "C-001""#),
            "continuation must include partial");
    }

    #[tokio::test]
    async fn continuable_does_not_loop_on_persistent_empty() {
        // All three retries return empty; we must NOT then enter the
        // continuation loop with an empty partial (would burn 3 more
        // calls continuing from "").
        let generator = ScriptedGenerator::new(vec![
            Ok("".to_string()),
            Ok("".to_string()),
            Ok("".to_string()),
        ]);
        let opts = ContinuableOpts::new("qwen3:latest");
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(!out.final_complete);
        assert_eq!(out.empty_retries, 3);
        assert_eq!(out.continuations, 0, "must not continue from empty");
        assert_eq!(generator.call_count(), 3);
    }

    #[tokio::test]
    async fn continuable_returns_raw_on_exhausted_continuations() {
        // Three continuations that never complete — caller's parser
        // will throw. We must return the combined text so forensics
        // has the raw content, not a lossy truncation.
        let generator = ScriptedGenerator::new(vec![
            Ok(r#"{"a": ["#.to_string()),
            Ok("1,".to_string()),
            Ok("2,".to_string()),
            Ok("3,".to_string()),
        ]);
        let mut opts = ContinuableOpts::new("qwen3:latest");
        opts.max_continuations = 3;
        let out = generate_continuable(&generator, "test", &opts).await.unwrap();
        assert!(!out.final_complete);
        assert_eq!(out.continuations, 3);
        assert!(out.text.contains(r#"{"a": ["#));
        assert!(out.text.contains("3,"));
    }

    #[tokio::test]
    async fn continuable_propagates_generator_errors() {
        let generator = ScriptedGenerator::new(vec![Err("sidecar 503".to_string())]);
        let opts = ContinuableOpts::new("qwen3:latest");
        let err = generate_continuable(&generator, "test", &opts).await.unwrap_err();
        assert!(err.contains("503"));
    }
}