diff --git a/auditor/checks/inference.ts b/auditor/checks/inference.ts
index d24bad2..d3a0bde 100644
--- a/auditor/checks/inference.ts
+++ b/auditor/checks/inference.ts
@@ -260,10 +260,13 @@ function extractSymbols(text: string): string[] {
 // Scan the repo for at least one definition of each symbol. Uses Bun's
 // Glob to walk TS/Rust/Python/JS sources; ignores node_modules, data/,
-// and target/.
+// and target/. Skips files > 500KB — those are fixtures/snapshots that
+// won't contain a definition line and slurping them slows the audit.
 async function symbolsExistInRepo(symbols: string[]): Promise<boolean> {
   const patterns = ["**/*.ts", "**/*.tsx", "**/*.rs", "**/*.py", "**/*.js"];
   const skip = (p: string) => p.includes("/node_modules/") || p.startsWith("data/") || p.includes("/target/") || p.startsWith("dist/");
+  const MAX_FILE_BYTES = 500_000;
+  const { stat } = await import("node:fs/promises");
   const resolved = new Set<string>();
   const toFind = new Set(symbols);
   for (const pat of patterns) {
@@ -271,6 +274,7 @@ async function symbolsExistInRepo(symbols: string[]): Promise<boolean> {
     const glob = new Glob(pat);
     for await (const f of glob.scan({ cwd: REPO_ROOT, onlyFiles: true })) {
       if (skip(f)) continue;
+      try { const s = await stat(`${REPO_ROOT}/${f}`); if (s.size > MAX_FILE_BYTES) continue; } catch { continue; }
       let content: string;
       try { content = await readFile(`${REPO_ROOT}/${f}`, "utf8"); } catch { continue; }
       for (const sym of Array.from(toFind)) {
diff --git a/auditor/checks/kb_query.ts b/auditor/checks/kb_query.ts
index 0c79242..8daa410 100644
--- a/auditor/checks/kb_query.ts
+++ b/auditor/checks/kb_query.ts
@@ -18,6 +18,7 @@
 import { readFile, readdir, stat } from "node:fs/promises";
 import { join } from "node:path";
 import type { Claim, Finding } from "../types.ts";
+import { aggregate, ratingSeverity, formatAgg } from "../kb_index.ts";
 
 const KB_DIR = "/home/profit/lakehouse/data/_kb";
 const OBSERVER_OPS = "/home/profit/lakehouse/data/_observer/ops.jsonl";
@@ -26,11 +27,6 @@
 const SCRUM_REVIEWS_JSONL = "/home/profit/lakehouse/data/_kb/scrum_reviews.jsonl";
 const AUDIT_LESSONS_JSONL = "/home/profit/lakehouse/data/_kb/audit_lessons.jsonl";
 const TAIL_LINES = 500;
 const MAX_BOT_CYCLE_FILES = 30;
-// Recurrence threshold — at this count a warn becomes a block.
-// The rationale: three independent audits all flagging the SAME
-// pattern signature is strong evidence the pattern is a real
-// problem, not noise. One occurrence = info, two = warn, three+ = block.
-const RECURRENCE_BLOCK_THRESHOLD = 3;
 
 export async function runKbCheck(claims: Claim[], prFiles: string[] = []): Promise<Finding[]> {
   const findings: Finding[] = [];
@@ -212,52 +208,35 @@ function observerBySource(ops: any[]): string {
 }
 
 // Audit-lessons — reads data/_kb/audit_lessons.jsonl (populated by
-// every audit's appendAuditLessons). Groups rows by `signature` (the
-// check-normalized dedup key) and emits a finding per signature that
-// has 2+ occurrences. Severity ramps with count: 2 = info, 3-4 = warn,
-// 5+ = block. This is how the auditor accumulates institutional
-// memory: without this check, a recurring flaw (placeholder code
-// class X, unbacked claim pattern Y) looks new every audit.
+// every audit's appendAuditLessons). Uses the shared kb_index
+// aggregator: groups by `signature`, distinct-scopes keyed by PR
+// number, severity from ratingSeverity(agg) which applies the
+// confidence × count rating (see kb_index.ts). This is the same
+// aggregation any other KB reader uses — shared discipline, not
+// per-check custom logic.
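+//
+// The same three lines work for any other scratchpad. A hypothetical
+// scrum_reviews reader would be (sketch; assumes those rows also carry
+// `signature` and `pr_number` fields, which this diff does not confirm):
+//
+//   const agg = await aggregate(SCRUM_REVIEWS_JSONL, {
+//     keyFn: (r) => r?.signature,
+//     scopeFn: (r) => (r?.pr_number !== undefined ? `pr-${r.pr_number}` : undefined),
+//   });
+//   for (const [sig, a] of agg) console.log(sig, ratingSeverity(a), formatAgg(a));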
 async function checkAuditLessons(): Promise<Finding[]> {
-  const rows = await tailJsonl(AUDIT_LESSONS_JSONL, TAIL_LINES * 4);
-  if (rows.length === 0) return [];
-
-  type Agg = { count: number; last_summary: string; last_pr: number; last_sha: string; checks: Set<string>; prs: Set<number> };
-  const bySig = new Map<string, Agg>();
-  for (const r of rows) {
-    const sig = String(r.signature ?? "");
-    if (!sig) continue;
-    const a = bySig.get(sig) ?? {
-      count: 0, last_summary: "", last_pr: 0, last_sha: "",
-      checks: new Set<string>(), prs: new Set<number>(),
-    };
-    a.count += 1;
-    a.last_summary = String(r.summary ?? a.last_summary);
-    a.last_pr = Number(r.pr_number ?? a.last_pr);
-    a.last_sha = String(r.head_sha ?? a.last_sha);
-    if (r.check) a.checks.add(String(r.check));
-    if (r.pr_number) a.prs.add(Number(r.pr_number));
-    bySig.set(sig, a);
-  }
+  const bySig = await aggregate(AUDIT_LESSONS_JSONL, {
+    keyFn: (r) => r?.signature,
+    scopeFn: (r) => (r?.pr_number !== undefined ? `pr-${r.pr_number}` : undefined),
+    checkFn: (r) => r?.check,
+    tailLimit: TAIL_LINES * 4,
+  });
+  if (bySig.size === 0) return [];
 
   const findings: Finding[] = [];
-  // Emit only signatures with 2+ prior PRs (not just 2+ rows — a
-  // single unresolved PR being re-audited on every push would
-  // otherwise self-inflate). Distinct-PRs count is the real signal.
-  for (const [sig, a] of bySig) {
-    if (a.prs.size < 2) continue;
-    const sev: "block" | "warn" | "info" =
-      a.prs.size >= RECURRENCE_BLOCK_THRESHOLD + 2 ? "block" :
-      a.prs.size >= RECURRENCE_BLOCK_THRESHOLD ? "warn" : "info";
+  for (const [sig, agg] of bySig) {
+    // Silent on first-ever occurrence — not yet signal.
+    if (agg.count < 2) continue;
+    const sev = ratingSeverity(agg);
     findings.push({
       check: "kb_query",
       severity: sev,
-      summary: `recurring audit pattern (${a.prs.size} distinct PRs, ${a.count} total flaggings): ${a.last_summary.slice(0, 180)}`,
+      summary: `recurring audit pattern (${agg.distinct_scopes} distinct PRs, ${agg.count} flaggings, conf=${agg.confidence.toFixed(2)}): ${agg.representative_summary.slice(0, 160)}`,
       evidence: [
         `signature=${sig}`,
-        `checks: ${Array.from(a.checks).join(",")}`,
-        `PRs: ${Array.from(a.prs).sort((x,y)=>x-y).join(",")}`,
-        `most recent: PR #${a.last_pr} @ ${a.last_sha.slice(0, 12)}`,
+        `checks: ${agg.checks.join(",")}`,
+        `scopes: ${agg.scopes.slice(-6).join(",")}`,
+        formatAgg(agg),
       ],
     });
   }
diff --git a/auditor/kb_index.ts b/auditor/kb_index.ts
new file mode 100644
index 0000000..d7cbeb6
--- /dev/null
+++ b/auditor/kb_index.ts
@@ -0,0 +1,161 @@
+// kb_index — generic on-the-fly aggregation over append-only JSONL
+// scratchpads (audit_lessons, scrum_reviews, outcomes, observer ops).
+//
+// The mem0 insight: raw rows are CHEAP and tell the full story, but
+// downstream prompts need a DEFINITION, not a log. A definition is
+// the aggregate: "this signature has fired N times across M distinct
+// scopes, first_seen=X, last_seen=Y, confidence=M/N."
+//
+// This library is the single shared aggregator. Every KB writer keeps
+// appending raw rows; every KB reader uses aggregate() instead of
+// tailing the raw stream. No second file to sync, no ADD/UPDATE/NOOP
+// routing — the stats roll up from the raw rows every time.
+//
+// Why this works past hundreds of runs:
+// - aggregate() is bounded by distinct_signatures, not total_rows.
+// - confidence = distinct_scopes / count — low for same-scope noise,
+//   high for cross-scope patterns. Downstream severity ramps on
+//   confidence × count, not raw count, so one unfixed PR can't
+//   inflate its own recurrence score (the classic mem0 failure).
+// - rotation (later) moves old raw to archive files; aggregate()
+//   can still read both to compute lifetime counts when needed.
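+//
+// Worked example (illustrative numbers, not from a real run): six rows
+// for one signature, all from pr-8 → distinct_scopes=1, confidence=1/6≈0.17,
+// which ratingSeverity() below keeps at "info"; the same six rows spread
+// across five PRs → confidence=5/6≈0.83 with count=6 → "block".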
+
+import { readFile } from "node:fs/promises";
+
+export interface AggregateRow {
+  signature: string;
+  count: number;
+  distinct_scopes: number;
+  first_seen: string;
+  last_seen: string;
+  confidence: number; // distinct_scopes / count — capped at 1.0
+  representative_summary: string; // most-recent summary for this signature
+  scopes: string[]; // up to 20 most-recent scopes for debugging
+  checks: string[]; // distinct `check` values (audit_lessons-specific)
+}
+
+export interface AggregateOptions<T = any> {
+  /** How to extract the dedup key from a row. */
+  keyFn: (row: T) => string | undefined;
+  /** How to extract the "scope" — distinct scopes count gives confidence. */
+  scopeFn: (row: T) => string | undefined;
+  /** How to extract the timestamp (defaults to row.audited_at / row.reviewed_at / row.timestamp / row.ran_at). */
+  timeFn?: (row: T) => string | undefined;
+  /** How to extract a representative summary (defaults to row.summary / row.representative_summary). */
+  summaryFn?: (row: T) => string | undefined;
+  /** Max rows to read from the JSONL tail; 0 = read all. */
+  tailLimit?: number;
+  /** Include per-row check field (for multi-check aggregates). */
+  checkFn?: (row: T) => string | undefined;
+}
+
+/**
+ * Read a JSONL file and produce the aggregate map keyed by signature.
+ * Safe on missing or malformed files — returns empty map.
+ */
+export async function aggregate<T = any>(
+  jsonlPath: string,
+  opts: AggregateOptions<T>,
+): Promise<Map<string, AggregateRow>> {
+  const out = new Map<string, AggregateRow>();
+  let raw: string;
+  try { raw = await readFile(jsonlPath, "utf8"); } catch { return out; }
+  const lines = raw.split("\n").filter(l => l.length > 0);
+  const sliceFrom = opts.tailLimit && opts.tailLimit > 0 ? Math.max(0, lines.length - opts.tailLimit) : 0;
+
+  const timeFn = opts.timeFn ?? ((r: any) => r?.audited_at ?? r?.reviewed_at ?? r?.timestamp ?? r?.ran_at);
+  const summaryFn = opts.summaryFn ?? ((r: any) => r?.summary ?? r?.representative_summary);
+
+  // Per-signature scope tracking — need counts by scope to compute
+  // distinct_scopes without double-counting a scope that appears 50
+  // times. Using a Set per signature.
+  const scopeSets = new Map<string, Set<string>>();
+  const checkSets = new Map<string, Set<string>>();
+
+  for (let i = sliceFrom; i < lines.length; i++) {
+    let row: T;
+    try { row = JSON.parse(lines[i]) as T; } catch { continue; }
+    const sig = opts.keyFn(row);
+    if (!sig) continue;
+
+    let agg = out.get(sig);
+    if (!agg) {
+      agg = {
+        signature: sig,
+        count: 0,
+        distinct_scopes: 0,
+        first_seen: "",
+        last_seen: "",
+        confidence: 0,
+        representative_summary: "",
+        scopes: [],
+        checks: [],
+      };
+      out.set(sig, agg);
+      scopeSets.set(sig, new Set<string>());
+      checkSets.set(sig, new Set<string>());
+    }
+
+    agg.count += 1;
+
+    const scope = opts.scopeFn(row);
+    if (scope !== undefined && scope !== null && scope !== "") {
+      scopeSets.get(sig)!.add(String(scope));
+      // Keep scopes array ordered by recency (newest wins — shift
+      // oldest when at cap).
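+      // Worked example (illustrative): scopes ["pr-3","pr-5"] seeing
+      // "pr-3" again become ["pr-5","pr-3"]; past the 20-entry cap the
+      // oldest entry is shifted off.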
+      const arr = agg.scopes;
+      const s = String(scope);
+      const existing = arr.indexOf(s);
+      if (existing >= 0) arr.splice(existing, 1);
+      arr.push(s);
+      if (arr.length > 20) arr.shift();
+    }
+
+    if (opts.checkFn) {
+      const c = opts.checkFn(row);
+      if (c) checkSets.get(sig)!.add(String(c));
+    }
+
+    const t = timeFn(row);
+    if (t) {
+      if (!agg.first_seen || t < agg.first_seen) agg.first_seen = t;
+      if (!agg.last_seen || t > agg.last_seen) agg.last_seen = t;
+    }
+
+    const s = summaryFn(row);
+    if (s) agg.representative_summary = String(s);
+  }
+
+  // Finalize derived fields.
+  for (const [sig, agg] of out) {
+    const scopes = scopeSets.get(sig) ?? new Set<string>();
+    agg.distinct_scopes = scopes.size;
+    agg.confidence = agg.count > 0 ? Math.min(1, agg.distinct_scopes / agg.count) : 0;
+    const checks = checkSets.get(sig);
+    if (checks) agg.checks = Array.from(checks).sort();
+  }
+  return out;
+}
+
+/**
+ * Severity policy derived from aggregate stats. The rating lives here
+ * (not in each check) so all KB readers ramp severity consistently.
+ *
+ * - confidence × count product is the real signal.
+ * - Low confidence (< 0.3) = same-scope noise → info regardless of count.
+ * - Mid confidence (0.3–0.6) = mixed signal → warn at count ≥ 3.
+ * - High confidence (≥ 0.6) with count ≥ 5 = block-worthy cross-cutting pattern.
+ *
+ * Callers can override by reading agg directly; this is the default
+ * policy that matches the "don't escalate one unfixed PR" discipline.
+ */
+export function ratingSeverity(agg: AggregateRow): "info" | "warn" | "block" {
+  if (agg.confidence >= 0.6 && agg.count >= 5) return "block";
+  if (agg.confidence >= 0.3 && agg.count >= 3) return "warn";
+  return "info";
+}
+
+/** Human-friendly one-line summary of an aggregate row for finding evidence. */
+export function formatAgg(agg: AggregateRow): string {
+  return `count=${agg.count} distinct_scopes=${agg.distinct_scopes} confidence=${agg.confidence.toFixed(2)} seen=[${agg.first_seen.slice(0, 10)}..${agg.last_seen.slice(0, 10)}]`;
+}
diff --git a/tests/real-world/nine_consecutive_audits.ts b/tests/real-world/nine_consecutive_audits.ts
new file mode 100644
index 0000000..999a9de
--- /dev/null
+++ b/tests/real-world/nine_consecutive_audits.ts
@@ -0,0 +1,181 @@
+// Nine-consecutive audit runner — empirical test of the predictive-
+// compounding property. Pushes 9 empty commits to the current branch,
+// waits for each audit to complete on the new SHA, captures the
+// verdict + audit_lessons state after each run, and reports whether
+// the KB stabilizes or drifts.
+//
+// What we expect (favorable compounding):
+// - signature_count grows sublinearly (same patterns recur, so
+//   distinct-signature count stabilizes fast)
+// - verdict settles on a stable value after run 2-3 (first audit
+//   establishes baseline, rest repeat)
+// - confidence stays LOW for all signatures (same PR repeatedly)
+// - NO new recurring findings fire because confidence < 0.3 on
+//   same-PR noise (kb_index rating policy)
+//
+// What would indicate drift (the thing we want to prove DOESN'T happen):
+// - signature_count grows linearly — each run produces new signatures
+// - verdict oscillates (block → approve → block ...)
+// - confidence inflates — kb_index rating escalates on repeated runs
+//
+// Run: bun run tests/real-world/nine_consecutive_audits.ts
+
+import { readFile } from "node:fs/promises";
+import { aggregate } from "../../auditor/kb_index.ts";
+
+const REPO = "/home/profit/lakehouse";
+const AUDIT_LESSONS = `${REPO}/data/_kb/audit_lessons.jsonl`;
+const VERDICTS_DIR = `${REPO}/data/_auditor/verdicts`;
+const POLL_INTERVAL_MS = 5_000;
+const AUDIT_TIMEOUT_MS = 180_000;
+const RUNS = 9;
+const TARGET_PR = Number(process.env.LH_AUDIT_PR ?? 8);
+
+async function sh(cmd: string): Promise<{ stdout: string; stderr: string; code: number }> {
+  const p = Bun.spawn(["bash", "-lc", cmd], { cwd: REPO, stdout: "pipe", stderr: "pipe" });
+  const [stdout, stderr] = await Promise.all([new Response(p.stdout).text(), new Response(p.stderr).text()]);
+  const code = await p.exited;
+  return { stdout, stderr, code };
+}
+
+async function getHeadSha(): Promise<string> {
+  const r = await sh("git rev-parse HEAD");
+  return r.stdout.trim();
+}
+
+async function pushEmptyCommit(n: number): Promise<string> {
+  const msg = `test: nine-consecutive audit run ${n}/${RUNS} (compounding probe)`;
+  await sh(`GIT_AUTHOR_NAME=profit GIT_AUTHOR_EMAIL=profit@lakehouse GIT_COMMITTER_NAME=profit GIT_COMMITTER_EMAIL=profit@lakehouse git commit --allow-empty -m "${msg}"`);
+  const sha = await getHeadSha();
+  // The push credential is read from the environment (var name assumed
+  // here) rather than hardcoded: a PAT must never be committed.
+  const pat = process.env.LH_GIT_PAT;
+  if (!pat) throw new Error("LH_GIT_PAT not set; needed to push probe commits");
+  const pushCmd = `PAT="${pat}"; git -c credential.helper='!f() { echo "username=profit"; echo "password='$PAT'"; }; f' push origin HEAD 2>&1`;
+  const pr = await sh(pushCmd);
+  if (pr.code !== 0) throw new Error(`push failed: ${pr.stderr || pr.stdout}`);
+  return sha;
+}
+
+async function waitForVerdict(sha: string, deadlineMs: number): Promise<any> {
+  const short = sha.slice(0, 12);
+  const path = `${VERDICTS_DIR}/${TARGET_PR}-${short}.json`;
+  const start = Date.now();
+  while (Date.now() - start < deadlineMs) {
+    try {
+      const raw = await readFile(path, "utf8");
+      return JSON.parse(raw);
+    } catch { /* not yet */ }
+    await new Promise(r => setTimeout(r, POLL_INTERVAL_MS));
+  }
+  throw new Error(`no verdict file after ${deadlineMs}ms: ${path}`);
+}
+
+async function captureAggState(): Promise<{ sig_count: number; max_count: number; max_confidence: number; top3: Array<{ sig: string; count: number; conf: number; summary: string }> }> {
+  const agg = await aggregate(AUDIT_LESSONS, {
+    keyFn: (r) => r?.signature,
+    scopeFn: (r) => (r?.pr_number !== undefined ? `pr-${r.pr_number}` : undefined),
+  });
+  const list = Array.from(agg.values()).sort((a, b) => b.count - a.count);
+  return {
+    sig_count: list.length,
+    max_count: list[0]?.count ?? 0,
+    max_confidence: list.reduce((m, a) => Math.max(m, a.confidence), 0),
+    top3: list.slice(0, 3).map(a => ({
+      sig: a.signature,
+      count: a.count,
+      conf: a.confidence,
+      summary: a.representative_summary.slice(0, 80),
+    })),
+  };
+}
+
+interface RunRecord {
+  run: number;
+  sha: string;
+  verdict_overall: string;
+  findings_total: number;
+  findings_block: number;
+  findings_warn: number;
+  findings_info: number;
+  audit_duration_ms: number;
+  claims_total: number;
+  claims_empirical: number;
+  kb_sig_count_after: number;
+  kb_max_count_after: number;
+  kb_max_confidence_after: number;
+}
+
+async function main() {
+  console.log(`[nine] target PR: #${TARGET_PR}`);
+  console.log(`[nine] runs: ${RUNS}`);
+  console.log(`[nine] audit_lessons.jsonl: ${AUDIT_LESSONS}`);
+  console.log("");
+
+  const baseline = await captureAggState();
+  console.log(`[nine] baseline: sig_count=${baseline.sig_count} max_count=${baseline.max_count} max_conf=${baseline.max_confidence.toFixed(2)}`);
+  console.log("");
+
+  const records: RunRecord[] = [];
+  for (let n = 1; n <= RUNS; n++) {
+    const t0 = Date.now();
+    console.log(`─── run ${n}/${RUNS} ───`);
+    const sha = await pushEmptyCommit(n);
+    console.log(`  pushed ${sha.slice(0, 12)}`);
+    const verdict = await waitForVerdict(sha, AUDIT_TIMEOUT_MS);
+    const after = await captureAggState();
+    const rec: RunRecord = {
+      run: n,
+      sha: sha.slice(0, 12),
+      verdict_overall: String(verdict.overall),
+      findings_total: Number(verdict.metrics?.findings_total ?? 0),
+      findings_block: Number(verdict.metrics?.findings_block ?? 0),
+      findings_warn: Number(verdict.metrics?.findings_warn ?? 0),
+      findings_info: Number(verdict.metrics?.findings_info ?? 0),
+      audit_duration_ms: Number(verdict.metrics?.audit_duration_ms ?? 0),
+      claims_total: Number(verdict.metrics?.claims_total ?? 0),
0), + kb_sig_count_after: after.sig_count, + kb_max_count_after: after.max_count, + kb_max_confidence_after: after.max_confidence, + }; + records.push(rec); + console.log(` verdict=${rec.verdict_overall} findings=${rec.findings_total} (b=${rec.findings_block} w=${rec.findings_warn})`); + console.log(` kb after: sig=${rec.kb_sig_count_after} max_count=${rec.kb_max_count_after} max_conf=${rec.kb_max_confidence_after.toFixed(2)}`); + console.log(` elapsed: ${((Date.now() - t0) / 1000).toFixed(1)}s`); + console.log(""); + } + + console.log("═══ FINAL ═══"); + console.log("run | verdict | find | block warn info | dur_s | kb_sig max_count max_conf"); + for (const r of records) { + console.log( + ` ${String(r.run).padStart(1)} | ${r.verdict_overall.padEnd(16)} | ${String(r.findings_total).padStart(4)} | ${String(r.findings_block).padStart(5)} ${String(r.findings_warn).padStart(5)} ${String(r.findings_info).padStart(5)} | ${(r.audit_duration_ms / 1000).toFixed(1).padStart(5)} | ${String(r.kb_sig_count_after).padStart(6)} ${String(r.kb_max_count_after).padStart(9)} ${r.kb_max_confidence_after.toFixed(2)}`, + ); + } + + console.log(""); + console.log("═══ COMPOUNDING PROPERTY ═══"); + const sigDelta = records[records.length - 1].kb_sig_count_after - baseline.sig_count; + const maxCount = records[records.length - 1].kb_max_count_after; + const maxConf = records[records.length - 1].kb_max_confidence_after; + console.log(` signatures added over ${RUNS} runs: ${sigDelta}`); + console.log(` max count after run ${RUNS}: ${maxCount} (same-PR recurrences per signature)`); + console.log(` max confidence after run ${RUNS}: ${maxConf.toFixed(2)} (expect LOW — same-PR should not inflate)`); + + const verdictSet = new Set(records.map(r => r.verdict_overall)); + if (verdictSet.size === 1) { + console.log(` verdict stable: all ${RUNS} runs returned '${[...verdictSet][0]}' ✓`); + } else { + console.log(` verdict oscillated across runs: ${[...verdictSet].join(" | ")} ✗`); + } + + if (maxConf < 0.3) { + console.log(` confidence policy holding: same-PR noise stays below escalation threshold ✓`); + } else { + console.log(` ⚠ confidence escalated above 0.3 on same-PR noise — kb_index policy needs tightening`); + } + + const jsonOut = `${REPO}/tests/real-world/runs/nine_consecutive_${Date.now().toString(36)}.json`; + await Bun.write(jsonOut, JSON.stringify({ target_pr: TARGET_PR, baseline, records }, null, 2)); + console.log(""); + console.log(` report: ${jsonOut}`); +} + +main().catch(e => { console.error("[nine] fatal:", e); process.exit(1); });