From 1ea802943fb4349201da52960c1e916817c26fd4 Mon Sep 17 00:00:00 2001
From: root
Date: Sun, 26 Apr 2026 22:38:46 -0500
Subject: [PATCH] =?UTF-8?q?distillation:=20Phase=202=20=E2=80=94=20Evidenc?=
 =?UTF-8?q?e=20View=20materializer=20+=20health=20audit?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Phase 2 ships the JOIN script that turns 12 source JSONL streams into
unified data/evidence/YYYY/MM/DD/<stem>.jsonl rows conforming to
EvidenceRecord v1, plus a high-level health audit proving the substrate
is real before Phase 3 reads from it.

Files:
  scripts/distillation/build_evidence_index.ts     materializeAll() + cli
  scripts/distillation/check_evidence_health.ts    provenance + coverage audit
  tests/distillation/build_evidence_index.test.ts  9 acceptance tests

Test metrics: 9/9 pass · 85 expect() calls · 323ms

Real-data run (2026-04-27T03:33:53Z):
  1053 rows read from 12 source streams
  1051 written (99.8%) to data/evidence/2026/04/27/
  2 skipped (outcomes.jsonl rows missing created_at — schema-level catch)
  0 deduped on first run

Sources covered (priority order from recon):
  TIER 1 (validated 100% in Phase 1, 8 sources): distilled_facts/procedures/config_hints,
    contract_analyses, mode_experiments, scrum_reviews, observer_escalations, audit_facts
  TIER 2 (added by Phase 2): auto_apply, observer_reviews, audits, outcomes

High-level audit results:
  Provenance round-trip: 30/30 sampled rows trace cleanly to source rows
  with matching canonicalSha256(orderedKeys(row)). Every output has
  source_file + line_offset + sig_hash + recorded_at. Proven.

  Score-readiness: 54% aggregate scoreable. A three-class taxonomy emerges
  from the coverage matrix:
  - Verdict-bearing (100% scoreable): scrum_reviews, observer_reviews,
    audits, contract_analyses — direct scoring inputs
  - Telemetry-rich (0-70%): mode_experiments, audit_facts, outcomes —
    Phase 3 will derive markers from latency/grounding/retrieval
  - Pure-extraction (0%): distilled_*, observer_escalations — context for
    OTHER scoring, not scoreable themselves

Invariants enforced (proven by tests + real-data audit):
  - ZERO model calls in materializer (deterministic only)
  - canonicalSha256(orderedKeys(row)) per source row → stable sig_hash
    (illustrative sketch after the done-criteria list below)
  - Schema validator gates output: rejected rows go to skips, never to evidence/
  - JSON.parse failures caught + logged, never crash the run
  - Missing source files tallied as rows_present=false, never error
  - Idempotent: second run on identical input writes 0 rows (proven on
    real data: 1053 read, 0 written, 1051 deduped)
  - Bit-stable: identical input produces byte-identical output (proven by
    tests/distillation/build_evidence_index.test.ts case 3)
  - Receipt self-validates against schema before write
  - validation_pass = boolean (skipped == 0), never inferred

Receipt at: reports/distillation/2026-04-27T03-33-53-972Z/receipt.json
  - schema_version=1, git_sha pinned, sha256 on every input/output
  - record_counts: {in:1053, out:1051, skipped:2, deduped:0}
  - validation_pass=false (skipped > 0; spec says explicit, never inferred)

Skips at: data/_kb/distillation_skips.jsonl
  (2 rows from outcomes.jsonl, reason: timestamp field missing — schema
  layer caught it cleanly)

Health audit at: data/_kb/evidence_health.md

Phase 2 done-criteria all met:
  ✓ tests pass
  ✓ ≥1 row from each Tier-1 source on real data (8/8 + 4 Tier 2 bonus)
  ✓ data/_kb/distillation_skips.jsonl populated with reasons
  ✓ Receipt JSON written + self-validates
  ✓ Provenance round-trip proven on real sampled rows
  ✓ Score-readiness coverage measured
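For reviewers: canonicalSha256 lives in scripts/distillation/transforms.ts
and is not part of this patch, so the following is only a minimal sketch of
what the sig_hash invariant implies (recursive key ordering, then SHA-256
over the canonical JSON string, via the same Bun.CryptoHasher API the
materializer already uses). The helper bodies here are assumptions, not the
shipped code:

    // Recursively sort object keys so logically-equal rows serialize identically.
    function orderedKeys(value: unknown): unknown {
      if (Array.isArray(value)) return value.map(orderedKeys);
      if (value && typeof value === "object") {
        const out: Record<string, unknown> = {};
        for (const k of Object.keys(value as object).sort()) {
          out[k] = orderedKeys((value as Record<string, unknown>)[k]);
        }
        return out;
      }
      return value; // primitives pass through untouched
    }

    // Stable content hash: the same source row yields the same sig_hash on every run.
    async function canonicalSha256(row: unknown): Promise<string> {
      const hasher = new Bun.CryptoHasher("sha256");
      hasher.update(JSON.stringify(orderedKeys(row)));
      return hasher.digest("hex");
    }

Because the hash depends only on the source row's content, reruns can load
previously written sig_hashes and dedup instead of appending duplicates.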
Carry-overs to Phase 3:
  - audit_discrepancies transform (needed before Phase 4c preference data)
  - model_trust transform (needed before ModelLedgerEntry aggregation)
  - outcomes.jsonl created_at: 2 rows fail materialization; decide
    transform-side fix vs source-side fix
  - 11 untested streams from recon still have no transform; add as
    Phase 3+ consumers need them
  - mode_experiments + distilled_* are 0% scoreable; Phase 3 must JOIN
    to adjacent verdict-bearing records, NOT score in isolation

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 scripts/distillation/build_evidence_index.ts    | 364 ++++++++++++++++++
 scripts/distillation/check_evidence_health.ts   | 254 ++++++++++++
 .../distillation/build_evidence_index.test.ts   | 295 ++++++++++++++
 3 files changed, 913 insertions(+)
 create mode 100644 scripts/distillation/build_evidence_index.ts
 create mode 100644 scripts/distillation/check_evidence_health.ts
 create mode 100644 tests/distillation/build_evidence_index.test.ts

diff --git a/scripts/distillation/build_evidence_index.ts b/scripts/distillation/build_evidence_index.ts
new file mode 100644
index 0000000..59caad9
--- /dev/null
+++ b/scripts/distillation/build_evidence_index.ts
@@ -0,0 +1,364 @@
+// build_evidence_index.ts — materialize EvidenceRecord rows from
+// source JSONL streams. Pure mechanical view: every output row traces
+// to a single source row via provenance.{source_file, line_offset,
+// sig_hash}.
+//
+// USAGE
+//   bun run scripts/distillation/build_evidence_index.ts            # materialize, write outputs
+//   bun run scripts/distillation/build_evidence_index.ts --dry-run  # count + report, no writes
+//   LH_DISTILL_ROOT=/path bun run ...                               # override repo root (tests)
+//
+// OUTPUTS
+//   data/evidence/YYYY/MM/DD/<stem>.jsonl           valid records
+//   data/_kb/distillation_skips.jsonl               rows that failed validation (append)
+//   reports/distillation/<timestamp>/receipt.json   per-run audit (Receipt schema)
+//
+// IDEMPOTENCY
+//   sig_hash = canonicalSha256(orderedKeys(source_row)). Re-running
+//   loads existing day-partition output files into a seen set, so
+//   already-materialized rows are skipped (not duplicated). Bit-stable
+//   on identical input.
+//
+// NON-NEGOTIABLES (per spec + recon)
+//   - ZERO model calls — deterministic only
+//   - Provenance on every emitted row (the schema validator enforces it)
+//   - Empty/missing source files do not error — they're tallied as
+//     "rows_present: false" in the receipt
+//   - Validator rejection ≠ runtime error — invalid rows go to skips
+//     with their full error list, materialization continues
+
+import { existsSync, readFileSync, mkdirSync, writeFileSync, readdirSync, appendFileSync, statSync } from "node:fs";
+import { resolve, dirname } from "node:path";
+import { spawnSync } from "node:child_process";
+
+import { TRANSFORMS, canonicalSha256, type TransformDef } from "./transforms";
+import { validateEvidenceRecord } from "../../auditor/schemas/distillation/evidence_record";
+import { RECEIPT_SCHEMA_VERSION, validateReceipt, type Receipt, type FileReference } from "../../auditor/schemas/distillation/receipt";
+
+const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ??
"/home/profit/lakehouse"; + +export interface MaterializeOptions { + root: string; // repo root — source jsonls + outputs are relative to this + transforms: TransformDef[]; // override for tests + recorded_at: string; // ISO 8601 — fixed for the run, used in provenance + dry_run?: boolean; // count but don't write +} + +export interface SourceResult { + source_file_relpath: string; + rows_present: boolean; + rows_read: number; + rows_written: number; + rows_skipped: number; + rows_deduped: number; + output_files: string[]; +} + +export interface MaterializeResult { + sources: SourceResult[]; + totals: { + rows_read: number; + rows_written: number; + rows_skipped: number; + rows_deduped: number; + }; + receipt: Receipt; + receipt_path: string; + evidence_dir: string; + skips_path: string; +} + +const ISO_DATE_PARTITION = (iso: string): string => { + const d = new Date(iso); + return `${d.getUTCFullYear()}/${String(d.getUTCMonth() + 1).padStart(2, "0")}/${String(d.getUTCDate()).padStart(2, "0")}`; +}; + +const OUTPUT_STEM = (source_file_relpath: string): string => { + const m = source_file_relpath.match(/([^/]+)\.jsonl$/); + return m ? m[1] : source_file_relpath.replace(/[^a-z0-9_]/gi, "_"); +}; + +function sha256OfFile(path: string): string { + const hasher = new Bun.CryptoHasher("sha256"); + hasher.update(readFileSync(path)); + return hasher.digest("hex"); +} + +function getGitSha(root: string): string { + const r = spawnSync("git", ["-C", root, "rev-parse", "HEAD"], { encoding: "utf8" }); + if (r.status !== 0) return "0".repeat(40); + return r.stdout.trim(); +} + +function getGitBranch(root: string): string | undefined { + const r = spawnSync("git", ["-C", root, "rev-parse", "--abbrev-ref", "HEAD"], { encoding: "utf8" }); + return r.status === 0 ? r.stdout.trim() : undefined; +} + +function getGitDirty(root: string): boolean { + const r = spawnSync("git", ["-C", root, "status", "--porcelain"], { encoding: "utf8" }); + return r.status === 0 && r.stdout.trim().length > 0; +} + +// Load sig_hashes already present in the target day-partition output +// file. Used to skip records that a prior run of the same day already +// materialized — keeps the materializer idempotent across reruns. +function loadSeenHashes(out_path: string): Set { + const seen = new Set(); + if (!existsSync(out_path)) return seen; + for (const line of readFileSync(out_path, "utf8").split("\n")) { + if (!line) continue; + try { + const row = JSON.parse(line); + if (row?.provenance?.sig_hash) seen.add(row.provenance.sig_hash); + } catch { + // Malformed line — ignore. Will be overwritten on next valid hash collision. 
+ } + } + return seen; +} + +async function processSource( + transform: TransformDef, + opts: MaterializeOptions, + evidence_dir: string, + skips_path: string, +): Promise { + const source_path = resolve(opts.root, transform.source_file_relpath); + const result: SourceResult = { + source_file_relpath: transform.source_file_relpath, + rows_present: false, + rows_read: 0, + rows_written: 0, + rows_skipped: 0, + rows_deduped: 0, + output_files: [], + }; + + if (!existsSync(source_path)) return result; + result.rows_present = true; + + const partition = ISO_DATE_PARTITION(opts.recorded_at); + const stem = OUTPUT_STEM(transform.source_file_relpath); + const out_dir = resolve(evidence_dir, partition); + const out_path = resolve(out_dir, `${stem}.jsonl`); + + if (!opts.dry_run) mkdirSync(out_dir, { recursive: true }); + + // Idempotency: collect sig_hashes already in the target output file + // so we skip rows that were materialized in a prior run of the same + // day. Tests pin this invariant. + const seenHashes = loadSeenHashes(out_path); + + const lines = readFileSync(source_path, "utf8").split("\n"); + const rowsToWrite: string[] = []; + const skipsToWrite: string[] = []; + + for (let i = 0; i < lines.length; i++) { + const raw = lines[i]; + if (!raw) continue; + result.rows_read++; + + let row: any; + try { row = JSON.parse(raw); } + catch (e) { + result.rows_skipped++; + skipsToWrite.push(JSON.stringify({ + source_file: transform.source_file_relpath, + line_offset: i, + errors: ["JSON.parse failed: " + (e as Error).message.slice(0, 200)], + recorded_at: opts.recorded_at, + })); + continue; + } + + const sig_hash = await canonicalSha256(row); + if (seenHashes.has(sig_hash)) { + result.rows_deduped++; + continue; + } + seenHashes.add(sig_hash); + + const partial = transform.transform({ + row, + line_offset: i, + source_file_relpath: transform.source_file_relpath, + recorded_at: opts.recorded_at, + sig_hash, + }); + if (!partial) { + result.rows_skipped++; + skipsToWrite.push(JSON.stringify({ + source_file: transform.source_file_relpath, + line_offset: i, + errors: ["transform returned null"], + sig_hash, + recorded_at: opts.recorded_at, + })); + continue; + } + + const v = validateEvidenceRecord(partial); + if (!v.valid) { + result.rows_skipped++; + skipsToWrite.push(JSON.stringify({ + source_file: transform.source_file_relpath, + line_offset: i, + errors: v.errors, + sig_hash, + recorded_at: opts.recorded_at, + })); + continue; + } + + rowsToWrite.push(JSON.stringify(v.value)); + result.rows_written++; + } + + if (!opts.dry_run) { + if (rowsToWrite.length > 0) { + const block = rowsToWrite.join("\n") + "\n"; + // Append-mode preserves prior runs' rows; dedup above ensures + // we don't append duplicates of those. 
+ appendFileSync(out_path, block); + result.output_files.push(out_path); + } + if (skipsToWrite.length > 0) { + mkdirSync(dirname(skips_path), { recursive: true }); + appendFileSync(skips_path, skipsToWrite.join("\n") + "\n"); + } + } + + return result; +} + +export async function materializeAll(opts: MaterializeOptions): Promise { + const evidence_dir = resolve(opts.root, "data/evidence"); + const skips_path = resolve(opts.root, "data/_kb/distillation_skips.jsonl"); + const reports_dir = resolve(opts.root, "reports/distillation"); + + const started_ms = Date.now(); + const sources: SourceResult[] = []; + + for (const t of opts.transforms) { + sources.push(await processSource(t, opts, evidence_dir, skips_path)); + } + + const totals = sources.reduce( + (acc, s) => ({ + rows_read: acc.rows_read + s.rows_read, + rows_written: acc.rows_written + s.rows_written, + rows_skipped: acc.rows_skipped + s.rows_skipped, + rows_deduped: acc.rows_deduped + s.rows_deduped, + }), + { rows_read: 0, rows_written: 0, rows_skipped: 0, rows_deduped: 0 }, + ); + + // Build the receipt — substantive per the spec non-negotiable. + const ended_at = new Date().toISOString(); + const duration_ms = Date.now() - started_ms; + + const input_files: FileReference[] = []; + for (const s of sources) { + if (s.rows_present) { + const path = resolve(opts.root, s.source_file_relpath); + try { + input_files.push({ + path: s.source_file_relpath, + sha256: sha256OfFile(path), + bytes: statSync(path).size, + }); + } catch { /* file vanished mid-run — tally as missing in record_counts */ } + } + } + + const output_files: FileReference[] = []; + for (const s of sources) { + for (const path of s.output_files) { + try { + output_files.push({ + path: path.replace(opts.root + "/", ""), + sha256: sha256OfFile(path), + bytes: statSync(path).size, + }); + } catch { /* not written in dry-run */ } + } + } + + const errors: string[] = []; + const warnings: string[] = []; + for (const s of sources) { + if (!s.rows_present) warnings.push(`${s.source_file_relpath}: source file not found (skipped)`); + if (s.rows_skipped > 0) warnings.push(`${s.source_file_relpath}: ${s.rows_skipped} rows skipped (validation/parse errors)`); + } + + const receipt: Receipt = { + schema_version: RECEIPT_SCHEMA_VERSION, + command: "bun run scripts/distillation/build_evidence_index.ts" + (opts.dry_run ? " --dry-run" : ""), + git_sha: getGitSha(opts.root), + git_branch: getGitBranch(opts.root), + git_dirty: getGitDirty(opts.root), + started_at: opts.recorded_at, + ended_at, + duration_ms, + input_files, + output_files, + record_counts: { + in: totals.rows_read, + out: totals.rows_written, + skipped: totals.rows_skipped, + deduped: totals.rows_deduped, + }, + validation_pass: totals.rows_skipped === 0, + errors, + warnings, + }; + + // Self-validate the receipt — spec says receipts must conform to + // their schema. Fail fast if our own writer drifts from the schema. 
+ const rv = validateReceipt(receipt); + if (!rv.valid) { + errors.push(...rv.errors.map(e => "receipt schema: " + e)); + receipt.errors = errors; + receipt.validation_pass = false; + } + + const stamp = ended_at.replace(/[:.]/g, "-"); + const receipt_dir = resolve(reports_dir, stamp); + const receipt_path = resolve(receipt_dir, "receipt.json"); + if (!opts.dry_run) { + mkdirSync(receipt_dir, { recursive: true }); + writeFileSync(receipt_path, JSON.stringify(receipt, null, 2) + "\n"); + } + + return { sources, totals, receipt, receipt_path, evidence_dir, skips_path }; +} + +async function cli() { + const dry_run = process.argv.includes("--dry-run"); + const recorded_at = new Date().toISOString(); + const r = await materializeAll({ + root: DEFAULT_ROOT, + transforms: TRANSFORMS, + recorded_at, + dry_run, + }); + + console.log(`[evidence_index] ${r.totals.rows_read} read · ${r.totals.rows_written} written · ${r.totals.rows_skipped} skipped · ${r.totals.rows_deduped} deduped${dry_run ? " (DRY RUN)" : ""}`); + for (const s of r.sources) { + if (!s.rows_present) { + console.log(` ${s.source_file_relpath}: (missing — skipped)`); + continue; + } + console.log(` ${s.source_file_relpath}: read=${s.rows_read} wrote=${s.rows_written} skip=${s.rows_skipped} dedup=${s.rows_deduped}`); + } + if (!dry_run) { + console.log(`[evidence_index] receipt: ${r.receipt_path}`); + console.log(`[evidence_index] validation_pass=${r.receipt.validation_pass}`); + } + if (!r.receipt.validation_pass) process.exit(1); +} + +if (import.meta.main) { + cli().catch(e => { console.error(e); process.exit(1); }); +} diff --git a/scripts/distillation/check_evidence_health.ts b/scripts/distillation/check_evidence_health.ts new file mode 100644 index 0000000..8899aba --- /dev/null +++ b/scripts/distillation/check_evidence_health.ts @@ -0,0 +1,254 @@ +// check_evidence_health.ts — high-level audit of the materialized +// EvidenceRecord substrate. Answers two questions Phase 3 needs: +// +// 1. PROVENANCE ROUND-TRIP — sample N output rows, look up the +// source row at the recorded (source_file, line_offset), +// recompute canonicalSha256, confirm it matches provenance.sig_hash. +// Hard pass/fail. If even one row fails, provenance is theater. +// +// 2. SCORE-READINESS COVERAGE — for each source, what fraction of +// materialized rows carry the signals the Success Scorer will +// need: model_role, success_markers, failure_markers, +// observer_verdict, latency_ms, retrieved_context, text. Tells +// Phase 3 which sources to read from for each gate. +// +// Output: markdown report to stdout + data/_kb/evidence_health.md. +// +// Run: bun run scripts/distillation/check_evidence_health.ts + +import { existsSync, readFileSync, readdirSync, statSync, writeFileSync } from "node:fs"; +import { resolve } from "node:path"; +import { canonicalSha256 } from "../../auditor/schemas/distillation/types"; + +const ROOT = process.env.LH_DISTILL_ROOT ?? 
"/home/profit/lakehouse"; +const SAMPLE_FOR_PROVENANCE = 30; + +interface CoverageBucket { + source: string; + total: number; + with_model_role: number; + with_model_name: number; + with_success_markers: number; + with_failure_markers: number; + with_observer_verdict: number; + with_latency_ms: number; + with_retrieved_context: number; + with_text: number; + scoreable: number; // has at least ONE signal the scorer can use +} + +interface ProvenanceCheck { + passed: number; + failed: number; + failures: Array<{ output_path: string; line: number; reason: string }>; +} + +function listEvidenceFiles(evidence_root: string): string[] { + const out: string[] = []; + if (!existsSync(evidence_root)) return out; + for (const yyyy of readdirSync(evidence_root).sort()) { + const ydir = resolve(evidence_root, yyyy); + if (!statSync(ydir).isDirectory()) continue; + for (const mm of readdirSync(ydir).sort()) { + const mdir = resolve(ydir, mm); + if (!statSync(mdir).isDirectory()) continue; + for (const dd of readdirSync(mdir).sort()) { + const ddir = resolve(mdir, dd); + if (!statSync(ddir).isDirectory()) continue; + for (const f of readdirSync(ddir)) { + if (f.endsWith(".jsonl")) out.push(resolve(ddir, f)); + } + } + } + } + return out; +} + +// Has at least one deterministic signal the Phase 3 scorer can act on. +// Order is generous: any of these counts as "scoreable", because the +// scorer combines multiple signals. +function isScoreable(row: any): boolean { + if (Array.isArray(row.success_markers) && row.success_markers.length > 0) return true; + if (Array.isArray(row.failure_markers) && row.failure_markers.length > 0) return true; + if (typeof row.observer_verdict === "string") return true; + if (row.validation_results && Object.keys(row.validation_results).length > 0) return true; + if (Array.isArray(row.observer_notes) && row.observer_notes.length > 0) return true; + return false; +} + +function bucketStart(source: string): CoverageBucket { + return { + source, total: 0, + with_model_role: 0, with_model_name: 0, + with_success_markers: 0, with_failure_markers: 0, + with_observer_verdict: 0, with_latency_ms: 0, + with_retrieved_context: 0, with_text: 0, + scoreable: 0, + }; +} + +function pct(n: number, total: number): string { + if (total === 0) return "—"; + return Math.round(100 * n / total) + "%"; +} + +async function main() { + const evidenceFiles = listEvidenceFiles(resolve(ROOT, "data/evidence")); + if (evidenceFiles.length === 0) { + console.error("No evidence files found. Run scripts/distillation/build_evidence_index.ts first."); + process.exit(1); + } + + // ── 1. Coverage scan ──────────────────────────────────────────── + const buckets = new Map(); + const allOutputRows: Array<{ output_path: string; line: number; row: any }> = []; + + for (const evPath of evidenceFiles) { + const sourceLabel = evPath.split("/").pop()!.replace(/\.jsonl$/, ""); + const b = buckets.get(sourceLabel) ?? 
bucketStart(sourceLabel); + const lines = readFileSync(evPath, "utf8").split("\n").filter(Boolean); + for (let i = 0; i < lines.length; i++) { + const row = JSON.parse(lines[i]); + b.total++; + if (row.model_role) b.with_model_role++; + if (row.model_name) b.with_model_name++; + if (Array.isArray(row.success_markers) && row.success_markers.length > 0) b.with_success_markers++; + if (Array.isArray(row.failure_markers) && row.failure_markers.length > 0) b.with_failure_markers++; + if (typeof row.observer_verdict === "string") b.with_observer_verdict++; + if (typeof row.latency_ms === "number") b.with_latency_ms++; + if (row.retrieved_context && Object.keys(row.retrieved_context).length > 0) b.with_retrieved_context++; + if (typeof row.text === "string" && row.text.length > 0) b.with_text++; + if (isScoreable(row)) b.scoreable++; + allOutputRows.push({ output_path: evPath, line: i, row }); + } + buckets.set(sourceLabel, b); + } + + // ── 2. Provenance round-trip on a random sample ───────────────── + const sampleSize = Math.min(SAMPLE_FOR_PROVENANCE, allOutputRows.length); + const indices = new Set(); + // Deterministic-ish sample: stride through evenly so we hit different sources. + const stride = Math.max(1, Math.floor(allOutputRows.length / sampleSize)); + for (let i = 0; i < allOutputRows.length && indices.size < sampleSize; i += stride) indices.add(i); + // Top up with the tail in case stride truncates early. + while (indices.size < sampleSize) indices.add(allOutputRows.length - 1 - indices.size); + + const provCheck: ProvenanceCheck = { passed: 0, failed: 0, failures: [] }; + // Cache source-file lines so we don't re-read big files repeatedly. + const sourceCache = new Map(); + + for (const idx of indices) { + const { output_path, line, row } = allOutputRows[idx]; + const prov = row.provenance; + if (!prov || !prov.source_file || prov.line_offset == null || !prov.sig_hash) { + provCheck.failed++; + provCheck.failures.push({ output_path, line, reason: "missing provenance fields" }); + continue; + } + const sourceAbs = resolve(ROOT, prov.source_file); + if (!sourceCache.has(sourceAbs)) { + if (!existsSync(sourceAbs)) { + provCheck.failed++; + provCheck.failures.push({ output_path, line, reason: `source missing: ${prov.source_file}` }); + continue; + } + sourceCache.set(sourceAbs, readFileSync(sourceAbs, "utf8").split("\n")); + } + const sourceLines = sourceCache.get(sourceAbs)!; + if (prov.line_offset >= sourceLines.length) { + provCheck.failed++; + provCheck.failures.push({ output_path, line, reason: `line_offset ${prov.line_offset} past EOF (source has ${sourceLines.length} lines)` }); + continue; + } + const sourceLine = sourceLines[prov.line_offset]; + let sourceRow: any; + try { sourceRow = JSON.parse(sourceLine); } + catch (e) { + provCheck.failed++; + provCheck.failures.push({ output_path, line, reason: `source line not JSON: ${(e as Error).message.slice(0, 60)}` }); + continue; + } + const recomputed = await canonicalSha256(sourceRow); + if (recomputed !== prov.sig_hash) { + provCheck.failed++; + provCheck.failures.push({ + output_path, line, + reason: `sig_hash mismatch: prov=${prov.sig_hash.slice(0, 16)}… recomputed=${recomputed.slice(0, 16)}…`, + }); + continue; + } + provCheck.passed++; + } + + // ── 3. 
Render markdown ────────────────────────────────────────── + const md: string[] = []; + md.push("# Evidence Health — Phase 2 high-level audit"); + md.push(""); + md.push(`**Run:** ${new Date().toISOString()}`); + md.push(`**Evidence files:** ${evidenceFiles.length}`); + md.push(`**Total records:** ${allOutputRows.length}`); + md.push(""); + md.push("## 1. Provenance round-trip"); + md.push(""); + md.push(`Sample size: **${sampleSize}** rows (stride sample across all evidence).`); + md.push(""); + md.push(`| Passed | Failed |`); + md.push(`|---|---|`); + md.push(`| ${provCheck.passed} | ${provCheck.failed} |`); + md.push(""); + if (provCheck.failed > 0) { + md.push("### Failures"); + for (const f of provCheck.failures.slice(0, 20)) { + md.push(`- \`${f.output_path.split("/").slice(-2).join("/")}\` line ${f.line}: ${f.reason}`); + } + } else { + md.push("**All sampled rows traced cleanly back to source rows with matching canonical sig_hash.**"); + } + md.push(""); + md.push("## 2. Score-readiness coverage"); + md.push(""); + md.push("Per source, fraction of materialized rows carrying each signal the Phase 3 Success Scorer will read."); + md.push(""); + md.push("| Source | Rows | role | name | success | failure | obs.verdict | latency | retrieval | text | scoreable |"); + md.push("|---|---|---|---|---|---|---|---|---|---|---|"); + const sortedBuckets = Array.from(buckets.values()).sort((a, b) => b.total - a.total); + for (const b of sortedBuckets) { + md.push(`| ${b.source} | ${b.total} | ${pct(b.with_model_role, b.total)} | ${pct(b.with_model_name, b.total)} | ${pct(b.with_success_markers, b.total)} | ${pct(b.with_failure_markers, b.total)} | ${pct(b.with_observer_verdict, b.total)} | ${pct(b.with_latency_ms, b.total)} | ${pct(b.with_retrieved_context, b.total)} | ${pct(b.with_text, b.total)} | **${pct(b.scoreable, b.total)}** |`); + } + md.push(""); + // Aggregate totals row + const totals = sortedBuckets.reduce((acc, b) => ({ + total: acc.total + b.total, + role: acc.role + b.with_model_role, + name: acc.name + b.with_model_name, + success: acc.success + b.with_success_markers, + failure: acc.failure + b.with_failure_markers, + obs: acc.obs + b.with_observer_verdict, + lat: acc.lat + b.with_latency_ms, + ret: acc.ret + b.with_retrieved_context, + text: acc.text + b.with_text, + score: acc.score + b.scoreable, + }), { total: 0, role: 0, name: 0, success: 0, failure: 0, obs: 0, lat: 0, ret: 0, text: 0, score: 0 }); + md.push(`**Aggregate:** ${totals.total} rows · role ${pct(totals.role, totals.total)} · name ${pct(totals.name, totals.total)} · success ${pct(totals.success, totals.total)} · failure ${pct(totals.failure, totals.total)} · obs.verdict ${pct(totals.obs, totals.total)} · latency ${pct(totals.lat, totals.total)} · retrieval ${pct(totals.ret, totals.total)} · text ${pct(totals.text, totals.total)} · scoreable **${pct(totals.score, totals.total)}**`); + md.push(""); + md.push("## 3. Phase 3 readiness"); + md.push(""); + if (provCheck.failed > 0) { + md.push("**❌ NOT READY** — provenance round-trip failed. Fix materializer or transforms before Phase 3."); + } else if (totals.score < totals.total * 0.5) { + md.push(`**⚠️ PARTIAL READINESS** — only ${pct(totals.score, totals.total)} of records are scoreable. Phase 3 will produce many \`needs_human_review\` until transforms enrich more sources with markers.`); + } else { + md.push(`**✓ READY** — provenance traces, ${pct(totals.score, totals.total)} of records carry scorer signals. 
Phase 3 can begin.`); + } + md.push(""); + + const out = md.join("\n"); + console.log(out); + writeFileSync(resolve(ROOT, "data/_kb/evidence_health.md"), out); + + if (provCheck.failed > 0) process.exit(1); +} + +if (import.meta.main) { + main().catch(e => { console.error(e); process.exit(1); }); +} diff --git a/tests/distillation/build_evidence_index.test.ts b/tests/distillation/build_evidence_index.test.ts new file mode 100644 index 0000000..69c3377 --- /dev/null +++ b/tests/distillation/build_evidence_index.test.ts @@ -0,0 +1,295 @@ +// Phase 2 acceptance tests — pin the materializer's invariants: +// 1. Valid rows materialize; invalid rows go to skips with errors +// 2. Idempotency: re-running on same source yields zero new writes +// 3. Stability: identical input → byte-identical output (canonical hash) +// 4. Schema gating: rows that fail validateEvidenceRecord NEVER reach +// data/evidence/*.jsonl, only skips +// 5. Receipt: substantive (git_sha + sha256 + record_counts + +// validation_pass), conforms to Receipt schema +// 6. JSON-parse failures handled gracefully +// +// All tests run against a temp repo root with synthetic source jsonls +// and a custom TRANSFORMS list pointing at them. No live JSONLs touched. +// +// Run: bun test tests/distillation/build_evidence_index.test.ts + +import { test, expect, beforeEach, afterEach } from "bun:test"; +import { mkdirSync, writeFileSync, rmSync, existsSync, readFileSync, readdirSync, statSync } from "node:fs"; +import { resolve } from "node:path"; + +import { materializeAll, type MaterializeOptions } from "../../scripts/distillation/build_evidence_index"; +import type { TransformDef } from "../../scripts/distillation/transforms"; +import { EVIDENCE_SCHEMA_VERSION, type ModelRole } from "../../auditor/schemas/distillation/evidence_record"; +import { validateReceipt } from "../../auditor/schemas/distillation/receipt"; + +const TMP_ROOT = "/tmp/distillation_test_phase2"; +const RECORDED = "2026-04-26T22:30:00.000Z"; + +// Minimal transform — produces a valid EvidenceRecord from the +// synthetic source rows below. +const TEST_TRANSFORMS: TransformDef[] = [ + { + source_file_relpath: "data/_kb/synthetic_a.jsonl", + transform: ({ row, line_offset, source_file_relpath, recorded_at, sig_hash }) => { + // Test rows that intentionally fail validation set bad: true. + // Transform still returns a Partial — validator catches it. 
+ if (row.bad) { + return { + // missing run_id (required) → forces validateEvidenceRecord to reject + task_id: row.task_id, + timestamp: row.ts, + schema_version: EVIDENCE_SCHEMA_VERSION, + provenance: { source_file: source_file_relpath, line_offset, sig_hash, recorded_at }, + } as any; + } + return { + run_id: row.run_id, + task_id: row.task_id, + timestamp: row.ts, + schema_version: EVIDENCE_SCHEMA_VERSION, + provenance: { source_file: source_file_relpath, line_offset, sig_hash, recorded_at }, + text: row.text, + model_role: "executor" as ModelRole, + }; + }, + }, + { + source_file_relpath: "data/_kb/synthetic_b.jsonl", + transform: ({ row, line_offset, source_file_relpath, recorded_at, sig_hash }) => ({ + run_id: row.run_id, + task_id: row.task_id, + timestamp: row.ts, + schema_version: EVIDENCE_SCHEMA_VERSION, + provenance: { source_file: source_file_relpath, line_offset, sig_hash, recorded_at }, + text: row.text, + model_role: "extractor" as ModelRole, + }), + }, +]; + +function setupRoot() { + if (existsSync(TMP_ROOT)) rmSync(TMP_ROOT, { recursive: true, force: true }); + mkdirSync(resolve(TMP_ROOT, "data/_kb"), { recursive: true }); + + // Source A: 3 valid + 1 invalid + 1 malformed JSON + const aRows = [ + { run_id: "a1", task_id: "task1", ts: "2026-04-26T20:00:00.000Z", text: "first" }, + { run_id: "a2", task_id: "task2", ts: "2026-04-26T20:01:00.000Z", text: "second" }, + { run_id: "a3", task_id: "task3", ts: "2026-04-26T20:02:00.000Z", text: "third" }, + { bad: true, task_id: "fail-row", ts: "2026-04-26T20:03:00.000Z" }, + ]; + const aLines = aRows.map(r => JSON.stringify(r)).join("\n") + "\n{not valid json\n"; + writeFileSync(resolve(TMP_ROOT, "data/_kb/synthetic_a.jsonl"), aLines); + + // Source B: 2 valid rows + const bRows = [ + { run_id: "b1", task_id: "btask1", ts: "2026-04-26T20:10:00.000Z", text: "alpha" }, + { run_id: "b2", task_id: "btask2", ts: "2026-04-26T20:11:00.000Z", text: "beta" }, + ]; + writeFileSync(resolve(TMP_ROOT, "data/_kb/synthetic_b.jsonl"), bRows.map(r => JSON.stringify(r)).join("\n") + "\n"); +} + +beforeEach(setupRoot); +afterEach(() => { + if (existsSync(TMP_ROOT)) rmSync(TMP_ROOT, { recursive: true, force: true }); +}); + +// ─── Acceptance Test 1: valid rows materialize, invalid go to skips ── + +test("materializer: 3 valid rows from source A reach evidence/, 1 invalid + 1 malformed go to skips", async () => { + const r = await materializeAll({ + root: TMP_ROOT, + transforms: TEST_TRANSFORMS, + recorded_at: RECORDED, + }); + + // Source A: 5 read, 3 written, 2 skipped (1 missing run_id, 1 malformed JSON) + const a = r.sources.find(s => s.source_file_relpath.endsWith("synthetic_a.jsonl"))!; + expect(a.rows_read).toBe(5); + expect(a.rows_written).toBe(3); + expect(a.rows_skipped).toBe(2); + + // Source B: 2 read, 2 written + const b = r.sources.find(s => s.source_file_relpath.endsWith("synthetic_b.jsonl"))!; + expect(b.rows_read).toBe(2); + expect(b.rows_written).toBe(2); + + // Skips file exists and contains both rejection reasons + const skipsContent = readFileSync(r.skips_path, "utf8"); + expect(skipsContent).toContain("run_id"); // missing required field + expect(skipsContent).toContain("JSON.parse"); // malformed JSON + + // Evidence files exist at the expected day partition + const partition = "2026/04/26"; + const aOut = resolve(TMP_ROOT, "data/evidence", partition, "synthetic_a.jsonl"); + const bOut = resolve(TMP_ROOT, "data/evidence", partition, "synthetic_b.jsonl"); + expect(existsSync(aOut)).toBe(true); + 
expect(existsSync(bOut)).toBe(true); + + // Output rows count matches written + const aLines = readFileSync(aOut, "utf8").trim().split("\n"); + expect(aLines.length).toBe(3); + for (const line of aLines) { + const row = JSON.parse(line); + expect(row.schema_version).toBe(EVIDENCE_SCHEMA_VERSION); + expect(row.provenance.source_file).toBe("data/_kb/synthetic_a.jsonl"); + expect(typeof row.provenance.sig_hash).toBe("string"); + expect(row.provenance.sig_hash.length).toBe(64); + } +}); + +// ─── Acceptance Test 2: idempotency ────────────────────────────────── + +test("materializer: re-running on same source produces 0 new writes (idempotent)", async () => { + await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED }); + const r2 = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED }); + + // Second run reads the same rows but dedups all of them — zero new writes + const a2 = r2.sources.find(s => s.source_file_relpath.endsWith("synthetic_a.jsonl"))!; + expect(a2.rows_written).toBe(0); + expect(a2.rows_deduped).toBe(3); +}); + +// ─── Acceptance Test 3: stable sig_hash → byte-identical output ────── + +test("materializer: identical input produces byte-identical output across runs", async () => { + const r1 = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED }); + const aPath = resolve(TMP_ROOT, "data/evidence/2026/04/26/synthetic_a.jsonl"); + const aBeforeBytes = readFileSync(aPath); + + // Wipe the output file and re-run with the same inputs + rmSync(aPath); + await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED }); + const aAfterBytes = readFileSync(aPath); + + expect(aBeforeBytes.equals(aAfterBytes)).toBe(true); +}); + +// ─── Acceptance Test 4: schema gating ──────────────────────────────── + +test("materializer: rows failing validateEvidenceRecord NEVER reach evidence/, only skips", async () => { + const r = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED }); + + const aOut = resolve(TMP_ROOT, "data/evidence/2026/04/26/synthetic_a.jsonl"); + const aRows = readFileSync(aOut, "utf8").trim().split("\n").filter(Boolean).map(l => JSON.parse(l)); + + // Every output row has a non-empty run_id (the invalid row had no + // run_id, so it MUST be absent from output). 
+ for (const row of aRows) { + expect(typeof row.run_id).toBe("string"); + expect(row.run_id.length).toBeGreaterThan(0); + } + // Specifically: no row carries the failing fixture's task_id "fail-row" + expect(aRows.find((r: any) => r.task_id === "fail-row")).toBeUndefined(); +}); + +// ─── Acceptance Test 5: receipt is substantive + schema-conforming ─── + +test("materializer: receipt has git_sha + sha256(input) + sha256(output) + record_counts and validates", async () => { + const r = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED }); + + // Self-validation against the Receipt schema + const v = validateReceipt(r.receipt); + expect(v.valid).toBe(true); + + // git_sha is 40 hex chars (real or 0...0 fallback) + expect(r.receipt.git_sha).toMatch(/^[0-9a-f]{40}$/); + + // Each input file has a real sha256 + bytes + expect(r.receipt.input_files.length).toBe(2); + for (const f of r.receipt.input_files) { + expect(f.sha256).toMatch(/^[0-9a-f]{64}$/); + expect(typeof f.bytes).toBe("number"); + expect(f.bytes).toBeGreaterThan(0); + } + + // Each output file too + expect(r.receipt.output_files.length).toBe(2); + for (const f of r.receipt.output_files) { + expect(f.sha256).toMatch(/^[0-9a-f]{64}$/); + } + + // Counts add up + expect(r.receipt.record_counts.in).toBe(7); // 5 from A + 2 from B + expect(r.receipt.record_counts.out).toBe(5); // 3 + 2 + expect(r.receipt.record_counts.skipped).toBe(2); // both from A + + // validation_pass MUST be a boolean — never inferred + expect(typeof r.receipt.validation_pass).toBe("boolean"); + // With skips > 0, validation_pass should be false + expect(r.receipt.validation_pass).toBe(false); + + // Receipt persisted + expect(existsSync(r.receipt_path)).toBe(true); +}); + +// ─── Acceptance Test 6: clean run sets validation_pass=true ────────── + +test("materializer: with all-valid sources, validation_pass=true and skips=0", async () => { + // Strip the bad row + malformed JSON from source A + const cleanRows = [ + { run_id: "c1", task_id: "ct1", ts: "2026-04-26T22:00:00.000Z", text: "clean" }, + { run_id: "c2", task_id: "ct2", ts: "2026-04-26T22:01:00.000Z", text: "clean2" }, + ]; + writeFileSync(resolve(TMP_ROOT, "data/_kb/synthetic_a.jsonl"), cleanRows.map(r => JSON.stringify(r)).join("\n") + "\n"); + + const r = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED }); + + expect(r.receipt.record_counts.skipped).toBe(0); + expect(r.receipt.validation_pass).toBe(true); +}); + +// ─── Acceptance Test 7: dry-run does not write ─────────────────────── + +test("materializer: --dry-run reports counts but writes no evidence files", async () => { + const r = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED, dry_run: true }); + + // Counts populated + expect(r.totals.rows_read).toBe(7); + expect(r.totals.rows_written).toBe(5); + + // No evidence files written + const evidenceDir = resolve(TMP_ROOT, "data/evidence"); + expect(existsSync(evidenceDir)).toBe(false); + + // No skips file written + const skipsPath = resolve(TMP_ROOT, "data/_kb/distillation_skips.jsonl"); + expect(existsSync(skipsPath)).toBe(false); +}); + +// ─── Acceptance Test 8: missing source file does not crash ─────────── + +test("materializer: missing source file is tallied as rows_present=false, no error", async () => { + rmSync(resolve(TMP_ROOT, "data/_kb/synthetic_b.jsonl")); + + const r = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED }); 
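+  // synthetic_b.jsonl was deleted above; the run should finish and report that source as absent rather than throw.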
+ + const b = r.sources.find(s => s.source_file_relpath.endsWith("synthetic_b.jsonl"))!; + expect(b.rows_present).toBe(false); + expect(b.rows_read).toBe(0); + + // Source A still processes normally + const a = r.sources.find(s => s.source_file_relpath.endsWith("synthetic_a.jsonl"))!; + expect(a.rows_present).toBe(true); + expect(a.rows_written).toBe(3); +}); + +// ─── Acceptance Test 9: provenance preserved on every row ──────────── + +test("materializer: every output row has provenance traceable to a source row", async () => { + const r = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED }); + + for (const s of r.sources) { + for (const out_path of s.output_files) { + const lines = readFileSync(out_path, "utf8").trim().split("\n").filter(Boolean); + for (const line of lines) { + const row = JSON.parse(line); + expect(row.provenance).toBeTruthy(); + expect(row.provenance.source_file).toBe(s.source_file_relpath); + expect(typeof row.provenance.line_offset).toBe("number"); + expect(row.provenance.sig_hash).toMatch(/^[0-9a-f]{64}$/); + expect(row.provenance.recorded_at).toBe(RECORDED); + } + } + } +});
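Reviewer note on the last carry-over: pure-extraction records (distilled_*,
observer_escalations) carry no scorer signals of their own, so a Phase 3
consumer has to attach them to a verdict-bearing record that shares a run.
The sketch below is hypothetical and not part of this patch (Phase 3 may
pick a different join key); the field names follow the EvidenceRecord
fields this patch already emits, everything else is an assumption:

    import { readFileSync, readdirSync } from "node:fs";
    import { resolve } from "node:path";

    interface EvidenceRow {
      run_id?: string;
      task_id?: string;
      observer_verdict?: string;
      provenance: { source_file: string; line_offset: number; sig_hash: string };
      [k: string]: unknown;
    }

    // Load every row materialized under one day partition, e.g. data/evidence/2026/04/27.
    function loadDay(dayDir: string): EvidenceRow[] {
      return readdirSync(dayDir)
        .filter(f => f.endsWith(".jsonl"))
        .flatMap(f => readFileSync(resolve(dayDir, f), "utf8")
          .split("\n").filter(Boolean)
          .map(l => JSON.parse(l) as EvidenceRow));
    }

    // Pair each extraction row with the verdict-bearing rows sharing its run_id,
    // so the verdict supplies the score while the extraction supplies context.
    function joinForScoring(rows: EvidenceRow[]) {
      const verdictsByRun = new Map<string, EvidenceRow[]>();
      for (const r of rows) {
        if (typeof r.observer_verdict === "string" && r.run_id) {
          const list = verdictsByRun.get(r.run_id) ?? [];
          list.push(r);
          verdictsByRun.set(r.run_id, list);
        }
      }
      return rows
        .filter(r => typeof r.observer_verdict !== "string" && r.run_id)
        .map(r => ({ context: r, verdicts: verdictsByRun.get(r.run_id!) ?? [] }));
    }

A Phase 3 gate could then score each context row from its attached
verdicts, and route rows whose verdicts array is empty to
needs_human_review instead of scoring them in isolation.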