profit 6e39d8778f
All checks were successful
lakehouse/auditor all checks passed (3 findings, all info)
Cohesion: Python inventory + integration plan + Phase A verdict indexing
Three artifacts in one PR:

1. docs/PYTHON_INVENTORY.md — every .py file in the repo classified:
   Production (sidecar routers + 3 systemd services), Documented
   (kb_measure, kb_staffer_report), Manual (one-off tools), Dead
   (sidecar/sidecar/lab_ui.py + pipeline_lab.py are genuinely
   not imported anywhere).

2. docs/COHESION_INTEGRATION_PLAN.md — the "smarter DB" loop J
   called out as missing. Six phases A-F. Phase A ships here; B-F
   are named + sequenced for follow-up PRs. Each phase adds ONE
   wire of the loop; no single PR does them all.

3. Phase A wire (auditor verdicts → observer + KB):
   - auditor/audit.ts: after assembleVerdict, fire-and-forget POST
     to :3800/event with source="auditor" AND append to
     data/_kb/outcomes.jsonl with kind="audit". Errors log + drop
     — the verdict is still on disk at _auditor/verdicts/.
   - mcp-server/observer.ts: extend source union to include
     "auditor" | "bot" (was "mcp" | "scenario" only, which silently
     coerced my first auditor POST to source="scenario"). Accept
     body.ok OR body.success. Accept body.audit_duration_ms as a
     fallback for duration_ms. Use body.one_liner as
     output_summary when set.

Live-verified after observer restart:
   re-audit PR #6 → verdict=request_changes, 4 findings (1 warn)
     observer: by_source={'auditor': 1}  (previously coerced to 'scenario')
     _kb/outcomes.jsonl tail: kind=audit sig=pr6-7fe47bab
       pr=6 overall=request_changes

