// build_evidence_index.ts — materialize EvidenceRecord rows from
// source JSONL streams. Pure mechanical view: every output row traces
// to a single source row via provenance.{source_file, line_offset,
// sig_hash}.
//
// USAGE
//   bun run scripts/distillation/build_evidence_index.ts            # materialize, write outputs
//   bun run scripts/distillation/build_evidence_index.ts --dry-run  # count + report, no writes
//   LH_DISTILL_ROOT=/path bun run ...                               # override repo root (tests)
//
// OUTPUTS
//   data/evidence/YYYY/MM/DD/<stem>.jsonl            valid records
//   data/_kb/distillation_skips.jsonl                rows that failed validation (append)
//   reports/distillation/<run_timestamp>/receipt.json  per-run audit (Receipt schema)
//
// IDEMPOTENCY
//   sig_hash = canonicalSha256(orderedKeys(source_row)). Re-running
//   loads existing day-partition output files into a seen set, so
//   already-materialized rows are skipped (not duplicated). Bit-stable
//   on identical input.
//
// NON-NEGOTIABLES (per spec + recon)
//   - ZERO model calls — deterministic only
//   - Provenance on every emitted row (the schema validator enforces it)
//   - Empty/missing source files do not error — they're tallied as
//     "rows_present: false" in the receipt
//   - Validator rejection ≠ runtime error — invalid rows go to skips
//     with their full error list, materialization continues

import {
  existsSync,
  readFileSync,
  mkdirSync,
  writeFileSync,
  readdirSync,
  appendFileSync,
  statSync,
} from "node:fs";
import { resolve, dirname } from "node:path";
import { spawnSync } from "node:child_process";
import { TRANSFORMS, canonicalSha256, type TransformDef } from "./transforms";
import { validateEvidenceRecord } from "../../auditor/schemas/distillation/evidence_record";
import {
  RECEIPT_SCHEMA_VERSION,
  validateReceipt,
  type Receipt,
  type FileReference,
} from "../../auditor/schemas/distillation/receipt";

const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";

export interface MaterializeOptions {
  root: string;               // repo root — source jsonls + outputs are relative to this
  transforms: TransformDef[]; // override for tests
  recorded_at: string;        // ISO 8601 — fixed for the run, used in provenance
  dry_run?: boolean;          // count but don't write
}

export interface SourceResult {
  source_file_relpath: string;
  rows_present: boolean;
  rows_read: number;
  rows_written: number;
  rows_skipped: number;
  rows_deduped: number;
  output_files: string[];
}

export interface MaterializeResult {
  sources: SourceResult[];
  totals: {
    rows_read: number;
    rows_written: number;
    rows_skipped: number;
    rows_deduped: number;
  };
  receipt: Receipt;
  receipt_path: string;
  evidence_dir: string;
  skips_path: string;
}

const ISO_DATE_PARTITION = (iso: string): string => {
  const d = new Date(iso);
  return `${d.getUTCFullYear()}/${String(d.getUTCMonth() + 1).padStart(2, "0")}/${String(d.getUTCDate()).padStart(2, "0")}`;
};

const OUTPUT_STEM = (source_file_relpath: string): string => {
  const m = source_file_relpath.match(/([^/]+)\.jsonl$/);
  return m ? m[1] : source_file_relpath.replace(/[^a-z0-9_]/gi, "_");
};
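// Worked example of the partition/stem derivation above (illustrative
// only; the source filename is hypothetical, not one of the registered
// TRANSFORMS):
//
//   recorded_at             = "2025-01-31T09:15:00.000Z"
//   source_file_relpath     = "data/raw/orders.jsonl"
//   ISO_DATE_PARTITION(...) = "2025/01/31"
//   OUTPUT_STEM(...)        = "orders"
//   output path             = <root>/data/evidence/2025/01/31/orders.jsonl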
function sha256OfFile(path: string): string {
  const hasher = new Bun.CryptoHasher("sha256");
  hasher.update(readFileSync(path));
  return hasher.digest("hex");
}

function getGitSha(root: string): string {
  const r = spawnSync("git", ["-C", root, "rev-parse", "HEAD"], { encoding: "utf8" });
  if (r.status !== 0) return "0".repeat(40);
  return r.stdout.trim();
}

function getGitBranch(root: string): string | undefined {
  const r = spawnSync("git", ["-C", root, "rev-parse", "--abbrev-ref", "HEAD"], { encoding: "utf8" });
  return r.status === 0 ? r.stdout.trim() : undefined;
}

function getGitDirty(root: string): boolean {
  const r = spawnSync("git", ["-C", root, "status", "--porcelain"], { encoding: "utf8" });
  return r.status === 0 && r.stdout.trim().length > 0;
}

// Load sig_hashes already present in the target day-partition output
// file. Used to skip records that a prior run of the same day already
// materialized — keeps the materializer idempotent across reruns.
function loadSeenHashes(out_path: string): Set<string> {
  const seen = new Set<string>();
  if (!existsSync(out_path)) return seen;
  for (const line of readFileSync(out_path, "utf8").split("\n")) {
    if (!line) continue;
    try {
      const row = JSON.parse(line);
      if (row?.provenance?.sig_hash) seen.add(row.provenance.sig_hash);
    } catch {
      // Malformed line — ignore; it simply contributes nothing to the seen set.
    }
  }
  return seen;
}
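// Shape of a row appended to data/_kb/distillation_skips.jsonl (field
// values here are hypothetical; the shape mirrors the skipsToWrite
// entries built in processSource below, and sig_hash is absent when the
// source line failed JSON.parse, since no hash was computed):
//
//   {"source_file":"data/raw/orders.jsonl","line_offset":17,
//    "errors":["transform returned null"],"sig_hash":"3f1a...",
//    "recorded_at":"2025-01-31T09:15:00.000Z"}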
async function processSource(
  transform: TransformDef,
  opts: MaterializeOptions,
  evidence_dir: string,
  skips_path: string,
): Promise<SourceResult> {
  const source_path = resolve(opts.root, transform.source_file_relpath);
  const result: SourceResult = {
    source_file_relpath: transform.source_file_relpath,
    rows_present: false,
    rows_read: 0,
    rows_written: 0,
    rows_skipped: 0,
    rows_deduped: 0,
    output_files: [],
  };
  if (!existsSync(source_path)) return result;
  result.rows_present = true;

  const partition = ISO_DATE_PARTITION(opts.recorded_at);
  const stem = OUTPUT_STEM(transform.source_file_relpath);
  const out_dir = resolve(evidence_dir, partition);
  const out_path = resolve(out_dir, `${stem}.jsonl`);
  if (!opts.dry_run) mkdirSync(out_dir, { recursive: true });

  // Idempotency: collect sig_hashes already in the target output file
  // so we skip rows that were materialized in a prior run of the same
  // day. Tests pin this invariant.
  const seenHashes = loadSeenHashes(out_path);

  const lines = readFileSync(source_path, "utf8").split("\n");
  const rowsToWrite: string[] = [];
  const skipsToWrite: string[] = [];

  for (let i = 0; i < lines.length; i++) {
    const raw = lines[i];
    if (!raw) continue;
    result.rows_read++;

    let row: any;
    try {
      row = JSON.parse(raw);
    } catch (e) {
      result.rows_skipped++;
      skipsToWrite.push(JSON.stringify({
        source_file: transform.source_file_relpath,
        line_offset: i,
        errors: ["JSON.parse failed: " + (e as Error).message.slice(0, 200)],
        recorded_at: opts.recorded_at,
      }));
      continue;
    }

    const sig_hash = await canonicalSha256(row);
    if (seenHashes.has(sig_hash)) {
      result.rows_deduped++;
      continue;
    }
    seenHashes.add(sig_hash);

    const partial = transform.transform({
      row,
      line_offset: i,
      source_file_relpath: transform.source_file_relpath,
      recorded_at: opts.recorded_at,
      sig_hash,
    });
    if (!partial) {
      result.rows_skipped++;
      skipsToWrite.push(JSON.stringify({
        source_file: transform.source_file_relpath,
        line_offset: i,
        errors: ["transform returned null"],
        sig_hash,
        recorded_at: opts.recorded_at,
      }));
      continue;
    }

    const v = validateEvidenceRecord(partial);
    if (!v.valid) {
      result.rows_skipped++;
      skipsToWrite.push(JSON.stringify({
        source_file: transform.source_file_relpath,
        line_offset: i,
        errors: v.errors,
        sig_hash,
        recorded_at: opts.recorded_at,
      }));
      continue;
    }

    rowsToWrite.push(JSON.stringify(v.value));
    result.rows_written++;
  }

  if (!opts.dry_run) {
    if (rowsToWrite.length > 0) {
      const block = rowsToWrite.join("\n") + "\n";
      // Append-mode preserves prior runs' rows; dedup above ensures
      // we don't append duplicates of those.
      appendFileSync(out_path, block);
      result.output_files.push(out_path);
    }
    if (skipsToWrite.length > 0) {
      mkdirSync(dirname(skips_path), { recursive: true });
      appendFileSync(skips_path, skipsToWrite.join("\n") + "\n");
    }
  }

  return result;
}

export async function materializeAll(opts: MaterializeOptions): Promise<MaterializeResult> {
  const evidence_dir = resolve(opts.root, "data/evidence");
  const skips_path = resolve(opts.root, "data/_kb/distillation_skips.jsonl");
  const reports_dir = resolve(opts.root, "reports/distillation");
  const started_ms = Date.now();

  const sources: SourceResult[] = [];
  for (const t of opts.transforms) {
    sources.push(await processSource(t, opts, evidence_dir, skips_path));
  }

  const totals = sources.reduce(
    (acc, s) => ({
      rows_read: acc.rows_read + s.rows_read,
      rows_written: acc.rows_written + s.rows_written,
      rows_skipped: acc.rows_skipped + s.rows_skipped,
      rows_deduped: acc.rows_deduped + s.rows_deduped,
    }),
    { rows_read: 0, rows_written: 0, rows_skipped: 0, rows_deduped: 0 },
  );

  // Build the receipt — substantive per the spec non-negotiable.
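  // Count bookkeeping: processSource classifies every non-empty source
  // line as exactly one of written / skipped / deduped, so the receipt
  // always satisfies
  //   record_counts.in === record_counts.out
  //                        + record_counts.skipped
  //                        + record_counts.deduped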
  const ended_at = new Date().toISOString();
  const duration_ms = Date.now() - started_ms;

  const input_files: FileReference[] = [];
  for (const s of sources) {
    if (s.rows_present) {
      const path = resolve(opts.root, s.source_file_relpath);
      try {
        input_files.push({
          path: s.source_file_relpath,
          sha256: sha256OfFile(path),
          bytes: statSync(path).size,
        });
      } catch {
        /* file vanished mid-run — leave it out of input_files */
      }
    }
  }

  const output_files: FileReference[] = [];
  for (const s of sources) {
    for (const path of s.output_files) {
      try {
        output_files.push({
          path: path.replace(opts.root + "/", ""),
          sha256: sha256OfFile(path),
          bytes: statSync(path).size,
        });
      } catch {
        /* not written in dry-run */
      }
    }
  }

  const errors: string[] = [];
  const warnings: string[] = [];
  for (const s of sources) {
    if (!s.rows_present) warnings.push(`${s.source_file_relpath}: source file not found (skipped)`);
    if (s.rows_skipped > 0) warnings.push(`${s.source_file_relpath}: ${s.rows_skipped} rows skipped (validation/parse errors)`);
  }

  const receipt: Receipt = {
    schema_version: RECEIPT_SCHEMA_VERSION,
    command:
      "bun run scripts/distillation/build_evidence_index.ts" +
      (opts.dry_run ? " --dry-run" : ""),
    git_sha: getGitSha(opts.root),
    git_branch: getGitBranch(opts.root),
    git_dirty: getGitDirty(opts.root),
    started_at: opts.recorded_at,
    ended_at,
    duration_ms,
    input_files,
    output_files,
    record_counts: {
      in: totals.rows_read,
      out: totals.rows_written,
      skipped: totals.rows_skipped,
      deduped: totals.rows_deduped,
    },
    validation_pass: totals.rows_skipped === 0,
    errors,
    warnings,
  };

  // Self-validate the receipt — spec says receipts must conform to
  // their schema. Fail fast if our own writer drifts from the schema.
  const rv = validateReceipt(receipt);
  if (!rv.valid) {
    errors.push(...rv.errors.map(e => "receipt schema: " + e));
    receipt.errors = errors;
    receipt.validation_pass = false;
  }

  const stamp = ended_at.replace(/[:.]/g, "-");
  const receipt_dir = resolve(reports_dir, stamp);
  const receipt_path = resolve(receipt_dir, "receipt.json");
  if (!opts.dry_run) {
    mkdirSync(receipt_dir, { recursive: true });
    writeFileSync(receipt_path, JSON.stringify(receipt, null, 2) + "\n");
  }

  return { sources, totals, receipt, receipt_path, evidence_dir, skips_path };
}

async function cli() {
  const dry_run = process.argv.includes("--dry-run");
  const recorded_at = new Date().toISOString();
  const r = await materializeAll({
    root: DEFAULT_ROOT,
    transforms: TRANSFORMS,
    recorded_at,
    dry_run,
  });

  console.log(`[evidence_index] ${r.totals.rows_read} read · ${r.totals.rows_written} written · ${r.totals.rows_skipped} skipped · ${r.totals.rows_deduped} deduped${dry_run ? " (DRY RUN)" : ""}`);
  for (const s of r.sources) {
    if (!s.rows_present) {
      console.log(`  ${s.source_file_relpath}: (missing — skipped)`);
      continue;
    }
    console.log(`  ${s.source_file_relpath}: read=${s.rows_read} wrote=${s.rows_written} skip=${s.rows_skipped} dedup=${s.rows_deduped}`);
  }
  if (!dry_run) {
    console.log(`[evidence_index] receipt: ${r.receipt_path}`);
    console.log(`[evidence_index] validation_pass=${r.receipt.validation_pass}`);
  }
  if (!r.receipt.validation_pass) process.exit(1);
}

if (import.meta.main) {
  cli().catch(e => {
    console.error(e);
    process.exit(1);
  });
}
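// Programmatic usage sketch (hedged): tests can bypass the CLI and call
// materializeAll directly with a pinned recorded_at, a temp root, and
// their own TransformDef list. The fixture path below is hypothetical.
//
//   const r = await materializeAll({
//     root: "/tmp/lh_fixture",
//     transforms: TRANSFORMS,                   // or a test-local TransformDef[]
//     recorded_at: "2025-01-31T09:15:00.000Z",
//     dry_run: true,                            // counts are reported, nothing is written
//   });
//   // r.receipt.record_counts and r.sources carry the per-source tallies.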