distillation: Phase 2 — Evidence View materializer + health audit

Phase 2 ships the JOIN script that turns 12 source JSONL streams
into unified data/evidence/YYYY/MM/DD/<source>.jsonl rows conforming
to EvidenceRecord v1, plus a high-level health audit proving the
substrate is real before Phase 3 reads from it.

Files:
  scripts/distillation/build_evidence_index.ts    materializeAll() + cli
  scripts/distillation/check_evidence_health.ts   provenance + coverage audit
  tests/distillation/build_evidence_index.test.ts 9 acceptance tests

Test metrics:
  9/9 pass · 85 expect() calls · 323ms

Real-data run (2026-04-27T03:33:53Z):
  1053 rows read from 12 source streams
  1051 written (99.8%) to data/evidence/2026/04/27/
  2 skipped (outcomes.jsonl rows missing created_at — schema-level catch)
  0 deduped on first run

Sources covered (priority order from recon):
  TIER 1 (validated 100% in Phase 1, 8 sources):
    distilled_facts/procedures/config_hints, contract_analyses,
    mode_experiments, scrum_reviews, observer_escalations, audit_facts
  TIER 2 (added by Phase 2):
    auto_apply, observer_reviews, audits, outcomes

High-level audit results:
  Provenance round-trip: 30/30 sampled rows trace cleanly to source
  rows with matching canonicalSha256(orderedKeys(row)). Every output
  has source_file + line_offset + sig_hash + recorded_at. Proven.

  Score-readiness: 54% aggregate scoreable. Three-class taxonomy
  emerges from coverage matrix:
    - Verdict-bearing (100% scoreable): scrum_reviews, observer_reviews,
      audits, contract_analyses — direct scoring inputs
    - Telemetry-rich (0-70%): mode_experiments, audit_facts, outcomes
      — Phase 3 will derive markers from latency/grounding/retrieval
    - Pure-extraction (0%): distilled_*, observer_escalations
      — context for OTHER scoring, not scoreable themselves

Invariants enforced (proven by tests + real-data audit):
  - ZERO model calls in materializer (deterministic only)
  - canonicalSha256(orderedKeys(row)) per source row → stable sig_hash
  - Schema validator gates output: rejected rows go to skips, never to evidence/
  - JSON.parse failures caught + logged, never crash the run
  - Missing source files tallied as rows_present=false, never error
  - Idempotent: second run on identical input writes 0 rows (proven on
    real data: 1053 read, 0 written, 1051 deduped)
  - Bit-stable: identical input produces byte-identical output (proven
    by tests/distillation/build_evidence_index.test.ts case 3)
  - Receipt self-validates against schema before write
  - validation_pass = boolean (skipped == 0), never inferred

Receipt at:
  reports/distillation/2026-04-27T03-33-53-972Z/receipt.json
  - schema_version=1, git_sha pinned, sha256 on every input/output
  - record_counts: {in:1053, out:1051, skipped:2, deduped:0}
  - validation_pass=false (skipped > 0; spec says explicit, never inferred)

Skips at:
  data/_kb/distillation_skips.jsonl (2 rows from outcomes.jsonl,
  reason: timestamp field missing — schema layer caught it cleanly)

Health audit at:
  data/_kb/evidence_health.md

Phase 2 done-criteria all met:
  ✓ tests pass
  ✓ ≥1 row from each Tier-1 source on real data (8/8 + 4 Tier 2 bonus)
  ✓ data/_kb/distillation_skips.jsonl populated with reasons
  ✓ Receipt JSON written + self-validates
  ✓ Provenance round-trip proven on real sampled rows
  ✓ Score-readiness coverage measured

Carry-overs to Phase 3:
  - audit_discrepancies transform (needed before Phase 4c preference data)
  - model_trust transform (needed before ModelLedgerEntry aggregation)
  - outcomes.jsonl created_at: 2 rows fail materialization, decide
    transform-side fix vs source-side fix
  - 11 untested streams from recon still have no transform; add as
    Phase 3+ consumers need them
  - mode_experiments + distilled_* are 0% scoreable; Phase 3 must
    JOIN to adjacent verdict-bearing records, NOT score in isolation

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-26 22:38:46 -05:00
parent 27b1d27605
commit 1ea802943f
3 changed files with 913 additions and 0 deletions

View File

