Adds langfuseMiddleware in internal/shared so every daemon's shared.Run gets free production-traffic trace visibility when LANGFUSE_URL, LANGFUSE_PUBLIC_KEY, and LANGFUSE_SECRET_KEY are all set. It uses the same env names and file shape as the multi_coord_stress driver, so operators ship one /etc/lakehouse/langfuse.env across the deploy.

Wiring is auth-gated: the middleware runs INSIDE the RequireAuth group, so 401s from credential stuffing don't pollute traces. /health is exempt so LB probes don't either. Missing env vars → nil client → the middleware is a passthrough no-op (fail-open per ADR-005 5.1).

Bundled deploy:
- langfuse.env.example template (mode 0640, root:lakehouse)
- 11 systemd units gain `EnvironmentFile=-/etc/lakehouse/langfuse.env` (the leading `-` means a missing file is OK)
- REPLICATION.md bootstrap section documents setup

Tests (4): nil passthrough, /health bypass, real-request emission, status-writer wrapping. All green.

STATE_OF_PLAY OPEN list: 5 rows → 4 rows.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
107 lines
3.3 KiB
Go
107 lines
3.3 KiB
Go
package shared
|
|
|
|
import (
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
|
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/langfuse"
|
|
)
|
|
|
|
// langfuseMiddleware emits one Langfuse trace per HTTP request, with
|
|
// a single span carrying start/end timestamps + status code. Per
|
|
// OPEN item #2 (closed by the wave that adds this file): production
|
|
// traffic gets free trace visibility without per-handler wiring.
|
|
//
|
|
// nil client → returns a passthrough no-op middleware so callers
|
|
// don't need a nil check in shared.Run. Same fail-open posture as
|
|
// Langfuse's queue layer (per ADR-005 Decision 5.1: observability
|
|
// is a witness, never a gate).
|
|
//
|
|
// /health bypasses tracing — operators don't want every LB probe
|
|
// or monitor heartbeat polluting traces. Real traffic surfaces
|
|
// only via the registered routes.
|
|
func langfuseMiddleware(serviceName string, lf *langfuse.Client) func(http.Handler) http.Handler {
|
|
if lf == nil {
|
|
return func(next http.Handler) http.Handler { return next }
|
|
}
|
|
return func(next http.Handler) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
// /health bypasses tracing — same exemption logic as
|
|
// the auth middleware (see RequireAuth).
|
|
if r.URL.Path == "/health" {
|
|
next.ServeHTTP(w, r)
|
|
return
|
|
}
|
|
start := time.Now()
|
|
sw := &statusWriter{ResponseWriter: w, status: http.StatusOK}
|
|
|
|
traceID := lf.Trace(r.Context(), langfuse.TraceInput{
|
|
Name: serviceName + " " + r.Method + " " + r.URL.Path,
|
|
Tags: []string{serviceName, r.Method},
|
|
Metadata: map[string]any{
|
|
"path": r.URL.Path,
|
|
"method": r.Method,
|
|
"remote_addr": r.RemoteAddr,
|
|
},
|
|
})
|
|
|
|
next.ServeHTTP(sw, r)
|
|
|
|
level := ""
|
|
if sw.status >= 500 {
|
|
level = "ERROR"
|
|
} else if sw.status >= 400 {
|
|
level = "WARNING"
|
|
}
|
|
lf.Span(r.Context(), langfuse.SpanInput{
|
|
TraceID: traceID,
|
|
Name: "http.request",
|
|
Input: map[string]any{
|
|
"method": r.Method,
|
|
"path": r.URL.Path,
|
|
"remote_addr": r.RemoteAddr,
|
|
},
|
|
Output: map[string]any{
|
|
"status": sw.status,
|
|
"duration_ms": time.Since(start).Milliseconds(),
|
|
},
|
|
StartTime: start,
|
|
EndTime: time.Now(),
|
|
StatusCode: sw.status,
|
|
Level: level,
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
// statusWriter wraps an http.ResponseWriter to capture the status code a
// handler sends, so tracing middleware can report it after the handler
// returns.
//
// status holds the first *final* status committed to the client. That is
// either an explicit WriteHeader call, or the implicit 200 that net/http
// sends on the first Write/Flush when WriteHeader was never called.
// Callers seed status with http.StatusOK for that implicit case.
//
// Later WriteHeader calls are still forwarded (net/http ignores them as
// superfluous and logs a warning), but they do not overwrite status, so
// traces report what the client actually received. Informational 1xx
// codes other than 101 Switching Protocols are not final in net/http.
// They are forwarded without being recorded.
type statusWriter struct {
	http.ResponseWriter
	status      int
	wroteHeader bool // true once a final status has been committed
}

// WriteHeader records code if it is the first final status, then forwards
// it to the wrapped writer unchanged.
func (sw *statusWriter) WriteHeader(code int) {
	informational := code >= 100 && code <= 199 && code != http.StatusSwitchingProtocols
	if !sw.wroteHeader && !informational {
		sw.status = code
		sw.wroteHeader = true
	}
	sw.ResponseWriter.WriteHeader(code)
}

// Write marks the header as committed, because net/http sends an implicit
// 200 on the first Write, then delegates to the wrapped writer.
func (sw *statusWriter) Write(b []byte) (int, error) {
	sw.wroteHeader = true
	return sw.ResponseWriter.Write(b)
}

// Flush forwards to the wrapped writer when it supports http.Flusher.
// Without this method the wrapper would hide Flusher from streaming/SSE
// handlers that type-assert w.(http.Flusher). When the wrapped writer
// cannot flush, this is a no-op.
func (sw *statusWriter) Flush() {
	if f, ok := sw.ResponseWriter.(http.Flusher); ok {
		// Flushing commits headers, implicitly with the current status.
		sw.wroteHeader = true
		f.Flush()
	}
}

// Unwrap returns the wrapped writer so http.ResponseController can reach
// optional interfaces (Hijacker, deadlines, …) through this wrapper.
func (sw *statusWriter) Unwrap() http.ResponseWriter {
	return sw.ResponseWriter
}
|
|
|
|
// loadLangfuseFromEnv builds a langfuse.Client from environment
|
|
// variables. Returns nil if any of LANGFUSE_URL / LANGFUSE_PUBLIC_KEY
|
|
// / LANGFUSE_SECRET_KEY is unset (best-effort: missing config means
|
|
// no tracing, never a startup error). Same env names as the bare
|
|
// /etc/lakehouse/langfuse.env file used by the multi_coord_stress
|
|
// driver — operators ship one env file across every daemon.
|
|
func loadLangfuseFromEnv() *langfuse.Client {
|
|
url := os.Getenv("LANGFUSE_URL")
|
|
pk := os.Getenv("LANGFUSE_PUBLIC_KEY")
|
|
sk := os.Getenv("LANGFUSE_SECRET_KEY")
|
|
if url == "" || pk == "" || sk == "" {
|
|
return nil
|
|
}
|
|
return langfuse.New(url, pk, sk, nil)
|
|
}
|