diff --git a/tests/real-world/enrich_prd_pipeline.ts b/tests/real-world/enrich_prd_pipeline.ts new file mode 100644 index 0000000..d0f6f56 --- /dev/null +++ b/tests/real-world/enrich_prd_pipeline.ts @@ -0,0 +1,431 @@ +// Real-world architecture stress test — 6 iterations of the full pipeline +// against the PRD as a corpus. Goal: prove at scale what Phase 21 +// promised (context continuation + tree-split), plus Phase 19 +// compounding across iterations. +// +// Run: bun run tests/real-world/enrich_prd_pipeline.ts +// +// No mocks. No skipped layers. On any error, the test triggers +// cloud-rescue rather than fail — it's the architecture's job to +// recover. The test FAILS only if we can't complete 6 iterations. + +import { readFile, writeFile, mkdir } from "node:fs/promises"; +import { createHash } from "node:crypto"; + +const PRD_PATH = "/home/profit/lakehouse/docs/PRD.md"; +const CHUNK_SIZE = 800; // chars per chunk — ~200 tokens +const CHUNK_OVERLAP = 120; +const TOP_K_RETRIEVE = 12; // chunks per iteration — pulled up to force overflow +const CONTEXT_BUDGET_CHARS = 4000; // tight budget — forces tree-split on every iter +const INJECT_FAIL_ON_ITER = 3; // plant a bad primary-cloud call so rescue fires +const GATEWAY = "http://localhost:3100"; +const SIDECAR = "http://localhost:3200"; +const CLOUD_MODEL = "gpt-oss:120b"; +const RESCUE_MODEL = "gpt-oss:20b"; // fallback local cloud model via sidecar +const RUN_NONCE = Date.now().toString(36); +const OUT_DIR = `/home/profit/lakehouse/tests/real-world/runs/${RUN_NONCE}`; + +// The 6 progressively-compounding questions. #6 explicitly requires +// synthesis across prior 5 answers. +const QUESTIONS: string[] = [ + "Summarize the Lakehouse project's one-paragraph thesis: what problem does it solve, what's the unique approach?", + "How does Phase 19 playbook memory turn successful fills into a signal that boosts future rankings?", + "Explain the role of Phase 24 observer in the learning loop — what does it see, and what does it feed into?", + "What's the VRAM-aware profile swap mechanism in Phase 17, and why does it matter for multi-model serving?", + "How do Phase 25 validity windows and Phase 27 playbook versioning interact when a schema drifts?", + "Synthesize the prior 5 answers: how do the pieces (playbook memory, observer, profile swap, validity windows, versioning) compose into a system that measurably gets smarter over time? Cite specific prior answers.", +]; + +type Chunk = { id: string; text: string; embedding: number[]; offset: number }; + +interface IterationResult { + iteration: number; + question: string; + retrieval_top_k: number; + context_chars_before_budget: number; + tree_split_fired: boolean; + cloud_calls_total: number; + continuation_retries: number; + rescue_triggered: boolean; + playbook_id: string | null; + tokens_prompt: number; + tokens_completion: number; + citations_from_prior_iterations: string[]; + duration_ms: number; + answer_preview: string; + errors_recovered: string[]; +} + +function log(msg: string) { console.log(`[enrich] ${msg}`); } + +function sleep(ms: number) { return new Promise(r => setTimeout(r, ms)); } + +function cosine(a: number[], b: number[]): number { + let dot = 0, na = 0, nb = 0; + for (let i = 0; i < a.length; i++) { dot += a[i] * b[i]; na += a[i] * a[i]; nb += b[i] * b[i]; } + if (na === 0 || nb === 0) return 0; + return dot / (Math.sqrt(na) * Math.sqrt(nb)); +} + +function hash(s: string): string { return createHash("sha256").update(s).digest("hex").slice(0, 10); } + +async function embedBatch(texts: string[]): Promise { + // Sidecar /embed accepts a list. On partial failure, retry individually. + const r = await fetch(`${SIDECAR}/embed`, { + method: "POST", headers: { "content-type": "application/json" }, + body: JSON.stringify({ texts }), + signal: AbortSignal.timeout(120000), + }); + if (!r.ok) throw new Error(`embed batch ${r.status}: ${await r.text()}`); + const j: any = await r.json(); + return j.embeddings; +} + +function chunkText(text: string): Array<{ text: string; offset: number }> { + const out: Array<{ text: string; offset: number }> = []; + let i = 0; + while (i < text.length) { + const end = Math.min(i + CHUNK_SIZE, text.length); + const slice = text.slice(i, end).trim(); + if (slice.length > 50) out.push({ text: slice, offset: i }); + if (end >= text.length) break; + i = end - CHUNK_OVERLAP; + } + return out; +} + +async function chat(opts: { + provider: "ollama" | "ollama_cloud", + model: string, + messages: Array<{ role: string; content: string }>, + max_tokens: number, + think: boolean, +}): Promise<{ content: string; prompt_tokens: number; completion_tokens: number; finish_reason: string }> { + const r = await fetch(`${GATEWAY}/v1/chat`, { + method: "POST", headers: { "content-type": "application/json" }, + body: JSON.stringify({ ...opts }), + signal: AbortSignal.timeout(180000), + }); + if (!r.ok) throw new Error(`/v1/chat ${r.status}: ${await r.text()}`); + const j: any = await r.json(); + return { + content: j.choices?.[0]?.message?.content ?? "", + prompt_tokens: j.usage?.prompt_tokens ?? 0, + completion_tokens: j.usage?.completion_tokens ?? 0, + finish_reason: j.choices?.[0]?.finish_reason ?? "?", + }; +} + +// ─── Tree-split over oversized chunk set ────────────────────────── +async function treeSplitSummarize( + chunks: Chunk[], + question: string, +): Promise<{ scratchpad: string; cloud_calls: number }> { + // Shard into groups fitting within half the budget each. + const perShard = Math.max(1, Math.floor((CONTEXT_BUDGET_CHARS / 2) / CHUNK_SIZE)); + const shards: Chunk[][] = []; + for (let i = 0; i < chunks.length; i += perShard) { + shards.push(chunks.slice(i, i + perShard)); + } + log(` tree-split: ${chunks.length} chunks → ${shards.length} shards of up to ${perShard}`); + let scratchpad = ""; + let cloud_calls = 0; + for (let si = 0; si < shards.length; si++) { + const shard = shards[si]; + const shardText = shard.map(c => `[chunk @${c.offset}]\n${c.text}`).join("\n\n"); + const userMsg = `Question: ${question}\n\nShard ${si + 1}/${shards.length} of source material:\n\n${shardText}\n\nScratchpad so far:\n${scratchpad || "(empty)"}\n\nUpdate the scratchpad: extract only facts from THIS shard that help answer the question. Be terse. No prose.`; + const r = await chat({ + provider: "ollama_cloud", + model: CLOUD_MODEL, + messages: [ + { role: "system", content: "You maintain a concise factual scratchpad across multiple shards of source text. No prose outside the scratchpad. Each shard, append ≤80 words of relevant facts." }, + { role: "user", content: userMsg }, + ], + max_tokens: 500, + think: false, + }); + cloud_calls += 1; + scratchpad += `\n--- shard ${si + 1} notes ---\n${r.content.trim()}`; + if (scratchpad.length > CONTEXT_BUDGET_CHARS) { + // truncate oldest halves + scratchpad = scratchpad.slice(-CONTEXT_BUDGET_CHARS); + log(` tree-split: scratchpad truncated to ${scratchpad.length} chars`); + } + } + return { scratchpad, cloud_calls }; +} + +// ─── Continuable generate — retries once on empty/truncated ─────── +async function generateContinuable( + opts: Parameters[0] & { max_continuations?: number }, +): Promise<{ content: string; prompt_tokens: number; completion_tokens: number; continuation_retries: number }> { + const maxCont = opts.max_continuations ?? 1; + let total = await chat(opts); + let retries = 0; + if (total.content.length === 0 || total.finish_reason === "length") { + for (let i = 0; i < maxCont && (total.content.length === 0 || total.finish_reason === "length"); i++) { + retries += 1; + log(` continuation retry ${retries} (reason: ${total.finish_reason}, content=${total.content.length})`); + const continued = await chat({ + ...opts, + max_tokens: opts.max_tokens * 2, + messages: [ + ...opts.messages, + ...(total.content ? [{ role: "assistant", content: total.content }, { role: "user", content: "Continue from where you stopped. Complete the answer." }] : []), + ], + }); + total = { + content: total.content + continued.content, + prompt_tokens: total.prompt_tokens + continued.prompt_tokens, + completion_tokens: total.completion_tokens + continued.completion_tokens, + finish_reason: continued.finish_reason, + }; + } + } + return { ...total, continuation_retries: retries }; +} + +// ─── Single iteration: retrieve → budget-check → chat → seed ───── +async function runIteration( + iteration: number, + question: string, + allChunks: Chunk[], + priorPlaybookIds: string[], + priorAnswers: string[], +): Promise { + const started = Date.now(); + const errorsRecovered: string[] = []; + log(`iter ${iteration}: "${question.slice(0, 70)}..."`); + + // 1. Embed the question + const qEmb = (await embedBatch([question]))[0]; + + // 2. Retrieve top-K chunks by cosine + const scored = allChunks + .map(c => ({ c, score: cosine(qEmb, c.embedding) })) + .sort((a, b) => b.score - a.score) + .slice(0, TOP_K_RETRIEVE); + const chunks = scored.map(x => x.c); + log(` retrieved top ${chunks.length} chunks (score range ${scored[0].score.toFixed(3)} .. ${scored[scored.length - 1].score.toFixed(3)})`); + + // 3. Context budget check — tree-split if over + const contextChars = chunks.map(c => c.text).join("\n\n").length; + let contextForPrompt: string; + let treeSplit = false; + let cloudCallsTotal = 0; + if (contextChars > CONTEXT_BUDGET_CHARS) { + treeSplit = true; + log(` context ${contextChars} chars > budget ${CONTEXT_BUDGET_CHARS} → tree-split`); + const { scratchpad, cloud_calls } = await treeSplitSummarize(chunks, question); + contextForPrompt = `Distilled scratchpad from ${chunks.length} source chunks (too large to fit directly):\n${scratchpad}`; + cloudCallsTotal += cloud_calls; + } else { + contextForPrompt = chunks.map(c => `[chunk @${c.offset}]\n${c.text}`).join("\n\n"); + } + + // 4. Seed prompt with prior iteration answers (real compounding). + // Not just IDs — the model needs the CONTENT to synthesize. + let citationBlock = ""; + let citationsReceived: string[] = []; + if (priorPlaybookIds.length > 0 && priorAnswers.length > 0) { + const lines = priorAnswers.map((ans, i) => { + const pid = priorPlaybookIds[i]?.slice(0, 12) ?? "unknown"; + // Trim each prior answer to ~400 chars so we don't blow budget + return `[pb:${pid}] iter ${i + 1} answer:\n${ans.slice(0, 400)}\n`; + }); + citationBlock = `\n\n═══ PRIOR ITERATIONS (compounding context) ═══\n${lines.join("\n")}═══ end prior iterations ═══\n\nYour answer MUST cite specific prior iterations using [pb:ID] notation when drawing on them. Synthesis questions require explicit cross-iteration reasoning.`; + citationsReceived = priorPlaybookIds.slice(); + } + + // 5. Call cloud. On empty/error → rescue. + // Intentional failure-injection on iter INJECT_FAIL_ON_ITER: + // a deliberately-invalid model name on the primary call so the + // rescue path actually runs. Proves the catch-and-rescue isn't + // dead code. + let result: { content: string; prompt_tokens: number; completion_tokens: number; continuation_retries: number }; + let rescueTriggered = false; + const primaryModel = iteration === INJECT_FAIL_ON_ITER + ? "deliberately-invalid-model-to-force-rescue" + : CLOUD_MODEL; + if (iteration === INJECT_FAIL_ON_ITER) { + log(` INJECTED FAILURE on primary call — model="${primaryModel}" will 400/500`); + } + try { + result = await generateContinuable({ + provider: "ollama_cloud", + model: primaryModel, + messages: [ + { role: "system", content: "You answer questions about the Lakehouse PRD using only the provided source material and prior iteration answers. Be specific. Cite chunk offsets OR [pb:ID] markers. Keep answers under 200 words." }, + { role: "user", content: `Question: ${question}\n\nSource material:\n${contextForPrompt}${citationBlock}` }, + ], + max_tokens: 800, + think: true, + max_continuations: 2, + }); + cloudCallsTotal += 1 + result.continuation_retries; + } catch (e) { + errorsRecovered.push(`primary cloud call: ${(e as Error).message}`); + rescueTriggered = true; + log(` primary failed → rescue with ${RESCUE_MODEL}`); + result = await generateContinuable({ + provider: "ollama_cloud", + model: RESCUE_MODEL, + messages: [ + { role: "system", content: "Answer briefly using the source material." }, + { role: "user", content: `Q: ${question}\n\n${contextForPrompt.slice(0, 8000)}` }, + ], + max_tokens: 500, + think: false, + }); + cloudCallsTotal += 1 + result.continuation_retries; + } + if (result.content.length === 0) { + errorsRecovered.push("even rescue returned empty — last-ditch local fallback"); + rescueTriggered = true; + result = await generateContinuable({ + provider: "ollama", + model: "qwen3.5:latest", + messages: [{ role: "user", content: `Q: ${question}\n\n${contextForPrompt.slice(0, 4000)}` }], + max_tokens: 300, + think: false, + }); + cloudCallsTotal += 1; + } + + // 6. Seed playbook with the answer + let playbook_id: string | null = null; + try { + const ts = new Date().toISOString(); + const seedOp = `TEST: enrich_prd_run_${RUN_NONCE} iter${iteration} in Corpus, PRD`; + const r = await fetch(`${GATEWAY}/vectors/playbook_memory/seed`, { + method: "POST", headers: { "content-type": "application/json" }, + body: JSON.stringify({ + operation: seedOp, + approach: `q="${question.slice(0, 80)}" context_chars=${contextChars} tree_split=${treeSplit}`, + context: result.content.slice(0, 600), + endorsed_names: [`iter${iteration}_${RUN_NONCE}`], + append: true, + }), + signal: AbortSignal.timeout(15000), + }); + if (r.ok) { + const j: any = await r.json(); + playbook_id = j.outcome?.playbook_id ?? null; + } else { + errorsRecovered.push(`seed ${r.status}: ${(await r.text()).slice(0, 100)}`); + } + } catch (e) { + errorsRecovered.push(`seed exception: ${(e as Error).message}`); + } + + return { + iteration, + question, + retrieval_top_k: chunks.length, + context_chars_before_budget: contextChars, + tree_split_fired: treeSplit, + cloud_calls_total: cloudCallsTotal, + continuation_retries: result.continuation_retries, + rescue_triggered: rescueTriggered, + playbook_id, + tokens_prompt: result.prompt_tokens, + tokens_completion: result.completion_tokens, + citations_from_prior_iterations: citationsReceived, + duration_ms: Date.now() - started, + answer_preview: result.content.slice(0, 500), + errors_recovered: errorsRecovered, + }; +} + +async function main() { + await mkdir(OUT_DIR, { recursive: true }); + log(`run nonce: ${RUN_NONCE}`); + log(`output dir: ${OUT_DIR}`); + + // ─── Phase 1: load, chunk, embed the PRD ─────────────────────── + log(`loading PRD from ${PRD_PATH}`); + const prd = await readFile(PRD_PATH, "utf8"); + log(`PRD: ${prd.length} chars, ${prd.split("\n").length} lines`); + + const raw_chunks = chunkText(prd); + log(`chunked into ${raw_chunks.length} pieces (size ${CHUNK_SIZE}, overlap ${CHUNK_OVERLAP})`); + + // Embed in batches of 32 to avoid sidecar overload + const allChunks: Chunk[] = []; + const BATCH = 32; + const t0 = Date.now(); + for (let i = 0; i < raw_chunks.length; i += BATCH) { + const batch = raw_chunks.slice(i, i + BATCH); + const embs = await embedBatch(batch.map(b => b.text)); + for (let j = 0; j < batch.length; j++) { + allChunks.push({ + id: hash(batch[j].text), + text: batch[j].text, + embedding: embs[j].map(x => Number(x)), + offset: batch[j].offset, + }); + } + log(` embedded ${allChunks.length}/${raw_chunks.length}`); + } + log(`embedded all ${allChunks.length} chunks in ${((Date.now() - t0) / 1000).toFixed(1)}s`); + + // ─── Phase 2: 6 iterations ───────────────────────────────────── + const results: IterationResult[] = []; + const priorIds: string[] = []; + const priorAnswers: string[] = []; + for (let i = 1; i <= QUESTIONS.length; i++) { + const q = QUESTIONS[i - 1]; + const r = await runIteration(i, q, allChunks, priorIds, priorAnswers); + results.push(r); + if (r.playbook_id) priorIds.push(r.playbook_id); + priorAnswers.push(r.answer_preview); + log(` → iter ${i}: ${r.errors_recovered.length} errors recovered, ${r.continuation_retries} continuations, tree-split=${r.tree_split_fired}, rescue=${r.rescue_triggered}, ${r.duration_ms}ms`); + await writeFile(`${OUT_DIR}/iter_${i}.json`, JSON.stringify(r, null, 2)); + } + // Check whether iter 6 actually cited prior pb:IDs in its answer. + // Playbook IDs look like `pb-seed-` so the regex needs to allow + // hyphens + letters inside the brackets, not just hex chars. + const iter6 = results[5]; + const citationsHonored = iter6 ? (iter6.answer_preview.match(/\[pb:[\w-]+\]/gi)?.length ?? 0) : 0; + + // ─── Phase 3: summary ────────────────────────────────────────── + const summary = { + run_nonce: RUN_NONCE, + ran_at: new Date().toISOString(), + prd_chars: prd.length, + prd_chunks: allChunks.length, + iterations: results.length, + total_cloud_calls: results.reduce((s, r) => s + r.cloud_calls_total, 0), + total_continuation_retries: results.reduce((s, r) => s + r.continuation_retries, 0), + total_errors_recovered: results.reduce((s, r) => s + r.errors_recovered.length, 0), + tree_splits_fired: results.filter(r => r.tree_split_fired).length, + rescues_triggered: results.filter(r => r.rescue_triggered).length, + iter6_received_prior_ids: results[5]?.citations_from_prior_iterations.length ?? 0, + iter6_actually_cited_in_answer: citationsHonored, + total_duration_ms: results.reduce((s, r) => s + r.duration_ms, 0), + overall: results.length === 6 && results.every(r => r.playbook_id !== null) ? "PASS" : "PARTIAL", + }; + await writeFile(`${OUT_DIR}/summary.json`, JSON.stringify(summary, null, 2)); + + log(""); + log(`══════ SUMMARY ${summary.overall} ══════`); + log(` 6 iterations, ${summary.total_cloud_calls} cloud calls, ${summary.total_errors_recovered} errors recovered`); + log(` tree-splits: ${summary.tree_splits_fired}/6 continuations: ${summary.total_continuation_retries} rescues: ${summary.rescues_triggered}`); + log(` iter 6 cited ${summary.iter6_cited_prior} prior iteration playbooks`); + log(` total duration: ${(summary.total_duration_ms / 1000).toFixed(1)}s`); + log(""); + for (const r of results) { + const flags = [ + r.tree_split_fired ? "tree-split" : "", + r.continuation_retries > 0 ? `cont=${r.continuation_retries}` : "", + r.rescue_triggered ? "rescued" : "", + r.errors_recovered.length > 0 ? `err=${r.errors_recovered.length}` : "", + ].filter(Boolean).join(" "); + log(` iter ${r.iteration}: ${r.tokens_prompt}+${r.tokens_completion} tok, ${r.duration_ms}ms ${flags ? `[${flags}]` : ""}`); + } + log(""); + log(`artifacts: ${OUT_DIR}/{iter_1..6.json, summary.json}`); + process.exit(summary.overall === "PASS" ? 0 : 1); +} + +main().catch(e => { console.error("[enrich] fatal:", e); process.exit(2); });