// Phase 5 receipts harness tests. Pin: schema validity, hash
// determinism, drift detection, multi-stage aggregation, failure
// propagation.

import { test, expect, beforeEach, afterEach } from "bun:test";
import { mkdirSync, writeFileSync, rmSync, existsSync, readFileSync } from "node:fs";
import { resolve } from "node:path";

import {
  STAGE_RECEIPT_SCHEMA_VERSION, validateStageReceipt, aggregateIoHash,
  type StageReceipt,
} from "../../auditor/schemas/distillation/stage_receipt";
import {
  RUN_SUMMARY_SCHEMA_VERSION, validateRunSummary,
  type RunSummary,
} from "../../auditor/schemas/distillation/run_summary";
import {
  DRIFT_REPORT_SCHEMA_VERSION, validateDriftReport,
  type DriftReport,
} from "../../auditor/schemas/distillation/drift_report";
import { runAllWithReceipts, buildDrift } from "../../scripts/distillation/receipts";
import { EVIDENCE_SCHEMA_VERSION, type EvidenceRecord, type ModelRole } from "../../auditor/schemas/distillation/evidence_record";

const TMP = "/tmp/distillation_test_phase5";
const NOW = "2026-04-26T22:30:00.000Z";
const SHA = "0".repeat(64);
const PARTITION = "2026/04/27";

// Placeholder 40-char hex commit id used by the hand-built fixtures.
const GIT_SHA = "0".repeat(40);

// Directories the cross-run tests delete between runs so the second run
// regenerates them from the (untouched) source jsonl.
const DERIVED_OUTPUT_DIRS = ["data/evidence", "data/scored-runs", "exports"];

/** One per-stage entry of a RunSummary. */
type StageSummary = RunSummary["stages"][number];

// ─── Fixtures & helpers ─────────────────────────────────────────────

/** Recursively delete `path`; `force: true` makes a missing path a no-op. */
function removeTree(path: string): void {
  rmSync(path, { recursive: true, force: true });
}

/** Delete every derived output directory under TMP, keeping the seeded source data. */
function wipeDerivedOutputs(): void {
  for (const dir of DERIVED_OUTPUT_DIRS) removeTree(resolve(TMP, dir));
}

/**
 * Recreate TMP from scratch: seed two scrum review rows for the collect
 * stage and commit them to a fresh git repository.
 */
function setupRoot(): void {
  removeTree(TMP);
  mkdirSync(resolve(TMP, `data/_kb`), { recursive: true });

  // Seed source jsonl so the collect stage has input
  const ev = [
    { run_id: "scrum:1:f", file: "f.rs", reviewed_at: NOW, accepted_model: "x", accepted_on_attempt: 1, suggestions_preview: "review of f.rs" },
    { run_id: "scrum:2:f", file: "f.rs", reviewed_at: NOW, accepted_model: "x", accepted_on_attempt: 3, suggestions_preview: "second review" },
  ];
  writeFileSync(
    resolve(TMP, "data/_kb/scrum_reviews.jsonl"),
    ev.map(r => JSON.stringify(r)).join("\n") + "\n",
  );

  // Init git so receipts can find a commit hash.
  // NOTE(review): exit codes are not checked, so a failing git command only
  // surfaces later as a test failure — confirm whether that is intended.
  Bun.spawnSync(["git", "init", "-q"], { cwd: TMP });
  Bun.spawnSync(["git", "-C", TMP, "config", "user.email", "test@test"]);
  Bun.spawnSync(["git", "-C", TMP, "config", "user.name", "test"]);
  Bun.spawnSync(["git", "-C", TMP, "add", "."]);
  Bun.spawnSync(["git", "-C", TMP, "commit", "-q", "-m", "test"]);
}

/**
 * The receipt that the positive schema test asserts is valid. Negative
 * tests spread it and override exactly one field, so a rejection can only
 * be caused by that field.
 */
function validReceipt(): StageReceipt {
  return {
    schema_version: STAGE_RECEIPT_SCHEMA_VERSION,
    run_id: "test-run-id-12345",
    stage: "collect",
    timestamp: NOW,
    git_commit: GIT_SHA,
    inputs: { files: [], record_count: 0, hash: SHA },
    outputs: { files: [], record_count: 0, hash: SHA },
    stats: { accepted: 0, rejected: 0, quarantined: 0, skipped: 0 },
    validation: { passed: true, errors: [], warnings: [] },
    duration_ms: 100,
  };
}

/**
 * A passing "collect" stage entry in which every emitted record was
 * accepted (accepted === records_out, nothing rejected/quarantined/skipped).
 */
