lakehouse/auditor/checks/kb_query.ts
profit 9d12a814e3
Some checks failed
lakehouse/auditor 1 blocking issue: cloud: claim not backed — "the proven escalation ladder with learning context, collects"
auditor: kb_index aggregator + nine-consecutive empirical test
Phase 1 — definition-layer over append-only JSONL scratchpads.

auditor/kb_index.ts is the single shared aggregator:

  aggregate<T>(jsonlPath, { keyFn, scopeFn, checkFn, tailLimit })
      → Map<signature, {count, distinct_scopes, confidence,
                        first_seen, last_seen, representative_summary, ...}>

  ratingSeverity(agg) — confidence × count severity policy shared
    across all KB readers. Kills the "same unfixed PR inflates its
    own recurrence score" failure mode by design: confidence =
    distinct_scopes/count, so same-scope noise stays below the 0.3
    escalation threshold no matter how many times it repeats.

checkAuditLessons now routes through aggregate + ratingSeverity.
Net effect: the recurrence detector's bespoke Map/Set bookkeeping is
gone; same behavior, shared discipline, reusable by scrum/observer.

Also: symbolsExistInRepo now skips files >500KB so the audit can't
get stuck slurping a fixture.

Phase 2 — nine-consecutive audit runner.

tests/real-world/nine_consecutive_audits.ts pushes 9 empty commits,
waits for each verdict, captures the audit_lessons aggregate state
after each run, reports:

  - sig_count trajectory (should stabilize, not grow linearly)
  - max_count trajectory (same-signature repeat rate)
  - max_confidence trajectory (must stay LOW on same-PR noise)
  - verdict_stable across runs (must NOT oscillate)

This is the empirical proof that the KB compounds favorably:
noise doesn't escalate itself, and signal stays distinguishable.

Unit-tested both failure modes: same-PR × 9 repeats = conf=0.11
(info); cross-PR × 5 distinct = conf=1.00 (block). The rating
function correctly discriminates.
2026-04-22 21:49:46 -05:00

