// export_sft.ts — Phase 4b SFT dataset export. Strict no-leak gates.
//
// Default: only category=accepted ships.
// --include-partial: category in {accepted, partially_accepted} ships.
// rejected and needs_human_review NEVER ship — schema layer (Phase 1)
// enforces this AND the exporter filters before validation. Defense
// in depth.
//
// Each SFT row:
//   instruction = the prompt the executor saw
//   context     = retrieved context summary (matrix corpora used,
//                 pathway fingerprints seen, file_path)
//   response    = the executor's accepted output (evidence.text)
//
// Source restriction: SFT only takes records where evidence.text is a
// real model output (model_role in {executor, applier, reviewer with
// observer_verdict}). Pure-extraction rows lack a true "instruction"
// and are quarantined as missing_source_run_id (since they're not
// instruction→response shape).
import { existsSync, readFileSync, readdirSync, mkdirSync, appendFileSync, statSync } from "node:fs";
import { resolve, dirname } from "node:path";
import {
  SFT_SAMPLE_SCHEMA_VERSION,
  validateSftSample,
  type SftSample,
  type SftQualityScore,
} from "../../auditor/schemas/distillation/sft_sample";
import type { ScoredRun, ScoreCategory } from "../../auditor/schemas/distillation/scored_run";
import type { EvidenceRecord } from "../../auditor/schemas/distillation/evidence_record";
import { QuarantineWriter } from "./quarantine";

// CLI / programmatic options for one export run.
export interface ExportSftOptions {
  root: string;             // lakehouse root directory
  recorded_at: string;      // ISO timestamp stamped onto every emitted row
  include_partial?: boolean; // also ship partially_accepted (default false)
  dry_run?: boolean;        // compute everything, write nothing
}

// Summary returned by exportSft (also what the CLI prints).
export interface ExportSftResult {
  scored_files_read: number;
  records_read: number;
  records_exported: number;
  records_quarantined: number;
  output_path: string;       // relative to root
  quarantine_summary: string;
}

const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";

