lakehouse/tests/real-world/scrum_master_pipeline.ts
root 8b77d67c9c
Some checks failed
lakehouse/auditor 1 blocking issue: cloud: claim not backed — "journal event verified live (total_events_created 0→1 after probe)."
OpenRouter rescue ladder + tree-split reduce fix + observer→LLM Team + scrum_applier + first auto-applied patch
## Infrastructure (scrum loop hardening)

crates/gateway/src/v1/openrouter.rs — new OpenRouter provider
  Direct HTTPS to openrouter.ai/api/v1/chat/completions with OpenAI-compatible shape.
  Key resolution: OPENROUTER_API_KEY env → /home/profit/.env → /root/llm_team_config.json
  (shares LLM Team UI's quota). Added after iter 5 hit repeated Ollama Cloud 502s on
  kimi-k2:1t — different provider backbone as rescue rung. Unit tests pin the URL
  stripping and OpenAI wire shape.

crates/gateway/src/v1/mod.rs + main.rs
  Added `"openrouter" | "openrouter_free"` arm to /v1/chat dispatch.
  V1State.openrouter_key loaded at startup via openrouter::resolve_openrouter_key()
  mirroring the Ollama Cloud pattern. Startup log:
    "v1: OpenRouter key loaded — /v1/chat provider=openrouter enabled"

tests/real-world/scrum_master_pipeline.ts
  * 9-rung ladder — kimi-k2:1t → qwen3-coder:480b → deepseek-v3.1:671b →
    mistral-large-3:675b → gpt-oss:120b → qwen3.5:397b → openrouter/gpt-oss-120b:free
    → openrouter/gemma-3-27b-it:free → local qwen3.5:latest.
    Added qwen3-coder:480b as rung 2 after live probes confirmed it rescues
    kimi-k2:1t 502s cleanly (0.9s latency, substantive reviews).
    Dropped devstral-2 (displaced by qwen3-coder); dropped kimi-k2.6 (not available);
    dropped minimax-m2.7 (returned 0 chars / 400 thinking tokens).
    Local fallback promoted qwen3.5:latest per J's direction 2026-04-24.
  * MAX_ATTEMPTS bumped 6 → 9 to accommodate the rescue tier.
  * Tree-split scratchpad fixed — was concatenating shard markers directly
    into the reviewer input, causing kimi-k2:1t to write titles like
    "Forensic Audit Report – file.rs (shard 3)". Now uses internal §N§
    markers during accumulation and runs a proper reduce step that
    collapses per-shard digests into ONE coherent file-level synthesis
    with markers stripped. Matches the Phase 21 aibridge::tree_split
    map→reduce design. Fallback to stripped scratchpad if reducer returns thin.

tests/real-world/scrum_applier.ts — NEW (737 lines)
  The auto-apply pipeline. Reads scrum_reviews.jsonl, filters rows where
  gradient_tier ∈ {auto, dry_run} AND confidence_avg ≥ MIN_CONF (default 90),
  asks the reviewer model for concrete old_string/new_string patch JSON,
  applies via text replacement, runs cargo check after each file, commits
  if green and reverts if red. Deny-list: /etc/, config/, ops/, auditor/,
  docs/, data/, mcp-server/, ui/, sidecar/, scripts/. Hard caps: per-patch
  confidence ≥ MIN_CONF, old_string must be exactly unique, max 20 lines per
  patch. Never runs on main without explicit LH_APPLIER_BRANCH override.
  Audit trail in data/_kb/auto_apply.jsonl.

  Empirical behavior (dry-run over iter 4 reviews):
    5 eligible files → 1 green commit-ready, 2 build-red reverts, 2 all-rejected
  The build-green gate caught 2 bad patches before they'd have merged.

mcp-server/observer.ts — LLM Team code_review escalation
  When a sig_hash accumulates ≥3 failures (ESCALATION_THRESHOLD), fire-and-forget
  POST /api/run?mode=code_review at localhost:5000 with the failure cluster context.
  Parses facts/entities/relationships/file_hints from the response. Writes to a
  new data/_kb/observer_escalations.jsonl surface. Answers J's vision of the
  observer triggering richer LLM Team calls when failures pile up.
  Non-blocking: runs parallel to existing qwen2.5 analyzer, never replaces it.
  Tracks escalated sig_hashes in a session-local Set to avoid re-hammering
  LLM Team when a cluster persists across observer cycles.

crates/aibridge/src/context.rs
  First auto-applied patch produced by scrum_applier.ts (dry-run path —
  applier writes files in dry-run mode but doesn't commit; bug noted for
  iter 6 fix). Adds #[deprecated] annotation to the inline estimate_tokens
  helper pointing callers to the centralized shared::model_matrix::ModelMatrix
  entry point (P21-002 — duplicate token-estimator surfaces). Cargo check
  passes with the annotation (verified by applier's own build gate).

## Visual Control Plane (UI)

ui/server.ts — Bun.serve on :3950 with /data/* fan-out:
  /data/services, /data/reviews, /data/metrics, /data/trust, /data/overrides,
  /data/findings, /data/outcomes, /data/audit_facts, /data/file/:path,
  /data/refactor_signals, /data/search?q=, /data/signal_classes,
  /data/logs/:svc (journalctl tail per systemd unit), /data/scrum_log.
  Bug fix: tryFetch always attempts JSON.parse before falling back to text
  — observer's Bun.serve returns JSON without application/json content-type,
  which was displaying stats as a raw string ("0 ops" on map) before.

ui/index.html + ui.css — dark neo-brutalist shell. 6 views:
  MAP (D3 force-graph + overlays) / TRACE (per-file iter history) /
  TRAJECTORY (signal-class cards + refactor-signals table + reverse-index
  search box) / METRICS (every card has SOURCE + GOOD lines explaining
  where the number comes from and what target trajectory means) /
  KB (card grid with tooltips on every field) / CONSOLE (per-service
  journalctl tabs).

ui/ui.js — polling client, D3 wiring, signal-class panel, refactor-signals
  table, reverse-index search, per-service console tabs. Bug fix:
  renderNodeContext had Object.entries() iterating string characters when
  /health returned a plain string — now guards with typeof check so
  "lakehouse ok" renders as one row instead of "0 l / 1 a / 2 k / ...".

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 03:45:35 -05:00

783 lines
36 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Scrum-master orchestrator — pulls git repo source + PRD + a change
// proposal, chunks everything, hands each code piece to the proven
// escalation ladder (small-local → big-local → cloud → specialist →
// biggest) with learning context between attempts. Collects per-file
// suggestions in a coherent handoff report.
//
// What it composes (everything below is already shipped + proven):
// - Chunker + embeddings (sidecar /embed, nomic-embed-text)
// - In-memory cosine retrieval (top-K PRD + plan chunks per file)
// - Escalation ladder (6 tiers, cycling on empty/error/thin-answer)
// - Per-attempt learning-context injection (prior failures → prompt)
// - Tree-split fallback when combined context exceeds budget
// - JSONL output per file + summary
//
// Deliberate scope limit: TARGET_FILES is 3 files by default. The
// pipeline works at larger N, but at ~90s/file × 3 files = 4-5 min,
// 15 files = 22 min. Bump via env LH_SCRUM_FILES="path1,path2,...".
//
// Run: bun run tests/real-world/scrum_master_pipeline.ts
import { readFile, writeFile, mkdir } from "node:fs/promises";
import { createHash } from "node:crypto";
const GATEWAY = "http://localhost:3100";
const SIDECAR = "http://localhost:3200";
const CHUNK_SIZE = 800;
const CHUNK_OVERLAP = 120;
const TOP_K_CONTEXT = 5;
const MAX_ATTEMPTS = 9;
// Files larger than this get tree-split instead of truncated. Fixes the
// 6KB false-positive class (model claiming a field is "missing" when
// it exists past the context cutoff).
const FILE_TREE_SPLIT_THRESHOLD = 6000;
const FILE_SHARD_SIZE = 3500;
// Appended jsonl so auditor's kb_query can surface scrum findings for
// files touched by a PR under review. Part of cohesion plan Phase C.
const SCRUM_REVIEWS_JSONL = "/home/profit/lakehouse/data/_kb/scrum_reviews.jsonl";
const OUT_DIR = `/home/profit/lakehouse/tests/real-world/runs/scrum_${Date.now().toString(36)}`;
const PRD_PATH = "/home/profit/lakehouse/docs/PRD.md";
// Using CONTROL_PLANE_PRD as the "suggested changes" doc since it
// describes the Phase 38-44 target architecture and is on main.
// Override via LH_SCRUM_PROPOSAL env to point at a fix-wave doc
// generated from a phase-sweep audit, so the scrum pulls direction
// from concrete findings instead of the high-level PRD alone.
const PROPOSAL_PATH = process.env.LH_SCRUM_PROPOSAL
|| "/home/profit/lakehouse/docs/CONTROL_PLANE_PRD.md";
// Iter 2+ — when LH_SCRUM_FORENSIC is set to a file path, prepend its
// contents as an adversarial auditor preamble to every per-file prompt.
// This flips the review tone from "suggest improvements" to "prove it
// works or mark FAIL." Added 2026-04-23 for iter 2 of the 6x loop.
// Empty string = no preamble (iter-1 behavior).
const FORENSIC_PREAMBLE = process.env.LH_SCRUM_FORENSIC
? (() => {
try {
return require("node:fs").readFileSync(process.env.LH_SCRUM_FORENSIC!, "utf8");
} catch (e) {
console.error(`[scrum] warning: could not read LH_SCRUM_FORENSIC=${process.env.LH_SCRUM_FORENSIC}: ${e}`);
return "";
}
})()
: "";
// Scoped target: 3 representative source files by default.
// The scrum-master walks these in order and produces one suggestion
// set per file. Override via env for a wider sweep.
const DEFAULT_TARGETS = [
"/home/profit/lakehouse/crates/vectord/src/playbook_memory.rs",
"/home/profit/lakehouse/crates/vectord/src/doc_drift.rs",
"/home/profit/lakehouse/auditor/audit.ts",
];
const TARGET_FILES: string[] = process.env.LH_SCRUM_FILES
? process.env.LH_SCRUM_FILES.split(",").map(s => s.trim())
: DEFAULT_TARGETS;
// Cloud-first ladder, STRONGEST-MODEL-FIRST (iter 3+, 2026-04-24).
// J's direction: "switch to the strongest cloud model" for iter 3 —
// the forensic prompt is demanding enough that even 120B gets rejected
// as thin. Rank by parameter count / reasoning strength:
// 1. kimi-k2:1t — 1T params, Moonshot flagship (biggest)
// 2. kimi-k2.6 — Moonshot next-gen, pro tier
// 3. deepseek-v3.1:671b — 671B, strong reasoning + coding
// 4. mistral-large-3:675b — 675B, deep analysis
// 5. qwen3.5:397b — 397B (iter 2's rescue model)
// 6. gpt-oss:120b — 120B (iter 1's primary; still strong fallback)
// Local fallbacks kept for cloud-down scenarios.
// Hot-path pipelines (scenario.ts / execution_loop) stay local per
// Phase 20 t1_hot — this scrum is not hot path.
const LADDER: Array<{ provider: "ollama" | "ollama_cloud" | "openrouter"; model: string; note: string }> = [
{ provider: "ollama_cloud", model: "kimi-k2:1t", note: "cloud 1T — biggest available, 1.4s probe" },
{ provider: "ollama_cloud", model: "qwen3-coder:480b", note: "cloud 480B — coding specialist, 0.9s probe" },
{ provider: "ollama_cloud", model: "deepseek-v3.1:671b", note: "cloud 671B — fast reasoning (1.0s probe)" },
{ provider: "ollama_cloud", model: "mistral-large-3:675b", note: "cloud 675B — deep analysis (0.9s probe)" },
{ provider: "ollama_cloud", model: "gpt-oss:120b", note: "cloud 120B — reliable workhorse (iter1 baseline)" },
{ provider: "ollama_cloud", model: "qwen3.5:397b", note: "cloud 397B dense — deep final thinker (J 2026-04-24)" },
// Free-tier rescue — different provider backbone, different quota.
// Added 2026-04-24 after iter 5 hit repeated Ollama Cloud 502s on
// kimi-k2:1t. These have lower parameter counts than the Ollama
// Cloud rungs but high availability: if upstream is down, we still
// land a review instead of giving up.
{ provider: "openrouter", model: "openai/gpt-oss-120b:free", note: "OpenRouter free 120B — substantive rescue, 2.8s probe" },
{ provider: "openrouter", model: "google/gemma-3-27b-it:free", note: "OpenRouter free 27B — fastest rescue, 1.4s probe" },
{ provider: "ollama", model: "qwen3.5:latest", note: "local qwen3.5 — best local model per J (2026-04-24), last-resort if all cloud down" },
// Dropped from the ladder after 2026-04-24 probe:
// - kimi-k2.6 — not available on current tier (empty response)
// - devstral-2:123b — displaced by qwen3-coder:480b (better coding specialist)
// - minimax-m2.7 — 400 thinking tokens, 0 content output
// - openrouter qwen3-coder:free / llama-3.3 / hermes-3 — provider errors
// - openrouter minimax-m2.5:free — 45s timeout
];
type Chunk = { id: string; text: string; embedding: number[]; origin: string; offset: number };
interface FileReview {
file: string;
file_bytes: number;
tree_split_fired: boolean;
shards_summarized: number;
top_prd_chunks: Array<{ origin: string; offset: number; score: number }>;
top_proposal_chunks: Array<{ origin: string; offset: number; score: number }>;
attempts_made: number;
attempts_history: Array<{ n: number; model: string; status: "accepted" | "thin" | "error"; chars: number; error?: string }>;
accepted_on: number | null;
escalated_to_model: string;
suggestions: string;
duration_ms: number;
}
function log(msg: string) { console.log(`[scrum] ${msg}`); }
function cosine(a: number[], b: number[]): number {
let dot = 0, na = 0, nb = 0;
for (let i = 0; i < a.length; i++) { dot += a[i] * b[i]; na += a[i] * a[i]; nb += b[i] * b[i]; }
return na && nb ? dot / (Math.sqrt(na) * Math.sqrt(nb)) : 0;
}
function chunkText(text: string): Array<{ text: string; offset: number }> {
const out: Array<{ text: string; offset: number }> = [];
for (let i = 0; i < text.length; ) {
const end = Math.min(i + CHUNK_SIZE, text.length);
const slice = text.slice(i, end).trim();
if (slice.length > 60) out.push({ text: slice, offset: i });
if (end >= text.length) break;
i = end - CHUNK_OVERLAP;
}
return out;
}
async function embedBatch(texts: string[]): Promise<number[][]> {
const r = await fetch(`${SIDECAR}/embed`, {
method: "POST", headers: { "content-type": "application/json" },
body: JSON.stringify({ texts }),
signal: AbortSignal.timeout(120000),
});
if (!r.ok) throw new Error(`embed ${r.status}`);
return (await r.json() as any).embeddings;
}
async function chat(opts: {
provider: "ollama" | "ollama_cloud",
model: string,
prompt: string,
max_tokens?: number,
}): Promise<{ content: string; error?: string; prompt_tokens: number; completion_tokens: number }> {
try {
const r = await fetch(`${GATEWAY}/v1/chat`, {
method: "POST", headers: { "content-type": "application/json" },
body: JSON.stringify({
provider: opts.provider,
model: opts.model,
messages: [{ role: "user", content: opts.prompt }],
max_tokens: opts.max_tokens ?? 1500,
temperature: 0.2,
think: false,
}),
signal: AbortSignal.timeout(180000),
});
if (!r.ok) return { content: "", error: `/v1/chat ${r.status}: ${(await r.text()).slice(0, 200)}`, prompt_tokens: 0, completion_tokens: 0 };
const j: any = await r.json();
return {
content: j.choices?.[0]?.message?.content ?? "",
prompt_tokens: j.usage?.prompt_tokens ?? 0,
completion_tokens: j.usage?.completion_tokens ?? 0,
};
} catch (e) {
return { content: "", error: (e as Error).message, prompt_tokens: 0, completion_tokens: 0 };
}
}
// Accept a file-review answer if it's substantive + structured.
// We're not validating Rust here — we're validating that the model
// produced a coherent suggestion set.
function isAcceptable(answer: string): boolean {
if (answer.length < 200) return false; // too thin
// Must at least try a structured form — numbered list, bullets,
// or sections. Models that just hand-wave fail.
const hasStructure = /^\s*[-*]\s/m.test(answer)
|| /^\s*\d+\.\s/m.test(answer)
|| /^\s*#/m.test(answer);
return hasStructure;
}
function retrieveTopK(query_emb: number[], pool: Chunk[], k: number): Chunk[] {
return pool
.map(c => ({ c, score: cosine(query_emb, c.embedding) }))
.sort((a, b) => b.score - a.score)
.slice(0, k)
.map(x => ({ ...x.c, _score: x.score } as any));
}
// Tree-split a large file: shard it, summarize each shard into a
// running scratchpad, THEN run a reduce step that collapses the
// scratchpad into one file-level synthesis with shard boundaries
// stripped. Returns the synthesis (not the raw scratchpad) so the
// final reviewer never sees "--- shard N ---" markers and can't
// leak them into its review title.
//
// Phase 21 design (aibridge/src/tree_split.rs) with the map → reduce
// shape. Earlier version concatenated per-shard digests directly into
// the reviewer prompt, which led to kimi-k2:1t writing review titles
// like "Forensic Audit Report file.rs (shard 3)" because the shard
// markers bled through. Fix 2026-04-24 adds the reduce step.
async function treeSplitFile(
filePath: string,
content: string,
): Promise<{ scratchpad: string; shards: number; cloud_calls: number }> {
const shards: Array<{ from: number; to: number; text: string }> = [];
for (let i = 0; i < content.length; i += FILE_SHARD_SIZE) {
const end = Math.min(i + FILE_SHARD_SIZE, content.length);
shards.push({ from: i, to: end, text: content.slice(i, end) });
}
// MAP — each shard produces a digest that feeds the next shard's
// context. Internal markers are kept to help the reducer align
// overlapping observations across shards; they're stripped before
// the reviewer sees anything.
let workingScratchpad = "";
let cloud_calls = 0;
log(` tree-split: ${content.length} chars → ${shards.length} shards of ${FILE_SHARD_SIZE}`);
for (const [si, shard] of shards.entries()) {
const prompt = `You are writing a SECTION of a full-file summary. File: ${filePath}. This is one piece (bytes ${shard.from}..${shard.to}) of a larger source file you are NOT seeing in its entirety right now.
─────── source ───────
${shard.text}
─────── end source ───────
Prior-piece notes so far (if empty, this is the first piece):
${workingScratchpad || "(empty)"}
Extract facts about the code in this piece that will help review the FULL file later: function + struct names with brief purpose, struct fields + types, invariants, TODOs, error-handling style, obvious gaps. Under 150 words. Flat facts only, no headings, no phrases like "this shard" or "in my section".`;
const r = await chat({
provider: "ollama_cloud",
model: "gpt-oss:120b",
prompt,
max_tokens: 400,
});
cloud_calls += 1;
if (r.content) {
// Keep internal alignment markers for the reducer; stripped later.
workingScratchpad += `\${si + 1}§\n${r.content.trim()}`;
}
}
// REDUCE — collapse the per-shard digests into one coherent
// file-level summary. The reducer sees all digests at once and
// produces a single narrative the reviewer can treat as "the file".
// Shard markers are NOT in the output. This is what fixes the
// shard-leakage bug that affected both the scrum and the auditor.
const reducePrompt = `You are producing a SINGLE coherent summary of a Rust/TypeScript source file from a set of prior-piece notes. The notes were taken while walking the file in order but should be merged into one description of the whole file.
FILE: ${filePath} (${content.length} bytes, ${shards.length} pieces)
PRIOR-PIECE NOTES (markers §N§ delimit pieces but are artifacts — do not mention them):
${workingScratchpad}
Produce ONE coherent file-level summary:
1. One-sentence purpose of the file.
2. Key public types / functions / constants (names + one-line purpose each).
3. Known gaps, TODOs, or error-handling inconsistencies the notes surfaced.
4. Obvious invariants the file relies on.
Do NOT say "piece 1" or "shard N" or "section" — present the summary as if you read the whole file at once. Under 600 words.`;
const reduced = await chat({
provider: "ollama_cloud",
model: "gpt-oss:120b",
prompt: reducePrompt,
max_tokens: 900,
});
cloud_calls += 1;
const synthesis = reduced.content?.trim() ?? "";
// Safety: if the reducer returned thin output, fall back to the
// raw scratchpad stripped of markers — better than nothing.
const final = synthesis.length > 200
? synthesis
: workingScratchpad.replace(/§\d+§\n/g, "").trim();
return { scratchpad: final, shards: shards.length, cloud_calls };
}
async function reviewFile(
filePath: string,
prd_chunks: Chunk[],
proposal_chunks: Chunk[],
): Promise<FileReview> {
const t0 = Date.now();
log(`file: ${filePath}`);
const content = await readFile(filePath, "utf8");
const rel = filePath.replace("/home/profit/lakehouse/", "");
// Build a query embedding from the first ~800 chars of the file
// (good enough for topical retrieval).
const seed = content.slice(0, 800);
const [seedEmb] = await embedBatch([seed]);
const topPrd = retrieveTopK(seedEmb, prd_chunks, TOP_K_CONTEXT);
const topPlan = retrieveTopK(seedEmb, proposal_chunks, TOP_K_CONTEXT);
log(` retrieved ${topPrd.length} PRD chunks + ${topPlan.length} proposal chunks`);
const contextBlock = [
"═══ RELEVANT PRD EXCERPTS ═══",
...topPrd.map(c => `[PRD @${c.offset}]\n${c.text.slice(0, 600)}`),
"",
"═══ RELEVANT CHANGE PROPOSAL EXCERPTS ═══",
...topPlan.map(c => `[PLAN @${c.offset}]\n${c.text.slice(0, 600)}`),
].join("\n\n");
// Files bigger than FILE_TREE_SPLIT_THRESHOLD get tree-split.
// Summarize each shard to a scratchpad, then review against the
// scratchpad instead of the truncated first chunk. Prevents the
// false-positive pattern where the model claims a field is
// "missing" because it's past the context cutoff.
let sourceForPrompt: string;
let treeSplitFired = false;
let shardsSummarized = 0;
let extraCloudCalls = 0;
if (content.length > FILE_TREE_SPLIT_THRESHOLD) {
treeSplitFired = true;
const ts = await treeSplitFile(rel, content);
shardsSummarized = ts.shards;
extraCloudCalls = ts.cloud_calls;
sourceForPrompt = `[FULL-FILE SCRATCHPAD — distilled from ${ts.shards} shards via tree-split]\n${ts.scratchpad}`;
} else {
sourceForPrompt = content;
}
// Prompt — when tree-split fired, include an explicit instruction
// not to claim a field/function is "missing" because the scratchpad
// is a distillation not the full file. Attacks the rubric-tuning
// concern J called out.
const truncationWarning = treeSplitFired
? `\nIMPORTANT: the "source" below is a multi-shard distillation (tree-split across ${shardsSummarized} shards), NOT the full raw file. DO NOT claim any field, function, or feature is "missing" based on its absence from this distillation — the distillation may have elided it. Only call out gaps that appear DIRECTLY contradicted by the PRD excerpts.\n`
: "";
const forensicPrefix = FORENSIC_PREAMBLE
? `${FORENSIC_PREAMBLE}\n\n═══ FILE UNDER AUDIT ═══\n\n`
: "";
const baseTask = `${forensicPrefix}You are reviewing one source file against the Lakehouse PRD and an active cohesion-integration plan.
FILE: ${rel} (${content.length} bytes${treeSplitFired ? `, tree-split into ${shardsSummarized} shards` : ""})
${truncationWarning}
─────── source ───────
${sourceForPrompt}
─────── end source ───────
${contextBlock}
Produce a structured review with:
1. Alignment score (1-10) between this file and the PRD intent
2. 3-5 concrete suggested changes (bullet points), each naming a specific function/line and what to change
3. Any gap where this file's behavior contradicts the PRD or the proposal
${FORENSIC_PREAMBLE ? "4. Apply the forensic audit passes from the preamble: pseudocode detection, PRD contract status, normalization/validation pipeline, failure→repair loop, execution memory, relevance orchestration, execution safety, testing evidence. Issue a verdict pass|needs_patch|fail." : ""}
**Per-finding confidence (required on every suggestion):**
Attach a self-assessed **Confidence: NN%** to every suggested change AND every gap you list. The percentage is your belief that the suggestion is correct, will compile, and lands the PRD intent. Calibration guide:
- 90-100%: pattern seen repeatedly in shipped code; change is mechanical; low risk of regressions
- 70-89%: confident in direction, some room for interpretation on API shape or naming
- 50-69%: plausible fix but may not match existing conventions or may cascade to other files
- <50%: genuinely uncertain — include regardless so downstream knows to investigate before applying
Format each finding as: \`**1.** <change>. **Confidence: NN%.**\` (in tables, add a final "Confidence" column.) Low confidence is valuable signal — do not round up.
Respond with markdown. Be specific, not generic. Cite file-region + PRD-chunk-offset when relevant.`;
const history: FileReview["attempts_history"] = [];
let accepted: string | null = null;
let acceptedModel = "";
let acceptedOn = 0;
for (let i = 0; i < MAX_ATTEMPTS; i++) {
const n = i + 1;
const rung = LADDER[i];
const learning = history.length > 0
? `\n\n═══ PRIOR ATTEMPTS FAILED. Specific issues to fix: ═══\n${history.map(h => `Attempt ${h.n} (${h.model}, ${h.chars} chars): ${h.status}${h.error ?? "thin/unstructured answer"}`).join("\n")}\n═══`
: "";
log(` attempt ${n}/${MAX_ATTEMPTS}: ${rung.provider}::${rung.model}${learning ? " [w/ learning]" : ""}`);
const r = await chat({
provider: rung.provider,
model: rung.model,
prompt: baseTask + learning,
max_tokens: 1500,
});
if (r.error) {
history.push({ n, model: rung.model, status: "error", chars: 0, error: r.error.slice(0, 180) });
log(` ✗ error: ${r.error.slice(0, 80)}`);
continue;
}
if (!isAcceptable(r.content)) {
history.push({ n, model: rung.model, status: "thin", chars: r.content.length, error: `thin/unstructured (${r.content.length} chars)` });
log(` ✗ thin/unstructured (${r.content.length} chars)`);
continue;
}
history.push({ n, model: rung.model, status: "accepted", chars: r.content.length });
accepted = r.content;
acceptedModel = `${rung.provider}/${rung.model}`;
acceptedOn = n;
log(` ✓ ACCEPTED on attempt ${n} (${rung.model}, ${r.content.length} chars)`);
break;
}
const review: FileReview = {
file: rel,
file_bytes: content.length,
tree_split_fired: treeSplitFired,
shards_summarized: shardsSummarized,
top_prd_chunks: topPrd.map(c => ({ origin: c.origin, offset: c.offset, score: (c as any)._score })),
top_proposal_chunks: topPlan.map(c => ({ origin: c.origin, offset: c.offset, score: (c as any)._score })),
attempts_made: history.length,
attempts_history: history,
accepted_on: acceptedOn || null,
escalated_to_model: acceptedModel,
suggestions: accepted ?? "[no acceptable answer after escalation ladder exhausted]",
duration_ms: Date.now() - t0,
};
// Append to the shared scrum-reviews jsonl so the auditor's
// kb_query check can surface relevant reviews for files in a
// PR diff. Cohesion plan Phase C wire.
if (accepted) {
const { appendFile, mkdir } = await import("node:fs/promises");
const { dirname } = await import("node:path");
await mkdir(dirname(SCRUM_REVIEWS_JSONL), { recursive: true });
// Extract per-finding confidences from the accepted markdown.
// Patterns tried: "Confidence: NN%", "Confidence**: NN%",
// and table-cell "| 92% |". Cap at 20 matches to bound row size.
// Added 2026-04-23 (iter 2 direction from J: "make scrum output
// include self-assessed confidence per finding").
const confidences: number[] = [];
// Markdown format: "Confidence: 92%" / "Confidence**: 92%" / "| 92% |"
const patMarkdown = /(?:Confidence[*:\s]*\s*|\|\s*)(\d{1,3})\s*%/gi;
// JSON format (forensic strict output): "confidence": 92
const patJson = /"confidence"\s*:\s*(\d{1,3})(?!\d)/gi;
for (const pat of [patMarkdown, patJson]) {
const matches = accepted.matchAll(pat);
for (const hit of matches) {
if (confidences.length >= 40) break;
const pct = parseInt(hit[1], 10);
if (pct >= 0 && pct <= 100) confidences.push(pct);
}
}
const conf_avg = confidences.length
? Math.round(confidences.reduce((a, b) => a + b, 0) / confidences.length)
: null;
const conf_min = confidences.length ? Math.min(...confidences) : null;
// Score extraction — regex accepts decimals ("Score: 4.5/10") and
// surrounding punctuation ("4/10 — mid"). iter 3 had 4 unparseable
// scores because the prior regex /(\d)\s*\/\s*10/ missed decimals.
const scoreMatch = accepted.match(/(?:score[\s*:]*)?(\d(?:\.\d)?)\s*\/\s*10\b/i);
const alignment_score = scoreMatch ? parseFloat(scoreMatch[1]) : null;
// Forensic JSON extraction — iter 3 showed 20/21 files came back
// as JSON (verdict + critical_failures[] + verified_components[] + ...)
// rather than markdown tables. Previously we only stored suggestions_preview
// (truncated to 2KB); now we also capture the structured counters so
// consumers can filter by verdict, sort by critical_failures_count, etc.
let verdict: string | null = null;
let critical_failures_count = 0;
let pseudocode_flags_count = 0;
let prd_mismatches_count = 0;
let missing_components_count = 0;
let verified_components_count = 0;
let risk_points_count = 0;
const isJsonShape = accepted.includes('"verdict"');
if (isJsonShape) {
const vm = accepted.match(/"verdict"\s*:\s*"([a-z_]+)"/i);
verdict = vm ? vm[1] : null;
// Count object entries per array by counting occurrences of
// either a unique-per-entry field name or {...} bracket pairs
// inside the array span. A straight "count opening braces inside
// the array range" is simplest and robust to field order.
const countArrayEntries = (arrayName: string): number => {
const re = new RegExp(`"${arrayName}"\\s*:\\s*\\[([\\s\\S]*?)\\]`, "i");
const m = accepted.match(re);
if (!m || !m[1].trim()) return 0;
// Count opening braces of direct-child objects.
let depth = 0, entries = 0;
for (const ch of m[1]) {
if (ch === '{') { if (depth === 0) entries++; depth++; }
else if (ch === '}') depth--;
}
return entries;
};
critical_failures_count = countArrayEntries("critical_failures");
pseudocode_flags_count = countArrayEntries("pseudocode_flags");
prd_mismatches_count = countArrayEntries("prd_mismatches");
missing_components_count = countArrayEntries("missing_components");
verified_components_count = countArrayEntries("verified_components");
risk_points_count = countArrayEntries("risk_points");
}
// Permission Gradient (Layer #6 from SYSTEM_EVOLUTION_LAYERS.md).
// Classify the overall finding set by confidence_avg:
// ≥90 auto-apply-safe, ≥70 dry-run + diff, ≥50 simulation only,
// <50 block (human review). Use conf_min as the tier-lower-bound
// so one shaky finding drags the whole row down to the safer tier.
const tierFor = (c: number | null): string => {
if (c === null) return "unknown";
if (c >= 90) return "auto";
if (c >= 70) return "dry_run";
if (c >= 50) return "simulation";
return "block";
};
const gradient_tier = tierFor(conf_min); // conservative: weakest finding decides
const gradient_tier_avg = tierFor(conf_avg);
const row = {
file: rel,
reviewed_at: new Date().toISOString(),
accepted_model: acceptedModel,
accepted_on_attempt: acceptedOn,
attempts_made: history.length,
tree_split_fired: treeSplitFired,
suggestions_preview: accepted.slice(0, 2000),
// Iter-3+ confidence fields.
confidences_per_finding: confidences,
confidence_avg: conf_avg,
confidence_min: conf_min,
findings_count: confidences.length,
// Layer #6 Permission Gradient — downstream consumers decide
// apply-semantics based on these fields instead of blindly trusting
// every suggestion.
gradient_tier,
gradient_tier_avg,
// Score (decimal-aware) and forensic JSON structured counters.
// iter 4+ fields (schema_version 4).
alignment_score,
output_format: isJsonShape ? "forensic_json" : "markdown",
verdict,
critical_failures_count,
pseudocode_flags_count,
prd_mismatches_count,
missing_components_count,
verified_components_count,
risk_points_count,
schema_version: 4,
scrum_master_reviewed: true,
};
// Layer #2 Model Trust Profiling — append one row per file-accept
// so over time we can compute per-(model, task_type) success/thin
// rates and trust scores. task_type here is inferred from the file
// path — good enough for initial stats, refine later.
try {
const taskType = rel.includes("/truth/") ? "truth"
: rel.includes("/gateway/") ? "gateway"
: rel.includes("/vectord") ? "vector"
: rel.includes("/ingestd") ? "ingest"
: rel.includes("/queryd") ? "query"
: rel.includes("/storaged") || rel.includes("/catalogd") || rel.includes("/journald") ? "storage"
: rel.includes("/aibridge") ? "aibridge"
: "other";
const trustRow = {
run_id: OUT_DIR.split("/").pop(),
reviewed_at: row.reviewed_at,
file: rel,
task_type: taskType,
accepted_model: acceptedModel,
accepted_on_attempt: acceptedOn,
attempts_made: history.length,
thin_rejections: history.filter(h => h.status === "thin").length,
errors: history.filter(h => h.status === "error").length,
confidence_avg: conf_avg,
tree_split_fired: treeSplitFired,
};
await appendFile("/home/profit/lakehouse/data/_kb/model_trust.jsonl",
JSON.stringify(trustRow) + "\n");
} catch (e) {
console.error(`[scrum] model_trust append failed: ${(e as Error).message}`);
}
try {
await appendFile(SCRUM_REVIEWS_JSONL, JSON.stringify(row) + "\n");
} catch (e) {
console.error(`[scrum] failed to append scrum_reviews.jsonl: ${(e as Error).message}`);
}
// Close the scrum → observer loop (fix 2026-04-24). Architecture
// audit surfaced: observer ring had 2000 ops, 1999 from Langfuse,
// zero from scrum. Observer's analyzeErrors + PLAYBOOK_BUILDER loops
// were blind to the very pipeline most likely to teach them. One
// fire-and-forget POST wires them in. Observer tolerates unreachable
// backends; no scrum run fails if observer is down.
//
// Schema matches observer's ObservedOp shape (source, staffer_id,
// sig_hash, event_kind, success, ...). file + accepted_model +
// confidence_avg + gradient_tier give downstream analyzers enough
// signal to correlate reviews with later regressions.
try {
const sigHash = createHash("sha256")
.update(`${rel}|${OUT_DIR.split("/").pop()}`)
.digest("hex")
.slice(0, 16);
fetch("http://localhost:3800/event", {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
source: "scrum",
staffer_id: "scrum_master",
sig_hash: sigHash,
event_kind: "file_review",
success: true,
run_id: OUT_DIR.split("/").pop(),
file: rel,
accepted_model: acceptedModel,
accepted_on_attempt: acceptedOn,
attempts_made: history.length,
thin_rejections: history.filter(h => h.status === "thin").length,
confidence_avg: conf_avg,
confidence_min: conf_min,
findings_count: confidences.length,
gradient_tier,
tree_split_fired: treeSplitFired,
// iter4+ forensic-JSON fields so observer's analyzer can
// route by verdict / sort by critical_failures_count
alignment_score,
verdict,
output_format: isJsonShape ? "forensic_json" : "markdown",
critical_failures_count,
verified_components_count,
missing_components_count,
ts: row.reviewed_at,
}),
signal: AbortSignal.timeout(3000),
}).catch(() => {
// observer down — not a scrum-run failure, just lose the signal.
});
} catch (e) {
// Synchronous construction error — ignore.
}
// Route the accepted review through llm_team's fact extractor so
// its entities + relationships land in audit_facts.jsonl alongside
// inference-side extractions. Same index, two sources. Tagged
// source:"scrum_review" + scrum_master_reviewed:true so downstream
// queries can filter by provenance. Reviews shorter than 120
// chars are skipped — they're usually one-liners ("LGTM") with
// no extractable knowledge.
if (accepted.length >= 120 && process.env.LH_SCRUM_SKIP_EXTRACT !== "1") {
try {
const { extractFacts } = await import("../../auditor/fact_extractor.ts");
const ex = await extractFacts(accepted);
if (!ex.error || ex.entities.length + ex.facts.length > 0) {
const factRow = {
pr_number: 0, // scrum runs outside a PR scope
file: rel,
head_sha: "", // no SHA scope; scope is the file+timestamp
extracted_at: ex.extracted_at,
extractor: ex.extractor_model,
verifier: ex.verifier_model,
llm_team_run_id: ex.llm_team_run_id ?? null,
facts: ex.facts,
entities: ex.entities,
relationships: ex.relationships,
verification_preview: ex.verification.slice(0, 400),
schema_version: 2,
source: "scrum_review",
scrum_master_reviewed: true,
};
const AUDIT_FACTS_JSONL = "/home/profit/lakehouse/data/_kb/audit_facts.jsonl";
await appendFile(AUDIT_FACTS_JSONL, JSON.stringify(factRow) + "\n");
}
} catch (e) {
console.error(`[scrum] fact extraction failed for ${rel}: ${(e as Error).message}`);
}
}
}
return review;
}
async function loadAndChunk(path: string, origin_tag: string): Promise<Chunk[]> {
const text = await readFile(path, "utf8");
const raw = chunkText(text);
const embs = await embedBatch(raw.map(r => r.text));
return raw.map((r, i) => ({
id: createHash("sha256").update(r.text).digest("hex").slice(0, 10),
text: r.text,
embedding: embs[i],
origin: origin_tag,
offset: r.offset,
}));
}
async function main() {
await mkdir(OUT_DIR, { recursive: true });
log(`output: ${OUT_DIR}`);
log(`targets: ${TARGET_FILES.length} files`);
log("loading + embedding PRD...");
const prd_chunks = await loadAndChunk(PRD_PATH, "PRD");
log(` PRD: ${prd_chunks.length} chunks`);
log("loading + embedding cohesion plan...");
const plan_chunks = await loadAndChunk(PROPOSAL_PATH, "COHESION_PLAN");
log(` plan: ${plan_chunks.length} chunks`);
log("");
log("─── scrum master: walking target files ───");
const reviews: FileReview[] = [];
for (const f of TARGET_FILES) {
const review = await reviewFile(f, prd_chunks, plan_chunks);
reviews.push(review);
await writeFile(
`${OUT_DIR}/review_${review.file.replace(/\//g, "_")}.json`,
JSON.stringify(review, null, 2),
);
log(`${review.file}: ${review.accepted_on ? `accepted on ${review.accepted_on} by ${review.escalated_to_model}` : "UNRESOLVED"} (${review.duration_ms}ms)`);
}
// Consolidated scrum-master report
const report_md: string[] = [];
report_md.push(`# Scrum-master review\n`);
report_md.push(`Generated: ${new Date().toISOString()}`);
report_md.push(`Files reviewed: ${reviews.length}`);
report_md.push(`Total duration: ${(reviews.reduce((s, r) => s + r.duration_ms, 0) / 1000).toFixed(1)}s\n`);
for (const r of reviews) {
report_md.push(`\n## ${r.file}`);
report_md.push(`- **Accepted on attempt:** ${r.accepted_on ?? "NOT resolved after 6 attempts"}`);
report_md.push(`- **Escalated to:** \`${r.escalated_to_model || "—"}\``);
report_md.push(`- **Total attempts:** ${r.attempts_made}`);
if (r.attempts_history.length > 1) {
report_md.push(`- **Attempt history:**`);
for (const h of r.attempts_history) {
report_md.push(` - ${h.n}: \`${h.model}\`${h.status}${h.error ? ` (${h.error.slice(0, 100)})` : ""}`);
}
}
report_md.push(`\n### Suggestions\n\n${r.suggestions}\n`);
}
await writeFile(`${OUT_DIR}/scrum_report.md`, report_md.join("\n"));
const summary = {
ran_at: new Date().toISOString(),
target_count: TARGET_FILES.length,
resolved: reviews.filter(r => r.accepted_on !== null).length,
total_attempts: reviews.reduce((s, r) => s + r.attempts_made, 0),
total_duration_ms: reviews.reduce((s, r) => s + r.duration_ms, 0),
per_file: reviews.map(r => ({ file: r.file, accepted_on: r.accepted_on, model: r.escalated_to_model, attempts: r.attempts_made, ms: r.duration_ms })),
};
await writeFile(`${OUT_DIR}/summary.json`, JSON.stringify(summary, null, 2));
log("");
log("═══ SCRUM REPORT ═══");
log(` files: ${summary.target_count}, resolved: ${summary.resolved}, total attempts: ${summary.total_attempts}`);
log(` total time: ${(summary.total_duration_ms / 1000).toFixed(1)}s`);
log("");
for (const p of summary.per_file) {
const mark = p.accepted_on ? "✓" : "✗";
log(` ${mark} ${p.file.padEnd(60)} attempt ${p.accepted_on ?? "—"}/${p.attempts} ${p.model} ${p.ms}ms`);
}
log("");
log(`report: ${OUT_DIR}/scrum_report.md`);
process.exit(summary.resolved === summary.target_count ? 0 : 1);
}
main().catch(e => { console.error("[scrum] fatal:", e); process.exit(2); });