// export_preference.ts — Phase 4c preference dataset export. // // Pairs scored runs that attempted comparable tasks but landed in // different categories (one accepted, one rejected). The "chosen" is // the better outcome's text, the "rejected" is the worse outcome's // text, and "reason" cites the explicit category transition. // // Pairing signal v1: SAME task_id with categories accepted/rejected // (or accepted/partially_accepted as a softer pair). // // Hard rules from spec: // - chosen != rejected at content level // - chosen_run_id != rejected_run_id // - reason non-empty // - never fabricate pairs from unrelated records // // If insufficient valid pairs exist for a task_id, we don't pad — we // just emit fewer pairs and note the gap. import { existsSync, readFileSync, readdirSync, mkdirSync, appendFileSync, statSync } from "node:fs"; import { resolve, dirname } from "node:path"; import { PREFERENCE_SAMPLE_SCHEMA_VERSION, validatePreferenceSample, type PreferenceSample, } from "../../auditor/schemas/distillation/preference_sample"; import type { ScoredRun } from "../../auditor/schemas/distillation/scored_run"; import type { EvidenceRecord } from "../../auditor/schemas/distillation/evidence_record"; import { QuarantineWriter } from "./quarantine"; export interface ExportPreferenceOptions { root: string; recorded_at: string; dry_run?: boolean; } export interface ExportPreferenceResult { scored_files_read: number; records_read: number; task_ids_with_pairs: number; pairs_exported: number; records_quarantined: number; output_path: string; quarantine_summary: string; insufficient_pair_task_ids: number; // tasks where we had only 1 record OR all same category } const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? 
"/home/profit/lakehouse"; interface IndexedRecord { scored: ScoredRun; ev: EvidenceRecord; scored_path: string; line: number; } function listScoredRunFiles(root: string): string[] { const out: string[] = []; const dir = resolve(root, "data/scored-runs"); if (!existsSync(dir)) return out; for (const yyyy of readdirSync(dir).sort()) { const yp = resolve(dir, yyyy); if (!statSync(yp).isDirectory()) continue; for (const mm of readdirSync(yp).sort()) { const mp = resolve(yp, mm); if (!statSync(mp).isDirectory()) continue; for (const dd of readdirSync(mp).sort()) { const dp = resolve(mp, dd); if (!statSync(dp).isDirectory()) continue; for (const f of readdirSync(dp)) { if (f.endsWith(".jsonl")) out.push(resolve(dp, f)); } } } } return out; } function loadEvidenceByRunId( scored_path: string, cache: Map>, ): Map { const evidence_path = scored_path.replace("/scored-runs/", "/evidence/"); if (cache.has(evidence_path)) return cache.get(evidence_path)!; const m = new Map(); if (!existsSync(evidence_path)) { cache.set(evidence_path, m); return m; } for (const line of readFileSync(evidence_path, "utf8").split("\n")) { if (!line) continue; try { const ev = JSON.parse(line) as EvidenceRecord; m.set(ev.run_id, ev); } catch {} } cache.set(evidence_path, m); return m; } // Build a pair from accepted + rejected (or accepted + partially) within // the same task_id. Never invents pairs. function buildPair( chosen: IndexedRecord, rejected: IndexedRecord, recorded_at: string, ): PreferenceSample | { error: string } { if (chosen.scored.evidence_task_id !== rejected.scored.evidence_task_id) { return { error: "task_id mismatch — comparable signal violation" }; } if (chosen.scored.evidence_run_id === rejected.scored.evidence_run_id) { return { error: "same run_id — self-pairing" }; } const chosenText = (chosen.ev.text ?? "").trim(); const rejectedText = (rejected.ev.text ?? 
"").trim(); if (chosenText.length === 0 || rejectedText.length === 0) { return { error: "empty text in chosen or rejected" }; } if (chosenText === rejectedText) { return { error: "chosen and rejected texts identical" }; } // Prompt synthesis: best-effort. For task_ids that encode a file // (e.g. scrum_review:), include the file. Generic otherwise. const taskId = chosen.scored.evidence_task_id; let prompt = `Task: ${taskId}`; const file = chosen.ev.source_files?.[0]; if (file) prompt += ` · file=${file}`; // Reason cites the explicit category gap. const reasonParts = [ `chosen scored '${chosen.scored.category}'`, `rejected scored '${rejected.scored.category}'`, ]; if (chosen.scored.reasons.length > 0) reasonParts.push(`chosen-rationale: ${chosen.scored.reasons[0].slice(0, 80)}`); if (rejected.scored.reasons.length > 0) reasonParts.push(`rejected-rationale: ${rejected.scored.reasons[0].slice(0, 80)}`); const reason = reasonParts.join(" | "); const id_seed = `${chosen.scored.evidence_run_id}|${rejected.scored.evidence_run_id}|${taskId}`; const hasher = new Bun.CryptoHasher("sha256"); hasher.update(id_seed); const pref_id = "pref-" + hasher.digest("hex").slice(0, 16); return { schema_version: PREFERENCE_SAMPLE_SCHEMA_VERSION, id: pref_id, prompt, chosen: chosenText, rejected: rejectedText, reason, chosen_run_id: chosen.scored.evidence_run_id, rejected_run_id: rejected.scored.evidence_run_id, created_at: recorded_at, provenance: { source_file: chosen.scored.provenance.source_file, line_offset: chosen.scored.provenance.line_offset, // sig_hash for the pair = canonical sha of {chosen_run_id, rejected_run_id} // sorted so re-running produces the same provenance. 
sig_hash: pref_id_to_sig(chosen.scored.evidence_run_id, rejected.scored.evidence_run_id), recorded_at, }, }; } function pref_id_to_sig(a: string, b: string): string { const seed = [a, b].sort().join("|"); const h = new Bun.CryptoHasher("sha256"); h.update(seed); return h.digest("hex"); } export async function exportPreference(opts: ExportPreferenceOptions): Promise { const { root, recorded_at, dry_run = false } = opts; const out_path = resolve(root, "exports/preference/chosen_rejected.jsonl"); const q = new QuarantineWriter(root, "preference", dry_run); let records_read = 0; const seenIds = new Set(); if (!dry_run && existsSync(out_path)) { for (const line of readFileSync(out_path, "utf8").split("\n")) { if (!line) continue; try { const r = JSON.parse(line); if (r.id) seenIds.add(r.id); } catch {} } } // Index by task_id. const evidenceCache = new Map>(); const byTask = new Map(); const scored_files = listScoredRunFiles(root); for (const sp of scored_files) { const evMap = loadEvidenceByRunId(sp, evidenceCache); const lines = readFileSync(sp, "utf8").split("\n").filter(Boolean); for (let i = 0; i < lines.length; i++) { records_read++; let scored: ScoredRun; try { scored = JSON.parse(lines[i]) as ScoredRun; } catch { continue; } const ev = evMap.get(scored.evidence_run_id); if (!ev) continue; if (!scored.evidence_task_id) continue; const list = byTask.get(scored.evidence_task_id) ?? []; list.push({ scored, ev, scored_path: sp, line: i }); byTask.set(scored.evidence_task_id, list); } } let pairs_exported = 0; let task_ids_with_pairs = 0; let insufficient_pair_task_ids = 0; const rowsToWrite: string[] = []; for (const [taskId, recs] of byTask) { if (recs.length < 2) { insufficient_pair_task_ids++; continue; } const accepted = recs.filter(r => r.scored.category === "accepted"); const rejected = recs.filter(r => r.scored.category === "rejected"); const partial = recs.filter(r => r.scored.category === "partially_accepted"); // Strongest signal: accepted vs rejected. 
let pairs = pairUp(accepted, rejected); // Weaker but still valid: accepted vs partial. if (pairs.length === 0) pairs = pairUp(accepted, partial); if (pairs.length === 0) { insufficient_pair_task_ids++; continue; } let exportedThisTask = 0; for (const [chosen, rej] of pairs) { const built = buildPair(chosen, rej, recorded_at); if ("error" in built) { q.add({ reason: "invalid_preference_pairing", source_record: { task_id: taskId, chosen_run_id: chosen.scored.evidence_run_id, rejected_run_id: rej.scored.evidence_run_id }, errors: [built.error], recorded_at, source_provenance: chosen.scored.provenance, }); continue; } if (seenIds.has(built.id)) continue; const v = validatePreferenceSample(built); if (!v.valid) { q.add({ reason: "schema_violation", source_record: built as unknown as Record, errors: v.errors, recorded_at, source_provenance: chosen.scored.provenance, }); continue; } seenIds.add(built.id); rowsToWrite.push(JSON.stringify(v.value)); pairs_exported++; exportedThisTask++; } if (exportedThisTask > 0) task_ids_with_pairs++; } if (!dry_run && rowsToWrite.length > 0) { mkdirSync(dirname(out_path), { recursive: true }); appendFileSync(out_path, rowsToWrite.join("\n") + "\n"); } return { scored_files_read: scored_files.length, records_read, task_ids_with_pairs, pairs_exported, records_quarantined: q.total, output_path: out_path.replace(root + "/", ""), quarantine_summary: q.summary(), insufficient_pair_task_ids, }; } // Cross-product pairing: every accepted × every rejected. For any // task_id with k accepted and m rejected, we get k*m pairs. Capped // per task to keep the dataset balanced. 
// Per-task cap on the accepted × rejected cross product, so one prolific
// task_id cannot dominate the exported dataset.
const MAX_PAIRS_PER_TASK = 5;

/**
 * Cross-product pairing: each record in `a` against each record in `b`,
 * in order, truncated once MAX_PAIRS_PER_TASK pairs have been collected.
 */
function pairUp(a: IndexedRecord[], b: IndexedRecord[]): Array<[IndexedRecord, IndexedRecord]> {
  const collected: Array<[IndexedRecord, IndexedRecord]> = [];
  outer: for (const left of a) {
    for (const right of b) {
      if (collected.length >= MAX_PAIRS_PER_TASK) break outer;
      collected.push([left, right]);
    }
  }
  return collected;
}

/** CLI entry point: runs the export against DEFAULT_ROOT and prints a summary. */
async function cli() {
  const dryRun = process.argv.includes("--dry-run");
  const stamp = new Date().toISOString();
  const result = await exportPreference({
    root: DEFAULT_ROOT,
    recorded_at: stamp,
    dry_run: dryRun,
  });
  console.log(
    `[export_preference] read=${result.records_read} pairs=${result.pairs_exported} task_ids_paired=${result.task_ids_with_pairs} ${result.quarantine_summary}${dryRun ? " (DRY RUN)" : ""}`,
  );
  console.log(
    `[export_preference] insufficient_pair_task_ids=${result.insufficient_pair_task_ids} (only one record OR all-same-category)`,
  );
  console.log(`[export_preference] output: ${result.output_path}`);
}

if (import.meta.main) {
  cli().catch(err => {
    console.error(err);
    process.exit(1);
  });
}