diff --git a/mcp-server/observer.ts b/mcp-server/observer.ts index a82389e..e92b244 100644 --- a/mcp-server/observer.ts +++ b/mcp-server/observer.ts @@ -193,8 +193,8 @@ async function buildKbPreamble(sigHash: string, cluster: ObservedOp[]): Promise< if (r.ok) fingerprints = (await r.json() as any).fingerprints ?? []; } catch {} - // Step 2: matrix retrieval against the architectural corpus we - // already maintain. Cluster summary is the search query. + // Step 2: architectural matrix (lakehouse_arch_v1) — ADRs/PRD/plan + // intent. Cluster summary is the search query. const clusterSummary = cluster.slice(-5).map(o => `${o.endpoint ?? "?"} ${o.input_summary ?? ""} ${o.error ?? ""}` ).join(" | "); @@ -203,13 +203,28 @@ async function buildKbPreamble(sigHash: string, cluster: ObservedOp[]): Promise< const r = await fetch(`${LAKEHOUSE}/vectors/search`, { method: "POST", headers: { "content-type": "application/json" }, - body: JSON.stringify({ index_name: "lakehouse_arch_v1", query: `${taskClass} ${clusterSummary}`, top_k: 5 }), + body: JSON.stringify({ index_name: "lakehouse_arch_v1", query: `${taskClass} ${clusterSummary}`, top_k: 4 }), signal: AbortSignal.timeout(5000), }); if (r.ok) matrixChunks = (await r.json() as any).results ?? []; } catch {} - if (fingerprints.length === 0 && matrixChunks.length === 0) return ""; + // Step 2b: gold-standard prior answers (lakehouse_answers_v1) — past + // scrum reviews + observer escalations. This is where the BIG-model + // results we save live; future small-model handlers retrieve them + // here as scaffolding so the cheap rung gets near-paid quality. 
+ let answerChunks: { doc_id?: string; chunk_text?: string; score?: number }[] = []; + try { + const r = await fetch(`${LAKEHOUSE}/vectors/search`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ index_name: "lakehouse_answers_v1", query: `${taskClass} ${clusterSummary}`, top_k: 3 }), + signal: AbortSignal.timeout(5000), + }); + if (r.ok) answerChunks = (await r.json() as any).results ?? []; + } catch {} + + if (fingerprints.length === 0 && matrixChunks.length === 0 && answerChunks.length === 0) return ""; // Step 3: synthesis via local model (qwen3.5:latest, provider=ollama). // Compresses the raw bundle to a 1-2 sentence briefing the cloud @@ -226,6 +241,11 @@ async function buildKbPreamble(sigHash: string, cluster: ObservedOp[]): Promise< `${i + 1}. [${c.doc_id ?? "?"}] ${(c.chunk_text ?? "").slice(0, 200)}` ).join("\n") : "", + answerChunks.length > 0 + ? "PRIOR GOLD-STANDARD ANSWERS (similar past reviews + escalations):\n" + answerChunks.map((c, i) => + `${i + 1}. [${c.doc_id ?? "?"}] ${(c.chunk_text ?? "").slice(0, 240)}` + ).join("\n") + : "", ].filter(Boolean).join("\n\n"); const synthPrompt = `A failure cluster (sig_hash=${sigHash.slice(0, 8)}, ${cluster.length} occurrences, task_class=${taskClass}) is about to be escalated for diagnosis. Here are prior signals from our knowledge base: diff --git a/scripts/build_answers_corpus.ts b/scripts/build_answers_corpus.ts new file mode 100644 index 0000000..c5f4321 --- /dev/null +++ b/scripts/build_answers_corpus.ts @@ -0,0 +1,138 @@ +#!/usr/bin/env bun +/** + * Build `lakehouse_answers_v1` — the gold-standard answer corpus. 
+ * + * Sources: + * - data/_kb/scrum_reviews.jsonl (per-file scrum reviews) + * - data/_kb/observer_escalations.jsonl (deepseek-v3.1-terminus diagnoses) + * + * doc_id prefixes distinguish origin so consumers can same-file-gate + * (only return `review:` chunks when the focus file matches) or + * broaden (always include `escalation:` for general task-class signal). + * + * Replaces scrum_findings_v1 — broader scope, cleaner gating story. + * + * Re-run after every scrum run / observer escalation lands. The + * pipeline's epilogue calls this; manual runs work too. + */ + +import { readFileSync, existsSync } from "node:fs"; +import { resolve } from "node:path"; + +const ROOT = resolve(import.meta.dir, ".."); +const GATEWAY = process.env.LH_GATEWAY ?? "http://localhost:3100"; +const INDEX_NAME = process.env.LH_CORPUS_NAME ?? "lakehouse_answers_v1"; +const SOURCE_LABEL = "lakehouse_answers"; +const CHUNK_SIZE = Number(process.env.LH_CHUNK_SIZE ?? 1500); +const OVERLAP = Number(process.env.LH_OVERLAP ?? 150); +const MIN_BYTES = 200; // skip stub rows + +interface Doc { id: string; text: string } + +function slugFile(path: string): string { + return path.replace(/^crates\//, "").replace(/[^a-z0-9]+/gi, "_").slice(0, 40); +} + +function compactTs(iso: string): string { + return iso.replace(/[-:T]/g, "").slice(0, 14); +} + +function buildScrumReviewDocs(): Doc[] { + const path = resolve(ROOT, "data/_kb/scrum_reviews.jsonl"); + if (!existsSync(path)) return []; + const lines = readFileSync(path, "utf8").split("\n").filter(Boolean); + const docs: Doc[] = []; + const idCounts = new Map(); + + for (const line of lines) { + let row: any; + try { row = JSON.parse(line); } catch { continue; } + const file = row.file ?? ""; + const preview = row.suggestions_preview ?? ""; + if (!file || preview.length < MIN_BYTES) continue; + + const ts = compactTs(row.reviewed_at ?? ""); + const baseId = `review:${slugFile(file)}:${ts || "no_ts"}`; + const count = (idCounts.get(baseId) ?? 
0) + 1; + idCounts.set(baseId, count); + const id = count === 1 ? baseId : `${baseId}_${count}`; + + const header = `File: ${file}\nReviewed: ${row.reviewed_at ?? "?"}\nModel: ${row.accepted_model ?? "?"}\nVerdict: ${row.verdict ?? "?"}\n\n`; + docs.push({ id, text: header + preview }); + } + return docs; +} + +function buildEscalationDocs(): Doc[] { + const path = resolve(ROOT, "data/_kb/observer_escalations.jsonl"); + if (!existsSync(path)) return []; + const lines = readFileSync(path, "utf8").split("\n").filter(Boolean); + const docs: Doc[] = []; + const idCounts = new Map(); + + for (const line of lines) { + let row: any; + try { row = JSON.parse(line); } catch { continue; } + const analysis = row.analysis ?? ""; + if (analysis.length < MIN_BYTES) continue; + + const ts = compactTs(row.ts ?? ""); + const sigSlug = String(row.sig_hash ?? "no_sig").slice(0, 12); + const baseId = `escalation:${sigSlug}:${ts || "no_ts"}`; + const count = (idCounts.get(baseId) ?? 0) + 1; + idCounts.set(baseId, count); + const id = count === 1 ? baseId : `${baseId}_${count}`; + + const header = `Failure cluster · sig_hash=${row.sig_hash ?? "?"} · cluster_size=${row.cluster_size ?? "?"} · endpoint=${row.cluster_endpoint ?? "?"}\nDiagnosed by: ${row.mode ?? "?"}\nWhen: ${row.ts ?? 
"?"}\n\n`; + docs.push({ id, text: header + analysis }); + } + return docs; +} + +async function main() { + const dryRun = process.argv.includes("--dry-run") || process.argv.includes("--print"); + const printOnly = process.argv.includes("--print"); + + const reviews = buildScrumReviewDocs(); + const escalations = buildEscalationDocs(); + const docs = [...reviews, ...escalations]; + const totalBytes = docs.reduce((s, d) => s + d.text.length, 0); + + console.log(`[answers] ${docs.length} docs · ${totalBytes} bytes`); + console.log(`[answers] reviews: ${reviews.length}`); + console.log(`[answers] escalations: ${escalations.length}`); + + if (printOnly) { + docs.slice(0, 2).forEach(d => console.log(` ${d.id} (${d.text.length}b) ${d.text.slice(0, 100).replace(/\n/g, " ")}…`)); + if (escalations.length > 0) { + console.log(` ... (last escalation): ${escalations[escalations.length - 1].id}`); + } + return; + } + if (dryRun) return; + if (docs.length === 0) { + console.log("[answers] no docs to index — skipping POST"); + return; + } + + const r = await fetch(`${GATEWAY}/vectors/index`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + index_name: INDEX_NAME, + source: SOURCE_LABEL, + documents: docs, + chunk_size: CHUNK_SIZE, + overlap: OVERLAP, + }), + signal: AbortSignal.timeout(60_000), + }); + if (!r.ok) { + console.error(`[answers] HTTP ${r.status}: ${await r.text()}`); + process.exit(1); + } + const j: any = await r.json(); + console.log(`[answers] job ${j.job_id} · ${j.documents} docs → ${j.chunks} chunks queued`); +} + +main().catch(e => { console.error(e); process.exit(1); }); diff --git a/tests/real-world/scrum_master_pipeline.ts b/tests/real-world/scrum_master_pipeline.ts index 9d77e97..820367f 100644 --- a/tests/real-world/scrum_master_pipeline.ts +++ b/tests/real-world/scrum_master_pipeline.ts @@ -2208,6 +2208,25 @@ async function main() { log(""); log(`report: ${OUT_DIR}/scrum_report.md`); + // Auto-rebuild 
lakehouse_answers_v1 so this run's reviews are + // retrievable by future scrum/observer enrichment paths within ~30s. + // Best-effort, fire-and-forget — don't fail the pipeline on rebuild + // hiccups. Toggle off via LH_SCRUM_SKIP_ANSWERS_REBUILD=1. + if (process.env.LH_SCRUM_SKIP_ANSWERS_REBUILD !== "1") { + try { + const { spawn } = await import("node:child_process"); + const child = spawn("bun", ["run", "scripts/build_answers_corpus.ts"], { + cwd: process.cwd(), + stdio: "inherit", + detached: true, + }); + child.unref(); + log("answers corpus rebuild dispatched (bun run scripts/build_answers_corpus.ts)"); + } catch (e) { + log(`answers corpus rebuild skipped: ${(e as Error).message}`); + } + } + process.exit(summary.resolved === summary.target_count ? 0 : 1); }