@ -0,0 +1,364 @@
// build_evidence_index.ts — materialize EvidenceRecord rows from
// source JSONL streams. Pure mechanical view: every output row traces
// to a single source row via provenance.{source_file, line_offset,
// sig_hash}.
//
// USAGE
// bun run scripts/distillation/build_evidence_index.ts # materialize, write outputs
// bun run scripts/distillation/build_evidence_index.ts --dry-run # count + report, no writes
// LH_DISTILL_ROOT=/path bun run ... # override repo root (tests)
//
// OUTPUTS
// data/evidence/YYYY/MM/DD/<source-stem>.jsonl valid records
// data/_kb/distillation_skips.jsonl rows that failed validation (append)
// reports/distillation/<ts>/receipt.json per-run audit (Receipt schema)
//
// IDEMPOTENCY
// sig_hash = canonicalSha256(orderedKeys(source_row)). Re-running
// loads existing day-partition output files into a seen set, so
// already-materialized rows are skipped (not duplicated). Bit-stable
// on identical input.
//
// NON-NEGOTIABLES (per spec + recon)
// - ZERO model calls — deterministic only
// - Provenance on every emitted row (the schema validator enforces it)
// - Empty/missing source files do not error — they're tallied as
// "rows_present: false" in the receipt
// - Validator rejection ≠ runtime error — invalid rows go to skips
// with their full error list, materialization continues
import { existsSync, readFileSync, mkdirSync, writeFileSync, readdirSync, appendFileSync, statSync } from "node:fs";
import { resolve, dirname } from "node:path";
import { spawnSync } from "node:child_process";
import { TRANSFORMS, canonicalSha256, type TransformDef } from "./transforms";
import { validateEvidenceRecord } from "../../auditor/schemas/distillation/evidence_record";
import { RECEIPT_SCHEMA_VERSION, validateReceipt, type Receipt, type FileReference } from "../../auditor/schemas/distillation/receipt";
const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";
export interface MaterializeOptions {
root: string; // repo root — source jsonls + outputs are relative to this
transforms: TransformDef[]; // override for tests
recorded_at: string; // ISO 8601 — fixed for the run, used in provenance
dry_run?: boolean; // count but don't write
}
export interface SourceResult {
source_file_relpath: string;
rows_present: boolean;
rows_read: number;
rows_written: number;
rows_skipped: number;
rows_deduped: number;
output_files: string[];
}
export interface MaterializeResult {
sources: SourceResult[];
totals: {
rows_read: number;
rows_written: number;
rows_skipped: number;
rows_deduped: number;
};
receipt: Receipt;
receipt_path: string;
evidence_dir: string;
skips_path: string;
}
const ISO_DATE_PARTITION = (iso: string): string => {
const d = new Date(iso);
return `${d.getUTCFullYear()}/${String(d.getUTCMonth() + 1).padStart(2, "0")}/${String(d.getUTCDate()).padStart(2, "0")}`;
};
const OUTPUT_STEM = (source_file_relpath: string): string => {
const m = source_file_relpath.match(/([^/]+)\.jsonl$/);
return m ? m[1] : source_file_relpath.replace(/[^a-z0-9_]/gi, "_");
};
function sha256OfFile(path: string): string {
const hasher = new Bun.CryptoHasher("sha256");
hasher.update(readFileSync(path));
return hasher.digest("hex");
}
function getGitSha(root: string): string {
const r = spawnSync("git", ["-C", root, "rev-parse", "HEAD"], { encoding: "utf8" });
if (r.status !== 0) return "0".repeat(40);
return r.stdout.trim();
}
function getGitBranch(root: string): string | undefined {
const r = spawnSync("git", ["-C", root, "rev-parse", "--abbrev-ref", "HEAD"], { encoding: "utf8" });
return r.status === 0 ? r.stdout.trim() : undefined;
}
function getGitDirty(root: string): boolean {
const r = spawnSync("git", ["-C", root, "status", "--porcelain"], { encoding: "utf8" });
return r.status === 0 && r.stdout.trim().length > 0;
}
// Load sig_hashes already present in the target day-partition output
// file. Used to skip records that a prior run of the same day already
// materialized — keeps the materializer idempotent across reruns.
function loadSeenHashes(out_path: string): Set<string> {
const seen = new Set<string>();
if (!existsSync(out_path)) return seen;
for (const line of readFileSync(out_path, "utf8").split("\n")) {
if (!line) continue;
try {
const row = JSON.parse(line);
if (row?.provenance?.sig_hash) seen.add(row.provenance.sig_hash);
} catch {
// Malformed line — ignore. Will be overwritten on next valid hash collision.
}
}
return seen;
}
async function processSource(
transform: TransformDef,
opts: MaterializeOptions,
evidence_dir: string,
skips_path: string,
): Promise<SourceResult> {
const source_path = resolve(opts.root, transform.source_file_relpath);
const result: SourceResult = {
source_file_relpath: transform.source_file_relpath,
rows_present: false,
rows_read: 0,
rows_written: 0,
rows_skipped: 0,
rows_deduped: 0,
output_files: [],
};
if (!existsSync(source_path)) return result;
result.rows_present = true;
const partition = ISO_DATE_PARTITION(opts.recorded_at);
const stem = OUTPUT_STEM(transform.source_file_relpath);
const out_dir = resolve(evidence_dir, partition);
const out_path = resolve(out_dir, `${stem}.jsonl`);
if (!opts.dry_run) mkdirSync(out_dir, { recursive: true });
// Idempotency: collect sig_hashes already in the target output file
// so we skip rows that were materialized in a prior run of the same
// day. Tests pin this invariant.
const seenHashes = loadSeenHashes(out_path);
const lines = readFileSync(source_path, "utf8").split("\n");
const rowsToWrite: string[] = [];
const skipsToWrite: string[] = [];
for (let i = 0; i < lines.length; i++) {
const raw = lines[i];
if (!raw) continue;
result.rows_read++;
let row: any;
try { row = JSON.parse(raw); }
catch (e) {
result.rows_skipped++;
skipsToWrite.push(JSON.stringify({
source_file: transform.source_file_relpath,
line_offset: i,
errors: ["JSON.parse failed: " + (e as Error).message.slice(0, 200)],
recorded_at: opts.recorded_at,
}));
continue;
}
const sig_hash = await canonicalSha256(row);
if (seenHashes.has(sig_hash)) {
result.rows_deduped++;
continue;
}
seenHashes.add(sig_hash);
const partial = transform.transform({
row,
line_offset: i,
source_file_relpath: transform.source_file_relpath,
recorded_at: opts.recorded_at,
sig_hash,
});
if (!partial) {
result.rows_skipped++;
skipsToWrite.push(JSON.stringify({
source_file: transform.source_file_relpath,
line_offset: i,
errors: ["transform returned null"],
sig_hash,
recorded_at: opts.recorded_at,
}));
continue;
}
const v = validateEvidenceRecord(partial);
if (!v.valid) {
result.rows_skipped++;
skipsToWrite.push(JSON.stringify({
source_file: transform.source_file_relpath,
line_offset: i,
errors: v.errors,
sig_hash,
recorded_at: opts.recorded_at,
}));
continue;
}
rowsToWrite.push(JSON.stringify(v.value));
result.rows_written++;
}
if (!opts.dry_run) {
if (rowsToWrite.length > 0) {
const block = rowsToWrite.join("\n") + "\n";
// Append-mode preserves prior runs' rows; dedup above ensures
// we don't append duplicates of those.
appendFileSync(out_path, block);
result.output_files.push(out_path);
}
if (skipsToWrite.length > 0) {
mkdirSync(dirname(skips_path), { recursive: true });
appendFileSync(skips_path, skipsToWrite.join("\n") + "\n");
}
}
return result;
}
export async function materializeAll(opts: MaterializeOptions): Promise<MaterializeResult> {
const evidence_dir = resolve(opts.root, "data/evidence");
const skips_path = resolve(opts.root, "data/_kb/distillation_skips.jsonl");
const reports_dir = resolve(opts.root, "reports/distillation");
const started_ms = Date.now();
const sources: SourceResult[] = [];
for (const t of opts.transforms) {
sources.push(await processSource(t, opts, evidence_dir, skips_path));
}
const totals = sources.reduce(
(acc, s) => ({
rows_read: acc.rows_read + s.rows_read,
rows_written: acc.rows_written + s.rows_written,
rows_skipped: acc.rows_skipped + s.rows_skipped,
rows_deduped: acc.rows_deduped + s.rows_deduped,
}),
{ rows_read: 0, rows_written: 0, rows_skipped: 0, rows_deduped: 0 },
);
// Build the receipt — substantive per the spec non-negotiable.
const ended_at = new Date().toISOString();
const duration_ms = Date.now() - started_ms;
const input_files: FileReference[] = [];
for (const s of sources) {
if (s.rows_present) {
const path = resolve(opts.root, s.source_file_relpath);
try {
input_files.push({
path: s.source_file_relpath,
sha256: sha256OfFile(path),
bytes: statSync(path).size,
});
} catch { /* file vanished mid-run — tally as missing in record_counts */ }
}
}
const output_files: FileReference[] = [];
for (const s of sources) {
for (const path of s.output_files) {
try {
output_files.push({
path: path.replace(opts.root + "/", ""),
sha256: sha256OfFile(path),
bytes: statSync(path).size,
});
} catch { /* not written in dry-run */ }
}
}
const errors: string[] = [];
const warnings: string[] = [];
for (const s of sources) {
if (!s.rows_present) warnings.push(`${s.source_file_relpath}: source file not found (skipped)`);
if (s.rows_skipped > 0) warnings.push(`${s.source_file_relpath}: ${s.rows_skipped} rows skipped (validation/parse errors)`);
}
const receipt: Receipt = {
schema_version: RECEIPT_SCHEMA_VERSION,
command: "bun run scripts/distillation/build_evidence_index.ts" + (opts.dry_run ? " --dry-run" : ""),
git_sha: getGitSha(opts.root),
git_branch: getGitBranch(opts.root),
git_dirty: getGitDirty(opts.root),
started_at: opts.recorded_at,
ended_at,
duration_ms,
input_files,
output_files,
record_counts: {
in: totals.rows_read,
out: totals.rows_written,
skipped: totals.rows_skipped,
deduped: totals.rows_deduped,
},
validation_pass: totals.rows_skipped === 0,
errors,
warnings,
};
// Self-validate the receipt — spec says receipts must conform to
// their schema. Fail fast if our own writer drifts from the schema.
const rv = validateReceipt(receipt);
if (!rv.valid) {
errors.push(...rv.errors.map(e => "receipt schema: " + e));
receipt.errors = errors;
receipt.validation_pass = false;
}
const stamp = ended_at.replace(/[:.]/g, "-");
const receipt_dir = resolve(reports_dir, stamp);
const receipt_path = resolve(receipt_dir, "receipt.json");
if (!opts.dry_run) {
mkdirSync(receipt_dir, { recursive: true });
writeFileSync(receipt_path, JSON.stringify(receipt, null, 2) + "\n");
}
return { sources, totals, receipt, receipt_path, evidence_dir, skips_path };
}
async function cli() {
const dry_run = process.argv.includes("--dry-run");
const recorded_at = new Date().toISOString();
const r = await materializeAll({
root: DEFAULT_ROOT,
transforms: TRANSFORMS,
recorded_at,
dry_run,
});
console.log(`[evidence_index] ${r.totals.rows_read} read · ${r.totals.rows_written} written · ${r.totals.rows_skipped} skipped · ${r.totals.rows_deduped} deduped${dry_run ? " (DRY RUN)" : ""}`);
for (const s of r.sources) {
if (!s.rows_present) {
console.log(` ${s.source_file_relpath}: (missing — skipped)`);
continue;
}
console.log(` ${s.source_file_relpath}: read=${s.rows_read} wrote=${s.rows_written} skip=${s.rows_skipped} dedup=${s.rows_deduped}`);
}
if (!dry_run) {
console.log(`[evidence_index] receipt: ${r.receipt_path}`);
console.log(`[evidence_index] validation_pass=${r.receipt.validation_pass}`);
}
if (!r.receipt.validation_pass) process.exit(1);
}
if (import.meta.main) {
cli().catch(e => { console.error(e); process.exit(1); });
}