// Hard non-negotiable: this set never expands. If you find yourself
// adding "needs_human_review" or "rejected" here, stop — that's the
// contamination the spec forbids.
const SFT_NEVER: ScoreCategory[] = ["rejected", "needs_human_review"]; function listScoredRunFiles(root: string): string[] { const out: string[] = []; const dir = resolve(root, "data/scored-runs"); if (!existsSync(dir)) return out; for (const yyyy of readdirSync(dir).sort()) { const yp = resolve(dir, yyyy); if (!statSync(yp).isDirectory()) continue; for (const mm of readdirSync(yp).sort()) { const mp = resolve(yp, mm); if (!statSync(mp).isDirectory()) continue; for (const dd of readdirSync(mp).sort()) { const dp = resolve(mp, dd); if (!statSync(dp).isDirectory()) continue; for (const f of readdirSync(dp)) { if (f.endsWith(".jsonl")) out.push(resolve(dp, f)); } } } } return out; } function loadEvidenceByRunId( scored_path: string, cache: Map>, ): Map { const evidence_path = scored_path.replace("/scored-runs/", "/evidence/"); if (cache.has(evidence_path)) return cache.get(evidence_path)!; const m = new Map(); if (!existsSync(evidence_path)) { cache.set(evidence_path, m); return m; } for (const line of readFileSync(evidence_path, "utf8").split("\n")) { if (!line) continue; try { const ev = JSON.parse(line) as EvidenceRecord; m.set(ev.run_id, ev); } catch {} } cache.set(evidence_path, m); return m; } // Synthesize SFT shape from the executed run. For sources where text // isn't a model RESPONSE (pure-extraction), this returns null and the // caller quarantines. function synthesizeSft( scored: ScoredRun, ev: EvidenceRecord, recorded_at: string, sft_id: string, ): SftSample | null { const text = ev.text ?? ""; // Skip extraction-class records — they don't have an instruction→response shape. const role = ev.model_role; if (role !== "executor" && role !== "reviewer" && role !== "applier") return null; if (text.trim().length === 0) return null; // Instruction synthesis depends on the source class. 
const stem = ev.provenance.source_file.replace(/^data\/_kb\//, "").replace(/\.jsonl$/, ""); let instruction = ""; switch (stem) { case "scrum_reviews": instruction = `Review the file '${ev.source_files?.[0] ?? ""}' against the PRD + change-proposal context. Produce a forensic audit with findings, severity, confidence, patches.`; break; case "mode_experiments": instruction = `Run task_class='${ev.task_id}' for file '${ev.source_files?.[0] ?? ""}'. Produce the mode-runner's expected output shape.`; break; case "auto_apply": instruction = `Auto-apply: emit a 6-line surgical patch for '${ev.source_files?.[0] ?? ""}' based on the latest scrum review findings.`; break; case "audits": instruction = `Audit phase '${ev.task_id.replace(/^phase:/, "")}' and report findings with severity.`; break; case "observer_reviews": instruction = `Observer-review the latest attempt on '${ev.source_files?.[0] ?? ""}'. Verdict: accept | reject | cycle.`; break; case "contract_analyses": { // Read contractor from the typed metadata bucket (populated in // transforms.ts for contract_analyses rows). Pre-2026-04-27 this // used `(ev as any).contractor` and silently emitted "" // for every row because EvidenceRecord didn't carry the field. const contractor = typeof ev.metadata?.contractor === "string" ? ev.metadata.contractor : null; const permit = ev.task_id.replace(/^permit:/, ""); instruction = contractor ? `Analyze contractor '${contractor}' for permit '${permit}'. Recommend with risk markers.` : `Analyze permit '${permit}'. Recommend with risk markers.`; break; } case "outcomes": instruction = `Run scenario; report per-event outcome with citations.`; break; default: instruction = `Source '${stem}' run; produce the appropriate output for this task type.`; } // Context — what the model could see. Keep terse. 
const ctxParts: string[] = []; if (ev.retrieved_context?.matrix_corpora?.length) { ctxParts.push(`matrix=${ev.retrieved_context.matrix_corpora.join(",")}`); } if (typeof ev.retrieved_context?.pathway_fingerprints_seen === "number") { ctxParts.push(`pathway_fingerprints=${ev.retrieved_context.pathway_fingerprints_seen}`); } if (ev.model_name) ctxParts.push(`model=${ev.model_name}`); const context = ctxParts.join(" · "); return { schema_version: SFT_SAMPLE_SCHEMA_VERSION, id: sft_id, instruction, context, response: text, source_run_id: scored.evidence_run_id, quality_score: scored.category as SftQualityScore, created_at: recorded_at, provenance: { source_file: scored.provenance.source_file, line_offset: scored.provenance.line_offset, sig_hash: scored.provenance.sig_hash, recorded_at, }, }; } export async function exportSft(opts: ExportSftOptions): Promise { const { root, recorded_at, include_partial = false, dry_run = false } = opts; const allowed: ScoreCategory[] = include_partial ? ["accepted", "partially_accepted"] : ["accepted"]; const out_path = resolve(root, "exports/sft/instruction_response.jsonl"); const q = new QuarantineWriter(root, "sft", dry_run); let records_read = 0; let records_exported = 0; const seenIds = new Set(); const rowsToWrite: string[] = []; if (!dry_run && existsSync(out_path)) { for (const line of readFileSync(out_path, "utf8").split("\n")) { if (!line) continue; try { const r = JSON.parse(line); if (r.id) seenIds.add(r.id); } catch {} } } const evidenceCache = new Map>(); const scored_files = listScoredRunFiles(root); for (const sp of scored_files) { const evMap = loadEvidenceByRunId(sp, evidenceCache); const lines = readFileSync(sp, "utf8").split("\n").filter(Boolean); for (let i = 0; i < lines.length; i++) { records_read++; let scored: ScoredRun; try { scored = JSON.parse(lines[i]) as ScoredRun; } catch (e) { q.add({ reason: "schema_violation", source_record: { _raw: lines[i].slice(0, 200) }, errors: ["scored-run not JSON: " + (e as 
Error).message.slice(0, 160)], recorded_at, }); continue; } // CONTAMINATION FIREWALL: any forbidden category goes straight // to quarantine, never reaches the synthesizer. if (SFT_NEVER.includes(scored.category)) { q.add({ reason: "unsafe_sft_category", source_record: scored as unknown as Record, errors: [`category=${scored.category} forbidden in SFT (spec non-negotiable)`], recorded_at, source_provenance: scored.provenance, }); continue; } if (!allowed.includes(scored.category)) { q.add({ reason: "category_disallowed", source_record: scored as unknown as Record, errors: [`category=${scored.category} not in [${allowed.join(",")}] (--include-partial=${include_partial})`], recorded_at, source_provenance: scored.provenance, }); continue; } if (!scored.provenance?.sig_hash) { q.add({ reason: "missing_provenance", source_record: scored as any, errors: ["provenance missing"], recorded_at }); continue; } if (!scored.evidence_run_id) { q.add({ reason: "missing_source_run_id", source_record: scored as any, errors: ["evidence_run_id missing"], recorded_at, source_provenance: scored.provenance }); continue; } const ev = evMap.get(scored.evidence_run_id); if (!ev) { q.add({ reason: "missing_source_run_id", source_record: scored as any, errors: [`evidence_run_id=${scored.evidence_run_id} not found`], recorded_at, source_provenance: scored.provenance }); continue; } // ID = sha256(evidence_run_id + sig_hash):16 const id_seed = `${scored.evidence_run_id}|${scored.provenance.sig_hash}`; const hasher = new Bun.CryptoHasher("sha256"); hasher.update(id_seed); const sft_id = "sft-" + hasher.digest("hex").slice(0, 16); if (seenIds.has(sft_id)) continue; const sample = synthesizeSft(scored, ev, recorded_at, sft_id); if (!sample) { q.add({ reason: "missing_source_run_id", source_record: { run_id: scored.evidence_run_id, model_role: ev.model_role, has_text: typeof ev.text === "string" && ev.text.length > 0 }, errors: ["evidence has no instruction→response shape (extraction-class or empty 
text)"], recorded_at, source_provenance: scored.provenance, }); continue; } const v = validateSftSample(sample); if (!v.valid) { q.add({ reason: "schema_violation", source_record: sample as unknown as Record, errors: v.errors, recorded_at, source_provenance: scored.provenance, }); continue; } seenIds.add(sft_id); rowsToWrite.push(JSON.stringify(v.value)); records_exported++; } } if (!dry_run && rowsToWrite.length > 0) { mkdirSync(dirname(out_path), { recursive: true }); appendFileSync(out_path, rowsToWrite.join("\n") + "\n"); } return { scored_files_read: scored_files.length, records_read, records_exported, records_quarantined: q.total, output_path: out_path.replace(root + "/", ""), quarantine_summary: q.summary(), }; } async function cli() { const dry_run = process.argv.includes("--dry-run"); const include_partial = process.argv.includes("--include-partial"); const recorded_at = new Date().toISOString(); const r = await exportSft({ root: DEFAULT_ROOT, recorded_at, include_partial, dry_run }); console.log(`[export_sft] read=${r.records_read} exported=${r.records_exported} ${r.quarantine_summary}${dry_run ? " (DRY RUN)" : ""}`); console.log(`[export_sft] output: ${r.output_path}`); if (include_partial) console.log("[export_sft] partially_accepted INCLUDED (--include-partial)"); } if (import.meta.main) cli().catch(e => { console.error(e); process.exit(1); });