Kimi For Coding (api.kimi.com, kimi-for-coding) ran a forensic audit on
distillation v1.0.0 with full file content. 7/7 flags verified real on
grep. Substrate now matches what v1.0.0 claimed: deterministic, no
schema bypasses, Rust tests compile.
Fixes:
- mode.rs:1035,1042 matrix_corpus Some/None -> vec![..]/vec![]; cargo
check --tests now compiles (was silently broken;
only bun tests were running)
- scorer.ts:30 SCORER_VERSION env override removed - identical
input now produces identical version stamp, not
env-dependent drift
- transforms.ts:181 auto_apply wall-clock fallback (new Date()) ->
deterministic recorded_at fallback
- replay.ts:378 recorded_run_id Date.now() -> sha256(recorded_at);
replay rows now reproducible given recorded_at
- receipts.ts:454,495 input_hash_match hardcoded true was misleading
telemetry; bumped DRIFT_REPORT_SCHEMA_VERSION 1->2,
field is now boolean|null with honest null when
not computed at this layer
- score_runs.ts:89-100,159 dedup keyed only on sig_hash made
scorer-version bumps invisible. Composite
sig_hash:scorer_version forces re-scoring
- export_sft.ts:126 (ev as any).contractor bypass emitted "<contractor>"
placeholder for every contract_analyses SFT row.
Added typed EvidenceRecord.metadata bucket;
transforms.ts populates metadata.contractor;
exporter reads typed value
Verification (all green):
cargo check -p gateway --tests compiles
bun test tests/distillation/ 145 pass / 0 fail
bun acceptance 22/22 invariants
bun audit-full 16/16 required checks
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
313 lines
12 KiB
TypeScript
// export_sft.ts — Phase 4b SFT dataset export. Strict no-leak gates.
//
// Default: only category=accepted ships.
// --include-partial: category in {accepted, partially_accepted} ships.
// rejected and needs_human_review NEVER ship — schema layer (Phase 1)
// enforces this AND the exporter filters before validation. Defense
// in depth.
//
// Each SFT row:
//   instruction = the prompt the executor saw
//   context     = retrieved context summary (matrix corpora used,
//                 pathway fingerprints seen, file_path)
//   response    = the executor's accepted output (evidence.text)
//
// Source restriction: SFT only takes records where evidence.text is a
// real model output (model_role in {executor, applier, reviewer with
// observer_verdict}). Pure-extraction rows lack a true "instruction"
// and are quarantined as missing_source_run_id (since they're not
// instruction→response shape).
|
import { existsSync, readFileSync, readdirSync, mkdirSync, appendFileSync, statSync } from "node:fs";
|
|
import { resolve, dirname } from "node:path";
|
|
import {
|
|
SFT_SAMPLE_SCHEMA_VERSION, validateSftSample, type SftSample, type SftQualityScore,
|
|
} from "../../auditor/schemas/distillation/sft_sample";
|
|
import type { ScoredRun, ScoreCategory } from "../../auditor/schemas/distillation/scored_run";
|
|
import type { EvidenceRecord } from "../../auditor/schemas/distillation/evidence_record";
|
|
import { QuarantineWriter } from "./quarantine";
|
|
|
|
// Options for a single SFT export run.
export interface ExportSftOptions {
  // Lakehouse root directory (contains data/ and exports/).
  root: string;
  // Timestamp stamped onto every exported row and quarantine entry.
  // Supplied by the caller (single wall-clock read in cli()) so the
  // export pass itself stays deterministic given its inputs.
  recorded_at: string;
  // When true, partially_accepted rows ship alongside accepted ones.
  include_partial?: boolean;
  // When true, the output file is not written and the existing-ID
  // dedup preload is skipped; counters are still computed.
  dry_run?: boolean;
}
|
// Summary of one export pass, returned by exportSft() and printed by cli().
export interface ExportSftResult {
  // Number of scored-run JSONL files discovered under data/scored-runs.
  scored_files_read: number;
  // Total scored-run lines read (including ones later quarantined).
  records_read: number;
  // Rows that passed all gates and were queued for the output file.
  records_exported: number;
  // Rows diverted to quarantine (any reason).
  records_quarantined: number;
  // Output path relative to root (root prefix stripped).
  output_path: string;
  // Human-readable per-reason quarantine breakdown from QuarantineWriter.
  quarantine_summary: string;
}
|
|
|
|
// Lakehouse root; overridable via LH_DISTILL_ROOT (used by the CLI entry only).
const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";

