// receipts.ts — Phase 5 forensic harness wrapping every pipeline
// stage in a StageReceipt. Pure observability layer — does NOT change
// scoring, filtering, or schemas.
//
// USAGE
//   bun run scripts/distillation/receipts.ts run-all
//   bun run scripts/distillation/receipts.ts read --run-id <id>
//
// Output tree:
//   reports/distillation/<run_id>/
//     collect.json
//     score.json
//     export-rag.json
//     export-sft.json
//     export-preference.json
//     summary.json
//     summary.md
//     drift.json (always written; baseline-only report when no prior run exists)

import { existsSync, readFileSync, mkdirSync, writeFileSync, readdirSync, statSync } from "node:fs";
import { resolve, dirname } from "node:path";
import { spawnSync } from "node:child_process";
import { randomUUID } from "node:crypto";
import { materializeAll } from "./build_evidence_index";
import { scoreAll } from "./score_runs";
import { exportRag } from "./export_rag";
import { exportSft } from "./export_sft";
import { exportPreference } from "./export_preference";
import { TRANSFORMS } from "./transforms";
import {
  STAGE_RECEIPT_SCHEMA_VERSION,
  validateStageReceipt,
  aggregateIoHash,
  type StageReceipt,
  type StageName,
  type StageFileRef,
  type StageIO,
  type StageStats,
} from "../../auditor/schemas/distillation/stage_receipt";
import {
  RUN_SUMMARY_SCHEMA_VERSION,
  validateRunSummary,
  type RunSummary,
  type RunStageSummary,
} from "../../auditor/schemas/distillation/run_summary";
import {
  DRIFT_REPORT_SCHEMA_VERSION,
  DRIFT_THRESHOLD_PCT,
  validateDriftReport,
  type DriftReport,
  type StageDrift,
  type DriftSeverity,
} from "../../auditor/schemas/distillation/drift_report";

const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";

// Placeholder commit id used when `git rev-parse` cannot be resolved.
const ZERO_COMMIT = "0".repeat(40);

/**
 * Resolves HEAD of the git checkout at `root`.
 *
 * @returns the trimmed `git rev-parse HEAD` output, or 40 zeros when git
 *          exits non-zero or could not be spawned (status is then not 0).
 */
function gitCommit(root: string): string {
  const r = spawnSync("git", ["-C", root, "rev-parse", "HEAD"], { encoding: "utf8" });
  return r.status === 0 ? r.stdout.trim() : ZERO_COMMIT;
}

/** Hex-encoded SHA-256 of `buf`, computed with Bun's built-in hasher. */
function sha256Of(buf: Buffer | string): string {
  const h = new Bun.CryptoHasher("sha256");
  h.update(buf);
  return h.digest("hex");
}

/**
 * Strips a leading `root + "/"` from `abs_path`. Paths that do not start
 * with that prefix are returned unchanged (a plain `String.replace` would
 * also remove the first occurrence found in the MIDDLE of a path).
 */
function stripRoot(root: string, abs_path: string): string {
  const prefix = root + "/";
  return abs_path.startsWith(prefix) ? abs_path.slice(prefix.length) : abs_path;
}

/**
 * Counts lines in `buf`: one per "\n" byte, plus one for a final line that
 * is not newline-terminated. Byte 0x0a never occurs inside a multi-byte
 * UTF-8 sequence, so scanning bytes matches scanning the decoded text.
 */
function countLines(buf: Buffer): number {
  let lines = 0;
  for (let i = 0; i < buf.length; i++) {
    if (buf[i] === 0x0a) lines++;
  }
  if (buf.length > 0 && buf[buf.length - 1] !== 0x0a) lines++;
  return lines;
}

/**
 * Builds the StageFileRef (root-relative path, sha256, byte size, line
 * count) for one file.
 *
 * @returns null when `abs_path` does not exist.
 */
function fileRef(root: string, abs_path: string): StageFileRef | null {
  if (!existsSync(abs_path)) return null;
  const buf = readFileSync(abs_path);
  return {
    path: stripRoot(root, abs_path),
    sha256: sha256Of(buf),
    // Size of the exact bytes that were hashed — no second stat call that
    // could observe a different file state.
    bytes: buf.length,
    record_count: countLines(buf),
  };
}

