// audit_full.ts — Phase 8 meta-audit across Phases 0-7.
//
// Pure observability. Calls existing scripts in dry-run mode + reads
// output files. NEVER modifies pipeline logic. Compares current run
// to a baseline saved at data/_kb/audit_baselines.jsonl (auto-grown
// — first run establishes baseline, subsequent runs compare).
//
// Output: reports/distillation/phase8-full-audit-report.md
// Exit code: 0 on PASS, 1 if any required check fails.

import {
  existsSync, readFileSync, readdirSync, statSync, mkdirSync, writeFileSync, appendFileSync,
} from "node:fs";
import { resolve, dirname } from "node:path";
import { spawnSync } from "node:child_process";
import { TRANSFORMS } from "./transforms";
import { materializeAll } from "./build_evidence_index";
import { scoreAll } from "./score_runs";
import { exportRag } from "./export_rag";
import { exportSft } from "./export_sft";
import { exportPreference } from "./export_preference";
import { replay } from "./replay";
import { validateStageReceipt } from "../../auditor/schemas/distillation/stage_receipt";
import { validateRunSummary, type RunSummary } from "../../auditor/schemas/distillation/run_summary";

/** Root of the checkout being audited; overridable through LH_DISTILL_ROOT. */
const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";
/** Absolute path of the append-only baseline log under `root`. */
const BASELINE_PATH_FOR = (root: string) => resolve(root, "data/_kb/audit_baselines.jsonl");
/** Absolute path of the rendered Markdown report under `root`. */
const REPORT_PATH_FOR = (root: string) => resolve(root, "reports/distillation/phase8-full-audit-report.md");

/** One audit assertion, as rendered into the report. */
interface PhaseCheck {
  phase: number;
  name: string;
  expected: string;
  actual: string;
  passed: boolean;
  required: boolean; // false = informational only, doesn't fail the audit
  notes: string[];
}

/** One line of audit_baselines.jsonl: the metrics snapshot of a single audit run. */
interface AuditBaseline {
  recorded_at: string;
  git_commit: string;
  metrics: {
    p2_evidence_rows: number;
    p2_evidence_skips: number;
    p3_accepted: number;
    p3_partial: number;
    p3_rejected: number;
    p3_human: number;
    p4_rag_rows: number;
    p4_sft_rows: number;
    p4_pref_pairs: number;
    p4_total_quarantined: number;
  };
}

/** Every check recorded during this process, in execution order. */
const checks: PhaseCheck[] = [];

/** Appends a check to `checks`, defaulting `notes` to an empty list. */
function record(c: Omit<PhaseCheck, "notes"> & { notes?: string[] }) {
  checks.push({ ...c, notes: c.notes ?? [] });
}

/**
 * Resolves HEAD of the git checkout at `root`.
 * @returns the trimmed `git rev-parse HEAD` output, or 40 zeros when git exits non-zero.
 */
function gitHead(root: string): string {
  const r = spawnSync("git", ["-C", root, "rev-parse", "HEAD"], { encoding: "utf8" });
  return r.status === 0 ? r.stdout.trim() : "0".repeat(40);
}

/** Non-empty lines of the file at `path`, or an empty list when the file does not exist. */
function readJsonlLines(path: string): string[] {
  return existsSync(path) ? readFileSync(path, "utf8").split("\n").filter(Boolean) : [];
}

/** Returns `obj[key]` when `obj` is a non-null object and the value is a string; otherwise null. */
function stringField(obj: unknown, key: string): string | null {
  if (typeof obj !== "object" || obj === null) return null;
  const value = (obj as Record<string, unknown>)[key];
  return typeof value === "string" ? value : null;
}

/** Escapes text for a Markdown table cell: pipes are backslash-escaped, newlines become spaces. */
function mdCell(text: string): string {
  return text.replace(/\|/g, "\\|").replace(/\r?\n/g, " ");
}

// ─── Phase 0 ─────────────────────────────────────────────────────

