Architectural snapshot of the lakehouse codebase at the point where the
full matrix-driven agent loop with Mem0 versioning + deletion was
validated end-to-end.
WHAT THIS REPO IS
A clean single-commit snapshot of the lakehouse code. Heavy test data
(.parquet datasets, vector indexes) excluded — see REPLICATION.md for
regen path. Full lakehouse history at git.agentview.dev/profit/lakehouse.
WHAT WAS PROVEN
- Vector retrieval across a multi-corpus matrix (chicago_permits + entity
briefs + sec_tickers + distilled procedural + llm_team runs)
- Observer hand-review (cloud + heuristic fallback) gating each candidate
- Local-model agent loop (qwen3.5:latest) with tool use + scratchpad
- Playbook seal on success → next-iter retrieval surfaces it as preamble
- Mem0 versioning + deletion in pathway_memory:
* UPSERT: ADD on new workflow, UPDATE bumps replay_count on identical
* REVISE: chains versions, parent.superseded_at + superseded_by stamped
* RETIRE: marks specific trace retired with reason, excluded from retrieval
  * HISTORY: walks chain root→tip, cycle-safe (sketched below)
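
Illustrative sketch of the cycle-safe HISTORY walk (TypeScript, for
exposition only; the real ops are Rust in
crates/vectord/src/pathway_memory.rs and these type/field names are
assumptions):

    interface TraceVersion { id: string; superseded_by: string | null }

    // Follow superseded_by pointers root → tip; the visited set makes
    // the walk terminate even if a corrupt chain loops back on itself.
    function walkHistory(rootId: string, byId: Map<string, TraceVersion>): TraceVersion[] {
      const chain: TraceVersion[] = [];
      const seen = new Set<string>();
      let cur = byId.get(rootId);
      while (cur && !seen.has(cur.id)) {
        seen.add(cur.id);
        chain.push(cur);
        cur = cur.superseded_by ? byId.get(cur.superseded_by) : undefined;
      }
      return chain;
    }
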
KEY DIRECTORIES
- crates/vectord/src/pathway_memory.rs — Mem0 ops live here
- crates/vectord/src/playbook_memory.rs — original Mem0 reference
- tests/agent_test/ — local-model agent harness + PRD + session archives
- scripts/dump_raw_corpus.sh — MinIO bucket dump (raw test corpus)
- scripts/vectorize_raw_corpus.ts — corpus → vector indexes
- scripts/analyze_chicago_contracts.ts — real inference pipeline
- scripts/seal_agent_playbook.ts — Mem0 upsert from agent traces
Replication: see REPLICATION.md for Debian 13 clean install + cloud-only
adaptation (no local Ollama).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
667 lines · 28 KiB · TypeScript
// Cloud inference check — wraps the proven run_codereview pattern
// from llm_team_ui.py (same 3-stage framing, same cloud model) to
// critique a PR's claims against its diff.
//
// Proved out 2026-04-22 via /tmp/codereview_runner.py — gpt-oss:120b
// caught a real ternary bug in auditor/fixtures/hybrid_38_40_45.ts
// that unit tests missed. This module reuses the reviewer prompt
// shape (bugs / security / performance / style / edge cases) and
// adds claim-vs-diff specific framing.
//
// Call surface: runInferenceCheck(claims, diff) → Finding[].
// Cloud latency budget: ~60s (gpt-oss:120b reviewer typically 35-50s
// with a 15KB diff + claim list).

import type { Claim, Finding } from "../types.ts";
import { Glob } from "bun";
import { readFile, mkdir, appendFile, stat } from "node:fs/promises";
import { extractFacts } from "../fact_extractor.ts";

const GATEWAY = process.env.LH_GATEWAY_URL ?? "http://localhost:3100";
const MODEL = process.env.LH_AUDITOR_REVIEW_MODEL ?? "gpt-oss:120b";
// Tie-breaker for claims where the N=3 consensus produces a 1-1-1
// split (genuinely borderline). Different architecture from the
// primary reviewer (gpt-oss) so the tie-break isn't correlated with
// the original disagreement. qwen3-coder:480b is a newer coding
// specialist at 480B params, well-suited to PR-diff claim verification
// and distinct in training lineage from gpt-oss.
const TIEBREAKER_MODEL = process.env.LH_AUDITOR_TIEBREAKER_MODEL ?? "qwen3-coder:480b";
const N_CONSENSUS = Number(process.env.LH_AUDITOR_CONSENSUS_N ?? 3);
const AUDIT_DISCREPANCIES_JSONL = "/home/profit/lakehouse/data/_kb/audit_discrepancies.jsonl";
// 40KB comfortably fits gpt-oss:120b's context. PR #1 (~39KB) was
// previously truncated at 15KB, causing the reviewer to miss later
// files (gitea.ts, policy.ts) and flag "no Gitea client present" as a
// block finding when the file was simply outside the truncation window.
//
// Above this threshold we curate via tree-split rather than truncate,
// following the scrum_master pattern: shard the diff, summarize each
// shard against the claim-verification task, merge into a compact
// scratchpad, then ask the cloud to verify claims against the
// scratchpad. This gives the cloud full-PR fidelity without bursting
// its context window (observed failure mode: empty response or
// unparseable output when the prompt exceeds the model's comfortable range).
const MAX_DIFF_CHARS = 40000;
// Tree-split kicks in above this. 30KB is below MAX_DIFF_CHARS so we
// curate BEFORE truncation would happen — never lose signal to a hard
// cut. Shard size is chosen so ~10 shards cover PR #8-size diffs in a
// reasonable round-trip budget.
const CURATION_THRESHOLD = 30000;
const DIFF_SHARD_SIZE = 4500;
const CALL_TIMEOUT_MS = 120_000;
const REPO_ROOT = "/home/profit/lakehouse";
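
// Typical env overrides (illustrative values; audit_one.ts is the CLI
// caller referenced later in this file, its path and flags are assumed):
//   LH_GATEWAY_URL=http://gateway.internal:3100 \
//   LH_AUDITOR_CONSENSUS_N=5 \
//   bun run audit_one.ts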

export interface InferenceContext {
  pr_number: number;
  head_sha: string;
}

const AUDIT_FACTS_JSONL = "/home/profit/lakehouse/data/_kb/audit_facts.jsonl";

export async function runInferenceCheck(
  claims: Claim[],
  diff: string,
  ctx?: InferenceContext,
): Promise<Finding[]> {
  if (claims.length === 0) {
    return [{
      check: "inference",
      severity: "info",
      summary: "no ship-claims extracted — skipping cloud inference",
      evidence: ["parser returned empty claim list; nothing to verify against cloud"],
    }];
  }

  // Empirical claims (runtime metrics / observed outcomes) can't be
  // verified from the diff. Drop them from the cloud prompt so the
  // reviewer doesn't chase ghosts. A future `runtime_evidence` check
  // can validate these against data/_kb/*/summary.json outputs.
  const verifiable = claims.filter(c => c.strength !== "empirical");
  const empiricalCount = claims.length - verifiable.length;
  if (verifiable.length === 0) {
    return [{
      check: "inference",
      severity: "info",
      summary: `all ${claims.length} claims are empirical (runtime metrics) — skipping cloud inference`,
      evidence: [`empirical claims can't be verified from a static diff; needs runtime-evidence check`],
    }];
  }

  // Diff source for the cloud prompt — either the raw diff (small
  // enough to fit), or a tree-split scratchpad (curation layer). We
  // prefer curation to truncation: truncation silently drops files
  // past the window; curation summarizes them so the cloud still sees
  // what changed, just densified.
  let diffForPrompt: string;
  let curationNote = "";
  if (diff.length > CURATION_THRESHOLD) {
    const ts = await treeSplitDiff(diff, verifiable);
    diffForPrompt = ts.scratchpad;
    curationNote = ` (curated: ${diff.length} chars → ${ts.shards} shards → scratchpad ${ts.scratchpad.length} chars)`;
  } else {
    diffForPrompt = diff;
  }
  // Belt-and-suspenders truncation — even a tree-split scratchpad
  // shouldn't exceed MAX_DIFF_CHARS in practice, but guard anyway so
  // pathological inputs can't burst the prompt.
  const truncated = diffForPrompt.length > MAX_DIFF_CHARS
    ? diffForPrompt.slice(0, MAX_DIFF_CHARS) + `\n...[${diffForPrompt.length - MAX_DIFF_CHARS} more chars truncated]`
    : diffForPrompt;

  // Build the reviewer prompt in the same shape as run_codereview's
  // review stage (llm_team_ui.py:10950), adapted for claim verification:
  //   "Task: ..."
  //   "Code: ..."
  //   "Review: bugs/security/perf/style/edge. Provide corrected code."
  // We add: claim list upfront + ask for structured JSON verdict.
  //
  // When the diff was curated (tree-split scratchpad), we add an
  // explicit anti-false-positive instruction: the scratchpad is a
  // distillation, not the full source, so absence-from-scratchpad is
  // NOT evidence of absence-from-diff. Mirrors the fix we made in
  // scrum_master's review prompt for the same class of error.
  const isCurated = curationNote.length > 0;
  const curationGuard = isCurated
    ? [
        "",
        "CRITICAL: the 'Diff' below is a curated multi-shard scratchpad,",
        "NOT the full raw diff. The scratchpad distills each shard down",
        "to facts useful for claim verification and drops the rest.",
        "DO NOT flag a function/field/feature as 'missing' or 'not",
        "implemented' based solely on its absence from the scratchpad —",
        "absence in a distillation is NOT evidence of absence in the",
        "actual diff. Only judge a claim NOT BACKED when the scratchpad",
        "DIRECTLY contradicts it (e.g. scratchpad shows the function was",
        "added empty, or shows the claimed code path is a stub).",
        "Skip the unflagged_gaps section entirely when operating on a",
        "curated scratchpad — you can't reliably detect gaps from a",
        "distillation, and false positives there are worse than misses.",
      ].join("\n")
    : "";

  const systemMsg = [
    "You review pull-request diffs against the author's own ship-claims.",
    "For each claim, decide: is it backed by actual code in the diff, or is",
    "it placeholder / aspirational / unwired?",
    "",
    "A claim is BACKED when the diff contains a real code path that delivers",
    "the claimed behavior. A claim is NOT BACKED when:",
    " - the claim asserts functionality but the diff only adds types/fields",
    "   with no consumer",
    " - the claim mentions tests but no test function was added",
    " - the claim asserts integration but the integration point is a stub",
    " - the diff contains unimplemented!() / todo!() / TODO comments",
    " - the claim says 'works end-to-end' but the diff has no end-to-end test",
    curationGuard,
    "",
    "Respond with strict JSON only. No prose before or after. Shape:",
    "{",
    '  "claim_verdicts": [',
    '    {"claim_idx": 0, "backed": false, "evidence": "short reason"}',
    "  ],",
    '  "unflagged_gaps": [',
    '    {"location": "file:line", "summary": "short description"}',
    "  ]",
    "}",
  ].join("\n");

  const userMsg = [
    `Ship-claims the author made (numbered 0..N-1):`,
    verifiable.map((c, i) => ` ${i}. [${c.strength}] "${c.text}" at ${c.location}`).join("\n"),
    "",
    `Diff:`,
    "```",
    truncated,
    "```",
    "",
    `For each numbered claim above, emit a claim_verdicts entry. For gaps the`,
    `author DIDN'T claim but that look like placeholder code, emit unflagged_gaps.`,
    `Strict JSON only, matching the shape described. No prose outside JSON.`,
  ].join("\n");

  // N=3 consensus — run the primary reviewer in parallel, collect
  // all three parsed responses, majority-vote per claim. Parallel
  // (Promise.all) because each call is ~20-30s and they're independent;
  // wall-clock stays about the same as a single call, at 3x token cost.
  // Empirical justification: in 3-run determinism tests, 7/8 findings
  // were stable but 1 flipped across runs — majority vote stabilizes
  // the flipping class without losing the stable signal.
  const primaryRuns = await Promise.all(
    Array.from({ length: N_CONSENSUS }, () =>
      runCloudInference(systemMsg, userMsg, MODEL)),
  );

  const parsedRuns = primaryRuns.filter(r => r.parsed !== null);
  if (parsedRuns.length === 0) {
    // All N calls failed. Surface the first-run diagnostic so the
    // operator sees *why* (unreachable / non-200 / unparseable).
    const first = primaryRuns[0];
    return [{
      check: "inference",
      severity: "info",
      summary: `cloud inference: all ${N_CONSENSUS} consensus runs failed — ${first.error ?? "unknown"}`,
      evidence: [
        `first-run diagnostic: ${first.diagnostic ?? "(none)"}`,
        `successful runs: 0 / ${N_CONSENSUS}`,
      ],
    }];
  }

  // Aggregate votes per claim_idx.
  interface Votes { trues: number; falses: number; evidences: string[] }
  const votesByClaim = new Map<number, Votes>();
  const unflaggedByRun: any[][] = [];
  let totalTokens = 0;
  for (const run of parsedRuns) {
    totalTokens += run.tokens;
    unflaggedByRun.push(Array.isArray(run.parsed?.unflagged_gaps) ? run.parsed.unflagged_gaps : []);
    for (const v of run.parsed?.claim_verdicts ?? []) {
      const idx = Number(v?.claim_idx);
      if (!Number.isFinite(idx)) continue;
      const rec = votesByClaim.get(idx) ?? { trues: 0, falses: 0, evidences: [] };
      if (v.backed === false) {
        rec.falses++;
        rec.evidences.push(String(v.evidence ?? ""));
      } else if (v.backed === true) {
        rec.trues++;
      }
      votesByClaim.set(idx, rec);
    }
  }

  const findings: Finding[] = [];

  // Summary finding so the verdict layer knows the check ran.
  findings.push({
    check: "inference",
    severity: "info",
    summary: `cloud review completed (model=${MODEL}, consensus=${parsedRuns.length}/${N_CONSENSUS}, tokens=${totalTokens})${curationNote}`,
    evidence: [
      `claims voted: ${votesByClaim.size}`,
      `parsed runs: ${parsedRuns.length} / ${N_CONSENSUS}`,
      `empirical claims skipped: ${empiricalCount}`,
    ],
  });

  // Per-claim majority vote; tie-break if no majority.
  const discrepancies: Array<{
    claim_idx: number;
    claim_text: string;
    votes: { trues: number; falses: number };
    resolution: "majority_backed" | "majority_not_backed" | "tiebreaker_backed" | "tiebreaker_not_backed" | "unresolved";
    tiebreaker_model?: string;
  }> = [];

  for (const [idx, votes] of votesByClaim) {
    const claim = verifiable[idx];
    if (!claim) continue;
    const totalVotes = votes.trues + votes.falses;
    let notBacked: boolean | null = null;
    let resolution: typeof discrepancies[number]["resolution"] = "majority_backed";
    let evidenceText = "";
    let tbModel: string | undefined;

    if (votes.falses > votes.trues) {
      notBacked = true;
      resolution = "majority_not_backed";
      evidenceText = votes.evidences[0] ?? "(no reason given)";
    } else if (votes.trues > votes.falses) {
      notBacked = false;
      resolution = "majority_backed";
    } else {
      // Tie. Run tie-breaker with a different-architecture model.
      const tb = await runCloudInference(systemMsg, userMsg, TIEBREAKER_MODEL);
      if (tb.parsed) {
        const tv = (tb.parsed.claim_verdicts ?? []).find((v: any) => Number(v?.claim_idx) === idx);
        if (tv?.backed === false) {
          notBacked = true;
          resolution = "tiebreaker_not_backed";
          evidenceText = `(tie-breaker ${TIEBREAKER_MODEL}) ${String(tv.evidence ?? "")}`;
          tbModel = TIEBREAKER_MODEL;
        } else if (tv?.backed === true) {
          notBacked = false;
          resolution = "tiebreaker_backed";
          tbModel = TIEBREAKER_MODEL;
        } else {
          resolution = "unresolved";
        }
      } else {
        resolution = "unresolved";
      }
    }

    // Log every case where the N runs disagreed — discrepancies are
    // signal, not noise. Separate from audit_lessons.jsonl because
    // they're about the *auditor's* quality, not the PR's quality.
    const disagreed = totalVotes >= 2 && votes.trues > 0 && votes.falses > 0;
    if (disagreed || resolution.startsWith("tiebreaker") || resolution === "unresolved") {
      discrepancies.push({
        claim_idx: idx,
        claim_text: claim.text,
        votes: { trues: votes.trues, falses: votes.falses },
        resolution,
        tiebreaker_model: tbModel,
      });
    }

    if (notBacked === true) {
      const sev: Finding["severity"] = claim.strength === "strong" ? "block"
        : claim.strength === "moderate" ? "warn"
        : "info";
      findings.push({
        check: "inference",
        severity: sev,
        claim_text: claim.text,
        summary: `cloud: claim not backed — "${claim.text.slice(0, 100)}"`,
        evidence: [
          `at ${claim.location}`,
          `consensus: ${votes.falses}/${totalVotes} not-backed (resolution: ${resolution})`,
          `cloud reason: ${evidenceText.slice(0, 200)}`,
        ],
      });
    }
  }

  // Persist discrepancies so we can measure consensus drift over time.
  if (discrepancies.length > 0 && ctx) {
    persistDiscrepancies(ctx, discrepancies).catch(e =>
      console.error(`[inference] discrepancy log failed: ${(e as Error).message}`));
  }

  // Use the first run's parsed output for downstream unflagged_gaps processing.
  const parsed = parsedRuns[0].parsed;

  // Route the curated scratchpad through llm_team's extract-facts
  // pipeline when we have (a) a curated scratchpad (best signal about
  // what the PR actually changed) and (b) PR context to scope facts.
  // AWAITED (not fire-and-forget) so CLI callers like audit_one.ts
  // don't exit before extraction lands; the systemd poller has plenty
  // of headroom (90s cycle vs ~15s extraction). A failure inside
  // extractAndPersistFacts is caught + logged but never throws.
  if (isCurated && ctx && process.env.LH_AUDITOR_SKIP_EXTRACT !== "1") {
    try {
      await extractAndPersistFacts(diffForPrompt, ctx);
    } catch (e) {
      console.error(`[inference] fact extraction failed: ${(e as Error).message}`);
    }
  }

  // Belt-and-suspenders: when operating on a curated scratchpad, drop
  // the unflagged_gaps section entirely. The distillation can't
  // reliably ground gap-detection, and false positives are worse than
  // misses for this signal class. The systemMsg already asks the
  // cloud to skip this section when curated — but the model may still
  // emit it, so we filter here too.
  const gapsToEmit = isCurated ? [] : (parsed.unflagged_gaps ?? []);
  for (const g of gapsToEmit) {
    const summary = String(g?.summary ?? "?");
    const location = String(g?.location ?? "?");
    // False-positive guard — when the cloud says "X not defined in this
    // diff" or "missing implementation of X", the cloud may just mean
    // "X is not in the added lines," not "X doesn't exist in the repo."
    // Extract candidate symbol names and grep the repo. If any symbol
    // is defined elsewhere, drop the finding — it's a known-symbol
    // reference, not a placeholder.
    if (/not\s+defined|missing\s+implementation|never\s+referenced\s+or\s+integrated/i.test(summary)) {
      const symbols = extractSymbols(summary);
      if (symbols.length > 0) {
        const resolved = await symbolsExistInRepo(symbols);
        if (resolved.length === symbols.length) {
          // Every named symbol exists somewhere in the repo — silent drop.
          continue;
        }
        if (resolved.length > 0) {
          // Partially resolved — demote to info with a note.
          findings.push({
            check: "inference",
            severity: "info",
            summary: `cloud gap partially resolved by repo grep: ${summary.slice(0, 120)}`,
            evidence: [
              `location: ${location.slice(0, 140)}`,
              `resolved via grep: ${resolved.join(",")}`,
              `unresolved: ${symbols.filter(s => !resolved.includes(s)).join(",")}`,
            ],
          });
          continue;
        }
      }
    }
    findings.push({
      check: "inference",
      severity: "warn",
      summary: `cloud-flagged gap not in any claim: ${summary.slice(0, 120)}`,
      evidence: [`location: ${location.slice(0, 140)}`],
    });
  }

  return findings;
}
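
// Hypothetical call-site sketch, NOT wired anywhere. The claim literal
// and ctx values are placeholders, and Claim may carry more fields than
// the three this module reads (text, strength, location); the cast keeps
// the sketch compiling regardless of Claim's full shape.
export async function demoRunInferenceCheck(diff: string): Promise<Finding[]> {
  const claims = [
    { text: "adds an end-to-end test for the poller", strength: "strong", location: "PR body" },
  ] as unknown as Claim[];
  return runInferenceCheck(claims, diff, { pr_number: 0, head_sha: "0000000" });
}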

// Single cloud call — the consensus loop calls this N times in
// parallel. Returns the parsed JSON shape + token usage + any error
// diagnostic. NEVER throws; the consensus aggregator handles partial
// failures by dropping non-parsed runs from the vote.
interface CloudRunResult {
  parsed: any | null;
  tokens: number;
  error?: string;       // "unreachable" | "non_200" | "unparseable"
  diagnostic?: string;  // first 200 chars for debugging
  model: string;
}

async function runCloudInference(systemMsg: string, userMsg: string, model: string): Promise<CloudRunResult> {
  let resp: Response;
  try {
    resp = await fetch(`${GATEWAY}/v1/chat`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({
        provider: "ollama_cloud",
        model,
        messages: [
          { role: "system", content: systemMsg },
          { role: "user", content: userMsg },
        ],
        // temp=0 (greedy) + think=true. think=true is required for
        // gpt-oss:120b — without it the model returns empty content
        // on large prompts. Variance from the think trace is observed
        // in practice, which is why we use N=3 consensus, not single-
        // call determinism.
        max_tokens: 3000,
        temperature: 0,
        think: true,
      }),
      signal: AbortSignal.timeout(CALL_TIMEOUT_MS),
    });
  } catch (e) {
    return { parsed: null, tokens: 0, error: "unreachable", diagnostic: (e as Error).message.slice(0, 200), model };
  }
  if (!resp.ok) {
    return { parsed: null, tokens: 0, error: "non_200", diagnostic: `${resp.status}: ${(await resp.text()).slice(0, 160)}`, model };
  }
  let body: any;
  try { body = await resp.json(); }
  catch (e) { return { parsed: null, tokens: 0, error: "unparseable", diagnostic: (e as Error).message, model }; }
  const content: string = body?.choices?.[0]?.message?.content ?? "";
  const tokens: number = body?.usage?.total_tokens ?? 0;
  const parsed = extractJson(content);
  if (!parsed) {
    return { parsed: null, tokens, error: "unparseable", diagnostic: content.slice(0, 200), model };
  }
  return { parsed, tokens, model };
}

async function persistDiscrepancies(ctx: InferenceContext, discrepancies: any[]): Promise<void> {
  await mkdir("/home/profit/lakehouse/data/_kb", { recursive: true });
  const rows = discrepancies.map(d => JSON.stringify({
    pr_number: ctx.pr_number,
    head_sha: ctx.head_sha,
    logged_at: new Date().toISOString(),
    ...d,
  }));
  await appendFile(AUDIT_DISCREPANCIES_JSONL, rows.join("\n") + "\n");
}
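
// A persisted row looks roughly like this (illustrative values only):
//   {"pr_number":8,"head_sha":"abc1234","logged_at":"2026-04-22T12:00:00.000Z",
//    "claim_idx":2,"claim_text":"...","votes":{"trues":1,"falses":1},
//    "resolution":"tiebreaker_not_backed","tiebreaker_model":"qwen3-coder:480b"}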

// Extract structured knowledge from the curated scratchpad and append
// to data/_kb/audit_facts.jsonl — one row per extract run, keyed by
// PR number + head SHA for scope tracking. kb_query tails this next
// audit to surface recurring entities/relationships across PRs.
async function extractAndPersistFacts(scratchpad: string, ctx: InferenceContext): Promise<void> {
  const ex = await extractFacts(scratchpad);
  if (ex.error && ex.entities.length === 0 && ex.facts.length === 0) {
    // Full failure — log but don't write an empty row.
    console.error(`[inference] extractFacts skipped row: ${ex.error}`);
    return;
  }
  const row = {
    pr_number: ctx.pr_number,
    head_sha: ctx.head_sha,
    extracted_at: ex.extracted_at,
    extractor: ex.extractor_model,
    verifier: ex.verifier_model,
    llm_team_run_id: ex.llm_team_run_id ?? null,
    facts: ex.facts,
    entities: ex.entities,
    relationships: ex.relationships,
    verification_preview: ex.verification.slice(0, 400),
    verifier_verdicts: ex.verifier_verdicts,
    facts_dropped_by_verifier: ex.facts_dropped_by_verifier ?? 0,
    schema_version: 2,
    source: "audit_inference",
  };
  await mkdir("/home/profit/lakehouse/data/_kb", { recursive: true });
  await appendFile(AUDIT_FACTS_JSONL, JSON.stringify(row) + "\n");
}

// Curation via tree-split — ports the scrum_master pattern into the
// inference check. Shards the raw diff into DIFF_SHARD_SIZE chunks,
// summarizes each shard *against the claim-verification task* so the
// summary preserves exactly what the cloud needs to judge claims
// (function signatures, struct fields, deletions, new files), drops
// everything else. Merges into a compact scratchpad.
//
// Cost: N cloud calls for the shard summaries + 1 cloud call for the
// final verification = N+1 calls instead of 1. Mitigation: shards run
// serially (not in parallel) to keep gateway load bounded; summary calls
// use max_tokens=400 so they're fast (~2s each on gpt-oss:120b).
//
// Determinism: each shard summary call uses temp=0 + think=false (see
// callCloud below for why the shards differ from the top-level call),
// so identical input yields an identical scratchpad. The final
// verification call then sees a stable scratchpad, giving stable verdicts.
async function treeSplitDiff(
  fullDiff: string,
  claims: Claim[],
): Promise<{ scratchpad: string; shards: number }> {
  const shards: Array<{ from: number; to: number; text: string }> = [];
  for (let i = 0; i < fullDiff.length; i += DIFF_SHARD_SIZE) {
    const end = Math.min(i + DIFF_SHARD_SIZE, fullDiff.length);
    shards.push({ from: i, to: end, text: fullDiff.slice(i, end) });
  }
  // Curate the claim list into a short form the summary prompt can
  // use to bias extraction toward relevant facts.
  const claimDigest = claims.map((c, i) =>
    `${i}. [${c.strength}] "${c.text.slice(0, 100)}"`
  ).join("\n");

  let scratchpad = "";
  for (const [si, shard] of shards.entries()) {
    const prompt = [
      `You are summarizing shard ${si + 1}/${shards.length} (chars ${shard.from}..${shard.to}) of a PR diff.`,
      `The downstream task will verify these ship-claims against the full-PR summary. Extract ONLY facts that could confirm or refute these claims:`,
      "",
      claimDigest,
      "",
      "Extract: new function/method signatures, struct fields, deletions, new files, wiring (function X calls Y), absence-of-implementation markers, TODO comments on added lines.",
      "Skip: comment-only edits, whitespace, import reordering, unrelated cosmetic changes.",
      "",
      "─────── shard diff ───────",
      shard.text,
      "─────── end shard ───────",
      "",
      "Output: up to 180 words of facts in bullet form. No prose preamble, no claim verdicts (that's for the downstream step).",
    ].join("\n");

    const r = await callCloud(prompt, 400);
    if (r.content) {
      scratchpad += `\n--- shard ${si + 1} (chars ${shard.from}..${shard.to}) ---\n${r.content.trim()}\n`;
    }
  }
  return { scratchpad: scratchpad.trim(), shards: shards.length };
}

// Minimal cloud caller used only by treeSplitDiff — same gateway +
// model as the top-level call, but think=false. Shards are small
// (≤DIFF_SHARD_SIZE ~4500 chars) and the task is pure fact
// extraction, not reasoning. think=true on the shards introduced
// variance in reasoning traces that compounded across 23 calls into
// a non-deterministic scratchpad (observed during curation
// validation: same-SHA runs produced 5/7/8 final findings).
// think=false is stable on small prompts; its empty-content failure
// only appears at the main call's 10K+ prompt size, which is why the
// main call keeps think=true.
async function callCloud(prompt: string, maxTokens: number): Promise<{ content: string }> {
  try {
    const r = await fetch(`${GATEWAY}/v1/chat`, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify({
        provider: "ollama_cloud",
        model: MODEL,
        messages: [{ role: "user", content: prompt }],
        max_tokens: maxTokens,
        temperature: 0,
        think: false,
      }),
      signal: AbortSignal.timeout(CALL_TIMEOUT_MS),
    });
    if (!r.ok) return { content: "" };
    const j: any = await r.json();
    return { content: j?.choices?.[0]?.message?.content ?? "" };
  } catch {
    return { content: "" };
  }
}

// Pull out plausible code-symbol names from a summary string.
// Matches:
//   - backticked identifiers: `foo_bar`
//   - identifiers followed by parens: foo_bar()
//   - CamelCase types
// (bare snake_case names are caught only via the backtick/parens forms).
// Filters out common English words that could be matched accidentally.
const STOPWORDS = new Set([
  "not","the","and","for","this","that","with","but","are","was","has",
  "have","been","any","missing","implementation","diff","defined","never",
  "referenced","integrated","flow","code","file","some","only","when",
]);

function extractSymbols(text: string): string[] {
  const out = new Set<string>();
  // `backticked` symbols
  for (const m of text.matchAll(/`([A-Za-z_][A-Za-z0-9_]{2,})`/g)) out.add(m[1]);
  // foo() or foo_bar() calls
  for (const m of text.matchAll(/\b([A-Za-z_][A-Za-z0-9_]{2,})\s*\(/g)) out.add(m[1]);
  // CamelCase types (3+ chars, must start with uppercase)
  for (const m of text.matchAll(/\b([A-Z][A-Za-z0-9]{2,})\b/g)) out.add(m[1]);
  return Array.from(out).filter(s => !STOPWORDS.has(s.toLowerCase()));
}
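
// e.g. extractSymbols("missing implementation of `run_poller`; GiteaClient() never wired")
//   returns ["run_poller", "GiteaClient"]: one backtick hit, one
//   call-parens/CamelCase hit, deduped. Input is illustrative.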

// Scan the repo for at least one definition of each symbol. Uses Bun's
// Glob to walk TS/Rust/Python/JS sources; ignores node_modules, data/,
// and target/. Skips files > 500KB — those are fixtures/snapshots that
// won't contain a definition line, and slurping them slows the audit.
async function symbolsExistInRepo(symbols: string[]): Promise<string[]> {
  const patterns = ["**/*.ts", "**/*.tsx", "**/*.rs", "**/*.py", "**/*.js"];
  const skip = (p: string) => p.includes("/node_modules/") || p.startsWith("data/") || p.includes("/target/") || p.startsWith("dist/");
  const MAX_FILE_BYTES = 500_000;
  const resolved = new Set<string>();
  const toFind = new Set(symbols);
  for (const pat of patterns) {
    if (toFind.size === 0) break;
    const glob = new Glob(pat);
    for await (const f of glob.scan({ cwd: REPO_ROOT, onlyFiles: true })) {
      if (skip(f)) continue;
      try { const s = await stat(`${REPO_ROOT}/${f}`); if (s.size > MAX_FILE_BYTES) continue; } catch { continue; }
      let content: string;
      try { content = await readFile(`${REPO_ROOT}/${f}`, "utf8"); } catch { continue; }
      for (const sym of Array.from(toFind)) {
        // Definition heuristics: `function sym`, `fn sym`, `const sym`,
        // `let sym`, `def sym`, `class sym`, `struct sym`, `enum sym`,
        // `trait sym`, `async function sym`, `pub (async )?fn sym`.
        const re = new RegExp(
          `\\b(function|async\\s+function|const|let|var|def|class|struct|enum|trait|impl|type|interface|fn|pub\\s+(async\\s+)?fn)\\s+${escapeRe(sym)}\\b`
        );
        if (re.test(content)) {
          resolved.add(sym);
          toFind.delete(sym);
          if (toFind.size === 0) break;
        }
      }
    }
  }
  return Array.from(resolved);
}

function escapeRe(s: string): string {
  return s.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}

// Lift the first balanced JSON object out of the response. Tolerates
// leading prose, code fences, and model reasoning preamble when the
// cloud model ignored "strict JSON only."
function extractJson(text: string): any | null {
  const cleaned = text.replace(/^```(?:json)?\s*/im, "").replace(/```\s*$/im, "");
  let depth = 0;
  let start = -1;
  for (let i = 0; i < cleaned.length; i++) {
    const c = cleaned[i];
    if (c === "{") {
      if (depth === 0) start = i;
      depth++;
    } else if (c === "}") {
      depth--;
      if (depth === 0 && start >= 0) {
        try { return JSON.parse(cleaned.slice(start, i + 1)); } catch { start = -1; }
      }
    }
  }
  return null;
}
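
// e.g. extractJson("Sure, here is the verdict:\n```json\n{\"claim_verdicts\":[]}\n```")
//   returns { claim_verdicts: [] }: the fence lines are stripped first,
//   then the brace scan skips the leading prose. Input is illustrative.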