lakehouse/auditor/checks/kb_query.ts
profit 0306dd88c1
Some checks failed
lakehouse/auditor 2 blocking issues: unimplemented!() macro call in tests/real-world/hard_task_escalation.ts
auditor: close the verdict→playbook loop + fix rubric-string false positive
Two changes that fell out of running the auto-loop for real on PR #8:

1. The systemd auditor blocked PR #8 on 'unimplemented!()' / 'todo!()'
   in tests/real-world/hard_task_escalation.ts — but those strings are
   the rubric itself, not macro calls. Added isInsideQuotedString()
   detection in static.ts: BLOCK_PATTERNS now skip matches that fall
   inside double-quoted / single-quoted / backtick string literals on
   the added line. WARN/INFO patterns still run — a TODO comment in
   a string is still a valid signal.

2. Verdicts were being persisted to disk but never fed back as
   learning signal. Added appendAuditLessons() — every block/warn
   finding writes a JSONL row to data/_kb/audit_lessons.jsonl with a
   path-agnostic signature (strips file paths, line numbers, commit
   hashes) so the SAME class of finding on DIFFERENT files dedups to
   one signature.

   kb_query now tails audit_lessons.jsonl and emits recurrence
   findings: 2 distinct PRs hit a signature = info, 3-4 = warn, 5+ =
   block. Severity ramps on distinct-PR count, not total rows, so a
   single unfixed PR being re-audited doesn't inflate its own
   recurrence score.

appendAuditLessons() fires post-verdict as fire-and-forget, so a
failed disk write can't break the audit. The learning loop is now
closed: each audit contributes to the KB that guides the next audit.

Tested: unit tests for normalizedSignature confirmed path-agnostic
dedup; static.ts regression tests confirmed rubric strings no longer
trip BLOCK while real unquoted unimplemented!() still does.
2026-04-22 21:31:35 -05:00

317 lines
13 KiB
TypeScript

