From 7c1745611a5bcb1289c4dfc738e35c2986570e1b Mon Sep 17 00:00:00 2001 From: profit Date: Thu, 23 Apr 2026 05:29:38 +0000 Subject: [PATCH] Audit pipeline PR #9: determinism + fact extraction + verifier gate + KB stats + context injection (PR #9) Bundles PR #9's work for the audit pipeline: - N=3 consensus on cloud inference (gpt-oss:120b parallel) with qwen3-coder:480b tie-breaker - audit_discrepancies.jsonl logs N-run disagreements - scrum_master reviews route through llm_team fact extraction; source="scrum_review" - Verifier-gated persistence: drops INCORRECT, keeps UNVERIFIABLE/UNCHECKED; schema_version:2 - scrum_master_reviewed flag on accepted reviews - auditor/kb_stats.ts: on-demand observability script - claim_parser history/proof pattern class (verified-on-PR, was-flipping, the-proven-X) - claim_parser quoted-string guard (mirrors static.ts fix) - fact_extractor project context injection via docs/AUDITOR_CONTEXT.md - Fixed verifier-verdict parser to handle multiple gemma2 output formats Empirical: 3-run determinism test on unchanged PR #9 SHA showed 7/7 warn findings stable; block count oscillation eliminated; llm_team quality scores 8-9 on context-injected extract runs. See PR #9 for full run-by-run commit history. --- auditor/audit.ts | 2 +- auditor/checks/inference.ts | 487 ++++++++++++++++++---- auditor/checks/kb_query.ts | 102 +++++ auditor/claim_parser.ts | 77 +++- auditor/fact_extractor.ts | 271 ++++++++++++ auditor/kb_stats.ts | 269 ++++++++++++ docs/AUDITOR_CONTEXT.md | 69 +++ tests/real-world/scrum_master_pipeline.ts | 38 ++ 8 files changed, 1227 insertions(+), 88 deletions(-) create mode 100644 auditor/fact_extractor.ts create mode 100644 auditor/kb_stats.ts create mode 100644 docs/AUDITOR_CONTEXT.md diff --git a/auditor/audit.ts b/auditor/audit.ts index 18bf732..91d23fc 100644 --- a/auditor/audit.ts +++ b/auditor/audit.ts @@ -56,7 +56,7 @@ export async function auditPr(pr: PrSnapshot, opts: AuditOptions = {}): Promise< const [staticFindings, dynamicFindings, inferenceFindings, kbFindings] = await Promise.all([ runStaticCheck(diff), opts.skip_dynamic ? Promise.resolve(stubFinding("dynamic", "skipped by options")) : runDynamicCheck(), - opts.skip_inference ? Promise.resolve(stubFinding("inference", "skipped by options")) : runInferenceCheck(claims, diff), + opts.skip_inference ? Promise.resolve(stubFinding("inference", "skipped by options")) : runInferenceCheck(claims, diff, { pr_number: pr.number, head_sha: pr.head_sha }), runKbCheck(claims, pr.files.map(f => f.path)), ]); diff --git a/auditor/checks/inference.ts b/auditor/checks/inference.ts index c6c3dbf..4a83745 100644 --- a/auditor/checks/inference.ts +++ b/auditor/checks/inference.ts @@ -14,19 +14,54 @@ import type { Claim, Finding } from "../types.ts"; import { Glob } from "bun"; -import { readFile } from "node:fs/promises"; +import { readFile, mkdir, appendFile } from "node:fs/promises"; +import { extractFacts } from "../fact_extractor.ts"; const GATEWAY = process.env.LH_GATEWAY_URL ?? "http://localhost:3100"; const MODEL = process.env.LH_AUDITOR_REVIEW_MODEL ?? "gpt-oss:120b"; +// Tie-breaker for claims where the N=3 consensus produces a 1-1-1 +// split (genuinely borderline). Different architecture from the +// primary reviewer (gpt-oss) so the tie-break isn't correlated with +// the original disagreement. qwen3-coder:480b is a newer coding +// specialist at 480B params, well-suited to PR-diff claim verification +// and distinct in training lineage from gpt-oss. 
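+//
+// Worked example (illustrative, not from a real run): with N=3,
+// verdicts of {backed, backed, not-backed} resolve by simple majority
+// and never touch the tie-breaker; {backed, not-backed, claim skipped
+// by one run} leaves a 1-1 tie, so the claim is re-asked of the
+// tie-breaker model and lands in audit_discrepancies.jsonl with
+// resolution "tiebreaker_backed" / "tiebreaker_not_backed".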
+const TIEBREAKER_MODEL = process.env.LH_AUDITOR_TIEBREAKER_MODEL ?? "qwen3-coder:480b";
+const N_CONSENSUS = Number(process.env.LH_AUDITOR_CONSENSUS_N ?? 3);
+const AUDIT_DISCREPANCIES_JSONL = "/home/profit/lakehouse/data/_kb/audit_discrepancies.jsonl";
 
 // 40KB comfortably fits gpt-oss:120b's context. PR #1 (~39KB) was
 // previously truncated at 15KB causing the reviewer to miss later
 // files (gitea.ts, policy.ts) and flag "no Gitea client present" as a
 // block finding when the file was simply outside the truncation window.
+//
+// Above CURATION_THRESHOLD (declared below) we curate via tree-split
+// rather than truncate, following the scrum_master pattern: shard the
+// diff, summarize each shard against the claim-verification task,
+// merge into a compact scratchpad, then ask the cloud to verify claims
+// against the scratchpad. This gives the cloud full-PR fidelity
+// without bursting its context window (observed failure mode: empty
+// response or unparseable output when prompt exceeds model's
+// comfortable range).
 const MAX_DIFF_CHARS = 40000;
+// Tree-split kicks in above this. 30KB is below MAX_DIFF_CHARS so we
+// curate BEFORE truncation would happen — never lose signal to a hard
+// cut. Shard size is chosen so ~10 shards cover PR #8-size diffs in a
+// reasonable round-trip budget.
+const CURATION_THRESHOLD = 30000;
+const DIFF_SHARD_SIZE = 4500;
 const CALL_TIMEOUT_MS = 120_000;
 const REPO_ROOT = "/home/profit/lakehouse";
 
-export async function runInferenceCheck(claims: Claim[], diff: string): Promise<Finding[]> {
+export interface InferenceContext {
+  pr_number: number;
+  head_sha: string;
+}
+
+const AUDIT_FACTS_JSONL = "/home/profit/lakehouse/data/_kb/audit_facts.jsonl";
+
+export async function runInferenceCheck(
+  claims: Claim[],
+  diff: string,
+  ctx?: InferenceContext,
+): Promise<Finding[]> {
   if (claims.length === 0) {
     return [{
       check: "inference",
@@ -51,9 +86,26 @@ export async function runInferenceCheck(claims: Claim[], diff: string): Promise<Finding[]> {
     }];
   }
 
-  const truncated = diff.length > MAX_DIFF_CHARS
-    ? diff.slice(0, MAX_DIFF_CHARS) + `\n...[${diff.length - MAX_DIFF_CHARS} more chars truncated]`
-    : diff;
+  // Diff source for the cloud prompt — either the raw diff (small
+  // enough to fit), or a tree-split scratchpad (curation layer). We
+  // prefer curation to truncation: truncation silently drops files
+  // past the window; curation summarizes them so the cloud still sees
+  // what changed, just densified.
+  let diffForPrompt: string;
+  let curationNote = "";
+  if (diff.length > CURATION_THRESHOLD) {
+    const ts = await treeSplitDiff(diff, verifiable);
+    diffForPrompt = ts.scratchpad;
+    curationNote = ` (curated: ${diff.length} chars → ${ts.shards} shards → scratchpad ${ts.scratchpad.length} chars)`;
+  } else {
+    diffForPrompt = diff;
+  }
+  // Belt-and-suspenders truncation — even a tree-split scratchpad
+  // shouldn't exceed MAX_DIFF_CHARS in practice, but guard anyway so
+  // pathological inputs can't burst the prompt.
+  const truncated = diffForPrompt.length > MAX_DIFF_CHARS
+    ? diffForPrompt.slice(0, MAX_DIFF_CHARS) + `\n...[${diffForPrompt.length - MAX_DIFF_CHARS} more chars truncated]`
+    : diffForPrompt;
 
   // Build the reviewer prompt in the same shape as run_codereview's
   // review stage (llm_team_ui.py:10950), adapted for claim verification:
   //   "You are reviewing a code submission."
   //   "Code: ..."
   //   "Review: bugs/security/perf/style/edge. Provide corrected code."
   // We add: claim list upfront + ask for structured JSON verdict.
+ // + // When the diff was curated (tree-split scratchpad), we add an + // explicit anti-false-positive instruction: the scratchpad is a + // distillation, not the full source, so absence-from-scratchpad is + // NOT evidence of absence-from-diff. Mirrors the fix we made in + // scrum_master's review prompt for the same class of error. + const isCurated = curationNote.length > 0; + const curationGuard = isCurated + ? [ + "", + "CRITICAL: the 'Diff' below is a curated multi-shard scratchpad,", + "NOT the full raw diff. The scratchpad distills each shard down", + "to facts useful for claim verification and drops the rest.", + "DO NOT flag a function/field/feature as 'missing' or 'not", + "implemented' based solely on its absence from the scratchpad —", + "absence in a distillation is NOT evidence of absence in the", + "actual diff. Only judge a claim NOT BACKED when the scratchpad", + "DIRECTLY contradicts it (e.g. scratchpad shows the function was", + "added empty, or shows the claimed code path is a stub).", + "Skip the unflagged_gaps section entirely when operating on a", + "curated scratchpad — you can't reliably detect gaps from a", + "distillation, and false positives there are worse than misses.", + ].join("\n") + : ""; const systemMsg = [ "You review pull-request diffs against the author's own ship-claims.", "For each claim, decide: is it backed by actual code in the diff, or is", @@ -74,6 +150,7 @@ export async function runInferenceCheck(claims: Claim[], diff: string): Promise< " - the claim claims integration but the integration point is a stub", " - the diff contains unimplemented!() / todo!() / TODO comments", " - the claim says 'works end-to-end' but the diff has no end-to-end test", + curationGuard, "", "Respond with strict JSON only. No prose before or after. Shape:", "{", @@ -100,94 +177,131 @@ export async function runInferenceCheck(claims: Claim[], diff: string): Promise< `Strict JSON only, matching the shape described. No prose outside JSON.`, ].join("\n"); - let resp: Response; - try { - resp = await fetch(`${GATEWAY}/v1/chat`, { - method: "POST", - headers: { "content-type": "application/json" }, - body: JSON.stringify({ - provider: "ollama_cloud", - model: MODEL, - messages: [ - { role: "system", content: systemMsg }, - { role: "user", content: userMsg }, - ], - // Deterministic classification — temp=0 is greedy-sample, so - // identical input yields identical output on the same model - // version. This kills the signature creep we observed in the - // 9-run empirical test (sig_count 16→27 from cloud phrasing - // variance at temp=0.2). - // - // IMPORTANT: keep think=true. gpt-oss:120b is a reasoning - // model; setting think=false caused it to return empty content - // on large prompts (observed during Level 1 validation: 13421 - // tokens used, empty content returned). The reasoning trace is - // variable prose, but at temp=0 the FINAL classification is - // still deterministic because greedy sampling converges to - // the same conclusion from the same starting state. - max_tokens: 3000, - temperature: 0, - think: true, - }), - signal: AbortSignal.timeout(CALL_TIMEOUT_MS), - }); - } catch (e) { - // Cloud unreachable → soft-fail. Don't block a PR because the - // reviewer model is down. Static + dynamic + kb still run. + // N=3 consensus — run the primary reviewer in parallel, collect + // all three parsed responses, majority-vote per claim. 
Parallel + // (Promise.all) because each call is ~20-30s and they're independent; + // wall-clock stays ~same as single call, cost 3x tokens. Empirical + // justification: in 3-run determinism tests, 7/8 findings were + // stable but 1 flipped across runs — majority vote stabilizes the + // flipping class without losing the stable signal. + const primaryRuns = await Promise.all( + Array.from({ length: N_CONSENSUS }, () => + runCloudInference(systemMsg, userMsg, MODEL)), + ); + + const parsedRuns = primaryRuns.filter(r => r.parsed !== null); + if (parsedRuns.length === 0) { + // All N calls failed. Surface the first-run diagnostic so the + // operator sees *why* (unreachable / non-200 / unparseable). + const first = primaryRuns[0]; return [{ check: "inference", severity: "info", - summary: "cloud inference unreachable — skipped", - evidence: [`fetch failed: ${(e as Error).message.slice(0, 180)}`], - }]; - } - - if (!resp.ok) { - return [{ - check: "inference", - severity: "info", - summary: `cloud inference returned ${resp.status} — skipped`, - evidence: [`body: ${(await resp.text()).slice(0, 200)}`], - }]; - } - - const body: any = await resp.json(); - const content: string = body?.choices?.[0]?.message?.content ?? ""; - const usage = body?.usage ?? {}; - - const parsed = extractJson(content); - if (!parsed) { - return [{ - check: "inference", - severity: "info", - summary: "cloud returned unparseable output — skipped", + summary: `cloud inference all ${N_CONSENSUS} consensus runs failed — ${first.error ?? "unknown"}`, evidence: [ - `head: ${content.slice(0, 200)}`, - `tokens: ${usage.total_tokens ?? "?"}`, + `first-run diagnostic: ${first.diagnostic ?? "(none)"}`, + `successful runs: 0 / ${N_CONSENSUS}`, ], }]; } + // Aggregate votes per claim_idx. + interface Votes { trues: number; falses: number; evidences: string[] } + const votesByClaim = new Map(); + const unflaggedByRun: any[][] = []; + let totalTokens = 0; + for (const run of parsedRuns) { + totalTokens += run.tokens; + unflaggedByRun.push(Array.isArray(run.parsed?.unflagged_gaps) ? run.parsed.unflagged_gaps : []); + for (const v of run.parsed?.claim_verdicts ?? []) { + const idx = Number(v?.claim_idx); + if (!Number.isFinite(idx)) continue; + const rec = votesByClaim.get(idx) ?? { trues: 0, falses: 0, evidences: [] }; + if (v.backed === false) { + rec.falses++; + rec.evidences.push(String(v.evidence ?? "")); + } else if (v.backed === true) { + rec.trues++; + } + votesByClaim.set(idx, rec); + } + } + const findings: Finding[] = []; - // One summary info finding so the verdict layer knows the check ran. + // Summary finding so the verdict layer knows the check ran. findings.push({ check: "inference", severity: "info", - summary: `cloud review completed (model=${MODEL}, tokens=${usage.total_tokens ?? "?"})`, + summary: `cloud review completed (model=${MODEL}, consensus=${parsedRuns.length}/${N_CONSENSUS}, tokens=${totalTokens})${curationNote}`, evidence: [ - `claim_verdicts: ${parsed.claim_verdicts?.length ?? 0}, unflagged_gaps: ${parsed.unflagged_gaps?.length ?? 0}`, + `claims voted: ${votesByClaim.size}`, + `parsed runs: ${parsedRuns.length} / ${N_CONSENSUS}`, ], }); - for (const v of parsed.claim_verdicts ?? []) { - if (v?.backed === false) { - const idx = typeof v.claim_idx === "number" ? v.claim_idx : -1; - // Indices point at the verifiable[] list we sent the cloud, - // not the full claims[] list. Translate back. - const claim = verifiable[idx]; - if (!claim) continue; - // Strong+unbacked = BLOCK. 
That's the whole point of the auditor. + // Per-claim majority vote; tie-break if no majority. + const discrepancies: Array<{ + claim_idx: number; + claim_text: string; + votes: { trues: number; falses: number }; + resolution: "majority_backed" | "majority_not_backed" | "tiebreaker_backed" | "tiebreaker_not_backed" | "unresolved"; + tiebreaker_model?: string; + }> = []; + + for (const [idx, votes] of votesByClaim) { + const claim = verifiable[idx]; + if (!claim) continue; + const totalVotes = votes.trues + votes.falses; + let notBacked: boolean | null = null; + let resolution: typeof discrepancies[number]["resolution"] = "majority_backed"; + let evidenceText = ""; + let tbModel: string | undefined; + + if (votes.falses > votes.trues) { + notBacked = true; + resolution = "majority_not_backed"; + evidenceText = votes.evidences[0] ?? "(no reason given)"; + } else if (votes.trues > votes.falses) { + notBacked = false; + resolution = "majority_backed"; + } else { + // Tie. Run tie-breaker with a different-architecture model. + const tb = await runCloudInference(systemMsg, userMsg, TIEBREAKER_MODEL); + if (tb.parsed) { + const tv = (tb.parsed.claim_verdicts ?? []).find((v: any) => Number(v?.claim_idx) === idx); + if (tv?.backed === false) { + notBacked = true; + resolution = "tiebreaker_not_backed"; + evidenceText = `(tie-breaker ${TIEBREAKER_MODEL}) ${String(tv.evidence ?? "")}`; + tbModel = TIEBREAKER_MODEL; + } else if (tv?.backed === true) { + notBacked = false; + resolution = "tiebreaker_backed"; + tbModel = TIEBREAKER_MODEL; + } else { + resolution = "unresolved"; + } + } else { + resolution = "unresolved"; + } + } + + // Log every case where the N runs disagreed — discrepancies are + // signal, not noise. Separate from audit_lessons.jsonl because + // they're about the *auditor's* quality, not the PR's quality. + const disagreed = totalVotes >= 2 && votes.trues > 0 && votes.falses > 0; + if (disagreed || resolution.startsWith("tiebreaker") || resolution === "unresolved") { + discrepancies.push({ + claim_idx: idx, + claim_text: claim.text, + votes: { trues: votes.trues, falses: votes.falses }, + resolution, + tiebreaker_model: tbModel, + }); + } + + if (notBacked === true) { const sev: Finding["severity"] = claim.strength === "strong" ? "block" : claim.strength === "moderate" ? "warn" : "info"; @@ -198,13 +312,45 @@ export async function runInferenceCheck(claims: Claim[], diff: string): Promise< summary: `cloud: claim not backed — "${claim.text.slice(0, 100)}"`, evidence: [ `at ${claim.location}`, - `cloud reason: ${String(v.evidence ?? "no reason given").slice(0, 200)}`, + `consensus: ${votes.falses}/${totalVotes} not-backed (resolution: ${resolution})`, + `cloud reason: ${evidenceText.slice(0, 200)}`, ], }); } } - for (const g of parsed.unflagged_gaps ?? []) { + // Persist discrepancies so we can measure consensus drift over time. + if (discrepancies.length > 0 && ctx) { + persistDiscrepancies(ctx, discrepancies).catch(e => + console.error(`[inference] discrepancy log failed: ${(e as Error).message}`)); + } + + // Use first run's parsed for downstream unflagged_gaps processing. + const parsed = parsedRuns[0].parsed; + + // Route the curated scratchpad through llm_team's extract-facts + // pipeline when we have (a) a curated scratchpad (best signal about + // what the PR actually changed) and (b) PR context to scope facts. 
+  // AWAITED (not fire-and-forget) so CLI callers like audit_one.ts
+  // don't exit before extraction lands; the systemd poller has plenty
+  // of headroom (90s cycle vs ~15s extraction). A failure inside
+  // extractAndPersistFacts is caught + logged but never throws.
+  if (isCurated && ctx && process.env.LH_AUDITOR_SKIP_EXTRACT !== "1") {
+    try {
+      await extractAndPersistFacts(diffForPrompt, ctx);
+    } catch (e) {
+      console.error(`[inference] fact extraction failed: ${(e as Error).message}`);
+    }
+  }
+
+  // Belt-and-suspenders: when operating on a curated scratchpad, drop
+  // the unflagged_gaps section entirely. The distillation can't
+  // reliably ground gap-detection, and false positives are worse than
+  // misses for this signal class. The systemMsg already asks the
+  // cloud to skip this section when curated — but the model may still
+  // emit it, so we filter here too.
+  const gapsToEmit = isCurated ? [] : (parsed.unflagged_gaps ?? []);
+  for (const g of gapsToEmit) {
     const summary = String(g?.summary ?? "?");
     const location = String(g?.location ?? "?");
     // False-positive guard — when the cloud says "X not defined in this
@@ -248,6 +394,191 @@ export async function runInferenceCheck(claims: Claim[], diff: string): Promise<Finding[]> {
   return findings;
 }
 
+// Single cloud call — the consensus loop calls this N times in
+// parallel. Returns the parsed JSON shape + token usage + any error
+// diagnostic. NEVER throws; the consensus aggregator handles partial
+// failures by dropping non-parsed runs from the vote.
+interface CloudRunResult {
+  parsed: any | null;
+  tokens: number;
+  error?: string;      // "unreachable" | "non_200" | "unparseable"
+  diagnostic?: string; // first 200 chars for debugging
+  model: string;
+}
+
+async function runCloudInference(systemMsg: string, userMsg: string, model: string): Promise<CloudRunResult> {
+  let resp: Response;
+  try {
+    resp = await fetch(`${GATEWAY}/v1/chat`, {
+      method: "POST",
+      headers: { "content-type": "application/json" },
+      body: JSON.stringify({
+        provider: "ollama_cloud",
+        model,
+        messages: [
+          { role: "system", content: systemMsg },
+          { role: "user", content: userMsg },
+        ],
+        // temp=0 (greedy) + think=true. think=true is required for
+        // gpt-oss:120b — without it the model returns empty content
+        // on large prompts. Variance from the think trace is observed
+        // in practice, which is why we use N=3 consensus, not single-
+        // call determinism.
+        max_tokens: 3000,
+        temperature: 0,
+        think: true,
+      }),
+      signal: AbortSignal.timeout(CALL_TIMEOUT_MS),
+    });
+  } catch (e) {
+    return { parsed: null, tokens: 0, error: "unreachable", diagnostic: (e as Error).message.slice(0, 200), model };
+  }
+  if (!resp.ok) {
+    return { parsed: null, tokens: 0, error: "non_200", diagnostic: `${resp.status}: ${(await resp.text()).slice(0, 160)}`, model };
+  }
+  let body: any;
+  try { body = await resp.json(); }
+  catch (e) { return { parsed: null, tokens: 0, error: "unparseable", diagnostic: (e as Error).message, model }; }
+  const content: string = body?.choices?.[0]?.message?.content ?? "";
+  const tokens: number = body?.usage?.total_tokens ?? 
0;
+  const parsed = extractJson(content);
+  if (!parsed) {
+    return { parsed: null, tokens, error: "unparseable", diagnostic: content.slice(0, 200), model };
+  }
+  return { parsed, tokens, model };
+}
+
+async function persistDiscrepancies(ctx: InferenceContext, discrepancies: any[]): Promise<void> {
+  await mkdir("/home/profit/lakehouse/data/_kb", { recursive: true });
+  const rows = discrepancies.map(d => JSON.stringify({
+    pr_number: ctx.pr_number,
+    head_sha: ctx.head_sha,
+    logged_at: new Date().toISOString(),
+    ...d,
+  }));
+  await appendFile(AUDIT_DISCREPANCIES_JSONL, rows.join("\n") + "\n");
+}
+
+// Extract structured knowledge from the curated scratchpad and append
+// to data/_kb/audit_facts.jsonl — one row per extract run, keyed by
+// PR number + head SHA for scope tracking. kb_query tails this file on
+// the next audit to surface recurring entities/relationships across PRs.
+async function extractAndPersistFacts(scratchpad: string, ctx: InferenceContext): Promise<void> {
+  const ex = await extractFacts(scratchpad);
+  if (ex.error && ex.entities.length === 0 && ex.facts.length === 0) {
+    // Full failure — log but don't write an empty row.
+    console.error(`[inference] extractFacts skipped row: ${ex.error}`);
+    return;
+  }
+  const row = {
+    pr_number: ctx.pr_number,
+    head_sha: ctx.head_sha,
+    extracted_at: ex.extracted_at,
+    extractor: ex.extractor_model,
+    verifier: ex.verifier_model,
+    llm_team_run_id: ex.llm_team_run_id ?? null,
+    facts: ex.facts,
+    entities: ex.entities,
+    relationships: ex.relationships,
+    verification_preview: ex.verification.slice(0, 400),
+    verifier_verdicts: ex.verifier_verdicts,
+    facts_dropped_by_verifier: ex.facts_dropped_by_verifier ?? 0,
+    schema_version: 2,
+    source: "audit_inference",
+  };
+  await mkdir("/home/profit/lakehouse/data/_kb", { recursive: true });
+  await appendFile(AUDIT_FACTS_JSONL, JSON.stringify(row) + "\n");
+}
+
+// Curation via tree-split — ports the scrum_master pattern into the
+// inference check. Shards the raw diff into DIFF_SHARD_SIZE chunks,
+// summarizes each shard *against the claim-verification task* so the
+// summary preserves exactly what the cloud needs to judge claims
+// (function signatures, struct fields, deletions, new files), drops
+// everything else. Merges into a compact scratchpad.
+//
+// Cost: N cloud calls for the shard summaries + 1 cloud call for the
+// final verification = N+1 calls instead of 1. Mitigation: shards run
+// serially (not parallel) to keep gateway load bounded; summary calls
+// use max_tokens=400 so they're fast (~2s each on gpt-oss:120b).
+//
+// Determinism: each shard summary call uses temp=0 + think=false (see
+// callCloud below; reasoning traces on the shards made the scratchpad
+// non-deterministic), so identical input yields identical scratchpad.
+// The final verification call then sees a stable scratchpad, giving
+// stable verdicts.
+async function treeSplitDiff(
+  fullDiff: string,
+  claims: Claim[],
+): Promise<{ scratchpad: string; shards: number }> {
+  const shards: Array<{ from: number; to: number; text: string }> = [];
+  for (let i = 0; i < fullDiff.length; i += DIFF_SHARD_SIZE) {
+    const end = Math.min(i + DIFF_SHARD_SIZE, fullDiff.length);
+    shards.push({ from: i, to: end, text: fullDiff.slice(i, end) });
+  }
+  // Curate the claim list into a short form the summary prompt can
+  // use to bias extraction toward relevant facts.
+  const claimDigest = claims.map((c, i) =>
+    `${i}. 
[${c.strength}] "${c.text.slice(0, 100)}"` + ).join("\n"); + + let scratchpad = ""; + for (const [si, shard] of shards.entries()) { + const prompt = [ + `You are summarizing shard ${si + 1}/${shards.length} (chars ${shard.from}..${shard.to}) of a PR diff.`, + `The downstream task will verify these ship-claims against the full-PR summary. Extract ONLY facts that could confirm or refute these claims:`, + "", + claimDigest, + "", + "Extract: new function/method signatures, struct fields, deletions, new files, wiring (function X calls Y), absence-of-implementation markers, TODO comments on added lines.", + "Skip: comment-only edits, whitespace, import reordering, unrelated cosmetic changes.", + "", + "─────── shard diff ───────", + shard.text, + "─────── end shard ───────", + "", + "Output: up to 180 words of facts in bullet form. No prose preamble, no claim verdicts (that's for the downstream step).", + ].join("\n"); + + const r = await callCloud(prompt, 400); + if (r.content) { + scratchpad += `\n--- shard ${si + 1} (chars ${shard.from}..${shard.to}) ---\n${r.content.trim()}\n`; + } + } + return { scratchpad: scratchpad.trim(), shards: shards.length }; +} + +// Minimal cloud caller used only by treeSplitDiff — same gateway + +// model as the top-level call, but think=false. Shards are small +// (≤DIFF_SHARD_SIZE ~4500 chars) and the task is pure fact +// extraction, not reasoning. think=true on the shards introduced +// variance in reasoning traces that compounded across 23 calls into +// a non-deterministic scratchpad (observed during curation +// validation: same-SHA runs produced 5/7/8 final findings). +// think=false on small prompts is stable — only breaks at the main +// call's 10K+ prompt size, which keeps think=true. +async function callCloud(prompt: string, maxTokens: number): Promise<{ content: string }> { + try { + const r = await fetch(`${GATEWAY}/v1/chat`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + provider: "ollama_cloud", + model: MODEL, + messages: [{ role: "user", content: prompt }], + max_tokens: maxTokens, + temperature: 0, + think: false, + }), + signal: AbortSignal.timeout(CALL_TIMEOUT_MS), + }); + if (!r.ok) return { content: "" }; + const j: any = await r.json(); + return { content: j?.choices?.[0]?.message?.content ?? "" }; + } catch { + return { content: "" }; + } +} + // Pull out plausible code-symbol names from a summary string. // Matches: // - identifier with backticks: `foo_bar` diff --git a/auditor/checks/kb_query.ts b/auditor/checks/kb_query.ts index 8daa410..475a7a8 100644 --- a/auditor/checks/kb_query.ts +++ b/auditor/checks/kb_query.ts @@ -25,6 +25,7 @@ const OBSERVER_OPS = "/home/profit/lakehouse/data/_observer/ops.jsonl"; const BOT_CYCLES_DIR = "/home/profit/lakehouse/data/_bot/cycles"; const SCRUM_REVIEWS_JSONL = "/home/profit/lakehouse/data/_kb/scrum_reviews.jsonl"; const AUDIT_LESSONS_JSONL = "/home/profit/lakehouse/data/_kb/audit_lessons.jsonl"; +const AUDIT_FACTS_JSONL = "/home/profit/lakehouse/data/_kb/audit_facts.jsonl"; const TAIL_LINES = 500; const MAX_BOT_CYCLE_FILES = 30; @@ -61,6 +62,14 @@ export async function runKbCheck(claims: Claim[], prFiles: string[] = []): Promi findings.push(...scrumFindings); } + // 6b. Audit-facts (llm_team extract pipeline output) — surface + // entities that recur across multiple PRs. 
These are the
+  // "core system entities" accumulating in the knowledge base;
+  // showing them as info on future audits gives reviewers
+  // architectural context the raw diff doesn't convey.
+  const factFindings = await checkAuditFacts();
+  findings.push(...factFindings);
+
   // 6. Audit-lessons feedback loop — summarize the top recurring
   // patterns from prior audits' block/warn findings. If the same
   // pattern signature has fired 3+ times across prior audits,
@@ -207,6 +216,99 @@ function observerBySource(ops: any[]): string {
   return Object.entries(c).sort((a, b) => b[1] - a[1]).map(([k, v]) => `${k}=${v}`).join(", ") || "empty";
 }
 
+// Audit-facts — reads data/_kb/audit_facts.jsonl (populated by every
+// curated inference run via llm_team's extract pipeline). Each row
+// has arrays: facts, entities, relationships. We explode entities and
+// aggregate them across PRs (manually: kb_index's aggregate() is
+// one-signature-per-row). An entity seen in 2+ PRs is a "core system
+// entity" — we surface the top N as info context.
+//
+// Filters out short names (<3 chars, likely qwen2.5 truncation
+// artifacts) and generic types ("string", "number") that would
+// otherwise dominate the ranking.
+const ENTITY_NAME_MIN_LEN = 3;
+const GENERIC_ENTITY_NAMES = new Set([
+  "string", "number", "boolean", "any", "void", "unknown", "never",
+  "object", "array", "function", "const", "let", "var", "true", "false",
+  "null", "undefined", "promise", "map", "set", "record",
+]);
+
+async function checkAuditFacts(): Promise<Finding[]> {
+  // Read raw rows — each row has multiple entities, so we can't just
+  // use aggregate() directly (it's one-signature-per-row). Explode
+  // entities into (row, entity) pairs, then aggregate by entity name.
+  let raw: string;
+  try { raw = await (await import("node:fs/promises")).readFile(AUDIT_FACTS_JSONL, "utf8"); }
+  catch { return []; }
+  const lines = raw.split("\n").filter(l => l.length > 0);
+  if (lines.length === 0) return [];
+
+  interface EntityRow { entity_key: string; pr_number: number; type: string; name: string; description: string }
+  const entityRows: EntityRow[] = [];
+  for (const line of lines.slice(-TAIL_LINES * 2)) {
+    let row: any;
+    try { row = JSON.parse(line); } catch { continue; }
+    const prNum = Number(row?.pr_number);
+    if (!Number.isFinite(prNum)) continue;
+    for (const e of Array.isArray(row?.entities) ? row.entities : []) {
+      const name = String(e?.name ?? "").trim();
+      if (name.length < ENTITY_NAME_MIN_LEN) continue;
+      if (GENERIC_ENTITY_NAMES.has(name.toLowerCase())) continue;
+      entityRows.push({
+        entity_key: name.toLowerCase(),
+        pr_number: prNum,
+        type: String(e?.type ?? "?"),
+        name,
+        description: String(e?.description ?? "").slice(0, 160),
+      });
+    }
+  }
+  if (entityRows.length === 0) return [];
+
+  // Aggregate manually — one key per entity name, distinct_scopes by PR.
+  type Agg = { count: number; scopes: Set<number>; types: Set<string>; last_name: string; last_desc: string };
+  const byEntity = new Map<string, Agg>();
+  for (const r of entityRows) {
+    const a = byEntity.get(r.entity_key) ?? {
+      count: 0, scopes: new Set<number>(), types: new Set<string>(), last_name: "", last_desc: "",
+    };
+    a.count += 1;
+    a.scopes.add(r.pr_number);
+    a.types.add(r.type);
+    a.last_name = r.name;
+    a.last_desc = r.description;
+    byEntity.set(r.entity_key, a);
+  }
+
+  // Rank: require 2+ distinct PRs (same-PR entity-repeats don't count
+  // as "cross-cutting"). Take the top 5 to avoid flooding the verdict. 
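+  // e.g. (illustrative): an entity seen in PRs {3, 7, 9} outranks one
+  // seen 5x inside a single PR: distinct-PR breadth sorts first, raw
+  // count only breaks ties, and the single-PR entity is dropped
+  // entirely by the 2+ gate below.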
+ const ranked = Array.from(byEntity.entries()) + .filter(([_, a]) => a.scopes.size >= 2) + .sort((a, b) => b[1].scopes.size - a[1].scopes.size || b[1].count - a[1].count) + .slice(0, 5); + + if (ranked.length === 0) { + // Useful to know the KB is being populated — emit a single + // summary so operators see fact extraction is alive. + return [{ + check: "kb_query", + severity: "info", + summary: `audit_facts KB has ${entityRows.length} entity-observations across ${new Set(entityRows.map(r => r.pr_number)).size} PRs (no cross-PR recurrences yet)`, + evidence: [`source: ${AUDIT_FACTS_JSONL}`], + }]; + } + + return ranked.map(([_, a]) => ({ + check: "kb_query" as const, + severity: "info" as const, + summary: `core entity \`${a.last_name}\` recurs in ${a.scopes.size} PRs (types: ${Array.from(a.types).join(",")})`, + evidence: [ + `count=${a.count} distinct_PRs=${a.scopes.size}`, + `description: ${a.last_desc.slice(0, 200)}`, + `PRs: ${Array.from(a.scopes).sort((x, y) => x - y).join(",")}`, + ], + })); +} + // Audit-lessons — reads data/_kb/audit_lessons.jsonl (populated by // every audit's appendAuditLessons). Uses the shared kb_index // aggregator: groups by `signature`, distinct-scopes keyed by PR diff --git a/auditor/claim_parser.ts b/auditor/claim_parser.ts index 0b00663..b17efbd 100644 --- a/auditor/claim_parser.ts +++ b/auditor/claim_parser.ts @@ -51,11 +51,20 @@ const WEAK_PATTERNS: RegExp[] = [ // Empirical claims: runtime measurements / observed outcomes that can't // be verified from a diff (only from the actual run that produced -// them). Example: "6/6 iterations complete, 58 cloud calls, 306s -// end-to-end" — true, but only the test's own summary.json can -// confirm it. Classifying as empirical lets the inference check skip +// them). Classifying as empirical lets the inference check skip // diff-verification and saves the ladder for falsifiable claims. +// +// Two classes share this bucket because they share the skip discipline: +// +// 1. Runtime metrics — "58 cloud calls", "306s end-to-end" +// 2. History/proof refs — "verified on PR #8", "was flipping across runs" +// +// Both are assertions about state outside the current diff. The cloud +// would flag them as "not backed" — but that's a false positive: the +// proof lives in the referenced run, prior commit, or test output, not +// in the added lines the cloud is reading. const EMPIRICAL_PATTERNS: RegExp[] = [ + // ─── Runtime metrics ─── // Iteration / attempt counts: "6/6 iterations", "attempt 5", "accepted on attempt 3" /\b\d+\s*\/\s*\d+\s+(iterations?|attempts?|cycles?|runs?|shards?)\b/i, /\b(accepted|resolved|converged)\s+on\s+attempt\s+\d+\b/i, @@ -66,6 +75,30 @@ const EMPIRICAL_PATTERNS: RegExp[] = [ // "escalated through N tiers", "N distinct models" /\bescalated\s+through\s+\d+\b/i, /\b\d+\s+distinct\s+(model|tier)s?\b/i, + + // ─── History / proof references ─── + // "verified on PR #8", "verified end-to-end on PR 8", "tested against PR #4" + // Require PR#N / commit-hash / "prior " to avoid matching + // "verified ... in production" (PR without \b-ish anchor previously + // consumed "pr" of "production"). 
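+  //   e.g. "verified end-to-end on PR #8" → empirical (proof lives in
+  //   that run); "verified in production" has no PR/commit/prior
+  //   anchor and falls through to the strong/moderate/weak ladder.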
+ /\bverified\s+(?:end[- ]to[- ]end\s+)?(?:on|against|in)\s+(?:PR\s*#?\d+|commit\s+[0-9a-f]{6,}|prior\s+\w+|the\s+\w+\s+audit)\b/i, + /\btested\s+(?:against|in|on)\s+(?:PR\s*#?\d+|commit\s+[0-9a-f]{6,}|prior\s+\w+)\b/i, + // Direct PR/commit references: "PR #8", "on PR 9", "from commit abc123" + /\b(?:on|from|in|via|per)\s+PR\s*#?\d+\b/i, + /\b(?:from|in|per|against)\s+commit\s+[0-9a-f]{6,}/i, + // Observational descriptions of prior behavior: "was flipping", "was X before", "previously observed" + /\b(?:was|were)\s+(?:flipping|drifting|inconsistent|non[- ]deterministic|creeping)\b/i, + /\bpreviously\s+(?:observed|flagged|reported|seen|landed)\b/i, + /\bused\s+to\s+(?:flip|fail|flag|reject|block)\b/i, + /\bobserved\s+(?:in|during|on|across)\s+(?:PR|prior|\d+\s+(?:runs?|audits?))/i, + // "flipping/drifting across N runs" — historical variance description + /\b(?:flipping|drifting|varying|oscillating)\s+across\s+(?:\d+\s+)?(?:runs?|audits?|iterations?)\b/i, + // "the proven X" referring to prior work (proven is a STRONG pattern + // but in context "the proven FOO" is usually a historical reference, + // not a fresh claim). We catch it here so the empirical skip wins. + /\bthe\s+proven\s+(?:escalation\s+ladder|pipeline|flow|loop|tier|path)/i, + // "from the 9-run test", "across the 5-run validation" + /\b(?:from|across|in|during)\s+the\s+\d+[- ]run\s+(?:test|validation|probe|experiment)/i, ]; export interface ParsedClaims { @@ -101,7 +134,7 @@ function scanText(text: string, location_prefix: string, commit_sha: string, out // classify it as empirical so the inference check doesn't ask // the cloud to prove "58 cloud calls" from the diff. Order: // empirical → strong → moderate → weak. - const empirical = firstMatch(line, EMPIRICAL_PATTERNS); + const empirical = firstUnquotedMatch(line, EMPIRICAL_PATTERNS); if (empirical) { out.push({ text: line.trim().slice(0, 200), @@ -111,7 +144,7 @@ function scanText(text: string, location_prefix: string, commit_sha: string, out }); continue; } - const strong = firstMatch(line, STRONG_PATTERNS); + const strong = firstUnquotedMatch(line, STRONG_PATTERNS); if (strong) { out.push({ text: line.trim().slice(0, 200), @@ -121,7 +154,7 @@ function scanText(text: string, location_prefix: string, commit_sha: string, out }); continue; } - const moderate = firstMatch(line, MODERATE_PATTERNS); + const moderate = firstUnquotedMatch(line, MODERATE_PATTERNS); if (moderate) { out.push({ text: line.trim().slice(0, 200), @@ -131,7 +164,7 @@ function scanText(text: string, location_prefix: string, commit_sha: string, out }); continue; } - const weak = firstMatch(line, WEAK_PATTERNS); + const weak = firstUnquotedMatch(line, WEAK_PATTERNS); if (weak) { out.push({ text: line.trim().slice(0, 200), @@ -143,9 +176,35 @@ function scanText(text: string, location_prefix: string, commit_sha: string, out } } -function firstMatch(text: string, patterns: RegExp[]): RegExp | null { +// Match a pattern only when its match position is NOT inside a quoted +// string on the line. Mirrors the same guard in auditor/checks/static.ts +// — the two files have the same false-positive class: PR authors +// quote pattern examples in commit message bodies (e.g. `"Phase 45 +// shipped"` as a test example) and without this guard those quoted +// references get flagged as fresh ship-claims. Only skips when the +// match itself falls inside quotes; real (unquoted) uses of the same +// vocabulary still classify correctly. 
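+//
+// Sketch of the guard's effect on one commit-message line
+// (illustrative): in
+//   test fixture: "Phase 45 shipped end-to-end"
+// the pattern's first match sits between double quotes, so the pattern
+// is skipped for this line; in the unquoted
+//   Phase 45 shipped end-to-end
+// the same pattern matches at an unquoted index and classifies as a
+// fresh ship-claim. Note the guard tests each pattern's FIRST match
+// position only; a quoted occurrence earlier in the line suppresses a
+// later unquoted one.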
+function firstUnquotedMatch(text: string, patterns: RegExp[]): RegExp | null {
   for (const p of patterns) {
-    if (p.test(text)) return p;
+    const m = text.match(p);
+    if (!m || typeof m.index !== "number") continue;
+    if (isInsideQuotedString(text, m.index)) continue;
+    return p;
   }
   return null;
 }
+
+// Walks left→right toggling in-quote state on each unescaped quote.
+// Good enough for single-line claims; multi-line strings aren't parsed.
+function isInsideQuotedString(line: string, pos: number): boolean {
+  let inDouble = false, inSingle = false, inBacktick = false;
+  for (let i = 0; i < pos; i++) {
+    const c = line[i];
+    const esc = i > 0 && line[i - 1] === "\\";
+    if (esc) continue;
+    if (c === '"' && !inSingle && !inBacktick) inDouble = !inDouble;
+    else if (c === "'" && !inDouble && !inBacktick) inSingle = !inSingle;
+    else if (c === "`" && !inDouble && !inSingle) inBacktick = !inBacktick;
+  }
+  return inDouble || inSingle || inBacktick;
+}
diff --git a/auditor/fact_extractor.ts b/auditor/fact_extractor.ts
new file mode 100644
index 0000000..1352a4c
--- /dev/null
+++ b/auditor/fact_extractor.ts
@@ -0,0 +1,271 @@
+// fact_extractor — routes curated TEXT through llm_team_ui's
+// "knowledge extract facts" mode (mode=extract at /api/run).
+//
+// What it gives us: structured {facts, entities, relationships} from
+// whatever curated blob we send. Auditor sends the tree-split
+// inference scratchpad (the best distillation of what a PR changed).
+// Scrum_master will later send its accepted review bodies.
+//
+// Why route through llm_team and not just extract directly from our
+// own checks: llm_team's extract uses a local EXTRACTOR model
+// (qwen2.5) + a separate VERIFIER (gemma2). This cross-check is the
+// discipline J wants for knowledge going into the playbook — facts
+// go in only after a second model has rated them CORRECT /
+// UNVERIFIABLE. Fast (local models, ~10-20s), free, and matches the
+// codereview pattern J already trusts.
+//
+// SSE parsing: llm_team streams SSE events. We're interested in the
+// extraction response (role="extraction N"), the verifier response
+// (role="verifier"), and the run_saved event that carries the
+// team-run ID. Parse the JSON from the extractor's response text.
+
+const LLM_TEAM = process.env.LH_LLM_TEAM_URL ?? "http://localhost:5000";
+const EXTRACTOR = process.env.LH_FACT_EXTRACTOR ?? "qwen2.5:latest";
+const VERIFIER = process.env.LH_FACT_VERIFIER ?? "gemma2:latest";
+const EXTRACT_TIMEOUT_MS = 120_000;
+const PROJECT_CONTEXT_FILE = process.env.LH_AUDITOR_CONTEXT_FILE
+  ?? "/home/profit/lakehouse/docs/AUDITOR_CONTEXT.md";
+
+let cachedContext: string | null = null;
+async function loadProjectContext(): Promise<string> {
+  if (cachedContext !== null) return cachedContext;
+  try {
+    const { readFile } = await import("node:fs/promises");
+    const raw = await readFile(PROJECT_CONTEXT_FILE, "utf8");
+    // Cap at 4KB — anything past that is more noise than signal for
+    // the extractor/verifier's attention budget.
+    cachedContext = raw.slice(0, 4000);
+  } catch {
+    cachedContext = ""; // context file missing → extractor runs without preamble
+  }
+  return cachedContext;
+}
+
+export interface Entity {
+  name: string;
+  type: string;
+  description?: string;
+}
+
+export interface Relationship {
+  from: string;
+  to: string;
+  type: string;
+}
+
+export interface ExtractedFacts {
+  facts: string[];
+  entities: Entity[];
+  relationships: Relationship[];
+  verification: string;
+  extractor_model: string;
+  verifier_model: string;
+  source_preview: string;
+  // Populated when the extract run completed server-side (llm_team
+  // persists to its own team_runs; this is for our own cross-ref).
+  llm_team_run_id?: number;
+  extracted_at: string;
+  // Per-fact verdicts from the verifier pass (CORRECT/INCORRECT/
+  // UNVERIFIABLE/UNCHECKED). Aligned 1:1 with the *raw* fact list
+  // pre-drop so operators can see which verdicts mapped to dropped
+  // facts if needed.
+  verifier_verdicts?: Array<"CORRECT" | "INCORRECT" | "UNVERIFIABLE" | "UNCHECKED">;
+  facts_dropped_by_verifier?: number;
+  error?: string;
+}
+
+/**
+ * Run the llm_team extract pipeline on `source` text. Returns
+ * structured {facts, entities, relationships}.
+ *
+ * Returns an object with `error` set if the pipeline failed — never
+ * throws, because fact extraction is best-effort enrichment (the
+ * primary audit must not break if llm_team is down).
+ */
+export async function extractFacts(source: string): Promise<ExtractedFacts> {
+  const base: ExtractedFacts = {
+    facts: [],
+    entities: [],
+    relationships: [],
+    verification: "",
+    extractor_model: EXTRACTOR,
+    verifier_model: VERIFIER,
+    source_preview: source.slice(0, 240),
+    extracted_at: new Date().toISOString(),
+  };
+
+  // Prepend project context to the source so the extractor + verifier
+  // know what codebase/framework these facts belong to. Without this,
+  // the verifier marks most domain-specific facts as UNVERIFIABLE ("I
+  // don't know what Lakehouse is"). With it, the verifier can CORRECT-
+  // stamp facts that align with the stated architecture.
+  const context = await loadProjectContext();
+  const prompt = context.length > 0
+    ? `=== PROJECT CONTEXT (for grounding facts; do NOT extract facts from this section) ===\n${context}\n\n=== CONTENT TO EXTRACT FACTS FROM ===\n${source}`
+    : source;
+
+  let resp: Response;
+  try {
+    resp = await fetch(`${LLM_TEAM}/api/run`, {
+      method: "POST",
+      headers: { "content-type": "application/json" },
+      body: JSON.stringify({
+        mode: "extract",
+        prompt,
+        extractor: EXTRACTOR,
+        verifier: VERIFIER,
+        source: "prompt",
+        skip_cache: true, // cache by prompt would dedup identical
+                          // scratchpads, but we want fresh extraction
+                          // for per-audit facts; cheap since local.
+      }),
+      signal: AbortSignal.timeout(EXTRACT_TIMEOUT_MS),
+    });
+  } catch (e) {
+    return { ...base, error: `fetch failed: ${(e as Error).message}` };
+  }
+
+  if (!resp.ok) {
+    const body = await resp.text().catch(() => "");
+    return { ...base, error: `llm_team /api/run ${resp.status}: ${body.slice(0, 200)}` };
+  }
+
+  // Stream SSE lines; collect the extraction + verifier responses and
+  // the run_saved event so we can capture the team-runs ID for cross-ref.
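+  // Event shapes this loop consumes (field names per the parse
+  // branches below; values illustrative):
+  //   data: {"type":"response","role":"extraction 1","text":"{ \"facts\": [...] }"}
+  //   data: {"type":"response","role":"verifier","text":"**1.** ... CORRECT"}
+  //   data: {"type":"run_saved","run_id":412}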
+ const decoder = new TextDecoder(); + const reader = resp.body?.getReader(); + if (!reader) return { ...base, error: "no response body" }; + + let buffer = ""; + let extractionText = ""; + let verifierText = ""; + let runId: number | undefined = undefined; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + let nl: number; + while ((nl = buffer.indexOf("\n\n")) >= 0) { + const chunk = buffer.slice(0, nl); + buffer = buffer.slice(nl + 2); + const dataLine = chunk.split("\n").find(l => l.startsWith("data: ")); + if (!dataLine) continue; + try { + const ev = JSON.parse(dataLine.slice(6)); + if (ev.type === "response") { + const role = String(ev.role ?? ""); + if (role.startsWith("extraction")) extractionText = String(ev.text ?? ""); + else if (role === "verifier") verifierText = String(ev.text ?? ""); + } else if (ev.type === "run_saved") { + const id = Number(ev.run_id); + if (Number.isFinite(id)) runId = id; + } + } catch { /* skip malformed SSE */ } + } + } + } catch (e) { + return { ...base, error: `SSE read failed: ${(e as Error).message}` }; + } + + // Pull the JSON object out of extractionText (may be wrapped in ```json fences). + const parsed = extractFirstJsonObject(extractionText); + if (!parsed) { + return { ...base, error: "extractor returned no parseable JSON", verification: verifierText }; + } + + const rawFacts: string[] = Array.isArray(parsed.facts) + ? parsed.facts.slice(0, 50).map(String) + : []; + + // Parse the verifier's free-form prose into per-fact verdicts, then + // drop any fact the verifier explicitly marked INCORRECT. Leave + // UNVERIFIABLE in place: many of our extractions are domain-specific + // (Lakehouse internals) and the verifier has no prior-knowledge + // anchor, so UNVERIFIABLE is the expected verdict for new signal, + // not a quality fail. This is verifier-gated persistence: drop only + // what's affirmatively wrong, not what's novel. + const verdicts = parseVerifierVerdicts(verifierText, rawFacts.length); + const incorrectIdx = new Set(); + verdicts.forEach((v, i) => { if (v === "INCORRECT") incorrectIdx.add(i); }); + const kept = rawFacts.filter((_, i) => !incorrectIdx.has(i)); + + return { + ...base, + facts: kept, + entities: Array.isArray(parsed.entities) + ? parsed.entities.slice(0, 30).map((e: any) => ({ + name: String(e?.name ?? ""), + type: String(e?.type ?? ""), + description: typeof e?.description === "string" ? e.description.slice(0, 240) : undefined, + })).filter(e => e.name.length > 0) + : [], + relationships: Array.isArray(parsed.relationships) + ? parsed.relationships.slice(0, 30).map((r: any) => ({ + from: String(r?.from ?? ""), + to: String(r?.to ?? ""), + type: String(r?.type ?? ""), + })).filter(r => r.from.length > 0 && r.to.length > 0) + : [], + verification: verifierText.slice(0, 1500), + facts_dropped_by_verifier: incorrectIdx.size, + verifier_verdicts: verdicts, + llm_team_run_id: runId, + }; +} + +// Parse verifier's free-form output into a per-fact verdict array. +// Gemma2 uses several formats depending on prompt mood: +// Format A: **1.** claim... * **Verdict:** CORRECT +// Format B: **1.** claim... * **CORRECT** (no "Verdict:" label) +// Format C: 1. claim... CORRECT +// Strategy: split on fact numbers, then find the first +// CORRECT|INCORRECT|UNVERIFIABLE token in each section. Handles all +// three formats without regex gymnastics. 
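+// Worked example (illustrative facts): all three formats resolve:
+//   **1.** gateway listens on :3100. * **Verdict:** CORRECT  → out[0]="CORRECT"
+//   **2.** facts persist in SQLite. * **INCORRECT**          → out[1]="INCORRECT"
+//   3. poller cycle is 90s. UNVERIFIABLE                     → out[2]="UNVERIFIABLE"
+// A fact the verifier never numbered keeps its "UNCHECKED" default.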
+function parseVerifierVerdicts( + verifierText: string, + numFacts: number, +): Array<"CORRECT" | "INCORRECT" | "UNVERIFIABLE" | "UNCHECKED"> { + const out: Array<"CORRECT" | "INCORRECT" | "UNVERIFIABLE" | "UNCHECKED"> = + Array(numFacts).fill("UNCHECKED"); + if (!verifierText) return out; + + // Find each fact section start — "**N.**" or "N." at line start — + // and slice out the content up to the NEXT fact number. Each section + // gets scanned for the first CORRECT/INCORRECT/UNVERIFIABLE token. + const starts: Array<{ idx: number; pos: number }> = []; + const header = /(?:^|\n)\s*(?:\*\*)?(\d+)[.)]/g; + for (const m of verifierText.matchAll(header)) { + const factNum = Number(m[1]); + if (!Number.isFinite(factNum)) continue; + starts.push({ idx: factNum - 1, pos: m.index! }); + } + for (let i = 0; i < starts.length; i++) { + const s = starts[i]; + const end = i + 1 < starts.length ? starts[i + 1].pos : verifierText.length; + if (s.idx < 0 || s.idx >= numFacts) continue; + const section = verifierText.slice(s.pos, end); + const v = section.match(/\b(CORRECT|INCORRECT|UNVERIFIABLE)\b/i); + if (v) out[s.idx] = v[1].toUpperCase() as "CORRECT" | "INCORRECT" | "UNVERIFIABLE"; + } + return out; +} + +// Lift the first balanced JSON object out of (possibly fenced) text. +// Same discipline as inference.ts::extractJson. +function extractFirstJsonObject(text: string): any | null { + const cleaned = text.replace(/^```(?:json)?\s*/im, "").replace(/```\s*$/im, ""); + let depth = 0, start = -1; + for (let i = 0; i < cleaned.length; i++) { + const c = cleaned[i]; + if (c === "{") { if (depth === 0) start = i; depth++; } + else if (c === "}") { + depth--; + if (depth === 0 && start >= 0) { + try { return JSON.parse(cleaned.slice(start, i + 1)); } catch { start = -1; } + } + } + } + return null; +} diff --git a/auditor/kb_stats.ts b/auditor/kb_stats.ts new file mode 100644 index 0000000..9608656 --- /dev/null +++ b/auditor/kb_stats.ts @@ -0,0 +1,269 @@ +// kb_stats — on-demand dashboard numbers from the KB scratchpad +// files. Reads data/_auditor/verdicts/*, data/_kb/audit_lessons.jsonl, +// data/_kb/audit_facts.jsonl, data/_kb/audit_discrepancies.jsonl, +// data/_kb/scrum_reviews.jsonl and prints: +// +// - verdict flip-flop rate (same SHA re-audited, verdict changed?) +// - consensus discrepancy rate (N runs disagreed on a claim) +// - confidence distribution from kb_index aggregator +// - top N recurring entities from audit_facts +// - fact growth over time +// - scrum vs inference KB split +// +// Run: bun run auditor/kb_stats.ts +// bun run auditor/kb_stats.ts --top 15 # show top 15 entities +// bun run auditor/kb_stats.ts --json # machine-readable +// +// This is the "dashboard" without running Grafana. If someone really +// wants a dashboard, wire this output into a static HTML page + cron. 
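+//
+// Example output excerpt (illustrative numbers; verdict labels come
+// from each verdict file's `overall` field):
+//   Audits: 12 total across 5 distinct PRs
+//   Verdicts: block=5 req=7
+//   discrepancy rate: 16.7% of audits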
+import { readFile, readdir } from "node:fs/promises";
+import { join } from "node:path";
+import { aggregate } from "./kb_index.ts";
+
+const REPO = "/home/profit/lakehouse";
+const VERDICTS_DIR = `${REPO}/data/_auditor/verdicts`;
+const AUDIT_LESSONS = `${REPO}/data/_kb/audit_lessons.jsonl`;
+const AUDIT_FACTS = `${REPO}/data/_kb/audit_facts.jsonl`;
+const AUDIT_DISCREPANCIES = `${REPO}/data/_kb/audit_discrepancies.jsonl`;
+const SCRUM_REVIEWS = `${REPO}/data/_kb/scrum_reviews.jsonl`;
+
+interface Args {
+  top: number;
+  json: boolean;
+}
+
+function parseArgs(argv: string[]): Args {
+  const a: Args = { top: 10, json: false };
+  for (let i = 2; i < argv.length; i++) {
+    if (argv[i] === "--top") a.top = Number(argv[++i] ?? 10);
+    else if (argv[i] === "--json") a.json = true;
+  }
+  return a;
+}
+
+async function readJsonl<T = any>(path: string): Promise<T[]> {
+  try {
+    const raw = await readFile(path, "utf8");
+    return raw.split("\n").filter(l => l.length > 0).map(l => {
+      try { return JSON.parse(l) as T; } catch { return null as any; }
+    }).filter(r => r !== null);
+  } catch { return []; }
+}
+
+// One row per data/_auditor/verdicts/<pr>-<sha>.json file.
+interface VerdictRow {
+  pr: number;
+  sha: string;
+  overall: string;
+  findings_total: number;
+  findings_block: number;
+  findings_warn: number;
+}
+
+async function loadVerdicts(): Promise<VerdictRow[]> {
+  let files: string[] = [];
+  try { files = await readdir(VERDICTS_DIR); } catch { return []; }
+  const out: VerdictRow[] = [];
+  for (const f of files) {
+    if (!f.endsWith(".json")) continue;
+    const m = f.match(/^(\d+)-([0-9a-f]+)\.json$/);
+    if (!m) continue;
+    try {
+      const v = JSON.parse(await readFile(join(VERDICTS_DIR, f), "utf8"));
+      out.push({
+        pr: Number(m[1]),
+        sha: m[2],
+        overall: String(v.overall),
+        findings_total: Number(v.metrics?.findings_total ?? 0),
+        findings_block: Number(v.metrics?.findings_block ?? 0),
+        findings_warn: Number(v.metrics?.findings_warn ?? 0),
+      });
+    } catch { /* skip corrupt */ }
+  }
+  return out;
+}
+
+interface Stats {
+  audit_count: number;
+  verdict_distribution: Record<string, number>;
+  // Same PR with multiple SHAs — if verdicts differ, that's drift across
+  // the PR's commit history. Not a flip-flop in the classical sense,
+  // but worth surfacing (e.g. "PR #8 was block block req req block").
+  per_pr_verdict_sequences: Record<number, string[]>;
+  // For each PR with ≥ 2 audits, how many distinct verdicts did it
+  // produce? 1 = stable; 2+ = some flipping.
+  verdict_instability: { pr_count: number; pr_with_multiple_verdicts: number; pr_with_3plus: number };
+  consensus: { discrepancy_count: number; tiebreaker_used: number; unresolved: number };
+  kb: {
+    audit_lessons_rows: number;
+    audit_facts_rows: number;
+    scrum_reviews_rows: number;
+    distinct_finding_signatures: number;
+    distinct_entities_across_prs: number;
+    entities_in_2plus_prs: number;
+    entities_in_5plus_prs: number;
+  };
+  fact_quality: {
+    verifier_verdict_distribution: Record<string, number>;
+    facts_dropped_by_verifier_total: number;
+    extraction_success_rate: number;
+  };
+  top_entities: Array<{ name: string; distinct_prs: number; count: number; types: string[] }>;
+  kb_by_source: Record<string, number>;
+}
+
+async function collect(args: Args): Promise<Stats> {
+  const verdicts = await loadVerdicts();
+  const lessons = await readJsonl(AUDIT_LESSONS);
+  const facts = await readJsonl(AUDIT_FACTS);
+  const disc = await readJsonl(AUDIT_DISCREPANCIES);
+  const reviews = await readJsonl(SCRUM_REVIEWS);
+
+  // Verdict stability
+  const byPr: Record<number, string[]> = {};
+  const verdictDist: Record<string, number> = {};
+  for (const v of verdicts) {
+    (byPr[v.pr] ??= []).push(v.overall);
+    verdictDist[v.overall] = (verdictDist[v.overall] ?? 
0) + 1;
+  }
+  let multi = 0, tri = 0;
+  for (const [_, seq] of Object.entries(byPr)) {
+    const distinct = new Set(seq);
+    if (distinct.size >= 2) multi++;
+    if (distinct.size >= 3) tri++;
+  }
+
+  // Consensus drift
+  const consensus = {
+    discrepancy_count: disc.length,
+    tiebreaker_used: disc.filter(d => String(d.resolution).startsWith("tiebreaker")).length,
+    unresolved: disc.filter(d => d.resolution === "unresolved").length,
+  };
+
+  // Lesson signatures
+  const lessonAgg = await aggregate(AUDIT_LESSONS, {
+    keyFn: r => r?.signature,
+    scopeFn: r => (r?.pr_number !== undefined ? `pr-${r.pr_number}` : undefined),
+  });
+
+  // Entity aggregation across audit_facts rows
+  interface EntAgg { distinct_prs: Set<number>; count: number; types: Set<string>; name: string; sources: Set<string> }
+  const entAgg = new Map<string, EntAgg>();
+  const sourceCount: Record<string, number> = {};
+  let totalVerdictDist: Record<string, number> = { CORRECT: 0, INCORRECT: 0, UNVERIFIABLE: 0, UNCHECKED: 0 };
+  let factsDroppedTotal = 0;
+  let extractionsWithFacts = 0;
+
+  for (const row of facts) {
+    const src = String(row.source ?? "unknown");
+    sourceCount[src] = (sourceCount[src] ?? 0) + 1;
+    const pr = Number(row.pr_number);
+    if (Array.isArray(row.verifier_verdicts)) {
+      for (const v of row.verifier_verdicts) {
+        totalVerdictDist[v] = (totalVerdictDist[v] ?? 0) + 1;
+      }
+    }
+    factsDroppedTotal += Number(row.facts_dropped_by_verifier ?? 0);
+    if ((Array.isArray(row.facts) && row.facts.length > 0) || (Array.isArray(row.entities) && row.entities.length > 0)) {
+      extractionsWithFacts++;
+    }
+    for (const e of Array.isArray(row.entities) ? row.entities : []) {
+      const name = String(e?.name ?? "").trim();
+      if (name.length < 3) continue;
+      const key = name.toLowerCase();
+      const agg = entAgg.get(key) ?? { distinct_prs: new Set<number>(), count: 0, types: new Set<string>(), name, sources: new Set<string>() };
+      agg.count++;
+      if (Number.isFinite(pr) && pr > 0) agg.distinct_prs.add(pr);
+      if (e?.type) agg.types.add(String(e.type));
+      agg.sources.add(src);
+      entAgg.set(key, agg);
+    }
+  }
+
+  const entitiesIn2Plus = Array.from(entAgg.values()).filter(a => a.distinct_prs.size >= 2).length;
+  const entitiesIn5Plus = Array.from(entAgg.values()).filter(a => a.distinct_prs.size >= 5).length;
+  const topEntities = Array.from(entAgg.values())
+    .sort((a, b) => b.distinct_prs.size - a.distinct_prs.size || b.count - a.count)
+    .slice(0, args.top)
+    .map(a => ({
+      name: a.name,
+      distinct_prs: a.distinct_prs.size,
+      count: a.count,
+      types: Array.from(a.types),
+    }));
+
+  const stats: Stats = {
+    audit_count: verdicts.length,
+    verdict_distribution: verdictDist,
+    per_pr_verdict_sequences: byPr,
+    verdict_instability: {
+      pr_count: Object.keys(byPr).length,
+      pr_with_multiple_verdicts: multi,
+      pr_with_3plus: tri,
+    },
+    consensus,
+    kb: {
+      audit_lessons_rows: lessons.length,
+      audit_facts_rows: facts.length,
+      scrum_reviews_rows: reviews.length,
+      distinct_finding_signatures: lessonAgg.size,
+      distinct_entities_across_prs: entAgg.size,
+      entities_in_2plus_prs: entitiesIn2Plus,
+      entities_in_5plus_prs: entitiesIn5Plus,
+    },
+    fact_quality: {
+      verifier_verdict_distribution: totalVerdictDist,
+      facts_dropped_by_verifier_total: factsDroppedTotal,
+      extraction_success_rate: facts.length > 0 ? 
extractionsWithFacts / facts.length : 0, + }, + top_entities: topEntities, + kb_by_source: sourceCount, + }; + return stats; +} + +function renderHuman(s: Stats): string { + const lines: string[] = []; + lines.push("═══ KB STATS ═══"); + lines.push(""); + lines.push(`Audits: ${s.audit_count} total across ${s.verdict_instability.pr_count} distinct PRs`); + lines.push(`Verdicts: ${Object.entries(s.verdict_distribution).map(([k, v]) => `${k}=${v}`).join(" ")}`); + const multiplePct = s.verdict_instability.pr_count > 0 + ? Math.round(100 * s.verdict_instability.pr_with_multiple_verdicts / s.verdict_instability.pr_count) + : 0; + lines.push(`Verdict instability: ${s.verdict_instability.pr_with_multiple_verdicts}/${s.verdict_instability.pr_count} PRs had 2+ distinct verdicts (${multiplePct}%) — 3+ distinct: ${s.verdict_instability.pr_with_3plus}`); + lines.push(""); + lines.push("─── Consensus ───"); + lines.push(` discrepancies logged: ${s.consensus.discrepancy_count}`); + lines.push(` tiebreaker used: ${s.consensus.tiebreaker_used}`); + lines.push(` unresolved: ${s.consensus.unresolved}`); + const dRate = s.audit_count > 0 ? (100 * s.consensus.discrepancy_count / s.audit_count).toFixed(1) : "0"; + lines.push(` discrepancy rate: ${dRate}% of audits`); + lines.push(""); + lines.push("─── KB size ───"); + lines.push(` audit_lessons.jsonl: ${s.kb.audit_lessons_rows} rows, ${s.kb.distinct_finding_signatures} distinct signatures`); + lines.push(` audit_facts.jsonl: ${s.kb.audit_facts_rows} rows, ${s.kb.distinct_entities_across_prs} distinct entities`); + lines.push(` scrum_reviews.jsonl: ${s.kb.scrum_reviews_rows} rows`); + lines.push(` entities in 2+ PRs: ${s.kb.entities_in_2plus_prs}`); + lines.push(` entities in 5+ PRs: ${s.kb.entities_in_5plus_prs} ← strong cross-cutting signal`); + lines.push(""); + lines.push("─── Fact quality ───"); + const v = s.fact_quality.verifier_verdict_distribution; + lines.push(` verifier verdicts: CORRECT=${v.CORRECT ?? 0} UNVERIFIABLE=${v.UNVERIFIABLE ?? 0} UNCHECKED=${v.UNCHECKED ?? 0} INCORRECT=${v.INCORRECT ?? 0}`); + lines.push(` facts dropped by verifier: ${s.fact_quality.facts_dropped_by_verifier_total}`); + lines.push(` extraction success rate: ${(s.fact_quality.extraction_success_rate * 100).toFixed(1)}%`); + lines.push(""); + lines.push("─── KB sources ───"); + for (const [src, n] of Object.entries(s.kb_by_source)) { + lines.push(` ${src}: ${n}`); + } + lines.push(""); + lines.push(`─── Top ${s.top_entities.length} recurring entities ───`); + for (const e of s.top_entities) { + lines.push(` [${e.distinct_prs} PRs × ${e.count} obs] ${e.name} (${e.types.join(",")})`); + } + return lines.join("\n"); +} + +async function main() { + const args = parseArgs(process.argv); + const stats = await collect(args); + if (args.json) { + console.log(JSON.stringify(stats, (_, v) => v instanceof Set ? Array.from(v) : v, 2)); + } else { + console.log(renderHuman(stats)); + } +} + +main().catch(e => { console.error("[kb_stats] fatal:", e); process.exit(1); }); diff --git a/docs/AUDITOR_CONTEXT.md b/docs/AUDITOR_CONTEXT.md new file mode 100644 index 0000000..98ba091 --- /dev/null +++ b/docs/AUDITOR_CONTEXT.md @@ -0,0 +1,69 @@ +# Auditor Context — project preamble for fact extraction + +This file is read by `auditor/fact_extractor.ts` and prepended to the +extract-facts prompt sent to llm_team. The goal: give the extractor + +verifier enough grounding to ground domain-specific facts instead of +marking them UNVERIFIABLE by default. + +Keep this short (< 400 words). 
+
+async function main() {
+  const args = parseArgs(process.argv);
+  const stats = await collect(args);
+  if (args.json) {
+    console.log(JSON.stringify(stats, (_, v) => v instanceof Set ? Array.from(v) : v, 2));
+  } else {
+    console.log(renderHuman(stats));
+  }
+}
+
+main().catch(e => { console.error("[kb_stats] fatal:", e); process.exit(1); });
diff --git a/docs/AUDITOR_CONTEXT.md b/docs/AUDITOR_CONTEXT.md
new file mode 100644
index 0000000..98ba091
--- /dev/null
+++ b/docs/AUDITOR_CONTEXT.md
@@ -0,0 +1,69 @@
+# Auditor Context — project preamble for fact extraction
+
+This file is read by `auditor/fact_extractor.ts` and prepended to the
+extract-facts prompt sent to llm_team. The goal: give the extractor +
+verifier enough grounding to verify domain-specific facts instead of
+marking them UNVERIFIABLE by default.
+
+Keep this short (< 400 words). The verifier only reads the first ~4KB
+of the prompt alongside the facts; anything longer is noise, not signal.
+
+Update when: a new Phase lands, a crate is added/removed, or the
+project's primary domain shifts (e.g. staffing → DevOps).
+
+---
+
+## What Lakehouse is
+
+Lakehouse is a Rust-first data platform over S3-compatible object
+storage. Primary use: a staffing company ingesting legacy CRM data for
+AI-powered worker matching, contract fulfillment, and playbook-driven
+coordination.
+
+Architecture: 13 Rust crates + a Python sidecar (Ollama) + TypeScript
+sub-agents (auditor, scrum_master, bot). Runs on a single server
+(Nvidia A4000, 128GB RAM). All services on localhost: gateway :3100,
+sidecar :3200, UI :3300, MCP :3700, observer :3800, MinIO :9000.
+
+## Key crates (each maps to a responsibility)
+
+- **shared** — types, Arrow helpers, PII utilities, SecretsProvider
+- **proto** — gRPC definitions
+- **storaged** — S3/MinIO I/O, AppendLog, ErrorJournal
+- **catalogd** — metadata authority (manifests, views, tombstones)
+- **queryd** — DataFusion SQL, MemTable cache, compaction
+- **ingestd** — CSV/JSON/PDF/Postgres/MySQL ingest
+- **vectord** — embeddings, HNSW index, **playbook_memory meta-index** (Phase 19+)
+- **vectord-lance** — Lance 4.0 firewall crate (separate Arrow version)
+- **journald** — append-only mutation event log
+- **aibridge** — Rust↔Python sidecar bridge, context budget + continuation
+- **gateway** — Axum HTTP :3100 + gRPC :3101 (Phase 38+ adds /v1/chat)
+- **ui** — Dioxus WASM (stale, pre-Phase-9)
+- **lance-bench** — standalone benchmark
+
+## Current architectural direction (Phase 38-44)
+
+Universal AI Control Plane: a `/v1/chat` OpenAI-compatible API that
+routes all LLM traffic through one layer for token accounting + provider
+fallback. Truth Layer + Validation Pipeline enforce staffing-domain
+invariants (worker eligibility, PII, contract rules). The Auditor
+(Phase A of the cohesion plan) hard-blocks PR merges on placeholder code.
+
+## Auditor sub-agent role
+
+`auditor/` (TypeScript, Bun runtime) polls Gitea every 90s for open PRs.
+For each fresh head SHA it runs 4 checks in parallel: static (grep-style
+placeholder detection), dynamic (runs the hybrid fixture), inference
+(gpt-oss:120b cloud review with N=3 consensus + qwen3-coder:480b
+tie-breaker), and kb_query (reads `data/_kb/*.jsonl` for prior evidence).
+Verdicts post to Gitea as commit status + review comment. Findings
+append to `data/_kb/audit_lessons.jsonl` (path-agnostic signatures for
+dedup). Curated scratchpads from tree-split are routed through this
+extract-facts pipeline to populate `audit_facts.jsonl` — which is what
+you (the extractor) are currently producing.
+
+## Things that are NOT the auditor
+
+- The LLM Team UI at `/root/llm_team_ui.py` (devop.live:5000) — a separate product for human-facing multi-model experimentation
+- The scrum_master pipeline at `tests/real-world/scrum_master_pipeline.ts` — reviews files, not claims
+- The bot at `bot/` — will apply fixes, doesn't audit
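+
+## Worked example (illustrative)
+
+A hypothetical extracted fact like "vectord owns the playbook_memory
+meta-index (Phase 19+)" cannot be checked against a PR diff alone.
+With the crate list above in the prompt, the verifier has grounds to
+judge it CORRECT rather than defaulting to UNVERIFIABLE, which is
+exactly the failure mode this preamble exists to prevent.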
diff --git a/tests/real-world/scrum_master_pipeline.ts b/tests/real-world/scrum_master_pipeline.ts
index 9323da7..369b6f0 100644
--- a/tests/real-world/scrum_master_pipeline.ts
+++ b/tests/real-world/scrum_master_pipeline.ts
@@ -343,12 +343,50 @@ Respond with markdown. Be specific, not generic. Cite file-region + PRD-chunk-of
     attempts_made: history.length,
     tree_split_fired: treeSplitFired,
     suggestions_preview: accepted.slice(0, 2000),
+    schema_version: 2,
+    scrum_master_reviewed: true,
   };
   try {
     await appendFile(SCRUM_REVIEWS_JSONL, JSON.stringify(row) + "\n");
   } catch (e) {
     console.error(`[scrum] failed to append scrum_reviews.jsonl: ${(e as Error).message}`);
   }
+
+  // Route the accepted review through llm_team's fact extractor so
+  // its entities + relationships land in audit_facts.jsonl alongside
+  // inference-side extractions. Same index, two sources. Tagged
+  // source:"scrum_review" + scrum_master_reviewed:true so downstream
+  // queries can filter by provenance. Reviews shorter than 120 chars
+  // are skipped — they're usually one-liners ("LGTM") with no
+  // extractable knowledge.
+  if (accepted.length >= 120 && process.env.LH_SCRUM_SKIP_EXTRACT !== "1") {
+    try {
+      const { extractFacts } = await import("../../auditor/fact_extractor.ts");
+      const ex = await extractFacts(accepted);
+      // Persist when extraction succeeded, or when it errored but
+      // still yielded usable entities/facts.
+      if (!ex.error || ex.entities.length + ex.facts.length > 0) {
+        const factRow = {
+          pr_number: 0, // scrum runs outside a PR scope
+          file: rel,
+          head_sha: "", // no SHA scope; scope is the file+timestamp
+          extracted_at: ex.extracted_at,
+          extractor: ex.extractor_model,
+          verifier: ex.verifier_model,
+          llm_team_run_id: ex.llm_team_run_id ?? null,
+          facts: ex.facts,
+          entities: ex.entities,
+          relationships: ex.relationships,
+          verification_preview: ex.verification.slice(0, 400),
+          schema_version: 2,
+          source: "scrum_review",
+          scrum_master_reviewed: true,
+        };
+        const AUDIT_FACTS_JSONL = "/home/profit/lakehouse/data/_kb/audit_facts.jsonl";
+        await appendFile(AUDIT_FACTS_JSONL, JSON.stringify(factRow) + "\n");
+      }
+    } catch (e) {
+      console.error(`[scrum] fact extraction failed for ${rel}: ${(e as Error).message}`);
+    }
+  }
 }
 return review;