/** Records whether the recon document and the four tier-1 source streams exist on disk. */
function auditPhase0(root: string): void {
  const reconPath = resolve(root, "docs/recon/local-distillation-recon.md");
  const reconPresent = existsSync(reconPath);
  record({
    phase: 0,
    name: "recon doc exists",
    expected: "docs/recon/local-distillation-recon.md present",
    actual: reconPresent ? "present" : "MISSING",
    passed: reconPresent,
    required: true,
  });

  // Streams that the recon enumerated as TIER 1 sources — must still
  // be on disk for the rest of the pipeline to be coherent.
  const tier1 = [
    "data/_kb/distilled_facts.jsonl",
    "data/_kb/scrum_reviews.jsonl",
    "data/_kb/audit_facts.jsonl",
    "data/_kb/mode_experiments.jsonl",
  ];
  const missing = tier1.filter(p => !existsSync(resolve(root, p)));
  record({
    phase: 0,
    name: "tier-1 source streams present",
    expected: "all 4 tier-1 jsonls on disk",
    actual: missing.length === 0 ? "all present" : `missing: ${missing.join(", ")}`,
    passed: missing.length === 0,
    required: false,
    notes: missing.length > 0
      ? ["fresh-clone or post-rotation environment — Phase 2 will tally as rows_present=false; not a hard fail"]
      : [],
  });
}

// ─── Phase 1 ─────────────────────────────────────────────────────

/**
 * Runs `bun test auditor/schemas/distillation/ --bail` in `root` and records
 * the pass/fail counts scraped from its combined stdout + stderr.
 */
function auditPhase1(root: string): void {
  const t = spawnSync("bun", ["test", "auditor/schemas/distillation/", "--bail"], {
    cwd: root,
    encoding: "utf8",
  });
  const out = (t.stdout ?? "") + (t.stderr ?? "");
  const m = out.match(/(\d+) pass[^\n]*\n[^\n]*?(\d+) fail/);
  // When the summary lines cannot be found, report 0 pass / 1 fail so the check fails.
  const pass = m ? Number(m[1]) : 0;
  const fail = m ? Number(m[2]) : 1;
  record({
    phase: 1,
    name: "schema validators pass on fixtures",
    expected: "≥40 tests, 0 fail",
    actual: `${pass} pass, ${fail} fail`,
    passed: t.status === 0 && fail === 0,
    required: true,
    // spawnSync reports a launch failure through `error`; surface it rather than
    // leaving only the synthetic "0 pass, 1 fail".
    notes: t.error ? [`failed to spawn bun: ${t.error.message}`] : [],
  });
}

// ─── Phase 2 ─────────────────────────────────────────────────────

interface Phase2Result {
  rows: number;
  skips: number;
  by_source: Map<string, number>;
}

/**
 * Runs the evidence materializer with `dry_run: true` and records row totals
 * plus how many of the tier-1 sources wrote at least one row.
 */
async function auditPhase2(root: string): Promise<Phase2Result> {
  const recorded_at = new Date().toISOString();
  const r = await materializeAll({ root, transforms: TRANSFORMS, recorded_at, dry_run: true });
  const by_source = new Map<string, number>();
  for (const s of r.sources) by_source.set(s.source_file_relpath, s.rows_written);

  record({
    phase: 2,
    name: "materializer dry-run completes",
    expected: ">=1 row from each tier-1 source",
    actual: `${r.totals.rows_read} read · ${r.totals.rows_written} written · ${r.totals.rows_skipped} skipped`,
    passed: r.totals.rows_written >= 1,
    required: true,
  });

  const tier1Sources = ["distilled_facts", "scrum_reviews", "audit_facts", "mode_experiments"];
  const presentTier1 = r.sources.filter(s => s.rows_present);
  const tier1Hits = tier1Sources.filter(t =>
    presentTier1.some(s => s.source_file_relpath.includes(t) && s.rows_written > 0)
  );
  record({
    phase: 2,
    name: "tier-1 sources each materialize ≥1 row",
    expected: `4/4: ${tier1Sources.join(", ")}`,
    actual: `${tier1Hits.length}/4 hit (${tier1Hits.join(", ")})`,
    // Informational check: a single hit is enough to pass (see notes).
    passed: tier1Hits.length >= 1,
    required: false,
    notes: tier1Hits.length < 4
      ? ["fresh-environment OK; expect lower count when source streams are absent"]
      : [],
  });

  return { rows: r.totals.rows_written, skips: r.totals.rows_skipped, by_source };
}

