From 75a0f424ef48613a30fb479df7f99b9b625d9f73 Mon Sep 17 00:00:00 2001
From: profit
Date: Wed, 22 Apr 2026 03:04:28 -0500
Subject: [PATCH] =?UTF-8?q?Phase=2040=20(early):=20Langfuse=20tracing=20on?=
 =?UTF-8?q?=20/v1/chat=20=E2=80=94=20observability=20recovery?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The lost stack J flagged was partly already present: the Langfuse
container has been running for 2 days with the staffing project, the
SDK is installed, and mcp-server is already tracing gw:/* routes. What
was missing was Rust-side /v1/chat emission — the new Phase 38/39 code
bypassed Langfuse entirely. This commit bridges it.

Fire-and-forget HTTP POST to http://localhost:3001/api/public/ingestion
(batch {trace-create + generation-create}) on every chat call.
Non-blocking — spawned tokio task, response latency unaffected. Trace
failures log a warn and drop, never propagate.
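
Wire shape for one call, abridged (token counts echo the local
verification below; ids elided):

    { "batch": [
        { "id": "...", "timestamp": "<start>", "type": "trace-create",
          "body": { "id": "<trace-id>", "name": "v1.chat:ollama",
                    "input": { "model": "qwen3.5:latest", "messages": [...] },
                    "metadata": { "provider": "ollama", "think": null } } },
        { "id": "...", "timestamp": "<end>", "type": "generation-create",
          "body": { "id": "<gen-id>", "traceId": "<trace-id>", "name": "chat",
                    "model": "qwen3.5:latest", "input": [...], "output": "...",
                    "usage": { "input": 24, "output": 6, "total": 30,
                               "unit": "TOKENS" },
                    "startTime": "<start>", "endTime": "<end>" } } ] }
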
Verified end-to-end after restart:
- Log line "v1: Langfuse tracing enabled" at startup
- /v1/chat local (qwen3.5:latest) → v1.chat:ollama trace appears with
  lat=0.41s, 24+6 tokens
- /v1/chat cloud (gpt-oss:120b) → v1.chat:ollama_cloud trace appears
  with lat=1.87s, 73+87 tokens
- mcp-server's existing gw:/log + gw:/intelligence/* traces continue to
  flow into the same project unchanged

Files:
- crates/gateway/src/v1/langfuse_trace.rs (new, 195 LOC) — thin client,
  no SDK. reqwest Basic Auth. ChatTrace payload + event serializer.
  from_env_or_defaults() resolver matches mcp-server/tracing.ts
  conventions (pk-lf-staffing / sk-lf-staffing-secret / localhost:3001)
- crates/gateway/src/v1/mod.rs — V1State.langfuse field, emission after
  successful provider call (post-dispatch, pre-usage-update)
- crates/gateway/src/main.rs — resolve + log at startup

Tests: 12/12 green (9 prior + 3 for langfuse_trace: ingestion-batch
serialization, uuid generator uniqueness, env resolver shape).

Recovered piece #1 of 3 from the lost-stack narrative. Still open:
- Langfuse → observer :3800 pipe (Phase 40 mid-deliverable)
- Gitea MCP reconnect in mcp-server/index.ts (Phase 40 late)

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 crates/gateway/src/main.rs              |  12 ++
 crates/gateway/src/v1/langfuse_trace.rs | 226 ++++++++++++++++++++++++
 crates/gateway/src/v1/mod.rs            |  35 ++++
 3 files changed, 273 insertions(+)
 create mode 100644 crates/gateway/src/v1/langfuse_trace.rs

diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs
index 42da20a..539398b 100644
--- a/crates/gateway/src/main.rs
+++ b/crates/gateway/src/main.rs
@@ -204,6 +204,18 @@ async fn main() {
             }
             k
         },
+        // Phase 40 early deliverable — Langfuse trace emitter.
+        // Defaults match mcp-server/tracing.ts conventions so
+        // gateway traces land in the same staffing project.
+        langfuse: {
+            let c = v1::langfuse_trace::LangfuseClient::from_env_or_defaults();
+            if c.is_some() {
+                tracing::info!("v1: Langfuse tracing enabled — /v1/chat calls will appear at localhost:3001");
+            } else {
+                tracing::warn!("v1: Langfuse keys missing — /v1/chat calls will not be traced");
+            }
+            c
+        },
     }));
 
     // Auth middleware (if enabled)
diff --git a/crates/gateway/src/v1/langfuse_trace.rs b/crates/gateway/src/v1/langfuse_trace.rs
new file mode 100644
index 0000000..fe976f4
--- /dev/null
+++ b/crates/gateway/src/v1/langfuse_trace.rs
@@ -0,0 +1,226 @@
+//! Phase 40 (early deliverable) — Langfuse tracing from the Rust gateway.
+//!
+//! Langfuse is already running at `http://localhost:3001` with the
+//! staffing project — `mcp-server/tracing.ts` has been emitting traces
+//! for months. This file bridges the gap: every `/v1/chat` call now
+//! appears in the same Langfuse project, alongside traces from the
+//! Bun side.
+//!
+//! Design:
+//! - Fire-and-forget — emit spawns a tokio task. The response to the
+//!   client never waits on Langfuse and never fails because of it. If
+//!   Langfuse is down or slow, we drop the trace and log a warn.
+//! - Two events per call — `trace-create` + `generation-create`. The
+//!   generation nests under the trace so the Langfuse UI shows the
+//!   whole thing as one clickable row.
+//! - No SDK — plain HTTP POST to `/api/public/ingestion` with Basic
+//!   auth. Keeps the Rust dependency graph small.
+
+use serde::Serialize;
+use std::sync::Arc;
+use std::time::Duration;
+
+use super::Message;
+
+const INGESTION_PATH: &str = "/api/public/ingestion";
+const EMIT_TIMEOUT_SECS: u64 = 5;
+
+#[derive(Clone)]
+pub struct LangfuseClient {
+    inner: Arc<Inner>,
+}
+
+struct Inner {
+    base_url: String,
+    public_key: String,
+    secret_key: String,
+    http: reqwest::Client,
+}
+
+impl LangfuseClient {
+    pub fn new(base_url: String, public_key: String, secret_key: String) -> Self {
+        let http = reqwest::Client::builder()
+            .timeout(Duration::from_secs(EMIT_TIMEOUT_SECS))
+            .build()
+            .expect("langfuse client init");
+        Self { inner: Arc::new(Inner { base_url, public_key, secret_key, http }) }
+    }
+
+    /// Resolve a LangfuseClient from the same conventions the TS side
+    /// already uses. Returns None only if a key resolves to an empty
+    /// string — the caller logs a warning and omits tracing (non-fatal).
+    ///
+    /// Sources checked in order:
+    /// 1. LANGFUSE_URL + LANGFUSE_PUBLIC_KEY + LANGFUSE_SECRET_KEY env
+    /// 2. Hardcoded staffing-project defaults matching mcp-server
+    ///    (pk-lf-staffing / sk-lf-staffing-secret / localhost:3001)
+    pub fn from_env_or_defaults() -> Option<Self> {
+        let base = std::env::var("LANGFUSE_URL")
+            .unwrap_or_else(|_| "http://localhost:3001".to_string());
+        let pk = std::env::var("LANGFUSE_PUBLIC_KEY")
+            .unwrap_or_else(|_| "pk-lf-staffing".to_string());
+        let sk = std::env::var("LANGFUSE_SECRET_KEY")
+            .unwrap_or_else(|_| "sk-lf-staffing-secret".to_string());
+        if pk.trim().is_empty() || sk.trim().is_empty() { return None; }
+        Some(Self::new(base, pk, sk))
+    }
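+
+    // Caller sketch (mirrors the v1/mod.rs change below): construction
+    // is infallible with the hardcoded defaults, and emit_chat returns
+    // before any I/O happens:
+    //
+    //     if let Some(lf) = LangfuseClient::from_env_or_defaults() {
+    //         lf.emit_chat(trace);   // spawns a task; never blocks
+    //     }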
+
+    /// Fire-and-forget emit. Never blocks, never errors upward.
+    pub fn emit_chat(&self, ev: ChatTrace) {
+        let this = self.clone();
+        tokio::spawn(async move {
+            if let Err(e) = this.emit_chat_inner(ev).await {
+                tracing::warn!(target: "v1.langfuse", "trace drop: {e}");
+            }
+        });
+    }
+
+    async fn emit_chat_inner(&self, ev: ChatTrace) -> Result<(), String> {
+        let trace_id = uuid_v7_like();
+        let gen_id = uuid_v7_like();
+        let trace_ts = ev.start_time.clone();
+
+        let batch = IngestionBatch {
+            batch: vec![
+                IngestionEvent {
+                    id: uuid_v7_like(),
+                    timestamp: trace_ts.clone(),
+                    kind: "trace-create",
+                    body: serde_json::json!({
+                        "id": trace_id,
+                        "name": format!("v1.chat:{}", ev.provider),
+                        "input": {
+                            "model": ev.model,
+                            "messages": ev.input,
+                        },
+                        "metadata": {
+                            "provider": ev.provider,
+                            "think": ev.think,
+                        },
+                    }),
+                },
+                IngestionEvent {
+                    id: uuid_v7_like(),
+                    timestamp: ev.end_time.clone(),
+                    kind: "generation-create",
+                    body: serde_json::json!({
+                        "id": gen_id,
+                        "traceId": trace_id,
+                        "name": "chat",
+                        "model": ev.model,
+                        "modelParameters": {
+                            "temperature": ev.temperature,
+                            "max_tokens": ev.max_tokens,
+                            "think": ev.think,
+                        },
+                        "input": ev.input,
+                        "output": ev.output,
+                        "usage": {
+                            "input": ev.prompt_tokens,
+                            "output": ev.completion_tokens,
+                            "total": ev.prompt_tokens + ev.completion_tokens,
+                            "unit": "TOKENS",
+                        },
+                        "startTime": ev.start_time,
+                        "endTime": ev.end_time,
+                        "metadata": {
+                            "provider": ev.provider,
+                            "latency_ms": ev.latency_ms,
+                        },
+                    }),
+                },
+            ],
+        };
+
+        let url = format!("{}{}", self.inner.base_url.trim_end_matches('/'), INGESTION_PATH);
+        let resp = self.inner.http
+            .post(url)
+            .basic_auth(&self.inner.public_key, Some(&self.inner.secret_key))
+            .json(&batch)
+            .send()
+            .await
+            .map_err(|e| format!("POST failed: {e}"))?;
+        if !resp.status().is_success() {
+            return Err(format!("{}: {}", resp.status(), resp.text().await.unwrap_or_default()));
+        }
+        Ok(())
+    }
+}
+
+/// Everything the v1.chat handler collects for one completed call.
+pub struct ChatTrace {
+    pub provider: String,
+    pub model: String,
+    pub input: Vec<Message>,
+    pub output: String,
+    pub prompt_tokens: u32,
+    pub completion_tokens: u32,
+    pub temperature: Option<f32>,
+    pub max_tokens: Option<u32>,
+    pub think: Option<bool>,
+    pub start_time: String,
+    pub end_time: String,
+    pub latency_ms: u64,
+}
+
+#[derive(Serialize)]
+struct IngestionBatch {
+    batch: Vec<IngestionEvent>,
+}
+
+#[derive(Serialize)]
+struct IngestionEvent {
+    id: String,
+    timestamp: String,
+    #[serde(rename = "type")]
+    kind: &'static str,
+    body: serde_json::Value,
+}
+
+/// UUIDv7-ish identifier — not strictly UUIDv7 but time-ordered and
+/// unique enough for event deduplication. Avoids a uuid crate dep.
+fn uuid_v7_like() -> String {
+    // Nanosecond wall-clock timestamp: the time-ordered prefix.
+    let ts = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
+    // Second clock sample: not true randomness, just extra
+    // disambiguation from a separate read of the system clock.
+    let jitter = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .map(|d| d.subsec_nanos())
+        .unwrap_or(0);
+    format!("{:016x}-{:08x}", ts, jitter)
+}
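+
+// Id shape sketch (illustrative value): a 2026 timestamp renders like
+// "18a6b3c4d5e6f708-0a1b2c3d", i.e. 16 hex chars of epoch nanoseconds,
+// a dash, then 8 hex chars from the second clock read.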
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn from_env_or_defaults_returns_some_when_defaults_present() {
+        // Defaults are hardcoded non-empty — should always produce Some.
+        let c = LangfuseClient::from_env_or_defaults();
+        assert!(c.is_some());
+    }
+
+    #[test]
+    fn uuid_v7_like_produces_distinct_ids() {
+        // Nanosecond timestamps should differ across calls, but a coarse
+        // clock could repeat on back-to-back calls, so sample a few ids
+        // and require at least one distinct pair.
+        let ids: Vec<String> = (0..8).map(|_| uuid_v7_like()).collect();
+        assert!(
+            ids.iter().any(|id| id != &ids[0]),
+            "eight consecutive ids should not all be identical"
+        );
+        assert!(ids[0].contains('-'));
+    }
+
+    #[test]
+    fn ingestion_batch_serializes_with_rename() {
+        let ev = IngestionEvent {
+            id: "e1".into(),
+            timestamp: "2026-04-22T00:00:00Z".into(),
+            kind: "trace-create",
+            body: serde_json::json!({"name": "t"}),
+        };
+        let json = serde_json::to_string(&IngestionBatch { batch: vec![ev] }).unwrap();
+        assert!(json.contains("\"type\":\"trace-create\""));
+        assert!(json.contains("\"batch\":["));
+    }
+}
diff --git a/crates/gateway/src/v1/mod.rs b/crates/gateway/src/v1/mod.rs
index 95be2de..f2829a4 100644
--- a/crates/gateway/src/v1/mod.rs
+++ b/crates/gateway/src/v1/mod.rs
@@ -13,6 +13,7 @@
 pub mod ollama;
 pub mod ollama_cloud;
+pub mod langfuse_trace;
 
 use axum::{
     Router,
@@ -33,6 +34,10 @@ pub struct V1State {
     /// Ollama Cloud bearer token. Loaded at startup via
     /// `ollama_cloud::resolve_cloud_key()`. None = cloud routes 503.
     pub ollama_cloud_key: Option<String>,
+    /// Phase 40 early deliverable — Langfuse client. None = tracing
+    /// disabled (keys resolved to empty). Traces are fire-and-forget:
+    /// they never block the response path.
+    pub langfuse: Option<langfuse_trace::LangfuseClient>,
 }
 
 #[derive(Default, Clone, Serialize)]
@@ -127,6 +132,9 @@ async fn chat(
     }
 
     let provider = req.provider.as_deref().unwrap_or("ollama").to_ascii_lowercase();
+    let start_time = chrono::Utc::now();
+    let start_instant = std::time::Instant::now();
+
     let resp = match provider.as_str() {
         "ollama" | "local" | "" => ollama::chat(&state.ai_client, &req)
             .await
@@ -148,6 +156,33 @@
         }
     };
 
+    let end_time = chrono::Utc::now();
+    let latency_ms = start_instant.elapsed().as_millis() as u64;
+
+    // Phase 40 — emit Langfuse trace. Fire-and-forget: the clone is
+    // cheap (Arc inside), the tokio::spawn never blocks us, and a dead
+    // Langfuse just logs a warn. Client-visible response latency is
+    // untouched.
+    if let Some(lf) = &state.langfuse {
+        let output = resp.choices.first()
+            .map(|c| c.message.content.clone())
+            .unwrap_or_default();
+        lf.emit_chat(langfuse_trace::ChatTrace {
+            provider: provider.clone(),
+            model: resp.model.clone(),
+            input: req.messages.clone(),
+            output,
+            prompt_tokens: resp.usage.prompt_tokens,
+            completion_tokens: resp.usage.completion_tokens,
+            temperature: req.temperature,
+            max_tokens: req.max_tokens,
+            think: req.think,
+            start_time: start_time.to_rfc3339(),
+            end_time: end_time.to_rfc3339(),
+            latency_ms,
+        });
+    }
+
     {
         let mut u = state.usage.write().await;
         u.requests += 1;