profit 7c1745611a Audit pipeline PR #9: determinism + fact extraction + verifier gate + KB stats + context injection
Bundles PR #9's work for the audit pipeline:

- N=3 consensus on cloud inference (gpt-oss:120b parallel) with qwen3-coder:480b tie-breaker
- audit_discrepancies.jsonl logs N-run disagreements
- scrum_master reviews route through llm_team fact extraction; source="scrum_review"
- Verifier-gated persistence: drops INCORRECT, keeps UNVERIFIABLE/UNCHECKED; schema_version:2
- scrum_master_reviewed flag on accepted reviews
- auditor/kb_stats.ts: on-demand observability script
- claim_parser history/proof pattern class (verified-on-PR, was-flipping, the-proven-X)
- claim_parser quoted-string guard (mirrors static.ts fix)
- fact_extractor project context injection via docs/AUDITOR_CONTEXT.md
- Fixed verifier-verdict parser to handle multiple gemma2 output formats

Empirical: 3-run determinism test on unchanged PR #9 SHA showed 7/7 warn findings stable; block count oscillation eliminated; llm_team quality scores 8-9 on context-injected extract runs.

See PR #9 for full run-by-run commit history.
2026-04-23 05:29:38 +00:00

398 lines
16 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Local-KB check — reads data/_kb/ + data/_observer/ + data/_bot/
// for prior evidence bearing on this PR's claims. Cheap, offline,
// no model calls. The point: if a claim like "Phase X shipped" has
// a historical record of failing on the same signature before, the
// auditor surfaces that pattern before the cloud check has to
// infer it.
//
// What this check reads (all file-backed, append-only or periodic):
// data/_kb/outcomes.jsonl — per-scenario outcomes (kb.ts)
// data/_kb/error_corrections.jsonl — fail→succeed deltas on same sig
// data/_kb/scrum_reviews.jsonl — scrum-master accepted reviews
// data/_observer/ops.jsonl — observer ring → disk stream
// data/_bot/cycles/*.json — bot cycle results
//
// Each JSONL line / per-cycle file is small; this check reads tails
// only (last N lines or last M files) to stay cheap on large corpora.
import { readFile, readdir, stat } from "node:fs/promises";
import { join } from "node:path";
import type { Claim, Finding } from "../types.ts";
import { aggregate, ratingSeverity, formatAgg } from "../kb_index.ts";
// Absolute roots for the file-backed KB sources this check reads.
// NOTE(review): hard-coded to a single host's home directory —
// presumably fine for this one-box deployment; confirm before reuse.
const KB_DIR = "/home/profit/lakehouse/data/_kb";
const OBSERVER_OPS = "/home/profit/lakehouse/data/_observer/ops.jsonl";
const BOT_CYCLES_DIR = "/home/profit/lakehouse/data/_bot/cycles";
const SCRUM_REVIEWS_JSONL = "/home/profit/lakehouse/data/_kb/scrum_reviews.jsonl";
const AUDIT_LESSONS_JSONL = "/home/profit/lakehouse/data/_kb/audit_lessons.jsonl";
const AUDIT_FACTS_JSONL = "/home/profit/lakehouse/data/_kb/audit_facts.jsonl";
// Tail-read budget: at most this many trailing JSONL lines per file.
const TAIL_LINES = 500;
// At most this many most-recent bot cycle files are parsed per run.
const MAX_BOT_CYCLE_FILES = 30;
/**
 * Run every local-KB check against this PR's claims and file list.
 *
 * Each sub-check reads a distinct file-backed source (scenario
 * outcomes, error corrections, bot cycles, observer stream, scrum
 * reviews, audit facts, audit lessons), so they are independent of
 * one another and are run concurrently with Promise.all (the
 * original awaited them one at a time). Findings are still emitted
 * in the original fixed order so downstream report layout is
 * unchanged.
 *
 * @param claims  parsed claims extracted from the PR description
 * @param prFiles relative paths touched by the PR's diff (optional;
 *                scrum-review matching is skipped when empty)
 * @returns all findings produced by the KB checks, in check order
 */
