package shared import ( "net/http" "os" "time" "git.agentview.dev/profit/golangLAKEHOUSE/internal/langfuse" ) // langfuseMiddleware emits one Langfuse trace per HTTP request, with // a single span carrying start/end timestamps + status code. Per // OPEN item #2 (closed by the wave that adds this file): production // traffic gets free trace visibility without per-handler wiring. // // nil client → returns a passthrough no-op middleware so callers // don't need a nil check in shared.Run. Same fail-open posture as // Langfuse's queue layer (per ADR-005 Decision 5.1: observability // is a witness, never a gate). // // /health bypasses tracing — operators don't want every LB probe // or monitor heartbeat polluting traces. Real traffic surfaces // only via the registered routes. func langfuseMiddleware(serviceName string, lf *langfuse.Client) func(http.Handler) http.Handler { if lf == nil { return func(next http.Handler) http.Handler { return next } } return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // /health bypasses tracing — same exemption logic as // the auth middleware (see RequireAuth). if r.URL.Path == "/health" { next.ServeHTTP(w, r) return } start := time.Now() sw := &statusWriter{ResponseWriter: w, status: http.StatusOK} traceID := lf.Trace(r.Context(), langfuse.TraceInput{ Name: serviceName + " " + r.Method + " " + r.URL.Path, Tags: []string{serviceName, r.Method}, Metadata: map[string]any{ "path": r.URL.Path, "method": r.Method, "remote_addr": r.RemoteAddr, }, }) next.ServeHTTP(sw, r) level := "" if sw.status >= 500 { level = "ERROR" } else if sw.status >= 400 { level = "WARNING" } lf.Span(r.Context(), langfuse.SpanInput{ TraceID: traceID, Name: "http.request", Input: map[string]any{ "method": r.Method, "path": r.URL.Path, "remote_addr": r.RemoteAddr, }, Output: map[string]any{ "status": sw.status, "duration_ms": time.Since(start).Milliseconds(), }, StartTime: start, EndTime: time.Now(), StatusCode: sw.status, Level: level, }) }) } } // statusWriter is the standard "wrap http.ResponseWriter to capture // the status code" trick. WriteHeader is the only method that // changes status; any handler that doesn't call WriteHeader gets // the implicit 200 from our struct's default. type statusWriter struct { http.ResponseWriter status int } func (sw *statusWriter) WriteHeader(code int) { sw.status = code sw.ResponseWriter.WriteHeader(code) } // loadLangfuseFromEnv builds a langfuse.Client from environment // variables. Returns nil if any of LANGFUSE_URL / LANGFUSE_PUBLIC_KEY // / LANGFUSE_SECRET_KEY is unset (best-effort: missing config means // no tracing, never a startup error). Same env names as the bare // /etc/lakehouse/langfuse.env file used by the multi_coord_stress // driver — operators ship one env file across every daemon. func loadLangfuseFromEnv() *langfuse.Client { url := os.Getenv("LANGFUSE_URL") pk := os.Getenv("LANGFUSE_PUBLIC_KEY") sk := os.Getenv("LANGFUSE_SECRET_KEY") if url == "" || pk == "" || sk == "" { return nil } return langfuse.New(url, pk, sk, nil) }