// langfuse_bridge — the missing piece called out in project_lost_stack.md
// and Phase 40 PRD. Polls Langfuse `/api/public/traces` at an interval and
// forwards each newly seen trace to observer `:3800/event` with
// `source: "langfuse"`. Observer's existing ring buffer + analyzer
// pick it up, so the KB learns from cost/latency/provider deltas per
// model — not just from scenario outcomes.
//
// Loopback: observer persistOp() appends to data/_observer/ops.jsonl
// and its aggregator produces pathway_recommendations.jsonl. This
// bridge closes the feedback loop between LLM call metadata and the
// playbook/KB learning surface.
//
// State persistence: the last-seen trace timestamp, plus the IDs already
// forwarded at exactly that timestamp, is written atomically to a JSON file
// so restarts don't double-emit. Bounded forward window (BATCH_LIMIT per
// tick, default 50) so first-run catch-up doesn't hammer the observer.
//
// NOTE(review): nothing here filters out in-progress traces. If Langfuse
// lists a trace before it has finished, it is forwarded with whatever
// output/latency it has at that moment and is not re-sent later.

import { rename } from "node:fs/promises";

/**
 * Read a positive integer from the environment.
 * Missing/empty values use `fallback`; non-numeric or < 1 values are rejected
 * with a warning (a NaN/0 poll interval would otherwise spin the loop).
 */
function envPositiveInt(name: string, fallback: number): number {
  const raw = process.env[name];
  if (raw === undefined || raw.trim() === "") return fallback;
  const n = Math.floor(Number(raw));
  if (!Number.isFinite(n) || n < 1) {
    console.warn(`[langfuse-bridge] ignoring invalid ${name}=${raw}; using ${fallback}`);
    return fallback;
  }
  return n;
}

const LANGFUSE_URL = process.env.LANGFUSE_URL ?? "http://localhost:3000";
const LANGFUSE_PUBLIC = process.env.LANGFUSE_PUBLIC_KEY;
const LANGFUSE_SECRET = process.env.LANGFUSE_SECRET_KEY;
const OBSERVER_URL = process.env.OBSERVER_URL ?? "http://localhost:3800";
const POLL_INTERVAL_MS = envPositiveInt("LANGFUSE_POLL_MS", 30000);
const BATCH_LIMIT = envPositiveInt("LANGFUSE_BATCH_LIMIT", 50);
const STATE_FILE =
  process.env.LANGFUSE_STATE_FILE ?? "/var/lib/lakehouse-guard/langfuse_last_seen.json";

/** Max characters kept for input/output summaries sent to the observer. */
const SUMMARY_MAX_CHARS = 200;

interface LangfuseTrace {
  id: string;
  name?: string;
  timestamp: string; // kept as the API's string; compared lexically below
  input?: unknown;
  output?: unknown;
  latency?: number; // seconds, per Langfuse API
  totalCost?: number;
  usage?: { input?: number; output?: number; total?: number };
  metadata?: Record<string, unknown>;
}

interface State {
  /** Timestamp of the newest trace forwarded so far. */
  last_seen_ts?: string;
  /**
   * IDs already forwarded whose timestamp equals `last_seen_ts`. The
   * `fromTimestamp` query is "on or after", so these come back on the next
   * poll and must be skipped. Optional so older state files still load.
   */
  last_seen_ids?: string[];
}

function basicAuth(): string {
  return "Basic " + btoa(`${LANGFUSE_PUBLIC}:${LANGFUSE_SECRET}`);
}

/** Load persisted cursor state; any missing/corrupt file yields `{}` (start over). */
async function loadState(): Promise<State> {
  try {
    const f = Bun.file(STATE_FILE);
    if (!(await f.exists())) return {};
    const raw: unknown = JSON.parse(await f.text());
    if (typeof raw !== "object" || raw === null) return {};
    const { last_seen_ts, last_seen_ids } = raw as Record<string, unknown>;
    return {
      last_seen_ts: typeof last_seen_ts === "string" ? last_seen_ts : undefined,
      last_seen_ids: Array.isArray(last_seen_ids)
        ? last_seen_ids.filter((x): x is string => typeof x === "string")
        : undefined,
    };
  } catch (e) {
    console.warn(`[langfuse-bridge] state load failed: ${e}`);
    return {};
  }
}

/**
 * Persist cursor state. Writes to a sibling temp file and renames it into
 * place so a crash mid-write can't leave a truncated file (which loadState
 * would treat as "no state" and replay all history). Failures are logged, not thrown.
 */
async function saveState(s: State): Promise<void> {
  const tmp = `${STATE_FILE}.tmp`;
  try {
    await Bun.write(tmp, JSON.stringify(s));
    await rename(tmp, STATE_FILE);
  } catch (e) {
    console.warn(`[langfuse-bridge] state save failed: ${e}`);
  }
}

/**
 * Fetch up to BATCH_LIMIT traces in ascending timestamp order, starting at
 * `cursor` (inclusive) when given.
 * @throws Error on non-2xx responses or the 10s timeout.
 */
async function fetchTracesSince(cursor?: string): Promise<LangfuseTrace[]> {
  const url = new URL("/api/public/traces", LANGFUSE_URL);
  url.searchParams.set("limit", String(BATCH_LIMIT));
  url.searchParams.set("orderBy", "timestamp.asc");
  if (cursor) url.searchParams.set("fromTimestamp", cursor);

  const resp = await fetch(url, {
    headers: { authorization: basicAuth() },
    signal: AbortSignal.timeout(10_000),
  });
  if (!resp.ok) {
    throw new Error(`langfuse ${resp.status}: ${(await resp.text()).slice(0, 200)}`);
  }
  const body: unknown = await resp.json();
  const data = (body as { data?: unknown } | null)?.data;
  return Array.isArray(data) ? (data as LangfuseTrace[]) : [];
}