/** Returns `rel_or_abs` unchanged when it starts with "/", else resolves it under `root`. */
function relPathToAbs(root: string, rel_or_abs: string): string {
  return rel_or_abs.startsWith("/") ? rel_or_abs : resolve(root, rel_or_abs);
}

/**
 * Builds a StageIO for `paths`: per-file refs, summed record count, and the
 * aggregate hash from `aggregateIoHash`. Paths that do not exist on disk
 * are silently left out.
 */
async function buildIO(root: string, paths: string[]): Promise<StageIO> {
  const refs: StageFileRef[] = [];
  let total_records = 0;
  for (const p of paths) {
    const ref = fileRef(root, relPathToAbs(root, p));
    if (!ref) continue;
    refs.push(ref);
    total_records += ref.record_count ?? 0;
  }
  return {
    files: refs,
    record_count: total_records,
    hash: await aggregateIoHash(refs),
  };
}

interface StageRunCtx {
  root: string;
  run_id: string;
  recorded_at: string;
}

/** Writes `receipt` to reports/distillation/<run_id>/<stage>.json, creating the directory if needed. */
function writeReceipt(root: string, run_id: string, receipt: StageReceipt) {
  const dir = resolve(root, "reports/distillation", run_id);
  mkdirSync(dir, { recursive: true });
  writeFileSync(resolve(dir, `${receipt.stage}.json`), JSON.stringify(receipt, null, 2) + "\n");
}

/**
 * Parses every non-blank line of the JSONL file at `abs_path` and hands the
 * parsed row to `inspect`.
 *
 * @returns the number of lines that could not be inspected (JSON.parse
 *          threw, or `inspect` threw — e.g. on a literal `null` row).
 *          Returns 0 when the file does not exist.
 */
function scanJsonl(abs_path: string, inspect: (row: Record<string, unknown>) => void): number {
  if (!existsSync(abs_path)) return 0;
  let malformed = 0;
  for (const line of readFileSync(abs_path, "utf8").split("\n")) {
    if (!line.trim()) continue;
    try {
      inspect(JSON.parse(line) as Record<string, unknown>);
    } catch {
      // Malformed rows are not a firewall breach, but they are counted so
      // the caller can surface them instead of dropping them silently.
      malformed++;
    }
  }
  return malformed;
}

// ─── Stage wrappers — call existing stage functions, build StageReceipt ──

/** Runs the collect stage (`materializeAll`) and wraps its result in a StageReceipt. */
async function runCollect(ctx: StageRunCtx): Promise<StageReceipt> {
  const t0 = Date.now();
  const r = await materializeAll({
    root: ctx.root,
    transforms: TRANSFORMS,
    recorded_at: ctx.recorded_at,
  });
  const inputs = await buildIO(ctx.root, r.receipt.input_files.map(f => f.path));
  const outputs = await buildIO(ctx.root, r.receipt.output_files.map(f => f.path));
  return {
    schema_version: STAGE_RECEIPT_SCHEMA_VERSION,
    run_id: ctx.run_id,
    stage: "collect",
    timestamp: ctx.recorded_at,
    git_commit: gitCommit(ctx.root),
    inputs,
    outputs,
    stats: {
      accepted: r.totals.rows_written,
      rejected: 0,
      quarantined: 0, // collect doesn't quarantine — it skips with reasons
      skipped: r.totals.rows_skipped,
    },
    validation: {
      passed: r.totals.rows_skipped === 0,
      errors: r.receipt.errors,
      warnings: r.receipt.warnings,
    },
    duration_ms: Date.now() - t0,
  };
}

