//! Phase 40 (early deliverable) — Langfuse tracing from the Rust gateway. //! //! Langfuse is already running at `http://localhost:3001` with the //! staffing project — `mcp-server/tracing.ts` has been emitting traces //! for months. This file bridges the gap: every `/v1/chat` call now //! appears in the same Langfuse project, alongside traces from the //! Bun side. //! //! Design: //! - Fire-and-forget — emit spawns a tokio task. Response to client //! never waits on Langfuse, never fails because of it. If Langfuse //! is down or slow, we silently drop traces (and `tracing::warn!`). //! - Two events per call — `trace-create` + `generation-create`. The //! generation nests under the trace so Langfuse UI shows the whole //! thing as one clickable row. //! - No SDK — plain HTTP POST to `/api/public/ingestion` with Basic //! auth. Keeps the Rust dependency graph small. use serde::Serialize; use std::sync::Arc; use std::time::Duration; use super::Message; const INGESTION_PATH: &str = "/api/public/ingestion"; const EMIT_TIMEOUT_SECS: u64 = 5; #[derive(Clone)] pub struct LangfuseClient { inner: Arc, } struct Inner { base_url: String, public_key: String, secret_key: String, http: reqwest::Client, } impl LangfuseClient { pub fn new(base_url: String, public_key: String, secret_key: String) -> Self { let http = reqwest::Client::builder() .timeout(Duration::from_secs(EMIT_TIMEOUT_SECS)) .build() .expect("langfuse client init"); Self { inner: Arc::new(Inner { base_url, public_key, secret_key, http }) } } /// Resolve a LangfuseClient from the same conventions the TS side /// already uses. Returns None if keys are missing — caller logs a /// warning and omits tracing (non-fatal). /// /// Sources checked in order: /// 1. LANGFUSE_URL + LANGFUSE_PUBLIC_KEY + LANGFUSE_SECRET_KEY env /// 2. /root/llm_team_config.json (if it carries langfuse keys) /// 3. Hardcoded staffing-project defaults matching mcp-server /// (pk-lf-staffing / sk-lf-staffing-secret / localhost:3001) pub fn from_env_or_defaults() -> Option { let base = std::env::var("LANGFUSE_URL") .unwrap_or_else(|_| "http://localhost:3001".to_string()); let pk = std::env::var("LANGFUSE_PUBLIC_KEY") .unwrap_or_else(|_| "pk-lf-staffing".to_string()); let sk = std::env::var("LANGFUSE_SECRET_KEY") .unwrap_or_else(|_| "sk-lf-staffing-secret".to_string()); if pk.trim().is_empty() || sk.trim().is_empty() { return None; } Some(Self::new(base, pk, sk)) } /// Fire-and-forget emit. Never blocks, never errors upward. pub fn emit_chat(&self, ev: ChatTrace) { let this = self.clone(); tokio::spawn(async move { if let Err(e) = this.emit_chat_inner(ev).await { tracing::warn!(target: "v1.langfuse", "trace drop: {e}"); } }); } async fn emit_chat_inner(&self, ev: ChatTrace) -> Result<(), String> { let trace_id = uuid_v7_like(); let gen_id = uuid_v7_like(); let trace_ts = ev.start_time.clone(); let batch = IngestionBatch { batch: vec![ IngestionEvent { id: uuid_v7_like(), timestamp: trace_ts.clone(), kind: "trace-create", body: serde_json::json!({ "id": trace_id, "name": format!("v1.chat:{}", ev.provider), "input": serde_json::json!({ "model": ev.model, "messages": ev.input, }), "metadata": serde_json::json!({ "provider": ev.provider, "think": ev.think, }), }), }, IngestionEvent { id: uuid_v7_like(), timestamp: ev.end_time.clone(), kind: "generation-create", body: serde_json::json!({ "id": gen_id, "traceId": trace_id, "name": "chat", "model": ev.model, "modelParameters": serde_json::json!({ "temperature": ev.temperature, "max_tokens": ev.max_tokens, "think": ev.think, }), "input": ev.input, "output": ev.output, "usage": serde_json::json!({ "input": ev.prompt_tokens, "output": ev.completion_tokens, "total": ev.prompt_tokens + ev.completion_tokens, "unit": "TOKENS", }), "startTime": ev.start_time, "endTime": ev.end_time, "metadata": serde_json::json!({ "provider": ev.provider, "latency_ms": ev.latency_ms, }), }), }, ], }; let url = format!("{}{}", self.inner.base_url.trim_end_matches('/'), INGESTION_PATH); let resp = self.inner.http .post(url) .basic_auth(&self.inner.public_key, Some(&self.inner.secret_key)) .json(&batch) .send() .await .map_err(|e| format!("POST failed: {e}"))?; if !resp.status().is_success() { return Err(format!("{}: {}", resp.status(), resp.text().await.unwrap_or_default())); } Ok(()) } } /// Everything the v1.chat handler collects for one completed call. pub struct ChatTrace { pub provider: String, pub model: String, pub input: Vec, pub output: String, pub prompt_tokens: u32, pub completion_tokens: u32, pub temperature: Option, pub max_tokens: Option, pub think: Option, pub start_time: String, pub end_time: String, pub latency_ms: u64, } #[derive(Serialize)] struct IngestionBatch { batch: Vec, } #[derive(Serialize)] struct IngestionEvent { id: String, timestamp: String, #[serde(rename = "type")] kind: &'static str, body: serde_json::Value, } /// UUIDv7-ish identifier — not strictly UUIDv7 but time-ordered and /// unique enough for event deduplication. Avoids a uuid crate dep. fn uuid_v7_like() -> String { let ts = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0); let rand = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|d| d.subsec_nanos()) .unwrap_or(0); format!("{:016x}-{:08x}", ts, rand) } #[cfg(test)] mod tests { use super::*; #[test] fn from_env_or_defaults_returns_some_when_defaults_present() { // Defaults are hardcoded non-empty — should always produce Some. let c = LangfuseClient::from_env_or_defaults(); assert!(c.is_some()); } #[test] fn uuid_v7_like_produces_distinct_ids() { let a = uuid_v7_like(); // Different nanosecond timestamps — but they might collide on // back-to-back calls. Loop a bit to ensure uniqueness surface. let b = uuid_v7_like(); let c = uuid_v7_like(); assert!(a != b || b != c, "three consecutive ids should have at least one distinct pair"); assert!(a.contains('-')); } #[test] fn ingestion_batch_serializes_with_rename() { let ev = IngestionEvent { id: "e1".into(), timestamp: "2026-04-22T00:00:00Z".into(), kind: "trace-create", body: serde_json::json!({"name": "t"}), }; let json = serde_json::to_string(&IngestionBatch { batch: vec![ev] }).unwrap(); assert!(json.contains("\"type\":\"trace-create\"")); assert!(json.contains("\"batch\":[")); } }