/**
 * Short text summary of a trace input/output: strings as-is, anything else
 * JSON-encoded (missing → "{}"), truncated to `max` UTF-16 units without
 * splitting a surrogate pair.
 */
function summarize(v: unknown, max: number = SUMMARY_MAX_CHARS): string {
  const s = typeof v === "string" ? v : JSON.stringify(v ?? {});
  if (s.length <= max) return s;
  const code = s.charCodeAt(max - 1);
  const end = code >= 0xd800 && code <= 0xdbff ? max - 1 : max;
  return s.slice(0, end);
}

// Shape one Langfuse trace into the ObservedOp the observer expects
// (see mcp-server/observer.ts:29). `source: "langfuse"` is the
// provenance flag so the analyzer can weight traces differently from
// scenario-sourced events.
function toObservedOp(t: LangfuseTrace): Record<string, unknown> {
  const endpoint = t.metadata?.provider ?? t.metadata?.model ?? t.name ?? "langfuse.trace";
  return {
    timestamp: t.timestamp,
    endpoint: `langfuse:${endpoint}`,
    input_summary: summarize(t.input),
    success: !t.metadata?.error,
    duration_ms: Math.round((t.latency ?? 0) * 1000),
    output_summary: summarize(t.output),
    source: "langfuse",
    sig_hash: t.metadata?.sig_hash,
    event_kind: t.metadata?.task_class,
    // Extra fields the observer doesn't schema but the KB aggregator
    // can still pick up via JSON passthrough.
    model: t.metadata?.model,
    provider: t.metadata?.provider,
    prompt_tokens: t.usage?.input,
    completion_tokens: t.usage?.output,
    total_tokens: t.usage?.total,
    total_cost: t.totalCost,
  };
}

/** POST one op to the observer. @throws Error on non-2xx or the 5s timeout. */
async function forwardToObserver(op: Record<string, unknown>): Promise<void> {
  const resp = await fetch(`${OBSERVER_URL}/event`, {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify(op),
    signal: AbortSignal.timeout(5_000),
  });
  if (!resp.ok) {
    throw new Error(`observer ${resp.status}: ${(await resp.text()).slice(0, 200)}`);
  }
}

/**
 * One poll cycle: load the cursor, fetch the next page, forward traces not
 * already sent (in order), and persist the advanced cursor. Stops at the
 * first forward failure so that trace is retried next tick.
 */
async function tick(): Promise<void> {
  const state = await loadState();
  let traces: LangfuseTrace[];
  try {
    traces = await fetchTracesSince(state.last_seen_ts);
  } catch (e) {
    console.warn(`[langfuse-bridge] fetch failed: ${e}`);
    return;
  }

  // The page starts *at* the cursor, so traces forwarded last time at that
  // exact timestamp come back; drop them.
  const alreadySent = new Set(state.last_seen_ids ?? []);
  const fresh = traces.filter((t) => !alreadySent.has(t.id));
  if (fresh.length === 0) {
    if (traces.length >= BATCH_LIMIT) {
      // A full page of already-sent traces sharing one timestamp means the
      // cursor can never move past it with the current page size.
      console.warn(
        `[langfuse-bridge] cursor stuck: ${traces.length} already-forwarded traces at ${state.last_seen_ts}; raise LANGFUSE_BATCH_LIMIT`,
      );
    }
    console.log(`[langfuse-bridge] no new traces since ${state.last_seen_ts ?? "start"}`);
    return;
  }

  let lastTs = state.last_seen_ts ?? "";
  let idsAtLastTs = [...alreadySent];
  let forwarded = 0;
  for (const t of fresh) {
    try {
      await forwardToObserver(toObservedOp(t));
    } catch (e) {
      console.warn(`[langfuse-bridge] forward ${t.id} failed: ${e}`);
      // Don't advance cursor on forward failure — retry next tick.
      break;
    }
    forwarded++;
    // Lexical compare assumes Langfuse returns uniformly formatted ISO strings.
    if (t.timestamp > lastTs) {
      lastTs = t.timestamp;
      idsAtLastTs = [t.id];
    } else if (t.timestamp === lastTs) {
      idsAtLastTs.push(t.id);
    }
  }

  if (forwarded > 0) {
    await saveState({ last_seen_ts: lastTs, last_seen_ids: idsAtLastTs });
  }
  console.log(
    `[langfuse-bridge] forwarded ${forwarded}/${fresh.length}, last_seen=${lastTs}`,
  );
}

function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

/** Entry point: validate credentials, then poll forever. */
async function main(): Promise<void> {
  if (!LANGFUSE_PUBLIC || !LANGFUSE_SECRET) {
    console.error("LANGFUSE_PUBLIC_KEY + LANGFUSE_SECRET_KEY required");
    process.exit(1);
  }
  console.log(
    `[langfuse-bridge] polling ${LANGFUSE_URL} every ${POLL_INTERVAL_MS}ms → ${OBSERVER_URL}/event`,
  );
  // Sequential loop rather than setInterval: a slow tick (10s fetch + up to
  // 5s per forward) can outlast the interval, and overlapping ticks would
  // read the same cursor and forward the same traces twice.
  for (;;) {
    try {
      await tick();
    } catch (e) {
      console.warn(`[langfuse-bridge] tick failed: ${e}`);
    }
    await sleep(POLL_INTERVAL_MS);
  }
}

main().catch((e) => {
  console.error(`[langfuse-bridge] fatal: ${e}`);
  process.exit(1);
});