Build the contamination firewall: RAG, SFT, and Preference exporters
that turn scored evidence into clean training datasets without
leaking rejected, unvalidated, hallucinated, or provenance-free
records.
Files (8 new + 4 schema updates):
scripts/distillation/quarantine.ts shared QuarantineWriter, 11-reason taxonomy
scripts/distillation/export_rag.ts RAG exporter (--include-review opt-in)
scripts/distillation/export_sft.ts SFT exporter (--include-partial opt-in, SFT_NEVER constant)
scripts/distillation/export_preference.ts preference exporter, same task_id pairing
scripts/distillation/distill.ts CLI dispatcher (build-evidence/score/export-*)
tests/distillation/exports.test.ts 15 contamination-firewall tests
reports/distillation/phase4-export-report.md acceptance report
Schema field-name alignment with now.md:
rag_sample.ts +source_category, exported_at→created_at
sft_sample.ts +id, exported_at→created_at; partially_accepted allowed at the schema level (the CLI gates it)
preference_sample.ts +id, source_run_ids→chosen_run_id+rejected_run_id, +created_at
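The exported_at→created_at rename can be illustrated with a tiny migration helper (hypothetical; the real schema files enforce this in their validators, and the row shape here is an assumption):

```typescript
// Hypothetical helper showing the exported_at -> created_at rename on a
// raw JSONL row; not part of the actual schema code.
function migrateCreatedAt(row: Record<string, unknown>): Record<string, unknown> {
  if (!("exported_at" in row)) return row; // already aligned
  const { exported_at, ...rest } = row;
  return { ...rest, created_at: exported_at };
}
```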
Test metrics: 117 distillation tests pass · 0 fail · 315 expects · 327ms
Real-data export run (1052 scored input rows):
RAG: 446 exported (351 acc + 95 partial), 606 quarantined
SFT: 351 exported (all 'accepted'), 701 quarantined
Preference: 83 pairs exported, 16 quarantined
CONTAMINATION FIREWALL — verified held on real data:
- SFT output: 351/351 quality_score='accepted' (ZERO leaked)
- RAG output: 351 acc + 95 partial (ZERO rejected leaked)
- Preference: 0 self-pairs (chosen_run_id != rejected_run_id)
- 536 rejected+needs_human_review records caught at unsafe_sft_category
gate, exact match to scored-runs forbidden-category total
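The SFT invariant above can be spot-checked mechanically; a minimal sketch (the scan function is illustrative, not part of the exporters):

```typescript
// Illustrative spot-check: scan an SFT export (JSONL text) and count rows
// whose quality_score is anything other than 'accepted'. On the real
// export this should return 0.
function countLeaked(jsonl: string): number {
  let leaked = 0;
  for (const line of jsonl.split("\n")) {
    if (!line.trim()) continue;
    const row = JSON.parse(line) as { quality_score?: string };
    if (row.quality_score !== "accepted") leaked++;
  }
  return leaked;
}
```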
Defense in depth (the firewall is two layers, not one):
1. Schema layer (Phase 1): SftSample.quality_score enum forbids
rejected/needs_human at write time
2. Exporter layer: SFT_NEVER constant in export_sft.ts checks
category before synthesis. Even if synthesis produced a row
with quality_score=rejected, validateSftSample would reject it.
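The exporter-layer gate can be sketched as follows (category names are the ones used throughout this report; `SFT_NEVER` here is an illustrative stand-in for the constant in export_sft.ts, not its actual code):

```typescript
// Sketch of the exporter-layer category gate. SFT_NEVER categories are
// refused unconditionally; partially_accepted requires the CLI opt-in.
const SFT_NEVER = new Set(["rejected", "needs_human_review"]);

function passesSftGate(category: string, includePartial: boolean): boolean {
  if (SFT_NEVER.has(category)) return false; // firewall layer 2
  if (category === "partially_accepted") return includePartial; // --include-partial
  return category === "accepted";
}
```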
Quarantine reasons (11): missing_provenance, missing_source_run_id,
empty_content, schema_violation, unsafe_sft_category,
unsafe_rag_category, invalid_preference_pairing,
hallucinated_file_path, duplicate_id, self_pairing,
category_disallowed.
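The taxonomy maps naturally to a closed union type, which is one way QuarantineWriter can keep reason strings exhaustive at compile time (the tally helper below is illustrative, not quarantine.ts itself):

```typescript
// The 11-reason taxonomy as a closed union: any typo'd reason string
// fails to compile instead of silently creating a 12th bucket.
type QuarantineReason =
  | "missing_provenance" | "missing_source_run_id" | "empty_content"
  | "schema_violation" | "unsafe_sft_category" | "unsafe_rag_category"
  | "invalid_preference_pairing" | "hallucinated_file_path"
  | "duplicate_id" | "self_pairing" | "category_disallowed";

// Illustrative summary helper: count quarantined records per reason.
function tallyReasons(reasons: QuarantineReason[]): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const r of reasons) counts[r] = (counts[r] ?? 0) + 1;
  return counts;
}
```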
Bug surfaced + fixed during testing: a module-level evidenceCache shared state across test runs (tests wipe TMP, but the cache still held a stale empty Map). Moved the cache to per-call scope. The same pattern would have bitten the Phase 2 materializer if its tests had multiple runs sharing state; fixed preventively.
Pairing logic v1: same task_id with category gap. accepted×rejected
preferred, accepted×partially_accepted as fallback. MAX_PAIRS_PER_TASK=5
cap prevents one hot task from dominating. Future: cross-source
pairing (scrum_reviews chosen vs observer_reviews rejected on same
file) to grow dataset beyond 83.
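The v1 pairing rules can be sketched for a single task (the Run shape and function name are illustrative, not export_preference.ts):

```typescript
// Pairing sketch under the v1 rules: within one task_id, prefer
// accepted x rejected pairs, fall back to accepted x partially_accepted,
// and cap total pairs so one hot task cannot dominate the dataset.
interface Run { run_id: string; task_id: string; category: string; }
const MAX_PAIRS_PER_TASK = 5;

function pairTask(runs: Run[]): Array<{ chosen: string; rejected: string }> {
  const winners = runs.filter(r => r.category === "accepted");
  const hard = runs.filter(r => r.category === "rejected");
  const soft = runs.filter(r => r.category === "partially_accepted");
  const losers = hard.length > 0 ? hard : soft; // rejected preferred, partial fallback
  const pairs: Array<{ chosen: string; rejected: string }> = [];
  for (const w of winners) {
    for (const l of losers) {
      if (pairs.length >= MAX_PAIRS_PER_TASK) return pairs;
      if (w.run_id !== l.run_id) pairs.push({ chosen: w.run_id, rejected: l.run_id }); // no self-pairs
    }
  }
  return pairs;
}
```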
CLI: ./scripts/distill.ts {build-evidence|score|export-rag|export-sft|export-preference|export-all|health}
Flags: --dry-run, --include-partial (SFT only), --include-review (RAG only)
Carry-overs to Phase 5 (Receipts Harness):
- Each exporter currently writes results but no per-stage receipt.json.
Phase 5 wraps build_evidence_index + score_runs + export_* in a
withReceipt() helper that captures git_sha + sha256 of inputs/outputs
+ record_counts + validation_pass.
- reports/distillation/latest.md aggregating most-recent run of each stage.
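A minimal sketch of such a wrapper, assuming the receipt fields named above (git_sha capture and input hashing elided; the types here are assumptions, not Phase 5 code):

```typescript
import { createHash } from "node:crypto";

// Sketch of a withReceipt() wrapper: run a stage, then record a sha256 of
// its output plus a record count. Assumed shape; the real receipt also
// carries git_sha, input hashes, and validation_pass.
interface Receipt { stage: string; output_sha256: string; record_count: number; }

function withReceipt<T>(
  stage: string,
  run: () => { output: string; records: number; value: T },
): { value: T; receipt: Receipt } {
  const r = run(); // execute the stage, then hash exactly what it produced
  const output_sha256 = createHash("sha256").update(r.output).digest("hex");
  return { value: r.value, receipt: { stage, output_sha256, record_count: r.records } };
}
```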
Carry-overs to Phase 3 v2:
- mode_experiments scoring (168 needs_human_review): derive markers from
validation_results.grounded_fraction
- extraction-class JOIN: distilled_*/audit_facts/observer_escalations
→ JOIN to verdict-bearing parent by task_id
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
309 lines
11 KiB
TypeScript
// export_rag.ts — Phase 4a RAG dataset export.
//
// Reads ScoredRun rows from data/scored-runs/YYYY/MM/DD/*.jsonl,
// pairs them with the originating EvidenceRecord (by reading the
// matching evidence file), filters to allowed RAG categories,
// validates against RagSample schema, writes exports/rag/playbooks.jsonl.
// Records that fail any check go to exports/quarantine/rag.jsonl with
// a structured reason.
//
// Default categories: accepted, partially_accepted.
// Optional --include-review opt-in lets needs_human_review through —
// useful for retrieval of warning patterns; SFT never gets this.
// Rejected NEVER enters RAG (schema enforces).
//
// IDs are deterministic: sha256(source_run_id + score_provenance.sig_hash).slice(0,16)
// so re-running on the same scored-runs produces identical rows.

import { existsSync, readFileSync, readdirSync, mkdirSync, writeFileSync, appendFileSync, statSync } from "node:fs";
import { resolve, dirname } from "node:path";
import {
  RAG_SAMPLE_SCHEMA_VERSION, validateRagSample, type RagSample, type RagSourceCategory, RAG_ALLOWED_CATEGORIES,
} from "../../auditor/schemas/distillation/rag_sample";
import type { ScoredRun } from "../../auditor/schemas/distillation/scored_run";
import type { EvidenceRecord } from "../../auditor/schemas/distillation/evidence_record";
import { canonicalSha256 } from "../../auditor/schemas/distillation/types";
import { QuarantineWriter } from "./quarantine";

export interface ExportRagOptions {
  root: string;
  recorded_at: string;
  include_review?: boolean; // include needs_human_review records
  dry_run?: boolean;
}

export interface ExportRagResult {
  scored_files_read: number;
  records_read: number;
  records_exported: number;
  records_quarantined: number;
  output_path: string;
  quarantine_summary: string;
}

const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";
const ALLOWED_DEFAULT: RagSourceCategory[] = ["accepted", "partially_accepted"];

function listScoredRunFiles(root: string): string[] {
  const out: string[] = [];
  const dir = resolve(root, "data/scored-runs");
  if (!existsSync(dir)) return out;
  for (const yyyy of readdirSync(dir).sort()) {
    const yp = resolve(dir, yyyy);
    if (!statSync(yp).isDirectory()) continue;
    for (const mm of readdirSync(yp).sort()) {
      const mp = resolve(yp, mm);
      if (!statSync(mp).isDirectory()) continue;
      for (const dd of readdirSync(mp).sort()) {
        const dp = resolve(mp, dd);
        if (!statSync(dp).isDirectory()) continue;
        for (const f of readdirSync(dp)) {
          if (f.endsWith(".jsonl")) out.push(resolve(dp, f));
        }
      }
    }
  }
  return out;
}

// Load matching evidence file. Cache is scoped per export call (passed
// in by caller) — module-level cache across calls would leak stale
// state in tests that wipe and recreate the temp root.
function loadEvidenceByRunId(
  scored_path: string,
  root: string,
  cache: Map<string, Map<string, EvidenceRecord>>,
): Map<string, EvidenceRecord> {
  // scored-runs path mirrors evidence path:
  //   data/scored-runs/YYYY/MM/DD/<stem>.jsonl
  //   data/evidence/YYYY/MM/DD/<stem>.jsonl
  const evidence_path = scored_path.replace("/scored-runs/", "/evidence/");
  if (cache.has(evidence_path)) return cache.get(evidence_path)!;
  const m = new Map<string, EvidenceRecord>();
  if (!existsSync(evidence_path)) {
    cache.set(evidence_path, m);
    return m;
  }
  for (const line of readFileSync(evidence_path, "utf8").split("\n")) {
    if (!line) continue;
    try {
      const ev = JSON.parse(line) as EvidenceRecord;
      m.set(ev.run_id, ev);
    } catch { /* skip bad lines */ }
  }
  cache.set(evidence_path, m);
  return m;
}

// Synthesize fields needed for RAG from {ScoredRun, EvidenceRecord}.
// Pure transform; no I/O.
function synthesizeRagSample(
  scored: ScoredRun,
  ev: EvidenceRecord,
  recorded_at: string,
  rag_id: string,
): RagSample {
  const text = ev.text ?? "";
  const taskParts = ev.task_id.split(":");
  const tags: string[] = [
    `task:${taskParts[0] ?? ev.task_id}`,
    `category:${scored.category}`,
  ];
  if (ev.model_role) tags.push(`role:${ev.model_role}`);
  if (ev.model_name) tags.push(`model:${ev.model_name}`);
  if (Array.isArray(ev.source_files) && ev.source_files.length > 0) {
    tags.push(`file:${ev.source_files[0]}`);
  }

  // Title: first non-empty line of text (truncated to 120 chars), fallback to task_id
  const firstLine = text.split("\n").find(l => l.trim().length > 0) ?? "";
  const title = (firstLine || ev.task_id).slice(0, 120);

  // Embedding text: same as content for now; future tuning may shorten
  // (e.g. only the title + key claims).
  const embedding_text = text.slice(0, 2000); // cap at 2KB to keep embeddings cheap

  // Map ScoreCategory → RagSourceCategory. rejected was filtered above,
  // but defensively narrow here.
  const cat: RagSourceCategory = scored.category === "rejected"
    ? "needs_human_review" // shouldn't happen — caller filters
    : (scored.category as RagSourceCategory);

  return {
    schema_version: RAG_SAMPLE_SCHEMA_VERSION,
    id: rag_id,
    title,
    content: text,
    tags,
    source_run_id: scored.evidence_run_id,
    success_score: cat,
    source_category: cat,
    embedding_text,
    created_at: recorded_at,
    provenance: {
      source_file: scored.provenance.source_file,
      line_offset: scored.provenance.line_offset,
      sig_hash: scored.provenance.sig_hash,
      recorded_at,
    },
  };
}

export async function exportRag(opts: ExportRagOptions): Promise<ExportRagResult> {
  const { root, recorded_at, include_review = false, dry_run = false } = opts;
  const allowed: RagSourceCategory[] = include_review
    ? ["accepted", "partially_accepted", "needs_human_review"]
    : ALLOWED_DEFAULT;

  const out_path = resolve(root, "exports/rag/playbooks.jsonl");
  const q = new QuarantineWriter(root, "rag", dry_run);

  let records_read = 0;
  let records_exported = 0;
  const seenIds = new Set<string>();
  const rowsToWrite: string[] = [];

  // Re-read existing output to populate seenIds — exporter idempotent.
  if (!dry_run && existsSync(out_path)) {
    for (const line of readFileSync(out_path, "utf8").split("\n")) {
      if (!line) continue;
      try { const r = JSON.parse(line); if (r.id) seenIds.add(r.id); } catch {}
    }
  }

  const evidenceCache = new Map<string, Map<string, EvidenceRecord>>();
  const scored_files = listScoredRunFiles(root);
  for (const sp of scored_files) {
    const evMap = loadEvidenceByRunId(sp, root, evidenceCache);
    const lines = readFileSync(sp, "utf8").split("\n").filter(Boolean);
    for (let i = 0; i < lines.length; i++) {
      records_read++;
      let scored: ScoredRun;
      try { scored = JSON.parse(lines[i]) as ScoredRun; }
      catch (e) {
        q.add({
          reason: "schema_violation",
          source_record: { _raw: lines[i].slice(0, 200) },
          errors: ["scored-run not JSON: " + (e as Error).message.slice(0, 160)],
          recorded_at,
          source_provenance: { source_file: sp.replace(root + "/", ""), line_offset: i },
        });
        continue;
      }

      // Provenance check — every record must have it.
      if (!scored.provenance?.sig_hash || !scored.provenance.source_file) {
        q.add({
          reason: "missing_provenance",
          source_record: scored as unknown as Record<string, unknown>,
          errors: ["scored_run.provenance missing or incomplete"],
          recorded_at,
        });
        continue;
      }
      if (!scored.evidence_run_id) {
        q.add({
          reason: "missing_source_run_id",
          source_record: scored as unknown as Record<string, unknown>,
          errors: ["evidence_run_id missing"],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      // Category gate — RAG never takes rejected; needs_human is opt-in.
      if (!allowed.includes(scored.category as RagSourceCategory)) {
        q.add({
          reason: "category_disallowed",
          source_record: scored as unknown as Record<string, unknown>,
          errors: [`category=${scored.category} not in [${allowed.join(",")}]`],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      // Look up evidence row.
      const ev = evMap.get(scored.evidence_run_id);
      if (!ev) {
        q.add({
          reason: "missing_source_run_id",
          source_record: scored as unknown as Record<string, unknown>,
          errors: [`evidence_run_id=${scored.evidence_run_id} not found in matching evidence partition`],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      // Empty content gate.
      if (typeof ev.text !== "string" || ev.text.trim().length === 0) {
        q.add({
          reason: "empty_content",
          source_record: scored as unknown as Record<string, unknown>,
          errors: ["evidence.text is empty/missing — RAG needs content"],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      // Deterministic ID: sha256(evidence_run_id + score_sig_hash):16
      const id_seed = `${scored.evidence_run_id}|${scored.provenance.sig_hash}`;
      const hasher = new Bun.CryptoHasher("sha256");
      hasher.update(id_seed);
      const rag_id = "rag-" + hasher.digest("hex").slice(0, 16);

      if (seenIds.has(rag_id)) {
        // Idempotent — same row appears in existing output. Skip silently.
        continue;
      }

      const sample = synthesizeRagSample(scored, ev, recorded_at, rag_id);
      const v = validateRagSample(sample);
      if (!v.valid) {
        q.add({
          reason: "schema_violation",
          source_record: sample as unknown as Record<string, unknown>,
          errors: v.errors,
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      seenIds.add(rag_id);
      rowsToWrite.push(JSON.stringify(v.value));
      records_exported++;
    }
  }

  if (!dry_run && rowsToWrite.length > 0) {
    mkdirSync(dirname(out_path), { recursive: true });
    appendFileSync(out_path, rowsToWrite.join("\n") + "\n");
  }

  return {
    scored_files_read: scored_files.length,
    records_read,
    records_exported,
    records_quarantined: q.total,
    output_path: out_path.replace(root + "/", ""),
    quarantine_summary: q.summary(),
  };
}

async function cli() {
  const dry_run = process.argv.includes("--dry-run");
  const include_review = process.argv.includes("--include-review");
  const recorded_at = new Date().toISOString();
  const r = await exportRag({ root: DEFAULT_ROOT, recorded_at, include_review, dry_run });

  console.log(`[export_rag] read=${r.records_read} exported=${r.records_exported} ${r.quarantine_summary}${dry_run ? " (DRY RUN)" : ""}`);
  console.log(`[export_rag] output: ${r.output_path}`);
  if (include_review) console.log("[export_rag] needs_human_review INCLUDED (--include-review)");
}

if (import.meta.main) cli().catch(e => { console.error(e); process.exit(1); });