Audit pipeline PR #9: determinism + fact extraction + verifier gate + KB stats #9

Merged
profit merged 34 commits from test/enrich-prd-pipeline into main 2026-04-23 05:29:39 +00:00
8 changed files with 1227 additions and 88 deletions

View File

@ -56,7 +56,7 @@ export async function auditPr(pr: PrSnapshot, opts: AuditOptions = {}): Promise<
const [staticFindings, dynamicFindings, inferenceFindings, kbFindings] = await Promise.all([
runStaticCheck(diff),
opts.skip_dynamic ? Promise.resolve(stubFinding("dynamic", "skipped by options")) : runDynamicCheck(),
opts.skip_inference ? Promise.resolve(stubFinding("inference", "skipped by options")) : runInferenceCheck(claims, diff),
opts.skip_inference ? Promise.resolve(stubFinding("inference", "skipped by options")) : runInferenceCheck(claims, diff, { pr_number: pr.number, head_sha: pr.head_sha }),
runKbCheck(claims, pr.files.map(f => f.path)),
]);

View File

@ -14,19 +14,54 @@
import type { Claim, Finding } from "../types.ts";
import { Glob } from "bun";
import { readFile } from "node:fs/promises";
import { readFile, mkdir, appendFile } from "node:fs/promises";
import { extractFacts } from "../fact_extractor.ts";
const GATEWAY = process.env.LH_GATEWAY_URL ?? "http://localhost:3100";
const MODEL = process.env.LH_AUDITOR_REVIEW_MODEL ?? "gpt-oss:120b";
// Tie-breaker for claims where the N=3 consensus produces a 1-1-1
// split (genuinely borderline). Different architecture from the
// primary reviewer (gpt-oss) so the tie-break isn't correlated with
// the original disagreement. qwen3-coder:480b is a newer coding
// specialist at 480B params, well-suited to PR-diff claim verification
// and distinct in training lineage from gpt-oss.
const TIEBREAKER_MODEL = process.env.LH_AUDITOR_TIEBREAKER_MODEL ?? "qwen3-coder:480b";
const N_CONSENSUS = Number(process.env.LH_AUDITOR_CONSENSUS_N ?? 3);
const AUDIT_DISCREPANCIES_JSONL = "/home/profit/lakehouse/data/_kb/audit_discrepancies.jsonl";
// 40KB comfortably fits gpt-oss:120b's context. PR #1 (~39KB) was
// previously truncated at 15KB causing the reviewer to miss later
// files (gitea.ts, policy.ts) and flag "no Gitea client present" as a
// block finding when the file was simply outside the truncation window.
//
// Above this threshold we curate via tree-split rather than truncate,
// following the scrum_master pattern: shard the diff, summarize each
// shard against the claim-verification task, merge into a compact
// scratchpad, then ask the cloud to verify claims against the
// scratchpad. This gives the cloud full-PR fidelity without bursting
// its context window (observed failure mode: empty response or
// unparseable output when prompt exceeds model's comfortable range).
const MAX_DIFF_CHARS = 40000;
// Tree-split kicks in above this. 30KB is below MAX_DIFF_CHARS so we
// curate BEFORE truncation would happen — never lose signal to a hard
// cut. Shard size is chosen so ~10 shards cover PR #8-size diffs in a
// reasonable round-trip budget.
const CURATION_THRESHOLD = 30000;
const DIFF_SHARD_SIZE = 4500;
const CALL_TIMEOUT_MS = 120_000;
const REPO_ROOT = "/home/profit/lakehouse";
export async function runInferenceCheck(claims: Claim[], diff: string): Promise<Finding[]> {
export interface InferenceContext {
pr_number: number;
head_sha: string;
}
const AUDIT_FACTS_JSONL = "/home/profit/lakehouse/data/_kb/audit_facts.jsonl";
export async function runInferenceCheck(
claims: Claim[],
diff: string,
ctx?: InferenceContext,
): Promise<Finding[]> {
if (claims.length === 0) {
return [{
check: "inference",
@ -51,9 +86,26 @@ export async function runInferenceCheck(claims: Claim[], diff: string): Promise<
}];
}
const truncated = diff.length > MAX_DIFF_CHARS
? diff.slice(0, MAX_DIFF_CHARS) + `\n...[${diff.length - MAX_DIFF_CHARS} more chars truncated]`
: diff;
// Diff source for the cloud prompt — either the raw diff (small
// enough to fit), or a tree-split scratchpad (curation layer). We
// prefer curation to truncation: truncation silently drops files
// past the window; curation summarizes them so the cloud still sees
// what changed, just densified.
let diffForPrompt: string;
let curationNote = "";
if (diff.length > CURATION_THRESHOLD) {
const ts = await treeSplitDiff(diff, verifiable);
diffForPrompt = ts.scratchpad;
curationNote = ` (curated: ${diff.length} chars → ${ts.shards} shards → scratchpad ${ts.scratchpad.length} chars)`;
} else {
diffForPrompt = diff;
}
// Belt-and-suspenders truncation — even a tree-split scratchpad
// shouldn't exceed MAX_DIFF_CHARS in practice, but guard anyway so
// pathological inputs can't burst the prompt.
const truncated = diffForPrompt.length > MAX_DIFF_CHARS
? diffForPrompt.slice(0, MAX_DIFF_CHARS) + `\n...[${diffForPrompt.length - MAX_DIFF_CHARS} more chars truncated]`
: diffForPrompt;
// Build the reviewer prompt in the same shape as run_codereview's
// review stage (llm_team_ui.py:10950), adapted for claim verification:
@ -61,6 +113,30 @@ export async function runInferenceCheck(claims: Claim[], diff: string): Promise<
// "Code: ..."
// "Review: bugs/security/perf/style/edge. Provide corrected code."
// We add: claim list upfront + ask for structured JSON verdict.
//
// When the diff was curated (tree-split scratchpad), we add an
// explicit anti-false-positive instruction: the scratchpad is a
// distillation, not the full source, so absence-from-scratchpad is
// NOT evidence of absence-from-diff. Mirrors the fix we made in
// scrum_master's review prompt for the same class of error.
const isCurated = curationNote.length > 0;
const curationGuard = isCurated
? [
"",
"CRITICAL: the 'Diff' below is a curated multi-shard scratchpad,",
"NOT the full raw diff. The scratchpad distills each shard down",
"to facts useful for claim verification and drops the rest.",
"DO NOT flag a function/field/feature as 'missing' or 'not",
"implemented' based solely on its absence from the scratchpad —",
"absence in a distillation is NOT evidence of absence in the",
"actual diff. Only judge a claim NOT BACKED when the scratchpad",
"DIRECTLY contradicts it (e.g. scratchpad shows the function was",
"added empty, or shows the claimed code path is a stub).",
"Skip the unflagged_gaps section entirely when operating on a",
"curated scratchpad — you can't reliably detect gaps from a",
"distillation, and false positives there are worse than misses.",
].join("\n")
: "";
const systemMsg = [
"You review pull-request diffs against the author's own ship-claims.",
"For each claim, decide: is it backed by actual code in the diff, or is",
@ -74,6 +150,7 @@ export async function runInferenceCheck(claims: Claim[], diff: string): Promise<
" - the claim claims integration but the integration point is a stub",
" - the diff contains unimplemented!() / todo!() / TODO comments",
" - the claim says 'works end-to-end' but the diff has no end-to-end test",
curationGuard,
"",
"Respond with strict JSON only. No prose before or after. Shape:",
"{",
@ -100,94 +177,131 @@ export async function runInferenceCheck(claims: Claim[], diff: string): Promise<
`Strict JSON only, matching the shape described. No prose outside JSON.`,
].join("\n");
let resp: Response;
try {
resp = await fetch(`${GATEWAY}/v1/chat`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
provider: "ollama_cloud",
model: MODEL,
messages: [
{ role: "system", content: systemMsg },
{ role: "user", content: userMsg },
],
// Deterministic classification — temp=0 is greedy-sample, so
// identical input yields identical output on the same model
// version. This kills the signature creep we observed in the
// 9-run empirical test (sig_count 16→27 from cloud phrasing
// variance at temp=0.2).
//
// IMPORTANT: keep think=true. gpt-oss:120b is a reasoning
// model; setting think=false caused it to return empty content
// on large prompts (observed during Level 1 validation: 13421
// tokens used, empty content returned). The reasoning trace is
// variable prose, but at temp=0 the FINAL classification is
// still deterministic because greedy sampling converges to
// the same conclusion from the same starting state.
max_tokens: 3000,
temperature: 0,
think: true,
}),
signal: AbortSignal.timeout(CALL_TIMEOUT_MS),
});
} catch (e) {
// Cloud unreachable → soft-fail. Don't block a PR because the
// reviewer model is down. Static + dynamic + kb still run.
// N=3 consensus — run the primary reviewer in parallel, collect
// all three parsed responses, majority-vote per claim. Parallel
// (Promise.all) because each call is ~20-30s and they're independent;
// wall-clock stays ~same as single call, cost 3x tokens. Empirical
// justification: in 3-run determinism tests, 7/8 findings were
// stable but 1 flipped across runs — majority vote stabilizes the
// flipping class without losing the stable signal.
const primaryRuns = await Promise.all(
Array.from({ length: N_CONSENSUS }, () =>
runCloudInference(systemMsg, userMsg, MODEL)),
);
const parsedRuns = primaryRuns.filter(r => r.parsed !== null);
if (parsedRuns.length === 0) {
// All N calls failed. Surface the first-run diagnostic so the
// operator sees *why* (unreachable / non-200 / unparseable).
const first = primaryRuns[0];
return [{
check: "inference",
severity: "info",
summary: "cloud inference unreachable — skipped",
evidence: [`fetch failed: ${(e as Error).message.slice(0, 180)}`],
}];
}
if (!resp.ok) {
return [{
check: "inference",
severity: "info",
summary: `cloud inference returned ${resp.status} — skipped`,
evidence: [`body: ${(await resp.text()).slice(0, 200)}`],
}];
}
const body: any = await resp.json();
const content: string = body?.choices?.[0]?.message?.content ?? "";
const usage = body?.usage ?? {};
const parsed = extractJson(content);
if (!parsed) {
return [{
check: "inference",
severity: "info",
summary: "cloud returned unparseable output — skipped",
summary: `cloud inference all ${N_CONSENSUS} consensus runs failed — ${first.error ?? "unknown"}`,
evidence: [
`head: ${content.slice(0, 200)}`,
`tokens: ${usage.total_tokens ?? "?"}`,
`first-run diagnostic: ${first.diagnostic ?? "(none)"}`,
`successful runs: 0 / ${N_CONSENSUS}`,
],
}];
}
// Aggregate votes per claim_idx.
interface Votes { trues: number; falses: number; evidences: string[] }
const votesByClaim = new Map<number, Votes>();
const unflaggedByRun: any[][] = [];
let totalTokens = 0;
for (const run of parsedRuns) {
totalTokens += run.tokens;
unflaggedByRun.push(Array.isArray(run.parsed?.unflagged_gaps) ? run.parsed.unflagged_gaps : []);
for (const v of run.parsed?.claim_verdicts ?? []) {
const idx = Number(v?.claim_idx);
if (!Number.isFinite(idx)) continue;
const rec = votesByClaim.get(idx) ?? { trues: 0, falses: 0, evidences: [] };
if (v.backed === false) {
rec.falses++;
rec.evidences.push(String(v.evidence ?? ""));
} else if (v.backed === true) {
rec.trues++;
}
votesByClaim.set(idx, rec);
}
}
const findings: Finding[] = [];
// One summary info finding so the verdict layer knows the check ran.
// Summary finding so the verdict layer knows the check ran.
findings.push({
check: "inference",
severity: "info",
summary: `cloud review completed (model=${MODEL}, tokens=${usage.total_tokens ?? "?"})`,
summary: `cloud review completed (model=${MODEL}, consensus=${parsedRuns.length}/${N_CONSENSUS}, tokens=${totalTokens})${curationNote}`,
evidence: [
`claim_verdicts: ${parsed.claim_verdicts?.length ?? 0}, unflagged_gaps: ${parsed.unflagged_gaps?.length ?? 0}`,
`claims voted: ${votesByClaim.size}`,
`parsed runs: ${parsedRuns.length} / ${N_CONSENSUS}`,
],
});
for (const v of parsed.claim_verdicts ?? []) {
if (v?.backed === false) {
const idx = typeof v.claim_idx === "number" ? v.claim_idx : -1;
// Indices point at the verifiable[] list we sent the cloud,
// not the full claims[] list. Translate back.
const claim = verifiable[idx];
if (!claim) continue;
// Strong+unbacked = BLOCK. That's the whole point of the auditor.
// Per-claim majority vote; tie-break if no majority.
const discrepancies: Array<{
claim_idx: number;
claim_text: string;
votes: { trues: number; falses: number };
resolution: "majority_backed" | "majority_not_backed" | "tiebreaker_backed" | "tiebreaker_not_backed" | "unresolved";
tiebreaker_model?: string;
}> = [];
for (const [idx, votes] of votesByClaim) {
const claim = verifiable[idx];
if (!claim) continue;
const totalVotes = votes.trues + votes.falses;
let notBacked: boolean | null = null;
let resolution: typeof discrepancies[number]["resolution"] = "majority_backed";
let evidenceText = "";
let tbModel: string | undefined;
if (votes.falses > votes.trues) {
notBacked = true;
resolution = "majority_not_backed";
evidenceText = votes.evidences[0] ?? "(no reason given)";
} else if (votes.trues > votes.falses) {
notBacked = false;
resolution = "majority_backed";
} else {
// Tie. Run tie-breaker with a different-architecture model.
const tb = await runCloudInference(systemMsg, userMsg, TIEBREAKER_MODEL);
if (tb.parsed) {
const tv = (tb.parsed.claim_verdicts ?? []).find((v: any) => Number(v?.claim_idx) === idx);
if (tv?.backed === false) {
notBacked = true;
resolution = "tiebreaker_not_backed";
evidenceText = `(tie-breaker ${TIEBREAKER_MODEL}) ${String(tv.evidence ?? "")}`;
tbModel = TIEBREAKER_MODEL;
} else if (tv?.backed === true) {
notBacked = false;
resolution = "tiebreaker_backed";
tbModel = TIEBREAKER_MODEL;
} else {
resolution = "unresolved";
}
} else {
resolution = "unresolved";
}
}
// Log every case where the N runs disagreed — discrepancies are
// signal, not noise. Separate from audit_lessons.jsonl because
// they're about the *auditor's* quality, not the PR's quality.
const disagreed = totalVotes >= 2 && votes.trues > 0 && votes.falses > 0;
if (disagreed || resolution.startsWith("tiebreaker") || resolution === "unresolved") {
discrepancies.push({
claim_idx: idx,
claim_text: claim.text,
votes: { trues: votes.trues, falses: votes.falses },
resolution,
tiebreaker_model: tbModel,
});
}
if (notBacked === true) {
const sev: Finding["severity"] = claim.strength === "strong" ? "block"
: claim.strength === "moderate" ? "warn"
: "info";
@ -198,13 +312,45 @@ export async function runInferenceCheck(claims: Claim[], diff: string): Promise<
summary: `cloud: claim not backed — "${claim.text.slice(0, 100)}"`,
evidence: [
`at ${claim.location}`,
`cloud reason: ${String(v.evidence ?? "no reason given").slice(0, 200)}`,
`consensus: ${votes.falses}/${totalVotes} not-backed (resolution: ${resolution})`,
`cloud reason: ${evidenceText.slice(0, 200)}`,
],
});
}
}
for (const g of parsed.unflagged_gaps ?? []) {
// Persist discrepancies so we can measure consensus drift over time.
if (discrepancies.length > 0 && ctx) {
persistDiscrepancies(ctx, discrepancies).catch(e =>
console.error(`[inference] discrepancy log failed: ${(e as Error).message}`));
}
// Use first run's parsed for downstream unflagged_gaps processing.
const parsed = parsedRuns[0].parsed;
// Route the curated scratchpad through llm_team's extract-facts
// pipeline when we have (a) a curated scratchpad (best signal about
// what the PR actually changed) and (b) PR context to scope facts.
// AWAITED (not fire-and-forget) so CLI callers like audit_one.ts
// don't exit before extraction lands; the systemd poller has plenty
// of headroom (90s cycle vs ~15s extraction). A failure inside
// extractAndPersistFacts is caught + logged but never throws.
if (isCurated && ctx && process.env.LH_AUDITOR_SKIP_EXTRACT !== "1") {
try {
await extractAndPersistFacts(diffForPrompt, ctx);
} catch (e) {
console.error(`[inference] fact extraction failed: ${(e as Error).message}`);
}
}
// Belt-and-suspenders: when operating on a curated scratchpad, drop
// the unflagged_gaps section entirely. The distillation can't
// reliably ground gap-detection, and false positives are worse than
// misses for this signal class. The systemMsg already asks the
// cloud to skip this section when curated — but the model may still
// emit it, so we filter here too.
const gapsToEmit = isCurated ? [] : (parsed.unflagged_gaps ?? []);
for (const g of gapsToEmit) {
const summary = String(g?.summary ?? "?");
const location = String(g?.location ?? "?");
// False-positive guard — when the cloud says "X not defined in this
@ -248,6 +394,191 @@ export async function runInferenceCheck(claims: Claim[], diff: string): Promise<
return findings;
}
// Single cloud call — the consensus loop calls this N times in
// parallel. Returns the parsed JSON shape + token usage + any error
// diagnostic. NEVER throws; the consensus aggregator handles partial
// failures by dropping non-parsed runs from the vote.
interface CloudRunResult {
parsed: any | null;
tokens: number;
error?: string; // "unreachable" | "non_200" | "unparseable"
diagnostic?: string; // first 200 chars for debugging
model: string;
}
async function runCloudInference(systemMsg: string, userMsg: string, model: string): Promise<CloudRunResult> {
let resp: Response;
try {
resp = await fetch(`${GATEWAY}/v1/chat`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
provider: "ollama_cloud",
model,
messages: [
{ role: "system", content: systemMsg },
{ role: "user", content: userMsg },
],
// temp=0 (greedy) + think=true. think=true is required for
// gpt-oss:120b — without it the model returns empty content
// on large prompts. Variance from the think trace is observed
// in practice, which is why we use N=3 consensus, not single-
// call determinism.
max_tokens: 3000,
temperature: 0,
think: true,
}),
signal: AbortSignal.timeout(CALL_TIMEOUT_MS),
});
} catch (e) {
return { parsed: null, tokens: 0, error: "unreachable", diagnostic: (e as Error).message.slice(0, 200), model };
}
if (!resp.ok) {
return { parsed: null, tokens: 0, error: "non_200", diagnostic: `${resp.status}: ${(await resp.text()).slice(0, 160)}`, model };
}
let body: any;
try { body = await resp.json(); }
catch (e) { return { parsed: null, tokens: 0, error: "unparseable", diagnostic: (e as Error).message, model }; }
const content: string = body?.choices?.[0]?.message?.content ?? "";
const tokens: number = body?.usage?.total_tokens ?? 0;
const parsed = extractJson(content);
if (!parsed) {
return { parsed: null, tokens, error: "unparseable", diagnostic: content.slice(0, 200), model };
}
return { parsed, tokens, model };
}
async function persistDiscrepancies(ctx: InferenceContext, discrepancies: any[]): Promise<void> {
await mkdir("/home/profit/lakehouse/data/_kb", { recursive: true });
const rows = discrepancies.map(d => JSON.stringify({
pr_number: ctx.pr_number,
head_sha: ctx.head_sha,
logged_at: new Date().toISOString(),
...d,
}));
await appendFile(AUDIT_DISCREPANCIES_JSONL, rows.join("\n") + "\n");
}
// Extract structured knowledge from the curated scratchpad and append
// to data/_kb/audit_facts.jsonl — one row per extract run, keyed by
// PR number + head SHA for scope tracking. kb_query tails this next
// audit to surface recurring entities/relationships across PRs.
async function extractAndPersistFacts(scratchpad: string, ctx: InferenceContext): Promise<void> {
const ex = await extractFacts(scratchpad);
if (ex.error && ex.entities.length === 0 && ex.facts.length === 0) {
// Full failure — log but don't write an empty row.
console.error(`[inference] extractFacts skipped row: ${ex.error}`);
return;
}
const row = {
pr_number: ctx.pr_number,
head_sha: ctx.head_sha,
extracted_at: ex.extracted_at,
extractor: ex.extractor_model,
verifier: ex.verifier_model,
llm_team_run_id: ex.llm_team_run_id ?? null,
facts: ex.facts,
entities: ex.entities,
relationships: ex.relationships,
verification_preview: ex.verification.slice(0, 400),
verifier_verdicts: ex.verifier_verdicts,
facts_dropped_by_verifier: ex.facts_dropped_by_verifier ?? 0,
schema_version: 2,
source: "audit_inference",
};
await mkdir("/home/profit/lakehouse/data/_kb", { recursive: true });
await appendFile(AUDIT_FACTS_JSONL, JSON.stringify(row) + "\n");
}
// Curation via tree-split — ports the scrum_master pattern into the
// inference check. Shards the raw diff into DIFF_SHARD_SIZE chunks,
// summarizes each shard *against the claim-verification task* so the
// summary preserves exactly what the cloud needs to judge claims
// (function signatures, struct fields, deletions, new files), drops
// everything else. Merges into a compact scratchpad.
//
// Cost: N cloud calls for the shard summaries + 1 cloud call for the
// final verification = N+1 calls instead of 1. Mitigation: shards run
// serially (not parallel) to keep gateway load bounded; summary calls
// use max_tokens=400 so they're fast (~2s each on gpt-oss:120b).
//
// Determinism: each shard summary call uses temp=0 + think=true (same
// as the top-level inference call), so identical input yields
// identical scratchpad. The final verification call then sees a
// stable scratchpad, giving stable verdicts.
async function treeSplitDiff(
fullDiff: string,
claims: Claim[],
): Promise<{ scratchpad: string; shards: number }> {
const shards: Array<{ from: number; to: number; text: string }> = [];
for (let i = 0; i < fullDiff.length; i += DIFF_SHARD_SIZE) {
const end = Math.min(i + DIFF_SHARD_SIZE, fullDiff.length);
shards.push({ from: i, to: end, text: fullDiff.slice(i, end) });
}
// Curate the claim list into a short form the summary prompt can
// use to bias extraction toward relevant facts.
const claimDigest = claims.map((c, i) =>
`${i}. [${c.strength}] "${c.text.slice(0, 100)}"`
).join("\n");
let scratchpad = "";
for (const [si, shard] of shards.entries()) {
const prompt = [
`You are summarizing shard ${si + 1}/${shards.length} (chars ${shard.from}..${shard.to}) of a PR diff.`,
`The downstream task will verify these ship-claims against the full-PR summary. Extract ONLY facts that could confirm or refute these claims:`,
"",
claimDigest,
"",
"Extract: new function/method signatures, struct fields, deletions, new files, wiring (function X calls Y), absence-of-implementation markers, TODO comments on added lines.",
"Skip: comment-only edits, whitespace, import reordering, unrelated cosmetic changes.",
"",
"─────── shard diff ───────",
shard.text,
"─────── end shard ───────",
"",
"Output: up to 180 words of facts in bullet form. No prose preamble, no claim verdicts (that's for the downstream step).",
].join("\n");
const r = await callCloud(prompt, 400);
if (r.content) {
scratchpad += `\n--- shard ${si + 1} (chars ${shard.from}..${shard.to}) ---\n${r.content.trim()}\n`;
}
}
return { scratchpad: scratchpad.trim(), shards: shards.length };
}
// Minimal cloud caller used only by treeSplitDiff — same gateway +
// model as the top-level call, but think=false. Shards are small
// (≤DIFF_SHARD_SIZE ~4500 chars) and the task is pure fact
// extraction, not reasoning. think=true on the shards introduced
// variance in reasoning traces that compounded across 23 calls into
// a non-deterministic scratchpad (observed during curation
// validation: same-SHA runs produced 5/7/8 final findings).
// think=false on small prompts is stable — only breaks at the main
// call's 10K+ prompt size, which keeps think=true.
async function callCloud(prompt: string, maxTokens: number): Promise<{ content: string }> {
try {
const r = await fetch(`${GATEWAY}/v1/chat`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
provider: "ollama_cloud",
model: MODEL,
messages: [{ role: "user", content: prompt }],
max_tokens: maxTokens,
temperature: 0,
think: false,
}),
signal: AbortSignal.timeout(CALL_TIMEOUT_MS),
});
if (!r.ok) return { content: "" };
const j: any = await r.json();
return { content: j?.choices?.[0]?.message?.content ?? "" };
} catch {
return { content: "" };
}
}
// Pull out plausible code-symbol names from a summary string.
// Matches:
// - identifier with backticks: `foo_bar`