export async function runKbCheck(claims: Claim[], prFiles: string[] = []): Promise<Finding[]> {
  const [
    scenarioFindings,    // 1. recent scenario outcomes: fail-rate summary
    correctionFindings,  // 2. fail→succeed correction pairs in the KB
    botFindings,         // 3. prior bot cycle outcomes (tests_failed etc.)
    obsFindings,         // 4. observer stream error volume (infra context)
    scrumFindings,       // 5. prior accepted scrum reviews for PR files
    factFindings,        // 6. cross-PR recurring entities (fact extraction)
    auditLessonFindings, // 7. recurring patterns from prior audits' findings
  ] = await Promise.all([
    checkScenarioOutcomes(claims),
    checkErrorCorrections(claims),
    checkBotCycles(),
    checkObserverStream(),
    // Scrum reviews only apply when the caller supplied the diff's file list.
    prFiles.length > 0 ? checkScrumReviews(prFiles) : Promise.resolve([] as Finding[]),
    checkAuditFacts(),
    checkAuditLessons(),
  ]);
  return [
    ...scenarioFindings,
    ...correctionFindings,
    ...botFindings,
    ...obsFindings,
    ...scrumFindings,
    ...factFindings,
    ...auditLessonFindings,
  ];
}
// Read the last `n` non-empty lines of a JSONL file and parse each as
// JSON, silently dropping lines that fail to parse. A missing or
// unreadable file yields an empty array — callers treat "no KB" and
// "empty KB" identically.
async function tailJsonl<T = any>(path: string, n: number): Promise<T[]> {
  let raw: string;
  try {
    raw = await readFile(path, "utf8");
  } catch {
    return []; // file absent/unreadable — nothing to report
  }
  const parsed: T[] = [];
  const tail = raw.split("\n").filter(line => line.length > 0).slice(-n);
  for (const line of tail) {
    try {
      parsed.push(JSON.parse(line));
    } catch {
      // malformed row — skip, keep the rest of the tail
    }
  }
  return parsed;
}
// Summarize the tail of the scenario-outcome log: aggregate event
// counts, compute the fail rate, and list the signatures of the most
// recent failing runs. Warns when more than 30% of events failed.
// The claims argument is currently unused (reserved for claim/outcome
// cross-matching).
async function checkScenarioOutcomes(_claims: Claim[]): Promise<Finding[]> {
  const rows = await tailJsonl<any>(join(KB_DIR, "outcomes.jsonl"), TAIL_LINES);
  if (rows.length === 0) return [];
  let total = 0;
  let ok = 0;
  for (const row of rows) {
    total += row.total_events ?? 0;
    ok += row.ok_events ?? 0;
  }
  if (total === 0) {
    // Entries exist but carry no events — the learning loop hasn't
    // produced real signal yet.
    return [{
      check: "kb_query",
      severity: "info",
      summary: `KB: no scenario outcomes on file — learning loop is empty`,
      evidence: [`data/_kb/outcomes.jsonl has ${rows.length} entries with 0 total events`],
    }];
  }
  const failRate = 1 - ok / total;
  const window = rows.slice(-10);
  const failingSigs = window
    .filter(row => (row.ok_events ?? 0) < (row.total_events ?? 0))
    .map(row => row.sig_hash)
    .filter((s): s is string => typeof s === "string");
  return [{
    check: "kb_query",
    severity: failRate > 0.3 ? "warn" : "info",
    summary: `KB: ${rows.length} recent scenario runs, ${ok}/${total} events ok (fail rate ${(failRate * 100).toFixed(1)}%)`,
    evidence: [
      `most recent: ${window[window.length - 1]?.run_id ?? "?"}`,
      `recent failing sigs: ${failingSigs.length > 0 ? failingSigs.slice(-3).join(", ") : "none"}`,
    ],
  }];
}
// Report how many fail→succeed correction pairs are on file, plus a
// truncated signature of the newest one. Info-only context. The
// claims argument is currently unused (reserved for overlap matching).
async function checkErrorCorrections(_claims: Claim[]): Promise<Finding[]> {
  const rows = await tailJsonl<any>(join(KB_DIR, "error_corrections.jsonl"), TAIL_LINES);
  if (rows.length === 0) return [];
  // rows is non-empty here, so the newest entry always exists.
  const newest = rows[rows.length - 1];
  return [{
    check: "kb_query",
    severity: "info",
    summary: `KB: ${rows.length} error corrections on file (fail→succeed pairs)`,
    evidence: [`most recent: ${String(newest?.sig_hash ?? "?").slice(0, 24)}`],
  }];
}
/**
 * Summarize the most recent bot cycle results. Takes the newest
 * MAX_BOT_CYCLE_FILES files (by mtime) from data/_bot/cycles/, tallies
 * their `outcome` fields, and warns when more than half fall into a
 * failure class (tests_failed / apply_failed / model_failed).
 *
 * Fix: the per-file read+parse loop previously awaited each file
 * sequentially; it now runs all reads concurrently with Promise.all
 * (tallying is order-independent, so the result is unchanged).
 */