// Hard non-negotiable: this set never expands. If you find yourself
// adding "needs_human_review" or "rejected" here, stop — that's the
// contamination the spec forbids. Checked before the `allowed` filter
// so forbidden rows get their own quarantine reason (unsafe_sft_category).
const SFT_NEVER: ScoreCategory[] = ["rejected", "needs_human_review"];
|
|
|
|
function listScoredRunFiles(root: string): string[] {
|
|
const out: string[] = [];
|
|
const dir = resolve(root, "data/scored-runs");
|
|
if (!existsSync(dir)) return out;
|
|
for (const yyyy of readdirSync(dir).sort()) {
|
|
const yp = resolve(dir, yyyy);
|
|
if (!statSync(yp).isDirectory()) continue;
|
|
for (const mm of readdirSync(yp).sort()) {
|
|
const mp = resolve(yp, mm);
|
|
if (!statSync(mp).isDirectory()) continue;
|
|
for (const dd of readdirSync(mp).sort()) {
|
|
const dp = resolve(mp, dd);
|
|
if (!statSync(dp).isDirectory()) continue;
|
|
for (const f of readdirSync(dp)) {
|
|
if (f.endsWith(".jsonl")) out.push(resolve(dp, f));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return out;
|
|
}
|
|
|
|
function loadEvidenceByRunId(
|
|
scored_path: string,
|
|
cache: Map<string, Map<string, EvidenceRecord>>,
|
|
): Map<string, EvidenceRecord> {
|
|
const evidence_path = scored_path.replace("/scored-runs/", "/evidence/");
|
|
if (cache.has(evidence_path)) return cache.get(evidence_path)!;
|
|
const m = new Map<string, EvidenceRecord>();
|
|
if (!existsSync(evidence_path)) { cache.set(evidence_path, m); return m; }
|
|
for (const line of readFileSync(evidence_path, "utf8").split("\n")) {
|
|
if (!line) continue;
|
|
try { const ev = JSON.parse(line) as EvidenceRecord; m.set(ev.run_id, ev); } catch {}
|
|
}
|
|
cache.set(evidence_path, m);
|
|
return m;
|
|
}
|
|
|
|
// Synthesize SFT shape from the executed run. For sources where text
|
|
// isn't a model RESPONSE (pure-extraction), this returns null and the
|
|
// caller quarantines.
|
|
function synthesizeSft(
|
|
scored: ScoredRun,
|
|
ev: EvidenceRecord,
|
|
recorded_at: string,
|
|
sft_id: string,
|
|
): SftSample | null {
|
|
const text = ev.text ?? "";
|
|
// Skip extraction-class records — they don't have an instruction→response shape.
|
|
const role = ev.model_role;
|
|
if (role !== "executor" && role !== "reviewer" && role !== "applier") return null;
|
|
if (text.trim().length === 0) return null;
|
|
|
|
// Instruction synthesis depends on the source class.
|
|
const stem = ev.provenance.source_file.replace(/^data\/_kb\//, "").replace(/\.jsonl$/, "");
|
|
let instruction = "";
|
|
switch (stem) {
|
|
case "scrum_reviews":
|
|
instruction = `Review the file '${ev.source_files?.[0] ?? "<file>"}' against the PRD + change-proposal context. Produce a forensic audit with findings, severity, confidence, patches.`;
|
|
break;
|
|
case "mode_experiments":
|
|
instruction = `Run task_class='${ev.task_id}' for file '${ev.source_files?.[0] ?? "<file>"}'. Produce the mode-runner's expected output shape.`;
|
|
break;
|
|
case "auto_apply":
|
|
instruction = `Auto-apply: emit a 6-line surgical patch for '${ev.source_files?.[0] ?? "<file>"}' based on the latest scrum review findings.`;
|
|
break;
|
|
case "audits":
|
|
instruction = `Audit phase '${ev.task_id.replace(/^phase:/, "")}' and report findings with severity.`;
|
|
break;
|
|
case "observer_reviews":
|
|
instruction = `Observer-review the latest attempt on '${ev.source_files?.[0] ?? "<file>"}'. Verdict: accept | reject | cycle.`;
|
|
break;
|
|
case "contract_analyses": {
|
|
// Read contractor from the typed metadata bucket (populated in
|
|
// transforms.ts for contract_analyses rows). Pre-2026-04-27 this
|
|
// used `(ev as any).contractor` and silently emitted "<contractor>"
|
|
// for every row because EvidenceRecord didn't carry the field.
|
|
const contractor = typeof ev.metadata?.contractor === "string" ? ev.metadata.contractor : null;
|
|
const permit = ev.task_id.replace(/^permit:/, "");
|
|
instruction = contractor
|
|
? `Analyze contractor '${contractor}' for permit '${permit}'. Recommend with risk markers.`
|
|
: `Analyze permit '${permit}'. Recommend with risk markers.`;
|
|
break;
|
|
}
|
|
case "outcomes":
|
|
instruction = `Run scenario; report per-event outcome with citations.`;
|
|
break;
|
|
default:
|
|
instruction = `Source '${stem}' run; produce the appropriate output for this task type.`;
|
|
}
|
|
|
|
// Context — what the model could see. Keep terse.
|
|
const ctxParts: string[] = [];
|
|
if (ev.retrieved_context?.matrix_corpora?.length) {
|
|
ctxParts.push(`matrix=${ev.retrieved_context.matrix_corpora.join(",")}`);
|
|
}
|
|
if (typeof ev.retrieved_context?.pathway_fingerprints_seen === "number") {
|
|
ctxParts.push(`pathway_fingerprints=${ev.retrieved_context.pathway_fingerprints_seen}`);
|
|
}
|
|
if (ev.model_name) ctxParts.push(`model=${ev.model_name}`);
|
|
const context = ctxParts.join(" · ");
|
|
|
|
return {
|
|
schema_version: SFT_SAMPLE_SCHEMA_VERSION,
|
|
id: sft_id,
|
|
instruction,
|
|
context,
|
|
response: text,
|
|
source_run_id: scored.evidence_run_id,
|
|
quality_score: scored.category as SftQualityScore,
|
|
created_at: recorded_at,
|
|
provenance: {
|
|
source_file: scored.provenance.source_file,
|
|
line_offset: scored.provenance.line_offset,
|
|
sig_hash: scored.provenance.sig_hash,
|
|
recorded_at,
|
|
},
|
|
};
|
|
}
|
|
|
|
// Main export pass. Discovers every scored-run JSONL, applies the
// category firewall and provenance gates, joins each row to its
// evidence record, synthesizes an SFT sample, validates it against the
// schema, and appends new (deduped) rows to the output file. Every
// rejected row is quarantined with a reason; nothing is dropped silently.
export async function exportSft(opts: ExportSftOptions): Promise<ExportSftResult> {
  const { root, recorded_at, include_partial = false, dry_run = false } = opts;
  // Categories allowed to ship this run. SFT_NEVER is checked separately
  // (and first) so forbidden rows get a distinct quarantine reason.
  const allowed: ScoreCategory[] = include_partial
    ? ["accepted", "partially_accepted"]
    : ["accepted"];
  const out_path = resolve(root, "exports/sft/instruction_response.jsonl");
  const q = new QuarantineWriter(root, "sft", dry_run);

  let records_read = 0;
  let records_exported = 0;
  const seenIds = new Set<string>();
  const rowsToWrite: string[] = [];

  // Idempotence: pre-load IDs already present in the output so re-runs
  // append only new rows. NOTE(review): skipped in dry_run, so a dry-run
  // exported count can overstate what a real run would add.
  if (!dry_run && existsSync(out_path)) {
    for (const line of readFileSync(out_path, "utf8").split("\n")) {
      if (!line) continue;
      try { const r = JSON.parse(line); if (r.id) seenIds.add(r.id); } catch {}
    }
  }

  const evidenceCache = new Map<string, Map<string, EvidenceRecord>>();
  const scored_files = listScoredRunFiles(root);
  for (const sp of scored_files) {
    // Evidence file sits at the mirrored path; parsed once per file.
    const evMap = loadEvidenceByRunId(sp, evidenceCache);
    const lines = readFileSync(sp, "utf8").split("\n").filter(Boolean);
    for (let i = 0; i < lines.length; i++) {
      records_read++;
      let scored: ScoredRun;
      try { scored = JSON.parse(lines[i]) as ScoredRun; }
      catch (e) {
        // Truncate raw line and error message so quarantine rows stay bounded.
        q.add({
          reason: "schema_violation",
          source_record: { _raw: lines[i].slice(0, 200) },
          errors: ["scored-run not JSON: " + (e as Error).message.slice(0, 160)],
          recorded_at,
        });
        continue;
      }

      // CONTAMINATION FIREWALL: any forbidden category goes straight
      // to quarantine, never reaches the synthesizer.
      if (SFT_NEVER.includes(scored.category)) {
        q.add({
          reason: "unsafe_sft_category",
          source_record: scored as unknown as Record<string, unknown>,
          errors: [`category=${scored.category} forbidden in SFT (spec non-negotiable)`],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }
      // Safe-but-not-shipping categories (e.g. partially_accepted
      // without --include-partial) get their own reason.
      if (!allowed.includes(scored.category)) {
        q.add({
          reason: "category_disallowed",
          source_record: scored as unknown as Record<string, unknown>,
          errors: [`category=${scored.category} not in [${allowed.join(",")}] (--include-partial=${include_partial})`],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }
      // Provenance gates: sig_hash and evidence_run_id are required to
      // build the dedup ID and to join back to evidence.
      if (!scored.provenance?.sig_hash) {
        q.add({ reason: "missing_provenance", source_record: scored as any, errors: ["provenance missing"], recorded_at });
        continue;
      }
      if (!scored.evidence_run_id) {
        q.add({ reason: "missing_source_run_id", source_record: scored as any, errors: ["evidence_run_id missing"], recorded_at, source_provenance: scored.provenance });
        continue;
      }

      const ev = evMap.get(scored.evidence_run_id);
      if (!ev) {
        q.add({ reason: "missing_source_run_id", source_record: scored as any, errors: [`evidence_run_id=${scored.evidence_run_id} not found`], recorded_at, source_provenance: scored.provenance });
        continue;
      }

      // ID = sha256(evidence_run_id + sig_hash):16 — content-derived,
      // so re-running on the same inputs produces the same IDs.
      const id_seed = `${scored.evidence_run_id}|${scored.provenance.sig_hash}`;
      const hasher = new Bun.CryptoHasher("sha256");
      hasher.update(id_seed);
      const sft_id = "sft-" + hasher.digest("hex").slice(0, 16);
      // Already exported (or already queued this run): skip silently.
      if (seenIds.has(sft_id)) continue;

      const sample = synthesizeSft(scored, ev, recorded_at, sft_id);
      if (!sample) {
        // Extraction-class evidence or empty text: no instruction→response shape.
        q.add({
          reason: "missing_source_run_id",
          source_record: { run_id: scored.evidence_run_id, model_role: ev.model_role, has_text: typeof ev.text === "string" && ev.text.length > 0 },
          errors: ["evidence has no instruction→response shape (extraction-class or empty text)"],
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      // Defense in depth: schema validation even after the exporter's
      // own filters (the schema layer also forbids unsafe categories).
      const v = validateSftSample(sample);
      if (!v.valid) {
        q.add({
          reason: "schema_violation",
          source_record: sample as unknown as Record<string, unknown>,
          errors: v.errors,
          recorded_at,
          source_provenance: scored.provenance,
        });
        continue;
      }

      seenIds.add(sft_id);
      // Write the validator's canonical value, not the raw sample.
      rowsToWrite.push(JSON.stringify(v.value));
      records_exported++;
    }
  }

  // Single append at the end: either all queued rows land or none do.
  if (!dry_run && rowsToWrite.length > 0) {
    mkdirSync(dirname(out_path), { recursive: true });
    appendFileSync(out_path, rowsToWrite.join("\n") + "\n");
  }

  return {
    scored_files_read: scored_files.length,
    records_read,
    records_exported,
    records_quarantined: q.total,
    output_path: out_path.replace(root + "/", ""),
    quarantine_summary: q.summary(),
  };
}
|
|
|
|
async function cli() {
|
|
const dry_run = process.argv.includes("--dry-run");
|
|
const include_partial = process.argv.includes("--include-partial");
|
|
const recorded_at = new Date().toISOString();
|
|
const r = await exportSft({ root: DEFAULT_ROOT, recorded_at, include_partial, dry_run });
|
|
|
|
console.log(`[export_sft] read=${r.records_read} exported=${r.records_exported} ${r.quarantine_summary}${dry_run ? " (DRY RUN)" : ""}`);
|
|
console.log(`[export_sft] output: ${r.output_path}`);
|
|
if (include_partial) console.log("[export_sft] partially_accepted INCLUDED (--include-partial)");
|
|
}
|
|
|
|
if (import.meta.main) cli().catch(e => { console.error(e); process.exit(1); });
|