// score_runs.ts — CLI + I/O around the pure scoreRecord function.
// Reads data/evidence/YYYY/MM/DD/*.jsonl, writes scored-runs at the
// matching partition. Mirrors build_evidence_index.ts conventions:
// idempotent, schema-gated, receipt-emitting.
import {
  existsSync,
  readFileSync,
  mkdirSync,
  writeFileSync,
  readdirSync,
  statSync,
  appendFileSync,
} from "node:fs";
import { resolve, dirname } from "node:path";
import { spawnSync } from "node:child_process";
import { buildScoredRun } from "./scorer";
import { validateEvidenceRecord, type EvidenceRecord } from "../../auditor/schemas/distillation/evidence_record";
import { validateScoredRun } from "../../auditor/schemas/distillation/scored_run";
import {
  RECEIPT_SCHEMA_VERSION,
  validateReceipt,
  type Receipt,
  type FileReference,
} from "../../auditor/schemas/distillation/receipt";
import { canonicalSha256 } from "../../auditor/schemas/distillation/types";

const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";

export interface ScoreOptions {
  root: string;
  recorded_at: string;
  dry_run?: boolean;
}

export interface ScoreSourceResult {
  source_file: string;
  rows_read: number;
  rows_written: number;
  rows_skipped: number;
  rows_deduped: number;
  by_category: Record<string, number>;
  output_files: string[];
}

export interface ScoreResult {
  sources: ScoreSourceResult[];
  totals: {
    rows_read: number;
    rows_written: number;
    rows_skipped: number;
    rows_deduped: number;
    by_category: Record<string, number>;
  };
  receipt: Receipt;
  receipt_path: string;
  scored_dir: string;
  skips_path: string;
}

function listEvidenceFiles(evidence_root: string): string[] {
  const out: string[] = [];
  if (!existsSync(evidence_root)) return out;
  for (const yyyy of readdirSync(evidence_root).sort()) {
    const yp = resolve(evidence_root, yyyy);
    if (!statSync(yp).isDirectory()) continue;
    for (const mm of readdirSync(yp).sort()) {
      const mp = resolve(yp, mm);
      if (!statSync(mp).isDirectory()) continue;
      for (const dd of readdirSync(mp).sort()) {
        const dp = resolve(mp, dd);
        if (!statSync(dp).isDirectory()) continue;
        for (const f of readdirSync(dp)) {
          if (f.endsWith(".jsonl")) out.push(resolve(dp, f));
        }
      }
    }
  }
  return out;
}

function sha256OfFile(path: string): string {
  const h = new Bun.CryptoHasher("sha256");
  h.update(readFileSync(path));
  return h.digest("hex");
}

function gitSha(root: string): string {
  const r = spawnSync("git", ["-C", root, "rev-parse", "HEAD"], { encoding: "utf8" });
  return r.status === 0 ? r.stdout.trim() : "0".repeat(40);
}

function gitBranch(root: string): string | undefined {
  const r = spawnSync("git", ["-C", root, "rev-parse", "--abbrev-ref", "HEAD"], { encoding: "utf8" });
  return r.status === 0 ? r.stdout.trim() : undefined;
}

function gitDirty(root: string): boolean {
  const r = spawnSync("git", ["-C", root, "status", "--porcelain"], { encoding: "utf8" });
  return r.status === 0 && r.stdout.trim().length > 0;
}

function loadSeenHashes(out_path: string): Set<string> {
  const seen = new Set<string>();
  if (!existsSync(out_path)) return seen;
  for (const line of readFileSync(out_path, "utf8").split("\n")) {
    if (!line) continue;
    try {
      const row = JSON.parse(line);
      if (row?.provenance?.sig_hash) seen.add(row.provenance.sig_hash);
    } catch { /* malformed — ignore */ }
  }
  return seen;
}

async function processEvidenceFile(
  ev_path: string,
  opts: ScoreOptions,
  scored_dir: string,
  skips_path: string,
): Promise<ScoreSourceResult> {
  // Output mirrors the input partition (YYYY/MM/DD/<stem>.jsonl)
  const partition =
    ev_path.match(/data\/evidence\/(\d{4}\/\d{2}\/\d{2})\//)?.[1] ?? "unpartitioned";
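  // For example (illustrative filename, not a real input): an evidence file at
  // data/evidence/2025/06/01/run-batch.jsonl is scored into
  // data/scored-runs/2025/06/01/run-batch.jsonl under the same date partition.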
"unpartitioned"; const stem = ev_path.split("/").pop()!.replace(/\.jsonl$/, ""); const out_path = resolve(scored_dir, partition, `${stem}.jsonl`); const out_relpath = `data/scored-runs/${partition}/${stem}.jsonl`; const result: ScoreSourceResult = { source_file: ev_path.replace(opts.root + "/", ""), rows_read: 0, rows_written: 0, rows_skipped: 0, rows_deduped: 0, by_category: { accepted: 0, partially_accepted: 0, rejected: 0, needs_human_review: 0 }, output_files: [], }; if (!opts.dry_run) mkdirSync(dirname(out_path), { recursive: true }); const seen = loadSeenHashes(out_path); const lines = readFileSync(ev_path, "utf8").split("\n").filter(Boolean); const rowsToWrite: string[] = []; const skipsToWrite: string[] = []; for (let i = 0; i < lines.length; i++) { result.rows_read++; let evRow: any; try { evRow = JSON.parse(lines[i]); } catch (e) { result.rows_skipped++; skipsToWrite.push(JSON.stringify({ evidence_file: result.source_file, line: i, errors: ["evidence not JSON: " + (e as Error).message.slice(0, 200)], recorded_at: opts.recorded_at, })); continue; } // Re-validate the evidence row before scoring — defensive; if a // malformed row slipped past Phase 2 it shouldn't poison Phase 3. const ev = validateEvidenceRecord(evRow); if (!ev.valid) { result.rows_skipped++; skipsToWrite.push(JSON.stringify({ evidence_file: result.source_file, line: i, errors: ev.errors, recorded_at: opts.recorded_at, })); continue; } const scored = await buildScoredRun(ev.value as EvidenceRecord, out_relpath, i, opts.recorded_at); if (seen.has(scored.provenance.sig_hash)) { result.rows_deduped++; continue; } seen.add(scored.provenance.sig_hash); const sv = validateScoredRun(scored); if (!sv.valid) { result.rows_skipped++; skipsToWrite.push(JSON.stringify({ evidence_file: result.source_file, line: i, errors: sv.errors.map(e => "scored_run schema: " + e), recorded_at: opts.recorded_at, })); continue; } rowsToWrite.push(JSON.stringify(sv.value)); result.rows_written++; result.by_category[sv.value.category] = (result.by_category[sv.value.category] ?? 0) + 1; } if (!opts.dry_run) { if (rowsToWrite.length > 0) { appendFileSync(out_path, rowsToWrite.join("\n") + "\n"); result.output_files.push(out_path); } if (skipsToWrite.length > 0) { mkdirSync(dirname(skips_path), { recursive: true }); appendFileSync(skips_path, skipsToWrite.join("\n") + "\n"); } } return result; } export async function scoreAll(opts: ScoreOptions): Promise { const evidence_root = resolve(opts.root, "data/evidence"); const scored_dir = resolve(opts.root, "data/scored-runs"); const skips_path = resolve(opts.root, "data/_kb/scoring_skips.jsonl"); const reports_dir = resolve(opts.root, "reports/distillation"); const started_ms = Date.now(); const ev_files = listEvidenceFiles(evidence_root); const sources: ScoreSourceResult[] = []; for (const ev of ev_files) { sources.push(await processEvidenceFile(ev, opts, scored_dir, skips_path)); } const totals = sources.reduce((acc, s) => ({ rows_read: acc.rows_read + s.rows_read, rows_written: acc.rows_written + s.rows_written, rows_skipped: acc.rows_skipped + s.rows_skipped, rows_deduped: acc.rows_deduped + s.rows_deduped, by_category: { accepted: (acc.by_category.accepted ?? 0) + (s.by_category.accepted ?? 0), partially_accepted: (acc.by_category.partially_accepted ?? 0) + (s.by_category.partially_accepted ?? 0), rejected: (acc.by_category.rejected ?? 0) + (s.by_category.rejected ?? 0), needs_human_review: (acc.by_category.needs_human_review ?? 0) + (s.by_category.needs_human_review ?? 
    },
  }), { rows_read: 0, rows_written: 0, rows_skipped: 0, rows_deduped: 0, by_category: {} as Record<string, number> });

  const ended_at = new Date().toISOString();
  const duration_ms = Date.now() - started_ms;

  const input_files: FileReference[] = ev_files.map(p => ({
    path: p.replace(opts.root + "/", ""),
    sha256: sha256OfFile(p),
    bytes: statSync(p).size,
  }));

  const output_files: FileReference[] = [];
  for (const s of sources) {
    for (const out_path of s.output_files) {
      try {
        output_files.push({
          path: out_path.replace(opts.root + "/", ""),
          sha256: sha256OfFile(out_path),
          bytes: statSync(out_path).size,
        });
      } catch { /* dry-run path */ }
    }
  }

  const errors: string[] = [];
  const warnings: string[] = [];
  for (const s of sources) {
    if (s.rows_skipped > 0) warnings.push(`${s.source_file}: ${s.rows_skipped} skipped`);
  }

  const receipt: Receipt = {
    schema_version: RECEIPT_SCHEMA_VERSION,
    command: "bun run scripts/distillation/score_runs.ts" + (opts.dry_run ? " --dry-run" : ""),
    git_sha: gitSha(opts.root),
    git_branch: gitBranch(opts.root),
    git_dirty: gitDirty(opts.root),
    started_at: opts.recorded_at,
    ended_at,
    duration_ms,
    input_files,
    output_files,
    record_counts: {
      in: totals.rows_read,
      out: totals.rows_written,
      skipped: totals.rows_skipped,
      deduped: totals.rows_deduped,
      cat_accepted: totals.by_category.accepted ?? 0,
      cat_partially_accepted: totals.by_category.partially_accepted ?? 0,
      cat_rejected: totals.by_category.rejected ?? 0,
      cat_needs_human_review: totals.by_category.needs_human_review ?? 0,
    },
    validation_pass: totals.rows_skipped === 0,
    errors,
    warnings,
  };

  const rv = validateReceipt(receipt);
  if (!rv.valid) {
    receipt.errors.push(...rv.errors.map(e => "receipt schema: " + e));
    receipt.validation_pass = false;
  }

  const stamp = ended_at.replace(/[:.]/g, "-");
  const receipt_path = resolve(reports_dir, stamp, "receipt.json");
  if (!opts.dry_run) {
    mkdirSync(dirname(receipt_path), { recursive: true });
    writeFileSync(receipt_path, JSON.stringify(receipt, null, 2) + "\n");
  }

  return { sources, totals, receipt, receipt_path, scored_dir, skips_path };
}

async function cli() {
  const dry_run = process.argv.includes("--dry-run");
  const recorded_at = new Date().toISOString();
  const r = await scoreAll({ root: DEFAULT_ROOT, recorded_at, dry_run });
  console.log(`[score_runs] ${r.totals.rows_read} read · ${r.totals.rows_written} written · ${r.totals.rows_skipped} skipped · ${r.totals.rows_deduped} deduped${dry_run ? " (DRY RUN)" : ""}`);
  console.log(`[score_runs] categories: accepted=${r.totals.by_category.accepted} partial=${r.totals.by_category.partially_accepted} rejected=${r.totals.by_category.rejected} needs_human=${r.totals.by_category.needs_human_review}`);
  for (const s of r.sources) {
    const c = s.by_category;
    console.log(` ${s.source_file}: read=${s.rows_read} wrote=${s.rows_written} acc=${c.accepted ?? 0} part=${c.partially_accepted ?? 0} rej=${c.rejected ?? 0} hum=${c.needs_human_review ?? 0}`);
  }
  if (!dry_run) console.log(`[score_runs] receipt: ${r.receipt_path}`);
  if (!r.receipt.validation_pass) process.exit(1);
}

if (import.meta.main) cli().catch(e => { console.error(e); process.exit(1); });
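// Usage sketch (assumes a Bun runtime; the lakehouse root can be overridden via LH_DISTILL_ROOT):
//   bun run scripts/distillation/score_runs.ts --dry-run   # score in memory only; no rows, skips, or receipt written
//   bun run scripts/distillation/score_runs.ts             # append scored rows, log skips, and emit a receipt
// Programmatic callers can import scoreAll and pass { root, recorded_at, dry_run } directly.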