View File

@ -25,6 +25,7 @@ const OBSERVER_OPS = "/home/profit/lakehouse/data/_observer/ops.jsonl";
const BOT_CYCLES_DIR = "/home/profit/lakehouse/data/_bot/cycles";
const SCRUM_REVIEWS_JSONL = "/home/profit/lakehouse/data/_kb/scrum_reviews.jsonl";
const AUDIT_LESSONS_JSONL = "/home/profit/lakehouse/data/_kb/audit_lessons.jsonl";
const AUDIT_FACTS_JSONL = "/home/profit/lakehouse/data/_kb/audit_facts.jsonl";
const TAIL_LINES = 500;
const MAX_BOT_CYCLE_FILES = 30;
@ -61,6 +62,14 @@ export async function runKbCheck(claims: Claim[], prFiles: string[] = []): Promi
findings.push(...scrumFindings);
}
// 6b. Audit-facts (llm_team extract pipeline output) — surface
// entities that recur across multiple PRs. These are the
// "core system entities" accumulating in the knowledge base;
// showing them as info on future audits gives reviewers
// architectural context the raw diff doesn't convey.
const factFindings = await checkAuditFacts();
findings.push(...factFindings);
// 6. Audit-lessons feedback loop — summarize the top recurring
// patterns from prior audits' block/warn findings. If the same
// pattern signature has fired 3+ times across prior audits,
@ -207,6 +216,99 @@ function observerBySource(ops: any[]): string {
return Object.entries(c).sort((a, b) => b[1] - a[1]).map(([k, v]) => `${k}=${v}`).join(", ") || "empty";
}
// Audit-facts — reads data/_kb/audit_facts.jsonl (populated by every
// curated inference run via llm_team's extract pipeline). Each row
// has arrays: facts, entities, relationships. We explode entities and
// aggregate them across PRs using kb_index. An entity seen in 3+ PRs
// is a "core system entity" — we surface the top N as info context.
//
// Filters out short names (<3 chars, likely qwen2.5 truncation
// artifacts) and generic types ("string", "number") that would
// otherwise dominate the ranking.
const ENTITY_NAME_MIN_LEN = 3;
const GENERIC_ENTITY_NAMES = new Set([
"string", "number", "boolean", "any", "void", "unknown", "never",
"object", "array", "function", "const", "let", "var", "true", "false",
"null", "undefined", "promise", "map", "set", "record",
]);
async function checkAuditFacts(): Promise<Finding[]> {
// Read raw rows — each row has multiple entities, so we can't just
// use aggregate() directly (it's one-signature-per-row). Explode
// entities into (row, entity) pairs, then aggregate by entity name.
let raw: string;
try { raw = await (await import("node:fs/promises")).readFile(AUDIT_FACTS_JSONL, "utf8"); }
catch { return []; }
const lines = raw.split("\n").filter(l => l.length > 0);
if (lines.length === 0) return [];
interface EntityRow { entity_key: string; pr_number: number; type: string; name: string; description: string }
const entityRows: EntityRow[] = [];
for (const line of lines.slice(-TAIL_LINES * 2)) {
let row: any;
try { row = JSON.parse(line); } catch { continue; }
const prNum = Number(row?.pr_number);
if (!Number.isFinite(prNum)) continue;
for (const e of Array.isArray(row?.entities) ? row.entities : []) {
const name = String(e?.name ?? "").trim();
if (name.length < ENTITY_NAME_MIN_LEN) continue;
if (GENERIC_ENTITY_NAMES.has(name.toLowerCase())) continue;
entityRows.push({
entity_key: name.toLowerCase(),
pr_number: prNum,
type: String(e?.type ?? "?"),
name,
description: String(e?.description ?? "").slice(0, 160),
});
}
}
if (entityRows.length === 0) return [];
// Aggregate manually — one key per entity name, distinct_scopes by PR.
type Agg = { count: number; scopes: Set<number>; types: Set<string>; last_name: string; last_desc: string };
const byEntity = new Map<string, Agg>();
for (const r of entityRows) {
const a = byEntity.get(r.entity_key) ?? {
count: 0, scopes: new Set<number>(), types: new Set<string>(), last_name: "", last_desc: "",
};
a.count += 1;
a.scopes.add(r.pr_number);
a.types.add(r.type);
a.last_name = r.name;
a.last_desc = r.description;
byEntity.set(r.entity_key, a);
}
// Rank: require 2+ distinct PRs (same-PR entity-repeats don't count
// as "cross-cutting"). Take the top 5 to avoid flooding the verdict.
const ranked = Array.from(byEntity.entries())
.filter(([_, a]) => a.scopes.size >= 2)
.sort((a, b) => b[1].scopes.size - a[1].scopes.size || b[1].count - a[1].count)
.slice(0, 5);
if (ranked.length === 0) {
// Useful to know the KB is being populated — emit a single
// summary so operators see fact extraction is alive.
return [{
check: "kb_query",
severity: "info",
summary: `audit_facts KB has ${entityRows.length} entity-observations across ${new Set(entityRows.map(r => r.pr_number)).size} PRs (no cross-PR recurrences yet)`,
evidence: [`source: ${AUDIT_FACTS_JSONL}`],
}];
}
return ranked.map(([_, a]) => ({
check: "kb_query" as const,
severity: "info" as const,
summary: `core entity \`${a.last_name}\` recurs in ${a.scopes.size} PRs (types: ${Array.from(a.types).join(",")})`,
evidence: [
`count=${a.count} distinct_PRs=${a.scopes.size}`,
`description: ${a.last_desc.slice(0, 200)}`,
`PRs: ${Array.from(a.scopes).sort((x, y) => x - y).join(",")}`,
],
}));
}
// Audit-lessons — reads data/_kb/audit_lessons.jsonl (populated by
// every audit's appendAuditLessons). Uses the shared kb_index
// aggregator: groups by `signature`, distinct-scopes keyed by PR

