Closes the cycle J asked for: curated cloud output lands structured knowledge in the KB so future audits have architectural context, not just a log of per-finding signatures. Three pieces: 1. Inference curation (tree-split) — when diff > 30KB, shard at 4.5KB, summarize each shard via cloud (temp=0, think=false on small shards; think=true on main call). Merge into scratchpad. The cloud verification then runs against the scratchpad, not truncated raw. Eliminates the 40KB MAX_DIFF_CHARS truncation path for large PRs (PR #8 is 102KB — was losing 62KB). Anti-false-positive guard in the prompt: cloud is told scratchpad absence is NOT diff absence, so it doesn't flag curated-out symbols as missing. unflagged_gaps section is dropped entirely when curated (scratchpad can't ground them). 2. fact_extractor — TS client for llm_team_ui's extract-facts mode at localhost:5000/api/run. Sends curated scratchpad through qwen2.5 extractor + gemma2 verifier, parses SSE stream, returns structured {facts, entities, relationships, verification, llm_team_run_id}. Best-effort: if llm_team is down, extraction fails silently and the audit still completes. AWAITED so CLI tools (audit_one.ts) don't exit before extraction lands — the systemd poller has 90s headroom so the extra ~15s doesn't matter. 3. audit_facts.jsonl + checkAuditFacts() — one row per curated audit with the extraction result. kb_query tails the jsonl, explodes entity rows, aggregates by entity name with distinct-PR counting, surfaces entities recurring in 2+ PRs as info findings. Filters out short names (<3 chars, extractor truncation artifacts) and generic types (string/number/etc.) so signal isn't drowned. Verified end-to-end on PR #8: 102KB diff → 23 shards → 1KB scratchpad → qwen2.5 extracted 4 facts + 6 entities + 6 relationships (real code-level knowledge: AggregateOptions<T> type, aggregate<T> async function with real signature, typed relationships). llm_team_run_id cross-references to llm_team's own team_runs table. 
Also: audit.ts passes (pr_number, head_sha) as InferenceContext so extracted facts are scope-tagged for the KB index.
398 lines · 16 KiB · TypeScript
// Local-KB check — reads data/_kb/ + data/_observer/ + data/_bot/
// for prior evidence bearing on this PR's claims. Cheap, offline,
// no model calls. The point: if a claim like "Phase X shipped" has
// a historical record of failing on the same signature before, the
// auditor surfaces that pattern before the cloud check has to
// infer it.
//
// What this check reads (all file-backed, append-only or periodic):
//   data/_kb/outcomes.jsonl          — per-scenario outcomes (kb.ts)
//   data/_kb/error_corrections.jsonl — fail→succeed deltas on same sig
//   data/_kb/scrum_reviews.jsonl     — scrum-master accepted reviews
//   data/_observer/ops.jsonl         — observer ring → disk stream
//   data/_bot/cycles/*.json          — bot cycle results
//
// Each JSONL line / per-cycle file is small; this check reads tails
// only (last N lines or last M files) to stay cheap on large corpora.
||
import { readFile, readdir, stat } from "node:fs/promises";
|
||
import { join } from "node:path";
|
||
import type { Claim, Finding } from "../types.ts";
|
||
import { aggregate, ratingSeverity, formatAgg } from "../kb_index.ts";
|
||
|
||
// File-system locations of the knowledge-base artifacts this check
// reads. All are append-only JSONL streams or per-cycle JSON files
// written by other parts of the system (kb.ts, the observer, the bot,
// the scrum pipeline, the audit fact-extraction pipeline). Absolute
// paths: the auditor may be launched from any working directory.
const KB_DIR = "/home/profit/lakehouse/data/_kb";
const OBSERVER_OPS = "/home/profit/lakehouse/data/_observer/ops.jsonl";
const BOT_CYCLES_DIR = "/home/profit/lakehouse/data/_bot/cycles";
const SCRUM_REVIEWS_JSONL = "/home/profit/lakehouse/data/_kb/scrum_reviews.jsonl";
const AUDIT_LESSONS_JSONL = "/home/profit/lakehouse/data/_kb/audit_lessons.jsonl";
const AUDIT_FACTS_JSONL = "/home/profit/lakehouse/data/_kb/audit_facts.jsonl";

// Tail sizes — each reader looks only at the most recent slice of its
// corpus so the check stays cheap as the files grow.
const TAIL_LINES = 500;
const MAX_BOT_CYCLE_FILES = 30;
||
export async function runKbCheck(claims: Claim[], prFiles: string[] = []): Promise<Finding[]> {
|
||
const findings: Finding[] = [];
|
||
|
||
// 1. Recent scenario outcomes: are strong-claim-style phrases showing
|
||
// up alongside failed events? That's "we claimed it worked" +
|
||
// "it didn't" in the KB.
|
||
const scenarioFindings = await checkScenarioOutcomes(claims);
|
||
findings.push(...scenarioFindings);
|
||
|
||
// 2. Error corrections: any of the claims text overlap a
|
||
// recently-observed fail→succeed pair? If yes, add context.
|
||
const correctionFindings = await checkErrorCorrections(claims);
|
||
findings.push(...correctionFindings);
|
||
|
||
// 3. Bot cycles: any prior bot cycle ended in tests_failed or
|
||
// apply_failed on a file this PR is also touching?
|
||
const botFindings = await checkBotCycles();
|
||
findings.push(...botFindings);
|
||
|
||
// 4. Observer: count recent error events. High volume = shared
|
||
// infra problem, worth flagging (context for other findings).
|
||
const obsFindings = await checkObserverStream();
|
||
findings.push(...obsFindings);
|
||
|
||
// 5. Scrum-master reviews — surface prior accepted reviews for any
|
||
// file in this PR's diff. Cohesion plan Phase C wire: the
|
||
// auditor gets to "borrow" the scrum-master's deeper per-file
|
||
// analysis instead of re-doing that work.
|
||
if (prFiles.length > 0) {
|
||
const scrumFindings = await checkScrumReviews(prFiles);
|
||
findings.push(...scrumFindings);
|
||
}
|
||
|
||
// 6b. Audit-facts (llm_team extract pipeline output) — surface
|
||
// entities that recur across multiple PRs. These are the
|
||
// "core system entities" accumulating in the knowledge base;
|
||
// showing them as info on future audits gives reviewers
|
||
// architectural context the raw diff doesn't convey.
|
||
const factFindings = await checkAuditFacts();
|
||
findings.push(...factFindings);
|
||
|
||
// 6. Audit-lessons feedback loop — summarize the top recurring
|
||
// patterns from prior audits' block/warn findings. If the same
|
||
// pattern signature has fired 3+ times across prior audits,
|
||
// emit it as a block-severity finding so reviewers know this
|
||
// is a known-recurring class, not a one-off. Does NOT couple
|
||
// to the current audit's static/inference findings (those run
|
||
// in parallel and we can't see them here) — the amplification
|
||
// is emergent: if the current audit's finding-summary matches
|
||
// a top recurrence, the reviewer sees both.
|
||
const auditLessonFindings = await checkAuditLessons();
|
||
findings.push(...auditLessonFindings);
|
||
|
||
return findings;
|
||
}
|
||
|
||
async function tailJsonl<T = any>(path: string, n: number): Promise<T[]> {
|
||
try {
|
||
const raw = await readFile(path, "utf8");
|
||
const lines = raw.split("\n").filter(l => l.length > 0);
|
||
const slice = lines.slice(-n);
|
||
const out: T[] = [];
|
||
for (const line of slice) {
|
||
try { out.push(JSON.parse(line)); } catch { /* skip malformed */ }
|
||
}
|
||
return out;
|
||
} catch {
|
||
return [];
|
||
}
|
||
}
|
||
|
||
async function checkScenarioOutcomes(_claims: Claim[]): Promise<Finding[]> {
|
||
const outcomes = await tailJsonl<any>(join(KB_DIR, "outcomes.jsonl"), TAIL_LINES);
|
||
if (outcomes.length === 0) return [];
|
||
const totalEvents = outcomes.reduce((s, o) => s + (o.total_events ?? 0), 0);
|
||
const okEvents = outcomes.reduce((s, o) => s + (o.ok_events ?? 0), 0);
|
||
const failRate = totalEvents > 0 ? 1 - okEvents / totalEvents : 0;
|
||
|
||
if (totalEvents === 0) {
|
||
return [{
|
||
check: "kb_query",
|
||
severity: "info",
|
||
summary: `KB: no scenario outcomes on file — learning loop is empty`,
|
||
evidence: [`data/_kb/outcomes.jsonl has ${outcomes.length} entries with 0 total events`],
|
||
}];
|
||
}
|
||
|
||
const recent = outcomes.slice(-10);
|
||
const recentFailSigs: string[] = recent
|
||
.filter(o => (o.ok_events ?? 0) < (o.total_events ?? 0))
|
||
.map(o => o.sig_hash)
|
||
.filter(s => typeof s === "string");
|
||
|
||
const findings: Finding[] = [{
|
||
check: "kb_query",
|
||
severity: failRate > 0.3 ? "warn" : "info",
|
||
summary: `KB: ${outcomes.length} recent scenario runs, ${okEvents}/${totalEvents} events ok (fail rate ${(failRate * 100).toFixed(1)}%)`,
|
||
evidence: [
|
||
`most recent: ${recent[recent.length - 1]?.run_id ?? "?"}`,
|
||
`recent failing sigs: ${recentFailSigs.length > 0 ? recentFailSigs.slice(-3).join(", ") : "none"}`,
|
||
],
|
||
}];
|
||
return findings;
|
||
}
|
||
|
||
async function checkErrorCorrections(_claims: Claim[]): Promise<Finding[]> {
|
||
const corrections = await tailJsonl<any>(join(KB_DIR, "error_corrections.jsonl"), TAIL_LINES);
|
||
if (corrections.length === 0) return [];
|
||
return [{
|
||
check: "kb_query",
|
||
severity: "info",
|
||
summary: `KB: ${corrections.length} error corrections on file (fail→succeed pairs)`,
|
||
evidence: [
|
||
corrections.length > 0
|
||
? `most recent: ${String(corrections[corrections.length - 1]?.sig_hash ?? "?").slice(0, 24)}`
|
||
: "none",
|
||
],
|
||
}];
|
||
}
|
||
|
||
async function checkBotCycles(): Promise<Finding[]> {
|
||
let entries: string[] = [];
|
||
try { entries = await readdir(BOT_CYCLES_DIR); }
|
||
catch { return []; }
|
||
|
||
const jsonFiles = entries.filter(e => e.endsWith(".json"));
|
||
if (jsonFiles.length === 0) return [];
|
||
|
||
// Sort by mtime desc, take most recent N
|
||
const withStat = await Promise.all(
|
||
jsonFiles.map(async name => {
|
||
try { return { name, mtime: (await stat(join(BOT_CYCLES_DIR, name))).mtimeMs }; }
|
||
catch { return { name, mtime: 0 }; }
|
||
}),
|
||
);
|
||
const recent = withStat.sort((a, b) => b.mtime - a.mtime).slice(0, MAX_BOT_CYCLE_FILES);
|
||
|
||
const outcomes: Record<string, number> = {};
|
||
for (const { name } of recent) {
|
||
try {
|
||
const r = JSON.parse(await readFile(join(BOT_CYCLES_DIR, name), "utf8"));
|
||
const o = String(r.outcome ?? "unknown");
|
||
outcomes[o] = (outcomes[o] ?? 0) + 1;
|
||
} catch { /* skip */ }
|
||
}
|
||
|
||
const summary = Object.entries(outcomes)
|
||
.sort((a, b) => b[1] - a[1])
|
||
.map(([k, v]) => `${k}=${v}`)
|
||
.join(", ");
|
||
|
||
const failCount = (outcomes["tests_failed"] ?? 0) + (outcomes["apply_failed"] ?? 0) + (outcomes["model_failed"] ?? 0);
|
||
return [{
|
||
check: "kb_query",
|
||
severity: failCount > recent.length / 2 ? "warn" : "info",
|
||
summary: `KB: bot recorded ${recent.length} recent cycles — ${summary || "no outcomes parsed"}`,
|
||
evidence: [
|
||
`dir: ${BOT_CYCLES_DIR}`,
|
||
`fail-class total: ${failCount} / ${recent.length}`,
|
||
],
|
||
}];
|
||
}
|
||
|
||
async function checkObserverStream(): Promise<Finding[]> {
|
||
const ops = await tailJsonl<any>(OBSERVER_OPS, TAIL_LINES);
|
||
if (ops.length === 0) return [];
|
||
const failures = ops.filter(o => o.ok === false).length;
|
||
return [{
|
||
check: "kb_query",
|
||
severity: "info",
|
||
summary: `KB: observer stream ${ops.length} recent ops, ${failures} failures`,
|
||
evidence: [
|
||
`source: ${OBSERVER_OPS}`,
|
||
`by source: ${observerBySource(ops)}`,
|
||
],
|
||
}];
|
||
}
|
||
|
||
function observerBySource(ops: any[]): string {
|
||
const c: Record<string, number> = {};
|
||
for (const o of ops) {
|
||
const s = String(o.source ?? "unknown");
|
||
c[s] = (c[s] ?? 0) + 1;
|
||
}
|
||
return Object.entries(c).sort((a, b) => b[1] - a[1]).map(([k, v]) => `${k}=${v}`).join(", ") || "empty";
|
||
}
|
||
|
||
// Audit-facts — reads data/_kb/audit_facts.jsonl (populated by every
|
||
// curated inference run via llm_team's extract pipeline). Each row
|
||
// has arrays: facts, entities, relationships. We explode entities and
|
||
// aggregate them by name with distinct-PR counting. An entity seen in
// 2+ distinct PRs is a "core system entity" — we surface the top N
// as info context.
|
||
//
|
||
// Filters out short names (<3 chars, likely qwen2.5 truncation
|
||
// artifacts) and generic types ("string", "number") that would
|
||
// otherwise dominate the ranking.
|
||
// Entity names shorter than this are treated as extractor truncation
// artifacts and dropped.
const ENTITY_NAME_MIN_LEN = 3;
// Generic type/keyword names (compared lowercased) that would
// dominate the cross-PR ranking without conveying any
// architecture-specific signal.
const GENERIC_ENTITY_NAMES = new Set([
  "string", "number", "boolean", "any", "void", "unknown", "never",
  "object", "array", "function", "const", "let", "var", "true", "false",
  "null", "undefined", "promise", "map", "set", "record",
]);
|
||
|
||
async function checkAuditFacts(): Promise<Finding[]> {
|
||
// Read raw rows — each row has multiple entities, so we can't just
|
||
// use aggregate() directly (it's one-signature-per-row). Explode
|
||
// entities into (row, entity) pairs, then aggregate by entity name.
|
||
let raw: string;
|
||
try { raw = await (await import("node:fs/promises")).readFile(AUDIT_FACTS_JSONL, "utf8"); }
|
||
catch { return []; }
|
||
const lines = raw.split("\n").filter(l => l.length > 0);
|
||
if (lines.length === 0) return [];
|
||
|
||
interface EntityRow { entity_key: string; pr_number: number; type: string; name: string; description: string }
|
||
const entityRows: EntityRow[] = [];
|
||
for (const line of lines.slice(-TAIL_LINES * 2)) {
|
||
let row: any;
|
||
try { row = JSON.parse(line); } catch { continue; }
|
||
const prNum = Number(row?.pr_number);
|
||
if (!Number.isFinite(prNum)) continue;
|
||
for (const e of Array.isArray(row?.entities) ? row.entities : []) {
|
||
const name = String(e?.name ?? "").trim();
|
||
if (name.length < ENTITY_NAME_MIN_LEN) continue;
|
||
if (GENERIC_ENTITY_NAMES.has(name.toLowerCase())) continue;
|
||
entityRows.push({
|
||
entity_key: name.toLowerCase(),
|
||
pr_number: prNum,
|
||
type: String(e?.type ?? "?"),
|
||
name,
|
||
description: String(e?.description ?? "").slice(0, 160),
|
||
});
|
||
}
|
||
}
|
||
if (entityRows.length === 0) return [];
|
||
|
||
// Aggregate manually — one key per entity name, distinct_scopes by PR.
|
||
type Agg = { count: number; scopes: Set<number>; types: Set<string>; last_name: string; last_desc: string };
|
||
const byEntity = new Map<string, Agg>();
|
||
for (const r of entityRows) {
|
||
const a = byEntity.get(r.entity_key) ?? {
|
||
count: 0, scopes: new Set<number>(), types: new Set<string>(), last_name: "", last_desc: "",
|
||
};
|
||
a.count += 1;
|
||
a.scopes.add(r.pr_number);
|
||
a.types.add(r.type);
|
||
a.last_name = r.name;
|
||
a.last_desc = r.description;
|
||
byEntity.set(r.entity_key, a);
|
||
}
|
||
|
||
// Rank: require 2+ distinct PRs (same-PR entity-repeats don't count
|
||
// as "cross-cutting"). Take the top 5 to avoid flooding the verdict.
|
||
const ranked = Array.from(byEntity.entries())
|
||
.filter(([_, a]) => a.scopes.size >= 2)
|
||
.sort((a, b) => b[1].scopes.size - a[1].scopes.size || b[1].count - a[1].count)
|
||
.slice(0, 5);
|
||
|
||
if (ranked.length === 0) {
|
||
// Useful to know the KB is being populated — emit a single
|
||
// summary so operators see fact extraction is alive.
|
||
return [{
|
||
check: "kb_query",
|
||
severity: "info",
|
||
summary: `audit_facts KB has ${entityRows.length} entity-observations across ${new Set(entityRows.map(r => r.pr_number)).size} PRs (no cross-PR recurrences yet)`,
|
||
evidence: [`source: ${AUDIT_FACTS_JSONL}`],
|
||
}];
|
||
}
|
||
|
||
return ranked.map(([_, a]) => ({
|
||
check: "kb_query" as const,
|
||
severity: "info" as const,
|
||
summary: `core entity \`${a.last_name}\` recurs in ${a.scopes.size} PRs (types: ${Array.from(a.types).join(",")})`,
|
||
evidence: [
|
||
`count=${a.count} distinct_PRs=${a.scopes.size}`,
|
||
`description: ${a.last_desc.slice(0, 200)}`,
|
||
`PRs: ${Array.from(a.scopes).sort((x, y) => x - y).join(",")}`,
|
||
],
|
||
}));
|
||
}
|
||
|
||
// Audit-lessons — reads data/_kb/audit_lessons.jsonl (populated by
|
||
// every audit's appendAuditLessons). Uses the shared kb_index
|
||
// aggregator: groups by `signature`, distinct-scopes keyed by PR
|
||
// number, severity from ratingSeverity(agg) which applies the
|
||
// confidence × count rating (see kb_index.ts). This is the same
|
||
// aggregation any other KB reader uses — shared discipline, not
|
||
// per-check custom logic.
|
||
async function checkAuditLessons(): Promise<Finding[]> {
|
||
const bySig = await aggregate<any>(AUDIT_LESSONS_JSONL, {
|
||
keyFn: (r) => r?.signature,
|
||
scopeFn: (r) => (r?.pr_number !== undefined ? `pr-${r.pr_number}` : undefined),
|
||
checkFn: (r) => r?.check,
|
||
tailLimit: TAIL_LINES * 4,
|
||
});
|
||
if (bySig.size === 0) return [];
|
||
|
||
const findings: Finding[] = [];
|
||
for (const [sig, agg] of bySig) {
|
||
// Silent on first-ever occurrence — not yet signal.
|
||
if (agg.count < 2) continue;
|
||
const sev = ratingSeverity(agg);
|
||
findings.push({
|
||
check: "kb_query",
|
||
severity: sev,
|
||
summary: `recurring audit pattern (${agg.distinct_scopes} distinct PRs, ${agg.count} flaggings, conf=${agg.confidence.toFixed(2)}): ${agg.representative_summary.slice(0, 160)}`,
|
||
evidence: [
|
||
`signature=${sig}`,
|
||
`checks: ${agg.checks.join(",")}`,
|
||
`scopes: ${agg.scopes.slice(-6).join(",")}`,
|
||
formatAgg(agg),
|
||
],
|
||
});
|
||
}
|
||
return findings;
|
||
}
|
||
|
||
// Scrum-master reviews — the scrum pipeline writes one row per
|
||
// accepted per-file review. We match reviews whose `file` matches
|
||
// any path in the PR's diff, then surface the *preview* + which
|
||
// model the escalation ladder had to reach. If the scrum-master
|
||
// needed the 123B specialist or larger to resolve a file, that's
|
||
// a meaningful signal about the code's complexity — and it's
|
||
// surfaced to the PR without the auditor having to re-run the
|
||
// escalation ladder itself.
|
||
async function checkScrumReviews(prFiles: string[]): Promise<Finding[]> {
|
||
const rows = await tailJsonl<any>(SCRUM_REVIEWS_JSONL, TAIL_LINES);
|
||
if (rows.length === 0) return [];
|
||
|
||
// Match by exact file OR filename suffix — PR files arrive as
|
||
// `auditor/audit.ts`-style relative paths; scrum stores the same.
|
||
const norm = (p: string) => p.replace(/^\/+/, "").replace(/^home\/profit\/lakehouse\//, "");
|
||
const prSet = new Set(prFiles.map(norm));
|
||
|
||
// Keep only the most recent review per file (last-wins).
|
||
const latestByFile = new Map<string, any>();
|
||
for (const r of rows) {
|
||
const f = norm(String(r.file ?? ""));
|
||
if (!f) continue;
|
||
if (!prSet.has(f)) continue;
|
||
latestByFile.set(f, r);
|
||
}
|
||
if (latestByFile.size === 0) return [];
|
||
|
||
const findings: Finding[] = [];
|
||
for (const [file, r] of latestByFile) {
|
||
const model = String(r.accepted_model ?? "?");
|
||
const attempt = r.accepted_on_attempt ?? "?";
|
||
const treeSplit = !!r.tree_split_fired;
|
||
// Heuristic: if the scrum-master had to escalate past attempt 3,
|
||
// or had to tree-split, that's context the PR reviewer should see.
|
||
// Severity: info for low-escalation, warn if escalated far up
|
||
// the ladder (cloud specialist required).
|
||
const heavyEscalation = Number(attempt) >= 4;
|
||
const sev: "warn" | "info" = heavyEscalation ? "warn" : "info";
|
||
findings.push({
|
||
check: "kb_query",
|
||
severity: sev,
|
||
summary: `scrum-master review for \`${file}\` — accepted on attempt ${attempt} by \`${model}\`${treeSplit ? " (tree-split)" : ""}`,
|
||
evidence: [
|
||
`reviewed_at: ${r.reviewed_at ?? "?"}`,
|
||
`preview: ${String(r.suggestions_preview ?? "").slice(0, 300).replace(/\n/g, " ")}`,
|
||
],
|
||
});
|
||
}
|
||
return findings;
|
||
}
|