/** Runs the score stage (`scoreAll`) and wraps its result in a StageReceipt. */
async function runScore(ctx: StageRunCtx): Promise<StageReceipt> {
  const t0 = Date.now();
  const r = await scoreAll({ root: ctx.root, recorded_at: ctx.recorded_at });
  const inputs = await buildIO(ctx.root, r.receipt.input_files.map(f => f.path));
  const outputs = await buildIO(ctx.root, r.receipt.output_files.map(f => f.path));
  return {
    schema_version: STAGE_RECEIPT_SCHEMA_VERSION,
    run_id: ctx.run_id,
    stage: "score",
    timestamp: ctx.recorded_at,
    git_commit: gitCommit(ctx.root),
    inputs,
    outputs,
    stats: {
      accepted: r.totals.by_category.accepted ?? 0,
      rejected: r.totals.by_category.rejected ?? 0,
      // Score's "quarantined" surfaces the partial+human review pool —
      // they're not exported, but they're also not REJECTED. Keeping
      // them in `quarantined` so summary's contamination math stays
      // honest: anything not "accepted" or "rejected" is non-shipping.
      quarantined:
        (r.totals.by_category.partially_accepted ?? 0) +
        (r.totals.by_category.needs_human_review ?? 0),
      skipped: r.totals.rows_skipped,
    },
    validation: {
      passed: r.receipt.validation_pass,
      errors: r.receipt.errors,
      warnings: r.receipt.warnings,
    },
    duration_ms: Date.now() - t0,
  };
}

/** Runs the RAG export (`exportRag`) and wraps its result in a StageReceipt. */
async function runExportRag(ctx: StageRunCtx, opts: { include_review?: boolean }): Promise<StageReceipt> {
  const t0 = Date.now();
  const r = await exportRag({
    root: ctx.root,
    recorded_at: ctx.recorded_at,
    include_review: opts.include_review,
  });
  // Collect input files from the scored-runs tree explicitly so
  // hash + record count match the stage's actual inputs.
  const scored_files = collectScoredRunFiles(ctx.root);
  const inputs = await buildIO(ctx.root, scored_files);
  const outputs = await buildIO(ctx.root, [
    "exports/rag/playbooks.jsonl",
    "exports/quarantine/rag.jsonl",
  ]);
  return {
    schema_version: STAGE_RECEIPT_SCHEMA_VERSION,
    run_id: ctx.run_id,
    stage: "export-rag",
    timestamp: ctx.recorded_at,
    git_commit: gitCommit(ctx.root),
    inputs,
    outputs,
    stats: {
      accepted: r.records_exported,
      rejected: 0,
      quarantined: r.records_quarantined,
      skipped: 0,
    },
    validation: {
      passed: true, // RAG has no hard fail — quarantine is expected
      errors: [],
      warnings: r.records_quarantined > 0 ? [r.quarantine_summary] : [],
    },
    duration_ms: Date.now() - t0,
  };
}

/**
 * Runs the SFT export (`exportSft`), re-reads its output to verify the
 * contamination firewall, and wraps the result in a StageReceipt.
 * Validation fails when any exported row carries a disallowed quality_score.
 */
async function runExportSft(ctx: StageRunCtx, opts: { include_partial?: boolean }): Promise<StageReceipt> {
  const t0 = Date.now();
  const r = await exportSft({
    root: ctx.root,
    recorded_at: ctx.recorded_at,
    include_partial: opts.include_partial,
  });
  const scored_files = collectScoredRunFiles(ctx.root);
  const inputs = await buildIO(ctx.root, scored_files);
  const sft_rel = "exports/sft/instruction_response.jsonl";
  const outputs = await buildIO(ctx.root, [sft_rel, "exports/quarantine/sft.jsonl"]);

  // Verify the contamination firewall held — re-read the SFT output and
  // confirm every quality_score value is in the allowed set. If ANY
  // forbidden value slipped through, validation FAILS LOUDLY.
  const errors: string[] = [];
  const malformed = scanJsonl(resolve(ctx.root, sft_rel), row => {
    const q = row.quality_score;
    if (q !== "accepted" && q !== "partially_accepted") {
      errors.push(`SFT FIREWALL BREACH: quality_score=${q} found in output (id=${row.id})`);
    }
    if (q === "partially_accepted" && !opts.include_partial) {
      errors.push(`SFT FIREWALL BREACH: partial leaked without --include-partial (id=${row.id})`);
    }
  });

  const warnings = r.records_quarantined > 0 ? [r.quarantine_summary] : [];
  if (malformed > 0) {
    // Rows that cannot be parsed cannot be firewall-checked; report them
    // as a warning so they are visible without failing the stage.
    warnings.push(`${malformed} malformed JSON line(s) in ${sft_rel} could not be firewall-checked`);
  }

  return {
    schema_version: STAGE_RECEIPT_SCHEMA_VERSION,
    run_id: ctx.run_id,
    stage: "export-sft",
    timestamp: ctx.recorded_at,
    git_commit: gitCommit(ctx.root),
    inputs,
    outputs,
    stats: {
      accepted: r.records_exported,
      rejected: 0,
      quarantined: r.records_quarantined,
      skipped: 0,
    },
    validation: {
      passed: errors.length === 0,
      errors,
      warnings,
    },
    duration_ms: Date.now() - t0,
  };
}

