diff --git a/tests/real-world/enrich_prd_pipeline.ts b/tests/real-world/enrich_prd_pipeline.ts index d0f6f56..134e2d7 100644 --- a/tests/real-world/enrich_prd_pipeline.ts +++ b/tests/real-world/enrich_prd_pipeline.ts @@ -17,7 +17,34 @@ const CHUNK_SIZE = 800; // chars per chunk — ~200 tokens const CHUNK_OVERLAP = 120; const TOP_K_RETRIEVE = 12; // chunks per iteration — pulled up to force overflow const CONTEXT_BUDGET_CHARS = 4000; // tight budget — forces tree-split on every iter -const INJECT_FAIL_ON_ITER = 3; // plant a bad primary-cloud call so rescue fires +const INJECT_FAIL_ON_ITER = 3; // force the TASK-retry loop on iter 3 + +// Continuation controls (per-cloud-call) — used for output-overflow. +// Separate from the task-retry loop (per-task) — that handles errors +// across attempts. +const PRIMARY_MAX_TOKENS = 150; // tight — forces truncation +const CONTINUATION_MAX_TOKENS = 300; // fixed 2× the primary budget, used for every continuation +const MAX_CONTINUATIONS = 6; // max stitch pieces per cloud call + +// Task-level retry loop (J's clarification, 2026-04-22): +// When a TASK errors, retry the whole task up to 6 times. Each +// retry gets prior attempts' failures injected as learning context, +// so attempt N+1 is informed by what N failed at. The loop caps at +// 6 to avoid infinite spinning on genuinely unsolvable tasks. +const MAX_TASK_RETRIES = 6; + +// To FORCE the retry loop on iter INJECT_FAIL_ON_ITER, cycle through +// 5 deliberately-invalid models + 1 valid one. Attempts 1-5 will +// 502/404 from Ollama Cloud; attempt 6 finally succeeds. Proves the +// loop fires all 6 with compounding failure context. 
+const FORCE_RETRY_MODEL_SEQUENCE = [ + "deliberately-invalid-model-attempt-1", + "deliberately-invalid-model-attempt-2", + "deliberately-invalid-model-attempt-3", + "deliberately-invalid-model-attempt-4", + "deliberately-invalid-model-attempt-5", + "gpt-oss:20b", // 6th attempt succeeds +]; const GATEWAY = "http://localhost:3100"; const SIDECAR = "http://localhost:3200"; const CLOUD_MODEL = "gpt-oss:120b"; @@ -47,6 +74,9 @@ interface IterationResult { cloud_calls_total: number; continuation_retries: number; rescue_triggered: boolean; + // Task-level retry telemetry + task_attempts_made: number; // how many attempts fired (1 = first succeeded) + task_retry_history: Array<{ n: number; model: string; error: string }>; playbook_id: string | null; tokens_prompt: number; tokens_completion: number; @@ -155,32 +185,50 @@ async function treeSplitSummarize( return { scratchpad, cloud_calls }; } -// ─── Continuable generate — retries once on empty/truncated ─────── +// ─── Continuable generate — up to max_continuations stitches ────── +// +// Two failure modes handled: +// A) Empty response — typically thinking model burned the budget +// on hidden reasoning. Retry with the larger CONTINUATION_MAX_TOKENS budget. +// B) Truncated response (finish_reason=length) — answer got cut off +// mid-sentence. Pass the partial back as scratchpad and ask the +// model to continue from where it stopped. +// +// Stitching: keep appending content across retries; prompt_tokens and +// completion_tokens accumulate; finish_reason reflects the LAST call. +// Loop exits on the first call that finishes cleanly (stop) with +// non-empty content, OR when retries hit the cap. 
async function generateContinuable( opts: Parameters[0] & { max_continuations?: number }, -): Promise<{ content: string; prompt_tokens: number; completion_tokens: number; continuation_retries: number }> { +): Promise<{ content: string; prompt_tokens: number; completion_tokens: number; continuation_retries: number; finish_reason: string }> { const maxCont = opts.max_continuations ?? 1; let total = await chat(opts); let retries = 0; - if (total.content.length === 0 || total.finish_reason === "length") { - for (let i = 0; i < maxCont && (total.content.length === 0 || total.finish_reason === "length"); i++) { - retries += 1; - log(` continuation retry ${retries} (reason: ${total.finish_reason}, content=${total.content.length})`); - const continued = await chat({ - ...opts, - max_tokens: opts.max_tokens * 2, - messages: [ + while (retries < maxCont && (total.content.length === 0 || total.finish_reason === "length")) { + retries += 1; + const mode = total.content.length === 0 ? "empty" : "truncated"; + log(` continuation retry ${retries}/${maxCont} (${mode}: finish=${total.finish_reason}, content=${total.content.length} chars)`); + // Continuation prompt — branch on failure mode: + // empty → retry with CONTINUATION_MAX_TOKENS, same prompt (thinking budget) + // length → pass the partial as assistant turn, ask to continue + const continuationMessages = total.content.length === 0 + ? opts.messages + : [ ...opts.messages, - ...(total.content ? [{ role: "assistant", content: total.content }, { role: "user", content: "Continue from where you stopped. Complete the answer." }] : []), - ], - }); - total = { - content: total.content + continued.content, - prompt_tokens: total.prompt_tokens + continued.prompt_tokens, - completion_tokens: total.completion_tokens + continued.completion_tokens, - finish_reason: continued.finish_reason, - }; - } + { role: "assistant", content: total.content }, + { role: "user", content: "Continue from exactly where you stopped. Do not repeat. Finish the answer." 
}, + ]; + const continued = await chat({ + ...opts, + max_tokens: CONTINUATION_MAX_TOKENS, + messages: continuationMessages, + }); + total = { + content: total.content + continued.content, + prompt_tokens: total.prompt_tokens + continued.prompt_tokens, + completion_tokens: total.completion_tokens + continued.completion_tokens, + finish_reason: continued.finish_reason, + }; } return { ...total, continuation_retries: retries }; } @@ -237,47 +285,90 @@ async function runIteration( citationsReceived = priorPlaybookIds.slice(); } - // 5. Call cloud. On empty/error → rescue. - // Intentional failure-injection on iter INJECT_FAIL_ON_ITER: - // a deliberately-invalid model name on the primary call so the - // rescue path actually runs. Proves the catch-and-rescue isn't - // dead code. - let result: { content: string; prompt_tokens: number; completion_tokens: number; continuation_retries: number }; + // 5. TASK-LEVEL RETRY LOOP — per J's clarification 2026-04-22. + // Try the task up to MAX_TASK_RETRIES times. Each retry: + // a) Picks a model (normally CLOUD_MODEL; on INJECT_FAIL_ON_ITER, + // cycles through 5 invalid models + 1 valid to force full loop) + // b) Injects prior attempt errors as learning context + // c) If the attempt succeeds (non-empty, >100 chars), loop exits + // d) Otherwise, records failure and tries again with the learning + // + // Cap at 6 so we don't spin forever on unsolvable tasks. + let result: { content: string; prompt_tokens: number; completion_tokens: number; continuation_retries: number; finish_reason: string } | null = null; let rescueTriggered = false; - const primaryModel = iteration === INJECT_FAIL_ON_ITER - ? 
"deliberately-invalid-model-to-force-rescue" - : CLOUD_MODEL; - if (iteration === INJECT_FAIL_ON_ITER) { - log(` INJECTED FAILURE on primary call — model="${primaryModel}" will 400/500`); + const taskAttemptHistory: Array<{ n: number; model: string; error: string }> = []; + const forceRetries = iteration === INJECT_FAIL_ON_ITER; + if (forceRetries) log(` FORCING TASK-RETRY LOOP — iter ${iteration} will cycle through 5 invalid models + 1 valid`); + + for (let attempt = 1; attempt <= MAX_TASK_RETRIES; attempt++) { + const modelForAttempt = forceRetries + ? FORCE_RETRY_MODEL_SEQUENCE[attempt - 1] + : CLOUD_MODEL; + // Compose a prior-attempts learning block for attempts 2+ + const learningBlock = taskAttemptHistory.length > 0 + ? `\n\n═══ PRIOR ATTEMPTS THIS TASK (do NOT repeat these failures; adjust approach) ═══\n${taskAttemptHistory.map(a => `Attempt ${a.n} (model ${a.model}) failed: ${a.error.slice(0, 160)}`).join("\n")}\n═══ end prior attempts ═══\n` + : ""; + log(` task attempt ${attempt}/${MAX_TASK_RETRIES}: model=${modelForAttempt}${learningBlock ? " [with prior-failure context]" : ""}`); + try { + const r = await generateContinuable({ + provider: "ollama_cloud", + model: modelForAttempt, + messages: [ + { role: "system", content: "You answer questions about the Lakehouse PRD using only the provided source material and prior iteration answers. Be specific. Cite chunk offsets OR [pb:ID] markers. Write a detailed 250-word answer." 
}, + { role: "user", content: `Question: ${question}\n\nSource material:\n${contextForPrompt}${citationBlock}${learningBlock}` }, + ], + max_tokens: PRIMARY_MAX_TOKENS, + think: false, + max_continuations: MAX_CONTINUATIONS, + }); + cloudCallsTotal += 1 + r.continuation_retries; + if (r.content && r.content.length > 100) { + // Acceptable answer — exit loop + result = r; + if (attempt > 1) { + log(` task attempt ${attempt} SUCCEEDED (${r.content.length} chars) after ${attempt - 1} prior failures`); + rescueTriggered = true; + } + break; + } + // Thin response — count as failure with learning signal + const err = `thin-answer: ${r.content.length} chars, finish=${r.finish_reason}`; + taskAttemptHistory.push({ n: attempt, model: modelForAttempt, error: err }); + errorsRecovered.push(`attempt ${attempt}: ${err}`); + } catch (e) { + const err = (e as Error).message; + taskAttemptHistory.push({ n: attempt, model: modelForAttempt, error: err }); + errorsRecovered.push(`attempt ${attempt}: ${err.slice(0, 120)}`); + cloudCallsTotal += 1; + } } - try { - result = await generateContinuable({ - provider: "ollama_cloud", - model: primaryModel, - messages: [ - { role: "system", content: "You answer questions about the Lakehouse PRD using only the provided source material and prior iteration answers. Be specific. Cite chunk offsets OR [pb:ID] markers. Keep answers under 200 words." }, - { role: "user", content: `Question: ${question}\n\nSource material:\n${contextForPrompt}${citationBlock}` }, - ], - max_tokens: 800, - think: true, - max_continuations: 2, - }); - cloudCallsTotal += 1 + result.continuation_retries; - } catch (e) { - errorsRecovered.push(`primary cloud call: ${(e as Error).message}`); + + // Last-ditch: if all 6 task attempts failed, try the local fallback + // once more so we at least return SOMETHING. This is the "don't get + // caught in a loop, accept best-so-far" rule J stated explicitly. 
+ if (!result) { + errorsRecovered.push(`all ${MAX_TASK_RETRIES} task attempts failed — local fallback`); rescueTriggered = true; - log(` primary failed → rescue with ${RESCUE_MODEL}`); - result = await generateContinuable({ - provider: "ollama_cloud", - model: RESCUE_MODEL, - messages: [ - { role: "system", content: "Answer briefly using the source material." }, - { role: "user", content: `Q: ${question}\n\n${contextForPrompt.slice(0, 8000)}` }, - ], - max_tokens: 500, - think: false, - }); - cloudCallsTotal += 1 + result.continuation_retries; + try { + result = await generateContinuable({ + provider: "ollama", + model: "qwen3.5:latest", + messages: [{ role: "user", content: `Q: ${question}\n\n${contextForPrompt.slice(0, 4000)}` }], + max_tokens: 300, + think: false, + max_continuations: 2, + }); + cloudCallsTotal += 1 + result.continuation_retries; + } catch (e) { + // Absolute last resort — fabricate a skeleton result + result = { + content: `[task failed after ${MAX_TASK_RETRIES} retries + local fallback: ${(e as Error).message}]`, + prompt_tokens: 0, + completion_tokens: 0, + continuation_retries: 0, + finish_reason: "error", + }; + } } if (result.content.length === 0) { errorsRecovered.push("even rescue returned empty — last-ditch local fallback"); @@ -327,6 +418,8 @@ async function runIteration( cloud_calls_total: cloudCallsTotal, continuation_retries: result.continuation_retries, rescue_triggered: rescueTriggered, + task_attempts_made: taskAttemptHistory.length + 1, // +1 for the successful attempt + task_retry_history: taskAttemptHistory, playbook_id, tokens_prompt: result.prompt_tokens, tokens_completion: result.completion_tokens, @@ -402,6 +495,9 @@ async function main() { rescues_triggered: results.filter(r => r.rescue_triggered).length, iter6_received_prior_ids: results[5]?.citations_from_prior_iterations.length ?? 0, iter6_actually_cited_in_answer: citationsHonored, + iter3_task_attempts: results[2]?.task_attempts_made ?? 
0, + iter3_task_retries: results[2]?.task_retry_history.length ?? 0, + max_task_attempts_any_iter: Math.max(...results.map(r => r.task_attempts_made)), total_duration_ms: results.reduce((s, r) => s + r.duration_ms, 0), overall: results.length === 6 && results.every(r => r.playbook_id !== null) ? "PASS" : "PARTIAL", }; @@ -411,7 +507,8 @@ async function main() { log(`══════ SUMMARY ${summary.overall} ══════`); log(` 6 iterations, ${summary.total_cloud_calls} cloud calls, ${summary.total_errors_recovered} errors recovered`); log(` tree-splits: ${summary.tree_splits_fired}/6 continuations: ${summary.total_continuation_retries} rescues: ${summary.rescues_triggered}`); - log(` iter 6 cited ${summary.iter6_cited_prior} prior iteration playbooks`); + log(` iter 6 received ${summary.iter6_received_prior_ids} prior IDs, cited ${summary.iter6_actually_cited_in_answer} [pb:...] markers in its answer`); + log(` iter 3 task-retry loop: ${summary.iter3_task_attempts} attempts (${summary.iter3_task_retries} prior-failure retries before success)`); log(` total duration: ${(summary.total_duration_ms / 1000).toFixed(1)}s`); log(""); for (const r of results) {