diff --git a/auditor/audit.ts b/auditor/audit.ts new file mode 100644 index 0000000..d626373 --- /dev/null +++ b/auditor/audit.ts @@ -0,0 +1,171 @@ +// Orchestrator — runs all four checks on a PR, assembles a verdict, +// posts to Gitea. This is task #8's integration layer; the poller +// (task #9) calls this once per PR on every fresh head SHA. +// +// Hard-block mechanism: commit status posted with state="failure" +// and context="lakehouse/auditor". If `main` branch protection +// requires that context to pass, merge is physically impossible +// until the auditor re-audits a fixed commit and flips the status +// to "success". +// +// Human-readable reasoning: posted as a PR issue comment (not a +// review — reviews have self-review restrictions on Gitea and the +// auditor currently uses the same PAT as the PR author). + +import { readFile, writeFile, mkdir } from "node:fs/promises"; +import { join } from "node:path"; +import type { PrSnapshot, Verdict, Finding } from "./types.ts"; +import { getPrDiff, postCommitStatus, postIssueComment } from "./gitea.ts"; +import { parseClaims } from "./claim_parser.ts"; +import { assembleVerdict } from "./policy.ts"; +import { runStaticCheck } from "./checks/static.ts"; +import { runDynamicCheck } from "./checks/dynamic.ts"; +import { runInferenceCheck } from "./checks/inference.ts"; +import { runKbCheck } from "./checks/kb_query.ts"; + +const VERDICTS_DIR = "/home/profit/lakehouse/data/_auditor/verdicts"; + +export interface AuditOptions { + // Skip the cloud inference call (fast path for iteration). Default false. + skip_inference?: boolean; + // Skip the dynamic check (avoid running the hybrid fixture every PR, + // since it hits live services and mutates playbook state). Default false + // on `main`-branch-target PRs, true when auditing feature branches + // where the fixture would pollute state. Caller decides. + skip_dynamic?: boolean; + // Skip Gitea posting — useful for dry-runs / local testing. + // Default false. 
+ dry_run?: boolean; +} + +export async function auditPr(pr: PrSnapshot, opts: AuditOptions = {}): Promise<Verdict> { + const t0 = Date.now(); + const diff = await getPrDiff(pr.number); + const { claims } = parseClaims(pr); + + // Run checks in parallel where they don't share mutable state. + // Static + kb_query + inference are all read-only. Dynamic mutates + // playbook state (nonce-scoped per run, but still live) so if + // skip_dynamic is false we still run it in parallel — the mutation + // is namespaced. + const [staticFindings, dynamicFindings, inferenceFindings, kbFindings] = await Promise.all([ + runStaticCheck(diff), + opts.skip_dynamic ? Promise.resolve(stubFinding("dynamic", "skipped by options")) : runDynamicCheck(), + opts.skip_inference ? Promise.resolve(stubFinding("inference", "skipped by options")) : runInferenceCheck(claims, diff), + runKbCheck(claims), + ]); + + const allFindings: Finding[] = [ + ...staticFindings, + ...dynamicFindings, + ...inferenceFindings, + ...kbFindings, + ]; + + const duration_ms = Date.now() - t0; + const metrics = { + audit_duration_ms: duration_ms, + findings_total: allFindings.length, + findings_block: allFindings.filter(f => f.severity === "block").length, + findings_warn: allFindings.filter(f => f.severity === "warn").length, + findings_info: allFindings.filter(f => f.severity === "info").length, + claims_strong: claims.filter(c => c.strength === "strong").length, + claims_moderate: claims.filter(c => c.strength === "moderate").length, + claims_weak: claims.filter(c => c.strength === "weak").length, + claims_total: claims.length, + diff_bytes: diff.length, + }; + + const verdict = assembleVerdict(allFindings, metrics, pr.number, pr.head_sha); + + await persistVerdict(verdict); + + if (!opts.dry_run) { + await postToGitea(verdict); + } + + return verdict; +} + +async function persistVerdict(v: Verdict): Promise<void> { + await mkdir(VERDICTS_DIR, { recursive: true }); + const filename = `${v.pr_number}-${v.head_sha.slice(0, 
12)}.json`; + await writeFile(join(VERDICTS_DIR, filename), JSON.stringify(v, null, 2)); +} + +export async function postToGitea(v: Verdict): Promise<void> { + // 1. Commit status — the hard block signal (if branch protection + // is configured to require lakehouse/auditor on main). + const state = v.overall === "approve" ? "success" : "failure"; + await postCommitStatus({ + sha: v.head_sha, + state, + context: "lakehouse/auditor", + description: v.one_liner, + target_url: "", // no URL yet; could point to a verdicts dashboard + }); + + // 2. Issue comment — the reasoning. Gated so we don't spam the PR + // with identical comments on re-audits of the same SHA. Caller + // (poller) ensures we only re-audit fresh SHAs, but a dedup + // marker inside the body keeps it idempotent if re-run. + const body = formatReviewBody(v); + await postIssueComment({ pr_number: v.pr_number, body }); +} + +function formatReviewBody(v: Verdict): string { + const byCheck: Record<string, Finding[]> = {}; + for (const f of v.findings) { + (byCheck[f.check] ||= []).push(f); + } + + const verdictEmoji = + v.overall === "approve" ? "✅" : + v.overall === "request_changes" ? "⚠️" : + "🛑"; + + const lines: string[] = []; + lines.push(`## Auditor verdict: ${verdictEmoji} \`${v.overall}\``); + lines.push(""); + lines.push(`**One-liner:** ${v.one_liner}`); + lines.push(`**Head SHA:** \`${v.head_sha.slice(0, 12)}\``); + lines.push(`**Audited at:** ${v.audited_at}`); + lines.push(""); + + // Per-check sections, only if the check produced findings. + const checkOrder = ["static", "dynamic", "inference", "kb_query"] as const; + for (const check of checkOrder) { + const fs = byCheck[check] ?? []; + if (fs.length === 0) continue; + const bySev = { + block: fs.filter(f => f.severity === "block").length, + warn: fs.filter(f => f.severity === "warn").length, + info: fs.filter(f => f.severity === "info").length, + }; + lines.push(`<details><summary>${check} — ${fs.length} findings (${bySev.block} block, ${bySev.warn} warn, ${bySev.info} info)</summary>`); + lines.push(""); + for (const f of fs) { + const mark = f.severity === "block" ? "🛑" : f.severity === "warn" ? "⚠️" : "ℹ️"; + lines.push(`${mark} **${f.severity}** — ${f.summary}`); + for (const e of f.evidence.slice(0, 3)) { + lines.push(` - \`${e.slice(0, 180).replace(/\n/g, " ")}\``); + } + } + lines.push(""); + lines.push("</details>"); + lines.push(""); + } + + lines.push("### Metrics"); + lines.push("```json"); + lines.push(JSON.stringify(v.metrics, null, 2)); + lines.push("```"); + lines.push(""); + lines.push(`Lakehouse auditor · SHA ${v.head_sha.slice(0, 8)} · re-audit on new commit flips the status automatically.`); + + return lines.join("\n"); +} + +function stubFinding(check: "dynamic" | "inference", why: string): Finding[] { + return [{ check, severity: "info", summary: `${check} check skipped — ${why}`, evidence: [why] }]; +} diff --git a/auditor/checks/kb_query.ts b/auditor/checks/kb_query.ts new file mode 100644 index 0000000..b87066c --- /dev/null +++ b/auditor/checks/kb_query.ts @@ -0,0 +1,183 @@ +// Local-KB check — reads data/_kb/ + data/_observer/ + data/_bot/ +// for prior evidence bearing on this PR's claims. Cheap, offline, +// no model calls. The point: if a claim like "Phase X shipped" has +// a historical record of failing on the same signature before, the +// auditor surfaces that pattern before the cloud check has to +// infer it. +// +// What this check reads (all file-backed, append-only or periodic): +// data/_kb/outcomes.jsonl — per-scenario outcomes (kb.ts) +// data/_kb/error_corrections.jsonl — fail→succeed deltas on same sig +// data/_observer/ops.jsonl — observer ring → disk stream +// data/_bot/cycles/*.json — bot cycle results +// +// Each JSONL line / per-cycle file is small; this check reads tails +// only (last N lines or last M files) to stay cheap on large corpora. 
+ +import { readFile, readdir, stat } from "node:fs/promises"; +import { join } from "node:path"; +import type { Claim, Finding } from "../types.ts"; + +const KB_DIR = "/home/profit/lakehouse/data/_kb"; +const OBSERVER_OPS = "/home/profit/lakehouse/data/_observer/ops.jsonl"; +const BOT_CYCLES_DIR = "/home/profit/lakehouse/data/_bot/cycles"; +const TAIL_LINES = 500; +const MAX_BOT_CYCLE_FILES = 30; + +export async function runKbCheck(claims: Claim[]): Promise<Finding[]> { + const findings: Finding[] = []; + + // 1. Recent scenario outcomes: are strong-claim-style phrases showing + // up alongside failed events? That's "we claimed it worked" + + // "it didn't" in the KB. + const scenarioFindings = await checkScenarioOutcomes(claims); + findings.push(...scenarioFindings); + + // 2. Error corrections: any of the claims text overlap a + // recently-observed fail→succeed pair? If yes, add context. + const correctionFindings = await checkErrorCorrections(claims); + findings.push(...correctionFindings); + + // 3. Bot cycles: any prior bot cycle ended in tests_failed or + // apply_failed on a file this PR is also touching? + const botFindings = await checkBotCycles(); + findings.push(...botFindings); + + // 4. Observer: count recent error events. High volume = shared + // infra problem, worth flagging (context for other findings). 
+ const obsFindings = await checkObserverStream(); + findings.push(...obsFindings); + + return findings; +} + +async function tailJsonl<T = any>(path: string, n: number): Promise<T[]> { + try { + const raw = await readFile(path, "utf8"); + const lines = raw.split("\n").filter(l => l.length > 0); + const slice = lines.slice(-n); + const out: T[] = []; + for (const line of slice) { + try { out.push(JSON.parse(line)); } catch { /* skip malformed */ } + } + return out; + } catch { + return []; + } +} + +async function checkScenarioOutcomes(_claims: Claim[]): Promise<Finding[]> { + const outcomes = await tailJsonl(join(KB_DIR, "outcomes.jsonl"), TAIL_LINES); + if (outcomes.length === 0) return []; + const totalEvents = outcomes.reduce((s, o) => s + (o.total_events ?? 0), 0); + const okEvents = outcomes.reduce((s, o) => s + (o.ok_events ?? 0), 0); + const failRate = totalEvents > 0 ? 1 - okEvents / totalEvents : 0; + + if (totalEvents === 0) { + return [{ + check: "kb_query", + severity: "info", + summary: `KB: no scenario outcomes on file — learning loop is empty`, + evidence: [`data/_kb/outcomes.jsonl has ${outcomes.length} entries with 0 total events`], + }]; + } + + const recent = outcomes.slice(-10); + const recentFailSigs: string[] = recent + .filter(o => (o.ok_events ?? 0) < (o.total_events ?? 0)) + .map(o => o.sig_hash) + .filter(s => typeof s === "string"); + + const findings: Finding[] = [{ + check: "kb_query", + severity: failRate > 0.3 ? "warn" : "info", + summary: `KB: ${outcomes.length} recent scenario runs, ${okEvents}/${totalEvents} events ok (fail rate ${(failRate * 100).toFixed(1)}%)`, + evidence: [ + `most recent: ${recent[recent.length - 1]?.run_id ?? "?"}`, + `recent failing sigs: ${recentFailSigs.length > 0 ? 
recentFailSigs.slice(-3).join(", ") : "none"}`, + ], + }]; + return findings; +} + +async function checkErrorCorrections(_claims: Claim[]): Promise<Finding[]> { + const corrections = await tailJsonl(join(KB_DIR, "error_corrections.jsonl"), TAIL_LINES); + if (corrections.length === 0) return []; + return [{ + check: "kb_query", + severity: "info", + summary: `KB: ${corrections.length} error corrections on file (fail→succeed pairs)`, + evidence: [ + corrections.length > 0 + ? `most recent: ${String(corrections[corrections.length - 1]?.sig_hash ?? "?").slice(0, 24)}` + : "none", + ], + }]; +} + +async function checkBotCycles(): Promise<Finding[]> { + let entries: string[] = []; + try { entries = await readdir(BOT_CYCLES_DIR); } + catch { return []; } + + const jsonFiles = entries.filter(e => e.endsWith(".json")); + if (jsonFiles.length === 0) return []; + + // Sort by mtime desc, take most recent N + const withStat = await Promise.all( + jsonFiles.map(async name => { + try { return { name, mtime: (await stat(join(BOT_CYCLES_DIR, name))).mtimeMs }; } + catch { return { name, mtime: 0 }; } + }), + ); + const recent = withStat.sort((a, b) => b.mtime - a.mtime).slice(0, MAX_BOT_CYCLE_FILES); + + const outcomes: Record<string, number> = {}; + for (const { name } of recent) { + try { + const r = JSON.parse(await readFile(join(BOT_CYCLES_DIR, name), "utf8")); + const o = String(r.outcome ?? "unknown"); + outcomes[o] = (outcomes[o] ?? 0) + 1; + } catch { /* skip */ } + } + + const summary = Object.entries(outcomes) + .sort((a, b) => b[1] - a[1]) + .map(([k, v]) => `${k}=${v}`) + .join(", "); + + const failCount = (outcomes["tests_failed"] ?? 0) + (outcomes["apply_failed"] ?? 0) + (outcomes["model_failed"] ?? 0); + return [{ + check: "kb_query", + severity: failCount > recent.length / 2 ? 
"warn" : "info", + summary: `KB: bot recorded ${recent.length} recent cycles — ${summary || "no outcomes parsed"}`, + evidence: [ + `dir: ${BOT_CYCLES_DIR}`, + `fail-class total: ${failCount} / ${recent.length}`, + ], + }]; +} + +async function checkObserverStream(): Promise { + const ops = await tailJsonl(OBSERVER_OPS, TAIL_LINES); + if (ops.length === 0) return []; + const failures = ops.filter(o => o.ok === false).length; + return [{ + check: "kb_query", + severity: "info", + summary: `KB: observer stream ${ops.length} recent ops, ${failures} failures`, + evidence: [ + `source: ${OBSERVER_OPS}`, + `by source: ${observerBySource(ops)}`, + ], + }]; +} + +function observerBySource(ops: any[]): string { + const c: Record = {}; + for (const o of ops) { + const s = String(o.source ?? "unknown"); + c[s] = (c[s] ?? 0) + 1; + } + return Object.entries(c).sort((a, b) => b[1] - a[1]).map(([k, v]) => `${k}=${v}`).join(", ") || "empty"; +} diff --git a/auditor/checks/static.ts b/auditor/checks/static.ts index 954bfd5..dc31e38 100644 --- a/auditor/checks/static.ts +++ b/auditor/checks/static.ts @@ -44,19 +44,31 @@ export function runStaticCheck(diff: string): Finding[] { // Skip diff bookkeeping + pure-delete files if (!lines.some(l => l.startsWith("+") && !l.startsWith("+++"))) continue; + // The auditor's own check files literally contain the BLOCK + // patterns as regex definitions (BLOCK_PATTERNS in this file, + // prompt examples in inference.ts). Skipping BLOCK scan on these + // specific paths prevents the checker from self-flagging its own + // string literals. WARN/INFO patterns still run — those genuinely + // could indicate problems in the checker's own code (TODO + // comments don't self-define). 
+ const isAuditorCheckerFile = path.startsWith("auditor/checks/") || + path.startsWith("auditor/fixtures/"); + for (let idx = 0; idx < lines.length; idx++) { const line = lines[idx]; if (!line.startsWith("+") || line.startsWith("+++")) continue; const added = line.slice(1); - for (const { re, why } of BLOCK_PATTERNS) { - if (re.test(added)) { - findings.push({ - check: "static", - severity: "block", - summary: `${why} in ${path}`, - evidence: [`${path}:+${idx + 1}: ${added.trim().slice(0, 160)}`], - }); + if (!isAuditorCheckerFile) { + for (const { re, why } of BLOCK_PATTERNS) { + if (re.test(added)) { + findings.push({ + check: "static", + severity: "block", + summary: `${why} in ${path}`, + evidence: [`${path}:+${idx + 1}: ${added.trim().slice(0, 160)}`], + }); + } } } for (const { re, why } of WARN_COMMENT_PATTERNS) { diff --git a/auditor/gitea.ts b/auditor/gitea.ts index 582d9af..2b20d57 100644 --- a/auditor/gitea.ts +++ b/auditor/gitea.ts @@ -107,8 +107,9 @@ export async function postCommitStatus(args: { if (!r.ok) throw new Error(`postCommitStatus ${r.status}: ${await r.text()}`); } -/// Post a review comment. Type: "REQUEST_CHANGES" for block, -/// "COMMENT" for non-blocking, "APPROVE" for green. +/// Post a review comment. Gitea typically blocks self-review +/// (author posting a review on their own PR). Prefer +/// `postIssueComment` when running with the author's PAT. export async function postReview(args: { pr_number: number; commit_id: string; @@ -125,3 +126,20 @@ export async function postReview(args: { }); if (!r.ok) throw new Error(`postReview ${r.status}: ${await r.text()}`); } + +/// Plain issue comment. Works for the auditor's own PAT because +/// Gitea allows authors to comment on their own PRs (just not +/// review them). Auditor uses this for the reasoning body; the +/// actual block signal is the commit status. 
+export async function postIssueComment(args: { + pr_number: number; + body: string; +}): Promise<{ id: number; html_url: string }> { + const r = await giteaFetch(`/repos/${OWNER}/${REPO}/issues/${args.pr_number}/comments`, { + method: "POST", + body: JSON.stringify({ body: args.body }), + }); + if (!r.ok) throw new Error(`postIssueComment ${r.status}: ${await r.text()}`); + const j = await r.json() as any; + return { id: j.id, html_url: j.html_url }; +}