// acceptance.ts — Phase 6 final gate. Runs the entire distillation
// pipeline end-to-end on a fixture set covering every spec case,
// asserts every invariant, then runs a SECOND time with the same
// recorded_at and asserts hash reproducibility.
//
// Exits non-zero if ANY invariant fails. Writes
// reports/distillation/phase6-acceptance-report.md with the
// expected-vs-actual table.
//
// USAGE
// bun run scripts/distillation/acceptance.ts
// ./scripts/distill.ts acceptance

import {
  existsSync,
  readFileSync,
  mkdirSync,
  rmSync,
  writeFileSync,
  readdirSync,
  statSync,
  copyFileSync,
} from "node:fs";
import { resolve, basename } from "node:path";
import { runAllWithReceipts } from "./receipts";
import { validateStageReceipt } from "../../auditor/schemas/distillation/stage_receipt";
import { validateRunSummary } from "../../auditor/schemas/distillation/run_summary";
import { validateDriftReport } from "../../auditor/schemas/distillation/drift_report";

// Repository root holding the fixtures and the report output directory.
// Overridable via LH_DISTILL_ROOT for test harnesses / CI sandboxes.
const REPO_ROOT = process.env.LH_DISTILL_ROOT ??
  "/home/profit/lakehouse";
// Fixture set designed to exercise every spec case (see invariants below).
const FIXTURE_DIR = resolve(REPO_ROOT, "tests/fixtures/distillation/acceptance");
// Scratch pipeline root; wiped and rebuilt by setupFreshRoot(), removed on PASS.
const TMP_ROOT = "/tmp/distillation_phase6_acceptance";
// Pinned timestamp injected into BOTH pipeline runs so their outputs can be
// compared hash-for-hash (Invariant 15: reproducibility).
const RECORDED_AT = "2026-04-26T22:30:00.000Z";

// One expected-vs-actual row of the acceptance table.
interface Check {
  name: string;
  passed: boolean;
  expected: string;
  actual: string;
}

// Accumulated invariant results; rendered into the markdown report and
// folded into the process exit code at the end of main().
const checks: Check[] = [];

// Append one invariant result to `checks`.
function record(name: string, expected: string, actual: string, passed: boolean) {
  checks.push({ name, expected, actual, passed });
}

// Parse a JSONL file into an array of objects. A missing file yields an
// empty array rather than throwing, so existence checks stay separate.
function readJsonl(path: string): any[] {
  if (!existsSync(path)) return [];
  return readFileSync(path, "utf8").split("\n").filter(Boolean).map(l => JSON.parse(l));
}

// Build a pristine pipeline root at `rootPath`: wipe any previous run,
// copy the fixture JSONLs into data/_kb, and initialise + commit a git
// repo there so the pipeline's receipts can capture a commit hash.
function setupFreshRoot(rootPath: string) {
  if (existsSync(rootPath)) rmSync(rootPath, { recursive: true, force: true });
  mkdirSync(resolve(rootPath, "data/_kb"), { recursive: true });
  // Copy fixture jsonls
  const fixtureKb = resolve(FIXTURE_DIR, "data/_kb");
  for (const f of readdirSync(fixtureKb)) {
    if (f.endsWith(".jsonl")) {
      copyFileSync(resolve(fixtureKb, f), resolve(rootPath, "data/_kb", f));
    }
  }
  // Init git so receipts capture a commit hash
  Bun.spawnSync(["git", "init", "-q"], { cwd: rootPath });
  Bun.spawnSync(["git", "-C", rootPath, "config", "user.email", "acceptance@test"]);
  Bun.spawnSync(["git", "-C", rootPath, "config", "user.name", "acceptance"]);
  Bun.spawnSync(["git", "-C", rootPath, "add", "."]);
  Bun.spawnSync(["git", "-C", rootPath, "commit", "-q", "-m", "fixture"]);
}

// Execute the full distillation pipeline rooted at `rootPath`, always with
// the pinned RECORDED_AT so two runs are directly comparable.
async function run(rootPath: string, run_id: string) {
  return await runAllWithReceipts({ root: rootPath, recorded_at: RECORDED_AT, run_id, });
}