// Local-KB check — reads data/_kb/ + data/_observer/ + data/_bot/
// for prior evidence bearing on this PR's claims. Cheap, offline,
// no model calls. The point: if a claim like "Phase X shipped" has
// a historical record of failing on the same signature before, the
// auditor surfaces that pattern before the cloud check has to
// infer it.
//
// What this check reads (all file-backed, append-only or periodic):
// data/_kb/outcomes.jsonl — per-scenario outcomes (kb.ts)
// data/_kb/error_corrections.jsonl — fail→succeed deltas on same sig
// data/_kb/scrum_reviews.jsonl — scrum-master accepted reviews
// data/_kb/audit_lessons.jsonl — block/warn findings from prior audits
// data/_observer/ops.jsonl — observer ring → disk stream
// data/_bot/cycles/*.json — bot cycle results
//
// Each JSONL line / per-cycle file is small; this check reads tails
// only (last N lines or last M files) to stay cheap on large corpora.
import { readFile, readdir, stat } from "node:fs/promises";
import { join } from "node:path";
import type { Claim, Finding } from "../types.ts";
// Absolute locations of the file-backed stores this check reads.
const KB_DIR = "/home/profit/lakehouse/data/_kb";
const OBSERVER_OPS = "/home/profit/lakehouse/data/_observer/ops.jsonl";
const BOT_CYCLES_DIR = "/home/profit/lakehouse/data/_bot/cycles";
const SCRUM_REVIEWS_JSONL = "/home/profit/lakehouse/data/_kb/scrum_reviews.jsonl";
const AUDIT_LESSONS_JSONL = "/home/profit/lakehouse/data/_kb/audit_lessons.jsonl";
// Number of trailing JSONL rows read per store (checkAuditLessons
// reads 4x this many).
const TAIL_LINES = 500;
// Upper bound on how many bot-cycle files (newest by mtime) are parsed.
const MAX_BOT_CYCLE_FILES = 30;
// Recurrence threshold, counted in DISTINCT PRs that hit the same
// audit-lesson signature (not total rows). checkAuditLessons maps
// the distinct-PR count to severity as:
//   fewer than 2                   → no finding
//   2 .. THRESHOLD - 1             → info
//   THRESHOLD .. THRESHOLD + 1     → warn
//   THRESHOLD + 2 or more          → block
// With the current value of 3 that is: 2 = info, 3-4 = warn, 5+ = block.
// NOTE(review): despite the name, reaching this value yields a warn;
// block starts two PRs later.
const RECURRENCE_BLOCK_THRESHOLD = 3;
// Entry point for the local-KB check. Runs every sub-check and returns
// their findings concatenated in a fixed order:
//   1. scenario outcomes   2. error corrections   3. bot cycles
//   4. observer stream     5. scrum reviews       6. audit lessons
//
// claims  — forwarded to the scenario-outcome and error-correction
//           sub-checks (both currently ignore it).
// prFiles — paths touched by the PR; when empty the scrum-review
//           sub-check is skipped entirely.
//
// The sub-checks only read files and never consume each other's
// results, so they are started together and awaited as a group
// instead of one after another. Result order is unaffected.
export async function runKbCheck(claims: Claim[], prFiles: string[] = []): Promise<Finding[]> {
  const noFindings: Finding[] = [];
  const [
    scenarioFindings,
    correctionFindings,
    botFindings,
    obsFindings,
    scrumFindings,
    auditLessonFindings,
  ] = await Promise.all([
    // 1. Recent scenario outcomes: overall ok/fail event rate in the KB.
    checkScenarioOutcomes(claims),
    // 2. Error corrections: how many fail→succeed pairs are on file.
    checkErrorCorrections(claims),
    // 3. Bot cycles: outcome tally over the most recent cycle files
    //    (aggregate only; not filtered by the PR's files).
    checkBotCycles(),
    // 4. Observer: recent op volume and failure count, as context
    //    for other findings.
    checkObserverStream(),
    // 5. Scrum-master reviews: surface the latest accepted review for
    //    each file in this PR's diff, so the auditor can reuse that
    //    per-file analysis. Needs at least one PR file to match on.
    prFiles.length > 0 ? checkScrumReviews(prFiles) : noFindings,
    // 6. Audit-lessons feedback loop: signatures that recurred across
    //    2+ distinct prior PRs, with severity ramping on that count.
    //    This does not look at the current audit's other findings.
    checkAuditLessons(),
  ]);
  return [
    ...scenarioFindings,
    ...correctionFindings,
    ...botFindings,
    ...obsFindings,
    ...scrumFindings,
    ...auditLessonFindings,
  ];
}
// Reads a JSONL file and returns the parsed values of its last `n`
// non-empty lines, oldest first.
//
// path — file to read; the whole file is loaded, then trimmed.
// n    — maximum number of trailing lines to consider. Anything that
//        floors to less than 1 (0, negatives, NaN) yields []. This
//        matters because `slice(-0)` is `slice(0)`, which would
//        return the ENTIRE file rather than nothing.
//
// Best-effort by design: an unreadable/missing file yields [], and
// lines that are not valid JSON are skipped. A bare `null` line is
// skipped too — it is valid JSON but not a record, and callers
// dereference fields on every row.
async function tailJsonl<T = any>(path: string, n: number): Promise<T[]> {
  const limit = Math.floor(n);
  if (!(limit >= 1)) return [];

  let raw: string;
  try {
    raw = await readFile(path, "utf8");
  } catch {
    return [];
  }

  const lines = raw.split("\n").filter(l => l.length > 0);
  const out: T[] = [];
  for (const line of lines.slice(-limit)) {
    try {
      const parsed: unknown = JSON.parse(line);
      if (parsed !== null) out.push(parsed as T);
    } catch { /* skip malformed */ }
  }
  return out;
}
// Condenses the tail of data/_kb/outcomes.jsonl into one finding.
//
// Returns [] when no rows could be read. When rows exist but their
// summed total_events is 0, returns a single info finding saying the
// learning loop is empty. Otherwise returns one finding carrying the
// ok/total event ratio: "warn" if the fail rate exceeds 30%, "info"
// if not. Evidence names the newest run_id and up to three sig_hash
// values from the last ten rows where ok_events < total_events.
//
// _claims is accepted for signature parity but not consulted.
async function checkScenarioOutcomes(_claims: Claim[]): Promise<Finding[]> {
  const runs = await tailJsonl<any>(join(KB_DIR, "outcomes.jsonl"), TAIL_LINES);
  if (runs.length === 0) return [];

  // Missing counters count as zero.
  let totalEvents = 0;
  let okEvents = 0;
  for (const run of runs) {
    totalEvents += run.total_events ?? 0;
  }
  for (const run of runs) {
    okEvents += run.ok_events ?? 0;
  }

  if (totalEvents === 0) {
    return [{
      check: "kb_query",
      severity: "info",
      summary: `KB: no scenario outcomes on file — learning loop is empty`,
      evidence: [`data/_kb/outcomes.jsonl has ${runs.length} entries with 0 total events`],
    }];
  }

  const failRate = totalEvents > 0 ? 1 - okEvents / totalEvents : 0;

  // Only the ten newest rows feed the "recent" evidence lines.
  const lastTen = runs.slice(-10);
  const failingSigs: string[] = [];
  for (const run of lastTen) {
    const hadFailure = (run.ok_events ?? 0) < (run.total_events ?? 0);
    if (hadFailure && typeof run.sig_hash === "string") {
      failingSigs.push(run.sig_hash);
    }
  }

  const newestRun = lastTen[lastTen.length - 1];
  const sigList = failingSigs.length > 0 ? failingSigs.slice(-3).join(", ") : "none";
  const severity = failRate > 0.3 ? "warn" : "info";
  const failPct = (failRate * 100).toFixed(1);

  return [{
    check: "kb_query",
    severity,
    summary: `KB: ${runs.length} recent scenario runs, ${okEvents}/${totalEvents} events ok (fail rate ${failPct}%)`,
    evidence: [
      `most recent: ${newestRun?.run_id ?? "?"}`,
      `recent failing sigs: ${sigList}`,
    ],
  }];
}
// Reports how many fail→succeed correction rows are in the tail of
// data/_kb/error_corrections.jsonl.
//
// Returns [] when no rows could be read; otherwise a single info
// finding whose evidence is the newest row's sig_hash, truncated to
// 24 characters ("?" when that row has none).
//
// _claims is accepted for signature parity but not consulted.
async function checkErrorCorrections(_claims: Claim[]): Promise<Finding[]> {
  const corrections = await tailJsonl<any>(join(KB_DIR, "error_corrections.jsonl"), TAIL_LINES);
  if (corrections.length === 0) return [];

  // The array is known to be non-empty here, so there is always a
  // newest row and no "none" fallback is needed.
  const newest = corrections[corrections.length - 1];
  const newestSig = String(newest?.sig_hash ?? "?").slice(0, 24);

  return [{
    check: "kb_query",
    severity: "info",
    summary: `KB: ${corrections.length} error corrections on file (fail→succeed pairs)`,
    evidence: [`most recent: ${newestSig}`],
  }];
}
// Tallies the `outcome` field across the newest bot-cycle files.
//
// Lists BOT_CYCLES_DIR, keeps the *.json entries, orders them by
// mtime (newest first) and parses at most MAX_BOT_CYCLE_FILES of
// them. Returns [] if the directory cannot be listed or holds no
// .json files. Otherwise returns one finding: "warn" when
// tests_failed + apply_failed + model_failed make up more than half
// of the selected files, "info" if not.
//
// Files that fail stat() sort last (mtime 0); files that fail to read
// or parse are skipped in the tally but still count toward the
// denominator.
async function checkBotCycles(): Promise<Finding[]> {
  let entries: string[] = [];
  try { entries = await readdir(BOT_CYCLES_DIR); }
  catch { return []; }

  const jsonFiles = entries.filter(e => e.endsWith(".json"));
  if (jsonFiles.length === 0) return [];

  // Sort by mtime desc, take most recent N
  const withStat = await Promise.all(
    jsonFiles.map(async name => {
      try { return { name, mtime: (await stat(join(BOT_CYCLES_DIR, name))).mtimeMs }; }
      catch { return { name, mtime: 0 }; }
    }),
  );
  const recent = withStat.sort((a, b) => b.mtime - a.mtime).slice(0, MAX_BOT_CYCLE_FILES);

  // A Map (not a plain object) because the keys come from file
  // contents: an outcome named "constructor" or "toString" would
  // otherwise hit Object.prototype and corrupt the count.
  const outcomes = new Map<string, number>();
  for (const { name } of recent) {
    try {
      const r = JSON.parse(await readFile(join(BOT_CYCLES_DIR, name), "utf8"));
      const o = String(r.outcome ?? "unknown");
      outcomes.set(o, (outcomes.get(o) ?? 0) + 1);
    } catch { /* skip */ }
  }

  const summary = Array.from(outcomes)
    .sort((a, b) => b[1] - a[1])
    .map(([k, v]) => `${k}=${v}`)
    .join(", ");
  const failCount =
    (outcomes.get("tests_failed") ?? 0) +
    (outcomes.get("apply_failed") ?? 0) +
    (outcomes.get("model_failed") ?? 0);

  return [{
    check: "kb_query",
    severity: failCount > recent.length / 2 ? "warn" : "info",
    summary: `KB: bot recorded ${recent.length} recent cycles — ${summary || "no outcomes parsed"}`,
    evidence: [
      `dir: ${BOT_CYCLES_DIR}`,
      `fail-class total: ${failCount} / ${recent.length}`,
    ],
  }];
}
// Summarizes the tail of the observer ops stream.
//
// Returns [] when no rows could be read. Otherwise returns a single
// info finding with the number of rows read, how many of them have
// `ok` strictly equal to false, and a per-source breakdown.
async function checkObserverStream(): Promise<Finding[]> {
  const recentOps = await tailJsonl<any>(OBSERVER_OPS, TAIL_LINES);
  if (recentOps.length === 0) return [];

  // Only an explicit `ok: false` counts; a missing `ok` does not.
  let failed = 0;
  for (const op of recentOps) {
    if (op.ok === false) failed += 1;
  }

  const breakdown = observerBySource(recentOps);
  return [{
    check: "kb_query",
    severity: "info",
    summary: `KB: observer stream ${recentOps.length} recent ops, ${failed} failures`,
    evidence: [
      `source: ${OBSERVER_OPS}`,
      `by source: ${breakdown}`,
    ],
  }];
}
// Formats a per-source op count as "name=count, name=count, ...",
// highest count first (ties keep first-seen order). Rows without a
// `source` — including null/undefined rows — are counted under
// "unknown". Returns "empty" for an empty input.
function observerBySource(ops: any[]): string {
  // A Map (not a plain object) because source names come from data:
  // a source called "constructor", "toString" or "__proto__" would
  // otherwise resolve to Object.prototype members and corrupt counts.
  const counts = new Map<string, number>();
  for (const op of ops) {
    const source = String(op?.source ?? "unknown");
    counts.set(source, (counts.get(source) ?? 0) + 1);
  }
  const rendered = Array.from(counts)
    .sort((a, b) => b[1] - a[1])
    .map(([name, count]) => `${name}=${count}`)
    .join(", ");
  return rendered || "empty";
}
// Audit-lessons — reads the tail of data/_kb/audit_lessons.jsonl,
// groups rows by `signature`, and emits one finding per signature
// that was seen on 2+ DISTINCT PRs. Severity ramps with that
// distinct-PR count relative to RECURRENCE_BLOCK_THRESHOLD (T):
//   2 .. T-1 = info,  T .. T+1 = warn,  T+2 or more = block.
// Counting distinct PRs rather than rows keeps a single unresolved PR
// that is re-audited on every push from inflating its own score.
//
// Returns [] when no rows could be read. Rows without a signature
// are ignored. Findings come out in first-seen signature order.
async function checkAuditLessons(): Promise<Finding[]> {
  const rows = await tailJsonl<any>(AUDIT_LESSONS_JSONL, TAIL_LINES * 4);
  if (rows.length === 0) return [];

  type Agg = { count: number; last_summary: string; last_pr: number; last_sha: string; checks: Set<string>; prs: Set<number> };
  const bySig = new Map<string, Agg>();

  for (const r of rows) {
    // `r?.` so a stray null row is skipped instead of throwing.
    const sig = String(r?.signature ?? "");
    if (!sig) continue;

    let a = bySig.get(sig);
    if (!a) {
      a = {
        count: 0, last_summary: "", last_pr: 0, last_sha: "",
        checks: new Set<string>(), prs: new Set<number>(),
      };
      bySig.set(sig, a);
    }

    a.count += 1;
    a.last_summary = String(r.summary ?? a.last_summary);
    a.last_sha = String(r.head_sha ?? a.last_sha);
    if (r.check) a.checks.add(String(r.check));

    // A non-numeric pr_number converts to NaN. Letting NaN into the
    // set would count as an extra "distinct PR" and could bump the
    // severity, so only finite PR numbers are recorded.
    const lastPr = Number(r.pr_number ?? a.last_pr);
    if (Number.isFinite(lastPr)) a.last_pr = lastPr;
    if (r.pr_number) {
      const pr = Number(r.pr_number);
      if (Number.isFinite(pr)) a.prs.add(pr);
    }
  }

  const warnAt = RECURRENCE_BLOCK_THRESHOLD;
  const blockAt = RECURRENCE_BLOCK_THRESHOLD + 2;

  const findings: Finding[] = [];
  for (const [sig, a] of bySig) {
    const distinctPrs = a.prs.size;
    if (distinctPrs < 2) continue;

    const sev: "block" | "warn" | "info" =
      distinctPrs >= blockAt ? "block" :
      distinctPrs >= warnAt ? "warn" : "info";

    findings.push({
      check: "kb_query",
      severity: sev,
      summary: `recurring audit pattern (${distinctPrs} distinct PRs, ${a.count} total flaggings): ${a.last_summary.slice(0, 180)}`,
      evidence: [
        `signature=${sig}`,
        `checks: ${Array.from(a.checks).join(",")}`,
        `PRs: ${Array.from(a.prs).sort((x, y) => x - y).join(",")}`,
        `most recent: PR #${a.last_pr} @ ${a.last_sha.slice(0, 12)}`,
      ],
    });
  }
  return findings;
}
// Scrum-master reviews — reads the tail of scrum_reviews.jsonl and
// surfaces, for each file in the PR's diff, the most recent review
// row recorded for that file.
//
// Matching is an exact string comparison after both sides are
// normalized: leading slashes are stripped, then a leading
// `home/profit/lakehouse/` prefix is stripped. When several rows
// name the same file, the last one read wins.
//
// Each finding reports which model accepted the review, on which
// attempt, and whether a tree-split fired. Severity is "warn" when
// the accepting attempt is numerically 4 or higher, else "info"
// (a missing/non-numeric attempt is "info"). Returns [] when no rows
// could be read or none match a PR file.
async function checkScrumReviews(prFiles: string[]): Promise<Finding[]> {
  const reviews = await tailJsonl<any>(SCRUM_REVIEWS_JSONL, TAIL_LINES);
  if (reviews.length === 0) return [];

  const toRepoRelative = (p: string): string =>
    p.replace(/^\/+/, "").replace(/^home\/profit\/lakehouse\//, "");

  const touched = new Set<string>();
  for (const p of prFiles) touched.add(toRepoRelative(p));

  // Later rows overwrite earlier ones, so each file ends up mapped to
  // its newest review while keeping first-seen iteration order.
  const newestReview = new Map<string, any>();
  for (const review of reviews) {
    const file = toRepoRelative(String(review.file ?? ""));
    if (file && touched.has(file)) newestReview.set(file, review);
  }
  if (newestReview.size === 0) return [];

  return Array.from(newestReview, ([file, review]): Finding => {
    const model = String(review.accepted_model ?? "?");
    const attempt = review.accepted_on_attempt ?? "?";
    const splitNote = review.tree_split_fired ? " (tree-split)" : "";
    // Attempt 4+ means the review was only accepted far up the ladder.
    const severity: "warn" | "info" = Number(attempt) >= 4 ? "warn" : "info";
    const preview = String(review.suggestions_preview ?? "").slice(0, 300).replace(/\n/g, " ");
    return {
      check: "kb_query",
      severity,
      summary: `scrum-master review for \`${file}\` — accepted on attempt ${attempt} by \`${model}\`${splitNote}`,
      evidence: [
        `reviewed_at: ${review.reviewed_at ?? "?"}`,
        `preview: ${preview}`,
      ],
    };
  });
}