Kimi For Coding (api.kimi.com, kimi-for-coding) ran a forensic audit on
distillation v1.0.0 with full file content. 7/7 flags verified real on
grep. Substrate now matches what v1.0.0 claimed: deterministic, no
schema bypasses, Rust tests compile.
Fixes:
- mode.rs:1035,1042 matrix_corpus Some/None -> vec![..]/vec![]; cargo
check --tests now compiles (was silently broken;
only bun tests were running)
- scorer.ts:30 SCORER_VERSION env override removed - identical
input now produces identical version stamp, not
env-dependent drift
- transforms.ts:181 auto_apply wall-clock fallback (new Date()) ->
deterministic recorded_at fallback
- replay.ts:378 recorded_run_id Date.now() -> sha256(recorded_at);
replay rows now reproducible given recorded_at
- receipts.ts:454,495 input_hash_match hardcoded true was misleading
telemetry; bumped DRIFT_REPORT_SCHEMA_VERSION 1->2,
field is now boolean|null with honest null when
not computed at this layer
- score_runs.ts:89-100,159 dedup keyed only on sig_hash made
scorer-version bumps invisible. Composite
sig_hash:scorer_version forces re-scoring
- export_sft.ts:126 (ev as any).contractor bypass emitted "<contractor>"
placeholder for every contract_analyses SFT row.
Added typed EvidenceRecord.metadata bucket;
transforms.ts populates metadata.contractor;
exporter reads typed value
Verification (all green):
cargo check -p gateway --tests compiles
bun test tests/distillation/ 145 pass / 0 fail
bun acceptance 22/22 invariants
bun audit-full 16/16 required checks
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
429 lines
17 KiB
TypeScript
// replay.ts — Phase 7 distillation replay layer.
//
// Takes a task → retrieves matching playbooks/RAG records → builds a
// context bundle → calls a LOCAL model → validates → escalates if
// needed → logs the run as new evidence.
//
// This is NOT training. It's runtime behavior shaping via retrieval.
// A weak local model with the right prior context outperforms the
// same model with no context — proven by the local_only vs retrieval
// A/B in the report.
//
// Spec invariants:
// - never bypass retrieval
// - never discard provenance
// - never allow free-form hallucinated output (validation gate)
// - log every run as new evidence (data/_kb/replay_runs.jsonl)
|
|
|
|
import { existsSync, readFileSync, mkdirSync, appendFileSync } from "node:fs";
|
|
import { resolve, dirname } from "node:path";
|
|
import { canonicalSha256 } from "../../auditor/schemas/distillation/types";
|
|
|
|
const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";
|
|
// Read env per-call so tests can override GATEWAY mid-process.
|
|
function gatewayUrl(): string { return process.env.LH_GATEWAY_URL ?? "http://localhost:3100"; }
|
|
const LOCAL_MODEL = process.env.LH_REPLAY_LOCAL_MODEL ?? "qwen3.5:latest";
|
|
const ESCALATION_MODEL = process.env.LH_REPLAY_ESCALATION_MODEL ?? "deepseek-v3.1:671b";
|
|
|
|
// Input options for one replay() invocation.
export interface ReplayRequest {
  // Natural-language task to execute through the replay pipeline.
  task: string;
  local_only?: boolean; // never escalate; just record validation result
  allow_escalation?: boolean; // try the bigger model on local failure
  no_retrieval?: boolean; // baseline mode: skip context bundle
  // Test-only: return a synthetic response without calling the gateway.
  // The synthetic response is deterministic (echoes context bundle
  // signals) so retrieval/bundle/log tests can run without an LLM.
  dry_run?: boolean;
}
|
|
|
|
// One corpus record matched against a task, reduced to the preview and
// provenance fields the replay layer needs.
export interface RetrievedArtifact {
  rag_id: string; // id of the underlying RAG record
  source_run_id: string; // provenance: the run that produced the record
  title: string;
  content_preview: string; // first 240 chars of content
  // Success label carried over from the corpus row, e.g. "accepted" /
  // "partially_accepted" (the values buildContextBundle filters on).
  success_score: string;
  tags: string[];
  score: number; // overlap score with task
}
|
|
|
|
// Retrieval context assembled for one task by buildContextBundle.
export interface ContextBundle {
  retrieved_playbooks: RetrievedArtifact[]; // all top-K retrieval hits (any success level)
  prior_successful_outputs: RetrievedArtifact[]; // accepted samples used as in-context exemplars
  failure_patterns: RetrievedArtifact[]; // partial/needs-review samples used as warnings
  validation_steps: string[]; // extracted from accepted-content lines starting with "verify"/"check"/"assert"
  bundle_token_estimate: number; // rough estimate at ~4 chars/token
}
|
|
|
|
// Outcome of the deterministic validation gate (validateResponse).
export interface ValidationResult {
  // True when no gate added a reason at validation time. Note: a passing
  // escalation later prepends an informational "recovered via …" reason
  // without flipping this flag.
  passed: boolean;
  reasons: string[]; // explicit, every gate names itself
}
|
|
|
|
// Full record of one replay run; persisted (with model_response
// truncated to 4000 chars) to data/_kb/replay_runs.jsonl as replay_run.v1.
export interface ReplayResult {
  input_task: string;
  task_hash: string; // sha256 of task — stable replay id
  retrieved_artifacts: { rag_ids: string[] }; // ids of every retrieved playbook
  context_bundle: ContextBundle | null; // null when --no-retrieval
  model_response: string; // verbatim model output (untruncated in memory)
  model_used: string; // model that produced the final model_response
  escalation_path: string[]; // models tried in order, e.g. ["qwen3.5:latest", "deepseek-v3.1:671b"]
  validation_result: ValidationResult;
  recorded_run_id: string; // replay:<task_hash[:16]>:<sha256(recorded_at)[:12]>
  recorded_at: string; // ISO timestamp captured at the start of the run
  duration_ms: number;
}
|
|
|
|
// Shape of one JSONL row in exports/rag/playbooks.jsonl — the retrieval
// corpus read by loadRagCorpus.
interface RagSample {
  id: string;
  title: string;
  content: string; // full text; previews are cut to 240 chars at retrieval time
  tags: string[];
  source_run_id: string;
  success_score: string; // e.g. "accepted" / "partially_accepted"
  source_category: string;
}
|
|
|
|
// ─── Retrieval ────────────────────────────────────────────────────
|
|
|
|
function tokenize(text: string): Set<string> {
|
|
// Lowercase + alphanumeric tokens of length ≥3. Keeps it simple and
|
|
// deterministic; future tightening: add embedding similarity if the
|
|
// RAG corpus grows past keyword scaling limits.
|
|
return new Set(
|
|
text.toLowerCase()
|
|
.split(/[^a-z0-9_]+/)
|
|
.filter(t => t.length >= 3),
|
|
);
|
|
}
|
|
|
|
function jaccard(a: Set<string>, b: Set<string>): number {
|
|
if (a.size === 0 || b.size === 0) return 0;
|
|
let inter = 0;
|
|
for (const t of a) if (b.has(t)) inter++;
|
|
const union = a.size + b.size - inter;
|
|
return union === 0 ? 0 : inter / union;
|
|
}
|
|
|
|
function loadRagCorpus(root: string): RagSample[] {
|
|
const path = resolve(root, "exports/rag/playbooks.jsonl");
|
|
if (!existsSync(path)) return [];
|
|
const out: RagSample[] = [];
|
|
for (const line of readFileSync(path, "utf8").split("\n")) {
|
|
if (!line) continue;
|
|
try { out.push(JSON.parse(line) as RagSample); } catch { /* skip */ }
|
|
}
|
|
return out;
|
|
}
|
|
|
|
function retrieveRag(corpus: RagSample[], task: string, topK = 5): RetrievedArtifact[] {
|
|
const taskTokens = tokenize(task);
|
|
const scored = corpus.map(r => {
|
|
const text = `${r.title} ${r.content} ${(r.tags ?? []).join(" ")}`;
|
|
const score = jaccard(taskTokens, tokenize(text));
|
|
return { record: r, score };
|
|
});
|
|
scored.sort((a, b) => b.score - a.score);
|
|
return scored.slice(0, topK)
|
|
.filter(s => s.score > 0)
|
|
.map(s => ({
|
|
rag_id: s.record.id,
|
|
source_run_id: s.record.source_run_id,
|
|
title: s.record.title,
|
|
content_preview: s.record.content.slice(0, 240),
|
|
success_score: s.record.success_score,
|
|
tags: s.record.tags ?? [],
|
|
score: s.score,
|
|
}));
|
|
}
|
|
|
|
// Extract sentences that read like a check/verify/assert step from
|
|
// accepted samples — these are the validation_steps the local model
|
|
// should follow.
|
|
function extractValidationSteps(samples: RetrievedArtifact[], corpus: RagSample[]): string[] {
|
|
const ids = new Set(samples.map(s => s.rag_id));
|
|
const steps: string[] = [];
|
|
for (const r of corpus) {
|
|
if (!ids.has(r.id)) continue;
|
|
for (const line of r.content.split("\n")) {
|
|
const t = line.trim();
|
|
if (/^[-*]\s*(verify|check|assert|confirm|ensure)\b/i.test(t) ||
|
|
/^\s*(verify|check|assert|confirm|ensure)\s/i.test(t)) {
|
|
steps.push(t.slice(0, 200));
|
|
if (steps.length >= 6) return steps;
|
|
}
|
|
}
|
|
}
|
|
return steps;
|
|
}
|
|
|
|
function buildContextBundle(corpus: RagSample[], task: string): ContextBundle {
|
|
const top = retrieveRag(corpus, task, 8);
|
|
const accepted = top.filter(t => t.success_score === "accepted").slice(0, 3);
|
|
const warnings = top.filter(t => t.success_score === "partially_accepted").slice(0, 2);
|
|
const validation_steps = extractValidationSteps(accepted, corpus);
|
|
|
|
// Token estimate (~4 chars/token rough)
|
|
const totalChars = [...accepted, ...warnings].reduce(
|
|
(a, x) => a + x.content_preview.length + x.title.length, 0,
|
|
) + validation_steps.reduce((a, s) => a + s.length, 0);
|
|
const bundle_token_estimate = Math.ceil(totalChars / 4);
|
|
|
|
return {
|
|
retrieved_playbooks: top,
|
|
prior_successful_outputs: accepted,
|
|
failure_patterns: warnings,
|
|
validation_steps,
|
|
bundle_token_estimate,
|
|
};
|
|
}
|
|
|
|
// ─── Prompt assembly ──────────────────────────────────────────────
|
|
|
|
function buildPrompt(task: string, bundle: ContextBundle | null): { system: string; user: string } {
|
|
const system = [
|
|
"You are a Lakehouse task executor. Stay grounded — only assert what you can derive from the prior successful patterns or the task itself.",
|
|
"Do NOT hedge. Do NOT say 'as an AI'. Produce a concrete actionable answer.",
|
|
"When prior successful outputs are provided, follow their style and format.",
|
|
].join(" ");
|
|
|
|
if (!bundle) {
|
|
return { system, user: `Task: ${task}\n\nProduce the answer.` };
|
|
}
|
|
|
|
const parts: string[] = [];
|
|
if (bundle.prior_successful_outputs.length > 0) {
|
|
parts.push("## Prior successful runs on similar tasks");
|
|
parts.push("");
|
|
for (const r of bundle.prior_successful_outputs) {
|
|
parts.push(`### ${r.title} (score: ${r.success_score})`);
|
|
parts.push(r.content_preview);
|
|
parts.push("");
|
|
}
|
|
}
|
|
if (bundle.failure_patterns.length > 0) {
|
|
parts.push("## Patterns that produced PARTIAL results — avoid these failure modes");
|
|
parts.push("");
|
|
for (const r of bundle.failure_patterns) {
|
|
parts.push(`- ${r.title}: ${r.content_preview.slice(0, 160)}`);
|
|
}
|
|
parts.push("");
|
|
}
|
|
if (bundle.validation_steps.length > 0) {
|
|
parts.push("## Validation checklist (from accepted runs)");
|
|
for (const s of bundle.validation_steps) parts.push(`- ${s}`);
|
|
parts.push("");
|
|
}
|
|
parts.push("## Task");
|
|
parts.push(task);
|
|
parts.push("");
|
|
parts.push("Produce the answer following the style of the prior successful runs above.");
|
|
|
|
return { system, user: parts.join("\n") };
|
|
}
|
|
|
|
// ─── Model call ───────────────────────────────────────────────────
|
|
|
|
async function callModel(model: string, system: string, user: string): Promise<{ content: string; ok: boolean; error?: string }> {
|
|
const provider = model.includes("/") ? "openrouter"
|
|
: (model.startsWith("kimi-") || model.startsWith("qwen3-coder") || model.startsWith("deepseek-v") ||
|
|
model.startsWith("mistral-large") || model === "gpt-oss:120b" || model === "qwen3.5:397b")
|
|
? "ollama_cloud" : "ollama";
|
|
try {
|
|
const r = await fetch(`${gatewayUrl()}/v1/chat`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
provider, model,
|
|
messages: [
|
|
{ role: "system", content: system },
|
|
{ role: "user", content: user },
|
|
],
|
|
max_tokens: 1500,
|
|
temperature: 0.1,
|
|
}),
|
|
signal: AbortSignal.timeout(180_000),
|
|
});
|
|
if (!r.ok) return { content: "", ok: false, error: `HTTP ${r.status}: ${(await r.text()).slice(0, 240)}` };
|
|
const j: any = await r.json();
|
|
const content = j?.choices?.[0]?.message?.content ?? "";
|
|
return { content, ok: true };
|
|
} catch (e) {
|
|
return { content: "", ok: false, error: (e as Error).message.slice(0, 240) };
|
|
}
|
|
}
|
|
|
|
// ─── Validation gate (deterministic; never an LLM) ───────────────
|
|
|
|
function validateResponse(response: string, bundle: ContextBundle | null): ValidationResult {
|
|
const reasons: string[] = [];
|
|
const trimmed = response.trim();
|
|
|
|
if (trimmed.length === 0) {
|
|
return { passed: false, reasons: ["empty response"] };
|
|
}
|
|
if (trimmed.length < 80) {
|
|
reasons.push(`response too short (${trimmed.length} chars; min 80)`);
|
|
}
|
|
// Filler / hedge patterns the spec explicitly rejects.
|
|
const fillers = [/as an ai/i, /i cannot/i, /i'm sorry, but/i, /i don'?t have access/i, /i am unable to/i];
|
|
for (const re of fillers) {
|
|
if (re.test(trimmed)) {
|
|
reasons.push(`filler/hedge phrase detected: ${re}`);
|
|
}
|
|
}
|
|
// If a validation checklist was supplied, expect the response to
|
|
// touch at least one of the checklist topics. Soft check: presence of
|
|
// any checklist token (≥3 chars) in the response.
|
|
if (bundle && bundle.validation_steps.length > 0) {
|
|
const checklistTokens = new Set<string>();
|
|
for (const s of bundle.validation_steps) {
|
|
for (const t of tokenize(s)) checklistTokens.add(t);
|
|
}
|
|
const respTokens = tokenize(trimmed);
|
|
let overlap = 0;
|
|
for (const t of checklistTokens) if (respTokens.has(t)) overlap++;
|
|
if (checklistTokens.size > 0 && overlap === 0) {
|
|
reasons.push("response shares no tokens with validation checklist (may not have followed prior patterns)");
|
|
}
|
|
}
|
|
|
|
return { passed: reasons.length === 0, reasons };
|
|
}
|
|
|
|
// Test/dry-run synthesizer. Produces a deterministic response that
|
|
// echoes context-bundle signals so the retrieval+validation pipeline
|
|
// can be tested without an LLM. NOT used outside tests.
|
|
function dryRunSynthesize(task: string, bundle: ContextBundle | null): string {
|
|
const parts: string[] = [
|
|
"Synthetic dry-run response for task: " + task.slice(0, 120),
|
|
"",
|
|
];
|
|
if (bundle) {
|
|
parts.push(`Retrieved ${bundle.retrieved_playbooks.length} playbooks; ${bundle.prior_successful_outputs.length} accepted, ${bundle.failure_patterns.length} partial.`);
|
|
if (bundle.validation_steps.length > 0) {
|
|
parts.push("Following validation checklist:");
|
|
for (const s of bundle.validation_steps.slice(0, 3)) parts.push("- " + s);
|
|
}
|
|
if (bundle.prior_successful_outputs[0]) {
|
|
parts.push("");
|
|
parts.push("Anchored on prior accepted: " + bundle.prior_successful_outputs[0].title);
|
|
}
|
|
} else {
|
|
parts.push("No retrieval context — answering from task alone. Verify and check produced output before approving.");
|
|
}
|
|
return parts.join("\n");
|
|
}
|
|
|
|
// ─── Evidence logging ────────────────────────────────────────────
|
|
|
|
async function logReplayEvidence(root: string, result: ReplayResult): Promise<void> {
|
|
const path = resolve(root, "data/_kb/replay_runs.jsonl");
|
|
mkdirSync(dirname(path), { recursive: true });
|
|
const row = {
|
|
schema: "replay_run.v1",
|
|
...result,
|
|
// Truncate model_response in the persisted log to keep file lean;
|
|
// full text lives in the in-memory ReplayResult and any caller
|
|
// wanting the verbatim output can re-run with the same task.
|
|
model_response: result.model_response.slice(0, 4000),
|
|
};
|
|
appendFileSync(path, JSON.stringify(row) + "\n");
|
|
}
|
|
|
|
// ─── Top-level replay function ───────────────────────────────────
|
|
|
|
export async function replay(opts: ReplayRequest, root = DEFAULT_ROOT): Promise<ReplayResult> {
|
|
const t0 = Date.now();
|
|
const recorded_at = new Date().toISOString();
|
|
const task_hash = await canonicalSha256(opts.task);
|
|
|
|
const corpus = loadRagCorpus(root);
|
|
const bundle = opts.no_retrieval ? null : buildContextBundle(corpus, opts.task);
|
|
const { system, user } = buildPrompt(opts.task, bundle);
|
|
|
|
const escalation_path: string[] = [];
|
|
let model_response = "";
|
|
let model_used = "";
|
|
let validation: ValidationResult = { passed: false, reasons: ["never executed"] };
|
|
|
|
// Try local model first.
|
|
escalation_path.push(LOCAL_MODEL);
|
|
model_used = LOCAL_MODEL;
|
|
const localCall = opts.dry_run
|
|
? { ok: true, content: dryRunSynthesize(opts.task, bundle) }
|
|
: await callModel(LOCAL_MODEL, system, user);
|
|
if (localCall.ok) {
|
|
model_response = localCall.content;
|
|
validation = validateResponse(model_response, bundle);
|
|
} else {
|
|
validation = { passed: false, reasons: [`local call failed: ${localCall.error}`] };
|
|
}
|
|
|
|
// Escalate if validation failed AND escalation allowed.
|
|
if (!validation.passed && opts.allow_escalation && !opts.local_only) {
|
|
escalation_path.push(ESCALATION_MODEL);
|
|
const escalCall = opts.dry_run
|
|
? { ok: true, content: dryRunSynthesize(opts.task, bundle) + "\n\n[ESCALATED]" }
|
|
: await callModel(ESCALATION_MODEL, system, user);
|
|
if (escalCall.ok) {
|
|
model_response = escalCall.content;
|
|
model_used = ESCALATION_MODEL;
|
|
validation = validateResponse(model_response, bundle);
|
|
if (validation.passed) validation.reasons.unshift(`recovered via escalation to ${ESCALATION_MODEL}`);
|
|
} else {
|
|
validation.reasons.push(`escalation also failed: ${escalCall.error}`);
|
|
}
|
|
}
|
|
|
|
// Stable derivation from task_hash + recorded_at (already an ISO
|
|
// timestamp captured at start of the call). Avoids a second wall-clock
|
|
// read and makes run_id reproducible given a fixed recorded_at — useful
|
|
// for fixture-driven tests + acceptance gates. Replaces Date.now()-based
|
|
// id post-Kimi-audit 2026-04-27.
|
|
const recorded_run_id = `replay:${task_hash.slice(0, 16)}:${(await canonicalSha256(recorded_at)).slice(0, 12)}`;
|
|
const result: ReplayResult = {
|
|
input_task: opts.task,
|
|
task_hash,
|
|
retrieved_artifacts: { rag_ids: bundle?.retrieved_playbooks.map(p => p.rag_id) ?? [] },
|
|
context_bundle: bundle,
|
|
model_response,
|
|
model_used,
|
|
escalation_path,
|
|
validation_result: validation,
|
|
recorded_run_id,
|
|
recorded_at,
|
|
duration_ms: Date.now() - t0,
|
|
};
|
|
|
|
await logReplayEvidence(root, result);
|
|
return result;
|
|
}
|
|
|
|
// ─── CLI ──────────────────────────────────────────────────────────
|
|
|
|
async function cli() {
|
|
const taskIdx = process.argv.indexOf("--task");
|
|
if (taskIdx < 0 || !process.argv[taskIdx + 1]) {
|
|
console.error("usage: replay.ts --task \"<input>\" [--local-only] [--allow-escalation] [--no-retrieval]");
|
|
process.exit(2);
|
|
}
|
|
const task = process.argv[taskIdx + 1];
|
|
const local_only = process.argv.includes("--local-only");
|
|
const allow_escalation = process.argv.includes("--allow-escalation");
|
|
const no_retrieval = process.argv.includes("--no-retrieval");
|
|
|
|
const r = await replay({ task, local_only, allow_escalation, no_retrieval });
|
|
|
|
console.log(`[replay] run_id=${r.recorded_run_id}`);
|
|
console.log(`[replay] retrieval: ${r.context_bundle ? r.context_bundle.retrieved_playbooks.length + " playbooks" : "DISABLED"}`);
|
|
console.log(`[replay] escalation_path: ${r.escalation_path.join(" → ")}`);
|
|
console.log(`[replay] model_used: ${r.model_used} · ${r.duration_ms}ms`);
|
|
console.log(`[replay] validation: ${r.validation_result.passed ? "PASS" : "FAIL"}${r.validation_result.reasons.length ? " (" + r.validation_result.reasons.join("; ") + ")" : ""}`);
|
|
console.log("");
|
|
console.log("─── response ───");
|
|
console.log(r.model_response.slice(0, 1500));
|
|
if (r.model_response.length > 1500) console.log(`... [${r.model_response.length - 1500} more chars]`);
|
|
}
|
|
|
|
if (import.meta.main) cli().catch(e => { console.error(e); process.exit(1); });
|