From d9bd4c9bdf349a31993cbf649333d5a931277512 Mon Sep 17 00:00:00 2001 From: root Date: Sun, 26 Apr 2026 18:36:19 -0500 Subject: [PATCH] observer: KB enrichment preamble before failure-cluster escalation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit escalateFailureClusterToLLMTeam now calls a new buildKbPreamble() that mirrors what scrum_master_pipeline does on every per-file review: queries /vectors/pathway/bug_fingerprints + /vectors/search against the lakehouse_arch_v1 corpus, then asks local qwen3.5:latest (provider=ollama) to synthesize a tight briefing. The synthesized preamble prepends the existing escalation prompt so the cloud reviewer sees historical context the same way scrum reviewers do. Reuses existing KB primitives — no new corpora, no new endpoints, no new abstractions. Same code path scrum already exercises 3+ times per review; observer joins the same compounding loop. Audit row gains kb_preamble_chars so we can later track enrichment yield per escalation. Empty preamble (both fingerprints + matrix return nothing) → empty string, prompt unchanged. Verified: qwen3.5:latest synthesis fires for every escalation with non-empty matrix hits (gateway log: 445→72 tokens, 3.1s). Matrix retrieval correctly surfaces PRD Phase 40/44 chunks for chat_completion clusters. Pathway memory stays consistent with scrum (84→87 traces); chat_completion task_class doesn't have fingerprints yet — graceful. Local-model synthesis was J's explicit ask: compress the raw bundle before the cloud call so the briefing is actionable, not a dump. Co-Authored-By: Claude Opus 4.7 (1M context) --- mcp-server/observer.ts | 103 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 102 insertions(+), 1 deletion(-) diff --git a/mcp-server/observer.ts b/mcp-server/observer.ts index 4df134f..722b20d 100644 --- a/mcp-server/observer.ts +++ b/mcp-server/observer.ts @@ -160,6 +160,105 @@ const LLM_TEAM = process.env.LH_LLM_TEAM_URL ?? "http://localhost:5000"; const LLM_TEAM_ESCALATIONS = "/home/profit/lakehouse/data/_kb/observer_escalations.jsonl"; const ESCALATION_THRESHOLD = 3; // N+ failures on same sig_hash triggers +// ─── KB enrichment helper (2026-04-26) ──────────────────────────── +// Mirrors what scrum_master_pipeline already does on every per-file +// review: queries pathway_memory bug fingerprints + the lakehouse_arch +// matrix corpus, then asks qwen3.5:latest to synthesize a tight +// briefing. We reuse the same primitives so observer escalations carry +// the same compounding context the scrum loop builds — no new index +// surfaces, no new corpora. +// +// `task_class` is derived from the cluster (most ops use the same one); +// pathway/bug_fingerprints is permissive about a null file_path, so +// non-code clusters (scenario fills, v1.chat events) just see broader +// matches via task_class alone. +// +// Returns "" when there's no useful signal — caller treats empty as +// "no preamble" and skips the prepend. +async function buildKbPreamble(sigHash: string, cluster: ObservedOp[]): Promise { + const sample = cluster[0]; + const taskClass = sample?.event_kind + ?? (sample?.source === "scenario" ? "scenario_fill" : "observer_escalation"); + + // Step 1: pathway bug fingerprints. Best-effort; null filePath just + // widens the query at the matrix-index level. + let fingerprints: { flag: { kind: string }; pattern_key: string; example: string; occurrences: number }[] = []; + try { + const r = await fetch(`${LAKEHOUSE}/vectors/pathway/bug_fingerprints`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ task_class: taskClass, file_path: null, signal_class: null, limit: 5 }), + signal: AbortSignal.timeout(5000), + }); + if (r.ok) fingerprints = (await r.json() as any).fingerprints ?? []; + } catch {} + + // Step 2: matrix retrieval against the architectural corpus we + // already maintain. Cluster summary is the search query. + const clusterSummary = cluster.slice(-5).map(o => + `${o.endpoint ?? "?"} ${o.input_summary ?? ""} ${o.error ?? ""}` + ).join(" | "); + let matrixChunks: { doc_id?: string; chunk_text?: string; score?: number }[] = []; + try { + const r = await fetch(`${LAKEHOUSE}/vectors/search`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ index_name: "lakehouse_arch_v1", query: `${taskClass} ${clusterSummary}`, top_k: 5 }), + signal: AbortSignal.timeout(5000), + }); + if (r.ok) matrixChunks = (await r.json() as any).results ?? []; + } catch {} + + if (fingerprints.length === 0 && matrixChunks.length === 0) return ""; + + // Step 3: synthesis via local model (qwen3.5:latest, provider=ollama). + // Compresses the raw bundle to a 1-2 sentence briefing the cloud + // reviewer can actually use. If local model is down/slow, fall back + // to the raw dump rather than blocking the escalation path. + const rawBundle = [ + fingerprints.length > 0 + ? "PRIOR BUG PATTERNS (pathway memory):\n" + fingerprints.map((fp, i) => + `${i + 1}. [${fp.flag.kind}] ${fp.pattern_key} (×${fp.occurrences}) e.g. ${fp.example.slice(0, 120)}` + ).join("\n") + : "", + matrixChunks.length > 0 + ? "RELATED ARCHITECTURE CONTEXT:\n" + matrixChunks.map((c, i) => + `${i + 1}. [${c.doc_id ?? "?"}] ${(c.chunk_text ?? "").slice(0, 200)}` + ).join("\n") + : "", + ].filter(Boolean).join("\n\n"); + + const synthPrompt = `A failure cluster (sig_hash=${sigHash.slice(0, 8)}, ${cluster.length} occurrences, task_class=${taskClass}) is about to be escalated for diagnosis. Here are prior signals from our knowledge base: + +${rawBundle} + +Output a single paragraph (≤300 chars) briefing the cloud reviewer on which prior signals are most likely relevant to this cluster. If nothing matches, say so plainly. No preamble, no markdown.`; + + let synthesized = ""; + try { + const r = await fetch(`${LAKEHOUSE}/v1/chat`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + provider: "ollama", + model: "qwen3.5:latest", + messages: [{ role: "user", content: synthPrompt }], + max_tokens: 200, + temperature: 0.1, + think: false, + }), + signal: AbortSignal.timeout(15000), + }); + if (r.ok) { + const j = await r.json() as any; + synthesized = (j?.choices?.[0]?.message?.content ?? "").trim(); + } + } catch {} + + const body = synthesized.length > 0 ? synthesized : rawBundle; + return `═══ KB CONTEXT — prior signals on this task class (synthesized by qwen3.5:latest) ═══\n${body}\n═══\n\n`; +} + async function escalateFailureClusterToLLMTeam(sigHash: string, cluster: ObservedOp[]) { // Package the failure cluster as a single context blob. Originally // I routed this to LLM Team's `code_review` mode at /api/run, but @@ -172,7 +271,8 @@ async function escalateFailureClusterToLLMTeam(sigHash: string, cluster: Observe const context = cluster.slice(-8).map((o, i) => `[${i + 1}] endpoint=${o.endpoint} input=${o.input_summary} error=${o.error ?? "?"}` ).join("\n"); - const prompt = `sig_hash=${sigHash} · ${cluster.length} failures on the same signature:\n\n${context}\n\nReview this failure cluster. Identify:\n1. Likely root cause (single sentence).\n2. Files most likely responsible (path hints).\n3. Concrete fix direction (under 3 sentences).\n4. Confidence: NN%\n\nBe specific, not generic.`; + const kbPreamble = await buildKbPreamble(sigHash, cluster); + const prompt = `${kbPreamble}sig_hash=${sigHash} · ${cluster.length} failures on the same signature:\n\n${context}\n\nReview this failure cluster. Identify:\n1. Likely root cause (single sentence).\n2. Files most likely responsible (path hints).\n3. Concrete fix direction (under 3 sentences).\n4. Confidence: NN%\n\nBe specific, not generic.`; try { const resp = await fetch(`${LAKEHOUSE}/v1/chat`, { @@ -209,6 +309,7 @@ async function escalateFailureClusterToLLMTeam(sigHash: string, cluster: Observe cluster_endpoint: cluster[0]?.endpoint, prompt_tokens: j?.usage?.prompt_tokens ?? 0, completion_tokens: j?.usage?.completion_tokens ?? 0, + kb_preamble_chars: kbPreamble.length, analysis: analysis.slice(0, 4000), }; const { appendFile } = await import("node:fs/promises");