function collectStage(recordsIn: number, recordsOut: number, outputHash: string): StageSummary {
  return {
    stage: "collect",
    records_in: recordsIn,
    records_out: recordsOut,
    accepted: recordsOut,
    rejected: 0,
    quarantined: 0,
    skipped: 0,
    passed: true,
    duration_ms: 0,
    output_hash: outputHash,
  };
}

/**
 * An empty, passing RunSummary (run_id "current", no stages, all counters
 * zero) with `overrides` applied on top.
 */
function makeSummary(overrides: Partial<RunSummary> = {}): RunSummary {
  return {
    schema_version: RUN_SUMMARY_SCHEMA_VERSION,
    run_id: "current",
    started_at: NOW,
    ended_at: NOW,
    git_commit: GIT_SHA,
    stages: [],
    total_records_in: 0,
    total_records_out: 0,
    total_accepted: 0,
    total_rejected: 0,
    total_quarantined: 0,
    total_skipped: 0,
    rag_records: 0,
    sft_records: 0,
    preference_pairs: 0,
    overall_passed: true,
    run_hash: SHA,
    total_duration_ms: 0,
    ...overrides,
  };
}

/** Look up a stage entry by name, failing loudly instead of relying on `!`. */
function stageOf(summary: RunSummary, stage: string): StageSummary {
  const found = summary.stages.find(s => s.stage === stage);
  if (found === undefined) {
    throw new Error(`stage "${stage}" missing from summary of run ${summary.run_id}`);
  }
  return found;
}

beforeEach(setupRoot);
afterEach(() => {
  removeTree(TMP);
});

// ─── Schema validation ──────────────────────────────────────────────

test("StageReceipt: positive validates", () => {
  const v = validateStageReceipt(validReceipt());
  expect(v.valid).toBe(true);
});

test("StageReceipt: validation.passed must be boolean (not inferred)", () => {
  const v = validateStageReceipt({
    ...validReceipt(),
    validation: { passed: "yes", errors: [], warnings: [] },
  });
  expect(v.valid).toBe(false);
});

test("StageReceipt: bad git_commit rejected (must be 40-char hex)", () => {
  const v = validateStageReceipt({ ...validReceipt(), git_commit: "abc" });
  expect(v.valid).toBe(false);
});

test("StageReceipt: unknown stage rejected", () => {
  const v = validateStageReceipt({ ...validReceipt(), stage: "unknown_stage" });
  expect(v.valid).toBe(false);
});

// ─── aggregateIoHash determinism ────────────────────────────────────

test("aggregateIoHash: same files → same hash, regardless of input order", async () => {
  const x = { path: "x.jsonl", sha256: "a".repeat(64), record_count: 5 };
  const y = { path: "y.jsonl", sha256: "b".repeat(64), record_count: 3 };
  const ha = await aggregateIoHash([x, y]);
  const hb = await aggregateIoHash([y, x]);
  expect(ha).toBe(hb);
  expect(ha).toMatch(/^[0-9a-f]{64}$/);
});

test("aggregateIoHash: different content → different hash", async () => {
  const ha = await aggregateIoHash([{ path: "x", sha256: "a".repeat(64) }]);
  const hb = await aggregateIoHash([{ path: "x", sha256: "b".repeat(64) }]);
  expect(ha).not.toBe(hb);
});

test("aggregateIoHash: same content different paths → different hash", async () => {
  const ha = await aggregateIoHash([{ path: "x.jsonl", sha256: "a".repeat(64) }]);
  const hb = await aggregateIoHash([{ path: "y.jsonl", sha256: "a".repeat(64) }]);
  expect(ha).not.toBe(hb);
});

// ─── runAllWithReceipts integration ────────────────────────────────

test("runAllWithReceipts: full pipeline emits 5 stage receipts + summary + drift", async () => {
  const r = await runAllWithReceipts({ root: TMP, recorded_at: NOW });
  const dir = resolve(TMP, "reports/distillation", r.run_id);

  // 5 stage receipts on disk
  for (const stage of ["collect", "score", "export-rag", "export-sft", "export-preference"]) {
    expect(existsSync(resolve(dir, `${stage}.json`))).toBe(true);
  }
  // Run-level artifacts next to them
  for (const artifact of ["summary.json", "summary.md", "drift.json"]) {
    expect(existsSync(resolve(dir, artifact))).toBe(true);
  }
});

test("runAllWithReceipts: every receipt validates against StageReceipt schema", async () => {
  const r = await runAllWithReceipts({ root: TMP, recorded_at: NOW });
  for (const receipt of r.receipts) {
    const v = validateStageReceipt(receipt);
    expect(v.valid).toBe(true);
  }
});

