lakehouse/scripts/distillation/export_preference.ts
root 68b6697bcb
distillation: Phase 4 — dataset export layer
Build the contamination firewall: RAG, SFT, and Preference exporters
that turn scored evidence into clean training datasets without
leaking rejected, unvalidated, hallucinated, or provenance-free
records.

Files (8 new + 4 schema updates):
  scripts/distillation/quarantine.ts      shared QuarantineWriter, 11-reason taxonomy
  scripts/distillation/export_rag.ts      RAG exporter (--include-review opt-in)
  scripts/distillation/export_sft.ts      SFT exporter (--include-partial opt-in, SFT_NEVER constant)
  scripts/distillation/export_preference.ts preference exporter, same task_id pairing
  scripts/distillation/distill.ts         CLI dispatcher (build-evidence/score/export-*)
  tests/distillation/exports.test.ts      15 contamination-firewall tests
  reports/distillation/phase4-export-report.md  acceptance report

Schema field-name alignment with now.md:
  rag_sample.ts        +source_category, exported_at→created_at
  sft_sample.ts        +id, exported_at→created_at, partially_accepted allowed at schema level (CLI gates it)
  preference_sample.ts +id, source_run_ids→chosen_run_id+rejected_run_id, +created_at
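The aligned preference_sample shape, as the exporter below writes it. Field names are taken from the list above and from the return object in export_preference.ts; the primitive TypeScript types and the sample values are assumptions for illustration.

```typescript
// Sketch of PreferenceSample after the field-name alignment above.
// Field names come from this commit; types and values are illustrative.
interface PreferenceSample {
  schema_version: string;
  id: string;
  prompt: string;
  chosen: string;
  rejected: string;
  reason: string;
  chosen_run_id: string;
  rejected_run_id: string;
  created_at: string;
  provenance: {
    source_file: string;
    line_offset: number;
    sig_hash: string;
    recorded_at: string;
  };
}

// Hypothetical record, shaped like what the exporter emits.
const sample: PreferenceSample = {
  schema_version: "1",
  id: "pref-0000000000000000",
  prompt: "Task: scrum_review:example.ts",
  chosen: "better outcome text",
  rejected: "worse outcome text",
  reason: "chosen scored 'accepted' | rejected scored 'rejected'",
  chosen_run_id: "run-a",
  rejected_run_id: "run-b",
  created_at: "2026-04-26T22:57:40Z",
  provenance: {
    source_file: "data/scored-runs/2026/04/26/example.jsonl",
    line_offset: 0,
    sig_hash: "0".repeat(64),
    recorded_at: "2026-04-26T22:57:40Z",
  },
};
console.log(sample.chosen_run_id !== sample.rejected_run_id); // true (no self-pairing)
```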

Test metrics: 117 distillation tests pass · 0 fail · 315 expects · 327ms

Real-data export run (1052 scored input rows):
  RAG:        446 exported (351 acc + 95 partial), 606 quarantined
  SFT:        351 exported (all 'accepted'),       701 quarantined
  Preference:  83 pairs exported,                   16 quarantined

CONTAMINATION FIREWALL — verified held on real data:
  - SFT output: 351/351 quality_score='accepted' (ZERO leaked)
  - RAG output: 351 acc + 95 partial (ZERO rejected leaked)
  - Preference: 0 self-pairs (chosen_run_id != rejected_run_id)
  - 536 rejected+needs_human_review records caught at unsafe_sft_category
    gate, exact match to scored-runs forbidden-category total

Defense in depth (the firewall is two layers, not one):
  1. Schema layer (Phase 1): SftSample.quality_score enum forbids
     rejected/needs_human at write time
  2. Exporter layer: SFT_NEVER constant in export_sft.ts checks
     category before synthesis. Even if synthesis produced a row
     with quality_score=rejected, validateSftSample would reject it.
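A minimal sketch of the exporter-layer gate in (2). The two blocked categories are the ones this commit reports caught at the gate; the exact contents and representation of SFT_NEVER in export_sft.ts are assumptions.

```typescript
// Sketch of the SFT_NEVER category gate (layer 2 of the firewall).
// Set members are assumed from the categories this commit names as blocked.
const SFT_NEVER = new Set(["rejected", "needs_human_review"]);

function passesSftGate(category: string): boolean {
  return !SFT_NEVER.has(category);
}

console.log(passesSftGate("accepted"));           // true
console.log(passesSftGate("rejected"));           // false
console.log(passesSftGate("needs_human_review")); // false
```

Even if this check were bypassed, the schema layer (1) would still refuse the row at validation time, which is the point of keeping both layers.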

Quarantine reasons (11): missing_provenance, missing_source_run_id,
empty_content, schema_violation, unsafe_sft_category,
unsafe_rag_category, invalid_preference_pairing,
hallucinated_file_path, duplicate_id, self_pairing,
category_disallowed.
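One way quarantine.ts might encode that taxonomy (the actual representation is not shown in this file): a const tuple so the union type stays in sync with the runtime list.

```typescript
// The 11 quarantine reasons above, transcribed verbatim; the
// tuple-plus-union encoding is an assumption about quarantine.ts.
const QUARANTINE_REASONS = [
  "missing_provenance", "missing_source_run_id", "empty_content",
  "schema_violation", "unsafe_sft_category", "unsafe_rag_category",
  "invalid_preference_pairing", "hallucinated_file_path", "duplicate_id",
  "self_pairing", "category_disallowed",
] as const;
type QuarantineReason = (typeof QUARANTINE_REASONS)[number];

console.log(QUARANTINE_REASONS.length); // 11
```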

Bug surfaced + fixed during testing: a module-level evidenceCache
shared state across test runs (tests wipe TMP, but the cache held a
stale empty Map). Moved the cache to per-call scope. The same pattern
would have bitten the Phase 2 materializer if its tests had multiple
runs sharing state; fixed there preventively.

Pairing logic v1: same task_id with a category gap. accepted×rejected
preferred, accepted×partially_accepted as fallback. A MAX_PAIRS_PER_TASK=5
cap prevents one hot task from dominating. Future: cross-source
pairing (scrum_reviews chosen vs observer_reviews rejected on the same
file) to grow the dataset beyond 83 pairs.
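The capped cross-product can be sketched generically; this mirrors pairUp() in the source below, with a worked example of the cap.

```typescript
// Generic version of the v1 capped cross-product pairing: every record
// in `a` paired with every record in `b`, truncated at the per-task cap.
const MAX_PAIRS_PER_TASK = 5;

function pairUp<T, U>(a: T[], b: U[]): Array<[T, U]> {
  const pairs: Array<[T, U]> = [];
  for (const x of a) {
    for (const y of b) {
      if (pairs.length >= MAX_PAIRS_PER_TASK) return pairs;
      pairs.push([x, y]);
    }
  }
  return pairs;
}

// 3 accepted × 2 rejected = 6 candidate pairs, capped at 5.
console.log(pairUp([1, 2, 3], ["a", "b"]).length); // 5
```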

CLI: ./scripts/distill.ts {build-evidence|score|export-rag|export-sft|export-preference|export-all|health}
Flags: --dry-run, --include-partial (SFT only), --include-review (RAG only)

Carry-overs to Phase 5 (Receipts Harness):
- Each exporter currently writes results but no per-stage receipt.json.
  Phase 5 wraps build_evidence_index + score_runs + export_* in a
  withReceipt() helper that captures git_sha + sha256 of inputs/outputs
  + record_counts + validation_pass.
- reports/distillation/latest.md aggregating most-recent run of each stage.
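A hypothetical shape for the Phase 5 withReceipt() wrapper. The captured fields (git_sha, sha256 of inputs/outputs, record_counts, validation_pass) come from the bullet above; the signature, field names, and helper names are all assumptions.

```typescript
import { createHash } from "node:crypto";

// Hypothetical withReceipt() sketch for Phase 5. Only the captured
// fields are from this commit's plan; everything else is assumed.
interface StageReceipt {
  stage: string;
  git_sha: string;
  inputs_sha256: string;
  outputs_sha256: string;
  record_counts: Record<string, number>;
  validation_pass: boolean;
}

const sha256 = (s: string) => createHash("sha256").update(s).digest("hex");

async function withReceipt(
  stage: string,
  git_sha: string,
  inputs: string,
  run: () => Promise<{ outputs: string; record_counts: Record<string, number>; validation_pass: boolean }>,
): Promise<StageReceipt> {
  const r = await run();
  return {
    stage,
    git_sha,
    inputs_sha256: sha256(inputs),
    outputs_sha256: sha256(r.outputs),
    record_counts: r.record_counts,
    validation_pass: r.validation_pass,
  };
}

// Hypothetical usage wrapping an export stage.
withReceipt("export-preference", "68b6697bcb", "exports/preference/chosen_rejected.jsonl", async () => ({
  outputs: "serialized rows",
  record_counts: { pairs_exported: 83 },
  validation_pass: true,
})).then(r => console.log(r.validation_pass)); // prints true
```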

Carry-overs to Phase 3 v2:
- mode_experiments scoring (168 needs_human_review): derive markers from
  validation_results.grounded_fraction
- extraction-class JOIN: distilled_*/audit_facts/observer_escalations
  → JOIN to verdict-bearing parent by task_id

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 22:57:40 -05:00

// export_preference.ts — Phase 4c preference dataset export.
//
// Pairs scored runs that attempted comparable tasks but landed in
// different categories (one accepted, one rejected). The "chosen" is
// the better outcome's text, the "rejected" is the worse outcome's
// text, and "reason" cites the explicit category transition.
//
// Pairing signal v1: SAME task_id with categories accepted/rejected
// (or accepted/partially_accepted as a softer pair).
//
// Hard rules from spec:
// - chosen != rejected at content level
// - chosen_run_id != rejected_run_id
// - reason non-empty
// - never fabricate pairs from unrelated records
//
// If insufficient valid pairs exist for a task_id, we don't pad — we
// just emit fewer pairs and note the gap.
import { existsSync, readFileSync, readdirSync, mkdirSync, appendFileSync, statSync } from "node:fs";
import { resolve, dirname } from "node:path";
import {
  PREFERENCE_SAMPLE_SCHEMA_VERSION, validatePreferenceSample, type PreferenceSample,
} from "../../auditor/schemas/distillation/preference_sample";
import type { ScoredRun } from "../../auditor/schemas/distillation/scored_run";
import type { EvidenceRecord } from "../../auditor/schemas/distillation/evidence_record";
import { QuarantineWriter } from "./quarantine";

export interface ExportPreferenceOptions {
  root: string;
  recorded_at: string;
  dry_run?: boolean;
}
export interface ExportPreferenceResult {
  scored_files_read: number;
  records_read: number;
  task_ids_with_pairs: number;
  pairs_exported: number;
  records_quarantined: number;
  output_path: string;
  quarantine_summary: string;
  insufficient_pair_task_ids: number; // tasks where we had only 1 record OR all same category
}

const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";

interface IndexedRecord {
  scored: ScoredRun;
  ev: EvidenceRecord;
  scored_path: string;
  line: number;
}
function listScoredRunFiles(root: string): string[] {
  const out: string[] = [];
  const dir = resolve(root, "data/scored-runs");
  if (!existsSync(dir)) return out;
  for (const yyyy of readdirSync(dir).sort()) {
    const yp = resolve(dir, yyyy);
    if (!statSync(yp).isDirectory()) continue;
    for (const mm of readdirSync(yp).sort()) {
      const mp = resolve(yp, mm);
      if (!statSync(mp).isDirectory()) continue;
      for (const dd of readdirSync(mp).sort()) {
        const dp = resolve(mp, dd);
        if (!statSync(dp).isDirectory()) continue;
        for (const f of readdirSync(dp)) {
          if (f.endsWith(".jsonl")) out.push(resolve(dp, f));
        }
      }
    }
  }
  return out;
}
function loadEvidenceByRunId(
  scored_path: string,
  cache: Map<string, Map<string, EvidenceRecord>>,
): Map<string, EvidenceRecord> {
  const evidence_path = scored_path.replace("/scored-runs/", "/evidence/");
  if (cache.has(evidence_path)) return cache.get(evidence_path)!;
  const m = new Map<string, EvidenceRecord>();
  if (!existsSync(evidence_path)) { cache.set(evidence_path, m); return m; }
  for (const line of readFileSync(evidence_path, "utf8").split("\n")) {
    if (!line) continue;
    try { const ev = JSON.parse(line) as EvidenceRecord; m.set(ev.run_id, ev); } catch {}
  }
  cache.set(evidence_path, m);
  return m;
}
// Build a pair from accepted + rejected (or accepted + partially) within
// the same task_id. Never invents pairs.
function buildPair(
  chosen: IndexedRecord,
  rejected: IndexedRecord,
  recorded_at: string,
): PreferenceSample | { error: string } {
  if (chosen.scored.evidence_task_id !== rejected.scored.evidence_task_id) {
    return { error: "task_id mismatch — comparable signal violation" };
  }
  if (chosen.scored.evidence_run_id === rejected.scored.evidence_run_id) {
    return { error: "same run_id — self-pairing" };
  }
  const chosenText = (chosen.ev.text ?? "").trim();
  const rejectedText = (rejected.ev.text ?? "").trim();
  if (chosenText.length === 0 || rejectedText.length === 0) {
    return { error: "empty text in chosen or rejected" };
  }
  if (chosenText === rejectedText) {
    return { error: "chosen and rejected texts identical" };
  }
  // Prompt synthesis: best-effort. For task_ids that encode a file
  // (e.g. scrum_review:<file>), include the file. Generic otherwise.
  const taskId = chosen.scored.evidence_task_id;
  let prompt = `Task: ${taskId}`;
  const file = chosen.ev.source_files?.[0];
  if (file) prompt += ` · file=${file}`;
  // Reason cites the explicit category gap.
  const reasonParts = [
    `chosen scored '${chosen.scored.category}'`,
    `rejected scored '${rejected.scored.category}'`,
  ];
  if (chosen.scored.reasons.length > 0) reasonParts.push(`chosen-rationale: ${chosen.scored.reasons[0].slice(0, 80)}`);
  if (rejected.scored.reasons.length > 0) reasonParts.push(`rejected-rationale: ${rejected.scored.reasons[0].slice(0, 80)}`);
  const reason = reasonParts.join(" | ");
  const id_seed = `${chosen.scored.evidence_run_id}|${rejected.scored.evidence_run_id}|${taskId}`;
  const hasher = new Bun.CryptoHasher("sha256");
  hasher.update(id_seed);
  const pref_id = "pref-" + hasher.digest("hex").slice(0, 16);
  return {
    schema_version: PREFERENCE_SAMPLE_SCHEMA_VERSION,
    id: pref_id,
    prompt,
    chosen: chosenText,
    rejected: rejectedText,
    reason,
    chosen_run_id: chosen.scored.evidence_run_id,
    rejected_run_id: rejected.scored.evidence_run_id,
    created_at: recorded_at,
    provenance: {
      source_file: chosen.scored.provenance.source_file,
      line_offset: chosen.scored.provenance.line_offset,
      // sig_hash for the pair = canonical sha of {chosen_run_id, rejected_run_id}
      // sorted so re-running produces the same provenance.
      sig_hash: pref_id_to_sig(chosen.scored.evidence_run_id, rejected.scored.evidence_run_id),
      recorded_at,
    },
  };
}
function pref_id_to_sig(a: string, b: string): string {
  const seed = [a, b].sort().join("|");
  const h = new Bun.CryptoHasher("sha256");
  h.update(seed);
  return h.digest("hex");
}
export async function exportPreference(opts: ExportPreferenceOptions): Promise<ExportPreferenceResult> {
  const { root, recorded_at, dry_run = false } = opts;
  const out_path = resolve(root, "exports/preference/chosen_rejected.jsonl");
  const q = new QuarantineWriter(root, "preference", dry_run);
  let records_read = 0;
  const seenIds = new Set<string>();
  if (!dry_run && existsSync(out_path)) {
    for (const line of readFileSync(out_path, "utf8").split("\n")) {
      if (!line) continue;
      try { const r = JSON.parse(line); if (r.id) seenIds.add(r.id); } catch {}
    }
  }
  // Index by task_id.
  const evidenceCache = new Map<string, Map<string, EvidenceRecord>>();
  const byTask = new Map<string, IndexedRecord[]>();
  const scored_files = listScoredRunFiles(root);
  for (const sp of scored_files) {
    const evMap = loadEvidenceByRunId(sp, evidenceCache);
    const lines = readFileSync(sp, "utf8").split("\n").filter(Boolean);
    for (let i = 0; i < lines.length; i++) {
      records_read++;
      let scored: ScoredRun;
      try { scored = JSON.parse(lines[i]) as ScoredRun; } catch { continue; }
      const ev = evMap.get(scored.evidence_run_id);
      if (!ev) continue;
      if (!scored.evidence_task_id) continue;
      const list = byTask.get(scored.evidence_task_id) ?? [];
      list.push({ scored, ev, scored_path: sp, line: i });
      byTask.set(scored.evidence_task_id, list);
    }
  }
  let pairs_exported = 0;
  let task_ids_with_pairs = 0;
  let insufficient_pair_task_ids = 0;
  const rowsToWrite: string[] = [];
  for (const [taskId, recs] of byTask) {
    if (recs.length < 2) {
      insufficient_pair_task_ids++;
      continue;
    }
    const accepted = recs.filter(r => r.scored.category === "accepted");
    const rejected = recs.filter(r => r.scored.category === "rejected");
    const partial = recs.filter(r => r.scored.category === "partially_accepted");
    // Strongest signal: accepted vs rejected.
    let pairs = pairUp(accepted, rejected);
    // Weaker but still valid: accepted vs partial.
    if (pairs.length === 0) pairs = pairUp(accepted, partial);
    if (pairs.length === 0) {
      insufficient_pair_task_ids++;
      continue;
    }
    let exportedThisTask = 0;
    for (const [chosen, rej] of pairs) {
      const built = buildPair(chosen, rej, recorded_at);
      if ("error" in built) {
        q.add({
          reason: "invalid_preference_pairing",
          source_record: { task_id: taskId, chosen_run_id: chosen.scored.evidence_run_id, rejected_run_id: rej.scored.evidence_run_id },
          errors: [built.error],
          recorded_at,
          source_provenance: chosen.scored.provenance,
        });
        continue;
      }
      if (seenIds.has(built.id)) continue;
      const v = validatePreferenceSample(built);
      if (!v.valid) {
        q.add({
          reason: "schema_violation",
          source_record: built as unknown as Record<string, unknown>,
          errors: v.errors,
          recorded_at,
          source_provenance: chosen.scored.provenance,
        });
        continue;
      }
      seenIds.add(built.id);
      rowsToWrite.push(JSON.stringify(v.value));
      pairs_exported++;
      exportedThisTask++;
    }
    if (exportedThisTask > 0) task_ids_with_pairs++;
  }
  if (!dry_run && rowsToWrite.length > 0) {
    mkdirSync(dirname(out_path), { recursive: true });
    appendFileSync(out_path, rowsToWrite.join("\n") + "\n");
  }
  return {
    scored_files_read: scored_files.length,
    records_read,
    task_ids_with_pairs,
    pairs_exported,
    records_quarantined: q.total,
    output_path: out_path.replace(root + "/", ""),
    quarantine_summary: q.summary(),
    insufficient_pair_task_ids,
  };
}
// Cross-product pairing: every accepted × every rejected. For any
// task_id with k accepted and m rejected, we get k*m pairs. Capped
// per task to keep the dataset balanced.
const MAX_PAIRS_PER_TASK = 5;

function pairUp(a: IndexedRecord[], b: IndexedRecord[]): Array<[IndexedRecord, IndexedRecord]> {
  const pairs: Array<[IndexedRecord, IndexedRecord]> = [];
  for (const x of a) {
    for (const y of b) {
      if (pairs.length >= MAX_PAIRS_PER_TASK) return pairs;
      pairs.push([x, y]);
    }
  }
  return pairs;
}
async function cli() {
  const dry_run = process.argv.includes("--dry-run");
  const recorded_at = new Date().toISOString();
  const r = await exportPreference({ root: DEFAULT_ROOT, recorded_at, dry_run });
  console.log(`[export_preference] read=${r.records_read} pairs=${r.pairs_exported} task_ids_paired=${r.task_ids_with_pairs} ${r.quarantine_summary}${dry_run ? " (DRY RUN)" : ""}`);
  console.log(`[export_preference] insufficient_pair_task_ids=${r.insufficient_pair_task_ids} (only one record OR all-same-category)`);
  console.log(`[export_preference] output: ${r.output_path}`);
}

if (import.meta.main) cli().catch(e => { console.error(e); process.exit(1); });