// ─── Phase 3 ─────────────────────────────────────────────────────

interface Phase3Result {
  accepted: number;
  partial: number;
  rejected: number;
  human: number;
}

/**
 * Tallies the `category` field of every row in every `.jsonl` file under
 * data/scored-runs/ (recursively) and records the resulting distribution.
 */
async function auditPhase3(root: string): Promise<Phase3Result> {
  // Read existing scored-runs from disk rather than re-running the
  // scorer. Re-running in dry-run produces 0 NEW writes (everything
  // already deduped on disk) which is correct behavior but unhelpful
  // for an audit. The scorer's correctness is tested in unit tests;
  // here we verify the on-disk distribution looks right.
  const scoredDir = resolve(root, "data/scored-runs");
  if (!existsSync(scoredDir)) {
    record({
      phase: 3,
      name: "scored-runs on disk",
      expected: "data/scored-runs/ populated",
      actual: "missing",
      passed: false,
      required: true,
      notes: ["run `./scripts/distill score` (or run-all) before audit-full"],
    });
    return { accepted: 0, partial: 0, rejected: 0, human: 0 };
  }

  const counts = { accepted: 0, partially_accepted: 0, rejected: 0, needs_human_review: 0 };
  type Category = keyof typeof counts;
  // Own-property test so inherited keys such as "toString" are never counted.
  const isCategory = (v: unknown): v is Category =>
    typeof v === "string" && Object.prototype.hasOwnProperty.call(counts, v);

  function walk(p: string) {
    for (const e of readdirSync(p)) {
      const full = resolve(p, e);
      if (statSync(full).isDirectory()) {
        walk(full);
      } else if (e.endsWith(".jsonl")) {
        for (const line of readFileSync(full, "utf8").split("\n")) {
          if (!line) continue;
          try {
            const row = JSON.parse(line);
            if (isCategory(row?.category)) counts[row.category]++;
          } catch { /* skip */ }
        }
      }
    }
  }
  walk(scoredDir);

  const total = counts.accepted + counts.partially_accepted + counts.rejected + counts.needs_human_review;
  record({
    phase: 3,
    name: "on-disk scored-runs distribution non-empty",
    expected: ">=1 accepted",
    actual: `acc=${counts.accepted} part=${counts.partially_accepted} rej=${counts.rejected} hum=${counts.needs_human_review}`,
    passed: counts.accepted >= 1,
    required: true,
  });
  record({
    phase: 3,
    name: "scored-runs distribution sums positive",
    expected: ">0 total",
    actual: `${total} total`,
    passed: total > 0,
    required: false,
  });
  return {
    accepted: counts.accepted,
    partial: counts.partially_accepted,
    rejected: counts.rejected,
    human: counts.needs_human_review,
  };
}

// ─── Phase 4 ─────────────────────────────────────────────────────

interface Phase4Result {
  rag: number;
  sft: number;
  pref: number;
  quarantined: number;
}

/**
 * Reads the SFT / RAG / preference export files and records the firewall,
 * self-pair and provenance checks over their rows.
 * @returns non-empty line counts per export file plus the total across the quarantine files.
 */
