// Phase 22 — Internal Knowledge Library.
//
// Sits on top of the successful-playbook store. Tracks which
// configurations produced which outcomes for which playbook
// signatures, detects fail→succeed error corrections across runs,
// and emits pathway recommendations that the NEXT scenario reads
// at startup.
//
// Event-driven cycle (not wall-clock polling):
//   scenario ends → kb.indexRun() → kb.recommendFor(nextSpec) → next
//   scenario reads top rec and applies.
//
// File layout under data/_kb/:
//   signatures.jsonl              — (sig_hash, embedding[], first_seen, last_seen, run_count)
//   outcomes.jsonl                — append-only per-run: {sig, run_id, models, pathway, ok_events, elapsed_s, errors[]}
//   config_snapshots.jsonl        — config/models.json hash + env at each ingest
//   error_corrections.jsonl       — fail→succeed deltas
//   pathway_recommendations.jsonl — AI-synthesized suggestions
//
// Why file-based: Phase 19 playbook_memory is already in the
// catalogd+vectord stack. This is a separate meta-layer — keeping
// it file-based first lets us iterate quickly without a gateway
// schema migration. Rust port lands once the shape stabilizes
// (mirrors how Phase 21 primitives are TS-first → Rust next sprint).

// NOTE(review): `readdir` is imported but not referenced anywhere in this
// chunk — confirm it is used elsewhere in the file before removing.
import { mkdir, readFile, writeFile, appendFile, readdir } from "node:fs/promises";
import { join } from "node:path";
import { createHash } from "node:crypto";
// SIDECAR is the embedding-service base URL; generateContinuable drives the
// overview model with auto-continuation (both defined in agent.ts).
import { SIDECAR, generateContinuable } from "./agent.ts";

// Root directory (relative to process cwd) for all KB artifacts.
const KB_DIR = "data/_kb";
// JSONL file names under KB_DIR — see the file-layout comment above.
const SIGNATURES_FILE = "signatures.jsonl";
const OUTCOMES_FILE = "outcomes.jsonl";
const CONFIG_SNAPSHOTS_FILE = "config_snapshots.jsonl";
const ERROR_CORRECTIONS_FILE = "error_corrections.jsonl";
const RECOMMENDATIONS_FILE = "pathway_recommendations.jsonl";

// What a playbook signature looks like — stable hash of the scenario
// shape that ignores timestamps and specific worker IDs. Two runs with
// the same sequence of (kind, role, count, city, state) get the same
// hash. This is the retrieval key for the KB.
/** One KB row per distinct scenario shape; keyed by sig_hash. */
export interface PlaybookSignature {
  sig_hash: string;
  client: string;
  events_digest: string; // human-readable event summary
  embedding: number[]; // sidecar embed, for nearest-neighbor
  first_seen: string; // ISO timestamps (see indexRun, which writes new Date().toISOString())
  last_seen: string;
  run_count: number; // number of runs indexed under this signature
}

// What we record after a run completes.
export interface RunOutcome {
  sig_hash: string;
  run_id: string; // scenario dir basename
  date: string;
  // Model assignments that were active for this run — the recommender
  // diffs these across fail→succeed pairs.
  models: {
    executor: string;
    reviewer: string;
    overview: string;
    overview_cloud: boolean;
  };
  ok_events: number; // events with ok === true
  total_events: number;
  total_turns: number;
  total_gap_signals: number;
  total_citations: number;
  // Per-event breakdown, copied from results.json.
  per_event: Array<{
    at: string;
    kind: string;
    role: string;
    count: number;
    ok: boolean;
    turns: number;
    error: string | null;
  }>;
  elapsed_secs: number;
  created_at: string; // ISO timestamp; used to order runs chronologically
}

// The AI-synthesized recommendation written back for future runs.
export interface PathwayRecommendation {
  sig_hash: string;
  generated_at: string;
  generated_by_model: string;
  confidence: "high" | "medium" | "low";
  rationale: string; // prose from the recommender
  top_models: { // suggested tier assignments
    executor?: string;
    reviewer?: string;
    overview?: string;
  };
  budget_hints: { // suggested max_tokens / think flags
    executor_max_tokens?: number;
    reviewer_max_tokens?: number;
    executor_think?: boolean;
  };
  pathway_notes: string; // "pre-fetch cert data", "use buffer of 3+"
  neighbors_consulted: string[]; // sig_hashes of the playbooks that shaped this rec
}

