// Phase 22 — Internal Knowledge Library.
//
// Sits on top of the successful-playbook store. Tracks which
// configurations produced which outcomes for which playbook
// signatures, detects fail→succeed error corrections across runs,
// and emits pathway recommendations that the NEXT scenario reads
// at startup.
//
// Event-driven cycle (not wall-clock polling):
//   scenario ends → kb.indexRun() → kb.recommendFor(nextSpec) → next
//   scenario reads top rec and applies.
//
// File layout under data/_kb/:
//   signatures.jsonl              — (sig_hash, embedding[], first_seen, last_seen, run_count)
//   outcomes.jsonl                — append-only per-run: {sig, run_id, models, pathway, ok_events, elapsed_s, errors[]}
//   config_snapshots.jsonl        — config/models.json hash + env at each ingest
//   error_corrections.jsonl       — fail→succeed deltas
//   pathway_recommendations.jsonl — AI-synthesized suggestions
//
// Why file-based: Phase 19 playbook_memory is already in the
// catalogd+vectord stack. This is a separate meta-layer — keeping
// it file-based first lets us iterate quickly without a gateway
// schema migration. Rust port lands once the shape stabilizes
// (mirrors how Phase 21 primitives are TS-first → Rust next sprint).

// NOTE(review): `readdir` is imported but never used anywhere in this
// file — confirm no sibling chunk uses it before removing.
import { mkdir, readFile, writeFile, appendFile, readdir } from "node:fs/promises";
import { join } from "node:path";
import { createHash } from "node:crypto";
import { SIDECAR, generateContinuable } from "./agent.ts";

// Root directory for all KB artifacts, relative to process cwd.
const KB_DIR = "data/_kb";
// JSONL stores under KB_DIR — one JSON object per line.
const SIGNATURES_FILE = "signatures.jsonl";
const OUTCOMES_FILE = "outcomes.jsonl";
const CONFIG_SNAPSHOTS_FILE = "config_snapshots.jsonl";
const ERROR_CORRECTIONS_FILE = "error_corrections.jsonl";
const RECOMMENDATIONS_FILE = "pathway_recommendations.jsonl";
const STAFFERS_FILE = "staffers.jsonl";

// What a playbook signature looks like — stable hash of the scenario
// shape that ignores timestamps and specific worker IDs. Two runs with
// the same sequence of (kind, role, count, city, state) get the same
// hash.
// This is the retrieval key for the KB.
export interface PlaybookSignature {
  sig_hash: string;          // 16-hex-char truncated sha256 of the canonical spec
  client: string;
  events_digest: string;     // human-readable event summary
  embedding: number[];       // sidecar embed, for nearest-neighbor
  first_seen: string;        // ISO timestamps maintained by indexRun
  last_seen: string;
  run_count: number;         // how many runs have hit this signature
}

// What we record after a run completes.
export interface RunOutcome {
  sig_hash: string;
  run_id: string;            // scenario dir basename
  date: string;
  models: {                  // model matrix active for this run
    executor: string;
    reviewer: string;
    overview: string;
    overview_cloud: boolean;
  };
  staffer?: {                // Phase 23 — who ran this scenario
    id: string;
    name: string;
    tenure_months: number;
    role: string;
  };
  ok_events: number;
  total_events: number;
  total_turns: number;
  total_gap_signals: number;
  total_citations: number;
  rescue_attempts: number;   // Phase 22 item B
  rescue_successes: number;
  per_event: Array<{         // per-event detail mirrored from results.json
    at: string;
    kind: string;
    role: string;
    count: number;
    ok: boolean;
    turns: number;
    error: string | null;
  }>;
  elapsed_secs: number;
  created_at: string;        // ISO timestamp set at index time
}

// Per-staffer aggregate. Recomputed from outcomes.jsonl after every
// run. Competence_score weights neighbor retrieval so Senior staffers'
// playbooks rank higher for similar scenarios than Junior staffers'.
export interface StafferStats {
  id: string;
  name: string;
  tenure_months: number;
  role: string;
  total_runs: number;
  total_events_attempted: number;
  total_events_ok: number;
  fill_rate: number;             // ok / attempted
  avg_turns_per_event: number;
  avg_citations_per_run: number;
  rescue_attempts: number;
  rescue_successes: number;
  rescue_rate: number;
  competence_score: number;      // 0.0 - 1.0, see formula below
  last_updated: string;
}