/**
 * Runs the preference export (`exportPreference`), re-reads its output to
 * verify no pair is a self-pair or has identical chosen/rejected text, and
 * wraps the result in a StageReceipt.
 */
async function runExportPreference(ctx: StageRunCtx): Promise<StageReceipt> {
  const t0 = Date.now();
  const r = await exportPreference({ root: ctx.root, recorded_at: ctx.recorded_at });
  const scored_files = collectScoredRunFiles(ctx.root);
  const inputs = await buildIO(ctx.root, scored_files);
  const pref_rel = "exports/preference/chosen_rejected.jsonl";
  const outputs = await buildIO(ctx.root, [pref_rel, "exports/quarantine/preference.jsonl"]);

  // Self-pair guard — re-verify on disk, fail loudly if found.
  const errors: string[] = [];
  const malformed = scanJsonl(resolve(ctx.root, pref_rel), row => {
    if (row.chosen_run_id === row.rejected_run_id) {
      errors.push(`PREFERENCE FIREWALL BREACH: self-pair found (id=${row.id})`);
    }
    if (row.chosen === row.rejected) {
      errors.push(`PREFERENCE FIREWALL BREACH: identical chosen/rejected text (id=${row.id})`);
    }
  });
  const warnings: string[] =
    malformed > 0
      ? [`${malformed} malformed JSON line(s) in ${pref_rel} could not be firewall-checked`]
      : [];

  return {
    schema_version: STAGE_RECEIPT_SCHEMA_VERSION,
    run_id: ctx.run_id,
    stage: "export-preference",
    timestamp: ctx.recorded_at,
    git_commit: gitCommit(ctx.root),
    inputs,
    outputs,
    stats: {
      accepted: r.pairs_exported,
      rejected: 0,
      quarantined: r.records_quarantined,
      skipped: 0,
    },
    validation: {
      passed: errors.length === 0,
      errors,
      warnings,
    },
    duration_ms: Date.now() - t0,
  };
}

/** Absolute paths of the sub-directories of `parent`, ordered by sorted entry name. */
function sortedSubdirs(parent: string): string[] {
  return readdirSync(parent)
    .sort()
    .map(name => resolve(parent, name))
    .filter(p => statSync(p).isDirectory());
}

/**
 * Lists every `*.jsonl` file three directory levels below
 * `data/scored-runs` (iterated as yyyy/mm/dd), as root-relative paths.
 * Every level — including the file names — is sorted so the result does
 * not depend on the filesystem's directory-entry order.
 *
 * @returns an empty array when `data/scored-runs` does not exist.
 */
function collectScoredRunFiles(root: string): string[] {
  const out: string[] = [];
  const dir = resolve(root, "data/scored-runs");
  if (!existsSync(dir)) return out;
  for (const yp of sortedSubdirs(dir)) {
    for (const mp of sortedSubdirs(yp)) {
      for (const dp of sortedSubdirs(mp)) {
        for (const f of readdirSync(dp).sort()) {
          if (f.endsWith(".jsonl")) out.push(stripRoot(root, resolve(dp, f)));
        }
      }
    }
  }
  return out;
}

// ─── Aggregate stages → RunSummary ────────────────────────────────

/**
 * Folds the per-stage receipts into a RunSummary: per-stage rows, totals,
 * dataset sizes, overall pass flag and a run hash.
 *
 * `root` is accepted for signature stability and is not read here.
 */
