// kb_index — generic on-the-fly aggregation over append-only JSONL // scratchpads (audit_lessons, scrum_reviews, outcomes, observer ops). // // The mem0 insight: raw rows are CHEAP and tell the full story, but // downstream prompts need a DEFINITION, not a log. A definition is // the aggregate: "this signature has fired N times across M distinct // scopes, first_seen=X, last_seen=Y, confidence=M/N." // // This library is the single shared aggregator. Every KB writer keeps // appending raw rows; every KB reader uses aggregate() instead of // tailing the raw stream. No second file to sync, no ADD/UPDATE/NOOP // routing — the stats roll up from the raw rows every time. // // Why this works past hundreds of runs: // - aggregate() is bounded by distinct_signatures, not total_rows. // - confidence = distinct_scopes / count — low for same-scope noise, // high for cross-scope patterns. Downstream severity ramps on // confidence × count, not raw count, so one unfixed PR can't // inflate its own recurrence score (the classic mem0 failure). // - rotation (later) moves old raw to archive files; aggregate() // can still read both to compute lifetime counts when needed. import { readFile } from "node:fs/promises"; export interface AggregateRow { signature: string; count: number; distinct_scopes: number; first_seen: string; last_seen: string; confidence: number; // distinct_scopes / count — capped at 1.0 representative_summary: string; // most-recent summary for this signature scopes: string[]; // up to 20 most-recent scopes for debugging checks: string[]; // distinct `check` values (audit_lessons-specific) } export interface AggregateOptions { /** How to extract the dedup key from a row. */ keyFn: (row: T) => string | undefined; /** How to extract the "scope" — distinct scopes count gives confidence. */ scopeFn: (row: T) => string | undefined; /** How to extract the timestamp (defaults to row.audited_at / row.reviewed_at / row.timestamp). */ timeFn?: (row: T) => string | undefined; /** How to extract a representative summary (defaults to row.summary). */ summaryFn?: (row: T) => string | undefined; /** Max rows to read from the JSONL tail; 0 = read all. */ tailLimit?: number; /** Include per-row check field (for multi-check aggregates). */ checkFn?: (row: T) => string | undefined; } /** * Read a JSONL file and produce the aggregate map keyed by signature. * Safe on missing or malformed files — returns empty map. */ export async function aggregate( jsonlPath: string, opts: AggregateOptions, ): Promise> { const out = new Map(); let raw: string; try { raw = await readFile(jsonlPath, "utf8"); } catch { return out; } const lines = raw.split("\n").filter(l => l.length > 0); const sliceFrom = opts.tailLimit && opts.tailLimit > 0 ? Math.max(0, lines.length - opts.tailLimit) : 0; const timeFn = opts.timeFn ?? ((r: any) => r?.audited_at ?? r?.reviewed_at ?? r?.timestamp ?? r?.ran_at); const summaryFn = opts.summaryFn ?? ((r: any) => r?.summary ?? r?.representative_summary); // Per-signature scope tracking — need counts by scope to compute // distinct_scopes without double-counting a scope that appears 50 // times. Using a Set per signature. const scopeSets = new Map>(); const checkSets = new Map>(); for (let i = sliceFrom; i < lines.length; i++) { let row: T; try { row = JSON.parse(lines[i]) as T; } catch { continue; } const sig = opts.keyFn(row); if (!sig) continue; let agg = out.get(sig); if (!agg) { agg = { signature: sig, count: 0, distinct_scopes: 0, first_seen: "", last_seen: "", confidence: 0, representative_summary: "", scopes: [], checks: [], }; out.set(sig, agg); scopeSets.set(sig, new Set()); checkSets.set(sig, new Set()); } agg.count += 1; const scope = opts.scopeFn(row); if (scope !== undefined && scope !== null && scope !== "") { scopeSets.get(sig)!.add(String(scope)); // Keep scopes array ordered by recency (newest wins — shift // oldest when at cap). const arr = agg.scopes; const s = String(scope); const existing = arr.indexOf(s); if (existing >= 0) arr.splice(existing, 1); arr.push(s); if (arr.length > 20) arr.shift(); } if (opts.checkFn) { const c = opts.checkFn(row); if (c) checkSets.get(sig)!.add(String(c)); } const t = timeFn(row); if (t) { if (!agg.first_seen || t < agg.first_seen) agg.first_seen = t; if (!agg.last_seen || t > agg.last_seen) agg.last_seen = t; } const s = summaryFn(row); if (s) agg.representative_summary = String(s); } // Finalize derived fields. for (const [sig, agg] of out) { const scopes = scopeSets.get(sig) ?? new Set(); agg.distinct_scopes = scopes.size; agg.confidence = agg.count > 0 ? Math.min(1, agg.distinct_scopes / agg.count) : 0; const checks = checkSets.get(sig); if (checks) agg.checks = Array.from(checks).sort(); } return out; } /** * Severity policy derived from aggregate stats. The rating lives here * (not in each check) so all KB readers ramp severity consistently. * * - confidence × count product is the real signal. * - Low confidence (< 0.3) = same-scope noise → info regardless of count. * - Mid confidence (0.3-0.6) = mixed signal → warn at count ≥ 3. * - High confidence (> 0.6) with count ≥ 5 = block-worthy cross-cutting pattern. * * Callers can override by reading agg directly; this is the default * policy that matches the "don't escalate one unfixed PR" discipline. */ export function ratingSeverity(agg: AggregateRow): "info" | "warn" | "block" { if (agg.confidence >= 0.6 && agg.count >= 5) return "block"; if (agg.confidence >= 0.3 && agg.count >= 3) return "warn"; return "info"; } /** Human-friendly one-line summary of an aggregate row for finding evidence. */ export function formatAgg(agg: AggregateRow): string { return `count=${agg.count} distinct_scopes=${agg.distinct_scopes} confidence=${agg.confidence.toFixed(2)} seen=[${agg.first_seen.slice(0, 10)}..${agg.last_seen.slice(0, 10)}]`; }