// The AI-synthesized recommendation written back for future runs.
export interface PathwayRecommendation { sig_hash: string; generated_at: string; generated_by_model: string; confidence: "high" | "medium" | "low"; rationale: string; // prose from the recommender top_models: { // suggested tier assignments executor?: string; reviewer?: string; overview?: string; }; budget_hints: { // suggested max_tokens / think flags executor_max_tokens?: number; reviewer_max_tokens?: number; executor_think?: boolean; }; pathway_notes: string; // "pre-fetch cert data", "use buffer of 3+" neighbors_consulted: string[]; // sig_hashes of the playbooks that shaped this rec } // Compute a stable hash from the scenario spec. Two runs with the same // events list (ignoring SMS drafts / timestamps / exclude lists) get // the same hash. export function computeSignature(spec: { client: string; events: Array<{ kind: string; role: string; count: number; city: string; state: string }>; }): string { const canonical = JSON.stringify({ client: spec.client, events: spec.events.map(e => ({ kind: e.kind, role: e.role, count: e.count, city: e.city, state: e.state, })), }); return createHash("sha256").update(canonical).digest("hex").slice(0, 16); } // Human-readable digest of the spec, used in recommender prompts. export function specDigest(spec: { client: string; events: Array<{ kind: string; role: string; count: number; city: string; state: string }>; }): string { return `${spec.client}: ` + spec.events.map(e => `${e.kind}/${e.role}×${e.count} in ${e.city},${e.state}` ).join(" | "); } async function ensureKb(): Promise { await mkdir(KB_DIR, { recursive: true }); } async function readJsonl(file: string): Promise { try { const raw = await readFile(join(KB_DIR, file), "utf8"); return raw.split("\n").filter(l => l.trim()).map(l => JSON.parse(l) as T); } catch { return []; } } // Index a completed scenario run. Extracts signature, computes // embedding, appends outcome, updates or inserts signature entry. 
export async function indexRun( scenarioDir: string, spec: { client: string; date: string; events: Array; staffer?: { id: string; name: string; tenure_months: number; role: string } }, models: { executor: string; reviewer: string; overview: string; overview_cloud: boolean }, elapsed_secs: number, ): Promise<{ sig_hash: string; outcome: RunOutcome }> { await ensureKb(); const sig_hash = computeSignature(spec); const digest = specDigest(spec); // Read results.json produced by scenario.ts to build the outcome record. const resultsRaw = await readFile(join(scenarioDir, "results.json"), "utf8"); const results = JSON.parse(resultsRaw) as any[]; const rescue_attempts = results.filter(r => r.retry_remediation).length; const rescue_successes = results.filter(r => r.retry_remediation && r.ok).length; const outcome: RunOutcome = { sig_hash, run_id: scenarioDir.split("/").pop() ?? scenarioDir, date: spec.date, models, staffer: spec.staffer, ok_events: results.filter(r => r.ok).length, total_events: results.length, total_turns: results.reduce((s, r) => s + (r.turns ?? 0), 0), total_gap_signals: results.reduce((s, r) => s + (r.gap_signals?.length ?? 0), 0), total_citations: results.reduce((s, r) => s + (r.playbook_citations?.length ?? 0), 0), rescue_attempts, rescue_successes, per_event: results.map(r => ({ at: r.event.at, kind: r.event.kind, role: r.event.role, count: r.event.count, ok: r.ok, turns: r.turns ?? 0, error: r.error ?? null, })), elapsed_secs, created_at: new Date().toISOString(), }; await appendFile(join(KB_DIR, OUTCOMES_FILE), JSON.stringify(outcome) + "\n"); // Phase 23 — recompute this staffer's aggregate + competence_score. // Cheap until we have thousands of runs; swap to streaming aggregate // when that becomes a concern. if (spec.staffer) { await recomputeStafferStats(spec.staffer.id); } // Embed + upsert signature. Skip if embedding fails — the outcome is // still persisted, just without neighbor-search hookup. 
try { const resp = await fetch(`${SIDECAR}/embed`, { method: "POST", headers: { "content-type": "application/json" }, body: JSON.stringify({ texts: [digest] }), }); if (resp.ok) { const data: any = await resp.json(); const embedding: number[] = data.embeddings?.[0] ?? []; const existing = await readJsonl(SIGNATURES_FILE); const now = new Date().toISOString(); const found = existing.find(s => s.sig_hash === sig_hash); if (found) { found.last_seen = now; found.run_count += 1; } else { existing.push({ sig_hash, client: spec.client, events_digest: digest, embedding, first_seen: now, last_seen: now, run_count: 1, }); } // Rewrite the whole file — cheap while KB is small; swap to // append-with-periodic-compact when row count exceeds ~10k. await writeFile( join(KB_DIR, SIGNATURES_FILE), existing.map(s => JSON.stringify(s)).join("\n") + "\n", ); } } catch { // Signature indexing failed — outcome is still captured. Next // indexRun retries. } return { sig_hash, outcome }; } // Phase 23 — per-staffer aggregate, recomputed from scratch on each // run. Writes the full table to STAFFERS_FILE (not append-only; we // always want the current-state snapshot). Competence score is bounded // 0..1 and combines four dimensions: // fill_rate — how often the staffer completes their events // turn_efficiency — lower turn counts are better // citation_density — signals use of playbook_memory feedback // rescue_rate — cloud rescue successes per attempt (doesn't // penalize staffers who never needed one) // Weights: fill 0.45, turn_eff 0.20, citation 0.20, rescue 0.15. // Rationale: completing the job matters most; everything else is style. 
export async function recomputeStafferStats(staffer_id: string): Promise { const outcomes = await readJsonl(OUTCOMES_FILE); const mine = outcomes.filter(o => o.staffer?.id === staffer_id); if (mine.length === 0) return null; const latest = mine[mine.length - 1].staffer!; const total_events_attempted = mine.reduce((s, o) => s + o.total_events, 0); const total_events_ok = mine.reduce((s, o) => s + o.ok_events, 0); const total_turns = mine.reduce((s, o) => s + o.total_turns, 0); const total_citations = mine.reduce((s, o) => s + o.total_citations, 0); const rescue_attempts = mine.reduce((s, o) => s + (o.rescue_attempts ?? 0), 0); const rescue_successes = mine.reduce((s, o) => s + (o.rescue_successes ?? 0), 0); const fill_rate = total_events_attempted > 0 ? total_events_ok / total_events_attempted : 0; const avg_turns_per_event = total_events_attempted > 0 ? total_turns / total_events_attempted : 0; const avg_citations_per_run = mine.length > 0 ? total_citations / mine.length : 0; const rescue_rate = rescue_attempts > 0 ? rescue_successes / rescue_attempts : 1.0; // never-rescued defaults to perfect // Normalize each axis to 0..1. Caps chosen from observed distributions. const turn_eff = avg_turns_per_event === 0 ? 0 : Math.max(0, 1 - Math.min(avg_turns_per_event / 10, 1)); const cite_norm = Math.min(avg_citations_per_run / 3, 1); const competence_score = 0.45 * fill_rate + 0.20 * turn_eff + 0.20 * cite_norm + 0.15 * rescue_rate; const stats: StafferStats = { id: staffer_id, name: latest.name, tenure_months: latest.tenure_months, role: latest.role, total_runs: mine.length, total_events_attempted, total_events_ok, fill_rate, avg_turns_per_event, avg_citations_per_run, rescue_attempts, rescue_successes, rescue_rate, competence_score, last_updated: new Date().toISOString(), }; // Rewrite whole file — cheap at O(staffers) scale. 
const existing = await readJsonl(STAFFERS_FILE); const others = existing.filter(s => s.id !== staffer_id); others.push(stats); await writeFile( join(KB_DIR, STAFFERS_FILE), others.map(s => JSON.stringify(s)).join("\n") + "\n", ); return stats; } // Public accessor — used by findNeighbors + the staffer report script. export async function loadStafferStats(): Promise { return readJsonl(STAFFERS_FILE); } // Cosine similarity for neighbor lookup. Float32-in-memory is fine for // O(thousands) signatures; swap to vectord HNSW once the corpus grows. function cosine(a: number[], b: number[]): number { if (a.length === 0 || a.length !== b.length) return 0; let dot = 0, na = 0, nb = 0; for (let i = 0; i < a.length; i++) { dot += a[i] * b[i]; na += a[i] * a[i]; nb += b[i] * b[i]; } return dot / (Math.sqrt(na) * Math.sqrt(nb) + 1e-12); } // Find the k nearest-neighbor signatures for a target spec, returning // neighbor sigs + their outcome history. This is what the recommender // feeds to the overview model. // // Phase 23 — weighted by staffer competence. The ranking score is // cosine_similarity * max_competence_among_this_sig's_outcomes. Top // staffers' playbooks surface first even if their scenario was // slightly less similar than a junior's. Sigs with no staffer history // fall back to raw similarity (competence defaults to 0.5). export async function findNeighbors(spec: any, k = 5): Promise> { await ensureKb(); const digest = specDigest(spec); const targetHash = computeSignature(spec); const resp = await fetch(`${SIDECAR}/embed`, { method: "POST", headers: { "content-type": "application/json" }, body: JSON.stringify({ texts: [digest] }), }); if (!resp.ok) return []; const data: any = await resp.json(); const targetVec: number[] = data.embeddings?.[0] ?? 
[]; const sigs = await readJsonl(SIGNATURES_FILE); const outcomes = await readJsonl(OUTCOMES_FILE); const staffers = await readJsonl(STAFFERS_FILE); const competenceById = new Map(staffers.map(s => [s.id, s.competence_score])); // Per-sig best staffer — the highest-competence coordinator who has // ever run this signature. Used to weight the sig's ranking. function bestStafferFor(sig_hash: string): { id: string | null; competence: number } { const mine = outcomes.filter(o => o.sig_hash === sig_hash && o.staffer?.id); if (mine.length === 0) return { id: null, competence: 0.5 }; let bestId: string | null = null; let bestComp = 0; for (const o of mine) { const c = competenceById.get(o.staffer!.id) ?? 0.5; if (c > bestComp) { bestComp = c; bestId = o.staffer!.id; } } return { id: bestId, competence: bestComp }; } const ranked = sigs .filter(s => s.sig_hash !== targetHash) // don't recommend against self .map(s => { const similarity = cosine(targetVec, s.embedding); const best = bestStafferFor(s.sig_hash); // Weighted score: similarity multiplied by the best staffer's // competence (floored at 0.3 so a sig with a low-competence // staffer still shows up if similarity is very high). const weight = Math.max(best.competence, 0.3); return { sig: s, similarity, weighted_score: similarity * weight, best_staffer_competence: best.competence, best_staffer_id: best.id, }; }) .sort((a, b) => b.weighted_score - a.weighted_score) .slice(0, k); return ranked.map(r => ({ ...r, outcomes: outcomes.filter(o => o.sig_hash === r.sig.sig_hash), })); } // Detect fail→succeed pairs within outcomes for the same signature. // Diff the config/models between them; that's the correction. export async function detectErrorCorrections(): Promise; }>> { const outcomes = await readJsonl(OUTCOMES_FILE); const bySig = new Map(); for (const o of outcomes) { const arr = bySig.get(o.sig_hash) ?? 
[]; arr.push(o); bySig.set(o.sig_hash, arr); } const corrections: any[] = []; for (const [sig, runs] of bySig) { runs.sort((a, b) => a.created_at.localeCompare(b.created_at)); for (let i = 1; i < runs.length; i++) { const prev = runs[i - 1]; const cur = runs[i]; if (prev.ok_events < prev.total_events && cur.ok_events > prev.ok_events) { const changed: Record = {}; for (const k of ["executor", "reviewer", "overview"] as const) { if (prev.models[k] !== cur.models[k]) { changed[k] = { from: prev.models[k], to: cur.models[k] }; } } corrections.push({ sig_hash: sig, before: prev, after: cur, models_changed: changed }); } } } return corrections; } // Generate a pathway recommendation for a target spec. Reads k-NN // signatures + their outcome history, asks an overview model to // synthesize "best path forward", writes to recommendations file. export async function recommendFor( spec: any, opts: { overview_model?: string; cloud?: boolean; k?: number } = {}, ): Promise { await ensureKb(); const k = opts.k ?? 5; const overviewModel = opts.overview_model ?? "gpt-oss:20b"; const cloud = opts.cloud ?? false; const neighbors = await findNeighbors(spec, k); const corrections = await detectErrorCorrections(); const targetHash = computeSignature(spec); if (neighbors.length === 0) { // Cold start — no history. Write a minimal rec so next run still // gets "we looked" instead of a missing file. const rec: PathwayRecommendation = { sig_hash: targetHash, generated_at: new Date().toISOString(), generated_by_model: overviewModel, confidence: "low", rationale: "No prior runs in KB — first time this signature is seen. Use default model matrix.", top_models: {}, budget_hints: {}, pathway_notes: "No neighbor history to draw on. Proceed with current config; KB will have data after this run.", neighbors_consulted: [], }; await appendFile(join(KB_DIR, RECOMMENDATIONS_FILE), JSON.stringify(rec) + "\n"); return rec; } // Build the prompt. 
Include target spec digest, neighbor digests with // their outcome stats, and recent error corrections. Ask for // structured output so we can parse it. const staffers = await readJsonl(STAFFERS_FILE); const stafferById = new Map(staffers.map(s => [s.id, s])); const neighborBlock = neighbors.map(n => { const best = n.outcomes.reduce((a, b) => a && a.ok_events / Math.max(1, a.total_events) >= b.ok_events / Math.max(1, b.total_events) ? a : b, n.outcomes[0]); const avgOk = n.outcomes.length > 0 ? (n.outcomes.reduce((s, o) => s + o.ok_events, 0) / n.outcomes.length).toFixed(1) : "?"; const topStaffer = n.best_staffer_id ? stafferById.get(n.best_staffer_id) : null; const stafferTag = topStaffer ? ` [top staffer: ${topStaffer.name} (${topStaffer.role}, competence=${topStaffer.competence_score.toFixed(2)})]` : ""; return `- sig ${n.sig.sig_hash} sim=${n.similarity.toFixed(3)} weighted=${n.weighted_score.toFixed(3)}${stafferTag}: ${n.outcomes.length} runs, avg ${avgOk}/${best?.total_events ?? "?"} ok; best models: exec=${best?.models.executor ?? "?"} review=${best?.models.reviewer ?? "?"}`; }).join("\n"); const correctionBlock = corrections.slice(-3).map(c => `- sig ${c.sig_hash}: ${c.before.ok_events}/${c.before.total_events} → ${c.after.ok_events}/${c.after.total_events}; changed=${JSON.stringify(c.models_changed)}` ).join("\n") || "(no corrections observed yet)"; const prompt = `You are the pathway recommender for a staffing coordinator agent system. A new scenario is about to run. Your job: use history of similar runs to recommend the best configuration. 
TARGET SCENARIO: ${specDigest(spec)} ${k} NEAREST-NEIGHBOR SIGNATURES FROM HISTORY (by cosine similarity): ${neighborBlock} RECENT ERROR CORRECTIONS (fail → succeed deltas on same signature): ${correctionBlock} Your output MUST be a valid JSON object with this shape (and nothing else — no prose before or after): { "confidence": "high" | "medium" | "low", "rationale": "2-3 sentence explanation of what pattern you saw", "top_models": {"executor": "...", "reviewer": "...", "overview": "..."}, "budget_hints": {"executor_max_tokens": 800, "reviewer_max_tokens": 600, "executor_think": false}, "pathway_notes": "concrete pre-run advice — what to pre-fetch, what to avoid, buffer sizes" } Respond with ONLY the JSON object.`; let raw = ""; try { raw = await generateContinuable(overviewModel, prompt, { max_tokens: 1200, shape: "json", cloud, max_continuations: 2, }); } catch (e) { return null; } let parsed: any; try { const m = raw.match(/\{[\s\S]*\}/); parsed = m ? JSON.parse(m[0]) : null; } catch { parsed = null; } if (!parsed) return null; const rec: PathwayRecommendation = { sig_hash: targetHash, generated_at: new Date().toISOString(), generated_by_model: overviewModel, confidence: parsed.confidence ?? "low", rationale: String(parsed.rationale ?? "").slice(0, 1000), top_models: parsed.top_models ?? {}, budget_hints: parsed.budget_hints ?? {}, pathway_notes: String(parsed.pathway_notes ?? "").slice(0, 2000), neighbors_consulted: neighbors.map(n => n.sig.sig_hash), }; await appendFile(join(KB_DIR, RECOMMENDATIONS_FILE), JSON.stringify(rec) + "\n"); return rec; } // Read-back at scenario start: find the newest recommendation for this // exact sig_hash (or nearest neighbor of it). Returns null if nothing // applicable found. 
export async function loadRecommendation(spec: any): Promise { await ensureKb(); const targetHash = computeSignature(spec); const recs = await readJsonl(RECOMMENDATIONS_FILE); const matching = recs.filter(r => r.sig_hash === targetHash); if (matching.length === 0) return null; matching.sort((a, b) => b.generated_at.localeCompare(a.generated_at)); return matching[0]; } // Record a config snapshot. Called whenever models.json or the active // model assignments change, so error-correction diffs have history to // reference beyond the outcomes file. export async function snapshotConfig( models_json_hash: string, active_models: Record, why: string, ): Promise { await ensureKb(); await appendFile( join(KB_DIR, CONFIG_SNAPSHOTS_FILE), JSON.stringify({ at: new Date().toISOString(), models_json_hash, active_models, why, }) + "\n", ); }