Kimi For Coding (api.kimi.com, kimi-for-coding) ran a forensic audit on
distillation v1.0.0 with full file content. 7/7 flags verified real on
grep. Substrate now matches what v1.0.0 claimed: deterministic, no
schema bypasses, Rust tests compile.
Fixes:
- mode.rs:1035,1042 matrix_corpus Some/None -> vec![..]/vec![]; cargo
check --tests now compiles (was silently broken;
only bun tests were running)
- scorer.ts:30 SCORER_VERSION env override removed - identical
input now produces identical version stamp, not
env-dependent drift
- transforms.ts:181 auto_apply wall-clock fallback (new Date()) ->
deterministic recorded_at fallback
- replay.ts:378 recorded_run_id Date.now() -> sha256(recorded_at);
replay rows now reproducible given recorded_at
- receipts.ts:454,495 input_hash_match hardcoded true was misleading
telemetry; bumped DRIFT_REPORT_SCHEMA_VERSION 1->2,
field is now boolean|null with honest null when
not computed at this layer
- score_runs.ts:89-100,159 dedup keyed only on sig_hash made
scorer-version bumps invisible. Composite
sig_hash:scorer_version forces re-scoring
- export_sft.ts:126 (ev as any).contractor bypass emitted "<contractor>"
placeholder for every contract_analyses SFT row.
Added typed EvidenceRecord.metadata bucket;
transforms.ts populates metadata.contractor;
exporter reads typed value
Verification (all green):
cargo check -p gateway --tests compiles
bun test tests/distillation/ 145 pass / 0 fail
bun acceptance 22/22 invariants
bun audit-full 16/16 required checks
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
691 lines
25 KiB
TypeScript
691 lines
25 KiB
TypeScript
// receipts.ts — Phase 5 forensic harness wrapping every pipeline
|
|
// stage in a StageReceipt. Pure observability layer — does NOT change
|
|
// scoring, filtering, or schemas.
|
|
//
|
|
// USAGE
|
|
// bun run scripts/distillation/receipts.ts run-all
|
|
// bun run scripts/distillation/receipts.ts read --run-id <id>
|
|
//
|
|
// Output tree:
|
|
// reports/distillation/<run_id>/
|
|
// collect.json
|
|
// score.json
|
|
// export-rag.json
|
|
// export-sft.json
|
|
// export-preference.json
|
|
// summary.json
|
|
// summary.md
|
|
// drift.json (when prior run exists)
|
|
|
|
import { existsSync, readFileSync, mkdirSync, writeFileSync, readdirSync, statSync } from "node:fs";
|
|
import { resolve, dirname } from "node:path";
|
|
import { spawnSync } from "node:child_process";
|
|
import { randomUUID } from "node:crypto";
|
|
|
|
import { materializeAll } from "./build_evidence_index";
|
|
import { scoreAll } from "./score_runs";
|
|
import { exportRag } from "./export_rag";
|
|
import { exportSft } from "./export_sft";
|
|
import { exportPreference } from "./export_preference";
|
|
import { TRANSFORMS } from "./transforms";
|
|
|
|
import {
|
|
STAGE_RECEIPT_SCHEMA_VERSION, validateStageReceipt, aggregateIoHash,
|
|
type StageReceipt, type StageName, type StageFileRef, type StageIO, type StageStats,
|
|
} from "../../auditor/schemas/distillation/stage_receipt";
|
|
import {
|
|
RUN_SUMMARY_SCHEMA_VERSION, validateRunSummary,
|
|
type RunSummary, type RunStageSummary,
|
|
} from "../../auditor/schemas/distillation/run_summary";
|
|
import {
|
|
DRIFT_REPORT_SCHEMA_VERSION, DRIFT_THRESHOLD_PCT, validateDriftReport,
|
|
type DriftReport, type StageDrift, type DriftSeverity,
|
|
} from "../../auditor/schemas/distillation/drift_report";
|
|
|
|
const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";
|
|
|
|
function gitCommit(root: string): string {
|
|
const r = spawnSync("git", ["-C", root, "rev-parse", "HEAD"], { encoding: "utf8" });
|
|
return r.status === 0 ? r.stdout.trim() : "0".repeat(40);
|
|
}
|
|
|
|
function sha256Of(buf: Buffer | string): string {
|
|
const h = new Bun.CryptoHasher("sha256");
|
|
h.update(buf);
|
|
return h.digest("hex");
|
|
}
|
|
|
|
function fileRef(root: string, abs_path: string): StageFileRef | null {
|
|
if (!existsSync(abs_path)) return null;
|
|
const buf = readFileSync(abs_path);
|
|
const lines = (buf.toString("utf8").match(/\n/g) ?? []).length;
|
|
return {
|
|
path: abs_path.replace(root + "/", ""),
|
|
sha256: sha256Of(buf),
|
|
bytes: statSync(abs_path).size,
|
|
record_count: lines,
|
|
};
|
|
}
|
|
|
|
function relPathToAbs(root: string, rel_or_abs: string): string {
|
|
return rel_or_abs.startsWith("/") ? rel_or_abs : resolve(root, rel_or_abs);
|
|
}
|
|
|
|
async function buildIO(root: string, paths: string[]): Promise<StageIO> {
|
|
const refs: StageFileRef[] = [];
|
|
let total_records = 0;
|
|
for (const p of paths) {
|
|
const abs = relPathToAbs(root, p);
|
|
const ref = fileRef(root, abs);
|
|
if (!ref) continue;
|
|
refs.push(ref);
|
|
total_records += ref.record_count ?? 0;
|
|
}
|
|
return {
|
|
files: refs,
|
|
record_count: total_records,
|
|
hash: await aggregateIoHash(refs),
|
|
};
|
|
}
|
|
|
|
interface StageRunCtx {
|
|
root: string;
|
|
run_id: string;
|
|
recorded_at: string;
|
|
}
|
|
|
|
function writeReceipt(root: string, run_id: string, receipt: StageReceipt) {
|
|
const dir = resolve(root, "reports/distillation", run_id);
|
|
mkdirSync(dir, { recursive: true });
|
|
writeFileSync(resolve(dir, `${receipt.stage}.json`), JSON.stringify(receipt, null, 2) + "\n");
|
|
}
|
|
|
|
// ─── Stage wrappers — call existing stage functions, build StageReceipt ──
|
|
|
|
async function runCollect(ctx: StageRunCtx): Promise<StageReceipt> {
|
|
const t0 = Date.now();
|
|
const r = await materializeAll({
|
|
root: ctx.root, transforms: TRANSFORMS, recorded_at: ctx.recorded_at,
|
|
});
|
|
|
|
const inputs = await buildIO(ctx.root, r.receipt.input_files.map(f => f.path));
|
|
const outputs = await buildIO(ctx.root, r.receipt.output_files.map(f => f.path));
|
|
|
|
return {
|
|
schema_version: STAGE_RECEIPT_SCHEMA_VERSION,
|
|
run_id: ctx.run_id,
|
|
stage: "collect",
|
|
timestamp: ctx.recorded_at,
|
|
git_commit: gitCommit(ctx.root),
|
|
inputs, outputs,
|
|
stats: {
|
|
accepted: r.totals.rows_written,
|
|
rejected: 0,
|
|
quarantined: 0, // collect doesn't quarantine — it skips with reasons
|
|
skipped: r.totals.rows_skipped,
|
|
},
|
|
validation: {
|
|
passed: r.totals.rows_skipped === 0,
|
|
errors: r.receipt.errors,
|
|
warnings: r.receipt.warnings,
|
|
},
|
|
duration_ms: Date.now() - t0,
|
|
};
|
|
}
|
|
|
|
async function runScore(ctx: StageRunCtx): Promise<StageReceipt> {
|
|
const t0 = Date.now();
|
|
const r = await scoreAll({ root: ctx.root, recorded_at: ctx.recorded_at });
|
|
|
|
const inputs = await buildIO(ctx.root, r.receipt.input_files.map(f => f.path));
|
|
const outputs = await buildIO(ctx.root, r.receipt.output_files.map(f => f.path));
|
|
|
|
return {
|
|
schema_version: STAGE_RECEIPT_SCHEMA_VERSION,
|
|
run_id: ctx.run_id,
|
|
stage: "score",
|
|
timestamp: ctx.recorded_at,
|
|
git_commit: gitCommit(ctx.root),
|
|
inputs, outputs,
|
|
stats: {
|
|
accepted: r.totals.by_category.accepted ?? 0,
|
|
rejected: r.totals.by_category.rejected ?? 0,
|
|
// Score's "quarantined" surfaces the partial+human review pool —
|
|
// they're not exported, but they're also not REJECTED. Keeping
|
|
// them in `quarantined` so summary's contamination math stays
|
|
// honest: anything not "accepted" or "rejected" is non-shipping.
|
|
quarantined: (r.totals.by_category.partially_accepted ?? 0)
|
|
+ (r.totals.by_category.needs_human_review ?? 0),
|
|
skipped: r.totals.rows_skipped,
|
|
},
|
|
validation: {
|
|
passed: r.receipt.validation_pass,
|
|
errors: r.receipt.errors,
|
|
warnings: r.receipt.warnings,
|
|
},
|
|
duration_ms: Date.now() - t0,
|
|
};
|
|
}
|
|
|
|
async function runExportRag(ctx: StageRunCtx, opts: { include_review?: boolean }): Promise<StageReceipt> {
|
|
const t0 = Date.now();
|
|
const r = await exportRag({
|
|
root: ctx.root, recorded_at: ctx.recorded_at, include_review: opts.include_review,
|
|
});
|
|
|
|
// Collect input files from the scored-runs tree explicitly so
|
|
// hash + record count match the stage's actual inputs.
|
|
const scored_files = collectScoredRunFiles(ctx.root);
|
|
const inputs = await buildIO(ctx.root, scored_files);
|
|
const outputs = await buildIO(ctx.root, [
|
|
"exports/rag/playbooks.jsonl",
|
|
"exports/quarantine/rag.jsonl",
|
|
]);
|
|
|
|
return {
|
|
schema_version: STAGE_RECEIPT_SCHEMA_VERSION,
|
|
run_id: ctx.run_id,
|
|
stage: "export-rag",
|
|
timestamp: ctx.recorded_at,
|
|
git_commit: gitCommit(ctx.root),
|
|
inputs, outputs,
|
|
stats: {
|
|
accepted: r.records_exported,
|
|
rejected: 0,
|
|
quarantined: r.records_quarantined,
|
|
skipped: 0,
|
|
},
|
|
validation: {
|
|
passed: true, // RAG has no hard fail — quarantine is expected
|
|
errors: [],
|
|
warnings: r.records_quarantined > 0 ? [r.quarantine_summary] : [],
|
|
},
|
|
duration_ms: Date.now() - t0,
|
|
};
|
|
}
|
|
|
|
async function runExportSft(ctx: StageRunCtx, opts: { include_partial?: boolean }): Promise<StageReceipt> {
|
|
const t0 = Date.now();
|
|
const r = await exportSft({
|
|
root: ctx.root, recorded_at: ctx.recorded_at, include_partial: opts.include_partial,
|
|
});
|
|
|
|
const scored_files = collectScoredRunFiles(ctx.root);
|
|
const inputs = await buildIO(ctx.root, scored_files);
|
|
const outputs = await buildIO(ctx.root, [
|
|
"exports/sft/instruction_response.jsonl",
|
|
"exports/quarantine/sft.jsonl",
|
|
]);
|
|
|
|
// Verify the contamination firewall held — re-read the SFT output and
|
|
// confirm every quality_score value is in the allowed set. If ANY
|
|
// forbidden value slipped through, validation FAILS LOUDLY.
|
|
const errors: string[] = [];
|
|
const sft_out = resolve(ctx.root, "exports/sft/instruction_response.jsonl");
|
|
if (existsSync(sft_out)) {
|
|
for (const line of readFileSync(sft_out, "utf8").split("\n")) {
|
|
if (!line) continue;
|
|
try {
|
|
const row = JSON.parse(line);
|
|
const q = row.quality_score;
|
|
if (q !== "accepted" && q !== "partially_accepted") {
|
|
errors.push(`SFT FIREWALL BREACH: quality_score=${q} found in output (id=${row.id})`);
|
|
}
|
|
if (q === "partially_accepted" && !opts.include_partial) {
|
|
errors.push(`SFT FIREWALL BREACH: partial leaked without --include-partial (id=${row.id})`);
|
|
}
|
|
} catch { /* malformed — separate concern */ }
|
|
}
|
|
}
|
|
|
|
return {
|
|
schema_version: STAGE_RECEIPT_SCHEMA_VERSION,
|
|
run_id: ctx.run_id,
|
|
stage: "export-sft",
|
|
timestamp: ctx.recorded_at,
|
|
git_commit: gitCommit(ctx.root),
|
|
inputs, outputs,
|
|
stats: {
|
|
accepted: r.records_exported,
|
|
rejected: 0,
|
|
quarantined: r.records_quarantined,
|
|
skipped: 0,
|
|
},
|
|
validation: {
|
|
passed: errors.length === 0,
|
|
errors,
|
|
warnings: r.records_quarantined > 0 ? [r.quarantine_summary] : [],
|
|
},
|
|
duration_ms: Date.now() - t0,
|
|
};
|
|
}
|
|
|
|
async function runExportPreference(ctx: StageRunCtx): Promise<StageReceipt> {
|
|
const t0 = Date.now();
|
|
const r = await exportPreference({ root: ctx.root, recorded_at: ctx.recorded_at });
|
|
|
|
const scored_files = collectScoredRunFiles(ctx.root);
|
|
const inputs = await buildIO(ctx.root, scored_files);
|
|
const outputs = await buildIO(ctx.root, [
|
|
"exports/preference/chosen_rejected.jsonl",
|
|
"exports/quarantine/preference.jsonl",
|
|
]);
|
|
|
|
// Self-pair guard — re-verify on disk, fail loudly if found.
|
|
const errors: string[] = [];
|
|
const pref_out = resolve(ctx.root, "exports/preference/chosen_rejected.jsonl");
|
|
if (existsSync(pref_out)) {
|
|
for (const line of readFileSync(pref_out, "utf8").split("\n")) {
|
|
if (!line) continue;
|
|
try {
|
|
const row = JSON.parse(line);
|
|
if (row.chosen_run_id === row.rejected_run_id) {
|
|
errors.push(`PREFERENCE FIREWALL BREACH: self-pair found (id=${row.id})`);
|
|
}
|
|
if (row.chosen === row.rejected) {
|
|
errors.push(`PREFERENCE FIREWALL BREACH: identical chosen/rejected text (id=${row.id})`);
|
|
}
|
|
} catch { }
|
|
}
|
|
}
|
|
|
|
return {
|
|
schema_version: STAGE_RECEIPT_SCHEMA_VERSION,
|
|
run_id: ctx.run_id,
|
|
stage: "export-preference",
|
|
timestamp: ctx.recorded_at,
|
|
git_commit: gitCommit(ctx.root),
|
|
inputs, outputs,
|
|
stats: {
|
|
accepted: r.pairs_exported,
|
|
rejected: 0,
|
|
quarantined: r.records_quarantined,
|
|
skipped: 0,
|
|
},
|
|
validation: {
|
|
passed: errors.length === 0,
|
|
errors,
|
|
warnings: [],
|
|
},
|
|
duration_ms: Date.now() - t0,
|
|
};
|
|
}
|
|
|
|
function collectScoredRunFiles(root: string): string[] {
|
|
const out: string[] = [];
|
|
const dir = resolve(root, "data/scored-runs");
|
|
if (!existsSync(dir)) return out;
|
|
for (const yyyy of readdirSync(dir).sort()) {
|
|
const yp = resolve(dir, yyyy);
|
|
if (!statSync(yp).isDirectory()) continue;
|
|
for (const mm of readdirSync(yp).sort()) {
|
|
const mp = resolve(yp, mm);
|
|
if (!statSync(mp).isDirectory()) continue;
|
|
for (const dd of readdirSync(mp).sort()) {
|
|
const dp = resolve(mp, dd);
|
|
if (!statSync(dp).isDirectory()) continue;
|
|
for (const f of readdirSync(dp)) {
|
|
if (f.endsWith(".jsonl")) out.push(resolve(dp, f).replace(root + "/", ""));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return out;
|
|
}
|
|
|
|
// ─── Aggregate stages → RunSummary ────────────────────────────────
|
|
|
|
async function buildSummary(root: string, run_id: string, stages: StageReceipt[]): Promise<RunSummary> {
|
|
const stageSummaries: RunStageSummary[] = stages.map(s => ({
|
|
stage: s.stage,
|
|
records_in: s.inputs.record_count,
|
|
records_out: s.outputs.record_count,
|
|
accepted: s.stats.accepted,
|
|
rejected: s.stats.rejected,
|
|
quarantined: s.stats.quarantined,
|
|
skipped: s.stats.skipped,
|
|
passed: s.validation.passed,
|
|
duration_ms: s.duration_ms,
|
|
output_hash: s.outputs.hash,
|
|
}));
|
|
|
|
const total_records_in = stageSummaries.reduce((a, s) => a + s.records_in, 0);
|
|
const total_records_out = stageSummaries.reduce((a, s) => a + s.records_out, 0);
|
|
const total_accepted = stageSummaries.reduce((a, s) => a + s.accepted, 0);
|
|
const total_rejected = stageSummaries.reduce((a, s) => a + s.rejected, 0);
|
|
const total_quarantined = stageSummaries.reduce((a, s) => a + s.quarantined, 0);
|
|
const total_skipped = stageSummaries.reduce((a, s) => a + s.skipped, 0);
|
|
const total_duration_ms = stageSummaries.reduce((a, s) => a + s.duration_ms, 0);
|
|
|
|
const ragStage = stages.find(s => s.stage === "export-rag");
|
|
const sftStage = stages.find(s => s.stage === "export-sft");
|
|
const prefStage = stages.find(s => s.stage === "export-preference");
|
|
|
|
// run_hash = sha256 over each stage's output hash (sorted by stage name)
|
|
const sortedHashes = stageSummaries
|
|
.map(s => `${s.stage}|${s.output_hash}`)
|
|
.sort();
|
|
const run_hash = sha256Of(sortedHashes.join("\n"));
|
|
|
|
const overall_passed = stages.every(s => s.validation.passed);
|
|
const started_at = stages.length > 0 ? stages[0].timestamp : new Date().toISOString();
|
|
const last = stages[stages.length - 1];
|
|
const ended_at = last ? new Date(new Date(last.timestamp).getTime() + last.duration_ms).toISOString() : started_at;
|
|
const git_commit = stages.length > 0 ? stages[0].git_commit : "0".repeat(40);
|
|
|
|
return {
|
|
schema_version: RUN_SUMMARY_SCHEMA_VERSION,
|
|
run_id,
|
|
started_at, ended_at,
|
|
git_commit,
|
|
stages: stageSummaries,
|
|
total_records_in, total_records_out,
|
|
total_accepted, total_rejected, total_quarantined, total_skipped,
|
|
rag_records: ragStage?.stats.accepted ?? 0,
|
|
sft_records: sftStage?.stats.accepted ?? 0,
|
|
preference_pairs: prefStage?.stats.accepted ?? 0,
|
|
overall_passed,
|
|
run_hash,
|
|
total_duration_ms,
|
|
};
|
|
}
|
|
|
|
// ─── Drift detection ──────────────────────────────────────────────
|
|
|
|
function findPriorRun(root: string, current_run_id: string): RunSummary | null {
|
|
const root_dir = resolve(root, "reports/distillation");
|
|
if (!existsSync(root_dir)) return null;
|
|
const candidates: Array<{ run_id: string; mtime: number; summary: RunSummary }> = [];
|
|
for (const entry of readdirSync(root_dir)) {
|
|
if (entry === current_run_id) continue;
|
|
const sumPath = resolve(root_dir, entry, "summary.json");
|
|
if (!existsSync(sumPath)) continue;
|
|
try {
|
|
const summary = JSON.parse(readFileSync(sumPath, "utf8")) as RunSummary;
|
|
candidates.push({
|
|
run_id: entry,
|
|
mtime: statSync(sumPath).mtimeMs,
|
|
summary,
|
|
});
|
|
} catch { /* skip malformed */ }
|
|
}
|
|
if (candidates.length === 0) return null;
|
|
candidates.sort((a, b) => b.mtime - a.mtime);
|
|
return candidates[0].summary;
|
|
}
|
|
|
|
function pctChange(prior: number, current: number): number | null {
|
|
if (prior === 0) return null;
|
|
return (current - prior) / prior;
|
|
}
|
|
|
|
export function buildDrift(current: RunSummary, prior: RunSummary | null): DriftReport {
|
|
const generated_at = new Date().toISOString();
|
|
if (!prior) {
|
|
return {
|
|
schema_version: DRIFT_REPORT_SCHEMA_VERSION,
|
|
run_id: current.run_id,
|
|
prior_run_id: null,
|
|
generated_at,
|
|
severity: "ok",
|
|
stages: [],
|
|
flags: ["no prior run on disk — first run, drift baseline established"],
|
|
};
|
|
}
|
|
|
|
const stagesByName = new Map<string, RunStageSummary>();
|
|
for (const s of prior.stages) stagesByName.set(s.stage, s);
|
|
|
|
const stageDrifts: StageDrift[] = [];
|
|
const flags: string[] = [];
|
|
let severity: DriftSeverity = "ok";
|
|
|
|
for (const cur of current.stages) {
|
|
const pri = stagesByName.get(cur.stage);
|
|
if (!pri) {
|
|
flags.push(`new stage not in prior run: ${cur.stage}`);
|
|
stageDrifts.push({
|
|
stage: cur.stage,
|
|
delta_records_in: cur.records_in,
|
|
delta_records_out: cur.records_out,
|
|
delta_accepted: cur.accepted,
|
|
delta_quarantined: cur.quarantined,
|
|
pct_change_out: null,
|
|
input_hash_match: null, // no prior stage to compare
|
|
output_hash_match: false,
|
|
deterministic_violation: false,
|
|
notes: ["stage not present in prior run"],
|
|
});
|
|
severity = "warn";
|
|
continue;
|
|
}
|
|
const pct = pctChange(pri.records_out, cur.records_out);
|
|
const out_match = pri.output_hash === cur.output_hash;
|
|
// input_hash is NOT materialized into stage summaries (lives on the
|
|
// per-stage StageReceipt files on disk). We don't load them here, so
|
|
// we honestly report null. Schema v2 makes this explicit; v1 returned
|
|
// `true` unconditionally which made deterministic_violation always
|
|
// false even when it should have alerted. Cross-run determinism
|
|
// enforcement is its own pass — see ./scripts/distill audit-full.
|
|
const notes: string[] = [];
|
|
if (pct !== null && Math.abs(pct) > DRIFT_THRESHOLD_PCT) {
|
|
const dir = pct > 0 ? "spike" : "drop";
|
|
notes.push(`${cur.stage} record_count ${dir} ${(pct * 100).toFixed(0)}%`);
|
|
flags.push(`${cur.stage}: ${dir} ${(pct * 100).toFixed(0)}% in records_out`);
|
|
if (severity === "ok") severity = "warn";
|
|
}
|
|
const qPct = pctChange(pri.quarantined, cur.quarantined);
|
|
if (qPct !== null && Math.abs(qPct) > DRIFT_THRESHOLD_PCT && pri.quarantined + cur.quarantined > 5) {
|
|
const dir = qPct > 0 ? "spike" : "drop";
|
|
notes.push(`${cur.stage} quarantined ${dir} ${(qPct * 100).toFixed(0)}%`);
|
|
flags.push(`${cur.stage}: quarantine ${dir} ${(qPct * 100).toFixed(0)}%`);
|
|
if (severity === "ok") severity = "warn";
|
|
}
|
|
if (!out_match) {
|
|
notes.push("output_hash differs from prior run");
|
|
}
|
|
|
|
stageDrifts.push({
|
|
stage: cur.stage,
|
|
delta_records_in: cur.records_in - pri.records_in,
|
|
delta_records_out: cur.records_out - pri.records_out,
|
|
delta_accepted: cur.accepted - pri.accepted,
|
|
delta_quarantined: cur.quarantined - pri.quarantined,
|
|
pct_change_out: pct,
|
|
input_hash_match: null, // not computed at this layer; see comment above
|
|
output_hash_match: out_match,
|
|
deterministic_violation: false, // requires input_hash match — null means "unknown", not "verified"
|
|
notes,
|
|
});
|
|
}
|
|
|
|
if (current.run_hash !== prior.run_hash) {
|
|
flags.push("run_hash differs from prior run (any stage output changed)");
|
|
}
|
|
|
|
return {
|
|
schema_version: DRIFT_REPORT_SCHEMA_VERSION,
|
|
run_id: current.run_id,
|
|
prior_run_id: prior.run_id,
|
|
generated_at,
|
|
severity,
|
|
stages: stageDrifts,
|
|
flags,
|
|
};
|
|
}
|
|
|
|
function renderSummaryMarkdown(summary: RunSummary, drift: DriftReport): string {
|
|
const md: string[] = [];
|
|
md.push(`# Distillation Run ${summary.run_id}`);
|
|
md.push("");
|
|
md.push(`**Started:** ${summary.started_at}`);
|
|
md.push(`**Duration:** ${(summary.total_duration_ms / 1000).toFixed(1)}s`);
|
|
md.push(`**Git commit:** ${summary.git_commit}`);
|
|
md.push(`**Overall passed:** ${summary.overall_passed ? "✓" : "✗"}`);
|
|
md.push(`**Run hash:** \`${summary.run_hash.slice(0, 16)}...\``);
|
|
md.push("");
|
|
md.push("## Aggregates");
|
|
md.push("");
|
|
md.push(`- Total records in: ${summary.total_records_in}`);
|
|
md.push(`- Total records out: ${summary.total_records_out}`);
|
|
md.push(`- Accepted: ${summary.total_accepted}`);
|
|
md.push(`- Rejected: ${summary.total_rejected}`);
|
|
md.push(`- Quarantined: ${summary.total_quarantined}`);
|
|
md.push(`- Skipped: ${summary.total_skipped}`);
|
|
md.push("");
|
|
md.push("## Dataset sizes");
|
|
md.push("");
|
|
md.push(`- RAG: ${summary.rag_records}`);
|
|
md.push(`- SFT: ${summary.sft_records}`);
|
|
md.push(`- Preference: ${summary.preference_pairs}`);
|
|
md.push("");
|
|
md.push("## Per-stage breakdown");
|
|
md.push("");
|
|
md.push("| Stage | In | Out | Acc | Rej | Quar | Skip | Pass | Hash |");
|
|
md.push("|---|---|---|---|---|---|---|---|---|");
|
|
for (const s of summary.stages) {
|
|
md.push(`| ${s.stage} | ${s.records_in} | ${s.records_out} | ${s.accepted} | ${s.rejected} | ${s.quarantined} | ${s.skipped} | ${s.passed ? "✓" : "✗"} | \`${s.output_hash.slice(0, 12)}\` |`);
|
|
}
|
|
md.push("");
|
|
md.push("## Drift vs prior run");
|
|
md.push("");
|
|
md.push(`**Severity:** ${drift.severity}`);
|
|
if (drift.prior_run_id) md.push(`**Prior run:** ${drift.prior_run_id}`);
|
|
if (drift.flags.length > 0) {
|
|
md.push("");
|
|
md.push("Flags:");
|
|
for (const f of drift.flags) md.push(`- ${f}`);
|
|
} else {
|
|
md.push("No drift flags raised.");
|
|
}
|
|
md.push("");
|
|
md.push("## Anomalies & next action");
|
|
md.push("");
|
|
if (!summary.overall_passed) {
|
|
md.push("**One or more stages failed validation.** Inspect per-stage receipts in this run dir.");
|
|
} else if (drift.severity !== "ok") {
|
|
md.push(`**Drift severity ${drift.severity}** — investigate flags above before assuming pipeline is stable.`);
|
|
} else {
|
|
md.push("Pipeline ran clean. No drift, no failures. Safe to continue.");
|
|
}
|
|
return md.join("\n");
|
|
}
|
|
|
|
export interface RunAllOptions {
|
|
root: string;
|
|
recorded_at?: string;
|
|
run_id?: string;
|
|
include_partial?: boolean;
|
|
include_review?: boolean;
|
|
}
|
|
|
|
export interface RunAllResult {
|
|
run_id: string;
|
|
summary: RunSummary;
|
|
drift: DriftReport;
|
|
receipts: StageReceipt[];
|
|
}
|
|
|
|
export async function runAllWithReceipts(opts: RunAllOptions): Promise<RunAllResult> {
|
|
const run_id = opts.run_id ?? randomUUID();
|
|
const recorded_at = opts.recorded_at ?? new Date().toISOString();
|
|
const ctx: StageRunCtx = { root: opts.root, run_id, recorded_at };
|
|
const stages: StageReceipt[] = [];
|
|
|
|
// Stage 1: collect
|
|
const r1 = await runCollect(ctx);
|
|
writeReceipt(opts.root, run_id, r1);
|
|
stages.push(r1);
|
|
|
|
// Stage 2: score
|
|
const r2 = await runScore(ctx);
|
|
writeReceipt(opts.root, run_id, r2);
|
|
stages.push(r2);
|
|
|
|
// Stages 3-5: exports (parallel-safe but kept serial for clean tracing)
|
|
const r3 = await runExportRag(ctx, { include_review: opts.include_review });
|
|
writeReceipt(opts.root, run_id, r3);
|
|
stages.push(r3);
|
|
|
|
const r4 = await runExportSft(ctx, { include_partial: opts.include_partial });
|
|
writeReceipt(opts.root, run_id, r4);
|
|
stages.push(r4);
|
|
|
|
const r5 = await runExportPreference(ctx);
|
|
writeReceipt(opts.root, run_id, r5);
|
|
stages.push(r5);
|
|
|
|
// Aggregate + drift
|
|
const summary = await buildSummary(opts.root, run_id, stages);
|
|
const prior = findPriorRun(opts.root, run_id);
|
|
const drift = buildDrift(summary, prior);
|
|
|
|
// Self-validate aggregate artifacts before write — fail loud if shape drifts
|
|
const sumV = validateRunSummary(summary);
|
|
const drV = validateDriftReport(drift);
|
|
|
|
const dir = resolve(opts.root, "reports/distillation", run_id);
|
|
mkdirSync(dir, { recursive: true });
|
|
writeFileSync(resolve(dir, "summary.json"), JSON.stringify(summary, null, 2) + "\n");
|
|
writeFileSync(resolve(dir, "summary.md"), renderSummaryMarkdown(summary, drift));
|
|
writeFileSync(resolve(dir, "drift.json"), JSON.stringify(drift, null, 2) + "\n");
|
|
|
|
// Validate every receipt on disk against schema — defense in depth
|
|
for (const r of stages) {
|
|
const v = validateStageReceipt(r);
|
|
if (!v.valid) {
|
|
summary.overall_passed = false;
|
|
throw new Error(`StageReceipt for ${r.stage} failed self-validation: ${v.errors.join("; ")}`);
|
|
}
|
|
}
|
|
if (!sumV.valid) throw new Error(`RunSummary self-validation failed: ${sumV.errors.join("; ")}`);
|
|
if (!drV.valid) throw new Error(`DriftReport self-validation failed: ${drV.errors.join("; ")}`);
|
|
|
|
return { run_id, summary, drift, receipts: stages };
|
|
}
|
|
|
|
// ─── CLI ──────────────────────────────────────────────────────────
|
|
|
|
async function cli() {
|
|
const cmd = process.argv[2];
|
|
const include_partial = process.argv.includes("--include-partial");
|
|
const include_review = process.argv.includes("--include-review");
|
|
|
|
switch (cmd) {
|
|
case "run-all": {
|
|
const r = await runAllWithReceipts({
|
|
root: DEFAULT_ROOT, include_partial, include_review,
|
|
});
|
|
console.log(`[receipts] run_id=${r.run_id}`);
|
|
console.log(`[receipts] overall_passed=${r.summary.overall_passed}`);
|
|
console.log(`[receipts] datasets: rag=${r.summary.rag_records} sft=${r.summary.sft_records} pref=${r.summary.preference_pairs}`);
|
|
console.log(`[receipts] drift severity=${r.drift.severity} (vs ${r.drift.prior_run_id ?? "no prior"})`);
|
|
console.log(`[receipts] reports/distillation/${r.run_id}/summary.md`);
|
|
if (!r.summary.overall_passed) process.exit(1);
|
|
break;
|
|
}
|
|
case "read": {
|
|
const idx = process.argv.indexOf("--run-id");
|
|
if (idx < 0 || !process.argv[idx + 1]) {
|
|
console.error("usage: bun run scripts/distillation/receipts.ts read --run-id <id>");
|
|
process.exit(2);
|
|
}
|
|
const run_id = process.argv[idx + 1];
|
|
const dir = resolve(DEFAULT_ROOT, "reports/distillation", run_id);
|
|
if (!existsSync(dir)) {
|
|
console.error(`run not found: ${dir}`);
|
|
process.exit(2);
|
|
}
|
|
const summaryPath = resolve(dir, "summary.md");
|
|
if (existsSync(summaryPath)) console.log(readFileSync(summaryPath, "utf8"));
|
|
else console.error(`no summary.md in ${dir}`);
|
|
break;
|
|
}
|
|
default:
|
|
console.error("usage: receipts.ts {run-all|read --run-id <id>}");
|
|
process.exit(2);
|
|
}
|
|
}
|
|
|
|
if (import.meta.main) cli().catch(e => { console.error(e); process.exit(1); });
|