async function buildSummary(root: string, run_id: string, stages: StageReceipt[]): Promise<RunSummary> {
  const stageSummaries: RunStageSummary[] = stages.map(s => ({
    stage: s.stage,
    records_in: s.inputs.record_count,
    records_out: s.outputs.record_count,
    accepted: s.stats.accepted,
    rejected: s.stats.rejected,
    quarantined: s.stats.quarantined,
    skipped: s.stats.skipped,
    passed: s.validation.passed,
    duration_ms: s.duration_ms,
    output_hash: s.outputs.hash,
  }));

  const sum = (pick: (s: RunStageSummary) => number): number =>
    stageSummaries.reduce((acc, s) => acc + pick(s), 0);

  const total_records_in = sum(s => s.records_in);
  const total_records_out = sum(s => s.records_out);
  const total_accepted = sum(s => s.accepted);
  const total_rejected = sum(s => s.rejected);
  const total_quarantined = sum(s => s.quarantined);
  const total_skipped = sum(s => s.skipped);
  const total_duration_ms = sum(s => s.duration_ms);

  const ragStage = stages.find(s => s.stage === "export-rag");
  const sftStage = stages.find(s => s.stage === "export-sft");
  const prefStage = stages.find(s => s.stage === "export-preference");

  // run_hash = sha256 over each stage's output hash (sorted by stage name)
  const sortedHashes = stageSummaries.map(s => `${s.stage}|${s.output_hash}`).sort();
  const run_hash = sha256Of(sortedHashes.join("\n"));

  const overall_passed = stages.every(s => s.validation.passed);

  const first = stages[0];
  const last = stages[stages.length - 1];
  const started_at = first ? first.timestamp : new Date().toISOString();
  // The stage wrappers in this file all stamp the same ctx.recorded_at, so
  // "last timestamp + last duration" alone would cover only the final
  // stage. Take the later of that and "start + sum of all durations".
  const ended_at = last
    ? new Date(
        Math.max(
          new Date(started_at).getTime() + total_duration_ms,
          new Date(last.timestamp).getTime() + last.duration_ms,
        ),
      ).toISOString()
    : started_at;
  const git_commit = first ? first.git_commit : ZERO_COMMIT;

  return {
    schema_version: RUN_SUMMARY_SCHEMA_VERSION,
    run_id,
    started_at,
    ended_at,
    git_commit,
    stages: stageSummaries,
    total_records_in,
    total_records_out,
    total_accepted,
    total_rejected,
    total_quarantined,
    total_skipped,
    rag_records: ragStage?.stats.accepted ?? 0,
    sft_records: sftStage?.stats.accepted ?? 0,
    preference_pairs: prefStage?.stats.accepted ?? 0,
    overall_passed,
    run_hash,
    total_duration_ms,
  };
}

// ─── Drift detection ──────────────────────────────────────────────

/**
 * Finds the most recently modified `summary.json` under
 * reports/distillation, ignoring the directory named `current_run_id`.
 *
 * @returns the parsed summary, or null when none is usable. Files that fail
 *          to parse, or that parse to something without a `stages` array,
 *          are skipped.
 */
function findPriorRun(root: string, current_run_id: string): RunSummary | null {
  const root_dir = resolve(root, "reports/distillation");
  if (!existsSync(root_dir)) return null;

  let best: { mtime: number; summary: RunSummary } | null = null;
  for (const entry of readdirSync(root_dir)) {
    if (entry === current_run_id) continue;
    const sumPath = resolve(root_dir, entry, "summary.json");
    if (!existsSync(sumPath)) continue;
    try {
      const summary = JSON.parse(readFileSync(sumPath, "utf8")) as RunSummary;
      // buildDrift iterates prior.stages; a summary without that array is
      // as unusable as one that does not parse.
      if (!summary || !Array.isArray(summary.stages)) continue;
      const mtime = statSync(sumPath).mtimeMs;
      if (best === null || mtime > best.mtime) best = { mtime, summary };
    } catch {
      /* skip malformed */
    }
  }
  return best ? best.summary : null;
}

/** Fractional change from `prior` to `current`; null when `prior` is 0. */
function pctChange(prior: number, current: number): number | null {
  if (prior === 0) return null;
  return (current - prior) / prior;
}

/**
 * Compares `current` against `prior` stage by stage.
 *
 * Severity becomes "warn" when a stage exists in only one of the two runs,
 * when records_out moves by more than DRIFT_THRESHOLD_PCT, or when
 * quarantined moves by more than DRIFT_THRESHOLD_PCT with more than 5
 * quarantined records across both runs. A differing run_hash only adds a
 * flag.
 *
 * @param prior the baseline summary, or null for a first run (returns an
 *              "ok" report with no stages and a baseline flag).
 */