function auditPhase4(root: string): Phase4Result {
  const sftRows = readJsonlLines(resolve(root, "exports/sft/instruction_response.jsonl"));
  const ragRows = readJsonlLines(resolve(root, "exports/rag/playbooks.jsonl"));
  const prefRows = readJsonlLines(resolve(root, "exports/preference/chosen_rejected.jsonl"));

  // SFT contamination firewall: 0 forbidden quality_scores
  let sftForbidden = 0;
  for (const line of sftRows) {
    try {
      const r = JSON.parse(line);
      if (r.quality_score !== "accepted" && r.quality_score !== "partially_accepted") sftForbidden++;
    } catch { /* skip malformed — unparsable rows are counted by the provenance check below */ }
  }
  record({
    phase: 4,
    name: "SFT contamination firewall: 0 forbidden quality_scores",
    expected: "0",
    actual: `${sftForbidden}`,
    passed: sftForbidden === 0,
    required: true,
    notes: ["this is the spec non-negotiable — rejected/needs_human_review must NEVER appear in SFT"],
  });

  // RAG: 0 rejected
  let ragRejected = 0;
  for (const line of ragRows) {
    try {
      if (JSON.parse(line).success_score === "rejected") ragRejected++;
    } catch { /* skip malformed — counted by the provenance check below */ }
  }
  record({
    phase: 4,
    name: "RAG firewall: 0 rejected leaks",
    expected: "0",
    actual: `${ragRejected}`,
    passed: ragRejected === 0,
    required: true,
  });

  // Preference: 0 self-pairs
  let prefSelfPairs = 0;
  let prefIdenticalText = 0;
  for (const line of prefRows) {
    try {
      const r = JSON.parse(line);
      if (r.chosen_run_id === r.rejected_run_id) prefSelfPairs++;
      if (r.chosen === r.rejected) prefIdenticalText++;
    } catch { /* skip malformed — counted by the provenance check below */ }
  }
  record({
    phase: 4,
    name: "Preference: 0 self-pairs (chosen_run_id != rejected_run_id)",
    expected: "0",
    actual: `${prefSelfPairs}`,
    passed: prefSelfPairs === 0,
    required: true,
  });
  record({
    phase: 4,
    name: "Preference: 0 identical-text pairs",
    expected: "0",
    actual: `${prefIdenticalText}`,
    passed: prefIdenticalText === 0,
    required: true,
  });

  // Provenance on every export row
  let noProv = 0;
  for (const line of [...sftRows, ...ragRows, ...prefRows]) {
    try {
      const r = JSON.parse(line);
      if (!r.provenance?.sig_hash || !/^[0-9a-f]{64}$/.test(r.provenance.sig_hash)) noProv++;
    } catch {
      // A row that cannot be parsed (or parses to null) carries no provenance,
      // so it must not slip through a check that claims to cover every row.
      noProv++;
    }
  }
  record({
    phase: 4,
    name: "every export row carries valid sha256 provenance.sig_hash",
    expected: "0 missing",
    actual: `${noProv} missing`,
    passed: noProv === 0,
    required: true,
  });

  // Quarantine totals (informational)
  const quarantineFiles = [
    "exports/quarantine/sft.jsonl",
    "exports/quarantine/rag.jsonl",
    "exports/quarantine/preference.jsonl",
  ];
  let totalQuar = 0;
  for (const qp of quarantineFiles) totalQuar += readJsonlLines(resolve(root, qp)).length;

  return { rag: ragRows.length, sft: sftRows.length, pref: prefRows.length, quarantined: totalQuar };
}

// ─── Phase 5 ─────────────────────────────────────────────────────

/**
 * Locates the run directory under reports/distillation/ whose summary.json has
 * the newest mtime, then records receipt-presence, receipt-schema and
 * run-summary checks for it.
 */