test("runAllWithReceipts: summary aggregates match per-stage sums", async () => {
  const r = await runAllWithReceipts({ root: TMP, recorded_at: NOW });
  const sumIn = r.summary.stages.reduce((a, s) => a + s.records_in, 0);
  const sumOut = r.summary.stages.reduce((a, s) => a + s.records_out, 0);
  expect(r.summary.total_records_in).toBe(sumIn);
  expect(r.summary.total_records_out).toBe(sumOut);
});

test("runAllWithReceipts: all stages share one run_id", async () => {
  const r = await runAllWithReceipts({ root: TMP, recorded_at: NOW });
  for (const receipt of r.receipts) {
    expect(receipt.run_id).toBe(r.run_id);
  }
});

test("runAllWithReceipts: run_hash is sha256 hex", async () => {
  const r = await runAllWithReceipts({ root: TMP, recorded_at: NOW });
  expect(r.summary.run_hash).toMatch(/^[0-9a-f]{64}$/);
});

// ─── Drift detection ───────────────────────────────────────────────

test("buildDrift: no prior run → severity ok with first-run flag", () => {
  const d = buildDrift(makeSummary(), null);
  expect(d.severity).toBe("ok");
  expect(d.prior_run_id).toBeNull();
  expect(d.flags.some(f => f.includes("first run"))).toBe(true);
});

test("buildDrift: >20% record_count change flags warn", () => {
  const prior = makeSummary({
    run_id: "prior",
    stages: [collectStage(100, 100, "a".repeat(64))],
    total_records_in: 100,
    total_records_out: 100,
    total_accepted: 100,
    run_hash: "a".repeat(64),
  });
  // Same input volume, but collect now emits half as many records.
  const current: RunSummary = {
    ...prior,
    run_id: "current",
    stages: [collectStage(100, 50, "b".repeat(64))],
    total_records_out: 50,
    total_accepted: 50,
    run_hash: "b".repeat(64),
  };
  const d = buildDrift(current, prior);
  expect(d.severity).toBe("warn");
  expect(d.flags.some(f => f.includes("drop"))).toBe(true);
});

test("buildDrift: identical summary → severity ok, no flags", () => {
  const s = makeSummary({
    run_id: "x",
    stages: [collectStage(10, 10, "c".repeat(64))],
    total_records_in: 10,
    total_records_out: 10,
    total_accepted: 10,
    run_hash: "c".repeat(64),
  });
  const d = buildDrift({ ...s, run_id: "current" }, s);
  // NOTE(review): the title also promises "no flags", but only severity is
  // asserted — add a flags assertion once the expected contents are confirmed.
  expect(d.severity).toBe("ok");
});

test("buildDrift: validates against DriftReport schema", () => {
  const d = buildDrift(makeSummary(), null);
  const v = validateDriftReport(d);
  expect(v.valid).toBe(true);
});

// ─── Cross-run determinism & drift ──────────────────────────────────
// NOTE(review): the file header lists "failure propagation", but none of the
// tests visible here runs a failing stage — confirm whether that coverage
// lives elsewhere.

test("runAllWithReceipts: idempotent — second run on same data produces matching run_hash for unchanged stages", async () => {
  const r1 = await runAllWithReceipts({ root: TMP, recorded_at: NOW, run_id: "run-A-deadbeef" });
  // Wipe outputs but keep source so second run regenerates
  wipeDerivedOutputs();
  const r2 = await runAllWithReceipts({ root: TMP, recorded_at: NOW, run_id: "run-B-deadbeef" });

  // The collect stage's output_hash should match: identical input + identical recorded_at
  // produce byte-stable evidence files (proven in Phase 2 tests).
  const c1 = stageOf(r1.summary, "collect");
  const c2 = stageOf(r2.summary, "collect");
  expect(c1.output_hash).toBe(c2.output_hash);
});

test("runAllWithReceipts: drift between r1 and r2 (with different recorded_at) shows hash differences", async () => {
  const r1 = await runAllWithReceipts({ root: TMP, recorded_at: NOW, run_id: "run-A-deadbeef" });
  wipeDerivedOutputs();
  // Different recorded_at causes provenance.recorded_at to differ → output_hash differs
  const r2 = await runAllWithReceipts({ root: TMP, recorded_at: "2026-04-27T00:00:00.000Z", run_id: "run-B-deadbeef" });

  // run-B finds run-A as prior; should show drift
  expect(r2.drift.prior_run_id).toBe("run-A-deadbeef");
  // Pin the hash difference the test title promises.
  expect(stageOf(r2.summary, "collect").output_hash)
    .not.toBe(stageOf(r1.summary, "collect").output_hash);
});