export function buildDrift(current: RunSummary, prior: RunSummary | null): DriftReport {
  const generated_at = new Date().toISOString();
  if (!prior) {
    return {
      schema_version: DRIFT_REPORT_SCHEMA_VERSION,
      run_id: current.run_id,
      prior_run_id: null,
      generated_at,
      severity: "ok",
      stages: [],
      flags: ["no prior run on disk — first run, drift baseline established"],
    };
  }

  const stagesByName = new Map(prior.stages.map(s => [s.stage, s] as const));
  const stageDrifts: StageDrift[] = [];
  const flags: string[] = [];
  let severity: DriftSeverity = "ok";

  for (const cur of current.stages) {
    const pri = stagesByName.get(cur.stage);
    if (!pri) {
      flags.push(`new stage not in prior run: ${cur.stage}`);
      stageDrifts.push({
        stage: cur.stage,
        delta_records_in: cur.records_in,
        delta_records_out: cur.records_out,
        delta_accepted: cur.accepted,
        delta_quarantined: cur.quarantined,
        pct_change_out: null,
        input_hash_match: null, // no prior stage to compare
        output_hash_match: false,
        deterministic_violation: false,
        notes: ["stage not present in prior run"],
      });
      if (severity === "ok") severity = "warn";
      continue;
    }

    const pct = pctChange(pri.records_out, cur.records_out);
    const out_match = pri.output_hash === cur.output_hash;
    // input_hash is NOT materialized into stage summaries (lives on the
    // per-stage StageReceipt files on disk). We don't load them here, so
    // we honestly report null. Schema v2 makes this explicit; v1 returned
    // `true` unconditionally which made deterministic_violation always
    // false even when it should have alerted. Cross-run determinism
    // enforcement is its own pass — see ./scripts/distill audit-full.
    const notes: string[] = [];

    if (pct !== null && Math.abs(pct) > DRIFT_THRESHOLD_PCT) {
      const dir = pct > 0 ? "spike" : "drop";
      notes.push(`${cur.stage} record_count ${dir} ${(pct * 100).toFixed(0)}%`);
      flags.push(`${cur.stage}: ${dir} ${(pct * 100).toFixed(0)}% in records_out`);
      if (severity === "ok") severity = "warn";
    }

    const qPct = pctChange(pri.quarantined, cur.quarantined);
    if (
      qPct !== null &&
      Math.abs(qPct) > DRIFT_THRESHOLD_PCT &&
      pri.quarantined + cur.quarantined > 5
    ) {
      const dir = qPct > 0 ? "spike" : "drop";
      notes.push(`${cur.stage} quarantined ${dir} ${(qPct * 100).toFixed(0)}%`);
      flags.push(`${cur.stage}: quarantine ${dir} ${(qPct * 100).toFixed(0)}%`);
      if (severity === "ok") severity = "warn";
    }

    if (!out_match) {
      notes.push("output_hash differs from prior run");
    }

    stageDrifts.push({
      stage: cur.stage,
      delta_records_in: cur.records_in - pri.records_in,
      delta_records_out: cur.records_out - pri.records_out,
      delta_accepted: cur.accepted - pri.accepted,
      delta_quarantined: cur.quarantined - pri.quarantined,
      pct_change_out: pct,
      input_hash_match: null, // not computed at this layer; see comment above
      output_hash_match: out_match,
      deterministic_violation: false, // requires input_hash match — null means "unknown", not "verified"
      notes,
    });
  }

  // A stage that ran previously but is absent now is drift too; the loop
  // above only walks the current run's stages and would never see it.
  const currentNames = new Set(current.stages.map(s => s.stage));
  for (const pri of prior.stages) {
    if (currentNames.has(pri.stage)) continue;
    flags.push(`stage present in prior run but missing from current run: ${pri.stage}`);
    if (severity === "ok") severity = "warn";
  }

  if (current.run_hash !== prior.run_hash) {
    flags.push("run_hash differs from prior run (any stage output changed)");
  }

  return {
    schema_version: DRIFT_REPORT_SCHEMA_VERSION,
    run_id: current.run_id,
    prior_run_id: prior.run_id,
    generated_at,
    severity,
    stages: stageDrifts,
    flags,
  };
}