function auditPhase5(root: string): void {
  const reportsDir = resolve(root, "reports/distillation");
  if (!existsSync(reportsDir)) {
    record({
      phase: 5,
      name: "receipts directory exists",
      expected: "reports/distillation/",
      actual: "MISSING",
      passed: false,
      required: true,
    });
    return;
  }

  // Find most recent run_id directory (one with summary.json)
  const candidates: Array<{ id: string; mtime: number }> = [];
  for (const entry of readdirSync(reportsDir)) {
    const dir = resolve(reportsDir, entry);
    if (!statSync(dir).isDirectory()) continue;
    const sumPath = resolve(dir, "summary.json");
    if (existsSync(sumPath)) candidates.push({ id: entry, mtime: statSync(sumPath).mtimeMs });
  }
  candidates.sort((a, b) => b.mtime - a.mtime);
  if (candidates.length === 0) {
    record({
      phase: 5,
      name: "≥1 run with summary.json",
      expected: "≥1",
      actual: "0",
      passed: false,
      required: false,
      notes: ["no Phase 5 run-all has executed yet — run `./scripts/distill run-all` first"],
    });
    return;
  }
  const latest = candidates[0];
  const runDir = resolve(reportsDir, latest.id);

  // All 5 stage receipts present
  const expected = ["collect", "score", "export-rag", "export-sft", "export-preference"];
  const missing = expected.filter(s => !existsSync(resolve(runDir, `${s}.json`)));
  record({
    phase: 5,
    name: `latest run (${latest.id}) has all 5 stage receipts`,
    expected: expected.join(","),
    actual: missing.length === 0 ? "all present" : `missing: ${missing.join(",")}`,
    passed: missing.length === 0,
    required: true,
  });

  // Each receipt validates against schema
  let invalid = 0;
  for (const stage of expected) {
    const path = resolve(runDir, `${stage}.json`);
    if (!existsSync(path)) continue; // absence is already reported by the check above
    try {
      const v = validateStageReceipt(JSON.parse(readFileSync(path, "utf8")));
      if (!v.valid) invalid++;
    } catch {
      invalid++;
    }
  }
  record({
    phase: 5,
    name: "every stage receipt validates against schema",
    expected: "0 invalid",
    actual: `${invalid} invalid`,
    passed: invalid === 0,
    required: true,
  });

  // RunSummary validates
  let summary: RunSummary;
  try {
    summary = JSON.parse(readFileSync(resolve(runDir, "summary.json"), "utf8")) as RunSummary;
  } catch {
    // A corrupt summary is an audit finding, not a reason to abort without a report.
    record({
      phase: 5,
      name: "RunSummary validates",
      expected: "valid",
      actual: "invalid (summary.json unreadable or not valid JSON)",
      passed: false,
      required: true,
    });
    return;
  }
  const sv = validateRunSummary(summary);
  record({
    phase: 5,
    name: "RunSummary validates",
    expected: "valid",
    actual: sv.valid ? "valid" : `invalid (${sv.errors.join("; ").slice(0, 160)})`,
    passed: sv.valid,
    required: true,
  });

  // The summary may have just failed validation, so read its fields
  // defensively instead of assuming they are strings.
  const gitCommit = stringField(summary, "git_commit");
  const runHash = stringField(summary, "run_hash");

  // git_sha sanity (40-char hex, but won't necessarily match HEAD if
  // commits landed since the run)
  record({
    phase: 5,
    name: "summary.git_commit is 40-char hex",
    expected: "/^[0-9a-f]{40}$/",
    actual: (gitCommit === null ? "(missing)" : gitCommit.slice(0, 12) + "...")
      + " (HEAD: " + gitHead(root).slice(0, 12) + "...)",
    passed: gitCommit !== null && /^[0-9a-f]{40}$/.test(gitCommit),
    required: false,
  });

  // run_hash present + sha256
  record({
    phase: 5,
    name: "run_hash is sha256",
    expected: "/^[0-9a-f]{64}$/",
    actual: runHash === null ? "(missing)" : runHash.slice(0, 16) + "...",
    passed: runHash !== null && /^[0-9a-f]{64}$/.test(runHash),
    required: true,
  });
}

// ─── Phase 6 ─────────────────────────────────────────────────────

/**
 * Runs scripts/distillation/acceptance.ts in a subprocess (with
 * LH_DISTILL_ROOT set to `root`) and records whether it exited 0 and printed
 * a "PASS — N/N" line with matching numerator and denominator.
 */
function auditPhase6(root: string): void {
  // Subprocess to keep our process clean
  const r = spawnSync("bun", ["run", "scripts/distillation/acceptance.ts"], {
    cwd: root,
    encoding: "utf8",
    env: { ...process.env, LH_DISTILL_ROOT: root },
  });
  const out = (r.stdout ?? "") + (r.stderr ?? "");
  const passLine = out.match(/PASS\s*—\s*(\d+)\/(\d+)/);
  const passed = r.status === 0 && passLine !== null && passLine[1] === passLine[2];
  record({
    phase: 6,
    name: "acceptance gate passes 22/22 invariants on fixture",
    expected: "PASS — 22/22",
    actual: passLine ? `${passLine[1]}/${passLine[2]} (exit=${r.status})` : `exit=${r.status}`,
    passed,
    required: true,
    notes: passed ? [] : [`stderr/stdout tail: ${out.slice(-400)}`],
  });
}

// ─── Phase 7 ─────────────────────────────────────────────────────

