// export_rag.ts — Phase 4a RAG dataset export.
//
// Reads ScoredRun rows from data/scored-runs/YYYY/MM/DD/*.jsonl,
// pairs them with the originating EvidenceRecord (by reading the
// matching evidence file), filters to allowed RAG categories,
// validates against the RagSample schema, and writes exports/rag/playbooks.jsonl.
// Records that fail any check go to exports/quarantine/rag.jsonl with
// a structured reason.
//
// Default categories: accepted, partially_accepted.
// Optional --include-review opt-in lets needs_human_review through —
// useful for retrieval of warning patterns; SFT never gets this.
// Rejected NEVER enters RAG (schema enforces).
//
// IDs are deterministic:
//   "rag-" + sha256(evidence_run_id + "|" + provenance.sig_hash).slice(0, 16)
// so re-running on the same scored-runs produces identical rows.

import {
  existsSync,
  readFileSync,
  readdirSync,
  mkdirSync,
  appendFileSync,
  statSync,
} from "node:fs";
import { resolve, dirname } from "node:path";
import {
  RAG_SAMPLE_SCHEMA_VERSION,
  validateRagSample,
  type RagSample,
  type RagSourceCategory,
} from "../../auditor/schemas/distillation/rag_sample";
import type { ScoredRun } from "../../auditor/schemas/distillation/scored_run";
import type { EvidenceRecord } from "../../auditor/schemas/distillation/evidence_record";
import { QuarantineWriter } from "./quarantine";

export interface ExportRagOptions {
  root: string;
  recorded_at: string;
  include_review?: boolean; // include needs_human_review records
  dry_run?: boolean;
}

export interface ExportRagResult {
  scored_files_read: number;
  records_read: number;
  records_exported: number;
  records_quarantined: number;
  output_path: string;
  quarantine_summary: string;
}

const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";
const ALLOWED_DEFAULT: RagSourceCategory[] = ["accepted", "partially_accepted"];

function listScoredRunFiles(root: string): string[] {
  const out: string[] = [];
  const dir = resolve(root, "data/scored-runs");
  if (!existsSync(dir)) return out;
  for (const yyyy of readdirSync(dir).sort()) {
    const yp = resolve(dir, yyyy);
    if (!statSync(yp).isDirectory()) continue;
    for (const mm of readdirSync(yp).sort()) {
      const mp = resolve(yp, mm);
      if (!statSync(mp).isDirectory()) continue;
      for (const dd of readdirSync(mp).sort()) {
        const dp = resolve(mp, dd);
        if (!statSync(dp).isDirectory()) continue;
        for (const f of readdirSync(dp)) {
          if (f.endsWith(".jsonl")) out.push(resolve(dp, f));
        }
      }
    }
  }
  return out;
}

// Load the evidence file matching a scored-runs file. Cache is scoped per
// export call (passed in by the caller) — a module-level cache across calls
// would leak stale state in tests that wipe and recreate the temp root.
function loadEvidenceByRunId(
  scored_path: string,
  cache: Map<string, Map<string, EvidenceRecord>>,
): Map<string, EvidenceRecord> {
  // scored-runs path mirrors evidence path:
  //   data/scored-runs/YYYY/MM/DD/<name>.jsonl
  //   data/evidence/YYYY/MM/DD/<name>.jsonl
  const evidence_path = scored_path.replace("/scored-runs/", "/evidence/");
  if (cache.has(evidence_path)) return cache.get(evidence_path)!;
  const m = new Map<string, EvidenceRecord>();
  if (!existsSync(evidence_path)) {
    cache.set(evidence_path, m);
    return m;
  }
  for (const line of readFileSync(evidence_path, "utf8").split("\n")) {
    if (!line) continue;
    try {
      const ev = JSON.parse(line) as EvidenceRecord;
      m.set(ev.run_id, ev);
    } catch {
      /* skip bad lines */
    }
  }
  cache.set(evidence_path, m);
  return m;
}
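// Illustrative example of the partition mirroring above (file and run
// names are hypothetical; any <name>.jsonl under a YYYY/MM/DD partition
// pairs the same way):
//
//   data/scored-runs/2025/01/07/batch-0042.jsonl   ScoredRun rows
//   data/evidence/2025/01/07/batch-0042.jsonl      EvidenceRecord rows
//
// Each ScoredRun.evidence_run_id is looked up against EvidenceRecord.run_id
// in the mirrored file; a miss is quarantined by exportRag below.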
// Synthesize the fields needed for RAG from {ScoredRun, EvidenceRecord}.
// Pure transform; no I/O.
function synthesizeRagSample(
  scored: ScoredRun,
  ev: EvidenceRecord,
  recorded_at: string,
  rag_id: string,
): RagSample {
  const text = ev.text ?? "";
  const taskParts = ev.task_id.split(":");
  const tags: string[] = [
    `task:${taskParts[0] ?? ev.task_id}`,
    `category:${scored.category}`,
  ];
  if (ev.model_role) tags.push(`role:${ev.model_role}`);
  if (ev.model_name) tags.push(`model:${ev.model_name}`);
  if (Array.isArray(ev.source_files) && ev.source_files.length > 0) {
    tags.push(`file:${ev.source_files[0]}`);
  }

  // Title: first non-empty line of the text (up to 120 chars), fallback to task_id.
  const firstLine = text.split("\n").find(l => l.trim().length > 0) ?? "";
  const title = (firstLine || ev.task_id).slice(0, 120);

  // Embedding text: same as content for now; future tuning may shorten it
  // (e.g. only the title + key claims). Capped at 2000 chars to keep
  // embeddings cheap.
  const embedding_text = text.slice(0, 2000);

  // Map ScoreCategory → RagSourceCategory. rejected was filtered above,
  // but defensively narrow here.
  const cat: RagSourceCategory =
    scored.category === "rejected"
      ? "needs_human_review" // shouldn't happen — caller filters
      : (scored.category as RagSourceCategory);

  return {
    schema_version: RAG_SAMPLE_SCHEMA_VERSION,
    id: rag_id,
    title,
    content: text,
    tags,
    source_run_id: scored.evidence_run_id,
    success_score: cat,
    source_category: cat,
    embedding_text,
    created_at: recorded_at,
    provenance: {
      source_file: scored.provenance.source_file,
      line_offset: scored.provenance.line_offset,
      sig_hash: scored.provenance.sig_hash,
      recorded_at,
    },
  };
}
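// Illustrative output row (a sketch; every value below is made up, and the
// field set mirrors synthesizeRagSample above):
//
//   {
//     "schema_version": "<RAG_SAMPLE_SCHEMA_VERSION>",
//     "id": "rag-1a2b3c4d5e6f7a8b",
//     "title": "First non-empty line of the evidence text…",
//     "content": "<full evidence text>",
//     "tags": ["task:deploy", "category:accepted", "role:planner"],
//     "source_run_id": "run-0042",
//     "success_score": "accepted",
//     "source_category": "accepted",
//     "embedding_text": "<content, capped at 2000 chars>",
//     "created_at": "2025-01-07T00:00:00.000Z",
//     "provenance": {
//       "source_file": "data/scored-runs/2025/01/07/batch-0042.jsonl",
//       "line_offset": 3,
//       "sig_hash": "<sha256>",
//       "recorded_at": "2025-01-07T00:00:00.000Z"
//     }
//   }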
export async function exportRag(opts: ExportRagOptions): Promise<ExportRagResult> {
  const { root, recorded_at, include_review = false, dry_run = false } = opts;
  const allowed: RagSourceCategory[] = include_review
    ? ["accepted", "partially_accepted", "needs_human_review"]
    : ALLOWED_DEFAULT;
  const out_path = resolve(root, "exports/rag/playbooks.jsonl");
  const q = new QuarantineWriter(root, "rag", dry_run);

  let records_read = 0;
  let records_exported = 0;
  const seenIds = new Set<string>();
  const rowsToWrite: string[] = [];

  // Re-read existing output to populate seenIds — keeps the exporter
  // idempotent. Reading is side-effect-free, so dry runs do it too and
  // report the same delta a real run would.
  if (existsSync(out_path)) {
    for (const line of readFileSync(out_path, "utf8").split("\n")) {
      if (!line) continue;
      try {
        const r = JSON.parse(line);
        if (r.id) seenIds.add(r.id);
      } catch {}
    }
  }

  const evidenceCache = new Map<string, Map<string, EvidenceRecord>>();
  const scored_files = listScoredRunFiles(root);

  for (const sp of scored_files) {
    const evMap = loadEvidenceByRunId(sp, evidenceCache);
    const lines = readFileSync(sp, "utf8").split("\n").filter(Boolean);
    for (let i = 0; i < lines.length; i++) {
      records_read++;
      let scored: ScoredRun;
      try {
        scored = JSON.parse(lines[i]) as ScoredRun;
      } catch (e) {
        q.add({
          reason: "schema_violation",
          source_record: { _raw: lines[i].slice(0, 200) },
          errors: ["scored-run not JSON: " + (e as Error).message.slice(0, 160)],
          recorded_at,
          source_provenance: { source_file: sp.replace(root + "/", ""), line_offset: i },
        });
        continue;
      }

      // Provenance check — every record must have it.
      if (!scored.provenance?.sig_hash || !scored.provenance.source_file) {
        q.add({
          reason: "missing_provenance",
          source_record: scored as unknown as Record<string, unknown>,
          errors: ["scored_run.provenance missing or incomplete"],
          recorded_at,
        });
        continue;
      }
      if (!scored.evidence_run_id) {
        q.add({
          reason: "missing_source_run_id",
          source_record: scored as unknown as Record<string, unknown>,
          errors: ["evidence_run_id missing"],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      // Category gate — RAG never takes rejected; needs_human_review is opt-in.
      if (!allowed.includes(scored.category as RagSourceCategory)) {
        q.add({
          reason: "category_disallowed",
          source_record: scored as unknown as Record<string, unknown>,
          errors: [`category=${scored.category} not in [${allowed.join(",")}]`],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      // Look up the evidence row.
      const ev = evMap.get(scored.evidence_run_id);
      if (!ev) {
        q.add({
          reason: "missing_source_run_id",
          source_record: scored as unknown as Record<string, unknown>,
          errors: [`evidence_run_id=${scored.evidence_run_id} not found in matching evidence partition`],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      // Empty content gate.
      if (typeof ev.text !== "string" || ev.text.trim().length === 0) {
        q.add({
          reason: "empty_content",
          source_record: scored as unknown as Record<string, unknown>,
          errors: ["evidence.text is empty/missing — RAG needs content"],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      // Deterministic ID: "rag-" + sha256(evidence_run_id | sig_hash).slice(0, 16).
      const id_seed = `${scored.evidence_run_id}|${scored.provenance.sig_hash}`;
      const hasher = new Bun.CryptoHasher("sha256");
      hasher.update(id_seed);
      const rag_id = "rag-" + hasher.digest("hex").slice(0, 16);

      if (seenIds.has(rag_id)) {
        // Idempotent — the same row already appears in existing output. Skip silently.
        continue;
      }

      const sample = synthesizeRagSample(scored, ev, recorded_at, rag_id);
      const v = validateRagSample(sample);
      if (!v.valid) {
        q.add({
          reason: "schema_violation",
          source_record: sample as unknown as Record<string, unknown>,
          errors: v.errors,
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      seenIds.add(rag_id);
      rowsToWrite.push(JSON.stringify(v.value));
      records_exported++;
    }
  }

  if (!dry_run && rowsToWrite.length > 0) {
    mkdirSync(dirname(out_path), { recursive: true });
    appendFileSync(out_path, rowsToWrite.join("\n") + "\n");
  }

  return {
    scored_files_read: scored_files.length,
    records_read,
    records_exported,
    records_quarantined: q.total,
    output_path: out_path.replace(root + "/", ""),
    quarantine_summary: q.summary(),
  };
}

async function cli() {
  const dry_run = process.argv.includes("--dry-run");
  const include_review = process.argv.includes("--include-review");
  const recorded_at = new Date().toISOString();
  const r = await exportRag({ root: DEFAULT_ROOT, recorded_at, include_review, dry_run });
  console.log(
    `[export_rag] read=${r.records_read} exported=${r.records_exported} ${r.quarantine_summary}${dry_run ? " (DRY RUN)" : ""}`,
  );
  console.log(`[export_rag] output: ${r.output_path}`);
  if (include_review) console.log("[export_rag] needs_human_review INCLUDED (--include-review)");
}

if (import.meta.main) cli().catch(e => {
  console.error(e);
  process.exit(1);
});
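// Usage (a sketch; paths and the env var value are illustrative, while the
// flags and LH_DISTILL_ROOT come from the argv parsing / DEFAULT_ROOT above):
//
//   bun run export_rag.ts                     # export under DEFAULT_ROOT
//   bun run export_rag.ts --dry-run           # count + quarantine, write nothing
//   bun run export_rag.ts --include-review    # also admit needs_human_review
//   LH_DISTILL_ROOT=/tmp/lakehouse bun run export_rag.ts --dry-run
//
// Programmatic use from a test (the temp root below is hypothetical):
//
//   const r = await exportRag({
//     root: "/tmp/lakehouse-test",
//     recorded_at: new Date().toISOString(),
//     dry_run: true,
//   });
//   console.log(r.quarantine_summary);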