async function checkBotCycles(): Promise<Finding[]> {
  let entries: string[] = [];
  try { entries = await readdir(BOT_CYCLES_DIR); }
  catch { return []; } // no cycles directory — nothing to report
  const jsonFiles = entries.filter(e => e.endsWith(".json"));
  if (jsonFiles.length === 0) return [];
  // Sort by mtime desc, take most recent N. stat failures sort last.
  const withStat = await Promise.all(
    jsonFiles.map(async name => {
      try { return { name, mtime: (await stat(join(BOT_CYCLES_DIR, name))).mtimeMs }; }
      catch { return { name, mtime: 0 }; }
    }),
  );
  const recent = withStat.sort((a, b) => b.mtime - a.mtime).slice(0, MAX_BOT_CYCLE_FILES);
  // Read + parse all cycle files in parallel; unparseable files are dropped.
  const parsedOutcomes = await Promise.all(
    recent.map(async ({ name }) => {
      try {
        const r = JSON.parse(await readFile(join(BOT_CYCLES_DIR, name), "utf8"));
        return String(r.outcome ?? "unknown");
      } catch {
        return undefined; // unreadable/malformed cycle file — skip
      }
    }),
  );
  const outcomes: Record<string, number> = {};
  for (const o of parsedOutcomes) {
    if (o !== undefined) outcomes[o] = (outcomes[o] ?? 0) + 1;
  }
  const summary = Object.entries(outcomes)
    .sort((a, b) => b[1] - a[1])
    .map(([k, v]) => `${k}=${v}`)
    .join(", ");
  const failCount = (outcomes["tests_failed"] ?? 0) + (outcomes["apply_failed"] ?? 0) + (outcomes["model_failed"] ?? 0);
  return [{
    check: "kb_query",
    severity: failCount > recent.length / 2 ? "warn" : "info",
    summary: `KB: bot recorded ${recent.length} recent cycles — ${summary || "no outcomes parsed"}`,
    evidence: [
      `dir: ${BOT_CYCLES_DIR}`,
      `fail-class total: ${failCount} / ${recent.length}`,
    ],
  }];
}
// Count recent observer ops and how many reported ok === false,
// plus a per-source breakdown. Info-only context: a high error
// volume suggests a shared infra problem behind other findings.
async function checkObserverStream(): Promise<Finding[]> {
  const ops = await tailJsonl<any>(OBSERVER_OPS, TAIL_LINES);
  if (ops.length === 0) return [];
  let failures = 0;
  for (const op of ops) {
    if (op.ok === false) failures += 1;
  }
  return [{
    check: "kb_query",
    severity: "info",
    summary: `KB: observer stream ${ops.length} recent ops, ${failures} failures`,
    evidence: [
      `source: ${OBSERVER_OPS}`,
      `by source: ${observerBySource(ops)}`,
    ],
  }];
}
// Tally ops by their `source` field (missing source counts as
// "unknown") and render "src=count" pairs, highest count first.
// Returns "empty" when there is nothing to tally.
function observerBySource(ops: any[]): string {
  const tally: Record<string, number> = {};
  for (const op of ops) {
    const src = String(op.source ?? "unknown");
    tally[src] = (tally[src] ?? 0) + 1;
  }
  const parts = Object.entries(tally)
    .sort((x, y) => y[1] - x[1])
    .map(([src, n]) => `${src}=${n}`);
  return parts.length > 0 ? parts.join(", ") : "empty";
}
// Audit-facts — reads data/_kb/audit_facts.jsonl (populated by every
// curated inference run via llm_team's extract pipeline). Each row
// has arrays: facts, entities, relationships. We explode entities and
// aggregate them across PRs using kb_index. An entity seen in 3+ PRs
// is a "core system entity" — we surface the top N as info context.
//
// Filters out short names (<3 chars, likely qwen2.5 truncation
// artifacts) and generic types ("string", "number") that would
// otherwise dominate the ranking.
// Entity names shorter than this are treated as extraction noise.
const ENTITY_NAME_MIN_LEN = 3;
// Lower-cased names that are generic TS/JS type/keyword vocabulary
// rather than project entities; excluded so they don't dominate the
// cross-PR recurrence ranking.
const GENERIC_ENTITY_NAMES = new Set([
"string", "number", "boolean", "any", "void", "unknown", "never",
"object", "array", "function", "const", "let", "var", "true", "false",
"null", "undefined", "promise", "map", "set", "record",
]);
async function checkAuditFacts(): Promise<Finding[]> {
// Read raw rows — each row has multiple entities, so we can't just
// use aggregate() directly (it's one-signature-per-row). Explode
// entities into (row, entity) pairs, then aggregate by entity name.
let raw: string;
try { raw = await (await import("node:fs/promises")).readFile(AUDIT_FACTS_JSONL, "utf8"); }
catch { return []; }
const lines = raw.split("\n").filter(l => l.length > 0);
if (lines.length === 0) return [];
interface EntityRow { entity_key: string; pr_number: number; type: string; name: string; description: string }
const entityRows: EntityRow[] = [];
for (const line of lines.slice(-TAIL_LINES * 2)) {
let row: any;
try { row = JSON.parse(line); } catch { continue; }
const prNum = Number(row?.pr_number);
if (!Number.isFinite(prNum)) continue;
for (const e of Array.isArray(row?.entities) ? row.entities : []) {
const name = String(e?.name ?? "").trim();
if (name.length < ENTITY_NAME_MIN_LEN) continue;
if (GENERIC_ENTITY_NAMES.has(name.toLowerCase())) continue;
entityRows.push({
entity_key: name.toLowerCase(),
pr_number: prNum,
type: String(e?.type ?? "?"),
name,
description: String(e?.description ?? "").slice(0, 160),
});
}
}
if (entityRows.length === 0) return [];
// Aggregate manually — one key per entity name, distinct_scopes by PR.
type Agg = { count: number; scopes: Set<number>; types: Set<string>; last_name: string; last_desc: string };
const byEntity = new Map<string, Agg>();
for (const r of entityRows) {
const a = byEntity.get(r.entity_key) ?? {
count: 0, scopes: new Set<number>(), types: new Set<string>(), last_name: "", last_desc: "",
};
a.count += 1;
a.scopes.add(r.pr_number);
a.types.add(r.type);
a.last_name = r.name;
a.last_desc = r.description;
byEntity.set(r.entity_key, a);
}
// Rank: require 2+ distinct PRs (same-PR entity-repeats don't count
// as "cross-cutting"). Take the top 5 to avoid flooding the verdict.
const ranked = Array.from(byEntity.entries())
.filter(([_, a]) => a.scopes.size >= 2)
.sort((a, b) => b[1].scopes.size - a[1].scopes.size || b[1].count - a[1].count)
.slice(0, 5);
if (ranked.length === 0) {
// Useful to know the KB is being populated — emit a single
// summary so operators see fact extraction is alive.
return [{
check: "kb_query",
severity: "info",
summary: `audit_facts KB has ${entityRows.length} entity-observations across ${new Set(entityRows.map(r => r.pr_number)).size} PRs (no cross-PR recurrences yet)`,
evidence: [`source: ${AUDIT_FACTS_JSONL}`],
}];
}
return ranked.map(([_, a]) => ({
check: "kb_query" as const,
severity: "info" as const,
summary: `core entity \`${a.last_name}\` recurs in ${a.scopes.size} PRs (types: ${Array.from(a.types).join(",")})`,
evidence: [
`count=${a.count} distinct_PRs=${a.scopes.size}`,
`description: ${a.last_desc.slice(0, 200)}`,
`PRs: ${Array.from(a.scopes).sort((x, y) => x - y).join(",")}`,
],
}));
}
// Audit-lessons — reads data/_kb/audit_lessons.jsonl (populated by
// every audit's appendAuditLessons). Uses the shared kb_index
// aggregator: groups by `signature`, distinct-scopes keyed by PR
// number, severity from ratingSeverity(agg) which applies the
// confidence × count rating (see kb_index.ts). This is the same
// aggregation any other KB reader uses — shared discipline, not
// per-check custom logic.
// Summarize recurring block/warn patterns from prior audits. Lessons
// are grouped by `signature` via the shared kb_index aggregate()
// (distinct scopes keyed by PR number); severity comes from
// ratingSeverity's confidence × count rating. A signature seen only
// once is kept silent — not yet a pattern.
async function checkAuditLessons(): Promise<Finding[]> {
  const bySig = await aggregate<any>(AUDIT_LESSONS_JSONL, {
    keyFn: (r) => r?.signature,
    scopeFn: (r) => (r?.pr_number !== undefined ? `pr-${r.pr_number}` : undefined),
    checkFn: (r) => r?.check,
    tailLimit: TAIL_LINES * 4,
  });
  if (bySig.size === 0) return [];
  const out: Finding[] = [];
  bySig.forEach((agg, sig) => {
    if (agg.count < 2) return; // first-ever occurrence — ignore
    out.push({
      check: "kb_query",
      severity: ratingSeverity(agg),
      summary: `recurring audit pattern (${agg.distinct_scopes} distinct PRs, ${agg.count} flaggings, conf=${agg.confidence.toFixed(2)}): ${agg.representative_summary.slice(0, 160)}`,
      evidence: [
        `signature=${sig}`,
        `checks: ${agg.checks.join(",")}`,
        `scopes: ${agg.scopes.slice(-6).join(",")}`,
        formatAgg(agg),
      ],
    });
  });
  return out;
}
// Scrum-master reviews — the scrum pipeline writes one row per
// accepted per-file review. We match reviews whose `file` matches
// any path in the PR's diff, then surface the *preview* + which
// model the escalation ladder had to reach. If the scrum-master
// needed the 123B specialist or larger to resolve a file, that's
// a meaningful signal about the code's complexity — and it's
// surfaced to the PR without the auditor having to re-run the
// escalation ladder itself.
// Surface the most recent accepted scrum-master review for each file
// in the PR's diff. Paths are normalized (leading slashes and the
// repo-root prefix stripped) and compared exactly; the newest review
// per file wins (last row in the tail). Reviews accepted on attempt 4
// or later are flagged warn — the escalation ladder had to go far up
// (cloud specialist), a signal of the file's complexity.
async function checkScrumReviews(prFiles: string[]): Promise<Finding[]> {
  const rows = await tailJsonl<any>(SCRUM_REVIEWS_JSONL, TAIL_LINES);
  if (rows.length === 0) return [];
  const normalize = (p: string) => p.replace(/^\/+/, "").replace(/^home\/profit\/lakehouse\//, "");
  const prPaths = new Set(prFiles.map(normalize));
  // Last-wins: later rows for the same file overwrite earlier ones.
  const newestByFile = new Map<string, any>();
  for (const row of rows) {
    const path = normalize(String(row.file ?? ""));
    if (path !== "" && prPaths.has(path)) newestByFile.set(path, row);
  }
  if (newestByFile.size === 0) return [];
  const out: Finding[] = [];
  for (const [file, review] of newestByFile) {
    const acceptedModel = String(review.accepted_model ?? "?");
    const acceptedAttempt = review.accepted_on_attempt ?? "?";
    const usedTreeSplit = !!review.tree_split_fired;
    // Number("?") is NaN, so an unknown attempt stays at info severity.
    const sev: "warn" | "info" = Number(acceptedAttempt) >= 4 ? "warn" : "info";
    out.push({
      check: "kb_query",
      severity: sev,
      summary: `scrum-master review for \`${file}\` — accepted on attempt ${acceptedAttempt} by \`${acceptedModel}\`${usedTreeSplit ? " (tree-split)" : ""}`,
      evidence: [
        `reviewed_at: ${review.reviewed_at ?? "?"}`,
        `preview: ${String(review.suggestions_preview ?? "").slice(0, 300).replace(/\n/g, " ")}`,
      ],
    });
  }
  return out;
}