diff --git a/mcp-server/observer.ts b/mcp-server/observer.ts
index b1d0fa2..17e23d8 100644
--- a/mcp-server/observer.ts
+++ b/mcp-server/observer.ts
@@ -224,6 +224,165 @@ async function escalateFailureClusterToLLMTeam(sigHash: string, cluster: Observe
 // persists across cycles.
 const escalatedSigHashes = new Set();
 
+// ─── Hand-review for scrum/agent candidate responses (2026-04-25) ───
+//
+// Observer is OUTSIDE the scrum loop's epistemic scope, so its verdict
+// can be treated as truth about whether a candidate review is grounded.
+// Two-tier evaluator:
+//   1. Try cloud LLM (qwen3-coder:480b) — semantic judgment with
+//      response + source excerpt + grounding stats as context.
+//   2. On cloud failure (throttle/timeout) → deterministic heuristic
+//      over grounding_pct + total_quotes. Marked source: "heuristic"
+//      so consumers can tell which rung produced the verdict.
+// Every verdict is persisted to data/_kb/observer_reviews.jsonl.
+
+const OBSERVER_REVIEWS = "/home/profit/lakehouse/data/_kb/observer_reviews.jsonl";
+
+interface HandReviewInput {
+  file_path: string;
+  model: string;
+  response: string;
+  source_content: string;
+  grounding_stats: { total: number; grounded: number; groundedPct: number | null };
+  attempt: number;
+}
+
+interface HandReviewVerdict {
+  verdict: "accept" | "reject" | "cycle";
+  confidence: number;
+  notes: string;
+  source: "cloud" | "heuristic";
+}
+
+async function handReview(input: HandReviewInput): Promise<HandReviewVerdict> {
+  const t0 = Date.now();
+  let verdict: HandReviewVerdict;
+
+  try {
+    verdict = await cloudHandReview(input);
+  } catch (e) {
+    console.error(`[observer/review] cloud failed (${(e as Error).message}); using heuristic`);
+    verdict = heuristicHandReview(input);
+  }
+
+  // Persist regardless of source so we can later compare cloud vs
+  // heuristic verdicts on the same input and tune the heuristic.
+  const row = {
+    ts: new Date().toISOString(),
+    file_path: input.file_path,
+    model: input.model,
+    attempt: input.attempt,
+    response_chars: input.response.length,
+    grounding_stats: input.grounding_stats,
+    verdict: verdict.verdict,
+    confidence: verdict.confidence,
+    notes: verdict.notes,
+    source: verdict.source,
+    duration_ms: Date.now() - t0,
+  };
+  try {
+    const { appendFile } = await import("node:fs/promises");
+    await appendFile(OBSERVER_REVIEWS, JSON.stringify(row) + "\n");
+  } catch { /* best-effort persistence */ }
+
+  return verdict;
+}
+
+async function cloudHandReview(input: HandReviewInput): Promise<HandReviewVerdict> {
+  const grounded = input.grounding_stats.grounded;
+  const total = input.grounding_stats.total;
+  const pct = input.grounding_stats.groundedPct;
+  // Truncate to keep the prompt under typical context windows.
+  // 2000 + 4000 = ~6000 chars ≈ 1500 tokens, plus response context.
+  const responseExcerpt = input.response.slice(0, 2000);
+  const sourceExcerpt = input.source_content.slice(0, 4000);
+
+  const prompt = `You are a code-review quality observer. Decide whether the following automated review is grounded in the actual source — not invented, not hallucinated.
+
+FILE: ${input.file_path}
+MODEL: ${input.model}
+ATTEMPT: ${input.attempt}
+ANCHOR GROUNDING: ${grounded}/${total} backtick-quoted snippets matched the source verbatim${pct !== null ? ` (${pct}%)` : ""}
+
+REVIEW (first 2000 chars):
+\`\`\`
+${responseExcerpt}
+\`\`\`
+
+SOURCE EXCERPT (first 4000 chars):
+\`\`\`
+${sourceExcerpt}
+\`\`\`
+
+Respond ONLY with a JSON object:
+{
+  "verdict": "accept" | "reject" | "cycle",
+  "confidence": 0-100,
+  "notes": "<1-2 sentences on what makes this grounded or hallucinated>"
+}
+
+- accept: review references real symbols/lines in source; findings could be acted on.
+- reject: review invents APIs, fabricates calls, contradicts source. Do NOT record.
+- cycle: review is mediocre — partially grounded but wrong shape, try a stronger model.`;
+
+  const resp = await fetch(`${LAKEHOUSE}/v1/chat`, {
+    method: "POST",
+    headers: { "Content-Type": "application/json" },
+    body: JSON.stringify({
+      provider: "ollama_cloud",
+      model: "qwen3-coder:480b",
+      messages: [{ role: "user", content: prompt }],
+      max_tokens: 300,
+      temperature: 0.0,
+    }),
+    signal: AbortSignal.timeout(45000),
+  });
+  if (!resp.ok) {
+    throw new Error(`/v1/chat ${resp.status}: ${(await resp.text()).slice(0, 200)}`);
+  }
+  const j: any = await resp.json();
+  const content = (j?.choices?.[0]?.message?.content ?? "").trim();
+  // Pull JSON object from the response — model may wrap it in prose.
+  const m = content.match(/\{[\s\S]*\}/);
+  if (!m) throw new Error(`no JSON object in response: ${content.slice(0, 100)}`);
+  const parsed = JSON.parse(m[0]);
+  const v = String(parsed.verdict ?? "accept").toLowerCase();
+  return {
+    verdict: (v === "reject" || v === "cycle") ? v as "reject" | "cycle" : "accept",
+    confidence: Number(parsed.confidence ?? 50),
+    notes: String(parsed.notes ?? "").slice(0, 500),
+    source: "cloud",
+  };
+}
+
+function heuristicHandReview(input: HandReviewInput): HandReviewVerdict {
+  // Deterministic fallback when cloud is throttled. Conservative:
+  // only flip to reject when the evidence is overwhelming, otherwise
+  // accept (fall-open principle — observer is policy, not blocker).
+  const total = input.grounding_stats.total;
+  const pct = input.grounding_stats.groundedPct;
+  const respLen = input.response.length;
+
+  // Too short to be a real review
+  if (respLen < 1500) {
+    return { verdict: "reject", confidence: 80, notes: `response too short (${respLen} chars)`, source: "heuristic" };
+  }
+  // Below 5 quotes — not enough signal to judge grounding; accept
+  if (total < 5 || pct === null) {
+    return { verdict: "accept", confidence: 50, notes: `insufficient quote signal (${total} quotes); accepting`, source: "heuristic" };
+  }
+  // Very heavy hallucination
+  if (pct < 20) {
+    return { verdict: "reject", confidence: 85, notes: `low grounding (${pct}% of ${total} quotes)`, source: "heuristic" };
+  }
+  // Mediocre — cycle to a stronger model
+  if (pct < 50) {
+    return { verdict: "cycle", confidence: 65, notes: `mediocre grounding (${pct}% of ${total} quotes); try stronger`, source: "heuristic" };
+  }
+  // Good enough
+  return { verdict: "accept", confidence: 75, notes: `grounding ${pct}% of ${total} quotes`, source: "heuristic" };
+}
+
 async function maybeEscalate(failures: ObservedOp[]) {
   // Group failures by sig_hash
   const bySig = new Map();
@@ -362,6 +521,28 @@ function startHttpListener() {
           .map(o => ({ ts: o.timestamp, ok: o.success, staffer: o.staffer_id, kind: o.event_kind, role: o.role })),
       }));
     }
+    // ─── Hand-review endpoint (2026-04-25) ───
+    // scrum/agent posts a candidate response + source content + grounding
+    // stats. Observer evaluates via cloud LLM (qwen3-coder:480b) with
+    // semantic context and returns {verdict, confidence, notes}. On
+    // cloud throttle, falls back to a deterministic heuristic over the
+    // grounding stats so the loop keeps moving with honest signal.
+    //
+    // This is the policy layer scrum was missing — pre-2026-04-25 the
+    // scrum_master applied a hardcoded grounding-rate threshold inline,
+    // which baked judgment into the wrong layer. Now scrum reports data
+    // (response + source + stats) and observer decides accept/reject/cycle.
+    if (req.method === "POST" && url.pathname === "/review") {
+      return req.json().then((body: any) => handReview(body))
+        .then((verdict) => new Response(JSON.stringify(verdict), {
+          headers: { "content-type": "application/json" },
+        }))
+        .catch((e: Error) =>
+          new Response(JSON.stringify({ verdict: "accept", notes: `observer error: ${e.message}`, source: "heuristic" }), {
+            status: 200, // fall-open shape — scrum keeps moving on observer failure
+            headers: { "content-type": "application/json" },
+          }));
+    }
     if (req.method === "POST" && url.pathname === "/event") {
      return req.json().then((body: any) => {
        const op: ObservedOp = {
diff --git a/tests/real-world/scrum_master_pipeline.ts b/tests/real-world/scrum_master_pipeline.ts
index e1fe3d3..aabddcd 100644
--- a/tests/real-world/scrum_master_pipeline.ts
+++ b/tests/real-world/scrum_master_pipeline.ts
@@ -301,6 +301,52 @@ async function recordPathwayReplay(pathwayId: string, succeeded: boolean): Promi
   }
 }
 
+// Observer hand-review — the policy layer that decides whether a
+// candidate response is grounded enough to accept. Lives in mcp-server's
+// observer (port 3800) so it sits OUTSIDE the scrum loop's epistemic
+// scope. Synchronous so scrum can act on the verdict immediately.
+//
+// Returns {verdict, confidence, notes}. verdict ∈ {accept, reject, cycle}.
+// On unreachable observer (network, timeout, parse failure), falls open
+// to {verdict: "accept"} — the observer is the policy layer, not a hard
+// dependency. Pipeline keeps moving when the observer is down.
+const OBSERVER_URL = process.env.LH_OBSERVER_URL ?? "http://localhost:3800";
+
+interface ObserverVerdict {
+  verdict: "accept" | "reject" | "cycle";
+  confidence?: number;
+  notes?: string;
+  source?: "cloud" | "heuristic";
+}
+
+async function observerHandReview(input: {
+  file_path: string;
+  model: string;
+  response: string;
+  source_content: string;
+  grounding_stats: { total: number; grounded: number; groundedPct: number | null };
+  attempt: number;
+}): Promise<ObserverVerdict> {
+  try {
+    const r = await fetch(`${OBSERVER_URL}/review`, {
+      method: "POST",
+      headers: { "content-type": "application/json" },
+      body: JSON.stringify(input),
+      signal: AbortSignal.timeout(60000),
+    });
+    if (!r.ok) {
+      // Observer down or rejected the request — fall open to accept so
+      // the loop keeps moving. Log so we notice degradation.
+      console.error(`[scrum] observer review unreachable (${r.status}), falling open to accept`);
+      return { verdict: "accept", notes: `observer ${r.status}`, source: "heuristic" };
+    }
+    return (await r.json()) as ObserverVerdict;
+  } catch (e: any) {
+    console.error(`[scrum] observer review failed (${e.message}), falling open to accept`);
+    return { verdict: "accept", notes: `observer error: ${e.message}`, source: "heuristic" };
+  }
+}
+
 // ADR-021 Phase C: pre-review enrichment. Fetch aggregated bug
 // fingerprints for this narrow fingerprint (same key as hot-swap —
 // task_class + file_prefix + signal_class) so the reviewer prompt
@@ -1057,22 +1103,41 @@ Respond with markdown. Be specific, not generic. Cite file-region + PRD-chunk-of
         log(` ✗ thin/unstructured (${r.content.length} chars)`);
         continue;
       }
+      // Compute grounding stats as DATA — feed to observer for hand-review.
+      // We no longer gate locally on a hardcoded threshold; that judgment
+      // belongs to the observer (which has Langfuse traces + can call cloud
+      // models for semantic review). Local stats are still informational
+      // and get appended as a footer for humans.
+      const groundingStats = verifyAnchorGrounding(r.content, content);
+
+      // Observer hand-review — synchronous call to mcp-server :3800. Observer
+      // returns {verdict: accept|reject|cycle, confidence, notes}. If the
+      // observer is unreachable or errors, fall through to acceptance (the
+      // observer is the policy layer; pipeline keeps moving when it's down).
+      const obsVerdict = await observerHandReview({
+        file_path: rel,
+        model: `${rung.provider}/${rung.model}`,
+        response: r.content,
+        source_content: content,
+        grounding_stats: groundingStats,
+        attempt: n,
+      });
+      if (obsVerdict.verdict === "reject" || obsVerdict.verdict === "cycle") {
+        const reason = `observer ${obsVerdict.verdict}: ${obsVerdict.notes ?? "no notes"} (conf=${obsVerdict.confidence ?? "?"})`;
+        history.push({ n, model: rung.model, status: "thin", chars: r.content.length, error: reason });
+        pathwayAttempts.push({ rung: i + 1, model: rung.model, latency_ms: attemptMs, accepted: false, reject_reason: reason });
+        log(` ✗ ${reason} — cycling ladder`);
+        continue;
+      }
       history.push({ n, model: rung.model, status: "accepted", chars: r.content.length });
       pathwayAttempts.push({ rung: i + 1, model: rung.model, latency_ms: attemptMs, accepted: true, reject_reason: null });
       accepted = r.content;
       acceptedModel = `${rung.provider}/${rung.model}`;
       acceptedOn = n;
-      // Post-acceptance: when tree-split fired, run the anchor-grounding
-      // verifier and append a footer with the grounding rate. The footer
-      // surfaces ungrounded quotes so humans can spot hallucinated
-      // findings at a glance — prevents the 0/10 confabulation pattern
-      // observed on llm_team_ui.py 2026-04-24.
-      if (treeSplitFired) {
-        const stats = verifyAnchorGrounding(accepted, content);
-        log(` ⚓ anchor grounding: ${stats.grounded}/${stats.total} quotes matched source` +
-          (stats.groundedPct !== null ? ` (${stats.groundedPct}%)` : ""));
-        accepted = appendGroundingFooter(accepted, stats);
-      }
+      log(` ⚓ anchor grounding: ${groundingStats.grounded}/${groundingStats.total} quotes matched source` +
+        (groundingStats.groundedPct !== null ? ` (${groundingStats.groundedPct}%)` : "") +
+        ` · observer ${obsVerdict.verdict}` + (obsVerdict.confidence ? ` (conf=${obsVerdict.confidence})` : ""));
+      accepted = appendGroundingFooter(accepted, groundingStats);
       log(` ✓ ACCEPTED on attempt ${n} (${rung.model}, ${r.content.length} chars)`);
       break;
     }
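
The new endpoint is easiest to sanity-check in isolation. The sketch below is illustrative only and not part of the diff: it assumes the observer above is running locally on :3800 (the `OBSERVER_URL` default in the pipeline change), the script name and payload values are made up, and the body shape mirrors `HandReviewInput` while the response mirrors `HandReviewVerdict`.

```ts
// smoke_review.ts: hypothetical throwaway script (Bun or Node ESM, top-level await).
const OBSERVER_URL = process.env.LH_OBSERVER_URL ?? "http://localhost:3800";

// Illustrative payload in the HandReviewInput shape from mcp-server/observer.ts.
// The response text is padded past the heuristic's 1500-char floor so the
// fallback path does not auto-reject it as "too short".
const sample = {
  file_path: "example/widget.py", // hypothetical file
  model: "ollama_cloud/qwen3-coder:480b",
  response: "## Review\n`load_widget()` never closes its file handle...".padEnd(2000, " "),
  source_content: "def load_widget(path):\n    f = open(path)\n    return f.read()\n",
  grounding_stats: { total: 8, grounded: 6, groundedPct: 75 },
  attempt: 1,
};

const r = await fetch(`${OBSERVER_URL}/review`, {
  method: "POST",
  headers: { "content-type": "application/json" },
  body: JSON.stringify(sample),
  signal: AbortSignal.timeout(60_000),
});

// Expected shape: { verdict: "accept" | "reject" | "cycle", confidence, notes, source }
console.log(await r.json());
```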
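Since every verdict row lands in observer_reviews.jsonl regardless of which rung produced it, comparing cloud vs heuristic decisions is a one-file read. A minimal sketch, assuming the `OBSERVER_REVIEWS` path from the diff; the script name and tally format are my own, not part of the change:

```ts
// tally_reviews.ts: hypothetical helper for tuning the heuristic against cloud verdicts.
import { readFile } from "node:fs/promises";

const OBSERVER_REVIEWS = "/home/profit/lakehouse/data/_kb/observer_reviews.jsonl";

// Each line is one persisted verdict row written by handReview().
const rows = (await readFile(OBSERVER_REVIEWS, "utf8"))
  .split("\n")
  .filter(Boolean)
  .map((line) => JSON.parse(line) as { verdict: string; source: string });

// Tally verdicts per source, producing keys like "cloud:accept" or "heuristic:reject".
const counts: Record<string, number> = {};
for (const row of rows) {
  const key = `${row.source}:${row.verdict}`;
  counts[key] = (counts[key] ?? 0) + 1;
}
console.log(counts);
```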