Audit pipeline PR #9: determinism + fact extraction + verifier gate + KB stats

Merged
profit merged 34 commits from test/enrich-prd-pipeline into main 2026-04-23 05:29:39 +00:00
Showing only changes of commit 6d6a306d4e - Show all commits

View File

@ -17,7 +17,34 @@ const CHUNK_SIZE = 800; // chars per chunk — ~200 tokens
const CHUNK_OVERLAP = 120;
const TOP_K_RETRIEVE = 12; // chunks per iteration — pulled up to force overflow
const CONTEXT_BUDGET_CHARS = 4000; // tight budget — forces tree-split on every iter
const INJECT_FAIL_ON_ITER = 3; // force the TASK-retry loop on iter 3
// Continuation controls (per-cloud-call) — used for output-overflow.
// Separate from the task-retry loop (per-task) — that handles errors
// across attempts.
const PRIMARY_MAX_TOKENS = 150; // tight — forces truncation
const CONTINUATION_MAX_TOKENS = 300; // each continuation doubles headroom
const MAX_CONTINUATIONS = 6; // max stitch pieces per cloud call
// Task-level retry loop (J's clarification, 2026-04-22):
// When a TASK errors, retry the whole task up to 6 times. Each
// retry gets prior attempts' failures injected as learning context,
// so attempt N+1 is informed by what N failed at. The loop caps at
// 6 to avoid infinite spinning on genuinely unsolvable tasks.
const MAX_TASK_RETRIES = 6;
// To FORCE the retry loop on iter INJECT_FAIL_ON_ITER, cycle through
// 5 deliberately-invalid models + 1 valid one. Attempts 1-5 will
// 502/404 from Ollama Cloud; attempt 6 finally succeeds. Proves the
// loop fires all 6 with compounding failure context.
// NOTE: length must equal MAX_TASK_RETRIES — attempt N indexes entry N-1.
const FORCE_RETRY_MODEL_SEQUENCE = [
  "deliberately-invalid-model-attempt-1",
  "deliberately-invalid-model-attempt-2",
  "deliberately-invalid-model-attempt-3",
  "deliberately-invalid-model-attempt-4",
  "deliberately-invalid-model-attempt-5",
  "gpt-oss:20b", // 6th attempt succeeds
];
const GATEWAY = "http://localhost:3100";
const SIDECAR = "http://localhost:3200";
const CLOUD_MODEL = "gpt-oss:120b";
@ -47,6 +74,9 @@ interface IterationResult {
cloud_calls_total: number;
continuation_retries: number;
rescue_triggered: boolean;
// Task-level retry telemetry
task_attempts_made: number; // how many attempts fired (1 = first succeeded)
task_retry_history: Array<{ n: number; model: string; error: string }>;
playbook_id: string | null;
tokens_prompt: number;
tokens_completion: number;
@ -155,32 +185,50 @@ async function treeSplitSummarize(
return { scratchpad, cloud_calls };
}
// ─── Continuable generate — up to max_continuations stitches ──────
//
// Two failure modes handled:
// A) Empty response — typically thinking model burned the budget
// on hidden reasoning. Retry with 2× max_tokens.
// B) Truncated response (finish_reason=length) — answer got cut off
// mid-sentence. Pass the partial back as scratchpad and ask the
// model to continue from where it stopped.
//
// Stitching: keep appending content across retries; prompt_tokens and
// completion_tokens accumulate; finish_reason reflects the LAST call.
// Loop exits on the first call that finishes cleanly (stop) with
// non-empty content, OR when retries hit the cap.
async function generateContinuable(
opts: Parameters<typeof chat>[0] & { max_continuations?: number },
): Promise<{ content: string; prompt_tokens: number; completion_tokens: number; continuation_retries: number }> {
): Promise<{ content: string; prompt_tokens: number; completion_tokens: number; continuation_retries: number; finish_reason: string }> {
const maxCont = opts.max_continuations ?? 1;
let total = await chat(opts);
let retries = 0;
if (total.content.length === 0 || total.finish_reason === "length") {
for (let i = 0; i < maxCont && (total.content.length === 0 || total.finish_reason === "length"); i++) {
retries += 1;
log(` continuation retry ${retries} (reason: ${total.finish_reason}, content=${total.content.length})`);
const continued = await chat({
...opts,
max_tokens: opts.max_tokens * 2,
messages: [
while (retries < maxCont && (total.content.length === 0 || total.finish_reason === "length")) {
retries += 1;
const mode = total.content.length === 0 ? "empty" : "truncated";
log(` continuation retry ${retries}/${maxCont} (${mode}: finish=${total.finish_reason}, content=${total.content.length} chars)`);
// Continuation prompt — branch on failure mode:
// empty → retry with 2× tokens, same prompt (thinking budget)
// length → pass the partial as assistant turn, ask to continue
const continuationMessages = total.content.length === 0
? opts.messages
: [
...opts.messages,
...(total.content ? [{ role: "assistant", content: total.content }, { role: "user", content: "Continue from where you stopped. Complete the answer." }] : []),
],
});
total = {
content: total.content + continued.content,
prompt_tokens: total.prompt_tokens + continued.prompt_tokens,
completion_tokens: total.completion_tokens + continued.completion_tokens,
finish_reason: continued.finish_reason,
};
}
{ role: "assistant", content: total.content },
{ role: "user", content: "Continue from exactly where you stopped. Do not repeat. Finish the answer." },
];
const continued = await chat({
...opts,
max_tokens: CONTINUATION_MAX_TOKENS,
messages: continuationMessages,
});
total = {
content: total.content + continued.content,
prompt_tokens: total.prompt_tokens + continued.prompt_tokens,
completion_tokens: total.completion_tokens + continued.completion_tokens,
finish_reason: continued.finish_reason,
};
}
return { ...total, continuation_retries: retries };
}
@ -237,47 +285,90 @@ async function runIteration(
citationsReceived = priorPlaybookIds.slice();
}
// 5. Call cloud. On empty/error → rescue.
// Intentional failure-injection on iter INJECT_FAIL_ON_ITER:
// a deliberately-invalid model name on the primary call so the
// rescue path actually runs. Proves the catch-and-rescue isn't
// dead code.
let result: { content: string; prompt_tokens: number; completion_tokens: number; continuation_retries: number };
// 5. TASK-LEVEL RETRY LOOP — per J's clarification 2026-04-22.
// Try the task up to MAX_TASK_RETRIES times. Each retry:
// a) Picks a model (normally CLOUD_MODEL; on INJECT_FAIL_ON_ITER,
// cycles through 5 invalid models + 1 valid to force full loop)
// b) Injects prior attempt errors as learning context
// c) If the attempt succeeds (non-empty, >100 chars), loop exits
// d) Otherwise, records failure and tries again with the learning
//
// Cap at 6 so we don't spin forever on unsolvable tasks.
let result: { content: string; prompt_tokens: number; completion_tokens: number; continuation_retries: number; finish_reason: string } | null = null;
let rescueTriggered = false;
const primaryModel = iteration === INJECT_FAIL_ON_ITER
? "deliberately-invalid-model-to-force-rescue"
: CLOUD_MODEL;
if (iteration === INJECT_FAIL_ON_ITER) {
log(` INJECTED FAILURE on primary call — model="${primaryModel}" will 400/500`);
const taskAttemptHistory: Array<{ n: number; model: string; error: string }> = [];
const forceRetries = iteration === INJECT_FAIL_ON_ITER;
if (forceRetries) log(` FORCING TASK-RETRY LOOP — iter ${iteration} will cycle through 5 invalid models + 1 valid`);
for (let attempt = 1; attempt <= MAX_TASK_RETRIES; attempt++) {
const modelForAttempt = forceRetries
? FORCE_RETRY_MODEL_SEQUENCE[attempt - 1]
: CLOUD_MODEL;
// Compose a prior-attempts learning block for attempts 2+
const learningBlock = taskAttemptHistory.length > 0
? `\n\n═══ PRIOR ATTEMPTS THIS TASK (do NOT repeat these failures; adjust approach) ═══\n${taskAttemptHistory.map(a => `Attempt ${a.n} (model ${a.model}) failed: ${a.error.slice(0, 160)}`).join("\n")}\n═══ end prior attempts ═══\n`
: "";
log(` task attempt ${attempt}/${MAX_TASK_RETRIES}: model=${modelForAttempt}${learningBlock ? " [with prior-failure context]" : ""}`);
try {
const r = await generateContinuable({
provider: "ollama_cloud",
model: modelForAttempt,
messages: [
{ role: "system", content: "You answer questions about the Lakehouse PRD using only the provided source material and prior iteration answers. Be specific. Cite chunk offsets OR [pb:ID] markers. Write a detailed 250-word answer." },
{ role: "user", content: `Question: ${question}\n\nSource material:\n${contextForPrompt}${citationBlock}${learningBlock}` },
],
max_tokens: PRIMARY_MAX_TOKENS,
think: false,
max_continuations: MAX_CONTINUATIONS,
});
cloudCallsTotal += 1 + r.continuation_retries;
if (r.content && r.content.length > 100) {
// Acceptable answer — exit loop
result = r;
if (attempt > 1) {
log(` task attempt ${attempt} SUCCEEDED (${r.content.length} chars) after ${attempt - 1} prior failures`);
rescueTriggered = true;
}
break;
}
// Thin response — count as failure with learning signal
const err = `thin-answer: ${r.content.length} chars, finish=${r.finish_reason}`;
taskAttemptHistory.push({ n: attempt, model: modelForAttempt, error: err });
errorsRecovered.push(`attempt ${attempt}: ${err}`);
} catch (e) {
const err = (e as Error).message;
taskAttemptHistory.push({ n: attempt, model: modelForAttempt, error: err });
errorsRecovered.push(`attempt ${attempt}: ${err.slice(0, 120)}`);
cloudCallsTotal += 1;
}
}
try {
result = await generateContinuable({
provider: "ollama_cloud",
model: primaryModel,
messages: [
{ role: "system", content: "You answer questions about the Lakehouse PRD using only the provided source material and prior iteration answers. Be specific. Cite chunk offsets OR [pb:ID] markers. Keep answers under 200 words." },
{ role: "user", content: `Question: ${question}\n\nSource material:\n${contextForPrompt}${citationBlock}` },
],
max_tokens: 800,
think: true,
max_continuations: 2,
});
cloudCallsTotal += 1 + result.continuation_retries;
} catch (e) {
errorsRecovered.push(`primary cloud call: ${(e as Error).message}`);
// Last-ditch: if all 6 task attempts failed, try the local fallback
// once more so we at least return SOMETHING. This is the "don't get
// caught in a loop, accept best-so-far" rule J stated explicitly.
if (!result) {
errorsRecovered.push(`all ${MAX_TASK_RETRIES} task attempts failed — local fallback`);
rescueTriggered = true;
log(` primary failed → rescue with ${RESCUE_MODEL}`);
result = await generateContinuable({
provider: "ollama_cloud",
model: RESCUE_MODEL,
messages: [
{ role: "system", content: "Answer briefly using the source material." },
{ role: "user", content: `Q: ${question}\n\n${contextForPrompt.slice(0, 8000)}` },
],
max_tokens: 500,
think: false,
});
cloudCallsTotal += 1 + result.continuation_retries;
try {
result = await generateContinuable({
provider: "ollama",
model: "qwen3.5:latest",
messages: [{ role: "user", content: `Q: ${question}\n\n${contextForPrompt.slice(0, 4000)}` }],
max_tokens: 300,
think: false,
max_continuations: 2,
});
cloudCallsTotal += 1 + result.continuation_retries;
} catch (e) {
// Absolute last resort — fabricate a skeleton result
result = {
content: `[task failed after ${MAX_TASK_RETRIES} retries + local fallback: ${(e as Error).message}]`,
prompt_tokens: 0,
completion_tokens: 0,
continuation_retries: 0,
finish_reason: "error",
};
}
}
if (result.content.length === 0) {
errorsRecovered.push("even rescue returned empty — last-ditch local fallback");
@ -327,6 +418,8 @@ async function runIteration(
cloud_calls_total: cloudCallsTotal,
continuation_retries: result.continuation_retries,
rescue_triggered: rescueTriggered,
task_attempts_made: taskAttemptHistory.length + 1, // +1 for the successful attempt
task_retry_history: taskAttemptHistory,
playbook_id,
tokens_prompt: result.prompt_tokens,
tokens_completion: result.completion_tokens,
@ -402,6 +495,9 @@ async function main() {
rescues_triggered: results.filter(r => r.rescue_triggered).length,
iter6_received_prior_ids: results[5]?.citations_from_prior_iterations.length ?? 0,
iter6_actually_cited_in_answer: citationsHonored,
iter3_task_attempts: results[2]?.task_attempts_made ?? 0,
iter3_task_retries: results[2]?.task_retry_history.length ?? 0,
max_task_attempts_any_iter: Math.max(...results.map(r => r.task_attempts_made)),
total_duration_ms: results.reduce((s, r) => s + r.duration_ms, 0),
overall: results.length === 6 && results.every(r => r.playbook_id !== null) ? "PASS" : "PARTIAL",
};
@ -411,7 +507,8 @@ async function main() {
log(`══════ SUMMARY ${summary.overall} ══════`);
log(` 6 iterations, ${summary.total_cloud_calls} cloud calls, ${summary.total_errors_recovered} errors recovered`);
log(` tree-splits: ${summary.tree_splits_fired}/6 continuations: ${summary.total_continuation_retries} rescues: ${summary.rescues_triggered}`);
log(` iter 6 cited ${summary.iter6_cited_prior} prior iteration playbooks`);
log(` iter 6 received ${summary.iter6_received_prior_ids} prior IDs, cited ${summary.iter6_actually_cited_in_answer} [pb:...] markers in its answer`);
log(` iter 3 task-retry loop: ${summary.iter3_task_attempts} attempts (${summary.iter3_task_retries} prior-failure retries before success)`);
log(` total duration: ${(summary.total_duration_ms / 1000).toFixed(1)}s`);
log("");
for (const r of results) {