View File

@ -0,0 +1,254 @@
// check_evidence_health.ts — high-level audit of the materialized
// EvidenceRecord substrate. Answers two questions Phase 3 needs:
//
// 1. PROVENANCE ROUND-TRIP — sample N output rows, look up the
// source row at the recorded (source_file, line_offset),
// recompute canonicalSha256, confirm it matches provenance.sig_hash.
// Hard pass/fail. If even one row fails, provenance is theater.
//
// 2. SCORE-READINESS COVERAGE — for each source, what fraction of
// materialized rows carry the signals the Success Scorer will
// need: model_role, success_markers, failure_markers,
// observer_verdict, latency_ms, retrieved_context, text. Tells
// Phase 3 which sources to read from for each gate.
//
// Output: markdown report to stdout + data/_kb/evidence_health.md.
//
// Run: bun run scripts/distillation/check_evidence_health.ts
import { existsSync, readFileSync, readdirSync, statSync, writeFileSync } from "node:fs";
import { resolve } from "node:path";
import { canonicalSha256 } from "../../auditor/schemas/distillation/types";
const ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";
const SAMPLE_FOR_PROVENANCE = 30;
interface CoverageBucket {
source: string;
total: number;
with_model_role: number;
with_model_name: number;
with_success_markers: number;
with_failure_markers: number;
with_observer_verdict: number;
with_latency_ms: number;
with_retrieved_context: number;
with_text: number;
scoreable: number; // has at least ONE signal the scorer can use
}
interface ProvenanceCheck {
passed: number;
failed: number;
failures: Array<{ output_path: string; line: number; reason: string }>;
}
function listEvidenceFiles(evidence_root: string): string[] {
const out: string[] = [];
if (!existsSync(evidence_root)) return out;
for (const yyyy of readdirSync(evidence_root).sort()) {
const ydir = resolve(evidence_root, yyyy);
if (!statSync(ydir).isDirectory()) continue;
for (const mm of readdirSync(ydir).sort()) {
const mdir = resolve(ydir, mm);
if (!statSync(mdir).isDirectory()) continue;
for (const dd of readdirSync(mdir).sort()) {
const ddir = resolve(mdir, dd);
if (!statSync(ddir).isDirectory()) continue;
for (const f of readdirSync(ddir)) {
if (f.endsWith(".jsonl")) out.push(resolve(ddir, f));
}
}
}
}
return out;
}
// Has at least one deterministic signal the Phase 3 scorer can act on.
// Order is generous: any of these counts as "scoreable", because the
// scorer combines multiple signals.
function isScoreable(row: any): boolean {
if (Array.isArray(row.success_markers) && row.success_markers.length > 0) return true;
if (Array.isArray(row.failure_markers) && row.failure_markers.length > 0) return true;
if (typeof row.observer_verdict === "string") return true;
if (row.validation_results && Object.keys(row.validation_results).length > 0) return true;
if (Array.isArray(row.observer_notes) && row.observer_notes.length > 0) return true;
return false;
}
function bucketStart(source: string): CoverageBucket {
return {
source, total: 0,
with_model_role: 0, with_model_name: 0,
with_success_markers: 0, with_failure_markers: 0,
with_observer_verdict: 0, with_latency_ms: 0,
with_retrieved_context: 0, with_text: 0,
scoreable: 0,
};
}
function pct(n: number, total: number): string {
if (total === 0) return "—";
return Math.round(100 * n / total) + "%";
}
async function main() {
const evidenceFiles = listEvidenceFiles(resolve(ROOT, "data/evidence"));
if (evidenceFiles.length === 0) {
console.error("No evidence files found. Run scripts/distillation/build_evidence_index.ts first.");
process.exit(1);
}
// ── 1. Coverage scan ────────────────────────────────────────────
const buckets = new Map<string, CoverageBucket>();
const allOutputRows: Array<{ output_path: string; line: number; row: any }> = [];
for (const evPath of evidenceFiles) {
const sourceLabel = evPath.split("/").pop()!.replace(/\.jsonl$/, "");
const b = buckets.get(sourceLabel) ?? bucketStart(sourceLabel);
const lines = readFileSync(evPath, "utf8").split("\n").filter(Boolean);
for (let i = 0; i < lines.length; i++) {
const row = JSON.parse(lines[i]);
b.total++;
if (row.model_role) b.with_model_role++;
if (row.model_name) b.with_model_name++;
if (Array.isArray(row.success_markers) && row.success_markers.length > 0) b.with_success_markers++;
if (Array.isArray(row.failure_markers) && row.failure_markers.length > 0) b.with_failure_markers++;
if (typeof row.observer_verdict === "string") b.with_observer_verdict++;
if (typeof row.latency_ms === "number") b.with_latency_ms++;
if (row.retrieved_context && Object.keys(row.retrieved_context).length > 0) b.with_retrieved_context++;
if (typeof row.text === "string" && row.text.length > 0) b.with_text++;
if (isScoreable(row)) b.scoreable++;
allOutputRows.push({ output_path: evPath, line: i, row });
}
buckets.set(sourceLabel, b);
}
// ── 2. Provenance round-trip on a random sample ─────────────────
const sampleSize = Math.min(SAMPLE_FOR_PROVENANCE, allOutputRows.length);
const indices = new Set<number>();
// Deterministic-ish sample: stride through evenly so we hit different sources.
const stride = Math.max(1, Math.floor(allOutputRows.length / sampleSize));
for (let i = 0; i < allOutputRows.length && indices.size < sampleSize; i += stride) indices.add(i);
// Top up with the tail in case stride truncates early.
while (indices.size < sampleSize) indices.add(allOutputRows.length - 1 - indices.size);
const provCheck: ProvenanceCheck = { passed: 0, failed: 0, failures: [] };
// Cache source-file lines so we don't re-read big files repeatedly.
const sourceCache = new Map<string, string[]>();
for (const idx of indices) {
const { output_path, line, row } = allOutputRows[idx];
const prov = row.provenance;
if (!prov || !prov.source_file || prov.line_offset == null || !prov.sig_hash) {
provCheck.failed++;
provCheck.failures.push({ output_path, line, reason: "missing provenance fields" });
continue;
}
const sourceAbs = resolve(ROOT, prov.source_file);
if (!sourceCache.has(sourceAbs)) {
if (!existsSync(sourceAbs)) {
provCheck.failed++;
provCheck.failures.push({ output_path, line, reason: `source missing: ${prov.source_file}` });
continue;
}
sourceCache.set(sourceAbs, readFileSync(sourceAbs, "utf8").split("\n"));
}
const sourceLines = sourceCache.get(sourceAbs)!;
if (prov.line_offset >= sourceLines.length) {
provCheck.failed++;
provCheck.failures.push({ output_path, line, reason: `line_offset ${prov.line_offset} past EOF (source has ${sourceLines.length} lines)` });
continue;
}
const sourceLine = sourceLines[prov.line_offset];
let sourceRow: any;
try { sourceRow = JSON.parse(sourceLine); }
catch (e) {
provCheck.failed++;
provCheck.failures.push({ output_path, line, reason: `source line not JSON: ${(e as Error).message.slice(0, 60)}` });
continue;
}
const recomputed = await canonicalSha256(sourceRow);
if (recomputed !== prov.sig_hash) {
provCheck.failed++;
provCheck.failures.push({
output_path, line,
reason: `sig_hash mismatch: prov=${prov.sig_hash.slice(0, 16)}… recomputed=${recomputed.slice(0, 16)}`,
});
continue;
}
provCheck.passed++;
}
// ── 3. Render markdown ──────────────────────────────────────────
const md: string[] = [];
md.push("# Evidence Health — Phase 2 high-level audit");
md.push("");
md.push(`**Run:** ${new Date().toISOString()}`);
md.push(`**Evidence files:** ${evidenceFiles.length}`);
md.push(`**Total records:** ${allOutputRows.length}`);
md.push("");
md.push("## 1. Provenance round-trip");
md.push("");
md.push(`Sample size: **${sampleSize}** rows (stride sample across all evidence).`);
md.push("");
md.push(`| Passed | Failed |`);
md.push(`|---|---|`);
md.push(`| ${provCheck.passed} | ${provCheck.failed} |`);
md.push("");
if (provCheck.failed > 0) {
md.push("### Failures");
for (const f of provCheck.failures.slice(0, 20)) {
md.push(`- \`${f.output_path.split("/").slice(-2).join("/")}\` line ${f.line}: ${f.reason}`);
}
} else {
md.push("**All sampled rows traced cleanly back to source rows with matching canonical sig_hash.**");
}
md.push("");
md.push("## 2. Score-readiness coverage");
md.push("");
md.push("Per source, fraction of materialized rows carrying each signal the Phase 3 Success Scorer will read.");
md.push("");
md.push("| Source | Rows | role | name | success | failure | obs.verdict | latency | retrieval | text | scoreable |");
md.push("|---|---|---|---|---|---|---|---|---|---|---|");
const sortedBuckets = Array.from(buckets.values()).sort((a, b) => b.total - a.total);
for (const b of sortedBuckets) {
md.push(`| ${b.source} | ${b.total} | ${pct(b.with_model_role, b.total)} | ${pct(b.with_model_name, b.total)} | ${pct(b.with_success_markers, b.total)} | ${pct(b.with_failure_markers, b.total)} | ${pct(b.with_observer_verdict, b.total)} | ${pct(b.with_latency_ms, b.total)} | ${pct(b.with_retrieved_context, b.total)} | ${pct(b.with_text, b.total)} | **${pct(b.scoreable, b.total)}** |`);
}
md.push("");
// Aggregate totals row
const totals = sortedBuckets.reduce((acc, b) => ({
total: acc.total + b.total,
role: acc.role + b.with_model_role,
name: acc.name + b.with_model_name,
success: acc.success + b.with_success_markers,
failure: acc.failure + b.with_failure_markers,
obs: acc.obs + b.with_observer_verdict,
lat: acc.lat + b.with_latency_ms,
ret: acc.ret + b.with_retrieved_context,
text: acc.text + b.with_text,
score: acc.score + b.scoreable,
}), { total: 0, role: 0, name: 0, success: 0, failure: 0, obs: 0, lat: 0, ret: 0, text: 0, score: 0 });
md.push(`**Aggregate:** ${totals.total} rows · role ${pct(totals.role, totals.total)} · name ${pct(totals.name, totals.total)} · success ${pct(totals.success, totals.total)} · failure ${pct(totals.failure, totals.total)} · obs.verdict ${pct(totals.obs, totals.total)} · latency ${pct(totals.lat, totals.total)} · retrieval ${pct(totals.ret, totals.total)} · text ${pct(totals.text, totals.total)} · scoreable **${pct(totals.score, totals.total)}**`);
md.push("");
md.push("## 3. Phase 3 readiness");
md.push("");
if (provCheck.failed > 0) {
md.push("**❌ NOT READY** — provenance round-trip failed. Fix materializer or transforms before Phase 3.");
} else if (totals.score < totals.total * 0.5) {
md.push(`**⚠️ PARTIAL READINESS** — only ${pct(totals.score, totals.total)} of records are scoreable. Phase 3 will produce many \`needs_human_review\` until transforms enrich more sources with markers.`);
} else {
md.push(`**✓ READY** — provenance traces, ${pct(totals.score, totals.total)} of records carry scorer signals. Phase 3 can begin.`);
}
md.push("");
const out = md.join("\n");
console.log(out);
writeFileSync(resolve(ROOT, "data/_kb/evidence_health.md"), out);
if (provCheck.failed > 0) process.exit(1);
}
if (import.meta.main) {
main().catch(e => { console.error(e); process.exit(1); });
}