/**
 * Replays three fixed sample tasks with `dry_run: true` and records validation,
 * retrieval and escalation-path-length checks, plus whether the replay log exists.
 */
async function auditPhase7(root: string): Promise<void> {
  // Run dry-run replay on a handful of fixture-shaped tasks. These
  // exercise retrieval + bundle + validation deterministically without
  // depending on a running gateway. dry_run=true synthesizes a
  // structured response.
  const tasks = [
    "Audit phase 38 provider routing for placeholder code",
    "Verify pr_audit mode is wired into the gateway",
    "Audit phase 40 PRD circuit breaker drift",
  ];
  let passing = 0;
  let withRetrievalContext = 0;
  let escalationLoops = 0;
  for (const task of tasks) {
    const r = await replay({
      task,
      local_only: true,
      dry_run: true,
      no_retrieval: false,
    }, root);
    if (r.validation_result.passed) passing++;
    if (r.context_bundle && r.context_bundle.retrieved_playbooks.length > 0) withRetrievalContext++;
    if (r.escalation_path.length > 2) escalationLoops++;
  }
  record({
    phase: 7,
    name: "replay validation passes on 3/3 dry-run sample tasks",
    expected: "3/3",
    actual: `${passing}/${tasks.length}`,
    passed: passing === tasks.length,
    required: true,
  });
  record({
    phase: 7,
    name: "replay retrieval surfaces ≥1 playbook on each task (when corpus present)",
    expected: "≥1 task with retrieval",
    actual: `${withRetrievalContext}/${tasks.length}`,
    // Passes vacuously when there is no RAG corpus to retrieve from.
    passed: withRetrievalContext >= 1 || !existsSync(resolve(root, "exports/rag/playbooks.jsonl")),
    required: false,
    notes: withRetrievalContext === 0
      ? ["empty rag corpus on this root — expected on fresh environments"]
      : [],
  });
  record({
    phase: 7,
    name: "escalation loop guard: no path > 2 models",
    expected: "0 loops",
    actual: `${escalationLoops}`,
    passed: escalationLoops === 0,
    required: true,
  });

  // Also check the persisted log shape
  const logPath = resolve(root, "data/_kb/replay_runs.jsonl");
  const logExists = existsSync(logPath);
  record({
    phase: 7,
    name: "replay_runs.jsonl populated by audit run",
    expected: "exists with ≥3 rows added",
    actual: logExists ? `${readJsonlLines(logPath).length} rows total` : "missing",
    passed: logExists,
    required: false,
  });
}

// ─── Drift comparison ───────────────────────────────────────────

/** One row of the drift table: a metric compared against the prior baseline. */
interface DriftRow {
  metric: string;
  baseline: number | null;
  current: number;
  pct_change: number | null;
  flag: "ok" | "warn" | "alert" | "first_run";
}

/**
 * Loads the most recent usable entry of the baseline log.
 * Scans from the end so one corrupt or wrongly shaped trailing line does not
 * discard the history before it.
 * @returns the entry, or null when the file is absent, empty, or has no usable line.
 */
function loadBaseline(root: string): AuditBaseline | null {
  const lines = readJsonlLines(BASELINE_PATH_FOR(root));
  for (const line of lines.reverse()) {
    try {
      const parsed = JSON.parse(line);
      const usable =
        typeof parsed === "object" && parsed !== null &&
        typeof parsed.recorded_at === "string" &&
        typeof parsed.git_commit === "string" &&
        typeof parsed.metrics === "object" && parsed.metrics !== null;
      if (usable) return parsed as AuditBaseline;
    } catch { /* fall through to the previous entry */ }
  }
  return null;
}

/** Appends `b` as one JSON line to the baseline log, creating its directory if needed. */
function appendBaseline(root: string, b: AuditBaseline) {
  const p = BASELINE_PATH_FOR(root);
  mkdirSync(dirname(p), { recursive: true });
  appendFileSync(p, JSON.stringify(b) + "\n");
}

/** Relative change from `prior` to `current`; null when `prior` is 0 (ratio undefined). */
function pctChange(prior: number, current: number): number | null {
  if (prior === 0) return null;
  return (current - prior) / prior;
}

