root d77622fc6b distillation: fix 7 grounding bugs found by Kimi audit
Kimi For Coding (api.kimi.com, kimi-for-coding) ran a forensic audit on
distillation v1.0.0 with full file content. 7/7 flags verified real on
grep. Substrate now matches what v1.0.0 claimed: deterministic, no
schema bypasses, Rust tests compile.

Fixes:
- mode.rs:1035,1042  matrix_corpus Some/None -> vec![..]/vec![]; cargo
                     check --tests now compiles (was silently broken;
                     only bun tests were running)
- scorer.ts:30       SCORER_VERSION env override removed - identical
                     input now produces identical version stamp, not
                     env-dependent drift
- transforms.ts:181  auto_apply wall-clock fallback (new Date()) ->
                     deterministic recorded_at fallback
- replay.ts:378      recorded_run_id Date.now() -> sha256(recorded_at);
                     replay rows now reproducible given recorded_at
- receipts.ts:454,495  input_hash_match hardcoded true was misleading
                       telemetry; bumped DRIFT_REPORT_SCHEMA_VERSION 1->2,
                       field is now boolean|null with honest null when
                       not computed at this layer
- score_runs.ts:89-100,159  dedup keyed only on sig_hash made
                            scorer-version bumps invisible. Composite
                            sig_hash:scorer_version forces re-scoring
- export_sft.ts:126  (ev as any).contractor bypass emitted "<contractor>"
                     placeholder for every contract_analyses SFT row.
                     Added typed EvidenceRecord.metadata bucket;
                     transforms.ts populates metadata.contractor;
                     exporter reads typed value

Verification (all green):
  cargo check -p gateway --tests   compiles
  bun test tests/distillation/     145 pass / 0 fail
  bun acceptance                   22/22 invariants
  bun audit-full                   16/16 required checks

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 05:34:31 -05:00

320 lines
12 KiB
TypeScript

