Session infrastructure: OpenRouter + tree-split reducer + observer→LLM Team + scrum_applier #11

Merged
profit merged 118 commits from scrum/auto-apply-19814 into main 2026-04-27 15:55:24 +00:00
3 changed files with 61 additions and 21 deletions
Showing only changes of commit 454da15301 - Show all commits

View File

@ -169,12 +169,16 @@ export async function runInferenceCheck(
interface Votes { trues: number; falses: number; evidences: string[] } interface Votes { trues: number; falses: number; evidences: string[] }
const votesByClaim = new Map<number, Votes>(); const votesByClaim = new Map<number, Votes>();
const unflaggedByRun: any[][] = []; const unflaggedByRun: any[][] = [];
let totalLatencyMs = 0; // The N=3 consensus calls run via Promise.all — wall-clock is
// bounded by the SLOWEST call, not the sum. Pre-2026-04-27 we
// summed and reported "Xms total" which double/triple-counted
// (Opus self-audit caught it). Use max for accurate wall-clock.
let maxLatencyMs = 0;
let totalEnrichedChars = 0; let totalEnrichedChars = 0;
let bugFingerprintsSeen = 0; let bugFingerprintsSeen = 0;
let matrixKeptSeen = 0; let matrixKeptSeen = 0;
for (const run of parsedRuns) { for (const run of parsedRuns) {
totalLatencyMs += run.latency_ms ?? 0; maxLatencyMs = Math.max(maxLatencyMs, run.latency_ms ?? 0);
totalEnrichedChars += run.enriched_chars ?? 0; totalEnrichedChars += run.enriched_chars ?? 0;
bugFingerprintsSeen = Math.max(bugFingerprintsSeen, run.bug_fingerprints ?? 0); bugFingerprintsSeen = Math.max(bugFingerprintsSeen, run.bug_fingerprints ?? 0);
matrixKeptSeen = Math.max(matrixKeptSeen, run.matrix_kept ?? 0); matrixKeptSeen = Math.max(matrixKeptSeen, run.matrix_kept ?? 0);
@ -199,7 +203,7 @@ export async function runInferenceCheck(
findings.push({ findings.push({
check: "inference", check: "inference",
severity: "info", severity: "info",
summary: `pr_audit mode runner completed (model=${MODEL}, consensus=${parsedRuns.length}/${N_CONSENSUS}, ${totalLatencyMs}ms total)${curationNote}`, summary: `pr_audit mode runner completed (model=${MODEL}, consensus=${parsedRuns.length}/${N_CONSENSUS}, ${maxLatencyMs}ms wall-clock)${curationNote}`,
evidence: [ evidence: [
`claims voted: ${votesByClaim.size}`, `claims voted: ${votesByClaim.size}`,
`parsed runs: ${parsedRuns.length} / ${N_CONSENSUS}`, `parsed runs: ${parsedRuns.length} / ${N_CONSENSUS}`,

View File

@ -28,7 +28,7 @@
import { readFile, writeFile, mkdir, appendFile, stat } from "node:fs/promises"; import { readFile, writeFile, mkdir, appendFile, stat } from "node:fs/promises";
import { existsSync } from "node:fs"; import { existsSync } from "node:fs";
import { join, resolve } from "node:path"; import { dirname, join, resolve } from "node:path";
import type { Finding, CheckKind } from "../types.ts"; import type { Finding, CheckKind } from "../types.ts";
const GATEWAY = process.env.LH_GATEWAY_URL ?? "http://localhost:3100"; const GATEWAY = process.env.LH_GATEWAY_URL ?? "http://localhost:3100";
@ -106,15 +106,29 @@ export async function runKimiArchitectCheck(
ctx: KimiArchitectContext, ctx: KimiArchitectContext,
): Promise<Finding[]> { ): Promise<Finding[]> {
const cachePath = join(KIMI_VERDICTS_DIR, `${ctx.pr_number}-${ctx.head_sha.slice(0, 12)}.json`); const cachePath = join(KIMI_VERDICTS_DIR, `${ctx.pr_number}-${ctx.head_sha.slice(0, 12)}.json`);
const outageSentinel = `${cachePath}.outage`;
const OUTAGE_TTL_MS = 10 * 60 * 1000;
// Outage negative-cache — if upstream failed within OUTAGE_TTL_MS,
// skip this audit and return immediately. Prevents the daemon from
// hammering a downed Kimi/Anthropic upstream every 90s.
if (existsSync(outageSentinel)) {
try {
const s = await stat(outageSentinel);
if (Date.now() - s.mtimeMs < OUTAGE_TTL_MS) {
const note = JSON.parse(await readFile(outageSentinel, "utf8"));
return [skipFinding(`upstream still down (cached ${Math.round((Date.now() - s.mtimeMs) / 1000)}s ago): ${String(note.reason).slice(0, 160)}`)];
}
} catch { /* malformed sentinel — fall through to fresh call */ }
}
// Cost cap — return cached findings if a verdict for this exact head // Cost cap — return cached findings if a verdict for this exact head
// SHA was generated within the TTL. // SHA was generated within the TTL.
const cached = await loadCachedVerdict(cachePath); const cached = await loadCachedVerdict(cachePath);
if (cached) { if (cached) {
const fs2: Finding[] = cached.findings.length > 0 return cached.findings.length > 0
? cached.findings ? cached.findings
: [{ check: "kimi_architect" as CheckKind, severity: "info", summary: "kimi_architect cached — 0 findings", evidence: [`cache: ${cachePath}`] }]; : [{ check: "kimi_architect" as CheckKind, severity: "info", summary: "kimi_architect cached — 0 findings", evidence: [`cache: ${cachePath}`] }];
return fs2;
} }
const selected = selectModel(diff.length); const selected = selectModel(diff.length);
@ -122,6 +136,13 @@ export async function runKimiArchitectCheck(
try { try {
response = await callKimi(buildPrompt(diff, priorFindings, ctx), selected.provider, selected.model); response = await callKimi(buildPrompt(diff, priorFindings, ctx), selected.provider, selected.model);
} catch (e) { } catch (e) {
// Negative-cache for 10 min on outage (caught 2026-04-27 by Opus
// self-audit): without this, every audit cycle within the 24h
// TTL re-calls upstream while it's still down. Use a sentinel
// file with mtime check rather than persisting a verdict so the
// happy-path cache reader doesn't have to special-case it.
const sentinel = `${cachePath}.outage`;
try { await writeFile(sentinel, JSON.stringify({ at: new Date().toISOString(), reason: (e as Error).message.slice(0, 200) })); } catch {}
return [skipFinding(`kimi call failed (${selected.model}): ${(e as Error).message.slice(0, 200)}`)]; return [skipFinding(`kimi call failed (${selected.model}): ${(e as Error).message.slice(0, 200)}`)];
} }
@ -145,17 +166,25 @@ export async function runKimiArchitectCheck(
grounding, grounding,
}; };
await persistVerdict(cachePath, verdict); // Cache-poisoning guard (caught 2026-04-27 by Opus self-audit):
// when parseFindings returns 0 findings (Kimi rambled, prompt too
// big, or the markdown shape changed and our regex missed every
// block), persisting the empty verdict short-circuits all future
// audits in the 24h TTL window with a useless cached "0 findings"
// result. Better to leave no cache and re-call upstream next time.
// Always append metrics — observability shouldn't depend on whether
// findings parsed.
await appendMetrics(verdict); await appendMetrics(verdict);
if (findings.length > 0) {
return findings.length > 0 await persistVerdict(cachePath, verdict);
? findings return findings;
: [{ }
check: "kimi_architect" as CheckKind, return [{
severity: "info", check: "kimi_architect" as CheckKind,
summary: `kimi_architect produced 0 ranked findings (${response.finish_reason}, ${verdict.usage.completion_tokens} tokens)`, severity: "info",
evidence: [`raw response: ${cachePath}`], summary: `kimi_architect produced 0 ranked findings (${response.finish_reason}, ${verdict.usage.completion_tokens} tokens) — not cached`,
}]; evidence: [`raw saved (no cache): see kimi_audits.jsonl ${verdict.cached_at}`],
}];
} }
async function loadCachedVerdict(path: string): Promise<KimiVerdictFile | null> { async function loadCachedVerdict(path: string): Promise<KimiVerdictFile | null> {
@ -325,7 +354,11 @@ async function persistVerdict(path: string, v: KimiVerdictFile): Promise<void> {
} }
async function appendMetrics(v: KimiVerdictFile): Promise<void> { async function appendMetrics(v: KimiVerdictFile): Promise<void> {
await mkdir(join(KIMI_AUDITS_JSONL, ".."), { recursive: true }); // dirname() instead of join(path, "..") — caught 2026-04-27 by both
// Haiku and Opus self-audits. The "/.." idiom resolves correctly
// via Node path normalization but is non-idiomatic + breaks if the
// path ever has trailing dots.
await mkdir(dirname(KIMI_AUDITS_JSONL), { recursive: true });
await appendFile(KIMI_AUDITS_JSONL, JSON.stringify({ await appendFile(KIMI_AUDITS_JSONL, JSON.stringify({
pr_number: v.pr_number, pr_number: v.pr_number,
head_sha: v.head_sha, head_sha: v.head_sha,

View File

@ -195,8 +195,11 @@ pub async fn generate_continuable<G: TextGenerator>(
let req = make_request(opts, prompt.to_string(), current_max); let req = make_request(opts, prompt.to_string(), current_max);
let resp = generator.generate_text(req).await?; let resp = generator.generate_text(req).await?;
calls += 1; calls += 1;
prompt_tokens = prompt_tokens.saturating_add(resp.tokens_evaluated.unwrap_or(0) as u32); // u32::try_from saturates at u32::MAX instead of silently
completion_tokens = completion_tokens.saturating_add(resp.tokens_generated.unwrap_or(0) as u32); // truncating bits when tokens_evaluated/_generated comes back
// as a u64 > 4 billion. Caught 2026-04-27 by Opus self-audit.
prompt_tokens = prompt_tokens.saturating_add(u32::try_from(resp.tokens_evaluated.unwrap_or(0)).unwrap_or(u32::MAX));
completion_tokens = completion_tokens.saturating_add(u32::try_from(resp.tokens_generated.unwrap_or(0)).unwrap_or(u32::MAX));
if !resp.text.trim().is_empty() { if !resp.text.trim().is_empty() {
combined = resp.text; combined = resp.text;
break; break;
@ -227,8 +230,8 @@ pub async fn generate_continuable<G: TextGenerator>(
let req = make_request(opts, cont_prompt, current_max.min(opts.budget_cap)); let req = make_request(opts, cont_prompt, current_max.min(opts.budget_cap));
let resp = generator.generate_text(req).await?; let resp = generator.generate_text(req).await?;
calls += 1; calls += 1;
prompt_tokens = prompt_tokens.saturating_add(resp.tokens_evaluated.unwrap_or(0) as u32); prompt_tokens = prompt_tokens.saturating_add(u32::try_from(resp.tokens_evaluated.unwrap_or(0)).unwrap_or(u32::MAX));
completion_tokens = completion_tokens.saturating_add(resp.tokens_generated.unwrap_or(0) as u32); completion_tokens = completion_tokens.saturating_add(u32::try_from(resp.tokens_generated.unwrap_or(0)).unwrap_or(u32::MAX));
combined.push_str(&resp.text); combined.push_str(&resp.text);
continuations += 1; continuations += 1;
} }