/**
 * Builds the drift row for one metric.
 * Flags "first_run" without a prior value, "warn" when the relative change
 * exceeds 20% in either direction or the metric moved off a zero baseline,
 * and "ok" otherwise.
 */
function diff(metric: string, prior: number | null, current: number): DriftRow {
  if (prior === null) return { metric, baseline: null, current, pct_change: null, flag: "first_run" };
  const pct = pctChange(prior, current);
  let flag: DriftRow["flag"] = "ok";
  if (pct === null) {
    // Zero baseline: no percentage exists, but 0 → N is still a change that
    // must not be reported as stable.
    if (current !== 0) flag = "warn";
  } else if (Math.abs(pct) > 0.20) {
    flag = "warn";
  }
  return { metric, baseline: prior, current, pct_change: pct, flag };
}

/** Drift rows for every metric, in the field order of `AuditBaseline["metrics"]`. */
function buildDriftTable(prior: AuditBaseline | null, current: AuditBaseline["metrics"]): DriftRow[] {
  const p = prior?.metrics;
  return [
    diff("p2_evidence_rows", p?.p2_evidence_rows ?? null, current.p2_evidence_rows),
    diff("p2_evidence_skips", p?.p2_evidence_skips ?? null, current.p2_evidence_skips),
    diff("p3_accepted", p?.p3_accepted ?? null, current.p3_accepted),
    diff("p3_partial", p?.p3_partial ?? null, current.p3_partial),
    diff("p3_rejected", p?.p3_rejected ?? null, current.p3_rejected),
    diff("p3_human", p?.p3_human ?? null, current.p3_human),
    diff("p4_rag_rows", p?.p4_rag_rows ?? null, current.p4_rag_rows),
    diff("p4_sft_rows", p?.p4_sft_rows ?? null, current.p4_sft_rows),
    diff("p4_pref_pairs", p?.p4_pref_pairs ?? null, current.p4_pref_pairs),
    diff("p4_total_quarantined", p?.p4_total_quarantined ?? null, current.p4_total_quarantined),
  ];
}

// ─── Main ────────────────────────────────────────────────────────

/**
 * Runs every phase audit in order, compares metrics with the previous
 * baseline, appends a new baseline entry, writes the Markdown report, and
 * exits 0 when all required checks passed or 1 otherwise.
 */
