Compare commits
No commits in common. "75a0f424ef48613a30fb479df7f99b9b625d9f73" and "42a11d35cdf388ee58d4fc7c81dd628bca6c1f35" have entirely different histories.
75a0f424ef...42a11d35cd
@@ -204,18 +204,6 @@ async fn main() {
         }
         k
         },
-        // Phase 40 early deliverable — Langfuse trace emitter.
-        // Defaults match mcp-server/tracing.ts conventions so
-        // gateway traces land in the same staffing project.
-        langfuse: {
-            let c = v1::langfuse_trace::LangfuseClient::from_env_or_defaults();
-            if c.is_some() {
-                tracing::info!("v1: Langfuse tracing enabled — /v1/chat calls will appear at localhost:3001");
-            } else {
-                tracing::warn!("v1: Langfuse keys missing — /v1/chat calls will not be traced");
-            }
-            c
-        },
     }));

     // Auth middleware (if enabled)
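
The removed block above resolves the client from the environment with hardcoded fallbacks (see `from_env_or_defaults` in the deleted module below). A minimal sketch of pointing the gateway at a different Langfuse instance; the variable names are the real ones the module reads, the values are placeholders:

// Sketch only: override the staffing defaults before the client is built.
// Unset, these fall back to http://localhost:3001 with the
// pk-lf-staffing / sk-lf-staffing-secret pair.
fn main() {
    std::env::set_var("LANGFUSE_URL", "http://langfuse.internal:3001");
    std::env::set_var("LANGFUSE_PUBLIC_KEY", "pk-lf-example");
    std::env::set_var("LANGFUSE_SECRET_KEY", "sk-lf-example");
    // v1::langfuse_trace::LangfuseClient::from_env_or_defaults() now
    // returns Some(client) bound to langfuse.internal instead.
}
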
@@ -1,226 +0,0 @@
-//! Phase 40 (early deliverable) — Langfuse tracing from the Rust gateway.
-//!
-//! Langfuse is already running at `http://localhost:3001` with the
-//! staffing project — `mcp-server/tracing.ts` has been emitting traces
-//! for months. This file bridges the gap: every `/v1/chat` call now
-//! appears in the same Langfuse project, alongside traces from the
-//! Bun side.
-//!
-//! Design:
-//! - Fire-and-forget — emit spawns a tokio task. Response to client
-//!   never waits on Langfuse, never fails because of it. If Langfuse
-//!   is down or slow, we silently drop traces (and `tracing::warn!`).
-//! - Two events per call — `trace-create` + `generation-create`. The
-//!   generation nests under the trace so Langfuse UI shows the whole
-//!   thing as one clickable row.
-//! - No SDK — plain HTTP POST to `/api/public/ingestion` with Basic
-//!   auth. Keeps the Rust dependency graph small.
-
-use serde::Serialize;
-use std::sync::Arc;
-use std::time::Duration;
-
-use super::Message;
-
-const INGESTION_PATH: &str = "/api/public/ingestion";
-const EMIT_TIMEOUT_SECS: u64 = 5;
-
-#[derive(Clone)]
-pub struct LangfuseClient {
-    inner: Arc<Inner>,
-}
-
-struct Inner {
-    base_url: String,
-    public_key: String,
-    secret_key: String,
-    http: reqwest::Client,
-}
-
-impl LangfuseClient {
-    pub fn new(base_url: String, public_key: String, secret_key: String) -> Self {
-        let http = reqwest::Client::builder()
-            .timeout(Duration::from_secs(EMIT_TIMEOUT_SECS))
-            .build()
-            .expect("langfuse client init");
-        Self { inner: Arc::new(Inner { base_url, public_key, secret_key, http }) }
-    }
-
-    /// Resolve a LangfuseClient from the same conventions the TS side
-    /// already uses. Returns None if keys are missing — caller logs a
-    /// warning and omits tracing (non-fatal).
-    ///
-    /// Sources checked in order:
-    /// 1. LANGFUSE_URL + LANGFUSE_PUBLIC_KEY + LANGFUSE_SECRET_KEY env
-    /// 2. /root/llm_team_config.json (if it carries langfuse keys)
-    /// 3. Hardcoded staffing-project defaults matching mcp-server
-    ///    (pk-lf-staffing / sk-lf-staffing-secret / localhost:3001)
-    pub fn from_env_or_defaults() -> Option<Self> {
-        let base = std::env::var("LANGFUSE_URL")
-            .unwrap_or_else(|_| "http://localhost:3001".to_string());
-        let pk = std::env::var("LANGFUSE_PUBLIC_KEY")
-            .unwrap_or_else(|_| "pk-lf-staffing".to_string());
-        let sk = std::env::var("LANGFUSE_SECRET_KEY")
-            .unwrap_or_else(|_| "sk-lf-staffing-secret".to_string());
-        if pk.trim().is_empty() || sk.trim().is_empty() { return None; }
-        Some(Self::new(base, pk, sk))
-    }
-
-    /// Fire-and-forget emit. Never blocks, never errors upward.
-    pub fn emit_chat(&self, ev: ChatTrace) {
-        let this = self.clone();
-        tokio::spawn(async move {
-            if let Err(e) = this.emit_chat_inner(ev).await {
-                tracing::warn!(target: "v1.langfuse", "trace drop: {e}");
-            }
-        });
-    }
-
-    async fn emit_chat_inner(&self, ev: ChatTrace) -> Result<(), String> {
-        let trace_id = uuid_v7_like();
-        let gen_id = uuid_v7_like();
-        let trace_ts = ev.start_time.clone();
-
-        let batch = IngestionBatch {
-            batch: vec![
-                IngestionEvent {
-                    id: uuid_v7_like(),
-                    timestamp: trace_ts.clone(),
-                    kind: "trace-create",
-                    body: serde_json::json!({
-                        "id": trace_id,
-                        "name": format!("v1.chat:{}", ev.provider),
-                        "input": serde_json::json!({
-                            "model": ev.model,
-                            "messages": ev.input,
-                        }),
-                        "metadata": serde_json::json!({
-                            "provider": ev.provider,
-                            "think": ev.think,
-                        }),
-                    }),
-                },
-                IngestionEvent {
-                    id: uuid_v7_like(),
-                    timestamp: ev.end_time.clone(),
-                    kind: "generation-create",
-                    body: serde_json::json!({
-                        "id": gen_id,
-                        "traceId": trace_id,
-                        "name": "chat",
-                        "model": ev.model,
-                        "modelParameters": serde_json::json!({
-                            "temperature": ev.temperature,
-                            "max_tokens": ev.max_tokens,
-                            "think": ev.think,
-                        }),
-                        "input": ev.input,
-                        "output": ev.output,
-                        "usage": serde_json::json!({
-                            "input": ev.prompt_tokens,
-                            "output": ev.completion_tokens,
-                            "total": ev.prompt_tokens + ev.completion_tokens,
-                            "unit": "TOKENS",
-                        }),
-                        "startTime": ev.start_time,
-                        "endTime": ev.end_time,
-                        "metadata": serde_json::json!({
-                            "provider": ev.provider,
-                            "latency_ms": ev.latency_ms,
-                        }),
-                    }),
-                },
-            ],
-        };
-
-        let url = format!("{}{}", self.inner.base_url.trim_end_matches('/'), INGESTION_PATH);
-        let resp = self.inner.http
-            .post(url)
-            .basic_auth(&self.inner.public_key, Some(&self.inner.secret_key))
-            .json(&batch)
-            .send()
-            .await
-            .map_err(|e| format!("POST failed: {e}"))?;
-        if !resp.status().is_success() {
-            return Err(format!("{}: {}", resp.status(), resp.text().await.unwrap_or_default()));
-        }
-        Ok(())
-    }
-}
-
-/// Everything the v1.chat handler collects for one completed call.
-pub struct ChatTrace {
-    pub provider: String,
-    pub model: String,
-    pub input: Vec<Message>,
-    pub output: String,
-    pub prompt_tokens: u32,
-    pub completion_tokens: u32,
-    pub temperature: Option<f64>,
-    pub max_tokens: Option<u32>,
-    pub think: Option<bool>,
-    pub start_time: String,
-    pub end_time: String,
-    pub latency_ms: u64,
-}
-
-#[derive(Serialize)]
-struct IngestionBatch {
-    batch: Vec<IngestionEvent>,
-}
-
-#[derive(Serialize)]
-struct IngestionEvent {
-    id: String,
-    timestamp: String,
-    #[serde(rename = "type")]
-    kind: &'static str,
-    body: serde_json::Value,
-}
-
-/// UUIDv7-ish identifier — not strictly UUIDv7 but time-ordered and
-/// unique enough for event deduplication. Avoids a uuid crate dep.
-fn uuid_v7_like() -> String {
-    let ts = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
-    let rand = std::time::SystemTime::now()
-        .duration_since(std::time::UNIX_EPOCH)
-        .map(|d| d.subsec_nanos())
-        .unwrap_or(0);
-    format!("{:016x}-{:08x}", ts, rand)
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn from_env_or_defaults_returns_some_when_defaults_present() {
-        // Defaults are hardcoded non-empty — should always produce Some.
-        let c = LangfuseClient::from_env_or_defaults();
-        assert!(c.is_some());
-    }
-
-    #[test]
-    fn uuid_v7_like_produces_distinct_ids() {
-        let a = uuid_v7_like();
-        // Different nanosecond timestamps — but they might collide on
-        // back-to-back calls. Loop a bit to ensure uniqueness surface.
-        let b = uuid_v7_like();
-        let c = uuid_v7_like();
-        assert!(a != b || b != c, "three consecutive ids should have at least one distinct pair");
-        assert!(a.contains('-'));
-    }
-
-    #[test]
-    fn ingestion_batch_serializes_with_rename() {
-        let ev = IngestionEvent {
-            id: "e1".into(),
-            timestamp: "2026-04-22T00:00:00Z".into(),
-            kind: "trace-create",
-            body: serde_json::json!({"name": "t"}),
-        };
-        let json = serde_json::to_string(&IngestionBatch { batch: vec![ev] }).unwrap();
-        assert!(json.contains("\"type\":\"trace-create\""));
-        assert!(json.contains("\"batch\":["));
-    }
-}
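
For reference outside the diff: per the deleted module above, one `/v1/chat` call produced a two-event batch in roughly this shape. A minimal sketch with placeholder ids, timestamps, and token counts (the field names come from `emit_chat_inner`; the values here are invented):

// Sketch of the wire format POSTed to /api/public/ingestion with Basic
// auth. The generation nests under the trace via "traceId", which is why
// the Langfuse UI shows the pair as one clickable row.
fn example_batch() -> serde_json::Value {
    let trace_id = "0189f0a1-deadbeef"; // shape of uuid_v7_like() output
    serde_json::json!({
        "batch": [
            {
                "id": "0189f0a1-00000001",
                "timestamp": "2026-04-22T00:00:00Z",
                "type": "trace-create", // serialized from `kind` via #[serde(rename = "type")]
                "body": {
                    "id": trace_id,
                    "name": "v1.chat:ollama",
                    "input": { "model": "llama3", "messages": [] },
                    "metadata": { "provider": "ollama", "think": null }
                }
            },
            {
                "id": "0189f0a1-00000002",
                "timestamp": "2026-04-22T00:00:01Z",
                "type": "generation-create",
                "body": {
                    "id": "0189f0a1-00000003",
                    "traceId": trace_id,
                    "name": "chat",
                    "model": "llama3",
                    "modelParameters": { "temperature": 0.2, "max_tokens": 512, "think": null },
                    "usage": { "input": 12, "output": 34, "total": 46, "unit": "TOKENS" },
                    "startTime": "2026-04-22T00:00:00Z",
                    "endTime": "2026-04-22T00:00:01Z",
                    "metadata": { "provider": "ollama", "latency_ms": 1000 }
                }
            }
        ]
    })
}
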
@@ -13,7 +13,6 @@

 pub mod ollama;
 pub mod ollama_cloud;
-pub mod langfuse_trace;

 use axum::{
     Router,
@@ -34,10 +33,6 @@ pub struct V1State {
     /// Ollama Cloud bearer token. Loaded at startup via
     /// `ollama_cloud::resolve_cloud_key()`. None = cloud routes 503.
     pub ollama_cloud_key: Option<String>,
-    /// Phase 40 early deliverable — Langfuse client. None = tracing
-    /// disabled (keys missing or container unreachable). Traces are
-    /// fire-and-forget: never block the response path.
-    pub langfuse: Option<langfuse_trace::LangfuseClient>,
 }

 #[derive(Default, Clone, Serialize)]
@@ -132,9 +127,6 @@ async fn chat(
     }

     let provider = req.provider.as_deref().unwrap_or("ollama").to_ascii_lowercase();
-    let start_time = chrono::Utc::now();
-    let start_instant = std::time::Instant::now();
-
     let resp = match provider.as_str() {
         "ollama" | "local" | "" => ollama::chat(&state.ai_client, &req)
             .await
@@ -156,33 +148,6 @@
         }
     };

-    let end_time = chrono::Utc::now();
-    let latency_ms = start_instant.elapsed().as_millis() as u64;
-
-    // Phase 40 — emit Langfuse trace. Fire-and-forget: the clone is
-    // cheap (Arc inside), the tokio::spawn never blocks us, a dead
-    // Langfuse just logs a warn. Client-visible response latency is
-    // untouched.
-    if let Some(lf) = &state.langfuse {
-        let output = resp.choices.first()
-            .map(|c| c.message.content.clone())
-            .unwrap_or_default();
-        lf.emit_chat(langfuse_trace::ChatTrace {
-            provider: provider.clone(),
-            model: resp.model.clone(),
-            input: req.messages.clone(),
-            output,
-            prompt_tokens: resp.usage.prompt_tokens,
-            completion_tokens: resp.usage.completion_tokens,
-            temperature: req.temperature,
-            max_tokens: req.max_tokens,
-            think: req.think,
-            start_time: start_time.to_rfc3339(),
-            end_time: end_time.to_rfc3339(),
-            latency_ms,
-        });
-    }
-
     {
         let mut u = state.usage.write().await;
         u.requests += 1;
@@ -72,11 +72,11 @@ Ship each phase before starting the next. Each ends with green tests + docs updated.

 ---

-## Phase 40 — Routing & Policy Engine + Observability Recovery
+## Phase 40 — Routing & Policy Engine

-**Goal:** Replace hardcoded T1-T5 routing with a rules engine. Add Gemini + Claude adapters. Cost gating enforced at router level. **Reinstate Langfuse + Gitea MCP** — recovery of the observability + repo-ops stack J built previously (see `project_lost_stack` memory).
+**Goal:** Replace hardcoded T1-T5 routing with a rules engine. Add Gemini + Claude adapters. Cost gating enforced at router level.

-**Ships — routing:**
+**Ships:**
 - `crates/aibridge/src/routing.rs` — rules engine (match on: task type, token budget, previous attempt failures, profile ID)
 - `config/routing.toml` — rules in TOML (human-editable, hot-reloadable)
 - `crates/aibridge/src/providers/gemini.rs` — `generativelanguage.googleapis.com` adapter
@@ -84,24 +84,15 @@ Ship each phase before starting the next. Each ends with green tests + docs updated.
 - Fallback chain support: if primary returns 5xx or times out, try next in chain
 - Cost gate: per-request budget + daily budget per-provider

-**Ships — observability (was lost, now restored):**
-- **Langfuse** self-hosted via Docker Compose. Single source of truth for every LLM call trace: prompt / response / tokens / cost / latency / provider / fallback chain / profile used. UI at `localhost:3000`. Keys in `/etc/lakehouse/secrets.toml`.
-- `crates/aibridge/src/langfuse.rs` — thin fire-and-forget trace emitter. Every `/v1/chat` call spawns a background task that POSTs to `langfuse/api/public/ingestion`. Non-blocking: trace failures never affect response.
-- **Langfuse → observer pipe** — `mcp-server/langfuse_bridge.ts` or similar. Polls Langfuse's trace API at interval, forwards completed traces to observer `:3800/event` with `source: "langfuse"`. KB now sees cost/latency deltas per model, not just outcome deltas.
-- **Gitea MCP reconnect** — the MCP server binary still installed at `/home/profit/.bun/install/cache/gitea-mcp@0.0.10/` gets wired into `mcp-server/index.ts` tool registry. Agents can open PRs, comment on issues, list commits via named tools. Closes Phase 28's repo-ops gap.
-
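Neither `routing.rs` nor `routing.toml` is part of this diff, so purely as illustration of the rule shape the first routing bullet implies (every name below is hypothetical, not the real crate API):

/// Hypothetical rule shape for the routing engine: each field narrows the
/// match, None means "any". First matching rule wins, which keeps the
/// ordering of rules in routing.toml meaningful.
struct Rule {
    task_type: Option<String>,  // e.g. "json_emit", "reasoning"
    max_tokens: Option<u32>,    // fire only at or below this token budget
    min_prev_failures: u32,     // escalate after N failed attempts
    profile: Option<String>,    // restrict to one profile ID
    provider: String,           // provider to route to when the rule fires
}

/// None means no rule fired; the caller falls back to a default chain.
fn pick<'a>(
    rules: &'a [Rule],
    task: &str,
    budget: u32,
    prev_failures: u32,
    profile: &str,
) -> Option<&'a str> {
    rules
        .iter()
        .find(|r| {
            r.task_type.as_deref().map_or(true, |t| t == task)
                && r.max_tokens.map_or(true, |m| budget <= m)
                && prev_failures >= r.min_prev_failures
                && r.profile.as_deref().map_or(true, |p| p == profile)
        })
        .map(|r| r.provider.as_str())
}
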
 **Gate:**
 - Rule like "local models for simple JSON emitters, cloud for reasoning" fires correctly by task type
 - Primary fails → fallback provider hits, response still matches `/v1/chat` shape
 - Daily budget hit → subsequent requests return 429 with clear retry-at header
 - `/v1/usage` reports per-provider breakdown
-- **Every `/v1/chat` call appears in Langfuse UI** with correct prompt, response, latency, token count within 2 seconds of the request completing
-- **Langfuse → observer pipe** delivers trace deltas to KB: `GET :3800/stats?source=langfuse` shows non-zero count after a few scenarios run
-- **Gitea MCP tools callable** — `list_prs`, `open_pr`, `comment_on_issue` exposed in `mcp-server/index.ts`, verifiable via a quick agent scenario

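The `/v1/usage` breakdown from the Gate list is also not shown in this diff; a hypothetical response shape, purely illustrative:

/// Hypothetical serialization for the per-provider `/v1/usage` breakdown.
/// Field names are assumptions, not the real response schema.
#[derive(serde::Serialize)]
struct UsageReport {
    requests: u64,
    // keyed by provider name, e.g. "ollama", "gemini", "claude"
    providers: std::collections::HashMap<String, ProviderUsage>,
}

#[derive(serde::Serialize)]
struct ProviderUsage {
    requests: u64,
    prompt_tokens: u64,
    completion_tokens: u64,
    cost_cents: u64, // feeds the daily budget gate
}
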
-**Non-goals:** Retrieval Profile split (Phase 41), Truth Layer (Phase 42). Langfuse self-hosted UI customization / SSO.
+**Non-goals:** Retrieval Profile split (Phase 41), Truth Layer (Phase 42).

-**Risk:** Medium. Multi-provider auth + cost tracking is cross-cutting; Langfuse adds 4-5 Docker containers (PostgreSQL, ClickHouse, Redis, web, worker). Mitigation: every provider call wrapped in a single `dispatch()` function so observability flows through one point; Langfuse Docker Compose is their supported deployment path, well-tested.
+**Risk:** Medium. Multi-provider auth + cost tracking is cross-cutting. Mitigation: every provider call wrapped in a single `dispatch()` function, so all observability flows through one point.

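Illustrative sketch of that single `dispatch()` choke point, combining the fallback-chain and daily-budget behavior from the Gate list. All types and names here are assumptions, not the real crate API:

#[derive(Debug)]
enum DispatchError {
    BudgetExhausted, // handler maps this to 429 plus a retry-at header
    ChainExhausted,  // every provider in the chain failed
}

struct Budget {
    daily_spent_cents: u64,
    daily_limit_cents: u64,
}

/// One provider attempt: a name plus a call returning Ok(response body)
/// or Err(message). 5xx and timeouts both surface as Err here.
type Provider<'a> = (&'a str, &'a dyn Fn(&str) -> Result<String, String>);

fn dispatch(
    chain: &[Provider<'_>],
    prompt: &str,
    budget: &Budget,
) -> Result<String, DispatchError> {
    // Cost gate first: once the daily budget is spent, fail fast.
    if budget.daily_spent_cents >= budget.daily_limit_cents {
        return Err(DispatchError::BudgetExhausted);
    }
    for (name, call) in chain {
        match call(prompt) {
            // Response shape is identical no matter which provider answered.
            Ok(resp) => return Ok(resp),
            // Failure: log and fall through to the next provider in the chain.
            Err(e) => eprintln!("provider {name} failed ({e}), trying next in chain"),
        }
    }
    Err(DispatchError::ChainExhausted)
}
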
---