Phase 2 ships the JOIN script that turns 12 source JSONL streams
into unified data/evidence/YYYY/MM/DD/<source>.jsonl rows conforming
to EvidenceRecord v1, plus a high-level health audit proving the
substrate is real before Phase 3 reads from it.
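For orientation, a materialized row looks roughly like this (sketch only:
the provenance block is what the spec pins down; the payload wrapper and
the file path here are illustrative, not the actual EvidenceRecord v1
field list):

  {"payload": { ...original source row... },
   "provenance": {"source_file": "data/scrum_reviews.jsonl",
                  "line_offset": 41,
                  "sig_hash": "<sha256 of canonicalized source row>",
                  "recorded_at": "2026-04-27T03:33:53Z"}}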
Files:
scripts/distillation/build_evidence_index.ts materializeAll() + cli
scripts/distillation/check_evidence_health.ts provenance + coverage audit
tests/distillation/build_evidence_index.test.ts 9 acceptance tests
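For reference, tests drive the exported API directly rather than the CLI.
A minimal sketch (the tmp root and import paths are hypothetical; the
option names match MaterializeOptions in the script below):

  import { materializeAll } from "../../scripts/distillation/build_evidence_index";
  import { TRANSFORMS } from "../../scripts/distillation/transforms";

  const r = await materializeAll({
    root: "/tmp/lh-fixture",                 // hypothetical test root (or set LH_DISTILL_ROOT)
    transforms: TRANSFORMS,                  // or a single fixture TransformDef
    recorded_at: "2026-04-27T03:33:53.972Z", // pinned → deterministic day partition
    dry_run: true,                           // count + report, no writes
  });
  console.log(r.totals); // { rows_read, rows_written, rows_skipped, rows_deduped }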
Test metrics:
9/9 pass · 85 expect() calls · 323ms
Real-data run (2026-04-27T03:33:53Z):
1053 rows read from 12 source streams
1051 written (99.8%) to data/evidence/2026/04/27/
2 skipped (outcomes.jsonl rows missing created_at — schema-level catch)
0 deduped on first run
Sources covered (priority order from recon):
TIER 1 (validated 100% in Phase 1, 8 sources):
distilled_facts/procedures/config_hints, contract_analyses,
mode_experiments, scrum_reviews, observer_escalations, audit_facts
TIER 2 (added by Phase 2):
auto_apply, observer_reviews, audits, outcomes
High-level audit results:
Provenance round-trip: 30/30 sampled rows trace cleanly to source
rows with matching canonicalSha256(orderedKeys(row)). Every output
has source_file + line_offset + sig_hash + recorded_at. Proven.
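The check itself is mechanical; a sketch of the core (the real audit lives
in check_evidence_health.ts and is not reproduced here — this only assumes
the canonicalSha256 helper exported by scripts/distillation/transforms):

  import { readFileSync } from "node:fs";
  import { resolve } from "node:path";
  import { canonicalSha256 } from "./transforms";

  async function roundTrips(root: string, evidenceRow: any): Promise<boolean> {
    const { source_file, line_offset, sig_hash } = evidenceRow.provenance;
    const lines = readFileSync(resolve(root, source_file), "utf8").split("\n");
    const sourceRow = JSON.parse(lines[line_offset]); // line_offset = index into split("\n")
    return (await canonicalSha256(sourceRow)) === sig_hash;
  }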
Score-readiness: 54% aggregate scoreable. Three-class taxonomy
emerges from coverage matrix:
- Verdict-bearing (100% scoreable): scrum_reviews, observer_reviews,
audits, contract_analyses — direct scoring inputs
- Telemetry-rich (0-70%): mode_experiments, audit_facts, outcomes
— Phase 3 will derive markers from latency/grounding/retrieval
- Pure-extraction (0%): distilled_*, observer_escalations
— context for OTHER scoring, not scoreable themselves
Invariants enforced (proven by tests + real-data audit):
- ZERO model calls in materializer (deterministic only)
- canonicalSha256(orderedKeys(row)) per source row → stable sig_hash (sketched after this list)
- Schema validator gates output: rejected rows go to skips, never to evidence/
- JSON.parse failures caught + logged, never crash the run
- Missing source files tallied as rows_present=false, never error
- Idempotent: second run on identical input writes 0 rows (proven on
real data: 1053 read, 0 written, 1051 deduped)
- Bit-stable: identical input produces byte-identical output (proven
by tests/distillation/build_evidence_index.test.ts case 3)
- Receipt self-validates against schema before write
- validation_pass = boolean (skipped == 0), never inferred
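The hashing invariant, sketched (the real implementation lives in
scripts/distillation/transforms.ts and is imported by the materializer;
this is only a plausible shape — recursive key ordering, then SHA-256
over the canonical JSON string):

  function orderedKeys(v: unknown): unknown {
    if (Array.isArray(v)) return v.map(orderedKeys);
    if (v && typeof v === "object") {
      const o = v as Record<string, unknown>;
      return Object.fromEntries(Object.keys(o).sort().map(k => [k, orderedKeys(o[k])]));
    }
    return v; // primitives hash as-is
  }

  async function canonicalSha256(row: unknown): Promise<string> {
    const hasher = new Bun.CryptoHasher("sha256");
    hasher.update(JSON.stringify(orderedKeys(row))); // key order fixed → stable digest
    return hasher.digest("hex");
  }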
Receipt at:
reports/distillation/2026-04-27T03-33-53-972Z/receipt.json
- schema_version=1, git_sha pinned, sha256 on every input/output
- record_counts: {in:1053, out:1051, skipped:2, deduped:0}
- validation_pass=false (skipped > 0; spec says explicit, never inferred)
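Abridged shape of that receipt (hashes, timestamps, and sizes elided;
counts are from the real run):

  {
    "schema_version": 1,
    "command": "bun run scripts/distillation/build_evidence_index.ts",
    "git_sha": "<pinned sha>",
    "started_at": "<ISO>", "ended_at": "<ISO>", "duration_ms": <n>,
    "input_files":  [{"path": "...", "sha256": "<hex>", "bytes": <n>}, ...],
    "output_files": [{"path": "...", "sha256": "<hex>", "bytes": <n>}, ...],
    "record_counts": {"in": 1053, "out": 1051, "skipped": 2, "deduped": 0},
    "validation_pass": false,
    "errors": [], "warnings": ["..."]
  }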
Skips at:
data/_kb/distillation_skips.jsonl (2 rows from outcomes.jsonl,
reason: timestamp field missing — schema layer caught it cleanly)
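Shape of a skip row (the fields match the materializer's skip writer; the
error text and offset here are illustrative, not the validator's exact
output):

  {"source_file": ".../outcomes.jsonl", "line_offset": <n>,
   "errors": ["created_at: required timestamp missing"],
   "sig_hash": "<sha256>", "recorded_at": "<run ISO timestamp>"}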
Health audit at:
data/_kb/evidence_health.md
Phase 2 done-criteria all met:
✓ tests pass
✓ ≥1 row from each Tier-1 source on real data (8/8 + 4 Tier 2 bonus)
✓ data/_kb/distillation_skips.jsonl populated with reasons
✓ Receipt JSON written + self-validates
✓ Provenance round-trip proven on real sampled rows
✓ Score-readiness coverage measured
Carry-overs to Phase 3:
- audit_discrepancies transform (needed before Phase 4c preference data)
- model_trust transform (needed before ModelLedgerEntry aggregation)
- outcomes.jsonl created_at: 2 rows fail materialization, decide
transform-side fix vs source-side fix
- 11 untested streams from recon still have no transform; add as
Phase 3+ consumers need them
- mode_experiments + distilled_* are 0% scoreable; Phase 3 must
JOIN to adjacent verdict-bearing records, NOT score in isolation
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
// build_evidence_index.ts — materialize EvidenceRecord rows from
// source JSONL streams. Pure mechanical view: every output row traces
// to a single source row via provenance.{source_file, line_offset,
// sig_hash}.
//
// USAGE
//   bun run scripts/distillation/build_evidence_index.ts            # materialize, write outputs
//   bun run scripts/distillation/build_evidence_index.ts --dry-run  # count + report, no writes
//   LH_DISTILL_ROOT=/path bun run ...                               # override repo root (tests)
//
// OUTPUTS
//   data/evidence/YYYY/MM/DD/<source-stem>.jsonl   valid records
//   data/_kb/distillation_skips.jsonl              rows that failed validation (append)
//   reports/distillation/<ts>/receipt.json         per-run audit (Receipt schema)
//
// IDEMPOTENCY
//   sig_hash = canonicalSha256(orderedKeys(source_row)). Re-running
//   loads existing day-partition output files into a seen set, so
//   already-materialized rows are skipped (not duplicated). Bit-stable
//   on identical input.
//
// NON-NEGOTIABLES (per spec + recon)
//   - ZERO model calls — deterministic only
//   - Provenance on every emitted row (the schema validator enforces it)
//   - Empty/missing source files do not error — they're tallied as
//     "rows_present: false" in the receipt
//   - Validator rejection ≠ runtime error — invalid rows go to skips
//     with their full error list, materialization continues

import { existsSync, readFileSync, mkdirSync, writeFileSync, appendFileSync, statSync } from "node:fs";
import { resolve, dirname } from "node:path";
import { spawnSync } from "node:child_process";

import { TRANSFORMS, canonicalSha256, type TransformDef } from "./transforms";
import { validateEvidenceRecord } from "../../auditor/schemas/distillation/evidence_record";
import { RECEIPT_SCHEMA_VERSION, validateReceipt, type Receipt, type FileReference } from "../../auditor/schemas/distillation/receipt";

const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";

export interface MaterializeOptions {
  root: string;               // repo root — source jsonls + outputs are relative to this
  transforms: TransformDef[]; // override for tests
  recorded_at: string;        // ISO 8601 — fixed for the run, used in provenance
  dry_run?: boolean;          // count but don't write
}

export interface SourceResult {
  source_file_relpath: string;
  rows_present: boolean;
  rows_read: number;
  rows_written: number;
  rows_skipped: number;
  rows_deduped: number;
  output_files: string[];
}

export interface MaterializeResult {
  sources: SourceResult[];
  totals: {
    rows_read: number;
    rows_written: number;
    rows_skipped: number;
    rows_deduped: number;
  };
  receipt: Receipt;
  receipt_path: string;
  evidence_dir: string;
  skips_path: string;
}

const ISO_DATE_PARTITION = (iso: string): string => {
  const d = new Date(iso);
  return `${d.getUTCFullYear()}/${String(d.getUTCMonth() + 1).padStart(2, "0")}/${String(d.getUTCDate()).padStart(2, "0")}`;
};

const OUTPUT_STEM = (source_file_relpath: string): string => {
  const m = source_file_relpath.match(/([^/]+)\.jsonl$/);
  return m ? m[1] : source_file_relpath.replace(/[^a-z0-9_]/gi, "_");
};

function sha256OfFile(path: string): string {
  const hasher = new Bun.CryptoHasher("sha256");
  hasher.update(readFileSync(path));
  return hasher.digest("hex");
}

function getGitSha(root: string): string {
  const r = spawnSync("git", ["-C", root, "rev-parse", "HEAD"], { encoding: "utf8" });
  if (r.status !== 0) return "0".repeat(40);
  return r.stdout.trim();
}

function getGitBranch(root: string): string | undefined {
  const r = spawnSync("git", ["-C", root, "rev-parse", "--abbrev-ref", "HEAD"], { encoding: "utf8" });
  return r.status === 0 ? r.stdout.trim() : undefined;
}

function getGitDirty(root: string): boolean {
  const r = spawnSync("git", ["-C", root, "status", "--porcelain"], { encoding: "utf8" });
  return r.status === 0 && r.stdout.trim().length > 0;
}

// Load sig_hashes already present in the target day-partition output
// file. Used to skip records that a prior run of the same day already
// materialized — keeps the materializer idempotent across reruns.
function loadSeenHashes(out_path: string): Set<string> {
  const seen = new Set<string>();
  if (!existsSync(out_path)) return seen;
  for (const line of readFileSync(out_path, "utf8").split("\n")) {
    if (!line) continue;
    try {
      const row = JSON.parse(line);
      if (row?.provenance?.sig_hash) seen.add(row.provenance.sig_hash);
    } catch {
      // Malformed output line — ignore; its sig_hash can't be recovered,
      // so the corresponding source row may be re-appended on a later run.
    }
  }
  return seen;
}

async function processSource(
  transform: TransformDef,
  opts: MaterializeOptions,
  evidence_dir: string,
  skips_path: string,
): Promise<SourceResult> {
  const source_path = resolve(opts.root, transform.source_file_relpath);
  const result: SourceResult = {
    source_file_relpath: transform.source_file_relpath,
    rows_present: false,
    rows_read: 0,
    rows_written: 0,
    rows_skipped: 0,
    rows_deduped: 0,
    output_files: [],
  };

  if (!existsSync(source_path)) return result;
  result.rows_present = true;

  const partition = ISO_DATE_PARTITION(opts.recorded_at);
  const stem = OUTPUT_STEM(transform.source_file_relpath);
  const out_dir = resolve(evidence_dir, partition);
  const out_path = resolve(out_dir, `${stem}.jsonl`);

  if (!opts.dry_run) mkdirSync(out_dir, { recursive: true });

  // Idempotency: collect sig_hashes already in the target output file
  // so we skip rows that were materialized in a prior run of the same
  // day. Tests pin this invariant.
  const seenHashes = loadSeenHashes(out_path);

  const lines = readFileSync(source_path, "utf8").split("\n");
  const rowsToWrite: string[] = [];
  const skipsToWrite: string[] = [];

  for (let i = 0; i < lines.length; i++) {
    const raw = lines[i];
    if (!raw) continue;
    result.rows_read++;

    let row: any;
    try { row = JSON.parse(raw); }
    catch (e) {
      result.rows_skipped++;
      skipsToWrite.push(JSON.stringify({
        source_file: transform.source_file_relpath,
        line_offset: i,
        errors: ["JSON.parse failed: " + (e as Error).message.slice(0, 200)],
        recorded_at: opts.recorded_at,
      }));
      continue;
    }

    const sig_hash = await canonicalSha256(row);
    if (seenHashes.has(sig_hash)) {
      result.rows_deduped++;
      continue;
    }
    seenHashes.add(sig_hash);

    const partial = transform.transform({
      row,
      line_offset: i,
      source_file_relpath: transform.source_file_relpath,
      recorded_at: opts.recorded_at,
      sig_hash,
    });
    if (!partial) {
      result.rows_skipped++;
      skipsToWrite.push(JSON.stringify({
        source_file: transform.source_file_relpath,
        line_offset: i,
        errors: ["transform returned null"],
        sig_hash,
        recorded_at: opts.recorded_at,
      }));
      continue;
    }

    const v = validateEvidenceRecord(partial);
    if (!v.valid) {
      result.rows_skipped++;
      skipsToWrite.push(JSON.stringify({
        source_file: transform.source_file_relpath,
        line_offset: i,
        errors: v.errors,
        sig_hash,
        recorded_at: opts.recorded_at,
      }));
      continue;
    }

    rowsToWrite.push(JSON.stringify(v.value));
    result.rows_written++;
  }

  if (!opts.dry_run) {
    if (rowsToWrite.length > 0) {
      const block = rowsToWrite.join("\n") + "\n";
      // Append mode preserves prior runs' rows; the dedup above ensures
      // we don't append duplicates of those.
      appendFileSync(out_path, block);
      result.output_files.push(out_path);
    }
    if (skipsToWrite.length > 0) {
      mkdirSync(dirname(skips_path), { recursive: true });
      appendFileSync(skips_path, skipsToWrite.join("\n") + "\n");
    }
  }

  return result;
}

export async function materializeAll(opts: MaterializeOptions): Promise<MaterializeResult> {
  const evidence_dir = resolve(opts.root, "data/evidence");
  const skips_path = resolve(opts.root, "data/_kb/distillation_skips.jsonl");
  const reports_dir = resolve(opts.root, "reports/distillation");

  const started_ms = Date.now();
  const sources: SourceResult[] = [];

  for (const t of opts.transforms) {
    sources.push(await processSource(t, opts, evidence_dir, skips_path));
  }

  const totals = sources.reduce(
    (acc, s) => ({
      rows_read: acc.rows_read + s.rows_read,
      rows_written: acc.rows_written + s.rows_written,
      rows_skipped: acc.rows_skipped + s.rows_skipped,
      rows_deduped: acc.rows_deduped + s.rows_deduped,
    }),
    { rows_read: 0, rows_written: 0, rows_skipped: 0, rows_deduped: 0 },
  );

  // Build the receipt — substantive per the spec non-negotiable.
  const ended_at = new Date().toISOString();
  const duration_ms = Date.now() - started_ms;

  const input_files: FileReference[] = [];
  for (const s of sources) {
    if (s.rows_present) {
      const path = resolve(opts.root, s.source_file_relpath);
      try {
        input_files.push({
          path: s.source_file_relpath,
          sha256: sha256OfFile(path),
          bytes: statSync(path).size,
        });
      } catch { /* file vanished mid-run — omit from input_files */ }
    }
  }

  const output_files: FileReference[] = [];
  for (const s of sources) {
    for (const path of s.output_files) {
      try {
        output_files.push({
          path: path.replace(opts.root + "/", ""),
          sha256: sha256OfFile(path),
          bytes: statSync(path).size,
        });
      } catch { /* not written in dry-run */ }
    }
  }

  const errors: string[] = [];
  const warnings: string[] = [];
  for (const s of sources) {
    if (!s.rows_present) warnings.push(`${s.source_file_relpath}: source file not found (skipped)`);
    if (s.rows_skipped > 0) warnings.push(`${s.source_file_relpath}: ${s.rows_skipped} rows skipped (validation/parse errors)`);
  }

  const receipt: Receipt = {
    schema_version: RECEIPT_SCHEMA_VERSION,
    command: "bun run scripts/distillation/build_evidence_index.ts" + (opts.dry_run ? " --dry-run" : ""),
    git_sha: getGitSha(opts.root),
    git_branch: getGitBranch(opts.root),
    git_dirty: getGitDirty(opts.root),
    started_at: opts.recorded_at,
    ended_at,
    duration_ms,
    input_files,
    output_files,
    record_counts: {
      in: totals.rows_read,
      out: totals.rows_written,
      skipped: totals.rows_skipped,
      deduped: totals.rows_deduped,
    },
    validation_pass: totals.rows_skipped === 0,
    errors,
    warnings,
  };

  // Self-validate the receipt — spec says receipts must conform to
  // their schema. Fail fast if our own writer drifts from the schema.
  const rv = validateReceipt(receipt);
  if (!rv.valid) {
    errors.push(...rv.errors.map(e => "receipt schema: " + e));
    receipt.errors = errors;
    receipt.validation_pass = false;
  }

  const stamp = ended_at.replace(/[:.]/g, "-");
  const receipt_dir = resolve(reports_dir, stamp);
  const receipt_path = resolve(receipt_dir, "receipt.json");
  if (!opts.dry_run) {
    mkdirSync(receipt_dir, { recursive: true });
    writeFileSync(receipt_path, JSON.stringify(receipt, null, 2) + "\n");
  }

  return { sources, totals, receipt, receipt_path, evidence_dir, skips_path };
}

async function cli() {
  const dry_run = process.argv.includes("--dry-run");
  const recorded_at = new Date().toISOString();
  const r = await materializeAll({
    root: DEFAULT_ROOT,
    transforms: TRANSFORMS,
    recorded_at,
    dry_run,
  });

  console.log(`[evidence_index] ${r.totals.rows_read} read · ${r.totals.rows_written} written · ${r.totals.rows_skipped} skipped · ${r.totals.rows_deduped} deduped${dry_run ? " (DRY RUN)" : ""}`);
  for (const s of r.sources) {
    if (!s.rows_present) {
      console.log(`  ${s.source_file_relpath}: (missing — skipped)`);
      continue;
    }
    console.log(`  ${s.source_file_relpath}: read=${s.rows_read} wrote=${s.rows_written} skip=${s.rows_skipped} dedup=${s.rows_deduped}`);
  }
  if (!dry_run) {
    console.log(`[evidence_index] receipt: ${r.receipt_path}`);
    console.log(`[evidence_index] validation_pass=${r.receipt.validation_pass}`);
  }
  if (!r.receipt.validation_pass) process.exit(1);
}

if (import.meta.main) {
  cli().catch(e => { console.error(e); process.exit(1); });
}