296 lines
12 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Local-KB check — reads data/_kb/ + data/_observer/ + data/_bot/
// for prior evidence bearing on this PR's claims. Cheap, offline,
// no model calls. The point: if a claim like "Phase X shipped" has
// a historical record of failing on the same signature before, the
// auditor surfaces that pattern before the cloud check has to
// infer it.
//
// What this check reads (all file-backed, append-only or periodic):
// data/_kb/outcomes.jsonl — per-scenario outcomes (kb.ts)
// data/_kb/error_corrections.jsonl — fail→succeed deltas on same sig
// data/_kb/scrum_reviews.jsonl — scrum-master accepted reviews
// data/_observer/ops.jsonl — observer ring → disk stream
// data/_bot/cycles/*.json — bot cycle results
//
// Each JSONL line / per-cycle file is small; this check reads tails
// only (last N lines or last M files) to stay cheap on large corpora.
import { readFile, readdir, stat } from "node:fs/promises";
import { join } from "node:path";
import type { Claim, Finding } from "../types.ts";
import { aggregate, ratingSeverity, formatAgg } from "../kb_index.ts";
// Absolute paths of the file-backed KB stores this check reads.
// NOTE(review): hard-coded to this host's layout — presumably fine for
// a single-box deployment; confirm before reusing elsewhere.
const KB_DIR = "/home/profit/lakehouse/data/_kb";
const OBSERVER_OPS = "/home/profit/lakehouse/data/_observer/ops.jsonl";
const BOT_CYCLES_DIR = "/home/profit/lakehouse/data/_bot/cycles";
const SCRUM_REVIEWS_JSONL = "/home/profit/lakehouse/data/_kb/scrum_reviews.jsonl";
const AUDIT_LESSONS_JSONL = "/home/profit/lakehouse/data/_kb/audit_lessons.jsonl";
// Read only the last N lines of each JSONL store (cost cap on big corpora).
const TAIL_LINES = 500;
// Cap on how many of the newest bot cycle files get stat'd and parsed.
const MAX_BOT_CYCLE_FILES = 30;
// Entry point — run every KB sub-check and concatenate their findings.
//
// Improvement over the previous version: the sub-checks each read
// disjoint files and share no mutable state, so they now run
// concurrently via Promise.all instead of sequentially. Promise.all
// preserves declaration order in its result array, so the order of the
// returned findings is identical to the old sequential version.
//
// @param claims  extracted PR claims (passed through to outcome checks)
// @param prFiles paths touched by the PR's diff; enables scrum-review matching
// @returns all findings from the six KB readers, in fixed section order
export async function runKbCheck(claims: Claim[], prFiles: string[] = []): Promise<Finding[]> {
  const tasks: Promise<Finding[]>[] = [
    // 1. Recent scenario outcomes: strong-claim phrases alongside
    //    failed events — "we claimed it worked" + "it didn't".
    checkScenarioOutcomes(claims),
    // 2. Error corrections: recently-observed fail→succeed pairs.
    checkErrorCorrections(claims),
    // 3. Bot cycles: prior cycles ending in tests_failed/apply_failed.
    checkBotCycles(),
    // 4. Observer: recent error-event volume (shared-infra context).
    checkObserverStream(),
  ];
  // 5. Scrum-master reviews — only when the caller supplied a diff
  //    file list; surfaces prior accepted per-file reviews.
  if (prFiles.length > 0) {
    tasks.push(checkScrumReviews(prFiles));
  }
  // 6. Audit-lessons feedback loop — recurring patterns from prior
  //    audits' block/warn findings. Decoupled from the current audit's
  //    static/inference findings (those run in parallel); any
  //    amplification is emergent on the reviewer's side.
  tasks.push(checkAuditLessons());
  const sections = await Promise.all(tasks);
  return sections.flat();
}
// Read the last `n` JSON objects from an append-only JSONL file.
// Tolerant by design: a missing/unreadable file yields [], and
// individual malformed lines are skipped rather than failing the read.
async function tailJsonl<T = any>(path: string, n: number): Promise<T[]> {
  let raw: string;
  try {
    raw = await readFile(path, "utf8");
  } catch {
    return [];
  }
  const tail = raw
    .split("\n")
    .filter(line => line.length > 0)
    .slice(-n);
  const parsed: T[] = [];
  for (const line of tail) {
    try {
      parsed.push(JSON.parse(line));
    } catch {
      // malformed line — skip it
    }
  }
  return parsed;
}
// Scenario outcomes — aggregate recent outcome rows and report the
// event fail rate. `_claims` is currently unused here (reserved for
// future claim↔outcome matching), hence the underscore.
async function checkScenarioOutcomes(_claims: Claim[]): Promise<Finding[]> {
  const rows = await tailJsonl<any>(join(KB_DIR, "outcomes.jsonl"), TAIL_LINES);
  if (rows.length === 0) return [];

  let totalEvents = 0;
  let okEvents = 0;
  for (const row of rows) {
    totalEvents += row.total_events ?? 0;
    okEvents += row.ok_events ?? 0;
  }

  // Rows exist but carry zero events: the learning loop is idle.
  if (totalEvents === 0) {
    return [{
      check: "kb_query",
      severity: "info",
      summary: `KB: no scenario outcomes on file — learning loop is empty`,
      evidence: [`data/_kb/outcomes.jsonl has ${rows.length} entries with 0 total events`],
    }];
  }

  const failRate = 1 - okEvents / totalEvents;
  const recent = rows.slice(-10);
  const recentFailSigs = recent
    .filter(row => (row.ok_events ?? 0) < (row.total_events ?? 0))
    .map(row => row.sig_hash)
    .filter((sig): sig is string => typeof sig === "string");

  return [{
    check: "kb_query",
    severity: failRate > 0.3 ? "warn" : "info",
    summary: `KB: ${rows.length} recent scenario runs, ${okEvents}/${totalEvents} events ok (fail rate ${(failRate * 100).toFixed(1)}%)`,
    evidence: [
      `most recent: ${recent[recent.length - 1]?.run_id ?? "?"}`,
      `recent failing sigs: ${recentFailSigs.length > 0 ? recentFailSigs.slice(-3).join(", ") : "none"}`,
    ],
  }];
}
// Error corrections — report how many fail→succeed pairs are on file
// and surface the newest signature hash for context. `_claims` is
// accepted for signature parity but not yet consulted.
async function checkErrorCorrections(_claims: Claim[]): Promise<Finding[]> {
  const corrections = await tailJsonl<any>(join(KB_DIR, "error_corrections.jsonl"), TAIL_LINES);
  if (!corrections.length) return [];
  const newest = corrections[corrections.length - 1];
  return [{
    check: "kb_query",
    severity: "info",
    summary: `KB: ${corrections.length} error corrections on file (fail→succeed pairs)`,
    evidence: [`most recent: ${String(newest?.sig_hash ?? "?").slice(0, 24)}`],
  }];
}
// Bot cycles — tally outcomes from the newest cycle files and warn
// when failure-class outcomes dominate the recent window.
async function checkBotCycles(): Promise<Finding[]> {
  let names: string[];
  try {
    names = await readdir(BOT_CYCLES_DIR);
  } catch {
    // directory absent — nothing recorded yet
    return [];
  }
  const cycleFiles = names.filter(name => name.endsWith(".json"));
  if (cycleFiles.length === 0) return [];

  // Stamp each file with its mtime (unstat-able entries sink to 0),
  // then keep the newest N.
  const stamped = await Promise.all(
    cycleFiles.map(async name => {
      try {
        return { name, mtime: (await stat(join(BOT_CYCLES_DIR, name))).mtimeMs };
      } catch {
        return { name, mtime: 0 };
      }
    }),
  );
  const recent = stamped.sort((a, b) => b.mtime - a.mtime).slice(0, MAX_BOT_CYCLE_FILES);

  const outcomes: Record<string, number> = {};
  for (const { name } of recent) {
    try {
      const cycle = JSON.parse(await readFile(join(BOT_CYCLES_DIR, name), "utf8"));
      const outcome = String(cycle.outcome ?? "unknown");
      outcomes[outcome] = (outcomes[outcome] ?? 0) + 1;
    } catch {
      // unreadable or corrupt cycle file — ignore
    }
  }

  const summary = Object.entries(outcomes)
    .sort((a, b) => b[1] - a[1])
    .map(([outcome, n]) => `${outcome}=${n}`)
    .join(", ");
  const failCount =
    (outcomes["tests_failed"] ?? 0) +
    (outcomes["apply_failed"] ?? 0) +
    (outcomes["model_failed"] ?? 0);
  return [{
    check: "kb_query",
    severity: failCount > recent.length / 2 ? "warn" : "info",
    summary: `KB: bot recorded ${recent.length} recent cycles — ${summary || "no outcomes parsed"}`,
    evidence: [
      `dir: ${BOT_CYCLES_DIR}`,
      `fail-class total: ${failCount} / ${recent.length}`,
    ],
  }];
}
// Observer stream — volume/failure snapshot over the recent ops tail.
// Always info-severity: this is context for other findings, not a
// verdict driver.
async function checkObserverStream(): Promise<Finding[]> {
  const ops = await tailJsonl<any>(OBSERVER_OPS, TAIL_LINES);
  if (!ops.length) return [];
  let failures = 0;
  for (const op of ops) {
    if (op.ok === false) failures++;
  }
  return [{
    check: "kb_query",
    severity: "info",
    summary: `KB: observer stream ${ops.length} recent ops, ${failures} failures`,
    evidence: [
      `source: ${OBSERVER_OPS}`,
      `by source: ${observerBySource(ops)}`,
    ],
  }];
}
// Tally ops per `source` field and render as "src=count" pairs,
// most-frequent first; "empty" when there is nothing to report.
function observerBySource(ops: any[]): string {
  const tally: Record<string, number> = {};
  for (const op of ops) {
    const key = String(op.source ?? "unknown");
    tally[key] = (tally[key] ?? 0) + 1;
  }
  const parts = Object.entries(tally)
    .sort((a, b) => b[1] - a[1])
    .map(([source, count]) => `${source}=${count}`);
  return parts.length ? parts.join(", ") : "empty";
}
// Audit-lessons — reads data/_kb/audit_lessons.jsonl (populated by
// every audit's appendAuditLessons). Uses the shared kb_index
// aggregator: groups by `signature`, distinct-scopes keyed by PR
// number, severity from ratingSeverity(agg) which applies the
// confidence × count rating (see kb_index.ts). This is the same
// aggregation any other KB reader uses — shared discipline, not
// per-check custom logic.
// Aggregate prior audit findings by signature via the shared kb_index
// helpers and emit one finding per recurring pattern.
async function checkAuditLessons(): Promise<Finding[]> {
  const aggs = await aggregate<any>(AUDIT_LESSONS_JSONL, {
    keyFn: (r) => r?.signature,
    scopeFn: (r) => (r?.pr_number !== undefined ? `pr-${r.pr_number}` : undefined),
    checkFn: (r) => r?.check,
    tailLimit: TAIL_LINES * 4,
  });
  if (aggs.size === 0) return [];

  const out: Finding[] = [];
  aggs.forEach((agg, sig) => {
    // A single occurrence is not yet a pattern — stay silent.
    if (agg.count < 2) return;
    out.push({
      check: "kb_query",
      severity: ratingSeverity(agg),
      summary: `recurring audit pattern (${agg.distinct_scopes} distinct PRs, ${agg.count} flaggings, conf=${agg.confidence.toFixed(2)}): ${agg.representative_summary.slice(0, 160)}`,
      evidence: [
        `signature=${sig}`,
        `checks: ${agg.checks.join(",")}`,
        `scopes: ${agg.scopes.slice(-6).join(",")}`,
        formatAgg(agg),
      ],
    });
  });
  return out;
}
// Scrum-master reviews — the scrum pipeline writes one row per
// accepted per-file review. We match reviews whose `file` matches
// any path in the PR's diff, then surface the *preview* + which
// model the escalation ladder had to reach. If the scrum-master
// needed the 123B specialist or larger to resolve a file, that's
// a meaningful signal about the code's complexity — and it's
// surfaced to the PR without the auditor having to re-run the
// escalation ladder itself.
// Match prior accepted scrum reviews against the PR's file list and
// surface each file's most recent review.
async function checkScrumReviews(prFiles: string[]): Promise<Finding[]> {
  const reviews = await tailJsonl<any>(SCRUM_REVIEWS_JSONL, TAIL_LINES);
  if (reviews.length === 0) return [];

  // Normalize both sides to repo-relative paths so exact matching
  // works whether a row stored an absolute or relative path.
  const normalize = (p: string) =>
    p.replace(/^\/+/, "").replace(/^home\/profit\/lakehouse\//, "");
  const prSet = new Set(prFiles.map(normalize));

  // Walk rows in append order so the newest review per file wins.
  const latestByFile = new Map<string, any>();
  for (const review of reviews) {
    const file = normalize(String(review.file ?? ""));
    if (file && prSet.has(file)) latestByFile.set(file, review);
  }
  if (latestByFile.size === 0) return [];

  const findings: Finding[] = [];
  for (const [file, review] of latestByFile) {
    const model = String(review.accepted_model ?? "?");
    const attempt = review.accepted_on_attempt ?? "?";
    const treeSplit = !!review.tree_split_fired;
    // Escalating past attempt 3 means the ladder had to climb far for
    // this file — warn; otherwise plain info. A non-numeric attempt
    // ("?") fails the >= comparison and stays info.
    const severity: "warn" | "info" = Number(attempt) >= 4 ? "warn" : "info";
    findings.push({
      check: "kb_query",
      severity,
      summary: `scrum-master review for \`${file}\` — accepted on attempt ${attempt} by \`${model}\`${treeSplit ? " (tree-split)" : ""}`,
      evidence: [
        `reviewed_at: ${review.reviewed_at ?? "?"}`,
        `preview: ${String(review.suggestions_preview ?? "").slice(0, 300).replace(/\n/g, " ")}`,
      ],
    });
  }
  return findings;
}