shared: auto-emit Langfuse trace+span per HTTP request — closes OPEN #2
Adds langfuseMiddleware in internal/shared so every daemon's shared.Run gets free production-traffic trace visibility when LANGFUSE_URL + LANGFUSE_PUBLIC_KEY + LANGFUSE_SECRET_KEY are set. Same env names + file shape as the multi_coord_stress driver, so operators ship one /etc/lakehouse/langfuse.env across the deploy. Wiring is auth-gated: middleware runs INSIDE the RequireAuth group, so 401s from credential-stuffing don't pollute traces. /health is exempt so LB probes don't either. Missing env vars → nil client → middleware is a passthrough no-op (fail-open per ADR-005 5.1). Bundled deploy: - langfuse.env.example template (mode 0640, root:lakehouse) - 11 systemd units gain `EnvironmentFile=-/etc/lakehouse/langfuse.env` (leading - so missing file = OK) - REPLICATION.md bootstrap section documents setup Tests (4): nil passthrough, /health bypass, real-request emission, status-writer wrapping. All green. STATE_OF_PLAY OPEN list: 5 rows → 4 rows. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
5a3364f539
commit
68d9e554b0
@ -79,6 +79,15 @@ sudo chmod 0640 /etc/lakehouse/auth.env
|
||||
# AUTH_TOKEN=<generate via `openssl rand -hex 32`>
|
||||
sudo $EDITOR /etc/lakehouse/auth.env
|
||||
|
||||
# Optional: Langfuse traces. When set, every authenticated HTTP
|
||||
# request to every daemon emits a trace + span (production
|
||||
# observability per OPEN item #2 closure). Missing file = no
|
||||
# traces, no warnings.
|
||||
sudo cp deploy/etc-lakehouse/langfuse.env.example /etc/lakehouse/langfuse.env
|
||||
sudo chown root:lakehouse /etc/lakehouse/langfuse.env
|
||||
sudo chmod 0640 /etc/lakehouse/langfuse.env
|
||||
sudo $EDITOR /etc/lakehouse/langfuse.env # set URL + PUBLIC_KEY + SECRET_KEY
|
||||
|
||||
# Optional: chatd cloud provider keys, one file per provider
|
||||
# (each is its own EnvironmentFile so rotations don't restart all chatd)
|
||||
for provider in ollama_cloud openrouter opencode kimi; do
|
||||
|
||||
@ -220,11 +220,10 @@ The list is intentionally short. Items move to closed when the work demands them
|
||||
|
||||
| # | Item | When to act |
|
||||
|---|---|---|
|
||||
| 1 | **Wider Langfuse instrumentation across daemons** — `internal/langfuse/middleware.go` that auto-emits one span per HTTP request from every daemon's `shared.Run`. Production traffic gets free trace visibility without per-handler wiring. | When production traffic actually starts hitting the gateway. |
|
||||
| 2 | **Periodic fresh→main index merge** — two-tier pattern works but `fresh_workers` grows monotonically. A scheduled job that re-ingests the fresh corpus into `workers` (with the v2-moe embedder) + clears fresh closes the loop. | When `fresh_workers` crosses ~500 items in production. |
|
||||
| 3 | **Distillation full port** — `57d0df1` shipped scorer + contamination firewall (E partial). SFT export pipeline + audit_baselines lineage still on the Rust side. | When distillation becomes a production dependency. |
|
||||
| 4 | **Drift quantification** — `be65f85` is "scorer drift first." Full distribution-drift signal is underspecified everywhere; this is research, not a port. | Open research item; no calendar. |
|
||||
| 5 | **Operational nice-to-haves** — real-time wall-clock for the stress harness; chatd fixture-mode storage half (mock S3 for CI without MinIO); liberal-paraphrase calibration once real coordinator queries land. | When any of these block someone. |
|
||||
| 1 | **Periodic fresh→main index merge** — two-tier pattern works but `fresh_workers` grows monotonically. A scheduled job that re-ingests the fresh corpus into `workers` (with the v2-moe embedder) + clears fresh closes the loop. | When `fresh_workers` crosses ~500 items in production. |
|
||||
| 2 | **Distillation full port** — `57d0df1` shipped scorer + contamination firewall (E partial). SFT export pipeline + audit_baselines lineage still on the Rust side. | When distillation becomes a production dependency. |
|
||||
| 3 | **Drift quantification** — `be65f85` is "scorer drift first." Full distribution-drift signal is underspecified everywhere; this is research, not a port. | Open research item; no calendar. |
|
||||
| 4 | **Operational nice-to-haves** — real-time wall-clock for the stress harness; chatd fixture-mode storage half (mock S3 for CI without MinIO); liberal-paraphrase calibration once real coordinator queries land. | When any of these block someone. |
|
||||
|
||||
---
|
||||
|
||||
|
||||
19
deploy/etc-lakehouse/langfuse.env.example
Normal file
19
deploy/etc-lakehouse/langfuse.env.example
Normal file
@ -0,0 +1,19 @@
|
||||
# /etc/lakehouse/langfuse.env — Langfuse trace observability.
|
||||
#
|
||||
# Mode 0640, root:lakehouse. Loaded by every daemon's systemd unit
|
||||
# via EnvironmentFile=-/etc/lakehouse/langfuse.env. The `-` prefix
|
||||
# means "missing file is OK" — Langfuse is optional. If the file is
|
||||
# missing or any of the three env vars below is empty, every daemon
|
||||
# runs without Langfuse and with no logged warnings (best-effort).
|
||||
#
|
||||
# When all three are set, every authenticated HTTP request to every
|
||||
# daemon emits one Langfuse trace + span, browseable at LANGFUSE_URL.
|
||||
# /health is exempt to keep LB probes from polluting traces.
|
||||
#
|
||||
# Same env vars + same shape as the file the multi_coord_stress
|
||||
# driver reads — operators ship one langfuse.env across the whole
|
||||
# deploy.
|
||||
|
||||
LANGFUSE_URL=http://localhost:3001
|
||||
LANGFUSE_PUBLIC_KEY=pk-lf-staffing
|
||||
LANGFUSE_SECRET_KEY=sk-lf-staffing-secret
|
||||
@ -15,6 +15,7 @@ Restart=on-failure
|
||||
RestartSec=5
|
||||
|
||||
EnvironmentFile=-/etc/lakehouse/auth.env
|
||||
EnvironmentFile=-/etc/lakehouse/langfuse.env
|
||||
|
||||
NoNewPrivileges=true
|
||||
ProtectSystem=strict
|
||||
|
||||
@ -19,6 +19,7 @@ Restart=on-failure
|
||||
RestartSec=5
|
||||
|
||||
EnvironmentFile=-/etc/lakehouse/auth.env
|
||||
EnvironmentFile=-/etc/lakehouse/langfuse.env
|
||||
# chatd reads provider key files via paths in lakehouse.toml [chatd]
|
||||
# (ollama_cloud_key_file etc.) — each is its own EnvironmentFile so
|
||||
# operators can rotate one provider without restarting others.
|
||||
|
||||
@ -18,6 +18,7 @@ Restart=on-failure
|
||||
RestartSec=5
|
||||
|
||||
EnvironmentFile=-/etc/lakehouse/auth.env
|
||||
EnvironmentFile=-/etc/lakehouse/langfuse.env
|
||||
|
||||
NoNewPrivileges=true
|
||||
ProtectSystem=strict
|
||||
|
||||
@ -17,6 +17,7 @@ Restart=on-failure
|
||||
RestartSec=5
|
||||
|
||||
EnvironmentFile=-/etc/lakehouse/auth.env
|
||||
EnvironmentFile=-/etc/lakehouse/langfuse.env
|
||||
|
||||
NoNewPrivileges=true
|
||||
ProtectSystem=strict
|
||||
|
||||
@ -15,6 +15,7 @@ Restart=on-failure
|
||||
RestartSec=5
|
||||
|
||||
EnvironmentFile=-/etc/lakehouse/auth.env
|
||||
EnvironmentFile=-/etc/lakehouse/langfuse.env
|
||||
|
||||
NoNewPrivileges=true
|
||||
ProtectSystem=strict
|
||||
|
||||
@ -15,6 +15,7 @@ Restart=on-failure
|
||||
RestartSec=5
|
||||
|
||||
EnvironmentFile=-/etc/lakehouse/auth.env
|
||||
EnvironmentFile=-/etc/lakehouse/langfuse.env
|
||||
|
||||
NoNewPrivileges=true
|
||||
ProtectSystem=strict
|
||||
|
||||
@ -17,6 +17,7 @@ Restart=on-failure
|
||||
RestartSec=5
|
||||
|
||||
EnvironmentFile=-/etc/lakehouse/auth.env
|
||||
EnvironmentFile=-/etc/lakehouse/langfuse.env
|
||||
|
||||
NoNewPrivileges=true
|
||||
ProtectSystem=strict
|
||||
|
||||
@ -14,6 +14,7 @@ Restart=on-failure
|
||||
RestartSec=5
|
||||
|
||||
EnvironmentFile=-/etc/lakehouse/auth.env
|
||||
EnvironmentFile=-/etc/lakehouse/langfuse.env
|
||||
|
||||
NoNewPrivileges=true
|
||||
ProtectSystem=strict
|
||||
|
||||
@ -18,6 +18,7 @@ Restart=on-failure
|
||||
RestartSec=5
|
||||
|
||||
EnvironmentFile=-/etc/lakehouse/auth.env
|
||||
EnvironmentFile=-/etc/lakehouse/langfuse.env
|
||||
|
||||
NoNewPrivileges=true
|
||||
ProtectSystem=strict
|
||||
|
||||
@ -21,6 +21,7 @@ RestartSec=5
|
||||
# Empty AUTH_TOKEN is fine for loopback-only deploys (matches
|
||||
# requireAuthOnNonLoopback gate at startup).
|
||||
EnvironmentFile=-/etc/lakehouse/auth.env
|
||||
EnvironmentFile=-/etc/lakehouse/langfuse.env
|
||||
|
||||
# Hardening — minimum needed for the daemon to read its config
|
||||
# + write its log + open its bind port.
|
||||
|
||||
@ -17,6 +17,7 @@ Restart=on-failure
|
||||
RestartSec=5
|
||||
|
||||
EnvironmentFile=-/etc/lakehouse/auth.env
|
||||
EnvironmentFile=-/etc/lakehouse/langfuse.env
|
||||
|
||||
NoNewPrivileges=true
|
||||
ProtectSystem=strict
|
||||
|
||||
106
internal/shared/langfuse_middleware.go
Normal file
106
internal/shared/langfuse_middleware.go
Normal file
@ -0,0 +1,106 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.agentview.dev/profit/golangLAKEHOUSE/internal/langfuse"
|
||||
)
|
||||
|
||||
// langfuseMiddleware emits one Langfuse trace per HTTP request, with
|
||||
// a single span carrying start/end timestamps + status code. Per
|
||||
// OPEN item #2 (closed by the wave that adds this file): production
|
||||
// traffic gets free trace visibility without per-handler wiring.
|
||||
//
|
||||
// nil client → returns a passthrough no-op middleware so callers
|
||||
// don't need a nil check in shared.Run. Same fail-open posture as
|
||||
// Langfuse's queue layer (per ADR-005 Decision 5.1: observability
|
||||
// is a witness, never a gate).
|
||||
//
|
||||
// /health bypasses tracing — operators don't want every LB probe
|
||||
// or monitor heartbeat polluting traces. Real traffic surfaces
|
||||
// only via the registered routes.
|
||||
func langfuseMiddleware(serviceName string, lf *langfuse.Client) func(http.Handler) http.Handler {
|
||||
if lf == nil {
|
||||
return func(next http.Handler) http.Handler { return next }
|
||||
}
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// /health bypasses tracing — same exemption logic as
|
||||
// the auth middleware (see RequireAuth).
|
||||
if r.URL.Path == "/health" {
|
||||
next.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
start := time.Now()
|
||||
sw := &statusWriter{ResponseWriter: w, status: http.StatusOK}
|
||||
|
||||
traceID := lf.Trace(r.Context(), langfuse.TraceInput{
|
||||
Name: serviceName + " " + r.Method + " " + r.URL.Path,
|
||||
Tags: []string{serviceName, r.Method},
|
||||
Metadata: map[string]any{
|
||||
"path": r.URL.Path,
|
||||
"method": r.Method,
|
||||
"remote_addr": r.RemoteAddr,
|
||||
},
|
||||
})
|
||||
|
||||
next.ServeHTTP(sw, r)
|
||||
|
||||
level := ""
|
||||
if sw.status >= 500 {
|
||||
level = "ERROR"
|
||||
} else if sw.status >= 400 {
|
||||
level = "WARNING"
|
||||
}
|
||||
lf.Span(r.Context(), langfuse.SpanInput{
|
||||
TraceID: traceID,
|
||||
Name: "http.request",
|
||||
Input: map[string]any{
|
||||
"method": r.Method,
|
||||
"path": r.URL.Path,
|
||||
"remote_addr": r.RemoteAddr,
|
||||
},
|
||||
Output: map[string]any{
|
||||
"status": sw.status,
|
||||
"duration_ms": time.Since(start).Milliseconds(),
|
||||
},
|
||||
StartTime: start,
|
||||
EndTime: time.Now(),
|
||||
StatusCode: sw.status,
|
||||
Level: level,
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// statusWriter is the standard "wrap http.ResponseWriter to capture
|
||||
// the status code" trick. WriteHeader is the only method that
|
||||
// changes status; any handler that doesn't call WriteHeader gets
|
||||
// the implicit 200 from our struct's default.
|
||||
type statusWriter struct {
|
||||
http.ResponseWriter
|
||||
status int
|
||||
}
|
||||
|
||||
func (sw *statusWriter) WriteHeader(code int) {
|
||||
sw.status = code
|
||||
sw.ResponseWriter.WriteHeader(code)
|
||||
}
|
||||
|
||||
// loadLangfuseFromEnv builds a langfuse.Client from environment
|
||||
// variables. Returns nil if any of LANGFUSE_URL / LANGFUSE_PUBLIC_KEY
|
||||
// / LANGFUSE_SECRET_KEY is unset (best-effort: missing config means
|
||||
// no tracing, never a startup error). Same env names as the bare
|
||||
// /etc/lakehouse/langfuse.env file used by the multi_coord_stress
|
||||
// driver — operators ship one env file across every daemon.
|
||||
func loadLangfuseFromEnv() *langfuse.Client {
|
||||
url := os.Getenv("LANGFUSE_URL")
|
||||
pk := os.Getenv("LANGFUSE_PUBLIC_KEY")
|
||||
sk := os.Getenv("LANGFUSE_SECRET_KEY")
|
||||
if url == "" || pk == "" || sk == "" {
|
||||
return nil
|
||||
}
|
||||
return langfuse.New(url, pk, sk, nil)
|
||||
}
|
||||
146
internal/shared/langfuse_middleware_test.go
Normal file
146
internal/shared/langfuse_middleware_test.go
Normal file
@ -0,0 +1,146 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"git.agentview.dev/profit/golangLAKEHOUSE/internal/langfuse"
|
||||
)
|
||||
|
||||
// TestLangfuseMiddleware_NilClientPassthrough locks the
|
||||
// "no client → no-op" contract. Every daemon calls shared.Run;
|
||||
// operators who don't set LANGFUSE_URL must not see middleware
|
||||
// failures, latency, or behavior change of any kind.
|
||||
func TestLangfuseMiddleware_NilClientPassthrough(t *testing.T) {
|
||||
mw := langfuseMiddleware("test-service", nil)
|
||||
called := false
|
||||
h := mw(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
called = true
|
||||
w.WriteHeader(http.StatusTeapot) // distinctive code
|
||||
}))
|
||||
srv := httptest.NewServer(h)
|
||||
defer srv.Close()
|
||||
resp, err := http.Get(srv.URL + "/anything")
|
||||
if err != nil {
|
||||
t.Fatalf("GET: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusTeapot {
|
||||
t.Errorf("expected 418 (handler ran), got %d", resp.StatusCode)
|
||||
}
|
||||
if !called {
|
||||
t.Error("handler should have run via passthrough")
|
||||
}
|
||||
}
|
||||
|
||||
// TestLangfuseMiddleware_HealthBypassed locks the /health-exempt
|
||||
// rule (per langfuseMiddleware's doc comment): LB probes must not
|
||||
// emit traces or the trace volume drowns out real signal.
|
||||
func TestLangfuseMiddleware_HealthBypassed(t *testing.T) {
|
||||
var (
|
||||
mu sync.Mutex
|
||||
captured []string // ingestion endpoint payloads
|
||||
)
|
||||
// Mock Langfuse ingestion endpoint that records every batch.
|
||||
lfMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
body, _ := io.ReadAll(r.Body)
|
||||
mu.Lock()
|
||||
captured = append(captured, string(body))
|
||||
mu.Unlock()
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer lfMock.Close()
|
||||
|
||||
lf := langfuse.New(lfMock.URL, "test-pk", "test-sk", nil)
|
||||
mw := langfuseMiddleware("test-service", lf)
|
||||
h := mw(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
srv := httptest.NewServer(h)
|
||||
defer srv.Close()
|
||||
|
||||
resp, err := http.Get(srv.URL + "/health")
|
||||
if err != nil {
|
||||
t.Fatalf("GET /health: %v", err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
|
||||
_ = lf.Close() // force flush
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if len(captured) > 0 {
|
||||
t.Errorf("expected zero ingestion calls for /health, got %d (%v)", len(captured), captured)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLangfuseMiddleware_RealRequestEmitted locks the happy path:
|
||||
// a real request through an authed route produces ingestion events
|
||||
// (trace + span). We don't decode the payload here — the
|
||||
// internal/langfuse client tests already verify the wire format.
|
||||
// What this test asserts is the wiring: middleware → client → POST.
|
||||
func TestLangfuseMiddleware_RealRequestEmitted(t *testing.T) {
|
||||
var (
|
||||
mu sync.Mutex
|
||||
captured int
|
||||
)
|
||||
lfMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = io.ReadAll(r.Body)
|
||||
mu.Lock()
|
||||
captured++
|
||||
mu.Unlock()
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer lfMock.Close()
|
||||
|
||||
lf := langfuse.New(lfMock.URL, "test-pk", "test-sk", nil)
|
||||
mw := langfuseMiddleware("test-service", lf)
|
||||
h := mw(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
srv := httptest.NewServer(h)
|
||||
defer srv.Close()
|
||||
|
||||
resp, err := http.Get(srv.URL + "/api/data")
|
||||
if err != nil {
|
||||
t.Fatalf("GET /api/data: %v", err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
|
||||
_ = lf.Close() // force flush — sends the queued trace + span
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if captured == 0 {
|
||||
t.Error("expected at least one ingestion call after real request")
|
||||
}
|
||||
}
|
||||
|
||||
// TestLangfuseMiddleware_StatusCaptured locks the status-writer
|
||||
// wrapping: when the handler returns 500, the middleware must see
|
||||
// 500 in the span output (otherwise error traces all show 200 and
|
||||
// debugging gets harder).
|
||||
func TestLangfuseMiddleware_StatusCaptured(t *testing.T) {
|
||||
mw := langfuseMiddleware("test-service", nil) // nil client; just exercise wrapping
|
||||
called := false
|
||||
h := mw(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
called = true
|
||||
http.Error(w, "boom", http.StatusInternalServerError)
|
||||
}))
|
||||
srv := httptest.NewServer(h)
|
||||
defer srv.Close()
|
||||
resp, err := http.Get(srv.URL + "/api/fail")
|
||||
if err != nil {
|
||||
t.Fatalf("GET: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500, got %d", resp.StatusCode)
|
||||
}
|
||||
if !called {
|
||||
t.Error("handler should have run")
|
||||
}
|
||||
}
|
||||
@ -75,6 +75,18 @@ func Run(serviceName, addr string, register RegisterRoutes, auth AuthConfig) err
|
||||
Level: slog.LevelInfo,
|
||||
}))
|
||||
|
||||
// Optional Langfuse client from LANGFUSE_URL + LANGFUSE_PUBLIC_KEY
|
||||
// + LANGFUSE_SECRET_KEY env vars. Per OPEN item #2: every daemon
|
||||
// gets free production-traffic trace visibility when those env
|
||||
// vars are set. Missing any of the three → nil client → middleware
|
||||
// becomes a passthrough.
|
||||
lf := loadLangfuseFromEnv()
|
||||
if lf != nil {
|
||||
// Make sure pending events flush on graceful shutdown so the
|
||||
// last few requests' traces don't get lost.
|
||||
defer func() { _ = lf.Close() }()
|
||||
}
|
||||
|
||||
r := chi.NewRouter()
|
||||
r.Use(middleware.RequestID)
|
||||
r.Use(middleware.RealIP)
|
||||
@ -95,8 +107,12 @@ func Run(serviceName, addr string, register RegisterRoutes, auth AuthConfig) err
|
||||
// Registered routes live inside an auth-gated group so
|
||||
// RequireAuth applies uniformly without per-handler wiring.
|
||||
// Empty auth → middleware is a no-op (group is transparent).
|
||||
// Langfuse middleware sits AFTER auth so we don't trace 401s
|
||||
// from credential-stuffing attempts (avoids polluting traces
|
||||
// with non-real-traffic).
|
||||
r.Group(func(authed chi.Router) {
|
||||
authed.Use(RequireAuth(auth))
|
||||
authed.Use(langfuseMiddleware(serviceName, lf))
|
||||
register(authed)
|
||||
})
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user