From 68d9e554b0ee24999f467d9fddd7fdc6142309c3 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 30 Apr 2026 19:55:42 -0500 Subject: [PATCH] =?UTF-8?q?shared:=20auto-emit=20Langfuse=20trace+span=20p?= =?UTF-8?q?er=20HTTP=20request=20=E2=80=94=20closes=20OPEN=20#2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds langfuseMiddleware in internal/shared so every daemon's shared.Run gets free production-traffic trace visibility when LANGFUSE_URL + LANGFUSE_PUBLIC_KEY + LANGFUSE_SECRET_KEY are set. Same env names + file shape as the multi_coord_stress driver, so operators ship one /etc/lakehouse/langfuse.env across the deploy. Wiring is auth-gated: middleware runs INSIDE the RequireAuth group, so 401s from credential-stuffing don't pollute traces. /health is exempt so LB probes don't either. Missing env vars → nil client → middleware is a passthrough no-op (fail-open per ADR-005 5.1). Bundled deploy: - langfuse.env.example template (mode 0640, root:lakehouse) - 11 systemd units gain `EnvironmentFile=-/etc/lakehouse/langfuse.env` (leading - so missing file = OK) - REPLICATION.md bootstrap section documents setup Tests (4): nil passthrough, /health bypass, real-request emission, status-writer wrapping. All green. STATE_OF_PLAY OPEN list: 5 rows → 4 rows. Co-Authored-By: Claude Opus 4.7 (1M context) --- REPLICATION.md | 9 ++ STATE_OF_PLAY.md | 9 +- deploy/etc-lakehouse/langfuse.env.example | 19 +++ deploy/systemd/lakehouse-catalogd.service | 1 + deploy/systemd/lakehouse-chatd.service | 1 + deploy/systemd/lakehouse-embedd.service | 1 + deploy/systemd/lakehouse-gateway.service | 1 + deploy/systemd/lakehouse-ingestd.service | 1 + deploy/systemd/lakehouse-matrixd.service | 1 + deploy/systemd/lakehouse-observerd.service | 1 + deploy/systemd/lakehouse-pathwayd.service | 1 + deploy/systemd/lakehouse-queryd.service | 1 + deploy/systemd/lakehouse-storaged.service | 1 + deploy/systemd/lakehouse-vectord.service | 1 + internal/shared/langfuse_middleware.go | 106 ++++++++++++++ internal/shared/langfuse_middleware_test.go | 146 ++++++++++++++++++++ internal/shared/server.go | 16 +++ 17 files changed, 311 insertions(+), 5 deletions(-) create mode 100644 deploy/etc-lakehouse/langfuse.env.example create mode 100644 internal/shared/langfuse_middleware.go create mode 100644 internal/shared/langfuse_middleware_test.go diff --git a/REPLICATION.md b/REPLICATION.md index 5432190..a7f26e4 100644 --- a/REPLICATION.md +++ b/REPLICATION.md @@ -79,6 +79,15 @@ sudo chmod 0640 /etc/lakehouse/auth.env # AUTH_TOKEN= sudo $EDITOR /etc/lakehouse/auth.env +# Optional: Langfuse traces. When set, every authenticated HTTP +# request to every daemon emits a trace + span (production +# observability per OPEN item #2 closure). Missing file = no +# traces, no warnings. +sudo cp deploy/etc-lakehouse/langfuse.env.example /etc/lakehouse/langfuse.env +sudo chown root:lakehouse /etc/lakehouse/langfuse.env +sudo chmod 0640 /etc/lakehouse/langfuse.env +sudo $EDITOR /etc/lakehouse/langfuse.env # set URL + PUBLIC_KEY + SECRET_KEY + # Optional: chatd cloud provider keys, one file per provider # (each is its own EnvironmentFile so rotations don't restart all chatd) for provider in ollama_cloud openrouter opencode kimi; do diff --git a/STATE_OF_PLAY.md b/STATE_OF_PLAY.md index 76dc794..01359f3 100644 --- a/STATE_OF_PLAY.md +++ b/STATE_OF_PLAY.md @@ -220,11 +220,10 @@ The list is intentionally short. Items move to closed when the work demands them | # | Item | When to act | |---|---|---| -| 1 | **Wider Langfuse instrumentation across daemons** — `internal/langfuse/middleware.go` that auto-emits one span per HTTP request from every daemon's `shared.Run`. Production traffic gets free trace visibility without per-handler wiring. | When production traffic actually starts hitting the gateway. | -| 2 | **Periodic fresh→main index merge** — two-tier pattern works but `fresh_workers` grows monotonically. A scheduled job that re-ingests the fresh corpus into `workers` (with the v2-moe embedder) + clears fresh closes the loop. | When `fresh_workers` crosses ~500 items in production. | -| 3 | **Distillation full port** — `57d0df1` shipped scorer + contamination firewall (E partial). SFT export pipeline + audit_baselines lineage still on the Rust side. | When distillation becomes a production dependency. | -| 4 | **Drift quantification** — `be65f85` is "scorer drift first." Full distribution-drift signal is underspecified everywhere; this is research, not a port. | Open research item; no calendar. | -| 5 | **Operational nice-to-haves** — real-time wall-clock for the stress harness; chatd fixture-mode storage half (mock S3 for CI without MinIO); liberal-paraphrase calibration once real coordinator queries land. | When any of these block someone. | +| 1 | **Periodic fresh→main index merge** — two-tier pattern works but `fresh_workers` grows monotonically. A scheduled job that re-ingests the fresh corpus into `workers` (with the v2-moe embedder) + clears fresh closes the loop. | When `fresh_workers` crosses ~500 items in production. | +| 2 | **Distillation full port** — `57d0df1` shipped scorer + contamination firewall (E partial). SFT export pipeline + audit_baselines lineage still on the Rust side. | When distillation becomes a production dependency. | +| 3 | **Drift quantification** — `be65f85` is "scorer drift first." Full distribution-drift signal is underspecified everywhere; this is research, not a port. | Open research item; no calendar. | +| 4 | **Operational nice-to-haves** — real-time wall-clock for the stress harness; chatd fixture-mode storage half (mock S3 for CI without MinIO); liberal-paraphrase calibration once real coordinator queries land. | When any of these block someone. | --- diff --git a/deploy/etc-lakehouse/langfuse.env.example b/deploy/etc-lakehouse/langfuse.env.example new file mode 100644 index 0000000..f99bf6d --- /dev/null +++ b/deploy/etc-lakehouse/langfuse.env.example @@ -0,0 +1,19 @@ +# /etc/lakehouse/langfuse.env — Langfuse trace observability. +# +# Mode 0640, root:lakehouse. Loaded by every daemon's systemd unit +# via EnvironmentFile=-/etc/lakehouse/langfuse.env. The `-` prefix +# means "missing file is OK" — Langfuse is optional. If the file is +# missing or any of the three env vars below is empty, every daemon +# runs without Langfuse and with no logged warnings (best-effort). +# +# When all three are set, every authenticated HTTP request to every +# daemon emits one Langfuse trace + span, browseable at LANGFUSE_URL. +# /health is exempt to keep LB probes from polluting traces. +# +# Same env vars + same shape as the file the multi_coord_stress +# driver reads — operators ship one langfuse.env across the whole +# deploy. + +LANGFUSE_URL=http://localhost:3001 +LANGFUSE_PUBLIC_KEY=pk-lf-staffing +LANGFUSE_SECRET_KEY=sk-lf-staffing-secret diff --git a/deploy/systemd/lakehouse-catalogd.service b/deploy/systemd/lakehouse-catalogd.service index 7deba9f..c196835 100644 --- a/deploy/systemd/lakehouse-catalogd.service +++ b/deploy/systemd/lakehouse-catalogd.service @@ -15,6 +15,7 @@ Restart=on-failure RestartSec=5 EnvironmentFile=-/etc/lakehouse/auth.env +EnvironmentFile=-/etc/lakehouse/langfuse.env NoNewPrivileges=true ProtectSystem=strict diff --git a/deploy/systemd/lakehouse-chatd.service b/deploy/systemd/lakehouse-chatd.service index 142774e..eb30e6f 100644 --- a/deploy/systemd/lakehouse-chatd.service +++ b/deploy/systemd/lakehouse-chatd.service @@ -19,6 +19,7 @@ Restart=on-failure RestartSec=5 EnvironmentFile=-/etc/lakehouse/auth.env +EnvironmentFile=-/etc/lakehouse/langfuse.env # chatd reads provider key files via paths in lakehouse.toml [chatd] # (ollama_cloud_key_file etc.) — each is its own EnvironmentFile so # operators can rotate one provider without restarting others. diff --git a/deploy/systemd/lakehouse-embedd.service b/deploy/systemd/lakehouse-embedd.service index 8d917c5..03c34e6 100644 --- a/deploy/systemd/lakehouse-embedd.service +++ b/deploy/systemd/lakehouse-embedd.service @@ -18,6 +18,7 @@ Restart=on-failure RestartSec=5 EnvironmentFile=-/etc/lakehouse/auth.env +EnvironmentFile=-/etc/lakehouse/langfuse.env NoNewPrivileges=true ProtectSystem=strict diff --git a/deploy/systemd/lakehouse-gateway.service b/deploy/systemd/lakehouse-gateway.service index 63c17ea..c3782f2 100644 --- a/deploy/systemd/lakehouse-gateway.service +++ b/deploy/systemd/lakehouse-gateway.service @@ -17,6 +17,7 @@ Restart=on-failure RestartSec=5 EnvironmentFile=-/etc/lakehouse/auth.env +EnvironmentFile=-/etc/lakehouse/langfuse.env NoNewPrivileges=true ProtectSystem=strict diff --git a/deploy/systemd/lakehouse-ingestd.service b/deploy/systemd/lakehouse-ingestd.service index d5323ae..2ec083f 100644 --- a/deploy/systemd/lakehouse-ingestd.service +++ b/deploy/systemd/lakehouse-ingestd.service @@ -15,6 +15,7 @@ Restart=on-failure RestartSec=5 EnvironmentFile=-/etc/lakehouse/auth.env +EnvironmentFile=-/etc/lakehouse/langfuse.env NoNewPrivileges=true ProtectSystem=strict diff --git a/deploy/systemd/lakehouse-matrixd.service b/deploy/systemd/lakehouse-matrixd.service index 9b868e3..9523a2a 100644 --- a/deploy/systemd/lakehouse-matrixd.service +++ b/deploy/systemd/lakehouse-matrixd.service @@ -15,6 +15,7 @@ Restart=on-failure RestartSec=5 EnvironmentFile=-/etc/lakehouse/auth.env +EnvironmentFile=-/etc/lakehouse/langfuse.env NoNewPrivileges=true ProtectSystem=strict diff --git a/deploy/systemd/lakehouse-observerd.service b/deploy/systemd/lakehouse-observerd.service index fe01fd6..8aa2813 100644 --- a/deploy/systemd/lakehouse-observerd.service +++ b/deploy/systemd/lakehouse-observerd.service @@ -17,6 +17,7 @@ Restart=on-failure RestartSec=5 EnvironmentFile=-/etc/lakehouse/auth.env +EnvironmentFile=-/etc/lakehouse/langfuse.env NoNewPrivileges=true ProtectSystem=strict diff --git a/deploy/systemd/lakehouse-pathwayd.service b/deploy/systemd/lakehouse-pathwayd.service index ee9a0e4..4792a9f 100644 --- a/deploy/systemd/lakehouse-pathwayd.service +++ b/deploy/systemd/lakehouse-pathwayd.service @@ -14,6 +14,7 @@ Restart=on-failure RestartSec=5 EnvironmentFile=-/etc/lakehouse/auth.env +EnvironmentFile=-/etc/lakehouse/langfuse.env NoNewPrivileges=true ProtectSystem=strict diff --git a/deploy/systemd/lakehouse-queryd.service b/deploy/systemd/lakehouse-queryd.service index 9e62822..d06c4f6 100644 --- a/deploy/systemd/lakehouse-queryd.service +++ b/deploy/systemd/lakehouse-queryd.service @@ -18,6 +18,7 @@ Restart=on-failure RestartSec=5 EnvironmentFile=-/etc/lakehouse/auth.env +EnvironmentFile=-/etc/lakehouse/langfuse.env NoNewPrivileges=true ProtectSystem=strict diff --git a/deploy/systemd/lakehouse-storaged.service b/deploy/systemd/lakehouse-storaged.service index 2d283d4..5a2e285 100644 --- a/deploy/systemd/lakehouse-storaged.service +++ b/deploy/systemd/lakehouse-storaged.service @@ -21,6 +21,7 @@ RestartSec=5 # Empty AUTH_TOKEN is fine for loopback-only deploys (matches # requireAuthOnNonLoopback gate at startup). EnvironmentFile=-/etc/lakehouse/auth.env +EnvironmentFile=-/etc/lakehouse/langfuse.env # Hardening — minimum needed for the daemon to read its config # + write its log + open its bind port. diff --git a/deploy/systemd/lakehouse-vectord.service b/deploy/systemd/lakehouse-vectord.service index 2bda7de..76a109f 100644 --- a/deploy/systemd/lakehouse-vectord.service +++ b/deploy/systemd/lakehouse-vectord.service @@ -17,6 +17,7 @@ Restart=on-failure RestartSec=5 EnvironmentFile=-/etc/lakehouse/auth.env +EnvironmentFile=-/etc/lakehouse/langfuse.env NoNewPrivileges=true ProtectSystem=strict diff --git a/internal/shared/langfuse_middleware.go b/internal/shared/langfuse_middleware.go new file mode 100644 index 0000000..93d0b8b --- /dev/null +++ b/internal/shared/langfuse_middleware.go @@ -0,0 +1,106 @@ +package shared + +import ( + "net/http" + "os" + "time" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/langfuse" +) + +// langfuseMiddleware emits one Langfuse trace per HTTP request, with +// a single span carrying start/end timestamps + status code. Per +// OPEN item #2 (closed by the wave that adds this file): production +// traffic gets free trace visibility without per-handler wiring. +// +// nil client → returns a passthrough no-op middleware so callers +// don't need a nil check in shared.Run. Same fail-open posture as +// Langfuse's queue layer (per ADR-005 Decision 5.1: observability +// is a witness, never a gate). +// +// /health bypasses tracing — operators don't want every LB probe +// or monitor heartbeat polluting traces. Real traffic surfaces +// only via the registered routes. +func langfuseMiddleware(serviceName string, lf *langfuse.Client) func(http.Handler) http.Handler { + if lf == nil { + return func(next http.Handler) http.Handler { return next } + } + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // /health bypasses tracing — same exemption logic as + // the auth middleware (see RequireAuth). + if r.URL.Path == "/health" { + next.ServeHTTP(w, r) + return + } + start := time.Now() + sw := &statusWriter{ResponseWriter: w, status: http.StatusOK} + + traceID := lf.Trace(r.Context(), langfuse.TraceInput{ + Name: serviceName + " " + r.Method + " " + r.URL.Path, + Tags: []string{serviceName, r.Method}, + Metadata: map[string]any{ + "path": r.URL.Path, + "method": r.Method, + "remote_addr": r.RemoteAddr, + }, + }) + + next.ServeHTTP(sw, r) + + level := "" + if sw.status >= 500 { + level = "ERROR" + } else if sw.status >= 400 { + level = "WARNING" + } + lf.Span(r.Context(), langfuse.SpanInput{ + TraceID: traceID, + Name: "http.request", + Input: map[string]any{ + "method": r.Method, + "path": r.URL.Path, + "remote_addr": r.RemoteAddr, + }, + Output: map[string]any{ + "status": sw.status, + "duration_ms": time.Since(start).Milliseconds(), + }, + StartTime: start, + EndTime: time.Now(), + StatusCode: sw.status, + Level: level, + }) + }) + } +} + +// statusWriter is the standard "wrap http.ResponseWriter to capture +// the status code" trick. WriteHeader is the only method that +// changes status; any handler that doesn't call WriteHeader gets +// the implicit 200 from our struct's default. +type statusWriter struct { + http.ResponseWriter + status int +} + +func (sw *statusWriter) WriteHeader(code int) { + sw.status = code + sw.ResponseWriter.WriteHeader(code) +} + +// loadLangfuseFromEnv builds a langfuse.Client from environment +// variables. Returns nil if any of LANGFUSE_URL / LANGFUSE_PUBLIC_KEY +// / LANGFUSE_SECRET_KEY is unset (best-effort: missing config means +// no tracing, never a startup error). Same env names as the bare +// /etc/lakehouse/langfuse.env file used by the multi_coord_stress +// driver — operators ship one env file across every daemon. +func loadLangfuseFromEnv() *langfuse.Client { + url := os.Getenv("LANGFUSE_URL") + pk := os.Getenv("LANGFUSE_PUBLIC_KEY") + sk := os.Getenv("LANGFUSE_SECRET_KEY") + if url == "" || pk == "" || sk == "" { + return nil + } + return langfuse.New(url, pk, sk, nil) +} diff --git a/internal/shared/langfuse_middleware_test.go b/internal/shared/langfuse_middleware_test.go new file mode 100644 index 0000000..71af67c --- /dev/null +++ b/internal/shared/langfuse_middleware_test.go @@ -0,0 +1,146 @@ +package shared + +import ( + "io" + "net/http" + "net/http/httptest" + "sync" + "testing" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/langfuse" +) + +// TestLangfuseMiddleware_NilClientPassthrough locks the +// "no client → no-op" contract. Every daemon calls shared.Run; +// operators who don't set LANGFUSE_URL must not see middleware +// failures, latency, or behavior change of any kind. +func TestLangfuseMiddleware_NilClientPassthrough(t *testing.T) { + mw := langfuseMiddleware("test-service", nil) + called := false + h := mw(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + called = true + w.WriteHeader(http.StatusTeapot) // distinctive code + })) + srv := httptest.NewServer(h) + defer srv.Close() + resp, err := http.Get(srv.URL + "/anything") + if err != nil { + t.Fatalf("GET: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusTeapot { + t.Errorf("expected 418 (handler ran), got %d", resp.StatusCode) + } + if !called { + t.Error("handler should have run via passthrough") + } +} + +// TestLangfuseMiddleware_HealthBypassed locks the /health-exempt +// rule (per langfuseMiddleware's doc comment): LB probes must not +// emit traces or the trace volume drowns out real signal. +func TestLangfuseMiddleware_HealthBypassed(t *testing.T) { + var ( + mu sync.Mutex + captured []string // ingestion endpoint payloads + ) + // Mock Langfuse ingestion endpoint that records every batch. + lfMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + mu.Lock() + captured = append(captured, string(body)) + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer lfMock.Close() + + lf := langfuse.New(lfMock.URL, "test-pk", "test-sk", nil) + mw := langfuseMiddleware("test-service", lf) + h := mw(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + srv := httptest.NewServer(h) + defer srv.Close() + + resp, err := http.Get(srv.URL + "/health") + if err != nil { + t.Fatalf("GET /health: %v", err) + } + resp.Body.Close() + + _ = lf.Close() // force flush + + mu.Lock() + defer mu.Unlock() + if len(captured) > 0 { + t.Errorf("expected zero ingestion calls for /health, got %d (%v)", len(captured), captured) + } +} + +// TestLangfuseMiddleware_RealRequestEmitted locks the happy path: +// a real request through an authed route produces ingestion events +// (trace + span). We don't decode the payload here — the +// internal/langfuse client tests already verify the wire format. +// What this test asserts is the wiring: middleware → client → POST. +func TestLangfuseMiddleware_RealRequestEmitted(t *testing.T) { + var ( + mu sync.Mutex + captured int + ) + lfMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = io.ReadAll(r.Body) + mu.Lock() + captured++ + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer lfMock.Close() + + lf := langfuse.New(lfMock.URL, "test-pk", "test-sk", nil) + mw := langfuseMiddleware("test-service", lf) + h := mw(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + srv := httptest.NewServer(h) + defer srv.Close() + + resp, err := http.Get(srv.URL + "/api/data") + if err != nil { + t.Fatalf("GET /api/data: %v", err) + } + resp.Body.Close() + + _ = lf.Close() // force flush — sends the queued trace + span + + mu.Lock() + defer mu.Unlock() + if captured == 0 { + t.Error("expected at least one ingestion call after real request") + } +} + +// TestLangfuseMiddleware_StatusCaptured locks the status-writer +// wrapping: when the handler returns 500, the middleware must see +// 500 in the span output (otherwise error traces all show 200 and +// debugging gets harder). +func TestLangfuseMiddleware_StatusCaptured(t *testing.T) { + mw := langfuseMiddleware("test-service", nil) // nil client; just exercise wrapping + called := false + h := mw(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + called = true + http.Error(w, "boom", http.StatusInternalServerError) + })) + srv := httptest.NewServer(h) + defer srv.Close() + resp, err := http.Get(srv.URL + "/api/fail") + if err != nil { + t.Fatalf("GET: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusInternalServerError { + t.Errorf("expected 500, got %d", resp.StatusCode) + } + if !called { + t.Error("handler should have run") + } +} diff --git a/internal/shared/server.go b/internal/shared/server.go index edbcf46..9bdff23 100644 --- a/internal/shared/server.go +++ b/internal/shared/server.go @@ -75,6 +75,18 @@ func Run(serviceName, addr string, register RegisterRoutes, auth AuthConfig) err Level: slog.LevelInfo, })) + // Optional Langfuse client from LANGFUSE_URL + LANGFUSE_PUBLIC_KEY + // + LANGFUSE_SECRET_KEY env vars. Per OPEN item #2: every daemon + // gets free production-traffic trace visibility when those env + // vars are set. Missing any of the three → nil client → middleware + // becomes a passthrough. + lf := loadLangfuseFromEnv() + if lf != nil { + // Make sure pending events flush on graceful shutdown so the + // last few requests' traces don't get lost. + defer func() { _ = lf.Close() }() + } + r := chi.NewRouter() r.Use(middleware.RequestID) r.Use(middleware.RealIP) @@ -95,8 +107,12 @@ func Run(serviceName, addr string, register RegisterRoutes, auth AuthConfig) err // Registered routes live inside an auth-gated group so // RequireAuth applies uniformly without per-handler wiring. // Empty auth → middleware is a no-op (group is transparent). + // Langfuse middleware sits AFTER auth so we don't trace 401s + // from credential-stuffing attempts (avoids polluting traces + // with non-real-traffic). r.Group(func(authed chi.Router) { authed.Use(RequireAuth(auth)) + authed.Use(langfuseMiddleware(serviceName, lf)) register(authed) }) }