/** Renders the human-readable summary.md for a run (no trailing newline). */
function renderSummaryMarkdown(summary: RunSummary, drift: DriftReport): string {
  const md: string[] = [];
  md.push(`# Distillation Run ${summary.run_id}`);
  md.push("");
  md.push(`**Started:** ${summary.started_at}`);
  md.push(`**Duration:** ${(summary.total_duration_ms / 1000).toFixed(1)}s`);
  md.push(`**Git commit:** ${summary.git_commit}`);
  md.push(`**Overall passed:** ${summary.overall_passed ? "✓" : "✗"}`);
  md.push(`**Run hash:** \`${summary.run_hash.slice(0, 16)}...\``);
  md.push("");

  md.push("## Aggregates");
  md.push("");
  md.push(`- Total records in: ${summary.total_records_in}`);
  md.push(`- Total records out: ${summary.total_records_out}`);
  md.push(`- Accepted: ${summary.total_accepted}`);
  md.push(`- Rejected: ${summary.total_rejected}`);
  md.push(`- Quarantined: ${summary.total_quarantined}`);
  md.push(`- Skipped: ${summary.total_skipped}`);
  md.push("");

  md.push("## Dataset sizes");
  md.push("");
  md.push(`- RAG: ${summary.rag_records}`);
  md.push(`- SFT: ${summary.sft_records}`);
  md.push(`- Preference: ${summary.preference_pairs}`);
  md.push("");

  md.push("## Per-stage breakdown");
  md.push("");
  md.push("| Stage | In | Out | Acc | Rej | Quar | Skip | Pass | Hash |");
  md.push("|---|---|---|---|---|---|---|---|---|");
  for (const s of summary.stages) {
    md.push(
      `| ${s.stage} | ${s.records_in} | ${s.records_out} | ${s.accepted} | ${s.rejected} | ${s.quarantined} | ${s.skipped} | ${s.passed ? "✓" : "✗"} | \`${s.output_hash.slice(0, 12)}\` |`,
    );
  }
  md.push("");

  md.push("## Drift vs prior run");
  md.push("");
  md.push(`**Severity:** ${drift.severity}`);
  if (drift.prior_run_id) md.push(`**Prior run:** ${drift.prior_run_id}`);
  if (drift.flags.length > 0) {
    md.push("");
    md.push("Flags:");
    for (const f of drift.flags) md.push(`- ${f}`);
  } else {
    md.push("No drift flags raised.");
  }
  md.push("");

  md.push("## Anomalies & next action");
  md.push("");
  if (!summary.overall_passed) {
    md.push("**One or more stages failed validation.** Inspect per-stage receipts in this run dir.");
  } else if (drift.severity !== "ok") {
    md.push(`**Drift severity ${drift.severity}** — investigate flags above before assuming pipeline is stable.`);
  } else {
    md.push("Pipeline ran clean. No drift, no failures. Safe to continue.");
  }
  return md.join("\n");
}

export interface RunAllOptions {
  root: string;
  recorded_at?: string;
  run_id?: string;
  include_partial?: boolean;
  include_review?: boolean;
}

export interface RunAllResult {
  run_id: string;
  summary: RunSummary;
  drift: DriftReport;
  receipts: StageReceipt[];
}

/**
 * Runs collect → score → export-rag → export-sft → export-preference,
 * writing one receipt per stage, then writes summary.json, summary.md and
 * drift.json into reports/distillation/<run_id>/.
 *
 * @param opts `run_id` defaults to a random UUID and `recorded_at` to the
 *             current time.
 * @throws Error when any StageReceipt, the RunSummary, or the DriftReport
 *         fails its schema validator. The check runs BEFORE the aggregate
 *         files are written, so a run that fails it leaves no summary.json
 *         for a later run to pick up as its drift baseline. Per-stage
 *         receipts are already on disk at that point.
 */
