// replay.ts — Phase 7 distillation replay layer.
//
// Takes a task → retrieves matching playbooks/RAG records → builds a
// context bundle → calls a LOCAL model → validates → escalates if
// needed → logs the run as new evidence.
//
// This is NOT training. It's runtime behavior shaping via retrieval.
// A weak local model with the right prior context outperforms the
// same model with no context — proven by the local_only vs retrieval
// A/B in the report.
//
// Spec invariants:
// - never bypass retrieval
// - never discard provenance
// - never allow free-form hallucinated output (validation gate)
// - log every run as new evidence (data/_kb/replay_runs.jsonl)

import { existsSync, readFileSync, mkdirSync, appendFileSync } from "node:fs";
import { resolve, dirname } from "node:path";
import { canonicalSha256 } from "../../auditor/schemas/distillation/types";

const DEFAULT_ROOT = process.env.LH_DISTILL_ROOT ?? "/home/profit/lakehouse";

// Read env per-call so tests can override GATEWAY mid-process.
function gatewayUrl(): string {
  return process.env.LH_GATEWAY_URL ?? "http://localhost:3100";
}

const LOCAL_MODEL = process.env.LH_REPLAY_LOCAL_MODEL ?? "qwen3.5:latest";
const ESCALATION_MODEL = process.env.LH_REPLAY_ESCALATION_MODEL ?? "deepseek-v3.1:671b";

export interface ReplayRequest {
  task: string;
  local_only?: boolean;       // never escalate; just record validation result
  allow_escalation?: boolean; // try the bigger model on local failure
  no_retrieval?: boolean;     // baseline mode: skip context bundle
  // Test-only: return a synthetic response without calling the gateway.
  // The synthetic response is deterministic (echoes context bundle
  // signals) so retrieval/bundle/log tests can run without an LLM.
  dry_run?: boolean;
}

export interface RetrievedArtifact {
  rag_id: string;
  source_run_id: string;
  title: string;
  content_preview: string; // first 240 chars of content
  success_score: string;
  tags: string[];
  score: number; // overlap score with task
}

export interface ContextBundle {
  retrieved_playbooks: RetrievedArtifact[];      // top accepted
  prior_successful_outputs: RetrievedArtifact[]; // accepted samples used as in-context exemplars
  failure_patterns: RetrievedArtifact[];         // partial/needs-review samples used as warnings
  validation_steps: string[];                    // extracted from accepted-content lines starting with "verify"/"check"/"assert"
  bundle_token_estimate: number;
}

export interface ValidationResult {
  passed: boolean;
  reasons: string[]; // explicit, every gate names itself
}

export interface ReplayResult {
  input_task: string;
  task_hash: string; // sha256 of task — stable replay id
  retrieved_artifacts: { rag_ids: string[] };
  context_bundle: ContextBundle | null; // null when --no-retrieval
  model_response: string;
  model_used: string;
  escalation_path: string[]; // models tried in order, e.g. ["qwen3.5:latest", "deepseek-v3.1:671b"]
  validation_result: ValidationResult;
  recorded_run_id: string;
  recorded_at: string;
  duration_ms: number;
}

interface RagSample {
  id: string;
  title: string;
  content: string;
  tags: string[];
  source_run_id: string;
  success_score: string;
  source_category: string;
}

// ─── Retrieval ────────────────────────────────────────────────────

function tokenize(text: string): Set<string> {
  // Lowercase + alphanumeric tokens of length ≥3. Keeps it simple and
  // deterministic; future tightening: add embedding similarity if the
  // RAG corpus grows past keyword scaling limits.
  return new Set(
    text.toLowerCase()
      .split(/[^a-z0-9_]+/)
      .filter(t => t.length >= 3),
  );
}

function jaccard(a: Set<string>, b: Set<string>): number {
  if (a.size === 0 || b.size === 0) return 0;
  let inter = 0;
  for (const t of a) if (b.has(t)) inter++;
  const union = a.size + b.size - inter;
  return union === 0 ? 0 : inter / union;
}
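// Worked example of the overlap scoring (illustrative only, not a test):
//   tokenize("export rag playbooks to jsonl")
//     → { "export", "rag", "playbooks", "jsonl" }      ("to" is <3 chars)
//   tokenize("export playbooks as jsonl records")
//     → { "export", "playbooks", "jsonl", "records" }  ("as" is <3 chars)
//   intersection = 3, union = 5, so jaccard = 0.6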
function loadRagCorpus(root: string): RagSample[] {
  const path = resolve(root, "exports/rag/playbooks.jsonl");
  if (!existsSync(path)) return [];
  const out: RagSample[] = [];
  for (const line of readFileSync(path, "utf8").split("\n")) {
    if (!line) continue;
    try {
      out.push(JSON.parse(line) as RagSample);
    } catch { /* skip */ }
  }
  return out;
}

function retrieveRag(corpus: RagSample[], task: string, topK = 5): RetrievedArtifact[] {
  const taskTokens = tokenize(task);
  const scored = corpus.map(r => {
    const text = `${r.title} ${r.content} ${(r.tags ?? []).join(" ")}`;
    const score = jaccard(taskTokens, tokenize(text));
    return { record: r, score };
  });
  scored.sort((a, b) => b.score - a.score);
  return scored.slice(0, topK)
    .filter(s => s.score > 0)
    .map(s => ({
      rag_id: s.record.id,
      source_run_id: s.record.source_run_id,
      title: s.record.title,
      content_preview: s.record.content.slice(0, 240),
      success_score: s.record.success_score,
      tags: s.record.tags ?? [],
      score: s.score,
    }));
}

// Extract sentences that read like a check/verify/assert step from
// accepted samples — these are the validation_steps the local model
// should follow.
function extractValidationSteps(samples: RetrievedArtifact[], corpus: RagSample[]): string[] {
  const ids = new Set(samples.map(s => s.rag_id));
  const steps: string[] = [];
  for (const r of corpus) {
    if (!ids.has(r.id)) continue;
    for (const line of r.content.split("\n")) {
      const t = line.trim();
      if (/^[-*]\s*(verify|check|assert|confirm|ensure)\b/i.test(t) ||
          /^\s*(verify|check|assert|confirm|ensure)\s/i.test(t)) {
        steps.push(t.slice(0, 200));
        if (steps.length >= 6) return steps;
      }
    }
  }
  return steps;
}

function buildContextBundle(corpus: RagSample[], task: string): ContextBundle {
  const top = retrieveRag(corpus, task, 8);
  const accepted = top.filter(t => t.success_score === "accepted").slice(0, 3);
  const warnings = top.filter(t => t.success_score === "partially_accepted").slice(0, 2);
  const validation_steps = extractValidationSteps(accepted, corpus);
  // Token estimate (~4 chars/token rough)
  const totalChars = [...accepted, ...warnings].reduce(
    (a, x) => a + x.content_preview.length + x.title.length,
    0,
  ) + validation_steps.reduce((a, s) => a + s.length, 0);
  const bundle_token_estimate = Math.ceil(totalChars / 4);
  return {
    retrieved_playbooks: top,
    prior_successful_outputs: accepted,
    failure_patterns: warnings,
    validation_steps,
    bundle_token_estimate,
  };
}
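// Shape sketch of a bundle as handed to the prompt builder; values
// below are hypothetical, for illustration only:
//   {
//     retrieved_playbooks: [ /* ≤8 artifacts with score > 0 */ ],
//     prior_successful_outputs: [ /* ≤3 where success_score === "accepted" */ ],
//     failure_patterns: [ /* ≤2 where success_score === "partially_accepted" */ ],
//     validation_steps: ["- verify exported row counts match source", /* … ≤6 */],
//     bundle_token_estimate: 412, // Math.ceil(totalChars / 4)
//   }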
// ─── Prompt assembly ──────────────────────────────────────────────

function buildPrompt(task: string, bundle: ContextBundle | null): { system: string; user: string } {
  const system = [
    "You are a Lakehouse task executor. Stay grounded — only assert what you can derive from the prior successful patterns or the task itself.",
    "Do NOT hedge. Do NOT say 'as an AI'. Produce a concrete actionable answer.",
    "When prior successful outputs are provided, follow their style and format.",
  ].join(" ");

  if (!bundle) {
    return { system, user: `Task: ${task}\n\nProduce the answer.` };
  }

  const parts: string[] = [];
  if (bundle.prior_successful_outputs.length > 0) {
    parts.push("## Prior successful runs on similar tasks");
    parts.push("");
    for (const r of bundle.prior_successful_outputs) {
      parts.push(`### ${r.title} (score: ${r.success_score})`);
      parts.push(r.content_preview);
      parts.push("");
    }
  }
  if (bundle.failure_patterns.length > 0) {
    parts.push("## Patterns that produced PARTIAL results — avoid these failure modes");
    parts.push("");
    for (const r of bundle.failure_patterns) {
      parts.push(`- ${r.title}: ${r.content_preview.slice(0, 160)}`);
    }
    parts.push("");
  }
  if (bundle.validation_steps.length > 0) {
    parts.push("## Validation checklist (from accepted runs)");
    for (const s of bundle.validation_steps) parts.push(`- ${s}`);
    parts.push("");
  }
  parts.push("## Task");
  parts.push(task);
  parts.push("");
  parts.push("Produce the answer following the style of the prior successful runs above.");
  return { system, user: parts.join("\n") };
}

// ─── Model call ───────────────────────────────────────────────────

async function callModel(model: string, system: string, user: string): Promise<{ content: string; ok: boolean; error?: string }> {
  const provider = model.includes("/")
    ? "openrouter"
    : (model.startsWith("kimi-") || model.startsWith("qwen3-coder") ||
       model.startsWith("deepseek-v") || model.startsWith("mistral-large") ||
       model === "gpt-oss:120b" || model === "qwen3.5:397b")
      ? "ollama_cloud"
      : "ollama";
  try {
    const r = await fetch(`${gatewayUrl()}/v1/chat`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({
        provider,
        model,
        messages: [
          { role: "system", content: system },
          { role: "user", content: user },
        ],
        max_tokens: 1500,
        temperature: 0.1,
      }),
      signal: AbortSignal.timeout(180_000),
    });
    if (!r.ok) return { content: "", ok: false, error: `HTTP ${r.status}: ${(await r.text()).slice(0, 240)}` };
    const j: any = await r.json();
    const content = j?.choices?.[0]?.message?.content ?? "";
    return { content, ok: true };
  } catch (e) {
    return { content: "", ok: false, error: (e as Error).message.slice(0, 240) };
  }
}
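// Provider routing, traced through the checks above (the third model
// id is a hypothetical OpenRouter-style name, shown only to exercise
// the "/" branch):
//   "qwen3.5:latest"     → "ollama"        (no "/", no cloud prefix match)
//   "deepseek-v3.1:671b" → "ollama_cloud"  (startsWith "deepseek-v")
//   "vendor/model-name"  → "openrouter"    (contains "/")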
// ─── Validation gate (deterministic; never an LLM) ────────────────

function validateResponse(response: string, bundle: ContextBundle | null): ValidationResult {
  const reasons: string[] = [];
  const trimmed = response.trim();
  if (trimmed.length === 0) {
    return { passed: false, reasons: ["empty response"] };
  }
  if (trimmed.length < 80) {
    reasons.push(`response too short (${trimmed.length} chars; min 80)`);
  }
  // Filler / hedge patterns the spec explicitly rejects.
  const fillers = [/as an ai/i, /i cannot/i, /i'm sorry, but/i, /i don'?t have access/i, /i am unable to/i];
  for (const re of fillers) {
    if (re.test(trimmed)) {
      reasons.push(`filler/hedge phrase detected: ${re}`);
    }
  }
  // If a validation checklist was supplied, expect the response to
  // touch at least one of the checklist topics. Soft check: presence of
  // any checklist token (≥3 chars) in the response.
  if (bundle && bundle.validation_steps.length > 0) {
    const checklistTokens = new Set<string>();
    for (const s of bundle.validation_steps) {
      for (const t of tokenize(s)) checklistTokens.add(t);
    }
    const respTokens = tokenize(trimmed);
    let overlap = 0;
    for (const t of checklistTokens) if (respTokens.has(t)) overlap++;
    if (checklistTokens.size > 0 && overlap === 0) {
      reasons.push("response shares no tokens with validation checklist (may not have followed prior patterns)");
    }
  }
  return { passed: reasons.length === 0, reasons };
}

// Test/dry-run synthesizer. Produces a deterministic response that
// echoes context-bundle signals so the retrieval+validation pipeline
// can be tested without an LLM. NOT used outside tests.
function dryRunSynthesize(task: string, bundle: ContextBundle | null): string {
  const parts: string[] = [
    "Synthetic dry-run response for task: " + task.slice(0, 120),
    "",
  ];
  if (bundle) {
    parts.push(`Retrieved ${bundle.retrieved_playbooks.length} playbooks; ${bundle.prior_successful_outputs.length} accepted, ${bundle.failure_patterns.length} partial.`);
    if (bundle.validation_steps.length > 0) {
      parts.push("Following validation checklist:");
      for (const s of bundle.validation_steps.slice(0, 3)) parts.push("- " + s);
    }
    if (bundle.prior_successful_outputs[0]) {
      parts.push("");
      parts.push("Anchored on prior accepted: " + bundle.prior_successful_outputs[0].title);
    }
  } else {
    parts.push("No retrieval context — answering from task alone. Verify and check produced output before approving.");
  }
  return parts.join("\n");
}

// ─── Evidence logging ─────────────────────────────────────────────

async function logReplayEvidence(root: string, result: ReplayResult): Promise<void> {
  const path = resolve(root, "data/_kb/replay_runs.jsonl");
  mkdirSync(dirname(path), { recursive: true });
  const row = {
    schema: "replay_run.v1",
    ...result,
    // Truncate model_response in the persisted log to keep file lean;
    // full text lives in the in-memory ReplayResult and any caller
    // wanting the verbatim output can re-run with the same task.
    model_response: result.model_response.slice(0, 4000),
  };
  appendFileSync(path, JSON.stringify(row) + "\n");
}
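// A minimal sketch (hypothetical, not part of this module) of how a
// downstream consumer could read the evidence log back for triage:
//
//   import { readFileSync } from "node:fs";
//   const failed = readFileSync("data/_kb/replay_runs.jsonl", "utf8")
//     .split("\n").filter(Boolean)
//     .map(l => JSON.parse(l))
//     .filter(r => r.schema === "replay_run.v1" && !r.validation_result.passed);
//   // `failed` now holds every run that tripped the validation gate.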
// ─── Top-level replay function ────────────────────────────────────

export async function replay(opts: ReplayRequest, root = DEFAULT_ROOT): Promise<ReplayResult> {
  const t0 = Date.now();
  const recorded_at = new Date().toISOString();
  const task_hash = await canonicalSha256(opts.task);

  const corpus = loadRagCorpus(root);
  const bundle = opts.no_retrieval ? null : buildContextBundle(corpus, opts.task);
  const { system, user } = buildPrompt(opts.task, bundle);

  const escalation_path: string[] = [];
  let model_response = "";
  let model_used = "";
  let validation: ValidationResult = { passed: false, reasons: ["never executed"] };

  // Try local model first.
  escalation_path.push(LOCAL_MODEL);
  model_used = LOCAL_MODEL;
  const localCall = opts.dry_run
    ? { ok: true, content: dryRunSynthesize(opts.task, bundle) }
    : await callModel(LOCAL_MODEL, system, user);
  if (localCall.ok) {
    model_response = localCall.content;
    validation = validateResponse(model_response, bundle);
  } else {
    validation = { passed: false, reasons: [`local call failed: ${localCall.error}`] };
  }

  // Escalate if validation failed AND escalation allowed.
  if (!validation.passed && opts.allow_escalation && !opts.local_only) {
    escalation_path.push(ESCALATION_MODEL);
    const escalCall = opts.dry_run
      ? { ok: true, content: dryRunSynthesize(opts.task, bundle) + "\n\n[ESCALATED]" }
      : await callModel(ESCALATION_MODEL, system, user);
    if (escalCall.ok) {
      model_response = escalCall.content;
      model_used = ESCALATION_MODEL;
      validation = validateResponse(model_response, bundle);
      if (validation.passed) validation.reasons.unshift(`recovered via escalation to ${ESCALATION_MODEL}`);
    } else {
      validation.reasons.push(`escalation also failed: ${escalCall.error}`);
    }
  }

  // Stable derivation from task_hash + recorded_at (already an ISO
  // timestamp captured at start of the call). Avoids a second wall-clock
  // read and makes run_id reproducible given a fixed recorded_at — useful
  // for fixture-driven tests + acceptance gates. Replaces Date.now()-based
  // id post-Kimi-audit 2026-04-27.
  const recorded_run_id = `replay:${task_hash.slice(0, 16)}:${(await canonicalSha256(recorded_at)).slice(0, 12)}`;

  const result: ReplayResult = {
    input_task: opts.task,
    task_hash,
    retrieved_artifacts: { rag_ids: bundle?.retrieved_playbooks.map(p => p.rag_id) ?? [] },
    context_bundle: bundle,
    model_response,
    model_used,
    escalation_path,
    validation_result: validation,
    recorded_run_id,
    recorded_at,
    duration_ms: Date.now() - t0,
  };

  await logReplayEvidence(root, result);
  return result;
}

// ─── CLI ──────────────────────────────────────────────────────────

async function cli() {
  const taskIdx = process.argv.indexOf("--task");
  if (taskIdx < 0 || !process.argv[taskIdx + 1]) {
    console.error("usage: replay.ts --task \"<task>\" [--local-only] [--allow-escalation] [--no-retrieval]");
    process.exit(2);
  }
  const task = process.argv[taskIdx + 1];
  const local_only = process.argv.includes("--local-only");
  const allow_escalation = process.argv.includes("--allow-escalation");
  const no_retrieval = process.argv.includes("--no-retrieval");

  const r = await replay({ task, local_only, allow_escalation, no_retrieval });
  console.log(`[replay] run_id=${r.recorded_run_id}`);
  console.log(`[replay] retrieval: ${r.context_bundle ? r.context_bundle.retrieved_playbooks.length + " playbooks" : "DISABLED"}`);
  console.log(`[replay] escalation_path: ${r.escalation_path.join(" → ")}`);
  console.log(`[replay] model_used: ${r.model_used} · ${r.duration_ms}ms`);
  console.log(`[replay] validation: ${r.validation_result.passed ? "PASS" : "FAIL"}${r.validation_result.reasons.length ? " (" + r.validation_result.reasons.join("; ") + ")" : ""}`);
  console.log("");
  console.log("─── response ───");
  console.log(r.model_response.slice(0, 1500));
  if (r.model_response.length > 1500) console.log(`... [${r.model_response.length - 1500} more chars]`);
}

if (import.meta.main) cli().catch(e => { console.error(e); process.exit(1); });
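// Example invocations (assuming a Bun runtime, given the
// import.meta.main guard; task strings are illustrative):
//   bun replay.ts --task "export accepted playbooks to rag jsonl"
//   bun replay.ts --task "..." --local-only        # record local result, never escalate
//   bun replay.ts --task "..." --no-retrieval      # baseline arm of the A/B
//   bun replay.ts --task "..." --allow-escalation  # retry on the escalation model if validation fails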