root 68b6697bcb
Some checks failed
lakehouse/auditor 1 blocking issue: todo!() macro call in tests/real-world/scrum_master_pipeline.ts
distillation: Phase 4 — dataset export layer
Build the contamination firewall: RAG, SFT, and Preference exporters
that turn scored evidence into clean training datasets without
leaking rejected, unvalidated, hallucinated, or provenance-free
records.

Files (8 new + 4 schema updates):
  scripts/distillation/quarantine.ts      shared QuarantineWriter, 11-reason taxonomy
  scripts/distillation/export_rag.ts      RAG exporter (--include-review opt-in)
  scripts/distillation/export_sft.ts      SFT exporter (--include-partial opt-in, SFT_NEVER constant)
  scripts/distillation/export_preference.ts preference exporter, same task_id pairing
  scripts/distillation/distill.ts         CLI dispatcher (build-evidence/score/export-*)
  tests/distillation/exports.test.ts      15 contamination-firewall tests
  reports/distillation/phase4-export-report.md  acceptance report

Schema field-name alignment with now.md:
  rag_sample.ts        +source_category, exported_at→created_at
  sft_sample.ts        +id, exported_at→created_at, partially_accepted at schema (CLI gates)
  preference_sample.ts +id, source_run_ids→chosen_run_id+rejected_run_id, +created_at

Test metrics: 117 distillation tests pass · 0 fail · 315 expects · 327ms

Real-data export run (1052 scored input rows):
  RAG:        446 exported (351 acc + 95 partial), 606 quarantined
  SFT:        351 exported (all 'accepted'),       701 quarantined
  Preference:  83 pairs exported,                   16 quarantined

CONTAMINATION FIREWALL — verified held on real data:
  - SFT output: 351/351 quality_score='accepted' (ZERO leaked)
  - RAG output: 351 acc + 95 partial (ZERO rejected leaked)
  - Preference: 0 self-pairs (chosen_run_id != rejected_run_id)
  - 536 rejected+needs_human_review records caught at unsafe_sft_category
    gate, exact match to scored-runs forbidden-category total
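
A check along these lines could re-verify the firewall claims against the exported JSONL. It is only a sketch: the row shapes are trimmed to the fields named above, and `parseJsonl`, `sftLeaks`, and `selfPairs` are hypothetical helpers, not part of the exporters.

```typescript
// Hypothetical post-export audit; row shapes keep only the fields checked here.
interface SftRow { id: string; quality_score: string; }
interface PreferenceRow { id: string; chosen_run_id: string; rejected_run_id: string; }

// Parse a JSONL body, skipping blank lines.
function parseJsonl<T>(body: string): T[] {
  return body
    .split("\n")
    .filter(line => line.trim().length > 0)
    .map(line => JSON.parse(line) as T);
}

// Rows whose quality_score falls outside the allowed set
// (assumed default: accepted only, matching the run reported above).
function sftLeaks(rows: SftRow[], allowed: string[] = ["accepted"]): SftRow[] {
  return rows.filter(r => !allowed.includes(r.quality_score));
}

// Preference rows that pair a run with itself.
function selfPairs(rows: PreferenceRow[]): PreferenceRow[] {
  return rows.filter(r => r.chosen_run_id === r.rejected_run_id);
}
```

Both helpers return the offending rows rather than a boolean, so a non-empty result points directly at the records that leaked.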

Defense in depth (the firewall is two layers, not one):
  1. Schema layer (Phase 1): SftSample.quality_score enum forbids
     rejected/needs_human at write time
  2. Exporter layer: SFT_NEVER constant in export_sft.ts checks
     category before synthesis. Even if synthesis produced a row
     with quality_score=rejected, validateSftSample would reject it.
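
The two layers can be sketched roughly as follows. This is a minimal illustration, not the contents of export_sft.ts: the members of `SFT_NEVER`, the `SftSampleSketch` shape, and `validateSftSampleSketch` are assumptions standing in for the real constant, schema, and validator, and the --include-partial gate is left out.

```typescript
// Hypothetical sketch of the two firewall layers; names ending in "Sketch"
// are illustrative and are not the real schema or exporter code.
type ScoreCategory = "accepted" | "partially_accepted" | "needs_human_review" | "rejected";

// Exporter layer: categories that must never be synthesized into SFT rows.
const SFT_NEVER: ReadonlySet<ScoreCategory> =
  new Set<ScoreCategory>(["rejected", "needs_human_review"]);

interface SftSampleSketch { id: string; quality_score: string; }

// Schema layer: the quality_score enum has no slot for the forbidden categories.
const SFT_QUALITY_ENUM: string[] = ["accepted", "partially_accepted"];

function validateSftSampleSketch(s: SftSampleSketch): { valid: boolean; errors: string[] } {
  const errors: string[] = [];
  if (!SFT_QUALITY_ENUM.includes(s.quality_score)) {
    errors.push(`quality_score=${s.quality_score} not allowed`);
  }
  return { valid: errors.length === 0, errors };
}

type GateOutcome = "exported" | "unsafe_sft_category" | "schema_violation";

function gateSft(category: ScoreCategory, id: string): GateOutcome {
  // Exporter gate runs before any synthesis happens.
  if (SFT_NEVER.has(category)) return "unsafe_sft_category";
  const sample: SftSampleSketch = { id, quality_score: category };
  // Schema validation is the backstop if the gate above is ever bypassed.
  return validateSftSampleSketch(sample).valid ? "exported" : "schema_violation";
}
```

Either layer alone would stop a rejected row; keeping both means a regression in one is caught by the other.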

Quarantine reasons (11): missing_provenance, missing_source_run_id,
empty_content, schema_violation, unsafe_sft_category,
unsafe_rag_category, invalid_preference_pairing,
hallucinated_file_path, duplicate_id, self_pairing,
category_disallowed.
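
One way to keep the taxonomy closed is a literal union, so an unknown reason fails at compile time. The sketch below assumes nothing about the real QuarantineWriter beyond the reason names listed above; `QuarantineTally` and its summary format are hypothetical.

```typescript
// The 11 reasons from the list above, as a closed literal union.
const QUARANTINE_REASONS = [
  "missing_provenance", "missing_source_run_id", "empty_content", "schema_violation",
  "unsafe_sft_category", "unsafe_rag_category", "invalid_preference_pairing",
  "hallucinated_file_path", "duplicate_id", "self_pairing", "category_disallowed",
] as const;
type QuarantineReason = typeof QUARANTINE_REASONS[number];

// Hypothetical in-memory tally; the real writer also persists records to disk.
class QuarantineTally {
  private counts: Partial<Record<QuarantineReason, number>> = {};
  total = 0;

  add(reason: QuarantineReason): void {
    this.counts[reason] = (this.counts[reason] ?? 0) + 1;
    this.total++;
  }

  // Reasons are reported in taxonomy order, skipping those with zero hits.
  summary(): string {
    const parts = QUARANTINE_REASONS
      .filter(r => (this.counts[r] ?? 0) > 0)
      .map(r => `${r}=${this.counts[r]}`);
    return `quarantined=${this.total}` + (parts.length > 0 ? ` (${parts.join(", ")})` : "");
  }
}
```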

Bug surfaced + fixed during testing: module-level evidenceCache
shared state across test runs (tests wipe TMP, cache holds stale
empty Map). Moved cache to per-call scope. The Phase 2 materializer
would have hit the same pattern if its tests had multiple runs
sharing state (preventive fix).
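
The failure mode can be reproduced in miniature. In this sketch an in-memory `disk` map stands in for the TMP directory the tests wipe; `loadLines`, `countBuggy`, and `countFixed` are hypothetical names, not functions from the exporters.

```typescript
// Simulated disk: path -> file body. Stands in for the wiped-and-recreated TMP root.
const disk = new Map<string, string>();

type LineCache = Map<string, string[]>;

// Read a file's non-empty lines through the given cache.
function loadLines(path: string, cache: LineCache): string[] {
  const hit = cache.get(path);
  if (hit) return hit; // an empty array is truthy, so a cached "no rows" also sticks
  const lines = (disk.get(path) ?? "").split("\n").filter(Boolean);
  cache.set(path, lines);
  return lines;
}

// Buggy shape: one cache for the lifetime of the module.
const moduleCache: LineCache = new Map();
function countBuggy(path: string): number {
  return loadLines(path, moduleCache).length;
}

// Fixed shape: a fresh cache for every call.
function countFixed(path: string): number {
  const cache: LineCache = new Map();
  return loadLines(path, cache).length;
}
```

If `countBuggy` is first called while the file is absent, it caches an empty result and keeps returning 0 after the file is written; `countFixed` sees the new contents on the next call.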

Pairing logic v1: same task_id with category gap. accepted×rejected
preferred, accepted×partially_accepted as fallback. MAX_PAIRS_PER_TASK=5
cap prevents one hot task from dominating. Future: cross-source
pairing (scrum_reviews chosen vs observer_reviews rejected on same
file) to grow dataset beyond 83.
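
A minimal sketch of the v1 pairing rule follows. It assumes the fallback to accepted×partially_accepted applies only when a task has no rejected rows, which the description above does not spell out; the `ScoredRow` and `PreferencePair` shapes and the `pairByTask` name are likewise illustrative.

```typescript
type Category = "accepted" | "partially_accepted" | "needs_human_review" | "rejected";
interface ScoredRow { run_id: string; task_id: string; category: Category; }
interface PreferencePair { task_id: string; chosen_run_id: string; rejected_run_id: string; }

const MAX_PAIRS_PER_TASK = 5;

function pairByTask(rows: ScoredRow[]): PreferencePair[] {
  // Group rows by task_id, preserving first-seen task order.
  const byTask = new Map<string, ScoredRow[]>();
  for (const r of rows) {
    const bucket = byTask.get(r.task_id) ?? [];
    bucket.push(r);
    byTask.set(r.task_id, bucket);
  }

  const pairs: PreferencePair[] = [];
  byTask.forEach((group, task_id) => {
    const chosen = group.filter(r => r.category === "accepted");
    const rejected = group.filter(r => r.category === "rejected");
    const partial = group.filter(r => r.category === "partially_accepted");
    // Assumption: fall back to partially_accepted only when nothing was rejected.
    const losers = rejected.length > 0 ? rejected : partial;

    let emitted = 0;
    for (const c of chosen) {
      for (const l of losers) {
        if (emitted >= MAX_PAIRS_PER_TASK) break; // cap per task
        if (c.run_id === l.run_id) continue;      // never self-pair
        pairs.push({ task_id, chosen_run_id: c.run_id, rejected_run_id: l.run_id });
        emitted++;
      }
    }
  });
  return pairs;
}
```

With three accepted and three rejected runs on one task there are nine candidate pairs, and the cap keeps the first five.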

CLI: ./scripts/distill.ts {build-evidence|score|export-rag|export-sft|export-preference|export-all|health}
Flags: --dry-run, --include-partial (SFT only), --include-review (RAG only)
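
For illustration, argument handling for a dispatcher with these commands and flags might look like the sketch below. It is not the code in distill.ts: `parseDistillArgs` and `ParsedArgs` are hypothetical, and whether the real CLI rejects a flag passed to the wrong exporter is not stated here, so the sketch only records which flags were present.

```typescript
const COMMANDS = [
  "build-evidence", "score", "export-rag", "export-sft",
  "export-preference", "export-all", "health",
] as const;
type Command = typeof COMMANDS[number];

interface ParsedArgs {
  command: Command;
  dry_run: boolean;
  include_partial: boolean; // meaningful for the SFT exporter only
  include_review: boolean;  // meaningful for the RAG exporter only
}

// argv is expected without the runtime and script entries, e.g. process.argv.slice(2).
function parseDistillArgs(argv: string[]): ParsedArgs {
  const cmd = argv[0];
  const flags = argv.slice(1);
  const command = COMMANDS.find(c => c === cmd);
  if (!command) throw new Error(`unknown command: ${String(cmd)}`);
  return {
    command,
    dry_run: flags.includes("--dry-run"),
    include_partial: flags.includes("--include-partial"),
    include_review: flags.includes("--include-review"),
  };
}
```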

Carry-overs to Phase 5 (Receipts Harness):
- Each exporter currently writes results but no per-stage receipt.json.
  Phase 5 wraps build_evidence_index + score_runs + export_* in a
  withReceipt() helper that captures git_sha + sha256 of inputs/outputs
  + record_counts + validation_pass.
- reports/distillation/latest.md aggregating most-recent run of each stage.
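
A possible shape for the planned withReceipt() helper is sketched below. Phase 5 is not written yet, so everything here is an assumption: the signature, the `StageResult` and `Receipt` field layout, and the choice to hash the input and output as strings. Only the captured fields (git_sha, sha256 of inputs/outputs, record_counts, validation_pass) come from the list above.

```typescript
import { createHash } from "node:crypto";

// Hypothetical stage result: serialized output plus the counters a receipt records.
interface StageResult {
  output: string;
  record_counts: Record<string, number>;
  validation_pass: boolean;
}

interface Receipt {
  stage: string;
  git_sha: string;
  input_sha256: string;
  output_sha256: string;
  record_counts: Record<string, number>;
  validation_pass: boolean;
}

const sha256Hex = (s: string): string => createHash("sha256").update(s).digest("hex");

// Pure receipt construction, kept separate so it can be tested without running a stage.
function buildReceipt(stage: string, git_sha: string, input: string, result: StageResult): Receipt {
  return {
    stage,
    git_sha,
    input_sha256: sha256Hex(input),
    output_sha256: sha256Hex(result.output),
    record_counts: result.record_counts,
    validation_pass: result.validation_pass,
  };
}

// Run a stage and return its result together with the receipt describing it.
async function withReceipt(
  stage: string,
  git_sha: string,
  input: string,
  run: (input: string) => Promise<StageResult>,
): Promise<{ result: StageResult; receipt: Receipt }> {
  const result = await run(input);
  return { result, receipt: buildReceipt(stage, git_sha, input, result) };
}
```

Writing the returned receipt to a per-stage receipt.json would then be a single JSON.stringify call in the caller.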

Carry-overs to Phase 3 v2:
- mode_experiments scoring (168 needs_human_review): derive markers from
  validation_results.grounded_fraction
- extraction-class JOIN: distilled_*/audit_facts/observer_escalations
  → JOIN to verdict-bearing parent by task_id

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 22:57:40 -05:00


// export_rag.ts — Phase 4a RAG dataset export.
//
// Reads ScoredRun rows from data/scored-runs/YYYY/MM/DD/*.jsonl,
// pairs them with the originating EvidenceRecord (by reading the
// matching evidence file), filters to allowed RAG categories,
// validates against RagSample schema, writes exports/rag/playbooks.jsonl.
// Records that fail any check go to exports/quarantine/rag.jsonl with
// a structured reason.
//
// Default categories: accepted, partially_accepted.
// Optional --include-review opt-in lets needs_human_review through —
// useful for retrieval of warning patterns; SFT never gets this.
// Rejected NEVER enters RAG (schema enforces).
//
// IDs are deterministic: "rag-" + sha256(evidence_run_id + "|" + scored.provenance.sig_hash).slice(0,16),
// so re-running on the same scored-runs produces identical rows.
import { existsSync, readFileSync, readdirSync, mkdirSync, writeFileSync, appendFileSync, statSync } from "node:fs";
import { resolve, dirname } from "node:path";
import {
  RAG_SAMPLE_SCHEMA_VERSION, validateRagSample, type RagSample, type RagSourceCategory, RAG_ALLOWED_CATEGORIES,
} from "../../auditor/schemas/distillation/rag_sample";
import type { ScoredRun } from "../../auditor/schemas/distillation/scored_run";
import type { EvidenceRecord } from "../../auditor/schemas/distillation/evidence_record";
import { canonicalSha256 } from "../../auditor/schemas/distillation/types";
import { QuarantineWriter } from "./quarantine";

export interface ExportRagOptions {
  root: string;
  recorded_at: string;
  include_review?: boolean; // include needs_human_review records
  dry_run?: boolean;
}

export interface ExportRagResult {
  scored_files_read: number;
  records_read: number;
  records_exported: number;
  records_quarantined: number;
  output_path: string;
  quarantine_summary: string;
}

const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";
const ALLOWED_DEFAULT: RagSourceCategory[] = ["accepted", "partially_accepted"];

function listScoredRunFiles(root: string): string[] {
  const out: string[] = [];
  const dir = resolve(root, "data/scored-runs");
  if (!existsSync(dir)) return out;
  for (const yyyy of readdirSync(dir).sort()) {
    const yp = resolve(dir, yyyy);
    if (!statSync(yp).isDirectory()) continue;
    for (const mm of readdirSync(yp).sort()) {
      const mp = resolve(yp, mm);
      if (!statSync(mp).isDirectory()) continue;
      for (const dd of readdirSync(mp).sort()) {
        const dp = resolve(mp, dd);
        if (!statSync(dp).isDirectory()) continue;
        for (const f of readdirSync(dp)) {
          if (f.endsWith(".jsonl")) out.push(resolve(dp, f));
        }
      }
    }
  }
  return out;
}

// Load matching evidence file. Cache is scoped per export call (passed
// in by caller) — module-level cache across calls would leak stale
// state in tests that wipe and recreate the temp root.
function loadEvidenceByRunId(
  scored_path: string,
  root: string,
  cache: Map<string, Map<string, EvidenceRecord>>,
): Map<string, EvidenceRecord> {
  // scored-runs path mirrors evidence path:
  //   data/scored-runs/YYYY/MM/DD/<stem>.jsonl
  //   data/evidence/YYYY/MM/DD/<stem>.jsonl
  const evidence_path = scored_path.replace("/scored-runs/", "/evidence/");
  if (cache.has(evidence_path)) return cache.get(evidence_path)!;
  const m = new Map<string, EvidenceRecord>();
  if (!existsSync(evidence_path)) {
    cache.set(evidence_path, m);
    return m;
  }
  for (const line of readFileSync(evidence_path, "utf8").split("\n")) {
    if (!line) continue;
    try {
      const ev = JSON.parse(line) as EvidenceRecord;
      m.set(ev.run_id, ev);
    } catch { /* skip bad lines */ }
  }
  cache.set(evidence_path, m);
  return m;
}

// Synthesize fields needed for RAG from {ScoredRun, EvidenceRecord}.
// Pure transform; no I/O.
function synthesizeRagSample(
  scored: ScoredRun,
  ev: EvidenceRecord,
  recorded_at: string,
  rag_id: string,
): RagSample {
  const text = ev.text ?? "";
  const taskParts = ev.task_id.split(":");
  const tags: string[] = [
    `task:${taskParts[0] ?? ev.task_id}`,
    `category:${scored.category}`,
  ];
  if (ev.model_role) tags.push(`role:${ev.model_role}`);
  if (ev.model_name) tags.push(`model:${ev.model_name}`);
  if (Array.isArray(ev.source_files) && ev.source_files.length > 0) {
    tags.push(`file:${ev.source_files[0]}`);
  }
  // Title: first non-blank line of text, capped at 120 chars; falls back to task_id.
  const firstLine = text.split("\n").find(l => l.trim().length > 0) ?? "";
  const title = (firstLine || ev.task_id).slice(0, 120);
  // Embedding text: same as content for now; future tuning may shorten
  // (e.g. only the title + key claims).
  const embedding_text = text.slice(0, 2000); // cap at 2000 chars to keep embeddings cheap
  // Map ScoreCategory → RagSourceCategory. The caller filters out rejected
  // before calling, but narrow defensively here anyway.
  const cat: RagSourceCategory = scored.category === "rejected"
    ? "needs_human_review" // shouldn't happen; caller filters
    : (scored.category as RagSourceCategory);
  return {
    schema_version: RAG_SAMPLE_SCHEMA_VERSION,
    id: rag_id,
    title,
    content: text,
    tags,
    source_run_id: scored.evidence_run_id,
    success_score: cat,
    source_category: cat,
    embedding_text,
    created_at: recorded_at,
    provenance: {
      source_file: scored.provenance.source_file,
      line_offset: scored.provenance.line_offset,
      sig_hash: scored.provenance.sig_hash,
      recorded_at,
    },
  };
}

export async function exportRag(opts: ExportRagOptions): Promise<ExportRagResult> {
  const { root, recorded_at, include_review = false, dry_run = false } = opts;
  const allowed: RagSourceCategory[] = include_review
    ? ["accepted", "partially_accepted", "needs_human_review"]
    : ALLOWED_DEFAULT;
  const out_path = resolve(root, "exports/rag/playbooks.jsonl");
  const q = new QuarantineWriter(root, "rag", dry_run);
  let records_read = 0;
  let records_exported = 0;
  const seenIds = new Set<string>();
  const rowsToWrite: string[] = [];

  // Re-read existing output to populate seenIds so the exporter stays idempotent.
  if (!dry_run && existsSync(out_path)) {
    for (const line of readFileSync(out_path, "utf8").split("\n")) {
      if (!line) continue;
      try { const r = JSON.parse(line); if (r.id) seenIds.add(r.id); } catch { /* skip bad lines */ }
    }
  }

  const evidenceCache = new Map<string, Map<string, EvidenceRecord>>();
  const scored_files = listScoredRunFiles(root);
  for (const sp of scored_files) {
    const evMap = loadEvidenceByRunId(sp, root, evidenceCache);
    const lines = readFileSync(sp, "utf8").split("\n").filter(Boolean);
    for (let i = 0; i < lines.length; i++) {
      records_read++;
      let scored: ScoredRun;
      try { scored = JSON.parse(lines[i]) as ScoredRun; }
      catch (e) {
        q.add({
          reason: "schema_violation",
          source_record: { _raw: lines[i].slice(0, 200) },
          errors: ["scored-run not JSON: " + (e as Error).message.slice(0, 160)],
          recorded_at,
          source_provenance: { source_file: sp.replace(root + "/", ""), line_offset: i },
        });
        continue;
      }

      // Provenance check: every record must have it.
      if (!scored.provenance?.sig_hash || !scored.provenance.source_file) {
        q.add({
          reason: "missing_provenance",
          source_record: scored as unknown as Record<string, unknown>,
          errors: ["scored_run.provenance missing or incomplete"],
          recorded_at,
        });
        continue;
      }
      if (!scored.evidence_run_id) {
        q.add({
          reason: "missing_source_run_id",
          source_record: scored as unknown as Record<string, unknown>,
          errors: ["evidence_run_id missing"],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      // Category gate: RAG never takes rejected; needs_human_review is opt-in.
      if (!allowed.includes(scored.category as RagSourceCategory)) {
        q.add({
          reason: "category_disallowed",
          source_record: scored as unknown as Record<string, unknown>,
          errors: [`category=${scored.category} not in [${allowed.join(",")}]`],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      // Look up evidence row.
      const ev = evMap.get(scored.evidence_run_id);
      if (!ev) {
        q.add({
          reason: "missing_source_run_id",
          source_record: scored as unknown as Record<string, unknown>,
          errors: [`evidence_run_id=${scored.evidence_run_id} not found in matching evidence partition`],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      // Empty content gate.
      if (typeof ev.text !== "string" || ev.text.trim().length === 0) {
        q.add({
          reason: "empty_content",
          source_record: scored as unknown as Record<string, unknown>,
          errors: ["evidence.text is empty/missing — RAG needs content"],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      // Deterministic ID: "rag-" + first 16 hex chars of sha256(evidence_run_id + "|" + sig_hash).
      const id_seed = `${scored.evidence_run_id}|${scored.provenance.sig_hash}`;
      const hasher = new Bun.CryptoHasher("sha256");
      hasher.update(id_seed);
      const rag_id = "rag-" + hasher.digest("hex").slice(0, 16);
      if (seenIds.has(rag_id)) {
        // Idempotent: the same row already exists in the output. Skip silently.
        continue;
      }

      const sample = synthesizeRagSample(scored, ev, recorded_at, rag_id);
      const v = validateRagSample(sample);
      if (!v.valid) {
        q.add({
          reason: "schema_violation",
          source_record: sample as unknown as Record<string, unknown>,
          errors: v.errors,
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }
      seenIds.add(rag_id);
      rowsToWrite.push(JSON.stringify(v.value));
      records_exported++;
    }
  }

  if (!dry_run && rowsToWrite.length > 0) {
    mkdirSync(dirname(out_path), { recursive: true });
    appendFileSync(out_path, rowsToWrite.join("\n") + "\n");
  }
  return {
    scored_files_read: scored_files.length,
    records_read,
    records_exported,
    records_quarantined: q.total,
    output_path: out_path.replace(root + "/", ""),
    quarantine_summary: q.summary(),
  };
}

async function cli() {
  const dry_run = process.argv.includes("--dry-run");
  const include_review = process.argv.includes("--include-review");
  const recorded_at = new Date().toISOString();
  const r = await exportRag({ root: DEFAULT_ROOT, recorded_at, include_review, dry_run });
  console.log(`[export_rag] read=${r.records_read} exported=${r.records_exported} ${r.quarantine_summary}${dry_run ? " (DRY RUN)" : ""}`);
  console.log(`[export_rag] output: ${r.output_path}`);
  if (include_review) console.log("[export_rag] needs_human_review INCLUDED (--include-review)");
}

if (import.meta.main) cli().catch(e => { console.error(e); process.exit(1); });
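
The deterministic ID scheme in exportRag can be reproduced outside Bun for spot checks. The sketch below uses Node's `createHash` in place of `Bun.CryptoHasher`; the `ragId` helper name is hypothetical, while the seed format and the "rag-" prefix mirror the code above.

```typescript
import { createHash } from "node:crypto";

// Same seed and truncation as exportRag: "rag-" + first 16 hex chars of
// sha256(`${evidence_run_id}|${sig_hash}`).
function ragId(evidence_run_id: string, sig_hash: string): string {
  const seed = `${evidence_run_id}|${sig_hash}`;
  return "rag-" + createHash("sha256").update(seed).digest("hex").slice(0, 16);
}
```

Because the ID depends only on the evidence run and the score signature, re-running the exporter over unchanged scored-runs yields the same IDs, which is what lets the seenIds check skip rows already present in the output.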