async function main() {
  const root = DEFAULT_ROOT;
  console.log("[audit-full] starting...");

  auditPhase0(root);
  auditPhase1(root);
  const p2 = await auditPhase2(root);
  const p3 = await auditPhase3(root);
  const p4 = auditPhase4(root);
  auditPhase5(root);
  auditPhase6(root);
  await auditPhase7(root);

  // Build current metrics + drift
  const current: AuditBaseline["metrics"] = {
    p2_evidence_rows: p2.rows,
    p2_evidence_skips: p2.skips,
    p3_accepted: p3.accepted,
    p3_partial: p3.partial,
    p3_rejected: p3.rejected,
    p3_human: p3.human,
    p4_rag_rows: p4.rag,
    p4_sft_rows: p4.sft,
    p4_pref_pairs: p4.pref,
    p4_total_quarantined: p4.quarantined,
  };
  // Load the prior entry BEFORE appending, otherwise we would compare against ourselves.
  const baseline = loadBaseline(root);
  const drift = buildDriftTable(baseline, current);

  // Persist new baseline (so the next run has prior to compare against)
  const newBaseline: AuditBaseline = {
    recorded_at: new Date().toISOString(),
    git_commit: gitHead(root),
    metrics: current,
  };
  appendBaseline(root, newBaseline);

  // Aggregate
  const required = checks.filter(c => c.required);
  const requiredFailed = required.filter(c => !c.passed);
  const auditPassed = requiredFailed.length === 0;

  // Render report
  const md: string[] = [];
  md.push("# Phase 8 — Full System Audit Report");
  md.push("");
  // Same timestamp as the baseline entry so report and baseline line can be correlated.
  md.push(`**Run:** ${newBaseline.recorded_at}`);
  md.push(`**Git commit:** ${newBaseline.git_commit}`);
  md.push(`**Baseline:** ${baseline ? `${baseline.recorded_at} (${baseline.git_commit.slice(0, 12)})` : "no prior baseline (first audit-full run)"}`);
  md.push("");
  md.push(`## Result: ${auditPassed ? "**PASS** ✓" : `**FAIL ✗** — ${requiredFailed.length}/${required.length} required checks failed`}`);
  md.push("");
  md.push(`## Per-phase summary`);
  md.push("");
  md.push("| Phase | Checks | Required | Required-Pass | Notes |");
  md.push("|---|---|---|---|---|");
  for (let p = 0; p <= 7; p++) {
    const phaseChecks = checks.filter(c => c.phase === p);
    const reqOnly = phaseChecks.filter(c => c.required);
    const passed = reqOnly.filter(c => c.passed);
    const status = reqOnly.length === 0
      ? "(no required checks)"
      : passed.length === reqOnly.length
        ? "✓ pass"
        : `✗ ${reqOnly.length - passed.length} fail`;
    md.push(`| ${p} | ${phaseChecks.length} | ${reqOnly.length} | ${passed.length}/${reqOnly.length} | ${status} |`);
  }
  md.push("");
  md.push("## Detailed checks");
  md.push("");
  md.push("| # | Phase | Check | Required | Expected | Actual | Status |");
  md.push("|---|---|---|---|---|---|---|");
  checks.forEach((c, i) => {
    md.push(`| ${i + 1} | P${c.phase} | ${mdCell(c.name)} | ${c.required ? "Y" : "—"} | ${mdCell(c.expected)} | ${mdCell(c.actual)} | ${c.passed ? "✓" : "✗"} |`);
  });
  md.push("");
  md.push("## Drift vs prior baseline");
  md.push("");
  if (!baseline) {
    md.push("First audit-full run on this root — baseline established. Subsequent runs will compare against this snapshot.");
  } else {
    md.push("| Metric | Baseline | Current | Δ% | Flag |");
    md.push("|---|---|---|---|---|");
    for (const d of drift) {
      const pct = d.pct_change === null ? "—" : `${(d.pct_change * 100).toFixed(0)}%`;
      const baselineCell = d.baseline === null ? "—" : `${d.baseline}`;
      md.push(`| ${d.metric} | ${baselineCell} | ${d.current} | ${pct} | ${d.flag} |`);
    }
    const warnCount = drift.filter(d => d.flag === "warn").length;
    md.push("");
    if (warnCount > 0) md.push(`**${warnCount} metric(s) drifted >20% from baseline.** Investigate before treating outputs as stable.`);
    else md.push("All metrics within 20% of baseline — pipeline stable across runs.");
  }
  md.push("");
  md.push("## System health status");
  md.push("");
  md.push(auditPassed
    ? "All required Phase 0-7 invariants hold. The distillation system is correct, stable, and reproducible at this commit."
    : "**System is in an INVALID state.** Required checks failed; do not treat outputs as production-safe until the failures listed above are resolved.");
  md.push("");
  if (requiredFailed.length > 0) {
    md.push("### Failures");
    md.push("");
    for (const f of requiredFailed) {
      md.push(`- **P${f.phase} ${f.name}** — expected \`${f.expected}\`, got \`${f.actual}\``);
      for (const n of f.notes) md.push(`  - ${n}`);
    }
    md.push("");
  }

  const reportPath = REPORT_PATH_FOR(root);
  mkdirSync(dirname(reportPath), { recursive: true });
  writeFileSync(reportPath, md.join("\n"));

  console.log("");
  console.log(`[audit-full] ${auditPassed ? "PASS" : "FAIL"} — ${required.length - requiredFailed.length}/${required.length} required checks passed`);
  if (!auditPassed) {
    for (const f of requiredFailed) console.log(`  ✗ P${f.phase} ${f.name}: expected ${f.expected}, got ${f.actual}`);
  }
  console.log(`[audit-full] report: ${reportPath}`);
  console.log(`[audit-full] baseline updated: ${BASELINE_PATH_FOR(root)}`);

  process.exit(auditPassed ? 0 : 1);
}

// Run only when executed directly, not when imported.
if (import.meta.main) main().catch(e => { console.error(e); process.exit(1); });