// Orchestrates the whole gate: run pipeline, assert invariants 1–15,
// re-run for hash reproducibility, write the markdown report, exit 0/1.
async function main() {
  console.log("[acceptance] setting up fresh root with fixtures...");
  setupFreshRoot(TMP_ROOT);
  console.log("[acceptance] running pipeline (run 1/2)...");
  const r1 = await run(TMP_ROOT, "acceptance-run-1-stable");

  // ── Invariant 1: receipts exist for all 5 stages ──
  const runDir = resolve(TMP_ROOT, "reports/distillation", r1.run_id);
  // (array literal continues on the next source line)
  const expected_stages = ["collect", "score", "export-rag", "export-sft",
// (completes the expected_stages array literal begun on the previous line)
"export-preference"];
  const missing = expected_stages.filter(s => !existsSync(resolve(runDir, `${s}.json`)));
  record(
    "receipts: all 5 stages emitted",
    expected_stages.join(","),
    missing.length === 0 ? "all present" : `missing: ${missing.join(",")}`,
    missing.length === 0,
  );

  // ── Invariant 2: summary + drift exist ──
  record(
    "summary.json exists",
    "exists",
    existsSync(resolve(runDir, "summary.json")) ? "exists" : "missing",
    existsSync(resolve(runDir, "summary.json")),
  );
  record(
    "summary.md exists",
    "exists",
    existsSync(resolve(runDir, "summary.md")) ? "exists" : "missing",
    existsSync(resolve(runDir, "summary.md")),
  );
  record(
    "drift.json exists",
    "exists",
    existsSync(resolve(runDir, "drift.json")) ? "exists" : "missing",
    existsSync(resolve(runDir, "drift.json")),
  );

  // ── Invariant 3: every receipt validates against StageReceipt schema ──
  // Missing receipts are skipped here; Invariant 1 already reports them.
  let invalidReceipts = 0;
  for (const stage of expected_stages) {
    const path = resolve(runDir, `${stage}.json`);
    if (!existsSync(path)) continue;
    const v = validateStageReceipt(JSON.parse(readFileSync(path, "utf8")));
    if (!v.valid) invalidReceipts++;
  }
  record(
    "every StageReceipt validates against schema",
    "0 invalid",
    `${invalidReceipts} invalid`,
    invalidReceipts === 0,
  );

  // ── Invariant 4: RunSummary + DriftReport validate ──
  // NOTE: will throw (caught by the top-level .catch) if summary.json or
  // drift.json is absent — Invariant 2 above records their existence first.
  const summary = JSON.parse(readFileSync(resolve(runDir, "summary.json"), "utf8"));
  const drift = JSON.parse(readFileSync(resolve(runDir, "drift.json"), "utf8"));
  record("RunSummary validates", "valid", validateRunSummary(summary).valid ? "valid" : "invalid", validateRunSummary(summary).valid);
  record("DriftReport validates", "valid", validateDriftReport(drift).valid ? "valid" : "invalid", validateDriftReport(drift).valid);

  // ── Invariant 5: SFT contains accepted records ──
  const sftRows = readJsonl(resolve(TMP_ROOT, "exports/sft/instruction_response.jsonl"));
  record(
    "SFT: ≥1 accepted record exported",
    ">=1",
    `${sftRows.length}`,
    sftRows.length >= 1,
  );

  // ── Invariant 6: SFT NEVER contains forbidden quality_score ──
  // Anything that is neither "accepted" nor "partially_accepted"
  // (e.g. rejected / needs_human_review) counts as contamination.
  const sftForbidden = sftRows.filter(r => r.quality_score !== "accepted" && r.quality_score !== "partially_accepted");
  record(
    "SFT contamination firewall: no rejected/needs_human_review",
    "0",
    `${sftForbidden.length}`,
    sftForbidden.length === 0,
  );

  // ── Invariant 6b: SFT default excludes partially_accepted ──
  // This run does not pass --include-partial, so partials must be absent.
  const sftPartialDefault = sftRows.filter(r => r.quality_score === "partially_accepted");
  record(
    "SFT default mode: 0 partial leaks (no --include-partial used)",
    "0",
    `${sftPartialDefault.length}`,
    sftPartialDefault.length === 0,
  );

  // ── Invariant 7: RAG contains accepted + partially_accepted; never rejected ──
  const ragRows = readJsonl(resolve(TMP_ROOT, "exports/rag/playbooks.jsonl"));
  const ragRejected = ragRows.filter(r => r.success_score === "rejected");
  record(
    "RAG: 0 rejected leaks",
    "0",
    `${ragRejected.length}`,
    ragRejected.length === 0,
  );
  // RAG (unlike SFT default mode) is expected to admit partials.
  const ragPartial = ragRows.filter(r => r.success_score === "partially_accepted");
  record(
    "RAG: ≥1 partially_accepted accepted (RAG accepts partial)",
    ">=1",
    `${ragPartial.length}`,
    ragPartial.length >= 1,
  );

  // ── Invariant 8: Preference pairs exported AND no self-pairs / identical text ──
  const prefRows = readJsonl(resolve(TMP_ROOT, "exports/preference/chosen_rejected.jsonl"));
  record(
    "Preference: ≥1 valid pair exported",
    ">=1",
    `${prefRows.length}`,
    prefRows.length >= 1,
  );
  // A pair whose chosen and rejected sides come from the same run is fabricated.
  const prefSelfPairs = prefRows.filter(r => r.chosen_run_id === r.rejected_run_id);
  record(
    "Preference: 0 self-pairs (chosen_run_id != rejected_run_id)",
    "0",
    `${prefSelfPairs.length}`,
    prefSelfPairs.length === 0,
  );
  // Identical chosen/rejected text carries no preference signal.
  const prefIdenticalText = prefRows.filter(r => r.chosen === r.rejected);
record(
    "Preference: 0 identical-text pairs",
    "0",
    `${prefIdenticalText.length}`,
    prefIdenticalText.length === 0,
  );

  // ── Invariant 9: every export row has provenance.sig_hash ──
  // Every exported row — RAG, SFT, preference — must carry a 64-hex-char
  // (sha256-shaped) provenance.sig_hash tracing it back to its source.
  const allExportRows = [...ragRows, ...sftRows, ...prefRows];
  const noProv = allExportRows.filter(r => !r.provenance?.sig_hash || !/^[0-9a-f]{64}$/.test(r.provenance.sig_hash));
  record(
    "every export row has valid sha256 provenance.sig_hash",
    "0 missing",
    `${noProv.length} missing`,
    noProv.length === 0,
  );

  // ── Invariant 10: missing-provenance fixture row was skipped, not exported ──
  // The fixture row in scrum_reviews.jsonl missing reviewed_at should
  // fail the EvidenceRecord schema → land in distillation_skips.jsonl,
  // not in any export.
  const skipsPath = resolve(TMP_ROOT, "data/_kb/distillation_skips.jsonl");
  const hadSkips = existsSync(skipsPath) && readJsonl(skipsPath).length >= 1;
  record(
    "Phase 2 collect: missing-provenance fixture row skipped to distillation_skips.jsonl",
    "≥1 skip recorded",
    hadSkips ? `${readJsonl(skipsPath).length} skip(s)` : "no skips file",
    hadSkips,
  );

  // ── Invariant 11: quarantine populated for SFT (forbidden categories) ──
  const sftQuarantine = readJsonl(resolve(TMP_ROOT, "exports/quarantine/sft.jsonl"));
  const unsafeCategoryEntries = sftQuarantine.filter(e => e.reason === "unsafe_sft_category");
  record(
    "SFT quarantine: rejected/needs_human caught at unsafe_sft_category gate",
    "≥1",
    `${unsafeCategoryEntries.length}`,
    unsafeCategoryEntries.length >= 1,
  );

  // ── Invariant 12: quarantine populated for preference (invalid pairs) ──
  // Fixture has rows acc-scrum-1 (accepted) and acc-scrum-2 (partial) on same task_id.
  // We expect a valid pair from those. acc-scrum-3 is needs_human, observer_reviews
  // adds accept+reject on same file — that yields the strong pair.
  // No invalid pairs in this fixture by default — check the file exists if any.
  // NOTE(review): this invariant intentionally records no check (the fixture
  // produces no invalid pairs); confirm whether a quarantine-file assertion
  // should be added when such a fixture case exists.
// ── Invariant 13: scratchpad/tree-split fixture row materialized ──
  // Evidence path is derived from the pinned RECORDED_AT date (2026-04-26).
  const evidencePath = resolve(TMP_ROOT, "data/evidence/2026/04/26");
  const evScrum = existsSync(resolve(evidencePath, "scrum_reviews.jsonl")) ? readJsonl(resolve(evidencePath, "scrum_reviews.jsonl")) : [];
  const treeSplitRow = evScrum.find(r => r.text?.includes("Tree-split scratchpad case"));
  record(
    "scratchpad/tree-split case: fixture row materialized into evidence",
    "found",
    treeSplitRow ? "found" : "not found",
    !!treeSplitRow,
  );

  // ── Invariant 14: PRD drift example present in evidence and scored ──
  const evAudits = existsSync(resolve(evidencePath, "audits.jsonl")) ? readJsonl(resolve(evidencePath, "audits.jsonl")) : [];
  // Audits transform stores `evidence` field as text (preferred over `resolution`),
  // so search for the audit's evidence content. Fixture row #2 carries
  // "no CircuitBreaker / break_on_failures" which is the PRD-drift signature.
  const prdDriftRow = evAudits.find(r => r.text?.includes("CircuitBreaker") || r.text?.includes("break_on_failures"));
  record(
    "PRD drift case: fixture row materialized",
    "found",
    prdDriftRow ? "found" : "not found",
    !!prdDriftRow,
  );

  // ── Invariant 15: HASH REPRODUCIBILITY — second run with same recorded_at matches ──
  console.log("[acceptance] running pipeline (run 2/2) for hash reproducibility...");
  // Wipe outputs but keep fixtures + git
  rmSync(resolve(TMP_ROOT, "data/evidence"), { recursive: true, force: true });
  rmSync(resolve(TMP_ROOT, "data/scored-runs"), { recursive: true, force: true });
  rmSync(resolve(TMP_ROOT, "exports"), { recursive: true, force: true });
  rmSync(resolve(TMP_ROOT, "data/_kb/distillation_skips.jsonl"), { force: true });
  rmSync(resolve(TMP_ROOT, "data/_kb/scoring_skips.jsonl"), { force: true });
  const r2 = await run(TMP_ROOT, "acceptance-run-2-stable");

  // Compare per-stage output hashes — must match
  const r1Stages = new Map(r1.summary.stages.map(s => [s.stage, s]));
  let hashMismatches = 0;
  const mismatchDetail: string[] = [];
  for (const s2 of r2.summary.stages) {
    const s1 = r1Stages.get(s2.stage);
    if (!s1) {
      // A stage present in run 2 but not run 1 is itself a mismatch.
      hashMismatches++;
      mismatchDetail.push(`stage ${s2.stage} missing in run1`);
      continue;
    }
    if (s1.output_hash !== s2.output_hash) {
      hashMismatches++;
      mismatchDetail.push(`${s2.stage}: ${s1.output_hash.slice(0,12)}... → ${s2.output_hash.slice(0,12)}...`);
    }
  }
  record(
    "hash reproducibility: per-stage output_hash identical across runs",
    "0 mismatches",
    hashMismatches === 0 ? "all match" : `${hashMismatches} mismatch: ${mismatchDetail.join("; ")}`,
    hashMismatches === 0,
  );
  record(
    "hash reproducibility: run_hash identical",
    r1.summary.run_hash.slice(0, 16) + "...",
    r2.summary.run_hash.slice(0, 16) + "...",
    r1.summary.run_hash === r2.summary.run_hash,
  );

  // ── Aggregate result ──
  const passed = checks.every(c => c.passed);
  const failedCount = checks.filter(c => !c.passed).length;

  // ── Write report ──
  // Report lands in the real repo (REPO_ROOT), not the temp pipeline root.
  const reportPath = resolve(REPO_ROOT, "reports/distillation/phase6-acceptance-report.md");
  const md: string[] = [];
  md.push("# Phase 6 — Acceptance Gate Report");
  md.push("");
  md.push(`**Run:** ${new Date().toISOString()}`);
  md.push(`**Fixture:** \`tests/fixtures/distillation/acceptance/\``);
  md.push(`**Temp root:** \`${TMP_ROOT}\``);
  md.push(`**Pipeline run_ids:** \`${r1.run_id}\` (first) + \`${r2.run_id}\` (second / hash reproducibility)`);
  md.push("");
  md.push(`## Result: ${passed ? "**PASS** ✓" : `**FAIL ✗ — ${failedCount}/${checks.length} checks failed**`}`);
  md.push("");
  md.push("## Pipeline counts (first run)");
  md.push("");
  md.push(`- collect: ${r1.summary.stages.find(s => s.stage === "collect")?.records_out ?? 0} records out · ${r1.summary.stages.find(s => s.stage === "collect")?.skipped ?? 0} skipped`);
  md.push(`- score: accepted=${r1.summary.stages.find(s => s.stage === "score")?.accepted ?? 0} rejected=${r1.summary.stages.find(s => s.stage === "score")?.rejected ?? 0} quarantined=${r1.summary.stages.find(s => s.stage === "score")?.quarantined ?? 0}`);
  md.push(`- export-rag: ${ragRows.length} rows`);
  md.push(`- export-sft: ${sftRows.length} rows`);
  md.push(`- export-preference: ${prefRows.length} pairs`);
  md.push("");
  md.push("## Invariant checks (expected vs actual)");
  md.push("");
  md.push("| # | Check | Expected | Actual | Status |");
  md.push("|---|---|---|---|---|");
  for (let i = 0; i < checks.length; i++) {
    const c = checks[i];
    md.push(`| ${i + 1} | ${c.name} | ${c.expected} | ${c.actual} | ${c.passed ? "✓" : "✗ FAIL"} |`);
  }
  md.push("");
  md.push("## Hash reproducibility detail");
  md.push("");
  md.push(`run 1 run_hash: \`${r1.summary.run_hash}\``);
  md.push("");
  md.push(`run 2 run_hash: \`${r2.summary.run_hash}\``);
  md.push("");
  md.push(r1.summary.run_hash === r2.summary.run_hash ? "**Bit-for-bit identical.** Two runs of the entire pipeline on the same fixture with the same `recorded_at` produce the same outputs. Distillation is deterministic." : "**HASHES DIVERGED.** Same fixture, same recorded_at, different outputs. This is a determinism violation — investigate before any of these outputs become training data.");
  md.push("");
  md.push("## Leak prevention confirmation");
  md.push("");
  md.push(`- SFT rows with rejected/needs_human_review quality_score: **${sftForbidden.length}** (must be 0)`);
  md.push(`- SFT rows with partially_accepted quality_score (default mode): **${sftPartialDefault.length}** (must be 0; would only appear with --include-partial)`);
  md.push(`- RAG rows with rejected success_score: **${ragRejected.length}** (must be 0)`);
  md.push(`- Preference self-pairs (chosen_run_id == rejected_run_id): **${prefSelfPairs.length}** (must be 0)`);
  md.push(`- Preference identical-text pairs: **${prefIdenticalText.length}** (must be 0)`);
  md.push("");
  md.push("## What this proves");
  md.push("");
  md.push(passed ? "The distillation pipeline is **safe, reproducible, and gated**. Accepted data flows through; rejected/needs_human_review data is quarantined with reasons; preference pairs are real, not fabricated; every output traces to source via canonical sha256; running the whole pipeline twice on the same fixture produces byte-identical outputs." : "**ACCEPTANCE FAILED.** Inspect the failed rows above before treating any output of this pipeline as training-safe.");
  md.push("");
  mkdirSync(resolve(REPO_ROOT, "reports/distillation"), { recursive: true });
  writeFileSync(reportPath, md.join("\n"));

  // ── stdout summary ──
  console.log("");
  console.log(`[acceptance] ${passed ? "PASS" : "FAIL"} — ${checks.filter(c => c.passed).length}/${checks.length} checks`);
  if (!passed) {
    for (const c of checks.filter(c => !c.passed)) {
      console.log(` ✗ ${c.name}: expected ${c.expected}, got ${c.actual}`);
    }
  }
  console.log(`[acceptance] report: ${reportPath}`);
  // Cleanup tmp on success; leave on fail for inspection
  if (passed) rmSync(TMP_ROOT, { recursive: true, force: true });
  process.exit(passed ? 0 : 1);
}

// Entry point guard (Bun): run only when executed directly, not when imported.
if (import.meta.main) main().catch(e => { console.error(e); process.exit(1); });