// score_runs.ts — CLI + I/O around the pure scoreRecord function.
// Reads data/evidence/YYYY/MM/DD/*.jsonl, writes scored-runs at the
// matching partition. Mirrors build_evidence_index.ts conventions:
// idempotent, schema-gated, receipt-emitting.
import { appendFileSync, existsSync, mkdirSync, readdirSync, readFileSync, statSync, writeFileSync } from "node:fs";
import { dirname, resolve } from "node:path";
import { spawnSync } from "node:child_process";
import { createHash } from "node:crypto";
import { buildScoredRun } from "./scorer";
import { validateEvidenceRecord, type EvidenceRecord } from "../../auditor/schemas/distillation/evidence_record";
import { validateScoredRun } from "../../auditor/schemas/distillation/scored_run";
import { RECEIPT_SCHEMA_VERSION, validateReceipt, type Receipt, type FileReference } from "../../auditor/schemas/distillation/receipt";
import { canonicalSha256 } from "../../auditor/schemas/distillation/types";
// Lakehouse checkout that all reads/writes are rooted at; overridable
// via LH_DISTILL_ROOT for tests and alternate deployments.
const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";
// Inputs to scoreAll.
export interface ScoreOptions {
// Absolute path of the lakehouse root to read evidence from / write under.
root: string;
// ISO timestamp; stamped onto every skip row and the receipt's started_at.
recorded_at: string;
// When true, nothing is written (no scored rows, skips, or receipt).
dry_run?: boolean;
}
// Per-evidence-file tallies produced by processEvidenceFile.
export interface ScoreSourceResult {
source_file: string;
rows_read: number;
rows_written: number;
rows_skipped: number;
rows_deduped: number;
// Written-row counts keyed by scoring category (accepted, rejected, ...).
by_category: Record<string, number>;
output_files: string[];
}
// Aggregate result of a full scoring pass (scoreAll).
export interface ScoreResult {
sources: ScoreSourceResult[];
totals: {
rows_read: number;
rows_written: number;
rows_skipped: number;
rows_deduped: number;
by_category: Record<string, number>;
};
receipt: Receipt;
receipt_path: string;
scored_dir: string;
skips_path: string;
}
// Walks data/evidence/YYYY/MM/DD and returns the absolute path of every
// *.jsonl leaf file. Year/month/day directories are visited in sorted
// order; files within a day keep readdir order. A missing root yields [].
function listEvidenceFiles(evidence_root: string): string[] {
  if (!existsSync(evidence_root)) return [];
  // Sorted child directories of `p` (non-directories are skipped).
  const subdirs = (p: string): string[] =>
    readdirSync(p)
      .sort()
      .map(name => resolve(p, name))
      .filter(child => statSync(child).isDirectory());
  const found: string[] = [];
  for (const yearDir of subdirs(evidence_root)) {
    for (const monthDir of subdirs(yearDir)) {
      for (const dayDir of subdirs(monthDir)) {
        for (const entry of readdirSync(dayDir)) {
          if (entry.endsWith(".jsonl")) found.push(resolve(dayDir, entry));
        }
      }
    }
  }
  return found;
}
// sha256 (hex) of a file's raw bytes, used for receipt provenance.
// Uses node:crypto instead of the Bun-only `Bun.CryptoHasher` global so
// the helper runs under both Bun and Node (Bun implements node:crypto),
// matching the file's other node:* imports.
function sha256OfFile(path: string): string {
  return createHash("sha256").update(readFileSync(path)).digest("hex");
}
// HEAD commit sha of the repo at `root`. Falls back to forty zeros when
// git exits non-zero (not a repo, git unavailable) so receipts always
// carry a fixed-width value.
function gitSha(root: string): string {
  const res = spawnSync("git", ["-C", root, "rev-parse", "HEAD"], { encoding: "utf8" });
  if (res.status !== 0) return "0".repeat(40);
  return res.stdout.trim();
}
// Current branch name of the repo at `root`, or undefined when git
// exits non-zero (the receipt field is optional).
function gitBranch(root: string): string | undefined {
  const res = spawnSync("git", ["-C", root, "rev-parse", "--abbrev-ref", "HEAD"], { encoding: "utf8" });
  if (res.status !== 0) return undefined;
  return res.stdout.trim();
}
// True when the repo at `root` has uncommitted changes (non-empty
// `git status --porcelain`); false on any git failure.
function gitDirty(root: string): boolean {
  const res = spawnSync("git", ["-C", root, "status", "--porcelain"], { encoding: "utf8" });
  if (res.status !== 0) return false;
  return res.stdout.trim().length > 0;
}
// Composite dedup key: `sig_hash:scorer_version`. sig_hash alone is not
// enough — rows scored under an older SCORER_VERSION would still match
// and suppress re-scoring after a version bump. Folding the version into
// the key forces every row to be scored again whenever the scorer
// changes, while identical (sig, version) pairs remain deduped.
function dedupKey(sig_hash: string, scorer_version: string): string {
  return [sig_hash, scorer_version].join(":");
}
// Loads the composite dedup keys (`sig_hash:scorer_version`) of every
// row already present in `out_path`, so re-runs skip rows written by a
// previous pass. Missing file -> empty set; malformed lines and rows
// lacking either field are tolerated and simply not keyed.
function loadSeenHashes(out_path: string): Set<string> {
  const seen = new Set<string>();
  if (!existsSync(out_path)) return seen;
  readFileSync(out_path, "utf8").split("\n").forEach(raw => {
    if (!raw) return;
    try {
      const row = JSON.parse(raw);
      const sig = row?.provenance?.sig_hash;
      const ver = row?.scorer_version;
      if (sig && ver) seen.add(`${sig}:${ver}`);
    } catch {
      // malformed line — ignore, same as the reader elsewhere
    }
  });
  return seen;
}
// Scores every evidence row in one JSONL file and stages the results.
// Output mirrors the input's YYYY/MM/DD partition under scored_dir;
// rows already on disk are skipped via the composite
// sig_hash:scorer_version dedup key. Writes are batched into a single
// append per file, and suppressed entirely under dry_run.
async function processEvidenceFile(
ev_path: string,
opts: ScoreOptions,
scored_dir: string,
skips_path: string,
): Promise<ScoreSourceResult> {
// Output mirrors the input partition (YYYY/MM/DD/<source-stem>.jsonl)
const partition = ev_path.match(/data\/evidence\/(\d{4}\/\d{2}\/\d{2})\//)?.[1] ?? "unpartitioned";
const stem = ev_path.split("/").pop()!.replace(/\.jsonl$/, "");
const out_path = resolve(scored_dir, partition, `${stem}.jsonl`);
// Root-relative output path recorded in row provenance (stable across roots).
const out_relpath = `data/scored-runs/${partition}/${stem}.jsonl`;
const result: ScoreSourceResult = {
source_file: ev_path.replace(opts.root + "/", ""),
rows_read: 0,
rows_written: 0,
rows_skipped: 0,
rows_deduped: 0,
by_category: { accepted: 0, partially_accepted: 0, rejected: 0, needs_human_review: 0 },
output_files: [],
};
if (!opts.dry_run) mkdirSync(dirname(out_path), { recursive: true });
// Dedup keys from any previous run's output at this partition.
const seen = loadSeenHashes(out_path);
// filter(Boolean) drops the trailing empty string after the final newline.
const lines = readFileSync(ev_path, "utf8").split("\n").filter(Boolean);
const rowsToWrite: string[] = [];
const skipsToWrite: string[] = [];
for (let i = 0; i < lines.length; i++) {
result.rows_read++;
let evRow: any;
try { evRow = JSON.parse(lines[i]); }
catch (e) {
// Unparseable line: record a skip (message capped at 200 chars) and continue.
result.rows_skipped++;
skipsToWrite.push(JSON.stringify({
evidence_file: result.source_file, line: i,
errors: ["evidence not JSON: " + (e as Error).message.slice(0, 200)],
recorded_at: opts.recorded_at,
}));
continue;
}
// Re-validate the evidence row before scoring — defensive; if a
// malformed row slipped past Phase 2 it shouldn't poison Phase 3.
const ev = validateEvidenceRecord(evRow);
if (!ev.valid) {
result.rows_skipped++;
skipsToWrite.push(JSON.stringify({
evidence_file: result.source_file, line: i,
errors: ev.errors,
recorded_at: opts.recorded_at,
}));
continue;
}
const scored = await buildScoredRun(ev.value as EvidenceRecord, out_relpath, i, opts.recorded_at);
const key = dedupKey(scored.provenance.sig_hash, scored.scorer_version);
if (seen.has(key)) {
result.rows_deduped++;
continue;
}
// NOTE(review): the key is marked seen BEFORE schema validation below,
// so if validation fails, a later row with the same key this run is
// counted as deduped rather than retried — confirm this is intentional.
seen.add(key);
const sv = validateScoredRun(scored);
if (!sv.valid) {
result.rows_skipped++;
skipsToWrite.push(JSON.stringify({
evidence_file: result.source_file, line: i,
errors: sv.errors.map(e => "scored_run schema: " + e),
recorded_at: opts.recorded_at,
}));
continue;
}
rowsToWrite.push(JSON.stringify(sv.value));
result.rows_written++;
result.by_category[sv.value.category] = (result.by_category[sv.value.category] ?? 0) + 1;
}
// One append per file; output_files only lists paths actually written to.
if (!opts.dry_run) {
if (rowsToWrite.length > 0) {
appendFileSync(out_path, rowsToWrite.join("\n") + "\n");
result.output_files.push(out_path);
}
if (skipsToWrite.length > 0) {
mkdirSync(dirname(skips_path), { recursive: true });
appendFileSync(skips_path, skipsToWrite.join("\n") + "\n");
}
}
return result;
}
// Full scoring pass: walk every evidence partition, score each file,
// aggregate totals, and write a schema-validated receipt under
// reports/distillation/<timestamp>/receipt.json (skipped on dry_run).
export async function scoreAll(opts: ScoreOptions): Promise<ScoreResult> {
const evidence_root = resolve(opts.root, "data/evidence");
const scored_dir = resolve(opts.root, "data/scored-runs");
const skips_path = resolve(opts.root, "data/_kb/scoring_skips.jsonl");
const reports_dir = resolve(opts.root, "reports/distillation");
const started_ms = Date.now();
const ev_files = listEvidenceFiles(evidence_root);
const sources: ScoreSourceResult[] = [];
// Files are processed sequentially, one result per evidence file.
for (const ev of ev_files) {
sources.push(await processEvidenceFile(ev, opts, scored_dir, skips_path));
}
// Fold per-file tallies into grand totals for the receipt and CLI summary.
const totals = sources.reduce((acc, s) => ({
rows_read: acc.rows_read + s.rows_read,
rows_written: acc.rows_written + s.rows_written,
rows_skipped: acc.rows_skipped + s.rows_skipped,
rows_deduped: acc.rows_deduped + s.rows_deduped,
by_category: {
accepted: (acc.by_category.accepted ?? 0) + (s.by_category.accepted ?? 0),
partially_accepted: (acc.by_category.partially_accepted ?? 0) + (s.by_category.partially_accepted ?? 0),
rejected: (acc.by_category.rejected ?? 0) + (s.by_category.rejected ?? 0),
needs_human_review: (acc.by_category.needs_human_review ?? 0) + (s.by_category.needs_human_review ?? 0),
},
}), { rows_read: 0, rows_written: 0, rows_skipped: 0, rows_deduped: 0, by_category: {} as Record<string, number> });
const ended_at = new Date().toISOString();
const duration_ms = Date.now() - started_ms;
// Hash every input file for the receipt's provenance (path relative to root).
const input_files: FileReference[] = ev_files.map(p => ({
path: p.replace(opts.root + "/", ""),
sha256: sha256OfFile(p),
bytes: statSync(p).size,
}));
const output_files: FileReference[] = [];
for (const s of sources) {
for (const out_path of s.output_files) {
try {
output_files.push({
path: out_path.replace(opts.root + "/", ""),
sha256: sha256OfFile(out_path),
bytes: statSync(out_path).size,
});
} catch { /* dry-run path */ }
}
}
const errors: string[] = [];
const warnings: string[] = [];
// Skips are surfaced as warnings; validation_pass below also keys off them.
for (const s of sources) {
if (s.rows_skipped > 0) warnings.push(`${s.source_file}: ${s.rows_skipped} skipped`);
}
const receipt: Receipt = {
schema_version: RECEIPT_SCHEMA_VERSION,
command: "bun run scripts/distillation/score_runs.ts" + (opts.dry_run ? " --dry-run" : ""),
git_sha: gitSha(opts.root),
git_branch: gitBranch(opts.root),
git_dirty: gitDirty(opts.root),
started_at: opts.recorded_at,
ended_at,
duration_ms,
input_files,
output_files,
record_counts: {
in: totals.rows_read,
out: totals.rows_written,
skipped: totals.rows_skipped,
deduped: totals.rows_deduped,
cat_accepted: totals.by_category.accepted ?? 0,
cat_partially_accepted: totals.by_category.partially_accepted ?? 0,
cat_rejected: totals.by_category.rejected ?? 0,
cat_needs_human_review: totals.by_category.needs_human_review ?? 0,
},
validation_pass: totals.rows_skipped === 0,
errors,
warnings,
};
// Self-validate the receipt; failures are folded back into it so the
// written artifact records its own invalidity.
const rv = validateReceipt(receipt);
if (!rv.valid) {
receipt.errors.push(...rv.errors.map(e => "receipt schema: " + e));
receipt.validation_pass = false;
}
// Filesystem-safe directory name derived from the ended_at timestamp.
const stamp = ended_at.replace(/[:.]/g, "-");
const receipt_path = resolve(reports_dir, stamp, "receipt.json");
if (!opts.dry_run) {
mkdirSync(dirname(receipt_path), { recursive: true });
writeFileSync(receipt_path, JSON.stringify(receipt, null, 2) + "\n");
}
return { sources, totals, receipt, receipt_path, scored_dir, skips_path };
}
// CLI entry point: honors --dry-run, timestamps the run once, scores
// every partition under DEFAULT_ROOT, prints a summary, and exits
// non-zero when the receipt failed validation.
async function cli() {
  const dry_run = process.argv.includes("--dry-run");
  const recorded_at = new Date().toISOString();
  const result = await scoreAll({ root: DEFAULT_ROOT, recorded_at, dry_run });
  const t = result.totals;
  const suffix = dry_run ? " (DRY RUN)" : "";
  console.log(`[score_runs] ${t.rows_read} read · ${t.rows_written} written · ${t.rows_skipped} skipped · ${t.rows_deduped} deduped${suffix}`);
  console.log(`[score_runs] categories: accepted=${t.by_category.accepted} partial=${t.by_category.partially_accepted} rejected=${t.by_category.rejected} needs_human=${t.by_category.needs_human_review}`);
  for (const src of result.sources) {
    const cat = src.by_category;
    console.log(` ${src.source_file}: read=${src.rows_read} wrote=${src.rows_written} acc=${cat.accepted ?? 0} part=${cat.partially_accepted ?? 0} rej=${cat.rejected ?? 0} hum=${cat.needs_human_review ?? 0}`);
  }
  if (!dry_run) console.log(`[score_runs] receipt: ${result.receipt_path}`);
  if (!result.receipt.validation_pass) process.exit(1);
}
if (import.meta.main) cli().catch(e => { console.error(e); process.exit(1); });