lakehouse/auditor/checks/inference.ts
auditor: rebuild on mode runner + drop tree-split (use distillation substrate)
Architectural simplification leveraging Phase 5 distillation work:
the auditor no longer pre-extracts facts via per-shard summaries
because lakehouse_answers_v1 (gold-standard prior PR audits + observer
escalations corpus) supplies cross-PR context through the mode runner's
matrix retrieval. Same signal, ~50× fewer cloud calls per audit.

Per-audit cost:
  Before: 168 gpt-oss:120b shard summaries + 3 final inference calls
  After:  3 deepseek-v3.1:671b mode-runner calls (full retrieval included)

Wall-clock on PR #11 (1.36MB diff):
  Before: ~25 minutes
  After:  88 seconds (3/3 consensus succeeded)

Files:
  auditor/checks/inference.ts
    - Default MODEL kimi-k2:1t → deepseek-v3.1:671b. kimi-k2 is hitting
      sustained Ollama Cloud 500 ISE (verified via repeated trivial
      probes; multi-hour outage). deepseek is the proven drop-in from
      Phase 5 distillation acceptance testing.
    - Dropped treeSplitDiff invocation. Diff truncates to MAX_DIFF_CHARS
      and goes straight to /v1/mode/execute task_class=pr_audit; mode
      runner pulls cross-PR context from lakehouse_answers_v1 via
      matrix retrieval. SHARD_MODEL retained for legacy callCloud
      compatibility (default qwen3-coder:480b if it ever runs).
    - extractAndPersistFacts now reads from truncated diff (no
      scratchpad post-tree-split-removal).

  auditor/checks/static.ts
    - serde-derived struct exemption (shipped earlier in commit 107a682;
      this commit carries the rest of the auditor rebuild that landed
      alongside it)
    - multi-line template literal awareness in isInsideQuotedString —
      tracks backtick state across lines so todo!() inside docstrings
      doesn't trip BLOCK_PATTERNS.

  crates/gateway/src/v1/mode.rs
    - pr_audit native runner mode added to VALID_MODES + is_native_mode
      + flags_for_mode + framing_text. PrAudit framing produces strict
      JSON {claim_verdicts, unflagged_gaps} for the auditor to parse.

  config/modes.toml
    - pr_audit task class with default_model=deepseek-v3.1:671b and
      matrix_corpus=lakehouse_answers_v1. Documents kimi-k2 outage
      with link to the swap rationale.

Real-data audit on PR #11 head 1b433a9 (which is the PR with all the
distillation work + auditor rebuild itself):
  - Pipeline ran to completion (88s for inference; full audit ~3 min)
  - 3/3 consensus runs succeeded on deepseek-v3.1:671b
  - 156 findings: 12 block, 23 warn, 121 info
  - Block findings are legitimate signal: 12 reviewer claims like
    "Invariants enforced (proven by tests + real run):" that the
    truncated diff can't directly verify. The auditor is correctly
    flagging claim-vs-diff divergence — exactly its job.
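The block/warn/info split above falls out of two small rules in inference.ts; a simplified mirror of the vote aggregation and strength→severity mapping (the real code also dispatches a different-vendor tie-breaker model on ties):

```typescript
// Sketch of the consensus + severity rules (simplified from inference.ts).
type Strength = "strong" | "moderate" | "weak";
type Severity = "block" | "warn" | "info";

// Majority vote over the N consensus runs: strictly more not-backed
// votes than backed votes flags the claim; a tie defers to the
// tie-breaker model instead of flagging.
function notBackedByMajority(trues: number, falses: number): boolean {
  return falses > trues;
}

// A not-backed claim's severity follows the claim's stated strength.
function severityFor(strength: Strength): Severity {
  return strength === "strong" ? "block"
    : strength === "moderate" ? "warn"
    : "info";
}

console.log(notBackedByMajority(1, 2), severityFor("strong")); // → true block
```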

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 23:32:44 -05:00


// Cloud inference check — wraps the proven run_codereview pattern
// from llm_team_ui.py (same 3-stage framing, same cloud model) to
// critique a PR's claims against its diff.
//
// Proved out 2026-04-22 via /tmp/codereview_runner.py — gpt-oss:120b
// caught a real ternary bug in auditor/fixtures/hybrid_38_40_45.ts
// that unit tests missed. This module reuses the reviewer prompt
// shape (bugs / security / performance / style / edge cases) and
// adds claim-vs-diff specific framing.
//
// Call surface: runInferenceCheck(claims, diff) → Finding[].
// Cloud latency budget: ~60s (gpt-oss:120b reviewer typically 35-50s
// with a 15KB diff + claim list).
import type { Claim, Finding } from "../types.ts";
import { Glob } from "bun";
import { readFile, mkdir, appendFile } from "node:fs/promises";
import { extractFacts } from "../fact_extractor.ts";
const GATEWAY = process.env.LH_GATEWAY_URL ?? "http://localhost:3100";
// Rebuild 2026-04-26: route claim verification through /v1/mode/execute
// (task_class=pr_audit) so we get pathway memory + lakehouse_answers_v1
// + JSON-shaped framing molded into ONE prompt. The hand-rolled
// systemMsg/userMsg path was reinventing the mode runner badly.
//
// 2026-04-27 update: original default kimi-k2:1t hit a sustained
// upstream outage on Ollama Cloud (consistent 500 ISE across hours of
// retries — verified with trivial 8-token probes). Swapped default to
// deepseek-v3.1:671b which is proven working end-to-end through the
// pr_audit mode runner during Phase 5 distillation acceptance testing.
// kimi-k2:1t can be re-selected via LH_AUDITOR_REVIEW_MODEL env when
// the upstream returns. Tie-breaker stays grok-4.1-fast (different
// vendor lineage so consensus + tie-break won't fail-correlate).
const MODEL = process.env.LH_AUDITOR_REVIEW_MODEL ?? "deepseek-v3.1:671b";
const TIEBREAKER_MODEL = process.env.LH_AUDITOR_TIEBREAKER_MODEL ?? "x-ai/grok-4.1-fast";
// SHARD_MODEL retained for the legacy callCloud path (still used by
// runCloudInference's diagnostic mode), but no longer fired by the
// main inference flow — tree-split was retired 2026-04-27 in favor of
// the mode runner's matrix retrieval against lakehouse_answers_v1.
const SHARD_MODEL = process.env.LH_AUDITOR_SHARD_MODEL ?? "qwen3-coder:480b";
const N_CONSENSUS = Number(process.env.LH_AUDITOR_CONSENSUS_N ?? 3);
// Bounded parallelism on the tree-split shard loop. Old behavior was
// fully serial ("keep gateway load bounded") which made huge PRs take
// 5+ minutes of curation alone. 6 in flight keeps gateway busy without
// thrashing it; tunable via env.
const SHARD_CONCURRENCY = Number(process.env.LH_AUDITOR_SHARD_CONCURRENCY ?? 6);
const AUDIT_DISCREPANCIES_JSONL = "/home/profit/lakehouse/data/_kb/audit_discrepancies.jsonl";
// 40KB comfortably fits gpt-oss:120b's context. PR #1 (~39KB) was
// previously truncated at 15KB causing the reviewer to miss later
// files (gitea.ts, policy.ts) and flag "no Gitea client present" as a
// block finding when the file was simply outside the truncation window.
//
// 2026-04-27: above this threshold the diff is now truncated. The
// pr_audit mode runner's matrix retrieval against lakehouse_answers_v1
// (+ arch + symbols) supplies cross-PR context, so a hard cut no
// longer costs the only source of full-PR signal. The tree-split
// curation that previously kicked in here survives only in the legacy
// treeSplitDiff path below.
const MAX_DIFF_CHARS = 40000;
// Legacy: tree-split kicked in above this threshold (kept below
// MAX_DIFF_CHARS so curation happened BEFORE a hard cut could lose
// signal). CURATION_THRESHOLD and DIFF_SHARD_SIZE are retained only
// for the legacy treeSplitDiff path.
const CURATION_THRESHOLD = 30000;
const DIFF_SHARD_SIZE = 4500;
const CALL_TIMEOUT_MS = 120_000;
// Mode runner can take longer than a raw /v1/chat call because it does
// pathway-fingerprint lookup + matrix retrieval + relevance filter
// before the LLM call. Budget extra time so we don't trip on a slow
// answers-corpus search.
const MODE_RUNNER_TIMEOUT_MS = 240_000;
const REPO_ROOT = "/home/profit/lakehouse";
export interface InferenceContext {
pr_number: number;
head_sha: string;
}
const AUDIT_FACTS_JSONL = "/home/profit/lakehouse/data/_kb/audit_facts.jsonl";
export async function runInferenceCheck(
claims: Claim[],
diff: string,
ctx?: InferenceContext,
): Promise<Finding[]> {
if (claims.length === 0) {
return [{
check: "inference",
severity: "info",
summary: "no ship-claims extracted — skipping cloud inference",
evidence: ["parser returned empty claim list; nothing to verify against cloud"],
}];
}
// Empirical claims (runtime metrics / observed outcomes) can't be
// verified from the diff. Drop them from the cloud prompt so the
// reviewer doesn't chase ghosts. A future `runtime_evidence` check
// can validate these against data/_kb/*/summary.json outputs.
const verifiable = claims.filter(c => c.strength !== "empirical");
const empiricalCount = claims.length - verifiable.length;
if (verifiable.length === 0) {
return [{
check: "inference",
severity: "info",
summary: `all ${claims.length} claims are empirical (runtime metrics) — skipping cloud inference`,
evidence: [`empirical claims can't be verified from a static diff; needs runtime-evidence check`],
}];
}
// 2026-04-27 architecture simplification: dropped the tree-split
// scratchpad layer. Rationale: the mode runner's pr_audit pipeline
// pulls from lakehouse_answers_v1 (gold-standard prior audits) +
// lakehouse_arch_v1 + lakehouse_symbols_v1 via matrix retrieval. That
// corpus IS the cross-PR context the tree-split was synthesizing
// from scratch on every audit run. With the distillation substrate
// shipped (commits 27b1d27..1b433a9), per-shard fact extraction is
// redundant — and gpt-oss:120b at 168 calls/audit was the dominant
// cost. Now: truncate diff to MAX_DIFF_CHARS, hand straight to the
// mode runner, let retrieval supply context. ONE strong-model call
// per consensus rep × N=3 reps = 3 calls total per audit.
const truncated = diff.length > MAX_DIFF_CHARS
? diff.slice(0, MAX_DIFF_CHARS) + `\n...[${diff.length - MAX_DIFF_CHARS} more chars truncated — the pr_audit mode runner has matrix retrieval against lakehouse_answers_v1 + arch + symbols for cross-PR context]`
: diff;
const curationNote = diff.length > MAX_DIFF_CHARS
? ` (truncated ${diff.length}→${MAX_DIFF_CHARS} chars; matrix retrieval supplies cross-PR context)`
: "";
// Build the reviewer prompt in the same shape as run_codereview's
// review stage (llm_team_ui.py:10950), adapted for claim verification:
// "Task: ..."
// "Code: ..."
// "Review: bugs/security/perf/style/edge. Provide corrected code."
// We add: claim list upfront + ask for structured JSON verdict.
//
// Curation flag is now just a truncation flag — when the diff was
// cut, tell the reviewer it didn't see the full picture so it doesn't
// confidently mark a claim NOT BACKED based on absence in the
// (potentially incomplete) input.
const isCurated = curationNote.length > 0;
const prNumber = ctx?.pr_number ?? 0;
// N=3 consensus — fire the mode runner three times in parallel.
// Each /v1/mode/execute call composes pathway memory + answers corpus
// + JSON-shaped pr_audit framing internally, so the auditor's only
// job here is to vote-aggregate. Wall-clock ~= single call.
const primaryRuns = await Promise.all(
Array.from({ length: N_CONSENSUS }, () =>
runModeRunnerInference(truncated, verifiable, prNumber, isCurated, MODEL)),
);
const parsedRuns = primaryRuns.filter(r => r.parsed !== null);
if (parsedRuns.length === 0) {
// All N calls failed. Surface the first-run diagnostic so the
// operator sees *why* (unreachable / non-200 / unparseable).
const first = primaryRuns[0];
return [{
check: "inference",
severity: "info",
summary: `cloud inference: all ${N_CONSENSUS} consensus runs failed — ${first.error ?? "unknown"}`,
evidence: [
`first-run diagnostic: ${first.diagnostic ?? "(none)"}`,
`successful runs: 0 / ${N_CONSENSUS}`,
],
}];
}
// Aggregate votes per claim_idx.
interface Votes { trues: number; falses: number; evidences: string[] }
const votesByClaim = new Map<number, Votes>();
let totalLatencyMs = 0;
let totalEnrichedChars = 0;
let bugFingerprintsSeen = 0;
let matrixKeptSeen = 0;
for (const run of parsedRuns) {
totalLatencyMs += run.latency_ms ?? 0;
totalEnrichedChars += run.enriched_chars ?? 0;
bugFingerprintsSeen = Math.max(bugFingerprintsSeen, run.bug_fingerprints ?? 0);
matrixKeptSeen = Math.max(matrixKeptSeen, run.matrix_kept ?? 0);
for (const v of run.parsed?.claim_verdicts ?? []) {
const idx = Number(v?.claim_idx);
if (!Number.isFinite(idx)) continue;
const rec = votesByClaim.get(idx) ?? { trues: 0, falses: 0, evidences: [] };
if (v.backed === false) {
rec.falses++;
rec.evidences.push(String(v.evidence ?? ""));
} else if (v.backed === true) {
rec.trues++;
}
votesByClaim.set(idx, rec);
}
}
const findings: Finding[] = [];
// Summary finding so the verdict layer knows the check ran.
findings.push({
check: "inference",
severity: "info",
summary: `pr_audit mode runner completed (model=${MODEL}, consensus=${parsedRuns.length}/${N_CONSENSUS}, ${totalLatencyMs}ms total)${curationNote}`,
evidence: [
`claims voted: ${votesByClaim.size}`,
`parsed runs: ${parsedRuns.length} / ${N_CONSENSUS}`,
`enrichment: ${bugFingerprintsSeen} bug fingerprints, ${matrixKeptSeen} answers-corpus chunks, prompt avg ${Math.round(totalEnrichedChars / Math.max(parsedRuns.length, 1))} chars`,
],
});
// Per-claim majority vote; tie-break if no majority.
const discrepancies: Array<{
claim_idx: number;
claim_text: string;
votes: { trues: number; falses: number };
resolution: "majority_backed" | "majority_not_backed" | "tiebreaker_backed" | "tiebreaker_not_backed" | "unresolved";
tiebreaker_model?: string;
}> = [];
for (const [idx, votes] of votesByClaim) {
const claim = verifiable[idx];
if (!claim) continue;
const totalVotes = votes.trues + votes.falses;
let notBacked: boolean | null = null;
let resolution: typeof discrepancies[number]["resolution"] = "majority_backed";
let evidenceText = "";
let tbModel: string | undefined;
if (votes.falses > votes.trues) {
notBacked = true;
resolution = "majority_not_backed";
evidenceText = votes.evidences[0] ?? "(no reason given)";
} else if (votes.trues > votes.falses) {
notBacked = false;
resolution = "majority_backed";
} else {
// Tie. Run tie-breaker with a different-architecture model
// through the same mode runner so framing/enrichment match.
const tb = await runModeRunnerInference(truncated, verifiable, prNumber, isCurated, TIEBREAKER_MODEL);
if (tb.parsed) {
const tv = (tb.parsed.claim_verdicts ?? []).find((v: any) => Number(v?.claim_idx) === idx);
if (tv?.backed === false) {
notBacked = true;
resolution = "tiebreaker_not_backed";
evidenceText = `(tie-breaker ${TIEBREAKER_MODEL}) ${String(tv.evidence ?? "")}`;
tbModel = TIEBREAKER_MODEL;
} else if (tv?.backed === true) {
notBacked = false;
resolution = "tiebreaker_backed";
tbModel = TIEBREAKER_MODEL;
} else {
resolution = "unresolved";
}
} else {
resolution = "unresolved";
}
}
// Log every case where the N runs disagreed — discrepancies are
// signal, not noise. Separate from audit_lessons.jsonl because
// they're about the *auditor's* quality, not the PR's quality.
const disagreed = totalVotes >= 2 && votes.trues > 0 && votes.falses > 0;
if (disagreed || resolution.startsWith("tiebreaker") || resolution === "unresolved") {
discrepancies.push({
claim_idx: idx,
claim_text: claim.text,
votes: { trues: votes.trues, falses: votes.falses },
resolution,
tiebreaker_model: tbModel,
});
}
if (notBacked === true) {
const sev: Finding["severity"] = claim.strength === "strong" ? "block"
: claim.strength === "moderate" ? "warn"
: "info";
findings.push({
check: "inference",
severity: sev,
claim_text: claim.text,
summary: `cloud: claim not backed — "${claim.text.slice(0, 100)}"`,
evidence: [
`at ${claim.location}`,
`consensus: ${votes.falses}/${totalVotes} not-backed (resolution: ${resolution})`,
`cloud reason: ${evidenceText.slice(0, 200)}`,
],
});
}
}
// Persist discrepancies so we can measure consensus drift over time.
if (discrepancies.length > 0 && ctx) {
persistDiscrepancies(ctx, discrepancies).catch(e =>
console.error(`[inference] discrepancy log failed: ${(e as Error).message}`));
}
// Use first run's parsed for downstream unflagged_gaps processing.
const parsed = parsedRuns[0].parsed;
// Route the truncated diff through llm_team's extract-facts pipeline
// when we have (a) a diff big enough to be truncated (best signal
// that the PR carries real change) and (b) PR context to scope facts.
// AWAITED (not fire-and-forget) so CLI callers like audit_one.ts
// don't exit before extraction lands; the systemd poller has plenty
// of headroom (90s cycle vs ~15s extraction). A failure inside
// extractAndPersistFacts is caught + logged but never throws.
// Post-2026-04-27: extraction reads the truncated diff directly (no
// scratchpad to extract from since tree-split was retired); it still
// surfaces entities/symbols into audit_facts.jsonl even from
// truncated input.
if (isCurated && ctx && process.env.LH_AUDITOR_SKIP_EXTRACT !== "1") {
try {
await extractAndPersistFacts(truncated, ctx);
} catch (e) {
console.error(`[inference] fact extraction failed: ${(e as Error).message}`);
}
}
// Belt-and-suspenders: when the diff was truncated, drop the
// unflagged_gaps section entirely. The model can't reliably ground
// gap-detection on a partial diff, and false positives are worse
// than misses for this signal class. The pr_audit framing already
// asks the model to skip this section on incomplete input — but the
// model may still emit it, so we filter here too.
const gapsToEmit = isCurated ? [] : (parsed.unflagged_gaps ?? []);
for (const g of gapsToEmit) {
const summary = String(g?.summary ?? "?");
const location = String(g?.location ?? "?");
// False-positive guard — when the cloud says "X not defined in this
// diff" or "missing implementation of X", the cloud may just mean
// "X is not in the added lines," not "X doesn't exist in the repo."
// Extract candidate symbol names and grep the repo. If any symbol
// is defined elsewhere, drop the finding — it's a known-symbol
// reference, not a placeholder.
if (/not\s+defined|missing\s+implementation|never\s+referenced\s+or\s+integrated/i.test(summary)) {
const symbols = extractSymbols(summary);
if (symbols.length > 0) {
const resolved = await symbolsExistInRepo(symbols);
if (resolved.length === symbols.length) {
// Every named symbol exists somewhere in the repo — silent drop.
continue;
}
if (resolved.length > 0) {
// Partially resolved — demote to info with a note.
findings.push({
check: "inference",
severity: "info",
summary: `cloud gap partially resolved by repo grep: ${summary.slice(0, 120)}`,
evidence: [
`location: ${location.slice(0, 140)}`,
`resolved via grep: ${resolved.join(",")}`,
`unresolved: ${symbols.filter(s => !resolved.includes(s)).join(",")}`,
],
});
continue;
}
}
}
findings.push({
check: "inference",
severity: "warn",
summary: `cloud-flagged gap not in any claim: ${summary.slice(0, 120)}`,
evidence: [`location: ${location.slice(0, 140)}`],
});
}
return findings;
}
// Single mode-runner call — consensus + tie-breaker dispatch through
// here. Returns parsed JSON shape + telemetry from /v1/mode/execute
// (latency, enrichment metrics) + any error diagnostic. NEVER throws.
// The consensus aggregator handles partial failures by dropping
// non-parsed runs from the vote.
interface CloudRunResult {
parsed: any | null;
latency_ms: number;
enriched_chars: number;
bug_fingerprints: number;
matrix_kept: number;
error?: string; // "unreachable" | "non_200" | "unparseable"
diagnostic?: string; // first 200 chars for debugging
model: string;
}
async function runModeRunnerInference(
diffOrScratchpad: string,
claims: Claim[],
prNumber: number,
isCurated: boolean,
model: string,
): Promise<CloudRunResult> {
// user_question carries the claim list + the curation note (if any).
// pr_audit's framing (mode.rs FRAMING_PR_AUDIT) holds the JSON shape +
// strict-output rules so we don't repeat them here.
const claimDigest = claims
.map((c, i) => ` ${i}. [${c.strength}] "${c.text}" at ${c.location}`)
.join("\n");
const curationNote = isCurated
? "\n\nNOTE: the FILE below is a truncated slice of the diff, not the full diff. Absence from this slice is NOT evidence of absence in the actual diff. Only mark backed=false on direct contradiction (e.g. the slice shows the function is empty / a stub). Skip unflagged_gaps entirely when the diff is truncated."
: "";
const userQuestion = [
"Verify each ship-claim against the diff below.",
"",
"Ship-claims (numbered 0..N-1):",
claimDigest,
curationNote,
"",
"Every claim above must produce exactly one claim_verdicts entry. Output strict JSON only — no prose outside the JSON object.",
].join("\n");
let resp: Response;
try {
resp = await fetch(`${GATEWAY}/v1/mode/execute`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
task_class: "pr_audit",
file_path: `pr-${prNumber}.diff`,
file_content: diffOrScratchpad,
user_question: userQuestion,
force_model: model,
force_temperature: 0,
}),
signal: AbortSignal.timeout(MODE_RUNNER_TIMEOUT_MS),
});
} catch (e) {
return {
parsed: null, latency_ms: 0, enriched_chars: 0, bug_fingerprints: 0, matrix_kept: 0,
error: "unreachable", diagnostic: (e as Error).message.slice(0, 200), model,
};
}
if (!resp.ok) {
return {
parsed: null, latency_ms: 0, enriched_chars: 0, bug_fingerprints: 0, matrix_kept: 0,
error: "non_200", diagnostic: `${resp.status}: ${(await resp.text()).slice(0, 160)}`, model,
};
}
let body: any;
try { body = await resp.json(); }
catch (e) {
return {
parsed: null, latency_ms: 0, enriched_chars: 0, bug_fingerprints: 0, matrix_kept: 0,
error: "unparseable", diagnostic: (e as Error).message, model,
};
}
const content: string = body?.response ?? "";
const parsed = extractJson(content);
return {
parsed,
latency_ms: body?.latency_ms ?? 0,
enriched_chars: body?.enriched_prompt_chars ?? 0,
bug_fingerprints: body?.sources?.bug_fingerprints_count ?? 0,
matrix_kept: body?.sources?.matrix_chunks_kept ?? 0,
error: parsed ? undefined : "unparseable",
diagnostic: parsed ? undefined : content.slice(0, 200),
model,
};
}
// Legacy direct /v1/chat caller — kept for callers outside the
// pr_audit pipeline. Currently unused after the 2026-04-26 mode-runner
// rebuild; preserved so we can A/B against the mode runner if a
// regression surfaces.
async function runCloudInference(systemMsg: string, userMsg: string, model: string): Promise<{ parsed: any | null; tokens: number; error?: string; diagnostic?: string; model: string }> {
let resp: Response;
try {
resp = await fetch(`${GATEWAY}/v1/chat`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
provider: "ollama_cloud",
model,
messages: [
{ role: "system", content: systemMsg },
{ role: "user", content: userMsg },
],
max_tokens: 3000,
temperature: 0,
think: true,
}),
signal: AbortSignal.timeout(CALL_TIMEOUT_MS),
});
} catch (e) {
return { parsed: null, tokens: 0, error: "unreachable", diagnostic: (e as Error).message.slice(0, 200), model };
}
if (!resp.ok) {
return { parsed: null, tokens: 0, error: "non_200", diagnostic: `${resp.status}: ${(await resp.text()).slice(0, 160)}`, model };
}
let body: any;
try { body = await resp.json(); }
catch (e) { return { parsed: null, tokens: 0, error: "unparseable", diagnostic: (e as Error).message, model }; }
const content: string = body?.choices?.[0]?.message?.content ?? "";
const tokens: number = body?.usage?.total_tokens ?? 0;
const parsed = extractJson(content);
if (!parsed) {
return { parsed: null, tokens, error: "unparseable", diagnostic: content.slice(0, 200), model };
}
return { parsed, tokens, model };
}
async function persistDiscrepancies(ctx: InferenceContext, discrepancies: any[]): Promise<void> {
await mkdir("/home/profit/lakehouse/data/_kb", { recursive: true });
const rows = discrepancies.map(d => JSON.stringify({
pr_number: ctx.pr_number,
head_sha: ctx.head_sha,
logged_at: new Date().toISOString(),
...d,
}));
await appendFile(AUDIT_DISCREPANCIES_JSONL, rows.join("\n") + "\n");
}
// Extract structured knowledge from the (possibly truncated) diff and
// append to data/_kb/audit_facts.jsonl — one row per extract run,
// keyed by PR number + head SHA for scope tracking. kb_query tails
// this next audit to surface recurring entities/relationships across PRs.
async function extractAndPersistFacts(scratchpad: string, ctx: InferenceContext): Promise<void> {
const ex = await extractFacts(scratchpad);
if (ex.error && ex.entities.length === 0 && ex.facts.length === 0) {
// Full failure — log but don't write an empty row.
console.error(`[inference] extractFacts skipped row: ${ex.error}`);
return;
}
const row = {
pr_number: ctx.pr_number,
head_sha: ctx.head_sha,
extracted_at: ex.extracted_at,
extractor: ex.extractor_model,
verifier: ex.verifier_model,
llm_team_run_id: ex.llm_team_run_id ?? null,
facts: ex.facts,
entities: ex.entities,
relationships: ex.relationships,
verification_preview: ex.verification.slice(0, 400),
verifier_verdicts: ex.verifier_verdicts,
facts_dropped_by_verifier: ex.facts_dropped_by_verifier ?? 0,
schema_version: 2,
source: "audit_inference",
};
await mkdir("/home/profit/lakehouse/data/_kb", { recursive: true });
await appendFile(AUDIT_FACTS_JSONL, JSON.stringify(row) + "\n");
}
// Curation via tree-split — ports the scrum_master pattern into the
// inference check. Shards the raw diff into DIFF_SHARD_SIZE chunks,
// summarizes each shard *against the claim-verification task* so the
// summary preserves exactly what the cloud needs to judge claims
// (function signatures, struct fields, deletions, new files), drops
// everything else. Merges into a compact scratchpad.
//
// Cost: N cloud calls for shard summaries + the final verification.
// Pre-2026-04-26 the shard loop ran serially "to keep gateway load
// bounded" — turned out to be a bottleneck on PRs with 50+ shards
// (5+ minutes of curation). Now bounded-parallel via
// SHARD_CONCURRENCY: in-flight ≤ N at any time, gateway stays calm,
// wall-clock drops 4-6×.
//
// Determinism: each shard summary call uses temp=0 + think=false
// (same as before), so identical input yields identical scratchpad.
// Order is preserved by indexed-write into a fixed-length array
// before string-join, so concurrency doesn't shuffle the scratchpad.
async function treeSplitDiff(
fullDiff: string,
claims: Claim[],
): Promise<{ scratchpad: string; shards: number }> {
const shards: Array<{ from: number; to: number; text: string }> = [];
for (let i = 0; i < fullDiff.length; i += DIFF_SHARD_SIZE) {
const end = Math.min(i + DIFF_SHARD_SIZE, fullDiff.length);
shards.push({ from: i, to: end, text: fullDiff.slice(i, end) });
}
// Curate the claim list into a short form the summary prompt can
// use to bias extraction toward relevant facts.
const claimDigest = claims.map((c, i) =>
`${i}. [${c.strength}] "${c.text.slice(0, 100)}"`
).join("\n");
const buildPrompt = (si: number, shard: { from: number; to: number; text: string }): string => [
`You are summarizing shard ${si + 1}/${shards.length} (chars ${shard.from}..${shard.to}) of a PR diff.`,
`The downstream task will verify these ship-claims against the full-PR summary. Extract ONLY facts that could confirm or refute these claims:`,
"",
claimDigest,
"",
"Extract: new function/method signatures, struct fields, deletions, new files, wiring (function X calls Y), absence-of-implementation markers, TODO comments on added lines.",
"Skip: comment-only edits, whitespace, import reordering, unrelated cosmetic changes.",
"",
"─────── shard diff ───────",
shard.text,
"─────── end shard ───────",
"",
"Output: up to 180 words of facts in bullet form. No prose preamble, no claim verdicts (that's for the downstream step).",
].join("\n");
// Pre-allocate so we can write back at the original index from
// out-of-order completion.
const summaries: string[] = new Array(shards.length).fill("");
let nextIdx = 0;
async function worker() {
while (true) {
const myIdx = nextIdx++;
if (myIdx >= shards.length) return;
const r = await callCloud(buildPrompt(myIdx, shards[myIdx]), 400);
summaries[myIdx] = r.content;
}
}
const concurrency = Math.max(1, Math.min(SHARD_CONCURRENCY, shards.length));
await Promise.all(Array.from({ length: concurrency }, worker));
let scratchpad = "";
for (const [si, shard] of shards.entries()) {
const summary = summaries[si];
if (summary) {
scratchpad += `\n--- shard ${si + 1} (chars ${shard.from}..${shard.to}) ---\n${summary.trim()}\n`;
}
}
return { scratchpad: scratchpad.trim(), shards: shards.length };
}
// Minimal cloud caller used only by treeSplitDiff — same gateway +
// model as the top-level call, but think=false. Shards are small
// (≤DIFF_SHARD_SIZE ~4500 chars) and the task is pure fact
// extraction, not reasoning. think=true on the shards introduced
// variance in reasoning traces that compounded across 23 calls into
// a non-deterministic scratchpad (observed during curation
// validation: same-SHA runs produced 5/7/8 final findings).
// think=false on small prompts is stable — only breaks at the main
// call's 10K+ prompt size, which keeps think=true.
async function callCloud(prompt: string, maxTokens: number): Promise<{ content: string }> {
try {
const r = await fetch(`${GATEWAY}/v1/chat`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
provider: "ollama_cloud",
model: SHARD_MODEL,
messages: [{ role: "user", content: prompt }],
max_tokens: maxTokens,
temperature: 0,
think: false,
}),
signal: AbortSignal.timeout(CALL_TIMEOUT_MS),
});
if (!r.ok) return { content: "" };
const j: any = await r.json();
return { content: j?.choices?.[0]?.message?.content ?? "" };
} catch {
return { content: "" };
}
}
// Pull out plausible code-symbol names from a summary string.
// Matches:
// - identifier with backticks: `foo_bar`
// - identifier followed by parens: foo_bar()
// - CamelCase types
// - snake_case_functions
// Filters out common English words that could be matched accidentally.
const STOPWORDS = new Set([
"not","the","and","for","this","that","with","but","are","was","has",
"have","been","any","missing","implementation","diff","defined","never",
"referenced","integrated","flow","code","file","some","only","when",
]);
function extractSymbols(text: string): string[] {
const out = new Set<string>();
// `backticked` symbols
for (const m of text.matchAll(/`([A-Za-z_][A-Za-z0-9_]{2,})`/g)) out.add(m[1]);
// foo() or foo_bar() calls
for (const m of text.matchAll(/\b([A-Za-z_][A-Za-z0-9_]{2,})\s*\(/g)) out.add(m[1]);
// CamelCase types (3+ chars, must start with uppercase)
for (const m of text.matchAll(/\b([A-Z][A-Za-z0-9]{2,})\b/g)) out.add(m[1]);
return Array.from(out).filter(s => !STOPWORDS.has(s.toLowerCase()));
}
// Scan the repo for at least one definition of each symbol. Uses Bun's
// Glob to walk TS/Rust/Python/JS sources; ignores node_modules, data/,
// and target/. Skips files > 500KB — those are fixtures/snapshots that
// won't contain a definition line and slurping them slows the audit.
async function symbolsExistInRepo(symbols: string[]): Promise<string[]> {
const patterns = ["**/*.ts", "**/*.tsx", "**/*.rs", "**/*.py", "**/*.js"];
const skip = (p: string) => p.includes("/node_modules/") || p.startsWith("data/") || p.includes("/target/") || p.startsWith("dist/");
const MAX_FILE_BYTES = 500_000;
const { stat } = await import("node:fs/promises");
const resolved = new Set<string>();
const toFind = new Set(symbols);
for (const pat of patterns) {
if (toFind.size === 0) break;
const glob = new Glob(pat);
for await (const f of glob.scan({ cwd: REPO_ROOT, onlyFiles: true })) {
if (skip(f)) continue;
try { const s = await stat(`${REPO_ROOT}/${f}`); if (s.size > MAX_FILE_BYTES) continue; } catch { continue; }
let content: string;
try { content = await readFile(`${REPO_ROOT}/${f}`, "utf8"); } catch { continue; }
for (const sym of Array.from(toFind)) {
// Definition heuristics: `function sym`, `fn sym`, `const sym`,
// `let sym`, `def sym`, `class sym`, `struct sym`, `enum sym`,
// `trait sym`, `async function sym`, `pub (async )?fn sym`.
const re = new RegExp(
`\\b(function|async\\s+function|const|let|var|def|class|struct|enum|trait|impl|type|interface|fn|pub\\s+(async\\s+)?fn)\\s+${escapeRe(sym)}\\b`
);
if (re.test(content)) {
resolved.add(sym);
toFind.delete(sym);
if (toFind.size === 0) break;
}
}
}
}
return Array.from(resolved);
}
function escapeRe(s: string): string {
return s.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}
// Lift the first balanced JSON object out of the response. Tolerates
// leading prose, code fences, and model reasoning preamble when the
// cloud model ignored "strict JSON only."
function extractJson(text: string): any | null {
const cleaned = text.replace(/^```(?:json)?\s*/im, "").replace(/```\s*$/im, "");
let depth = 0;
let start = -1;
for (let i = 0; i < cleaned.length; i++) {
const c = cleaned[i];
if (c === "{") {
if (depth === 0) start = i;
depth++;
} else if (c === "}") {
depth--;
if (depth === 0 && start >= 0) {
try { return JSON.parse(cleaned.slice(start, i + 1)); } catch { start = -1; }
}
}
}
return null;
}