// Real-world architecture stress test — 6 iterations of the full pipeline
// against the PRD as a corpus. Goal: prove at scale what Phase 21
// promised (context continuation + tree-split), plus Phase 19
// compounding across iterations.
//
// Run: bun run tests/real-world/enrich_prd_pipeline.ts
//
// No mocks. No skipped layers. On any error, the test triggers
// cloud-rescue rather than fail — it's the architecture's job to
// recover. The test FAILS only if we can't complete 6 iterations.

import { readFile, writeFile, mkdir } from "node:fs/promises";
import { createHash } from "node:crypto";

const PRD_PATH = "/home/profit/lakehouse/docs/PRD.md";
const CHUNK_SIZE = 800;            // chars per chunk — ~200 tokens
const CHUNK_OVERLAP = 120;
const TOP_K_RETRIEVE = 12;         // chunks per iteration — pulled up to force overflow
const CONTEXT_BUDGET_CHARS = 4000; // tight budget — forces tree-split on every iter
const INJECT_FAIL_ON_ITER = 3;     // force the TASK-retry loop on iter 3

// Continuation controls (per-cloud-call) — used for output-overflow.
// Separate from the task-retry loop (per-task) — that handles errors
// across attempts.
const PRIMARY_MAX_TOKENS = 150;      // tight — forces truncation
const CONTINUATION_MAX_TOKENS = 300; // continuations get 2× the primary budget
const MAX_CONTINUATIONS = 6;         // max stitch pieces per cloud call
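
// Budget arithmetic for one logical cloud call (an illustrative sketch,
// not enforced anywhere in code): a 150-token primary call plus up to
// 6 continuations of 300 tokens each caps stitched output at
// 150 + 6 × 300 = 1,950 completion tokens.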

// Task-level retry loop (J's clarification, 2026-04-22):
// When a TASK errors, retry the whole task up to 6 times. Each
// retry gets prior attempts' failures injected as learning context,
// so attempt N+1 is informed by what N failed at. The loop caps at
// 6 to avoid infinite spinning on genuinely unsolvable tasks.
const MAX_TASK_RETRIES = 6;

// To FORCE the retry loop on iter INJECT_FAIL_ON_ITER, cycle through
// 5 deliberately-invalid models + 1 valid one. Attempts 1-5 will
// 502/404 from Ollama Cloud; attempt 6 finally succeeds. Proves the
// loop fires all 6 with compounding failure context.
const FORCE_RETRY_MODEL_SEQUENCE = [
  "deliberately-invalid-model-attempt-1",
  "deliberately-invalid-model-attempt-2",
  "deliberately-invalid-model-attempt-3",
  "deliberately-invalid-model-attempt-4",
  "deliberately-invalid-model-attempt-5",
  "gpt-oss:20b", // 6th attempt succeeds
];

const GATEWAY = "http://localhost:3100";
const SIDECAR = "http://localhost:3200";
const CLOUD_MODEL = "gpt-oss:120b";
const RESCUE_MODEL = "gpt-oss:20b"; // fallback local cloud model via sidecar

const RUN_NONCE = Date.now().toString(36);
const OUT_DIR = `/home/profit/lakehouse/tests/real-world/runs/${RUN_NONCE}`;

// The 6 progressively-compounding questions. #6 explicitly requires
// synthesis across the prior 5 answers.
const QUESTIONS: string[] = [
  "Summarize the Lakehouse project's one-paragraph thesis: what problem does it solve, what's the unique approach?",
  "How does Phase 19 playbook memory turn successful fills into a signal that boosts future rankings?",
  "Explain the role of Phase 24 observer in the learning loop — what does it see, and what does it feed into?",
  "What's the VRAM-aware profile swap mechanism in Phase 17, and why does it matter for multi-model serving?",
  "How do Phase 25 validity windows and Phase 27 playbook versioning interact when a schema drifts?",
  "Synthesize the prior 5 answers: how do the pieces (playbook memory, observer, profile swap, validity windows, versioning) compose into a system that measurably gets smarter over time? Cite specific prior answers.",
];

type Chunk = { id: string; text: string; embedding: number[]; offset: number };

interface IterationResult {
  iteration: number;
  question: string;
  retrieval_top_k: number;
  context_chars_before_budget: number;
  tree_split_fired: boolean;
  cloud_calls_total: number;
  continuation_retries: number;
  rescue_triggered: boolean;
  // Task-level retry telemetry
  task_attempts_made: number; // how many attempts fired (1 = first succeeded)
  task_retry_history: Array<{ n: number; model: string; error: string }>;
  playbook_id: string | null;
  tokens_prompt: number;
  tokens_completion: number;
  citations_from_prior_iterations: string[];
  duration_ms: number;
  answer_preview: string;
  errors_recovered: string[];
}

function log(msg: string) {
  console.log(`[enrich] ${msg}`);
}

function sleep(ms: number) {
  return new Promise(r => setTimeout(r, ms));
}

function cosine(a: number[], b: number[]): number {
  let dot = 0, na = 0, nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  if (na === 0 || nb === 0) return 0;
  return dot / (Math.sqrt(na) * Math.sqrt(nb));
}

function hash(s: string): string {
  return createHash("sha256").update(s).digest("hex").slice(0, 10);
}

async function embedBatch(texts: string[]): Promise<number[][]> {
  // Sidecar /embed accepts a list. On partial failure, retry individually.
  const r = await fetch(`${SIDECAR}/embed`, {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify({ texts }),
    signal: AbortSignal.timeout(120000),
  });
  if (!r.ok) throw new Error(`embed batch ${r.status}: ${await r.text()}`);
  const j: any = await r.json();
  return j.embeddings;
}
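
// Assumed sidecar response shape — just the field read above, not a
// published contract. A sketch for readers; unused by the test:
type AssumedEmbedResponse = { embeddings: number[][] };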

function chunkText(text: string): Array<{ text: string; offset: number }> {
  const out: Array<{ text: string; offset: number }> = [];
  let i = 0;
  while (i < text.length) {
    const end = Math.min(i + CHUNK_SIZE, text.length);
    const slice = text.slice(i, end).trim();
    if (slice.length > 50) out.push({ text: slice, offset: i });
    if (end >= text.length) break;
    i = end - CHUNK_OVERLAP;
  }
  return out;
}
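
// Stride sanity sketch (illustrative): each step above advances by
// CHUNK_SIZE - CHUNK_OVERLAP = 680 chars, so an N-char doc yields
// roughly ceil((N - 800) / 680) + 1 chunks (sub-50-char tails are
// dropped). Hypothetical helper, not called by the pipeline:
function expectedChunkCount(totalChars: number): number {
  if (totalChars <= CHUNK_SIZE) return 1;
  return Math.ceil((totalChars - CHUNK_SIZE) / (CHUNK_SIZE - CHUNK_OVERLAP)) + 1;
}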

async function chat(opts: {
  provider: "ollama" | "ollama_cloud",
  model: string,
  messages: Array<{ role: string; content: string }>,
  max_tokens: number,
  think: boolean,
}): Promise<{ content: string; prompt_tokens: number; completion_tokens: number; finish_reason: string }> {
  const r = await fetch(`${GATEWAY}/v1/chat`, {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify({ ...opts }),
    signal: AbortSignal.timeout(180000),
  });
  if (!r.ok) throw new Error(`/v1/chat ${r.status}: ${await r.text()}`);
  const j: any = await r.json();
  return {
    content: j.choices?.[0]?.message?.content ?? "",
    prompt_tokens: j.usage?.prompt_tokens ?? 0,
    completion_tokens: j.usage?.completion_tokens ?? 0,
    finish_reason: j.choices?.[0]?.finish_reason ?? "?",
  };
}
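
// The gateway is assumed to speak an OpenAI-compatible chat shape;
// this sketch captures only the fields read above (an assumption,
// unused by the test):
type AssumedChatResponse = {
  choices?: Array<{ message?: { content?: string }; finish_reason?: string }>;
  usage?: { prompt_tokens?: number; completion_tokens?: number };
};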

// ─── Tree-split over oversized chunk set ──────────────────────────
async function treeSplitSummarize(
  chunks: Chunk[],
  question: string,
): Promise<{ scratchpad: string; cloud_calls: number }> {
  // Shard into groups fitting within half the budget each.
  const perShard = Math.max(1, Math.floor((CONTEXT_BUDGET_CHARS / 2) / CHUNK_SIZE));
  const shards: Chunk[][] = [];
  for (let i = 0; i < chunks.length; i += perShard) {
    shards.push(chunks.slice(i, i + perShard));
  }
  log(`  tree-split: ${chunks.length} chunks → ${shards.length} shards of up to ${perShard}`);

  let scratchpad = "";
  let cloud_calls = 0;
  for (let si = 0; si < shards.length; si++) {
    const shard = shards[si];
    const shardText = shard.map(c => `[chunk @${c.offset}]\n${c.text}`).join("\n\n");
    const userMsg = `Question: ${question}\n\nShard ${si + 1}/${shards.length} of source material:\n\n${shardText}\n\nScratchpad so far:\n${scratchpad || "(empty)"}\n\nUpdate the scratchpad: extract only facts from THIS shard that help answer the question. Be terse. No prose.`;
    const r = await chat({
      provider: "ollama_cloud",
      model: CLOUD_MODEL,
      messages: [
        { role: "system", content: "You maintain a concise factual scratchpad across multiple shards of source text. No prose outside the scratchpad. Each shard, append ≤80 words of relevant facts." },
        { role: "user", content: userMsg },
      ],
      max_tokens: 500,
      think: false,
    });
    cloud_calls += 1;
    scratchpad += `\n--- shard ${si + 1} notes ---\n${r.content.trim()}`;
    if (scratchpad.length > CONTEXT_BUDGET_CHARS) {
      // drop the oldest notes to stay within budget
      scratchpad = scratchpad.slice(-CONTEXT_BUDGET_CHARS);
      log(`  tree-split: scratchpad truncated to ${scratchpad.length} chars`);
    }
  }
  return { scratchpad, cloud_calls };
}
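
// Worked shard arithmetic (illustrative): with CONTEXT_BUDGET_CHARS =
// 4000 and CHUNK_SIZE = 800, perShard = floor(2000 / 800) = 2, so the
// TOP_K_RETRIEVE = 12 retrieved chunks become 6 shards, one scratchpad
// cloud call each, before the final answer call.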

// ─── Continuable generate — up to max_continuations stitches ──────
//
// Two failure modes handled:
//   A) Empty response — typically thinking model burned the budget
//      on hidden reasoning. Retry with 2× max_tokens.
//   B) Truncated response (finish_reason=length) — answer got cut off
//      mid-sentence. Pass the partial back as scratchpad and ask the
//      model to continue from where it stopped.
//
// Stitching: keep appending content across retries; prompt_tokens and
// completion_tokens accumulate; finish_reason reflects the LAST call.
// Loop exits on the first call that finishes cleanly (stop) with
// non-empty content, OR when retries hit the cap.
async function generateContinuable(
  opts: Parameters<typeof chat>[0] & { max_continuations?: number },
): Promise<{ content: string; prompt_tokens: number; completion_tokens: number; continuation_retries: number; finish_reason: string }> {
  const maxCont = opts.max_continuations ?? 1;
  let total = await chat(opts);
  let retries = 0;
  while (retries < maxCont && (total.content.length === 0 || total.finish_reason === "length")) {
    retries += 1;
    const mode = total.content.length === 0 ? "empty" : "truncated";
    log(`  continuation retry ${retries}/${maxCont} (${mode}: finish=${total.finish_reason}, content=${total.content.length} chars)`);
    // Continuation prompt — branch on failure mode:
    //   empty  → retry with 2× tokens, same prompt (thinking budget)
    //   length → pass the partial as assistant turn, ask to continue
    const continuationMessages = total.content.length === 0
      ? opts.messages
      : [
          ...opts.messages,
          { role: "assistant", content: total.content },
          { role: "user", content: "Continue from exactly where you stopped. Do not repeat. Finish the answer." },
        ];
    const continued = await chat({
      ...opts,
      max_tokens: CONTINUATION_MAX_TOKENS,
      messages: continuationMessages,
    });
    total = {
      content: total.content + continued.content,
      prompt_tokens: total.prompt_tokens + continued.prompt_tokens,
      completion_tokens: total.completion_tokens + continued.completion_tokens,
      finish_reason: continued.finish_reason,
    };
  }
  return { ...total, continuation_retries: retries };
}
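
// Illustrative trace of the stitching above (assumed behaviour, not a
// recorded run): a 150-token primary call that truncates, followed by
// two 300-token continuations where the second finishes cleanly,
// returns continuation_retries=2, finish_reason="stop", and the three
// contents concatenated in order.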

// ─── Single iteration: retrieve → budget-check → chat → seed ─────
async function runIteration(
  iteration: number,
  question: string,
  allChunks: Chunk[],
  priorPlaybookIds: string[],
  priorAnswers: string[],
): Promise<IterationResult> {
  const started = Date.now();
  const errorsRecovered: string[] = [];
  log(`iter ${iteration}: "${question.slice(0, 70)}..."`);

  // 1. Embed the question
  const qEmb = (await embedBatch([question]))[0];

  // 2. Retrieve top-K chunks by cosine
  const scored = allChunks
    .map(c => ({ c, score: cosine(qEmb, c.embedding) }))
    .sort((a, b) => b.score - a.score)
    .slice(0, TOP_K_RETRIEVE);
  const chunks = scored.map(x => x.c);
  log(`  retrieved top ${chunks.length} chunks (score range ${scored[0].score.toFixed(3)} .. ${scored[scored.length - 1].score.toFixed(3)})`);

  // 3. Context budget check — tree-split if over
  const contextChars = chunks.map(c => c.text).join("\n\n").length;
  let contextForPrompt: string;
  let treeSplit = false;
  let cloudCallsTotal = 0;
  if (contextChars > CONTEXT_BUDGET_CHARS) {
    treeSplit = true;
    log(`  context ${contextChars} chars > budget ${CONTEXT_BUDGET_CHARS} → tree-split`);
    const { scratchpad, cloud_calls } = await treeSplitSummarize(chunks, question);
    contextForPrompt = `Distilled scratchpad from ${chunks.length} source chunks (too large to fit directly):\n${scratchpad}`;
    cloudCallsTotal += cloud_calls;
  } else {
    contextForPrompt = chunks.map(c => `[chunk @${c.offset}]\n${c.text}`).join("\n\n");
  }

  // 4. Seed prompt with prior iteration answers (real compounding).
  //    Not just IDs — the model needs the CONTENT to synthesize.
  let citationBlock = "";
  let citationsReceived: string[] = [];
  if (priorPlaybookIds.length > 0 && priorAnswers.length > 0) {
    const lines = priorAnswers.map((ans, i) => {
      const pid = priorPlaybookIds[i]?.slice(0, 12) ?? "unknown";
      // Trim each prior answer to ~400 chars so we don't blow budget
      return `[pb:${pid}] iter ${i + 1} answer:\n${ans.slice(0, 400)}\n`;
    });
    citationBlock = `\n\n═══ PRIOR ITERATIONS (compounding context) ═══\n${lines.join("\n")}═══ end prior iterations ═══\n\nYour answer MUST cite specific prior iterations using [pb:ID] notation when drawing on them. Synthesis questions require explicit cross-iteration reasoning.`;
    citationsReceived = priorPlaybookIds.slice();
  }

  // 5. TASK-LEVEL RETRY LOOP — per J's clarification 2026-04-22.
  //    Try the task up to MAX_TASK_RETRIES times. Each retry:
  //      a) Picks a model (normally CLOUD_MODEL; on INJECT_FAIL_ON_ITER,
  //         cycles through 5 invalid models + 1 valid to force full loop)
  //      b) Injects prior attempt errors as learning context
  //      c) If the attempt succeeds (non-empty, >100 chars), loop exits
  //      d) Otherwise, records failure and tries again with the learning
  //
  //    Cap at 6 so we don't spin forever on unsolvable tasks.
  let result: { content: string; prompt_tokens: number; completion_tokens: number; continuation_retries: number; finish_reason: string } | null = null;
  let rescueTriggered = false;
  const taskAttemptHistory: Array<{ n: number; model: string; error: string }> = [];
  const forceRetries = iteration === INJECT_FAIL_ON_ITER;
  if (forceRetries) log(`  FORCING TASK-RETRY LOOP — iter ${iteration} will cycle through 5 invalid models + 1 valid`);

  for (let attempt = 1; attempt <= MAX_TASK_RETRIES; attempt++) {
    const modelForAttempt = forceRetries ? FORCE_RETRY_MODEL_SEQUENCE[attempt - 1] : CLOUD_MODEL;

    // Compose a prior-attempts learning block for attempts 2+
    const learningBlock = taskAttemptHistory.length > 0
      ? `\n\n═══ PRIOR ATTEMPTS THIS TASK (do NOT repeat these failures; adjust approach) ═══\n${taskAttemptHistory.map(a => `Attempt ${a.n} (model ${a.model}) failed: ${a.error.slice(0, 160)}`).join("\n")}\n═══ end prior attempts ═══\n`
      : "";
    log(`  task attempt ${attempt}/${MAX_TASK_RETRIES}: model=${modelForAttempt}${learningBlock ? " [with prior-failure context]" : ""}`);

    try {
      const r = await generateContinuable({
        provider: "ollama_cloud",
        model: modelForAttempt,
        messages: [
          { role: "system", content: "You answer questions about the Lakehouse PRD using only the provided source material and prior iteration answers. Be specific. Cite chunk offsets OR [pb:ID] markers. Write a detailed 250-word answer." },
          { role: "user", content: `Question: ${question}\n\nSource material:\n${contextForPrompt}${citationBlock}${learningBlock}` },
        ],
        max_tokens: PRIMARY_MAX_TOKENS,
        think: false,
        max_continuations: MAX_CONTINUATIONS,
      });
      cloudCallsTotal += 1 + r.continuation_retries;

      if (r.content && r.content.length > 100) {
        // Acceptable answer — exit loop
        result = r;
        if (attempt > 1) {
          log(`  task attempt ${attempt} SUCCEEDED (${r.content.length} chars) after ${attempt - 1} prior failures`);
          rescueTriggered = true;
        }
        break;
      }

      // Thin response — count as failure with learning signal
      const err = `thin-answer: ${r.content.length} chars, finish=${r.finish_reason}`;
      taskAttemptHistory.push({ n: attempt, model: modelForAttempt, error: err });
      errorsRecovered.push(`attempt ${attempt}: ${err}`);
    } catch (e) {
      const err = (e as Error).message;
      taskAttemptHistory.push({ n: attempt, model: modelForAttempt, error: err });
      errorsRecovered.push(`attempt ${attempt}: ${err.slice(0, 120)}`);
      cloudCallsTotal += 1;
    }
  }
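
  // Expected telemetry on the forced-failure iteration (a sketch,
  // assuming Ollama Cloud rejects the five invalid models): the loop
  // above leaves five { n, model, error } entries in taskAttemptHistory
  // and attempt 6 sets `result`.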

  // Last-ditch: if all 6 task attempts failed, try the local fallback
  // once more so we at least return SOMETHING. This is the "don't get
  // caught in a loop, accept best-so-far" rule J stated explicitly.
  if (!result) {
    errorsRecovered.push(`all ${MAX_TASK_RETRIES} task attempts failed — local fallback`);
    rescueTriggered = true;
    try {
      result = await generateContinuable({
        provider: "ollama",
        model: "qwen3.5:latest",
        messages: [{ role: "user", content: `Q: ${question}\n\n${contextForPrompt.slice(0, 4000)}` }],
        max_tokens: 300,
        think: false,
        max_continuations: 2,
      });
      cloudCallsTotal += 1 + result.continuation_retries;
    } catch (e) {
      // Absolute last resort — fabricate a skeleton result
      result = {
        content: `[task failed after ${MAX_TASK_RETRIES} retries + local fallback: ${(e as Error).message}]`,
        prompt_tokens: 0,
        completion_tokens: 0,
        continuation_retries: 0,
        finish_reason: "error",
      };
    }
  }

  if (result.content.length === 0) {
    errorsRecovered.push("even rescue returned empty — last-ditch local fallback");
    rescueTriggered = true;
    result = await generateContinuable({
      provider: "ollama",
      model: "qwen3.5:latest",
      messages: [{ role: "user", content: `Q: ${question}\n\n${contextForPrompt.slice(0, 4000)}` }],
      max_tokens: 300,
      think: false,
    });
    cloudCallsTotal += 1;
  }

  // 6. Seed playbook with the answer
  let playbook_id: string | null = null;
  try {
    const ts = new Date().toISOString();
    const seedOp = `TEST: enrich_prd_run_${RUN_NONCE} iter${iteration} in Corpus, PRD`;
    const r = await fetch(`${GATEWAY}/vectors/playbook_memory/seed`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({
        operation: seedOp,
        approach: `q="${question.slice(0, 80)}" context_chars=${contextChars} tree_split=${treeSplit}`,
        context: result.content.slice(0, 600),
        endorsed_names: [`iter${iteration}_${RUN_NONCE}`],
        append: true,
      }),
      signal: AbortSignal.timeout(15000),
    });
    if (r.ok) {
      const j: any = await r.json();
      playbook_id = j.outcome?.playbook_id ?? null;
    } else {
      errorsRecovered.push(`seed ${r.status}: ${(await r.text()).slice(0, 100)}`);
    }
  } catch (e) {
    errorsRecovered.push(`seed exception: ${(e as Error).message}`);
  }

  return {
    iteration,
    question,
    retrieval_top_k: chunks.length,
    context_chars_before_budget: contextChars,
    tree_split_fired: treeSplit,
    cloud_calls_total: cloudCallsTotal,
    continuation_retries: result.continuation_retries,
    rescue_triggered: rescueTriggered,
    task_attempts_made: taskAttemptHistory.length + 1, // failures + the final attempt (success or fallback)
    task_retry_history: taskAttemptHistory,
    playbook_id,
    tokens_prompt: result.prompt_tokens,
    tokens_completion: result.completion_tokens,
    citations_from_prior_iterations: citationsReceived,
    duration_ms: Date.now() - started,
    answer_preview: result.content.slice(0, 500),
    errors_recovered: errorsRecovered,
  };
}

async function main() {
  await mkdir(OUT_DIR, { recursive: true });
  log(`run nonce: ${RUN_NONCE}`);
  log(`output dir: ${OUT_DIR}`);

  // ─── Phase 1: load, chunk, embed the PRD ───────────────────────
  log(`loading PRD from ${PRD_PATH}`);
  const prd = await readFile(PRD_PATH, "utf8");
  log(`PRD: ${prd.length} chars, ${prd.split("\n").length} lines`);
  const raw_chunks = chunkText(prd);
  log(`chunked into ${raw_chunks.length} pieces (size ${CHUNK_SIZE}, overlap ${CHUNK_OVERLAP})`);

  // Embed in batches of 32 to avoid sidecar overload
  const allChunks: Chunk[] = [];
  const BATCH = 32;
  const t0 = Date.now();
  for (let i = 0; i < raw_chunks.length; i += BATCH) {
    const batch = raw_chunks.slice(i, i + BATCH);
    const embs = await embedBatch(batch.map(b => b.text));
    for (let j = 0; j < batch.length; j++) {
      allChunks.push({
        id: hash(batch[j].text),
        text: batch[j].text,
        embedding: embs[j].map(x => Number(x)),
        offset: batch[j].offset,
      });
    }
    log(`  embedded ${allChunks.length}/${raw_chunks.length}`);
  }
  log(`embedded all ${allChunks.length} chunks in ${((Date.now() - t0) / 1000).toFixed(1)}s`);

  // ─── Phase 2: 6 iterations ─────────────────────────────────────
  const results: IterationResult[] = [];
  const priorIds: string[] = [];
  const priorAnswers: string[] = [];
  for (let i = 1; i <= QUESTIONS.length; i++) {
    const q = QUESTIONS[i - 1];
    const r = await runIteration(i, q, allChunks, priorIds, priorAnswers);
    results.push(r);
    if (r.playbook_id) priorIds.push(r.playbook_id);
    priorAnswers.push(r.answer_preview);
    log(`  → iter ${i}: ${r.errors_recovered.length} errors recovered, ${r.continuation_retries} continuations, tree-split=${r.tree_split_fired}, rescue=${r.rescue_triggered}, ${r.duration_ms}ms`);
    await writeFile(`${OUT_DIR}/iter_${i}.json`, JSON.stringify(r, null, 2));
  }

  // Check whether iter 6 actually cited prior pb:IDs in its answer.
  // Playbook IDs look like `pb-seed-` so the regex needs to allow
  // hyphens + letters inside the brackets, not just hex chars.
  const iter6 = results[5];
  const citationsHonored = iter6 ? (iter6.answer_preview.match(/\[pb:[\w-]+\]/gi)?.length ?? 0) : 0;

  // ─── Phase 3: summary ──────────────────────────────────────────
  const summary = {
    run_nonce: RUN_NONCE,
    ran_at: new Date().toISOString(),
    prd_chars: prd.length,
    prd_chunks: allChunks.length,
    iterations: results.length,
    total_cloud_calls: results.reduce((s, r) => s + r.cloud_calls_total, 0),
    total_continuation_retries: results.reduce((s, r) => s + r.continuation_retries, 0),
    total_errors_recovered: results.reduce((s, r) => s + r.errors_recovered.length, 0),
    tree_splits_fired: results.filter(r => r.tree_split_fired).length,
    rescues_triggered: results.filter(r => r.rescue_triggered).length,
    iter6_received_prior_ids: results[5]?.citations_from_prior_iterations.length ?? 0,
    iter6_actually_cited_in_answer: citationsHonored,
    iter3_task_attempts: results[2]?.task_attempts_made ?? 0,
    iter3_task_retries: results[2]?.task_retry_history.length ?? 0,
    max_task_attempts_any_iter: Math.max(...results.map(r => r.task_attempts_made)),
    total_duration_ms: results.reduce((s, r) => s + r.duration_ms, 0),
    overall: results.length === 6 && results.every(r => r.playbook_id !== null) ? "PASS" : "PARTIAL",
  };
  await writeFile(`${OUT_DIR}/summary.json`, JSON.stringify(summary, null, 2));

  log("");
  log(`══════ SUMMARY ${summary.overall} ══════`);
  log(`  6 iterations, ${summary.total_cloud_calls} cloud calls, ${summary.total_errors_recovered} errors recovered`);
  log(`  tree-splits: ${summary.tree_splits_fired}/6  continuations: ${summary.total_continuation_retries}  rescues: ${summary.rescues_triggered}`);
  log(`  iter 6 received ${summary.iter6_received_prior_ids} prior IDs, cited ${summary.iter6_actually_cited_in_answer} [pb:...] markers in its answer`);
  log(`  iter 3 task-retry loop: ${summary.iter3_task_attempts} attempts (${summary.iter3_task_retries} prior-failure retries before success)`);
  log(`  total duration: ${(summary.total_duration_ms / 1000).toFixed(1)}s`);
  log("");
  for (const r of results) {
    const flags = [
      r.tree_split_fired ? "tree-split" : "",
      r.continuation_retries > 0 ? `cont=${r.continuation_retries}` : "",
      r.rescue_triggered ? "rescued" : "",
      r.errors_recovered.length > 0 ? `err=${r.errors_recovered.length}` : "",
    ].filter(Boolean).join(" ");
    log(`  iter ${r.iteration}: ${r.tokens_prompt}+${r.tokens_completion} tok, ${r.duration_ms}ms ${flags ? `[${flags}]` : ""}`);
  }
  log("");
  log(`artifacts: ${OUT_DIR}/{iter_1..6.json, summary.json}`);
  process.exit(summary.overall === "PASS" ? 0 : 1);
}

main().catch(e => {
  console.error("[enrich] fatal:", e);
  process.exit(2);
});