View File

@ -51,11 +51,20 @@ const WEAK_PATTERNS: RegExp[] = [
// Empirical claims: runtime measurements / observed outcomes that can't
// be verified from a diff (only from the actual run that produced
// them). Example: "6/6 iterations complete, 58 cloud calls, 306s
// end-to-end" — true, but only the test's own summary.json can
// confirm it. Classifying as empirical lets the inference check skip
// them). Classifying as empirical lets the inference check skip
// diff-verification and saves the ladder for falsifiable claims.
//
// Two classes share this bucket because they share the skip discipline:
//
// 1. Runtime metrics — "58 cloud calls", "306s end-to-end"
// 2. History/proof refs — "verified on PR #8", "was flipping across runs"
//
// Both are assertions about state outside the current diff. The cloud
// would flag them as "not backed" — but that's a false positive: the
// proof lives in the referenced run, prior commit, or test output, not
// in the added lines the cloud is reading.
const EMPIRICAL_PATTERNS: RegExp[] = [
// ─── Runtime metrics ───
// Iteration / attempt counts: "6/6 iterations", "attempt 5", "accepted on attempt 3"
/\b\d+\s*\/\s*\d+\s+(iterations?|attempts?|cycles?|runs?|shards?)\b/i,
/\b(accepted|resolved|converged)\s+on\s+attempt\s+\d+\b/i,
@ -66,6 +75,30 @@ const EMPIRICAL_PATTERNS: RegExp[] = [
// "escalated through N tiers", "N distinct models"
/\bescalated\s+through\s+\d+\b/i,
/\b\d+\s+distinct\s+(model|tier)s?\b/i,
// ─── History / proof references ───
// "verified on PR #8", "verified end-to-end on PR 8", "tested against PR #4"
// Require PR#N / commit-hash / "prior <word>" to avoid matching
// "verified ... in production" (PR without \b-ish anchor previously
// consumed "pr" of "production").
/\bverified\s+(?:end[- ]to[- ]end\s+)?(?:on|against|in)\s+(?:PR\s*#?\d+|commit\s+[0-9a-f]{6,}|prior\s+\w+|the\s+\w+\s+audit)\b/i,
/\btested\s+(?:against|in|on)\s+(?:PR\s*#?\d+|commit\s+[0-9a-f]{6,}|prior\s+\w+)\b/i,
// Direct PR/commit references: "PR #8", "on PR 9", "from commit abc123"
/\b(?:on|from|in|via|per)\s+PR\s*#?\d+\b/i,
/\b(?:from|in|per|against)\s+commit\s+[0-9a-f]{6,}/i,
// Observational descriptions of prior behavior: "was flipping", "was X before", "previously observed"
/\b(?:was|were)\s+(?:flipping|drifting|inconsistent|non[- ]deterministic|creeping)\b/i,
/\bpreviously\s+(?:observed|flagged|reported|seen|landed)\b/i,
/\bused\s+to\s+(?:flip|fail|flag|reject|block)\b/i,
/\bobserved\s+(?:in|during|on|across)\s+(?:PR|prior|\d+\s+(?:runs?|audits?))/i,
// "flipping/drifting across N runs" — historical variance description
/\b(?:flipping|drifting|varying|oscillating)\s+across\s+(?:\d+\s+)?(?:runs?|audits?|iterations?)\b/i,
// "the proven X" referring to prior work (proven is a STRONG pattern
// but in context "the proven FOO" is usually a historical reference,
// not a fresh claim). We catch it here so the empirical skip wins.
/\bthe\s+proven\s+(?:escalation\s+ladder|pipeline|flow|loop|tier|path)/i,
// "from the 9-run test", "across the 5-run validation"
/\b(?:from|across|in|during)\s+the\s+\d+[- ]run\s+(?:test|validation|probe|experiment)/i,
];
export interface ParsedClaims {
@ -101,7 +134,7 @@ function scanText(text: string, location_prefix: string, commit_sha: string, out
// classify it as empirical so the inference check doesn't ask
// the cloud to prove "58 cloud calls" from the diff. Order:
// empirical → strong → moderate → weak.
const empirical = firstMatch(line, EMPIRICAL_PATTERNS);
const empirical = firstUnquotedMatch(line, EMPIRICAL_PATTERNS);
if (empirical) {
out.push({
text: line.trim().slice(0, 200),
@ -111,7 +144,7 @@ function scanText(text: string, location_prefix: string, commit_sha: string, out
});
continue;
}
const strong = firstMatch(line, STRONG_PATTERNS);
const strong = firstUnquotedMatch(line, STRONG_PATTERNS);
if (strong) {
out.push({
text: line.trim().slice(0, 200),
@ -121,7 +154,7 @@ function scanText(text: string, location_prefix: string, commit_sha: string, out
});
continue;
}
const moderate = firstMatch(line, MODERATE_PATTERNS);
const moderate = firstUnquotedMatch(line, MODERATE_PATTERNS);
if (moderate) {
out.push({
text: line.trim().slice(0, 200),
@ -131,7 +164,7 @@ function scanText(text: string, location_prefix: string, commit_sha: string, out
});
continue;
}
const weak = firstMatch(line, WEAK_PATTERNS);
const weak = firstUnquotedMatch(line, WEAK_PATTERNS);
if (weak) {
out.push({
text: line.trim().slice(0, 200),
@ -143,9 +176,35 @@ function scanText(text: string, location_prefix: string, commit_sha: string, out
}
}
function firstMatch(text: string, patterns: RegExp[]): RegExp | null {
// Match a pattern only when its match position is NOT inside a quoted
// string on the line. Mirrors the same guard in auditor/checks/static.ts
// — the two files have the same false-positive class: PR authors
// quote pattern examples in commit message bodies (e.g. `"Phase 45
// shipped"` as a test example) and without this guard those quoted
// references get flagged as fresh ship-claims. Only skips when the
// match itself falls inside quotes; real (unquoted) uses of the same
// vocabulary still classify correctly.
function firstUnquotedMatch(text: string, patterns: RegExp[]): RegExp | null {
for (const p of patterns) {
if (p.test(text)) return p;
const m = text.match(p);
if (!m || typeof m.index !== "number") continue;
if (isInsideQuotedString(text, m.index)) continue;
return p;
}
return null;
}
// Walks left→right toggling in-quote state on each unescaped quote.
// Good enough for single-line claims; multi-line strings aren't parsed.
function isInsideQuotedString(line: string, pos: number): boolean {
let inDouble = false, inSingle = false, inBacktick = false;
for (let i = 0; i < pos; i++) {
const c = line[i];
const esc = i > 0 && line[i - 1] === "\\";
if (esc) continue;
if (c === '"' && !inSingle && !inBacktick) inDouble = !inDouble;
else if (c === "'" && !inDouble && !inBacktick) inSingle = !inSingle;
else if (c === "`" && !inDouble && !inSingle) inBacktick = !inBacktick;
}
return inDouble || inSingle || inBacktick;
}

271
auditor/fact_extractor.ts Normal file
View File

@ -0,0 +1,271 @@
// fact_extractor — routes curated TEXT through llm_team_ui's
// "knowledge extract facts" mode (mode=extract at /api/run).
//
// What it gives us: structured {facts, entities, relationships} from
// whatever curated blob we send. Auditor sends the tree-split
// inference scratchpad (the best distillation of what a PR changed).
// Scrum_master will later send its accepted review bodies.
//
// Why route through llm_team and not just extract directly from our
// own checks: llm_team's extract uses a local EXTRACTOR model
// (qwen2.5) + a separate VERIFIER (gemma2). This cross-check is the
// discipline J wants for knowledge going into the playbook — facts
// go in only after a second model has rated them CORRECT /
// UNVERIFIABLE. Fast (local models, ~10-20s), free, and matches the
// codereview pattern J already trusts.
//
// SSE parsing: llm_team streams SSE events. We're only interested in
// the final "response" event with role="final" + the extraction
// response (role="extraction N"). Parse the JSON from the extractor's
// response text.
const LLM_TEAM = process.env.LH_LLM_TEAM_URL ?? "http://localhost:5000";
const EXTRACTOR = process.env.LH_FACT_EXTRACTOR ?? "qwen2.5:latest";
const VERIFIER = process.env.LH_FACT_VERIFIER ?? "gemma2:latest";
const EXTRACT_TIMEOUT_MS = 120_000;
const PROJECT_CONTEXT_FILE = process.env.LH_AUDITOR_CONTEXT_FILE
?? "/home/profit/lakehouse/docs/AUDITOR_CONTEXT.md";
let cachedContext: string | null = null;
async function loadProjectContext(): Promise<string> {
if (cachedContext !== null) return cachedContext;
try {
const { readFile } = await import("node:fs/promises");
const raw = await readFile(PROJECT_CONTEXT_FILE, "utf8");
// Cap at 4KB — anything past that is more noise than signal for
// the extractor/verifier's attention budget.
cachedContext = raw.slice(0, 4000);
} catch {
cachedContext = ""; // context file missing → extractor runs without preamble
}
return cachedContext;
}
export interface Entity {
name: string;
type: string;
description?: string;
}
export interface Relationship {
from: string;
to: string;
type: string;
}
export interface ExtractedFacts {
facts: string[];
entities: Entity[];
relationships: Relationship[];
verification: string;
extractor_model: string;
verifier_model: string;
source_preview: string;
// Populated when the extract run completed server-side (llm_team
// persists to its own team_runs; this is for our own cross-ref).
llm_team_run_id?: number;
extracted_at: string;
// Per-fact verdicts from the verifier pass (CORRECT/INCORRECT/
// UNVERIFIABLE/UNCHECKED). Aligned 1:1 with the *raw* fact list
// pre-drop so operators can see which verdicts mapped to dropped
// facts if needed.
verifier_verdicts?: Array<"CORRECT" | "INCORRECT" | "UNVERIFIABLE" | "UNCHECKED">;
facts_dropped_by_verifier?: number;
error?: string;
}
/**
* Run the llm_team extract pipeline on `source` text. Returns
* structured {facts, entities, relationships}.
*
* Returns an object with `error` set if the pipeline failed never
* throws, because fact extraction is best-effort enrichment (the
* primary audit must not break if llm_team is down).
*/
export async function extractFacts(source: string): Promise<ExtractedFacts> {
const base: ExtractedFacts = {
facts: [],
entities: [],
relationships: [],
verification: "",
extractor_model: EXTRACTOR,
verifier_model: VERIFIER,
source_preview: source.slice(0, 240),
extracted_at: new Date().toISOString(),
};
// Prepend project context to the source so the extractor + verifier
// know what codebase/framework these facts belong to. Without this,
// the verifier marks most domain-specific facts as UNVERIFIABLE ("I
// don't know what Lakehouse is"). With it, the verifier can CORRECT-
// stamp facts that align with the stated architecture.
const context = await loadProjectContext();
const prompt = context.length > 0
? `=== PROJECT CONTEXT (for grounding facts; do NOT extract facts from this section) ===\n${context}\n\n=== CONTENT TO EXTRACT FACTS FROM ===\n${source}`
: source;
let resp: Response;
try {
resp = await fetch(`${LLM_TEAM}/api/run`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
mode: "extract",
prompt,
extractor: EXTRACTOR,
verifier: VERIFIER,
source: "prompt",
skip_cache: true, // cache by prompt would dedup identical
// scratchpads, but we want fresh extraction
// for per-audit facts; cheap since local.
}),
signal: AbortSignal.timeout(EXTRACT_TIMEOUT_MS),
});
} catch (e) {
return { ...base, error: `fetch failed: ${(e as Error).message}` };
}
if (!resp.ok) {
const body = await resp.text().catch(() => "");
return { ...base, error: `llm_team /api/run ${resp.status}: ${body.slice(0, 200)}` };
}
// Stream SSE lines; collect the one extraction response + the run_saved event
// so we can capture the team-runs ID for cross-ref.
const decoder = new TextDecoder();
const reader = resp.body?.getReader();
if (!reader) return { ...base, error: "no response body" };
let buffer = "";
let extractionText = "";
let verifierText = "";
let runId: number | undefined = undefined;
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
let nl: number;
while ((nl = buffer.indexOf("\n\n")) >= 0) {
const chunk = buffer.slice(0, nl);
buffer = buffer.slice(nl + 2);
const dataLine = chunk.split("\n").find(l => l.startsWith("data: "));
if (!dataLine) continue;
try {
const ev = JSON.parse(dataLine.slice(6));
if (ev.type === "response") {
const role = String(ev.role ?? "");
if (role.startsWith("extraction")) extractionText = String(ev.text ?? "");
else if (role === "verifier") verifierText = String(ev.text ?? "");
} else if (ev.type === "run_saved") {
const id = Number(ev.run_id);
if (Number.isFinite(id)) runId = id;
}
} catch { /* skip malformed SSE */ }
}
}
} catch (e) {
return { ...base, error: `SSE read failed: ${(e as Error).message}` };
}
// Pull the JSON object out of extractionText (may be wrapped in ```json fences).
const parsed = extractFirstJsonObject(extractionText);
if (!parsed) {
return { ...base, error: "extractor returned no parseable JSON", verification: verifierText };
}
const rawFacts: string[] = Array.isArray(parsed.facts)
? parsed.facts.slice(0, 50).map(String)
: [];
// Parse the verifier's free-form prose into per-fact verdicts, then
// drop any fact the verifier explicitly marked INCORRECT. Leave
// UNVERIFIABLE in place: many of our extractions are domain-specific
// (Lakehouse internals) and the verifier has no prior-knowledge
// anchor, so UNVERIFIABLE is the expected verdict for new signal,
// not a quality fail. This is verifier-gated persistence: drop only
// what's affirmatively wrong, not what's novel.
const verdicts = parseVerifierVerdicts(verifierText, rawFacts.length);
const incorrectIdx = new Set<number>();
verdicts.forEach((v, i) => { if (v === "INCORRECT") incorrectIdx.add(i); });
const kept = rawFacts.filter((_, i) => !incorrectIdx.has(i));
return {
...base,
facts: kept,
entities: Array.isArray(parsed.entities)
? parsed.entities.slice(0, 30).map((e: any) => ({
name: String(e?.name ?? ""),
type: String(e?.type ?? ""),
description: typeof e?.description === "string" ? e.description.slice(0, 240) : undefined,
})).filter(e => e.name.length > 0)
: [],
relationships: Array.isArray(parsed.relationships)
? parsed.relationships.slice(0, 30).map((r: any) => ({
from: String(r?.from ?? ""),
to: String(r?.to ?? ""),
type: String(r?.type ?? ""),
})).filter(r => r.from.length > 0 && r.to.length > 0)
: [],
verification: verifierText.slice(0, 1500),
facts_dropped_by_verifier: incorrectIdx.size,
verifier_verdicts: verdicts,
llm_team_run_id: runId,
};
}
// Parse verifier's free-form output into a per-fact verdict array.
// Gemma2 uses several formats depending on prompt mood:
// Format A: **1.** claim... * **Verdict:** CORRECT
// Format B: **1.** claim... * **CORRECT** (no "Verdict:" label)
// Format C: 1. claim... CORRECT
// Strategy: split on fact numbers, then find the first
// CORRECT|INCORRECT|UNVERIFIABLE token in each section. Handles all
// three formats without regex gymnastics.
function parseVerifierVerdicts(
verifierText: string,
numFacts: number,
): Array<"CORRECT" | "INCORRECT" | "UNVERIFIABLE" | "UNCHECKED"> {
const out: Array<"CORRECT" | "INCORRECT" | "UNVERIFIABLE" | "UNCHECKED"> =
Array(numFacts).fill("UNCHECKED");
if (!verifierText) return out;
// Find each fact section start — "**N.**" or "N." at line start —
// and slice out the content up to the NEXT fact number. Each section
// gets scanned for the first CORRECT/INCORRECT/UNVERIFIABLE token.
const starts: Array<{ idx: number; pos: number }> = [];
const header = /(?:^|\n)\s*(?:\*\*)?(\d+)[.)]/g;
for (const m of verifierText.matchAll(header)) {
const factNum = Number(m[1]);
if (!Number.isFinite(factNum)) continue;
starts.push({ idx: factNum - 1, pos: m.index! });
}
for (let i = 0; i < starts.length; i++) {
const s = starts[i];
const end = i + 1 < starts.length ? starts[i + 1].pos : verifierText.length;
if (s.idx < 0 || s.idx >= numFacts) continue;
const section = verifierText.slice(s.pos, end);
const v = section.match(/\b(CORRECT|INCORRECT|UNVERIFIABLE)\b/i);
if (v) out[s.idx] = v[1].toUpperCase() as "CORRECT" | "INCORRECT" | "UNVERIFIABLE";
}
return out;
}
// Lift the first balanced JSON object out of (possibly fenced) text.
// Same discipline as inference.ts::extractJson.
function extractFirstJsonObject(text: string): any | null {
const cleaned = text.replace(/^```(?:json)?\s*/im, "").replace(/```\s*$/im, "");
let depth = 0, start = -1;
for (let i = 0; i < cleaned.length; i++) {
const c = cleaned[i];
if (c === "{") { if (depth === 0) start = i; depth++; }
else if (c === "}") {
depth--;
if (depth === 0 && start >= 0) {
try { return JSON.parse(cleaned.slice(start, i + 1)); } catch { start = -1; }
}
}
}
return null;
}

269
auditor/kb_stats.ts Normal file
View File

@ -0,0 +1,269 @@
// kb_stats — on-demand dashboard numbers from the KB scratchpad
// files. Reads data/_auditor/verdicts/*, data/_kb/audit_lessons.jsonl,
// data/_kb/audit_facts.jsonl, data/_kb/audit_discrepancies.jsonl,
// data/_kb/scrum_reviews.jsonl and prints:
//
// - verdict flip-flop rate (same SHA re-audited, verdict changed?)
// - consensus discrepancy rate (N runs disagreed on a claim)
// - confidence distribution from kb_index aggregator
// - top N recurring entities from audit_facts
// - fact growth over time
// - scrum vs inference KB split
//
// Run: bun run auditor/kb_stats.ts
// bun run auditor/kb_stats.ts --top 15 # show top 15 entities
// bun run auditor/kb_stats.ts --json # machine-readable
//
// This is the "dashboard" without running Grafana. If someone really
// wants a dashboard, wire this output into a static HTML page + cron.
import { readFile, readdir } from "node:fs/promises";
import { join } from "node:path";
import { aggregate } from "./kb_index.ts";
const REPO = "/home/profit/lakehouse";
const VERDICTS_DIR = `${REPO}/data/_auditor/verdicts`;
const AUDIT_LESSONS = `${REPO}/data/_kb/audit_lessons.jsonl`;
const AUDIT_FACTS = `${REPO}/data/_kb/audit_facts.jsonl`;
const AUDIT_DISCREPANCIES = `${REPO}/data/_kb/audit_discrepancies.jsonl`;
const SCRUM_REVIEWS = `${REPO}/data/_kb/scrum_reviews.jsonl`;
interface Args {
top: number;
json: boolean;
}
function parseArgs(argv: string[]): Args {
const a: Args = { top: 10, json: false };
for (let i = 2; i < argv.length; i++) {
if (argv[i] === "--top") a.top = Number(argv[++i] ?? 10);
else if (argv[i] === "--json") a.json = true;
}
return a;
}
async function readJsonl<T = any>(path: string): Promise<T[]> {
try {
const raw = await readFile(path, "utf8");
return raw.split("\n").filter(l => l.length > 0).map(l => {
try { return JSON.parse(l) as T; } catch { return null as any; }
}).filter(r => r !== null);
} catch { return []; }
}
async function loadVerdicts(): Promise<Array<{ pr: number; sha: string; overall: string; findings_total: number; findings_block: number; findings_warn: number }>> {
let files: string[] = [];
try { files = await readdir(VERDICTS_DIR); } catch { return []; }
const out = [];
for (const f of files) {
if (!f.endsWith(".json")) continue;
const m = f.match(/^(\d+)-([0-9a-f]+)\.json$/);
if (!m) continue;
try {
const v = JSON.parse(await readFile(join(VERDICTS_DIR, f), "utf8"));
out.push({
pr: Number(m[1]),
sha: m[2],
overall: String(v.overall),
findings_total: Number(v.metrics?.findings_total ?? 0),
findings_block: Number(v.metrics?.findings_block ?? 0),
findings_warn: Number(v.metrics?.findings_warn ?? 0),
});
} catch { /* skip corrupt */ }
}
return out;
}
interface Stats {
audit_count: number;
verdict_distribution: Record<string, number>;
// Same PR with multiple SHAs — if verdicts differ, that's drift across
// the PR's commit history. Not a flip-flop in the classical sense,
// but worth surfacing (e.g. "PR #8 was block block req req block").
per_pr_verdict_sequences: Record<number, string[]>;
// For each PR with ≥ 2 audits, how many distinct verdicts did it
// produce? 1 = stable; 2+ = some flipping.
verdict_instability: { pr_count: number; pr_with_multiple_verdicts: number; pr_with_3plus: number };
consensus: { discrepancy_count: number; tiebreaker_used: number; unresolved: number };
kb: {
audit_lessons_rows: number;
audit_facts_rows: number;
scrum_reviews_rows: number;
distinct_finding_signatures: number;
distinct_entities_across_prs: number;
entities_in_2plus_prs: number;
entities_in_5plus_prs: number;
};
fact_quality: {
verifier_verdict_distribution: Record<string, number>;
facts_dropped_by_verifier_total: number;
extraction_success_rate: number;
};
top_entities: Array<{ name: string; distinct_prs: number; count: number; types: string[] }>;
kb_by_source: Record<string, number>;
}
async function collect(args: Args): Promise<Stats> {
const verdicts = await loadVerdicts();
const lessons = await readJsonl<any>(AUDIT_LESSONS);
const facts = await readJsonl<any>(AUDIT_FACTS);
const disc = await readJsonl<any>(AUDIT_DISCREPANCIES);
const reviews = await readJsonl<any>(SCRUM_REVIEWS);
// Verdict stability
const byPr: Record<number, string[]> = {};
const verdictDist: Record<string, number> = {};
for (const v of verdicts) {
(byPr[v.pr] ??= []).push(v.overall);
verdictDist[v.overall] = (verdictDist[v.overall] ?? 0) + 1;
}
let multi = 0, tri = 0;
for (const [_, seq] of Object.entries(byPr)) {
const distinct = new Set(seq);
if (distinct.size >= 2) multi++;
if (distinct.size >= 3) tri++;
}
// Consensus drift
const consensus = {
discrepancy_count: disc.length,
tiebreaker_used: disc.filter(d => String(d.resolution).startsWith("tiebreaker")).length,
unresolved: disc.filter(d => d.resolution === "unresolved").length,
};
// Lesson signatures
const lessonAgg = await aggregate<any>(AUDIT_LESSONS, {
keyFn: r => r?.signature,
scopeFn: r => (r?.pr_number !== undefined ? `pr-${r.pr_number}` : undefined),
});
// Entity aggregation across audit_facts rows
interface EntAgg { distinct_prs: Set<number>; count: number; types: Set<string>; name: string; sources: Set<string> }
const entAgg = new Map<string, EntAgg>();
const sourceCount: Record<string, number> = {};
let totalVerdictDist: Record<string, number> = { CORRECT: 0, INCORRECT: 0, UNVERIFIABLE: 0, UNCHECKED: 0 };
let factsDroppedTotal = 0;
let extractionsWithFacts = 0;
for (const row of facts) {
const src = String(row.source ?? "unknown");
sourceCount[src] = (sourceCount[src] ?? 0) + 1;
const pr = Number(row.pr_number);
if (Array.isArray(row.verifier_verdicts)) {
for (const v of row.verifier_verdicts) {
totalVerdictDist[v] = (totalVerdictDist[v] ?? 0) + 1;
}
}
factsDroppedTotal += Number(row.facts_dropped_by_verifier ?? 0);
if ((Array.isArray(row.facts) && row.facts.length > 0) || (Array.isArray(row.entities) && row.entities.length > 0)) {
extractionsWithFacts++;
}
for (const e of Array.isArray(row.entities) ? row.entities : []) {
const name = String(e?.name ?? "").trim();
if (name.length < 3) continue;
const key = name.toLowerCase();
const agg = entAgg.get(key) ?? { distinct_prs: new Set(), count: 0, types: new Set(), name, sources: new Set() };
agg.count++;
if (Number.isFinite(pr) && pr > 0) agg.distinct_prs.add(pr);
if (e?.type) agg.types.add(String(e.type));
agg.sources.add(src);
entAgg.set(key, agg);
}
}
const entitiesIn2Plus = Array.from(entAgg.values()).filter(a => a.distinct_prs.size >= 2).length;
const entitiesIn5Plus = Array.from(entAgg.values()).filter(a => a.distinct_prs.size >= 5).length;
const topEntities = Array.from(entAgg.values())
.sort((a, b) => b.distinct_prs.size - a.distinct_prs.size || b.count - a.count)
.slice(0, args.top)
.map(a => ({
name: a.name,
distinct_prs: a.distinct_prs.size,
count: a.count,
types: Array.from(a.types),
}));
const stats: Stats = {
audit_count: verdicts.length,
verdict_distribution: verdictDist,
per_pr_verdict_sequences: byPr,
verdict_instability: {
pr_count: Object.keys(byPr).length,
pr_with_multiple_verdicts: multi,
pr_with_3plus: tri,
},
consensus,
kb: {
audit_lessons_rows: lessons.length,
audit_facts_rows: facts.length,
scrum_reviews_rows: reviews.length,
distinct_finding_signatures: lessonAgg.size,
distinct_entities_across_prs: entAgg.size,
entities_in_2plus_prs: entitiesIn2Plus,
entities_in_5plus_prs: entitiesIn5Plus,
},
fact_quality: {
verifier_verdict_distribution: totalVerdictDist,
facts_dropped_by_verifier_total: factsDroppedTotal,
extraction_success_rate: facts.length > 0 ? extractionsWithFacts / facts.length : 0,
},
top_entities: topEntities,
kb_by_source: sourceCount,
};
return stats;
}
function renderHuman(s: Stats): string {
const lines: string[] = [];
lines.push("═══ KB STATS ═══");
lines.push("");
lines.push(`Audits: ${s.audit_count} total across ${s.verdict_instability.pr_count} distinct PRs`);
lines.push(`Verdicts: ${Object.entries(s.verdict_distribution).map(([k, v]) => `${k}=${v}`).join(" ")}`);
const multiplePct = s.verdict_instability.pr_count > 0
? Math.round(100 * s.verdict_instability.pr_with_multiple_verdicts / s.verdict_instability.pr_count)
: 0;
lines.push(`Verdict instability: ${s.verdict_instability.pr_with_multiple_verdicts}/${s.verdict_instability.pr_count} PRs had 2+ distinct verdicts (${multiplePct}%) — 3+ distinct: ${s.verdict_instability.pr_with_3plus}`);
lines.push("");
lines.push("─── Consensus ───");
lines.push(` discrepancies logged: ${s.consensus.discrepancy_count}`);
lines.push(` tiebreaker used: ${s.consensus.tiebreaker_used}`);
lines.push(` unresolved: ${s.consensus.unresolved}`);
const dRate = s.audit_count > 0 ? (100 * s.consensus.discrepancy_count / s.audit_count).toFixed(1) : "0";
lines.push(` discrepancy rate: ${dRate}% of audits`);
lines.push("");
lines.push("─── KB size ───");
lines.push(` audit_lessons.jsonl: ${s.kb.audit_lessons_rows} rows, ${s.kb.distinct_finding_signatures} distinct signatures`);
lines.push(` audit_facts.jsonl: ${s.kb.audit_facts_rows} rows, ${s.kb.distinct_entities_across_prs} distinct entities`);
lines.push(` scrum_reviews.jsonl: ${s.kb.scrum_reviews_rows} rows`);
lines.push(` entities in 2+ PRs: ${s.kb.entities_in_2plus_prs}`);
lines.push(` entities in 5+ PRs: ${s.kb.entities_in_5plus_prs} ← strong cross-cutting signal`);
lines.push("");
lines.push("─── Fact quality ───");
const v = s.fact_quality.verifier_verdict_distribution;
lines.push(` verifier verdicts: CORRECT=${v.CORRECT ?? 0} UNVERIFIABLE=${v.UNVERIFIABLE ?? 0} UNCHECKED=${v.UNCHECKED ?? 0} INCORRECT=${v.INCORRECT ?? 0}`);
lines.push(` facts dropped by verifier: ${s.fact_quality.facts_dropped_by_verifier_total}`);
lines.push(` extraction success rate: ${(s.fact_quality.extraction_success_rate * 100).toFixed(1)}%`);
lines.push("");
lines.push("─── KB sources ───");
for (const [src, n] of Object.entries(s.kb_by_source)) {
lines.push(` ${src}: ${n}`);
}
lines.push("");
lines.push(`─── Top ${s.top_entities.length} recurring entities ───`);
for (const e of s.top_entities) {
lines.push(` [${e.distinct_prs} PRs × ${e.count} obs] ${e.name} (${e.types.join(",")})`);
}
return lines.join("\n");
}
async function main() {
const args = parseArgs(process.argv);
const stats = await collect(args);
if (args.json) {
console.log(JSON.stringify(stats, (_, v) => v instanceof Set ? Array.from(v) : v, 2));
} else {
console.log(renderHuman(stats));
}
}
main().catch(e => { console.error("[kb_stats] fatal:", e); process.exit(1); });

69
docs/AUDITOR_CONTEXT.md Normal file
View File

@ -0,0 +1,69 @@
# Auditor Context — project preamble for fact extraction
This file is read by `auditor/fact_extractor.ts` and prepended to the
extract-facts prompt sent to llm_team. The goal: give the extractor +
verifier enough grounding to ground domain-specific facts instead of
marking them UNVERIFIABLE by default.
Keep this short (< 400 words). Verifier only reads the first ~4KB of
the prompt alongside the facts. Longer = noise, not signal.
Update when: a new Phase lands, a crate is added/removed, the project's
primary domain shifts (e.g. staffing → DevOps).
---
## What Lakehouse is
Lakehouse is a Rust-first data platform over S3-compatible object
storage. Primary use: a staffing company ingesting legacy CRM data for
AI-powered worker matching, contract fulfillment, and playbook-driven
coordination.
Architecture: 13 Rust crates + a Python sidecar (Ollama) + TypeScript
sub-agents (auditor, scrum_master, bot). Runs on a single server
(Nvidia A4000, 128GB RAM). All services on localhost: gateway :3100,
sidecar :3200, UI :3300, MCP :3700, observer :3800, MinIO :9000.
## Key crates (each maps to a responsibility)
- **shared** — types, Arrow helpers, PII utilities, SecretsProvider
- **proto** — gRPC definitions
- **storaged** — S3/MinIO I/O, AppendLog, ErrorJournal
- **catalogd** — metadata authority (manifests, views, tombstones)
- **queryd** — DataFusion SQL, MemTable cache, compaction
- **ingestd** — CSV/JSON/PDF/Postgres/MySQL ingest
- **vectord** — embeddings, HNSW index, **playbook_memory meta-index** (Phase 19+)
- **vectord-lance** — Lance 4.0 firewall crate (separate Arrow version)
- **journald** — append-only mutation event log
- **aibridge** — Rust↔Python sidecar bridge, context budget + continuation
- **gateway** — Axum HTTP :3100 + gRPC :3101 (Phase 38+ adds /v1/chat)
- **ui** — Dioxus WASM (stale, pre-Phase-9)
- **lance-bench** — standalone benchmark
## Current architectural direction (Phase 38-44)
Universal AI Control Plane: a `/v1/chat` OpenAI-compatible API that
routes all LLM traffic through one layer for token accounting + provider
fallback. Truth Layer + Validation Pipeline enforce staffing-domain
invariants (worker eligibility, PII, contract rules). The Auditor
(Phase A of cohesion plan) hard-blocks PR merges on placeholder code.
## Auditor sub-agent role
`auditor/` (TypeScript, Bun runtime) polls Gitea every 90s for open PRs.
For each fresh head SHA it runs 4 checks in parallel: static (grep-style
placeholder detection), dynamic (runs the hybrid fixture), inference
(gpt-oss:120b cloud review with N=3 consensus + qwen3-coder:480b
tie-breaker), and kb_query (reads `data/_kb/*.jsonl` for prior evidence).
Verdicts post to Gitea as commit status + review comment. Findings
append to `data/_kb/audit_lessons.jsonl` (path-agnostic signatures for
dedup). Curated scratchpads from tree-split get routed through this
extract-facts pipeline to populate `audit_facts.jsonl` — which is what
you (the extractor) are currently producing.
## Things that are NOT the auditor
- The LLM Team UI at `/root/llm_team_ui.py` (devop.live:5000) — a separate product for human-facing multi-model experimentation
- The scrum_master pipeline at `tests/real-world/scrum_master_pipeline.ts` — reviews files, not claims
- The bot at `bot/` — will apply fixes, doesn't audit

View File

@ -343,12 +343,50 @@ Respond with markdown. Be specific, not generic. Cite file-region + PRD-chunk-of
attempts_made: history.length,
tree_split_fired: treeSplitFired,
suggestions_preview: accepted.slice(0, 2000),
schema_version: 2,
scrum_master_reviewed: true,
};
try {
await appendFile(SCRUM_REVIEWS_JSONL, JSON.stringify(row) + "\n");
} catch (e) {
console.error(`[scrum] failed to append scrum_reviews.jsonl: ${(e as Error).message}`);
}
// Route the accepted review through llm_team's fact extractor so
// its entities + relationships land in audit_facts.jsonl alongside
// inference-side extractions. Same index, two sources. Tagged
// source:"scrum_review" + scrum_master_reviewed:true so downstream
// queries can filter by provenance. Reviews shorter than 120
// chars are skipped — they're usually one-liners ("LGTM") with
// no extractable knowledge.
if (accepted.length >= 120 && process.env.LH_SCRUM_SKIP_EXTRACT !== "1") {
try {
const { extractFacts } = await import("../../auditor/fact_extractor.ts");
const ex = await extractFacts(accepted);
if (!ex.error || ex.entities.length + ex.facts.length > 0) {
const factRow = {
pr_number: 0, // scrum runs outside a PR scope
file: rel,
head_sha: "", // no SHA scope; scope is the file+timestamp
extracted_at: ex.extracted_at,
extractor: ex.extractor_model,
verifier: ex.verifier_model,
llm_team_run_id: ex.llm_team_run_id ?? null,
facts: ex.facts,
entities: ex.entities,
relationships: ex.relationships,
verification_preview: ex.verification.slice(0, 400),
schema_version: 2,
source: "scrum_review",
scrum_master_reviewed: true,
};
const AUDIT_FACTS_JSONL = "/home/profit/lakehouse/data/_kb/audit_facts.jsonl";
await appendFile(AUDIT_FACTS_JSONL, JSON.stringify(factRow) + "\n");
}
} catch (e) {
console.error(`[scrum] fact extraction failed for ${rel}: ${(e as Error).message}`);
}
}
}
return review;