Audit pipeline PR #9: determinism + fact extraction + verifier gate + KB stats #9

Merged
profit merged 34 commits from test/enrich-prd-pipeline into main 2026-04-23 05:29:39 +00:00
Showing only changes of commit 4458c94f45 - Show all commits

View File

@ -0,0 +1,431 @@
// Real-world architecture stress test — 6 iterations of the full pipeline
// against the PRD as a corpus. Goal: prove at scale what Phase 21
// promised (context continuation + tree-split), plus Phase 19
// compounding across iterations.
//
// Run: bun run tests/real-world/enrich_prd_pipeline.ts
//
// No mocks. No skipped layers. On any error, the test triggers
// cloud-rescue rather than fail — it's the architecture's job to
// recover. The test FAILS only if we can't complete 6 iterations.
import { readFile, writeFile, mkdir } from "node:fs/promises";
import { createHash } from "node:crypto";
// ─── Run configuration ────────────────────────────────────────────
// Corpus: the Lakehouse PRD markdown file, chunked and embedded below.
const PRD_PATH = "/home/profit/lakehouse/docs/PRD.md";
const CHUNK_SIZE = 800; // chars per chunk — ~200 tokens
const CHUNK_OVERLAP = 120; // chars shared by adjacent chunks so boundary-spanning facts survive
const TOP_K_RETRIEVE = 12; // chunks per iteration — pulled up to force overflow
const CONTEXT_BUDGET_CHARS = 4000; // tight budget — forces tree-split on every iter
const INJECT_FAIL_ON_ITER = 3; // plant a bad primary-cloud call so rescue fires
// Local service endpoints: chat gateway and embedding sidecar.
const GATEWAY = "http://localhost:3100";
const SIDECAR = "http://localhost:3200";
const CLOUD_MODEL = "gpt-oss:120b"; // primary answering model
const RESCUE_MODEL = "gpt-oss:20b"; // fallback local cloud model via sidecar
// Per-run token (base-36 timestamp); also names the artifact directory.
const RUN_NONCE = Date.now().toString(36);
const OUT_DIR = `/home/profit/lakehouse/tests/real-world/runs/${RUN_NONCE}`;
// The 6 progressively-compounding questions. #6 explicitly requires
// synthesis across prior 5 answers.
const QUESTIONS: string[] = [
  "Summarize the Lakehouse project's one-paragraph thesis: what problem does it solve, what's the unique approach?",
  "How does Phase 19 playbook memory turn successful fills into a signal that boosts future rankings?",
  "Explain the role of Phase 24 observer in the learning loop — what does it see, and what does it feed into?",
  "What's the VRAM-aware profile swap mechanism in Phase 17, and why does it matter for multi-model serving?",
  "How do Phase 25 validity windows and Phase 27 playbook versioning interact when a schema drifts?",
  "Synthesize the prior 5 answers: how do the pieces (playbook memory, observer, profile swap, validity windows, versioning) compose into a system that measurably gets smarter over time? Cite specific prior answers.",
];
// One retrieval unit of the PRD: a text slice, its embedding, and its char offset.
type Chunk = { id: string; text: string; embedding: number[]; offset: number };

// Metrics captured per iteration; serialized to runs/<nonce>/iter_<n>.json
// and aggregated into summary.json by main().
interface IterationResult {
  iteration: number;                         // 1-based index into QUESTIONS
  question: string;                          // question asked this iteration
  retrieval_top_k: number;                   // chunks actually retrieved (≤ TOP_K_RETRIEVE)
  context_chars_before_budget: number;       // raw joined-chunk size before the budget check
  tree_split_fired: boolean;                 // context exceeded CONTEXT_BUDGET_CHARS
  cloud_calls_total: number;                 // shard calls + primary + continuations + rescue
  continuation_retries: number;              // follow-up calls after empty/truncated replies
  rescue_triggered: boolean;                 // primary failed or returned empty; fallback ran
  playbook_id: string | null;                // id from the playbook seed endpoint, null on failure
  tokens_prompt: number;                     // accumulated prompt tokens for the answer call(s)
  tokens_completion: number;                 // accumulated completion tokens for the answer call(s)
  citations_from_prior_iterations: string[]; // playbook ids offered to the model as prior context
  duration_ms: number;                       // wall-clock time for the whole iteration
  answer_preview: string;                    // first 500 chars of the final answer
  errors_recovered: string[];                // note for every failure that was recovered
}
// Console logger with this test's fixed prefix.
function log(msg: string) {
  console.log(`[enrich] ${msg}`);
}