The shape of the loop is now visible to downstream consumers. Phase
B (auditor's kb_query check reads these audit rows for history)
lands in a follow-up PR. Phases C-F follow the same pattern.

NOT in this PR:
- Actually deleting lab_ui.py + pipeline_lab.py (operator decision,
  called out in the inventory doc)
- Cleaning up the 5 overlapping Python scripts (same)
- Phases B-F of the cohesion plan (separate PRs per wire)
- Integration test that asserts "smarter DB" across runs (Phase F)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 17:22:42 -05:00

244 lines
9.4 KiB
TypeScript
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Orchestrator — runs all four checks on a PR, assembles a verdict,
// posts to Gitea. This is task #8's integration layer; the poller
// (task #9) calls this once per PR on every fresh head SHA.
//
// Hard-block mechanism: commit status posted with state="failure"
// and context="lakehouse/auditor". If `main` branch protection
// requires that context to pass, merge is physically impossible
// until the auditor re-audits a fixed commit and flips the status
// to "success".
//
// Human-readable reasoning: posted as a PR issue comment (not a
// review — reviews have self-review restrictions on Gitea and the
// auditor currently uses the same PAT as the PR author).
import { readFile, writeFile, mkdir } from "node:fs/promises";
import { join } from "node:path";
import type { PrSnapshot, Verdict, Finding } from "./types.ts";
import { getPrDiff, postCommitStatus, postIssueComment } from "./gitea.ts";
import { parseClaims } from "./claim_parser.ts";
import { assembleVerdict } from "./policy.ts";
import { runStaticCheck } from "./checks/static.ts";
import { runDynamicCheck } from "./checks/dynamic.ts";
import { runInferenceCheck } from "./checks/inference.ts";
import { runKbCheck } from "./checks/kb_query.ts";
// Directory into which persistVerdict writes one JSON file per audited
// PR/head-SHA pair. NOTE(review): absolute, machine-specific path — confirm
// this is intended for every host the auditor runs on.
const VERDICTS_DIR = "/home/profit/lakehouse/data/_auditor/verdicts";
/**
 * Optional switches accepted by auditPr. Every flag is optional; an omitted
 * flag is treated as false by auditPr's truthiness checks.
 */
export interface AuditOptions {
/** Skip the cloud inference call (fast path for iteration). Default false. */
skip_inference?: boolean;
/**
 * Skip the dynamic check (avoids running the hybrid fixture on every PR,
 * since it hits live services and mutates playbook state). When omitted the
 * dynamic check runs. The intended policy is: run it for PRs targeting
 * `main`, skip it for feature branches where the fixture would pollute
 * state — but no branch-dependent default is applied in this file, so the
 * caller has to decide and pass the flag.
 */
skip_dynamic?: boolean;
/**
 * Skip Gitea posting — useful for dry-runs / local testing.
 * Default false.
 */
dry_run?: boolean;
}
/**
 * Audit a single PR snapshot end to end.
 *
 * Fetches the PR diff, parses claims from the snapshot, runs the static,
 * dynamic, inference and kb_query checks concurrently, assembles a verdict
 * from the combined findings plus timing/count metrics, and writes the
 * verdict to disk. Observer/KB indexing is then started without being
 * awaited, and the verdict is posted to Gitea unless `opts.dry_run` is set.
 *
 * @param pr   Snapshot of the PR to audit.
 * @param opts Flags to skip individual checks or the Gitea post.
 * @returns The assembled verdict.
 */
export async function auditPr(pr: PrSnapshot, opts: AuditOptions = {}): Promise<Verdict> {
  const startedAt = Date.now();
  const diff = await getPrDiff(pr.number);
  const { claims } = parseClaims(pr);

  // All four checks are started together. Static, kb_query and inference
  // only read; dynamic touches live playbook state, but that mutation is
  // nonce-scoped per run, so it is allowed to run alongside the others.
  // A skipped check contributes a single informational stub finding.
  const [fromStatic, fromDynamic, fromInference, fromKb] = await Promise.all([
    runStaticCheck(diff),
    opts.skip_dynamic ? stubFinding("dynamic", "skipped by options") : runDynamicCheck(),
    opts.skip_inference ? stubFinding("inference", "skipped by options") : runInferenceCheck(claims, diff),
    runKbCheck(claims),
  ]);
  const allFindings: Finding[] = [...fromStatic, ...fromDynamic, ...fromInference, ...fromKb];

  // Duration covers diff fetch + claim parsing + checks, not persistence/posting.
  const duration_ms = Date.now() - startedAt;

  const severityCount = (level: string): number =>
    allFindings.filter(finding => finding.severity === level).length;
  const strengthCount = (level: string): number =>
    claims.filter(claim => claim.strength === level).length;

  // Key order is kept stable because the metrics object is serialised verbatim
  // into the persisted verdict and the PR comment.
  const metrics = {
    audit_duration_ms: duration_ms,
    findings_total: allFindings.length,
    findings_block: severityCount("block"),
    findings_warn: severityCount("warn"),
    findings_info: severityCount("info"),
    claims_strong: strengthCount("strong"),
    claims_moderate: strengthCount("moderate"),
    claims_weak: strengthCount("weak"),
    claims_total: claims.length,
    diff_bytes: diff.length,
  };

  const verdict = assembleVerdict(allFindings, metrics, pr.number, pr.head_sha);
  await persistVerdict(verdict);

  // Phase A of the cohesion plan (docs/COHESION_INTEGRATION_PLAN.md): surface
  // every verdict to the observer and the KB outcomes log. Both calls are
  // deliberately not awaited; a failure is logged and dropped so it can never
  // hold up or abort the Gitea post below.
  void indexVerdictToObserver(verdict).catch(e =>
    console.error(`[auditor] observer indexing failed: ${(e as Error).message}`));
  void appendVerdictToKbOutcomes(verdict).catch(e =>
    console.error(`[auditor] kb outcomes append failed: ${(e as Error).message}`));

  const shouldPost = !opts.dry_run;
  if (shouldPost) {
    await postToGitea(verdict);
  }
  return verdict;
}
// Phase A — verdict indexing.
//
// Two destinations, both append-only + non-blocking:
// 1. observer :3800/event — ring buffer + data/_observer/ops.jsonl
// 2. data/_kb/outcomes.jsonl — same file scenarios write to, with
// kind:"audit" so readers can filter
//
// Errors log + drop. The verdict is still on disk at
// _auditor/verdicts/{pr}-{sha}.json; observer + KB are a convenience
// surface, not a source of truth.
// Base URL of the observer service. Taken from the LH_OBSERVER_URL
// environment variable when set, otherwise http://localhost:3800.
const OBSERVER_URL = process.env.LH_OBSERVER_URL ?? "http://localhost:3800";
// JSONL file to which appendVerdictToKbOutcomes appends one row per verdict.
const KB_OUTCOMES = "/home/profit/lakehouse/data/_kb/outcomes.jsonl";
/**
 * POST a summary of the verdict to the observer's `/event` endpoint.
 *
 * The request is aborted after 3000 ms. Resolves on a 2xx response.
 *
 * @param v Verdict to summarise.
 * @throws Error carrying the HTTP status and response body when the observer
 *         answers with a non-2xx status; fetch/abort errors propagate as-is.
 */
async function indexVerdictToObserver(v: Verdict): Promise<void> {
  const shortSha = v.head_sha.slice(0, 8);
  const approved = v.overall === "approve";

  const event = {
    source: "auditor",
    event_kind: "audit",
    ok: approved,
    sig_hash: `pr${v.pr_number}-${shortSha}`,
    pr_number: v.pr_number,
    head_sha: v.head_sha,
    overall: v.overall,
    one_liner: v.one_liner,
    findings_block: v.metrics.findings_block,
    findings_warn: v.metrics.findings_warn,
    audit_duration_ms: v.metrics.audit_duration_ms,
    audited_at: v.audited_at,
  };

  const response = await fetch(`${OBSERVER_URL}/event`, {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify(event),
    signal: AbortSignal.timeout(3000),
  });
  if (response.ok) return;

  // Include the response body so the caller's log line shows why it was rejected.
  const detail = await response.text();
  throw new Error(`observer ${response.status}: ${detail}`);
}
/**
 * Append one `kind: "audit"` row describing the verdict to the KB outcomes
 * JSONL file, creating the parent directory first if it does not exist.
 *
 * @param v Verdict to record.
 */
async function appendVerdictToKbOutcomes(v: Verdict): Promise<void> {
  const fsp = await import("node:fs/promises");
  const path = await import("node:path");
  await fsp.mkdir(path.dirname(KB_OUTCOMES), { recursive: true });

  const approved = v.overall === "approve";
  const { metrics } = v;
  const row = {
    kind: "audit",
    sig_hash: `pr${v.pr_number}-${v.head_sha.slice(0, 8)}`,
    audited_at: v.audited_at,
    pr_number: v.pr_number,
    head_sha: v.head_sha,
    overall: v.overall,
    one_liner: v.one_liner,
    // One audit counts as one event; it is "ok" only when approved.
    ok_events: approved ? 1 : 0,
    total_events: 1,
    findings: {
      block: metrics.findings_block,
      warn: metrics.findings_warn,
      info: metrics.findings_info,
    },
    // Milliseconds to seconds; a missing duration is recorded as 0.
    elapsed_secs: (metrics.audit_duration_ms ?? 0) / 1000,
  };

  // One JSON document per line.
  await fsp.appendFile(KB_OUTCOMES, `${JSON.stringify(row)}\n`);
}
/**
 * Write the verdict as pretty-printed JSON to
 * `<VERDICTS_DIR>/<pr_number>-<first 12 chars of head_sha>.json`,
 * creating the directory if needed.
 *
 * @param v Verdict to persist.
 */
async function persistVerdict(v: Verdict): Promise<void> {
  await mkdir(VERDICTS_DIR, { recursive: true });
  const shortSha = v.head_sha.slice(0, 12);
  const target = join(VERDICTS_DIR, `${v.pr_number}-${shortSha}.json`);
  const serialized = JSON.stringify(v, null, 2);
  await writeFile(target, serialized);
}
/**
 * Publish a verdict to Gitea in two steps, in this order:
 *
 *  1. A commit status on the head SHA under the context "lakehouse/auditor":
 *     "success" when the verdict is "approve", "failure" for anything else.
 *     This is the hard-block signal when branch protection on main requires
 *     that context.
 *  2. An issue comment on the PR carrying the formatted reasoning.
 *
 * NOTE(review): the comment is posted on every call. The poller is expected
 * to re-audit only fresh SHAs; no explicit de-duplication check is visible in
 * this function — confirm whether repeat runs on the same SHA need guarding.
 *
 * @param v Verdict to publish.
 */
export async function postToGitea(v: Verdict): Promise<void> {
  const approved = v.overall === "approve";
  await postCommitStatus({
    sha: v.head_sha,
    state: approved ? "success" : "failure",
    context: "lakehouse/auditor",
    description: v.one_liner,
    target_url: "", // no URL yet; could point to a verdicts dashboard
  });

  await postIssueComment({
    pr_number: v.pr_number,
    body: formatReviewBody(v),
  });
}
/**
 * Wrap text in a Markdown inline code span that survives backticks inside
 * the text. Text without backticks is wrapped in single backticks; otherwise
 * the delimiter is one backtick longer than the longest run in the text and
 * the content is space-padded, as the CommonMark code-span rules require.
 */
function inlineCode(text: string): string {
  const runs: string[] = text.match(/`+/g) ?? [];
  if (runs.length === 0) return `\`${text}\``;
  const longest = Math.max(...runs.map(run => run.length));
  const fence = "`".repeat(longest + 1);
  return `${fence} ${text} ${fence}`;
}

/**
 * Render a verdict as the Markdown body of the PR comment.
 *
 * Layout: a heading with the overall verdict, the one-liner, short head SHA
 * and audit timestamp; one collapsible `<details>` section per check that
 * produced findings (known checks first, in fixed order, then any other
 * check names in first-seen order); the metrics as a JSON code block; and a
 * footer line.
 *
 * Per finding, at most three evidence strings are shown, each truncated to
 * 180 characters with line breaks replaced by spaces.
 *
 * @param v Verdict to render.
 * @returns Markdown text, lines joined with "\n".
 */
function formatReviewBody(v: Verdict): string {
  const byCheck: Record<string, Finding[]> = {};
  for (const f of v.findings) {
    (byCheck[f.check] ||= []).push(f);
  }
  const verdictEmoji =
    v.overall === "approve" ? "✅" :
    v.overall === "request_changes" ? "⚠️" :
    "🛑";
  const lines: string[] = [];
  lines.push(`## Auditor verdict: ${verdictEmoji} \`${v.overall}\``);
  lines.push("");
  lines.push(`**One-liner:** ${v.one_liner}`);
  lines.push(`**Head SHA:** \`${v.head_sha.slice(0, 12)}\``);
  lines.push(`**Audited at:** ${v.audited_at}`);
  lines.push("");

  // Known checks keep their fixed order. Any other check name present in the
  // findings is appended afterwards so it is not silently left out of the
  // comment while still being counted in the metrics.
  const knownChecks: readonly string[] = ["static", "dynamic", "inference", "kb_query"];
  const otherChecks = Object.keys(byCheck).filter(name => !knownChecks.includes(name));

  // Per-check sections, only if the check produced findings.
  for (const check of [...knownChecks, ...otherChecks]) {
    const fs = byCheck[check] ?? [];
    if (fs.length === 0) continue;
    const bySev = {
      block: fs.filter(f => f.severity === "block").length,
      warn: fs.filter(f => f.severity === "warn").length,
      info: fs.filter(f => f.severity === "info").length,
    };
    lines.push(`<details><summary><b>${check}</b> — ${fs.length} findings (${bySev.block} block, ${bySev.warn} warn, ${bySev.info} info)</summary>`);
    // Blank line ends the HTML block so the following lines parse as Markdown.
    lines.push("");
    for (const f of fs) {
      const mark = f.severity === "block" ? "🛑" : f.severity === "warn" ? "⚠️" : "";
      const heading = `**${f.severity}** — ${f.summary}`;
      // Info findings have no marker; avoid emitting a stray leading space.
      lines.push(mark ? `${mark} ${heading}` : heading);
      for (const e of f.evidence.slice(0, 3)) {
        // Collapse CR, LF and CRLF alike so the bullet stays on one line.
        const snippet = e.slice(0, 180).replace(/\r\n|\r|\n/g, " ");
        lines.push(` - ${inlineCode(snippet)}`);
      }
      // A blank line after each finding stops the next finding's heading from
      // being absorbed into the previous bullet as a lazy continuation line.
      lines.push("");
    }
    lines.push("</details>");
    lines.push("");
  }
  lines.push("### Metrics");
  lines.push("```json");
  lines.push(JSON.stringify(v.metrics, null, 2));
  lines.push("```");
  lines.push("");
  lines.push(`<sub>Lakehouse auditor · SHA ${v.head_sha.slice(0, 8)} · re-audit on new commit flips the status automatically.</sub>`);
  return lines.join("\n");
}
/**
 * Build the single informational finding that stands in for a check that was
 * not run.
 *
 * @param check Which check was skipped.
 * @param why   Reason for skipping; used in the summary and as the only evidence entry.
 * @returns A one-element findings array.
 */
function stubFinding(check: "dynamic" | "inference", why: string): Finding[] {
  const placeholder: Finding = {
    check,
    severity: "info",
    summary: `${check} check skipped — ${why}`,
    evidence: [why],
  };
  return [placeholder];
}