export async function runAllWithReceipts(opts: RunAllOptions): Promise<RunAllResult> {
  const run_id = opts.run_id ?? randomUUID();
  const recorded_at = opts.recorded_at ?? new Date().toISOString();
  const ctx: StageRunCtx = { root: opts.root, run_id, recorded_at };
  const stages: StageReceipt[] = [];

  // Stage 1: collect
  const r1 = await runCollect(ctx);
  writeReceipt(opts.root, run_id, r1);
  stages.push(r1);

  // Stage 2: score
  const r2 = await runScore(ctx);
  writeReceipt(opts.root, run_id, r2);
  stages.push(r2);

  // Stages 3-5: exports (parallel-safe but kept serial for clean tracing)
  const r3 = await runExportRag(ctx, { include_review: opts.include_review });
  writeReceipt(opts.root, run_id, r3);
  stages.push(r3);

  const r4 = await runExportSft(ctx, { include_partial: opts.include_partial });
  writeReceipt(opts.root, run_id, r4);
  stages.push(r4);

  const r5 = await runExportPreference(ctx);
  writeReceipt(opts.root, run_id, r5);
  stages.push(r5);

  // Aggregate + drift
  const summary = await buildSummary(opts.root, run_id, stages);
  const prior = findPriorRun(opts.root, run_id);
  const drift = buildDrift(summary, prior);

  // Self-validate receipts and aggregate artifacts before write — fail loud
  // if shape drifts.
  for (const r of stages) {
    const v = validateStageReceipt(r);
    if (!v.valid) {
      throw new Error(`StageReceipt for ${r.stage} failed self-validation: ${v.errors.join("; ")}`);
    }
  }
  const sumV = validateRunSummary(summary);
  if (!sumV.valid) throw new Error(`RunSummary self-validation failed: ${sumV.errors.join("; ")}`);
  const drV = validateDriftReport(drift);
  if (!drV.valid) throw new Error(`DriftReport self-validation failed: ${drV.errors.join("; ")}`);

  const dir = resolve(opts.root, "reports/distillation", run_id);
  mkdirSync(dir, { recursive: true });
  writeFileSync(resolve(dir, "summary.json"), JSON.stringify(summary, null, 2) + "\n");
  writeFileSync(resolve(dir, "summary.md"), renderSummaryMarkdown(summary, drift));
  writeFileSync(resolve(dir, "drift.json"), JSON.stringify(drift, null, 2) + "\n");

  return { run_id, summary, drift, receipts: stages };
}

// ─── CLI ──────────────────────────────────────────────────────────

/**
 * CLI entry point.
 *   run-all          — runs the pipeline; exits 1 when the run did not pass.
 *   read --run-id X  — prints that run's summary.md.
 * Any other command, a missing run id, or an unknown run exits 2.
 */
async function cli() {
  const cmd = process.argv[2];
  const include_partial = process.argv.includes("--include-partial");
  const include_review = process.argv.includes("--include-review");

  switch (cmd) {
    case "run-all": {
      const r = await runAllWithReceipts({
        root: DEFAULT_ROOT,
        include_partial,
        include_review,
      });
      console.log(`[receipts] run_id=${r.run_id}`);
      console.log(`[receipts] overall_passed=${r.summary.overall_passed}`);
      console.log(`[receipts] datasets: rag=${r.summary.rag_records} sft=${r.summary.sft_records} pref=${r.summary.preference_pairs}`);
      console.log(`[receipts] drift severity=${r.drift.severity} (vs ${r.drift.prior_run_id ?? "no prior"})`);
      console.log(`[receipts] reports/distillation/${r.run_id}/summary.md`);
      if (!r.summary.overall_passed) process.exit(1);
      break;
    }
    case "read": {
      const idx = process.argv.indexOf("--run-id");
      if (idx < 0 || !process.argv[idx + 1]) {
        console.error("usage: bun run scripts/distillation/receipts.ts read --run-id <id>");
        process.exit(2);
      }
      const run_id = process.argv[idx + 1];
      const dir = resolve(DEFAULT_ROOT, "reports/distillation", run_id);
      if (!existsSync(dir)) {
        console.error(`run not found: ${dir}`);
        process.exit(2);
      }
      const summaryPath = resolve(dir, "summary.md");
      if (existsSync(summaryPath)) console.log(readFileSync(summaryPath, "utf8"));
      else console.error(`no summary.md in ${dir}`);
      break;
    }
    default:
      console.error("usage: receipts.ts {run-all|read --run-id <id>}");
      process.exit(2);
  }
}

if (import.meta.main) cli().catch(e => { console.error(e); process.exit(1); });