// Compute a stable hash from the scenario spec. Two runs with the same
// events list (ignoring SMS drafts / timestamps / exclude lists) get
// the same hash.
export function computeSignature(spec: { client: string; events: Array<{ kind: string; role: string; count: number; city: string; state: string }>; }): string { const canonical = JSON.stringify({ client: spec.client, events: spec.events.map(e => ({ kind: e.kind, role: e.role, count: e.count, city: e.city, state: e.state, })), }); return createHash("sha256").update(canonical).digest("hex").slice(0, 16); } // Human-readable digest of the spec, used in recommender prompts. export function specDigest(spec: { client: string; events: Array<{ kind: string; role: string; count: number; city: string; state: string }>; }): string { return `${spec.client}: ` + spec.events.map(e => `${e.kind}/${e.role}×${e.count} in ${e.city},${e.state}` ).join(" | "); } async function ensureKb(): Promise { await mkdir(KB_DIR, { recursive: true }); } async function readJsonl(file: string): Promise { try { const raw = await readFile(join(KB_DIR, file), "utf8"); return raw.split("\n").filter(l => l.trim()).map(l => JSON.parse(l) as T); } catch { return []; } } // Index a completed scenario run. Extracts signature, computes // embedding, appends outcome, updates or inserts signature entry. export async function indexRun( scenarioDir: string, spec: { client: string; date: string; events: Array }, models: { executor: string; reviewer: string; overview: string; overview_cloud: boolean }, elapsed_secs: number, ): Promise<{ sig_hash: string; outcome: RunOutcome }> { await ensureKb(); const sig_hash = computeSignature(spec); const digest = specDigest(spec); // Read results.json produced by scenario.ts to build the outcome record. const resultsRaw = await readFile(join(scenarioDir, "results.json"), "utf8"); const results = JSON.parse(resultsRaw) as any[]; const outcome: RunOutcome = { sig_hash, run_id: scenarioDir.split("/").pop() ?? scenarioDir, date: spec.date, models, ok_events: results.filter(r => r.ok).length, total_events: results.length, total_turns: results.reduce((s, r) => s + (r.turns ?? 
0), 0), total_gap_signals: results.reduce((s, r) => s + (r.gap_signals?.length ?? 0), 0), total_citations: results.reduce((s, r) => s + (r.playbook_citations?.length ?? 0), 0), per_event: results.map(r => ({ at: r.event.at, kind: r.event.kind, role: r.event.role, count: r.event.count, ok: r.ok, turns: r.turns ?? 0, error: r.error ?? null, })), elapsed_secs, created_at: new Date().toISOString(), }; await appendFile(join(KB_DIR, OUTCOMES_FILE), JSON.stringify(outcome) + "\n"); // Embed + upsert signature. Skip if embedding fails — the outcome is // still persisted, just without neighbor-search hookup. try { const resp = await fetch(`${SIDECAR}/embed`, { method: "POST", headers: { "content-type": "application/json" }, body: JSON.stringify({ texts: [digest] }), }); if (resp.ok) { const data: any = await resp.json(); const embedding: number[] = data.embeddings?.[0] ?? []; const existing = await readJsonl(SIGNATURES_FILE); const now = new Date().toISOString(); const found = existing.find(s => s.sig_hash === sig_hash); if (found) { found.last_seen = now; found.run_count += 1; } else { existing.push({ sig_hash, client: spec.client, events_digest: digest, embedding, first_seen: now, last_seen: now, run_count: 1, }); } // Rewrite the whole file — cheap while KB is small; swap to // append-with-periodic-compact when row count exceeds ~10k. await writeFile( join(KB_DIR, SIGNATURES_FILE), existing.map(s => JSON.stringify(s)).join("\n") + "\n", ); } } catch { // Signature indexing failed — outcome is still captured. Next // indexRun retries. } return { sig_hash, outcome }; } // Cosine similarity for neighbor lookup. Float32-in-memory is fine for // O(thousands) signatures; swap to vectord HNSW once the corpus grows. 
function cosine(a: number[], b: number[]): number { if (a.length === 0 || a.length !== b.length) return 0; let dot = 0, na = 0, nb = 0; for (let i = 0; i < a.length; i++) { dot += a[i] * b[i]; na += a[i] * a[i]; nb += b[i] * b[i]; } return dot / (Math.sqrt(na) * Math.sqrt(nb) + 1e-12); } // Find the k nearest-neighbor signatures for a target spec, returning // neighbor sigs + their outcome history. This is what the recommender // feeds to the overview model. export async function findNeighbors(spec: any, k = 5): Promise> { await ensureKb(); const digest = specDigest(spec); const targetHash = computeSignature(spec); const resp = await fetch(`${SIDECAR}/embed`, { method: "POST", headers: { "content-type": "application/json" }, body: JSON.stringify({ texts: [digest] }), }); if (!resp.ok) return []; const data: any = await resp.json(); const targetVec: number[] = data.embeddings?.[0] ?? []; const sigs = await readJsonl(SIGNATURES_FILE); const outcomes = await readJsonl(OUTCOMES_FILE); const ranked = sigs .filter(s => s.sig_hash !== targetHash) // don't recommend against self .map(s => ({ sig: s, similarity: cosine(targetVec, s.embedding) })) .sort((a, b) => b.similarity - a.similarity) .slice(0, k); return ranked.map(r => ({ sig: r.sig, similarity: r.similarity, outcomes: outcomes.filter(o => o.sig_hash === r.sig.sig_hash), })); } // Detect fail→succeed pairs within outcomes for the same signature. // Diff the config/models between them; that's the correction. export async function detectErrorCorrections(): Promise; }>> { const outcomes = await readJsonl(OUTCOMES_FILE); const bySig = new Map(); for (const o of outcomes) { const arr = bySig.get(o.sig_hash) ?? 
[]; arr.push(o); bySig.set(o.sig_hash, arr); } const corrections: any[] = []; for (const [sig, runs] of bySig) { runs.sort((a, b) => a.created_at.localeCompare(b.created_at)); for (let i = 1; i < runs.length; i++) { const prev = runs[i - 1]; const cur = runs[i]; if (prev.ok_events < prev.total_events && cur.ok_events > prev.ok_events) { const changed: Record = {}; for (const k of ["executor", "reviewer", "overview"] as const) { if (prev.models[k] !== cur.models[k]) { changed[k] = { from: prev.models[k], to: cur.models[k] }; } } corrections.push({ sig_hash: sig, before: prev, after: cur, models_changed: changed }); } } } return corrections; } // Generate a pathway recommendation for a target spec. Reads k-NN // signatures + their outcome history, asks an overview model to // synthesize "best path forward", writes to recommendations file. export async function recommendFor( spec: any, opts: { overview_model?: string; cloud?: boolean; k?: number } = {}, ): Promise { await ensureKb(); const k = opts.k ?? 5; const overviewModel = opts.overview_model ?? "gpt-oss:20b"; const cloud = opts.cloud ?? false; const neighbors = await findNeighbors(spec, k); const corrections = await detectErrorCorrections(); const targetHash = computeSignature(spec); if (neighbors.length === 0) { // Cold start — no history. Write a minimal rec so next run still // gets "we looked" instead of a missing file. const rec: PathwayRecommendation = { sig_hash: targetHash, generated_at: new Date().toISOString(), generated_by_model: overviewModel, confidence: "low", rationale: "No prior runs in KB — first time this signature is seen. Use default model matrix.", top_models: {}, budget_hints: {}, pathway_notes: "No neighbor history to draw on. Proceed with current config; KB will have data after this run.", neighbors_consulted: [], }; await appendFile(join(KB_DIR, RECOMMENDATIONS_FILE), JSON.stringify(rec) + "\n"); return rec; } // Build the prompt. 
Include target spec digest, neighbor digests with // their outcome stats, and recent error corrections. Ask for // structured output so we can parse it. const neighborBlock = neighbors.map(n => { const best = n.outcomes.reduce((a, b) => a && a.ok_events / Math.max(1, a.total_events) >= b.ok_events / Math.max(1, b.total_events) ? a : b, n.outcomes[0]); const avgOk = n.outcomes.length > 0 ? (n.outcomes.reduce((s, o) => s + o.ok_events, 0) / n.outcomes.length).toFixed(1) : "?"; return `- sig ${n.sig.sig_hash} sim=${n.similarity.toFixed(3)} (${n.sig.events_digest.slice(0, 100)}): ${n.outcomes.length} runs, avg ${avgOk}/${best?.total_events ?? "?"} ok; best models: exec=${best?.models.executor ?? "?"} review=${best?.models.reviewer ?? "?"}`; }).join("\n"); const correctionBlock = corrections.slice(-3).map(c => `- sig ${c.sig_hash}: ${c.before.ok_events}/${c.before.total_events} → ${c.after.ok_events}/${c.after.total_events}; changed=${JSON.stringify(c.models_changed)}` ).join("\n") || "(no corrections observed yet)"; const prompt = `You are the pathway recommender for a staffing coordinator agent system. A new scenario is about to run. Your job: use history of similar runs to recommend the best configuration. 
TARGET SCENARIO: ${specDigest(spec)} ${k} NEAREST-NEIGHBOR SIGNATURES FROM HISTORY (by cosine similarity): ${neighborBlock} RECENT ERROR CORRECTIONS (fail → succeed deltas on same signature): ${correctionBlock} Your output MUST be a valid JSON object with this shape (and nothing else — no prose before or after): { "confidence": "high" | "medium" | "low", "rationale": "2-3 sentence explanation of what pattern you saw", "top_models": {"executor": "...", "reviewer": "...", "overview": "..."}, "budget_hints": {"executor_max_tokens": 800, "reviewer_max_tokens": 600, "executor_think": false}, "pathway_notes": "concrete pre-run advice — what to pre-fetch, what to avoid, buffer sizes" } Respond with ONLY the JSON object.`; let raw = ""; try { raw = await generateContinuable(overviewModel, prompt, { max_tokens: 1200, shape: "json", cloud, max_continuations: 2, }); } catch (e) { return null; } let parsed: any; try { const m = raw.match(/\{[\s\S]*\}/); parsed = m ? JSON.parse(m[0]) : null; } catch { parsed = null; } if (!parsed) return null; const rec: PathwayRecommendation = { sig_hash: targetHash, generated_at: new Date().toISOString(), generated_by_model: overviewModel, confidence: parsed.confidence ?? "low", rationale: String(parsed.rationale ?? "").slice(0, 1000), top_models: parsed.top_models ?? {}, budget_hints: parsed.budget_hints ?? {}, pathway_notes: String(parsed.pathway_notes ?? "").slice(0, 2000), neighbors_consulted: neighbors.map(n => n.sig.sig_hash), }; await appendFile(join(KB_DIR, RECOMMENDATIONS_FILE), JSON.stringify(rec) + "\n"); return rec; } // Read-back at scenario start: find the newest recommendation for this // exact sig_hash (or nearest neighbor of it). Returns null if nothing // applicable found. 
export async function loadRecommendation(spec: any): Promise { await ensureKb(); const targetHash = computeSignature(spec); const recs = await readJsonl(RECOMMENDATIONS_FILE); const matching = recs.filter(r => r.sig_hash === targetHash); if (matching.length === 0) return null; matching.sort((a, b) => b.generated_at.localeCompare(a.generated_at)); return matching[0]; } // Record a config snapshot. Called whenever models.json or the active // model assignments change, so error-correction diffs have history to // reference beyond the outcomes file. export async function snapshotConfig( models_json_hash: string, active_models: Record, why: string, ): Promise { await ensureKb(); await appendFile( join(KB_DIR, CONFIG_SNAPSHOTS_FILE), JSON.stringify({ at: new Date().toISOString(), models_json_hash, active_models, why, }) + "\n", ); }