View File

@ -0,0 +1,295 @@
// Phase 2 acceptance tests — pin the materializer's invariants:
// 1. Valid rows materialize; invalid rows go to skips with errors
// 2. Idempotency: re-running on same source yields zero new writes
// 3. Stability: identical input → byte-identical output (canonical hash)
// 4. Schema gating: rows that fail validateEvidenceRecord NEVER reach
// data/evidence/*.jsonl, only skips
// 5. Receipt: substantive (git_sha + sha256 + record_counts +
// validation_pass), conforms to Receipt schema
// 6. JSON-parse failures handled gracefully
//
// All tests run against a temp repo root with synthetic source jsonls
// and a custom TRANSFORMS list pointing at them. No live JSONLs touched.
//
// Run: bun test tests/distillation/build_evidence_index.test.ts
import { test, expect, beforeEach, afterEach } from "bun:test";
import { mkdirSync, writeFileSync, rmSync, existsSync, readFileSync, readdirSync, statSync } from "node:fs";
import { resolve } from "node:path";
import { materializeAll, type MaterializeOptions } from "../../scripts/distillation/build_evidence_index";
import type { TransformDef } from "../../scripts/distillation/transforms";
import { EVIDENCE_SCHEMA_VERSION, type ModelRole } from "../../auditor/schemas/distillation/evidence_record";
import { validateReceipt } from "../../auditor/schemas/distillation/receipt";
const TMP_ROOT = "/tmp/distillation_test_phase2";
const RECORDED = "2026-04-26T22:30:00.000Z";
// Minimal transform — produces a valid EvidenceRecord from the
// synthetic source rows below.
const TEST_TRANSFORMS: TransformDef[] = [
{
source_file_relpath: "data/_kb/synthetic_a.jsonl",
transform: ({ row, line_offset, source_file_relpath, recorded_at, sig_hash }) => {
// Test rows that intentionally fail validation set bad: true.
// Transform still returns a Partial — validator catches it.
if (row.bad) {
return {
// missing run_id (required) → forces validateEvidenceRecord to reject
task_id: row.task_id,
timestamp: row.ts,
schema_version: EVIDENCE_SCHEMA_VERSION,
provenance: { source_file: source_file_relpath, line_offset, sig_hash, recorded_at },
} as any;
}
return {
run_id: row.run_id,
task_id: row.task_id,
timestamp: row.ts,
schema_version: EVIDENCE_SCHEMA_VERSION,
provenance: { source_file: source_file_relpath, line_offset, sig_hash, recorded_at },
text: row.text,
model_role: "executor" as ModelRole,
};
},
},
{
source_file_relpath: "data/_kb/synthetic_b.jsonl",
transform: ({ row, line_offset, source_file_relpath, recorded_at, sig_hash }) => ({
run_id: row.run_id,
task_id: row.task_id,
timestamp: row.ts,
schema_version: EVIDENCE_SCHEMA_VERSION,
provenance: { source_file: source_file_relpath, line_offset, sig_hash, recorded_at },
text: row.text,
model_role: "extractor" as ModelRole,
}),
},
];
function setupRoot() {
if (existsSync(TMP_ROOT)) rmSync(TMP_ROOT, { recursive: true, force: true });
mkdirSync(resolve(TMP_ROOT, "data/_kb"), { recursive: true });
// Source A: 3 valid + 1 invalid + 1 malformed JSON
const aRows = [
{ run_id: "a1", task_id: "task1", ts: "2026-04-26T20:00:00.000Z", text: "first" },
{ run_id: "a2", task_id: "task2", ts: "2026-04-26T20:01:00.000Z", text: "second" },
{ run_id: "a3", task_id: "task3", ts: "2026-04-26T20:02:00.000Z", text: "third" },
{ bad: true, task_id: "fail-row", ts: "2026-04-26T20:03:00.000Z" },
];
const aLines = aRows.map(r => JSON.stringify(r)).join("\n") + "\n{not valid json\n";
writeFileSync(resolve(TMP_ROOT, "data/_kb/synthetic_a.jsonl"), aLines);
// Source B: 2 valid rows
const bRows = [
{ run_id: "b1", task_id: "btask1", ts: "2026-04-26T20:10:00.000Z", text: "alpha" },
{ run_id: "b2", task_id: "btask2", ts: "2026-04-26T20:11:00.000Z", text: "beta" },
];
writeFileSync(resolve(TMP_ROOT, "data/_kb/synthetic_b.jsonl"), bRows.map(r => JSON.stringify(r)).join("\n") + "\n");
}
beforeEach(setupRoot);
afterEach(() => {
if (existsSync(TMP_ROOT)) rmSync(TMP_ROOT, { recursive: true, force: true });
});
// ─── Acceptance Test 1: valid rows materialize, invalid go to skips ──
test("materializer: 3 valid rows from source A reach evidence/, 1 invalid + 1 malformed go to skips", async () => {
const r = await materializeAll({
root: TMP_ROOT,
transforms: TEST_TRANSFORMS,
recorded_at: RECORDED,
});
// Source A: 5 read, 3 written, 2 skipped (1 missing run_id, 1 malformed JSON)
const a = r.sources.find(s => s.source_file_relpath.endsWith("synthetic_a.jsonl"))!;
expect(a.rows_read).toBe(5);
expect(a.rows_written).toBe(3);
expect(a.rows_skipped).toBe(2);
// Source B: 2 read, 2 written
const b = r.sources.find(s => s.source_file_relpath.endsWith("synthetic_b.jsonl"))!;
expect(b.rows_read).toBe(2);
expect(b.rows_written).toBe(2);
// Skips file exists and contains both rejection reasons
const skipsContent = readFileSync(r.skips_path, "utf8");
expect(skipsContent).toContain("run_id"); // missing required field
expect(skipsContent).toContain("JSON.parse"); // malformed JSON
// Evidence files exist at the expected day partition
const partition = "2026/04/26";
const aOut = resolve(TMP_ROOT, "data/evidence", partition, "synthetic_a.jsonl");
const bOut = resolve(TMP_ROOT, "data/evidence", partition, "synthetic_b.jsonl");
expect(existsSync(aOut)).toBe(true);
expect(existsSync(bOut)).toBe(true);
// Output rows count matches written
const aLines = readFileSync(aOut, "utf8").trim().split("\n");
expect(aLines.length).toBe(3);
for (const line of aLines) {
const row = JSON.parse(line);
expect(row.schema_version).toBe(EVIDENCE_SCHEMA_VERSION);
expect(row.provenance.source_file).toBe("data/_kb/synthetic_a.jsonl");
expect(typeof row.provenance.sig_hash).toBe("string");
expect(row.provenance.sig_hash.length).toBe(64);
}
});
// ─── Acceptance Test 2: idempotency ──────────────────────────────────
test("materializer: re-running on same source produces 0 new writes (idempotent)", async () => {
await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED });
const r2 = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED });
// Second run reads the same rows but dedups all of them — zero new writes
const a2 = r2.sources.find(s => s.source_file_relpath.endsWith("synthetic_a.jsonl"))!;
expect(a2.rows_written).toBe(0);
expect(a2.rows_deduped).toBe(3);
});
// ─── Acceptance Test 3: stable sig_hash → byte-identical output ──────
test("materializer: identical input produces byte-identical output across runs", async () => {
const r1 = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED });
const aPath = resolve(TMP_ROOT, "data/evidence/2026/04/26/synthetic_a.jsonl");
const aBeforeBytes = readFileSync(aPath);
// Wipe the output file and re-run with the same inputs
rmSync(aPath);
await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED });
const aAfterBytes = readFileSync(aPath);
expect(aBeforeBytes.equals(aAfterBytes)).toBe(true);
});
// ─── Acceptance Test 4: schema gating ────────────────────────────────
test("materializer: rows failing validateEvidenceRecord NEVER reach evidence/, only skips", async () => {
const r = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED });
const aOut = resolve(TMP_ROOT, "data/evidence/2026/04/26/synthetic_a.jsonl");
const aRows = readFileSync(aOut, "utf8").trim().split("\n").filter(Boolean).map(l => JSON.parse(l));
// Every output row has a non-empty run_id (the invalid row had no
// run_id, so it MUST be absent from output).
for (const row of aRows) {
expect(typeof row.run_id).toBe("string");
expect(row.run_id.length).toBeGreaterThan(0);
}
// Specifically: no row carries the failing fixture's task_id "fail-row"
expect(aRows.find((r: any) => r.task_id === "fail-row")).toBeUndefined();
});
// ─── Acceptance Test 5: receipt is substantive + schema-conforming ───
test("materializer: receipt has git_sha + sha256(input) + sha256(output) + record_counts and validates", async () => {
const r = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED });
// Self-validation against the Receipt schema
const v = validateReceipt(r.receipt);
expect(v.valid).toBe(true);
// git_sha is 40 hex chars (real or 0...0 fallback)
expect(r.receipt.git_sha).toMatch(/^[0-9a-f]{40}$/);
// Each input file has a real sha256 + bytes
expect(r.receipt.input_files.length).toBe(2);
for (const f of r.receipt.input_files) {
expect(f.sha256).toMatch(/^[0-9a-f]{64}$/);
expect(typeof f.bytes).toBe("number");
expect(f.bytes).toBeGreaterThan(0);
}
// Each output file too
expect(r.receipt.output_files.length).toBe(2);
for (const f of r.receipt.output_files) {
expect(f.sha256).toMatch(/^[0-9a-f]{64}$/);
}
// Counts add up
expect(r.receipt.record_counts.in).toBe(7); // 5 from A + 2 from B
expect(r.receipt.record_counts.out).toBe(5); // 3 + 2
expect(r.receipt.record_counts.skipped).toBe(2); // both from A
// validation_pass MUST be a boolean — never inferred
expect(typeof r.receipt.validation_pass).toBe("boolean");
// With skips > 0, validation_pass should be false
expect(r.receipt.validation_pass).toBe(false);
// Receipt persisted
expect(existsSync(r.receipt_path)).toBe(true);
});
// ─── Acceptance Test 6: clean run sets validation_pass=true ──────────
test("materializer: with all-valid sources, validation_pass=true and skips=0", async () => {
// Strip the bad row + malformed JSON from source A
const cleanRows = [
{ run_id: "c1", task_id: "ct1", ts: "2026-04-26T22:00:00.000Z", text: "clean" },
{ run_id: "c2", task_id: "ct2", ts: "2026-04-26T22:01:00.000Z", text: "clean2" },
];
writeFileSync(resolve(TMP_ROOT, "data/_kb/synthetic_a.jsonl"), cleanRows.map(r => JSON.stringify(r)).join("\n") + "\n");
const r = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED });
expect(r.receipt.record_counts.skipped).toBe(0);
expect(r.receipt.validation_pass).toBe(true);
});
// ─── Acceptance Test 7: dry-run does not write ───────────────────────
test("materializer: --dry-run reports counts but writes no evidence files", async () => {
const r = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED, dry_run: true });
// Counts populated
expect(r.totals.rows_read).toBe(7);
expect(r.totals.rows_written).toBe(5);
// No evidence files written
const evidenceDir = resolve(TMP_ROOT, "data/evidence");
expect(existsSync(evidenceDir)).toBe(false);
// No skips file written
const skipsPath = resolve(TMP_ROOT, "data/_kb/distillation_skips.jsonl");
expect(existsSync(skipsPath)).toBe(false);
});
// ─── Acceptance Test 8: missing source file does not crash ───────────
test("materializer: missing source file is tallied as rows_present=false, no error", async () => {
rmSync(resolve(TMP_ROOT, "data/_kb/synthetic_b.jsonl"));
const r = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED });
const b = r.sources.find(s => s.source_file_relpath.endsWith("synthetic_b.jsonl"))!;
expect(b.rows_present).toBe(false);
expect(b.rows_read).toBe(0);
// Source A still processes normally
const a = r.sources.find(s => s.source_file_relpath.endsWith("synthetic_a.jsonl"))!;
expect(a.rows_present).toBe(true);
expect(a.rows_written).toBe(3);
});
// ─── Acceptance Test 9: provenance preserved on every row ────────────
test("materializer: every output row has provenance traceable to a source row", async () => {
const r = await materializeAll({ root: TMP_ROOT, transforms: TEST_TRANSFORMS, recorded_at: RECORDED });
for (const s of r.sources) {
for (const out_path of s.output_files) {
const lines = readFileSync(out_path, "utf8").trim().split("\n").filter(Boolean);
for (const line of lines) {
const row = JSON.parse(line);
expect(row.provenance).toBeTruthy();
expect(row.provenance.source_file).toBe(s.source_file_relpath);
expect(typeof row.provenance.line_offset).toBe("number");
expect(row.provenance.sig_hash).toMatch(/^[0-9a-f]{64}$/);
expect(row.provenance.recorded_at).toBe(RECORDED);
}
}
}
});