// Promise-based delay helper.
function sleep(ms: number) {
  return new Promise(resolve => setTimeout(resolve, ms));
}
// Cosine similarity between two equal-length vectors.
// Returns 0 when either vector has zero norm (including empty inputs).
function cosine(a: number[], b: number[]): number {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  a.forEach((av, i) => {
    const bv = b[i];
    dot += av * bv;
    normA += av * av;
    normB += bv * bv;
  });
  if (normA === 0 || normB === 0) {
    return 0;
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}
// Short content fingerprint: first 10 hex chars of the SHA-256 digest.
function hash(s: string): string {
  const digest = createHash("sha256").update(s).digest("hex");
  return digest.slice(0, 10);
}
// Embed a batch of texts via the sidecar's POST /embed endpoint (120s timeout).
// Throws on any non-2xx response — despite the original comment, NO per-item
// retry is implemented here; callers depend on the whole batch succeeding.
async function embedBatch(texts: string[]): Promise<number[][]> {
  const r = await fetch(`${SIDECAR}/embed`, {
    method: "POST", headers: { "content-type": "application/json" },
    body: JSON.stringify({ texts }),
    signal: AbortSignal.timeout(120000),
  });
  if (!r.ok) throw new Error(`embed batch ${r.status}: ${await r.text()}`);
  // NOTE(review): assumes the response shape is { embeddings: number[][] } —
  // confirm against the sidecar contract.
  const j: any = await r.json();
  return j.embeddings;
}
// Split text into overlapping windows: each chunk is up to CHUNK_SIZE chars,
// and the next window starts CHUNK_OVERLAP chars before the previous one
// ended, so facts spanning a boundary appear in two chunks. Trimmed slices
// of 50 chars or fewer are discarded as noise.
function chunkText(text: string): Array<{ text: string; offset: number }> {
  const pieces: Array<{ text: string; offset: number }> = [];
  for (let start = 0; start < text.length; ) {
    const end = Math.min(start + CHUNK_SIZE, text.length);
    const piece = text.slice(start, end).trim();
    if (piece.length > 50) {
      pieces.push({ text: piece, offset: start });
    }
    if (end >= text.length) break; // final window consumed the tail
    start = end - CHUNK_OVERLAP;
  }
  return pieces;
}
// Thin wrapper over the gateway's POST /v1/chat endpoint. Forwards opts as
// the JSON body verbatim (180s timeout), throws on non-2xx, and flattens the
// OpenAI-style response into a small result record. Missing fields default
// to "" / 0 / "?".
async function chat(opts: {
  provider: "ollama" | "ollama_cloud",
  model: string,
  messages: Array<{ role: string; content: string }>,
  max_tokens: number,
  think: boolean,
}): Promise<{ content: string; prompt_tokens: number; completion_tokens: number; finish_reason: string }> {
  const resp = await fetch(`${GATEWAY}/v1/chat`, {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify({ ...opts }),
    signal: AbortSignal.timeout(180000),
  });
  if (!resp.ok) {
    throw new Error(`/v1/chat ${resp.status}: ${await resp.text()}`);
  }
  const payload: any = await resp.json();
  const choice = payload.choices?.[0];
  return {
    content: choice?.message?.content ?? "",
    prompt_tokens: payload.usage?.prompt_tokens ?? 0,
    completion_tokens: payload.usage?.completion_tokens ?? 0,
    finish_reason: choice?.finish_reason ?? "?",
  };
}
// ─── Tree-split over oversized chunk set ──────────────────────────
// Map-reduce summarization for chunk sets that exceed the context budget:
// shard the chunks so each shard fits in half the budget, run one cloud call
// per shard, and roll the extracted facts into a scratchpad. The scratchpad
// is truncated to its newest CONTEXT_BUDGET_CHARS chars whenever it grows
// past the budget. Returns the scratchpad and the number of cloud calls made.
async function treeSplitSummarize(
  chunks: Chunk[],
  question: string,
): Promise<{ scratchpad: string; cloud_calls: number }> {
  // Shard size: how many CHUNK_SIZE chunks fit in half the budget (min 1).
  const shardSize = Math.max(1, Math.floor((CONTEXT_BUDGET_CHARS / 2) / CHUNK_SIZE));
  const shards: Chunk[][] = [];
  for (let start = 0; start < chunks.length; start += shardSize) {
    shards.push(chunks.slice(start, start + shardSize));
  }
  log(` tree-split: ${chunks.length} chunks → ${shards.length} shards of up to ${shardSize}`);
  let scratchpad = "";
  let cloud_calls = 0;
  let shardIndex = 0;
  for (const shard of shards) {
    shardIndex += 1;
    const shardText = shard.map(c => `[chunk @${c.offset}]\n${c.text}`).join("\n\n");
    const userMsg = `Question: ${question}\n\nShard ${shardIndex}/${shards.length} of source material:\n\n${shardText}\n\nScratchpad so far:\n${scratchpad || "(empty)"}\n\nUpdate the scratchpad: extract only facts from THIS shard that help answer the question. Be terse. No prose.`;
    const reply = await chat({
      provider: "ollama_cloud",
      model: CLOUD_MODEL,
      messages: [
        { role: "system", content: "You maintain a concise factual scratchpad across multiple shards of source text. No prose outside the scratchpad. Each shard, append ≤80 words of relevant facts." },
        { role: "user", content: userMsg },
      ],
      max_tokens: 500,
      think: false,
    });
    cloud_calls += 1;
    scratchpad += `\n--- shard ${shardIndex} notes ---\n${reply.content.trim()}`;
    if (scratchpad.length > CONTEXT_BUDGET_CHARS) {
      // Drop the oldest material; keep only the newest budget-sized tail.
      scratchpad = scratchpad.slice(-CONTEXT_BUDGET_CHARS);
      log(` tree-split: scratchpad truncated to ${scratchpad.length} chars`);
    }
  }
  return { scratchpad, cloud_calls };
}
// ─── Continuable generate — retries once on empty/truncated ───────
// Wraps chat() with up to max_continuations (default 1) follow-up calls when
// the reply comes back empty or was truncated (finish_reason === "length").
// Each follow-up doubles the ORIGINAL max_tokens and, when partial content
// exists, replays it as an assistant turn followed by a "continue" prompt.
// Content and token counts accumulate; finish_reason reflects the last call.
async function generateContinuable(
  opts: Parameters<typeof chat>[0] & { max_continuations?: number },
): Promise<{ content: string; prompt_tokens: number; completion_tokens: number; continuation_retries: number }> {
  const maxRetries = opts.max_continuations ?? 1;
  let acc = await chat(opts);
  let retries = 0;
  const incomplete = () => acc.content.length === 0 || acc.finish_reason === "length";
  while (retries < maxRetries && incomplete()) {
    retries += 1;
    log(` continuation retry ${retries} (reason: ${acc.finish_reason}, content=${acc.content.length})`);
    // Empty reply → re-ask from scratch; partial reply → ask to continue it.
    const followUpMessages = acc.content
      ? [
          ...opts.messages,
          { role: "assistant", content: acc.content },
          { role: "user", content: "Continue from where you stopped. Complete the answer." },
        ]
      : [...opts.messages];
    const next = await chat({
      ...opts,
      max_tokens: opts.max_tokens * 2,
      messages: followUpMessages,
    });
    acc = {
      content: acc.content + next.content,
      prompt_tokens: acc.prompt_tokens + next.prompt_tokens,
      completion_tokens: acc.completion_tokens + next.completion_tokens,
      finish_reason: next.finish_reason,
    };
  }
  return { ...acc, continuation_retries: retries };
}
// ─── Single iteration: retrieve → budget-check → chat → seed ─────
// Run ONE full pipeline iteration against the embedded PRD corpus:
//   1. embed the question          2. retrieve top-K chunks by cosine
//   3. tree-split if over budget   4. seed prior answers into the prompt
//   5. cloud call (with injected failure + rescue on INJECT_FAIL_ON_ITER)
//   6. seed the final answer into playbook memory
// Pipeline errors are recovered, not thrown; each one is recorded in
// errors_recovered on the returned IterationResult.
async function runIteration(
  iteration: number,
  question: string,
  allChunks: Chunk[],
  priorPlaybookIds: string[],
  priorAnswers: string[],
): Promise<IterationResult> {
  const started = Date.now();
  const errorsRecovered: string[] = [];
  log(`iter ${iteration}: "${question.slice(0, 70)}..."`);
  // 1. Embed the question
  const qEmb = (await embedBatch([question]))[0];
  // 2. Retrieve top-K chunks by cosine similarity, best first.
  const scored = allChunks
    .map(c => ({ c, score: cosine(qEmb, c.embedding) }))
    .sort((a, b) => b.score - a.score)
    .slice(0, TOP_K_RETRIEVE);
  const chunks = scored.map(x => x.c);
  log(` retrieved top ${chunks.length} chunks (score range ${scored[0].score.toFixed(3)} .. ${scored[scored.length - 1].score.toFixed(3)})`);
  // 3. Context budget check — tree-split distillation if over budget.
  const contextChars = chunks.map(c => c.text).join("\n\n").length;
  let contextForPrompt: string;
  let treeSplit = false;
  let cloudCallsTotal = 0;
  if (contextChars > CONTEXT_BUDGET_CHARS) {
    treeSplit = true;
    log(` context ${contextChars} chars > budget ${CONTEXT_BUDGET_CHARS} → tree-split`);
    const { scratchpad, cloud_calls } = await treeSplitSummarize(chunks, question);
    contextForPrompt = `Distilled scratchpad from ${chunks.length} source chunks (too large to fit directly):\n${scratchpad}`;
    cloudCallsTotal += cloud_calls;
  } else {
    // Fits directly — pass raw chunks tagged with their char offsets.
    contextForPrompt = chunks.map(c => `[chunk @${c.offset}]\n${c.text}`).join("\n\n");
  }
  // 4. Seed prompt with prior iteration answers (real compounding).
  // Not just IDs — the model needs the CONTENT to synthesize.
  let citationBlock = "";
  let citationsReceived: string[] = [];
  if (priorPlaybookIds.length > 0 && priorAnswers.length > 0) {
    const lines = priorAnswers.map((ans, i) => {
      const pid = priorPlaybookIds[i]?.slice(0, 12) ?? "unknown";
      // Trim each prior answer to ~400 chars so we don't blow budget
      return `[pb:${pid}] iter ${i + 1} answer:\n${ans.slice(0, 400)}\n`;
    });
    citationBlock = `\n\n═══ PRIOR ITERATIONS (compounding context) ═══\n${lines.join("\n")}═══ end prior iterations ═══\n\nYour answer MUST cite specific prior iterations using [pb:ID] notation when drawing on them. Synthesis questions require explicit cross-iteration reasoning.`;
    citationsReceived = priorPlaybookIds.slice();
  }
  // 5. Call cloud. On empty/error → rescue.
  // Intentional failure-injection on iter INJECT_FAIL_ON_ITER:
  // a deliberately-invalid model name on the primary call so the
  // rescue path actually runs. Proves the catch-and-rescue isn't
  // dead code.
  let result: { content: string; prompt_tokens: number; completion_tokens: number; continuation_retries: number };
  let rescueTriggered = false;
  const primaryModel = iteration === INJECT_FAIL_ON_ITER
    ? "deliberately-invalid-model-to-force-rescue"
    : CLOUD_MODEL;
  if (iteration === INJECT_FAIL_ON_ITER) {
    log(` INJECTED FAILURE on primary call — model="${primaryModel}" will 400/500`);
  }
  try {
    result = await generateContinuable({
      provider: "ollama_cloud",
      model: primaryModel,
      messages: [
        { role: "system", content: "You answer questions about the Lakehouse PRD using only the provided source material and prior iteration answers. Be specific. Cite chunk offsets OR [pb:ID] markers. Keep answers under 200 words." },
        { role: "user", content: `Question: ${question}\n\nSource material:\n${contextForPrompt}${citationBlock}` },
      ],
      max_tokens: 800,
      think: true,
      max_continuations: 2,
    });
    cloudCallsTotal += 1 + result.continuation_retries;
  } catch (e) {
    // Primary path failed → degrade to the smaller rescue model with a trimmed prompt.
    errorsRecovered.push(`primary cloud call: ${(e as Error).message}`);
    rescueTriggered = true;
    log(` primary failed → rescue with ${RESCUE_MODEL}`);
    result = await generateContinuable({
      provider: "ollama_cloud",
      model: RESCUE_MODEL,
      messages: [
        { role: "system", content: "Answer briefly using the source material." },
        { role: "user", content: `Q: ${question}\n\n${contextForPrompt.slice(0, 8000)}` },
      ],
      max_tokens: 500,
      think: false,
    });
    cloudCallsTotal += 1 + result.continuation_retries;
  }
  if (result.content.length === 0) {
    // Both cloud paths produced nothing — last-ditch fallback to a local model.
    errorsRecovered.push("even rescue returned empty — last-ditch local fallback");
    rescueTriggered = true;
    result = await generateContinuable({
      provider: "ollama",
      model: "qwen3.5:latest",
      messages: [{ role: "user", content: `Q: ${question}\n\n${contextForPrompt.slice(0, 4000)}` }],
      max_tokens: 300,
      think: false,
    });
    cloudCallsTotal += 1;
  }
  // 6. Seed playbook with the answer (best-effort: failures recorded, never fatal).
  let playbook_id: string | null = null;
  try {
    // NOTE(review): `ts` is never used — candidate for removal.
    const ts = new Date().toISOString();
    const seedOp = `TEST: enrich_prd_run_${RUN_NONCE} iter${iteration} in Corpus, PRD`;
    const r = await fetch(`${GATEWAY}/vectors/playbook_memory/seed`, {
      method: "POST", headers: { "content-type": "application/json" },
      body: JSON.stringify({
        operation: seedOp,
        approach: `q="${question.slice(0, 80)}" context_chars=${contextChars} tree_split=${treeSplit}`,
        context: result.content.slice(0, 600),
        endorsed_names: [`iter${iteration}_${RUN_NONCE}`],
        append: true,
      }),
      signal: AbortSignal.timeout(15000),
    });
    if (r.ok) {
      // NOTE(review): assumes the seed endpoint returns { outcome: { playbook_id } } —
      // confirm against the gateway contract.
      const j: any = await r.json();
      playbook_id = j.outcome?.playbook_id ?? null;
    } else {
      errorsRecovered.push(`seed ${r.status}: ${(await r.text()).slice(0, 100)}`);
    }
  } catch (e) {
    errorsRecovered.push(`seed exception: ${(e as Error).message}`);
  }
  return {
    iteration,
    question,
    retrieval_top_k: chunks.length,
    context_chars_before_budget: contextChars,
    tree_split_fired: treeSplit,
    cloud_calls_total: cloudCallsTotal,
    continuation_retries: result.continuation_retries,
    rescue_triggered: rescueTriggered,
    playbook_id,
    tokens_prompt: result.prompt_tokens,
    tokens_completion: result.completion_tokens,
    citations_from_prior_iterations: citationsReceived,
    duration_ms: Date.now() - started,
    answer_preview: result.content.slice(0, 500),
    errors_recovered: errorsRecovered,
  };
}
// Entry point. Phase 1: load/chunk/embed the PRD. Phase 2: run the 6
// compounding iterations, feeding each one the playbook ids and answer
// previews of all prior iterations. Phase 3: write summary.json and print
// a report. Exit code: 0 = PASS (6 iterations completed AND every one
// seeded a playbook), 1 = PARTIAL, 2 = fatal unrecovered error.
async function main() {
  await mkdir(OUT_DIR, { recursive: true });
  log(`run nonce: ${RUN_NONCE}`);
  log(`output dir: ${OUT_DIR}`);
  // ─── Phase 1: load, chunk, embed the PRD ───────────────────────
  log(`loading PRD from ${PRD_PATH}`);
  const prd = await readFile(PRD_PATH, "utf8");
  log(`PRD: ${prd.length} chars, ${prd.split("\n").length} lines`);
  const raw_chunks = chunkText(prd);
  log(`chunked into ${raw_chunks.length} pieces (size ${CHUNK_SIZE}, overlap ${CHUNK_OVERLAP})`);
  // Embed in batches of 32 to avoid sidecar overload
  const allChunks: Chunk[] = [];
  const BATCH = 32;
  const t0 = Date.now();
  for (let i = 0; i < raw_chunks.length; i += BATCH) {
    const batch = raw_chunks.slice(i, i + BATCH);
    const embs = await embedBatch(batch.map(b => b.text));
    for (let j = 0; j < batch.length; j++) {
      allChunks.push({
        id: hash(batch[j].text),
        text: batch[j].text,
        embedding: embs[j].map(x => Number(x)), // coerce defensively in case the sidecar returns strings
        offset: batch[j].offset,
      });
    }
    log(` embedded ${allChunks.length}/${raw_chunks.length}`);
  }
  log(`embedded all ${allChunks.length} chunks in ${((Date.now() - t0) / 1000).toFixed(1)}s`);
  // ─── Phase 2: 6 iterations ─────────────────────────────────────
  const results: IterationResult[] = [];
  const priorIds: string[] = [];
  const priorAnswers: string[] = [];
  for (let i = 1; i <= QUESTIONS.length; i++) {
    const q = QUESTIONS[i - 1];
    const r = await runIteration(i, q, allChunks, priorIds, priorAnswers);
    results.push(r);
    if (r.playbook_id) priorIds.push(r.playbook_id);
    priorAnswers.push(r.answer_preview);
    log(` → iter ${i}: ${r.errors_recovered.length} errors recovered, ${r.continuation_retries} continuations, tree-split=${r.tree_split_fired}, rescue=${r.rescue_triggered}, ${r.duration_ms}ms`);
    await writeFile(`${OUT_DIR}/iter_${i}.json`, JSON.stringify(r, null, 2));
  }
  // Check whether iter 6 actually cited prior pb:IDs in its answer.
  // Playbook IDs look like `pb-seed-<hex>` so the regex needs to allow
  // hyphens + letters inside the brackets, not just hex chars.
  const iter6 = results[5];
  const citationsHonored = iter6 ? (iter6.answer_preview.match(/\[pb:[\w-]+\]/gi)?.length ?? 0) : 0;
  // ─── Phase 3: summary ──────────────────────────────────────────
  const summary = {
    run_nonce: RUN_NONCE,
    ran_at: new Date().toISOString(),
    prd_chars: prd.length,
    prd_chunks: allChunks.length,
    iterations: results.length,
    total_cloud_calls: results.reduce((s, r) => s + r.cloud_calls_total, 0),
    total_continuation_retries: results.reduce((s, r) => s + r.continuation_retries, 0),
    total_errors_recovered: results.reduce((s, r) => s + r.errors_recovered.length, 0),
    tree_splits_fired: results.filter(r => r.tree_split_fired).length,
    rescues_triggered: results.filter(r => r.rescue_triggered).length,
    iter6_received_prior_ids: iter6?.citations_from_prior_iterations.length ?? 0,
    iter6_actually_cited_in_answer: citationsHonored,
    total_duration_ms: results.reduce((s, r) => s + r.duration_ms, 0),
    overall: results.length === 6 && results.every(r => r.playbook_id !== null) ? "PASS" : "PARTIAL",
  };
  await writeFile(`${OUT_DIR}/summary.json`, JSON.stringify(summary, null, 2));
  log("");
  log(`══════ SUMMARY ${summary.overall} ══════`);
  log(` 6 iterations, ${summary.total_cloud_calls} cloud calls, ${summary.total_errors_recovered} errors recovered`);
  log(` tree-splits: ${summary.tree_splits_fired}/6 continuations: ${summary.total_continuation_retries} rescues: ${summary.rescues_triggered}`);
  // BUG FIX: this line previously read `summary.iter6_cited_prior`, a
  // property that does not exist on the summary object (compile error under
  // strict TS; `undefined` at runtime). The real field is
  // `iter6_actually_cited_in_answer`, the [pb:ID] match count from above.
  log(` iter 6 cited ${summary.iter6_actually_cited_in_answer} prior iteration playbooks`);
  log(` total duration: ${(summary.total_duration_ms / 1000).toFixed(1)}s`);
  log("");
  for (const r of results) {
    const flags = [
      r.tree_split_fired ? "tree-split" : "",
      r.continuation_retries > 0 ? `cont=${r.continuation_retries}` : "",
      r.rescue_triggered ? "rescued" : "",
      r.errors_recovered.length > 0 ? `err=${r.errors_recovered.length}` : "",
    ].filter(Boolean).join(" ");
    log(` iter ${r.iteration}: ${r.tokens_prompt}+${r.tokens_completion} tok, ${r.duration_ms}ms ${flags ? `[${flags}]` : ""}`);
  }
  log("");
  log(`artifacts: ${OUT_DIR}/{iter_1..6.json, summary.json}`);
  process.exit(summary.overall === "PASS" ? 0 : 1);
}
main().catch(e => { console.error("[enrich] fatal:", e); process.exit(2); });