Adds langfuseMiddleware in internal/shared so every daemon's shared.Run gets free production-traffic trace visibility when LANGFUSE_URL + LANGFUSE_PUBLIC_KEY + LANGFUSE_SECRET_KEY are set. Same env names + file shape as the multi_coord_stress driver, so operators ship one /etc/lakehouse/langfuse.env across the deploy. Wiring is auth-gated: middleware runs INSIDE the RequireAuth group, so 401s from credential-stuffing don't pollute traces. /health is exempt so LB probes don't either. Missing env vars → nil client → middleware is a passthrough no-op (fail-open per ADR-005 5.1). Bundled deploy: - langfuse.env.example template (mode 0640, root:lakehouse) - 11 systemd units gain `EnvironmentFile=-/etc/lakehouse/langfuse.env` (leading - so missing file = OK) - REPLICATION.md bootstrap section documents setup Tests (4): nil passthrough, /health bypass, real-request emission, status-writer wrapping. All green. STATE_OF_PLAY OPEN list: 5 rows → 4 rows. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
203 lines
6.6 KiB
Go
203 lines
6.6 KiB
Go
// Package shared provides common HTTP server bootstrap for every
// Lakehouse-Go service. Each cmd/<service> calls Run with its name,
// bind address, a route-registration callback, and its auth config.
// The factory wires chi, slog, /health, and graceful shutdown
// identically across all five binaries — the place where uniformity
// beats per-service flexibility.
//
// G1+ note: when queryd needs to drain a cgo DuckDB handle on
// shutdown, the simple shared factory will need a per-service hook
// (an io.Closer slice or an OnShutdown callback). For G0 a plain
// chi.Router + http.Server.Shutdown(ctx) is sufficient.
|
|
package shared
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"log/slog"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/go-chi/chi/v5/middleware"
|
|
)
|
|
|
|
// HealthResponse is the JSON body every service returns from /health.
// Run's /health handler fills Status with "ok" and Service with the
// service name it was started with. Service-specific status hooks can
// extend the shape post-G0.
type HealthResponse struct {
	// Status is serialized under the "status" key.
	Status string `json:"status"`
	// Service is serialized under the "service" key.
	Service string `json:"service"`
}
|
|
|
|
// RegisterRoutes is the per-service callback that wires its own
|
|
// routes onto the shared router AFTER /health has been mounted.
|
|
type RegisterRoutes func(r chi.Router)
|
|
|
|
// Run boots a chi router with slog logging, the /health endpoint,
|
|
// and graceful-shutdown handling. Blocks until SIGINT/SIGTERM or a
|
|
// fatal listener error.
|
|
//
|
|
// The logger is constructed locally and used as the request-logging
|
|
// sink. Run does NOT mutate the global slog default — callers that
|
|
// want their own slog.Default() should set it before calling Run.
|
|
// (Per Kimi review #4: shared library functions shouldn't silently
|
|
// mutate package globals.)
|
|
//
|
|
// Three startup gates apply in order:
|
|
//
|
|
// 1. requireLoopbackOrOverride — refuses non-loopback bind unless
|
|
// LH_<SERVICE>_ALLOW_NONLOOPBACK=1 is set. Closes the accidental
|
|
// 0.0.0.0 deploy path for R-001.
|
|
// 2. requireAuthOnNonLoopback — refuses non-loopback bind when
|
|
// auth.token is empty. Mechanically prevents R-001 + R-007's
|
|
// worst case: world-reachable bind with no auth layer.
|
|
// 3. RequireAuth middleware — runs per-request on registered routes.
|
|
// /health stays exempt (mounted on the outer router, before the
|
|
// authed group).
|
|
//
|
|
// Per ADR-003: empty auth.token + empty allowed_ips → middleware is
|
|
// a no-op. Smokes and proof harness keep working without setting
|
|
// either.
|
|
func Run(serviceName, addr string, register RegisterRoutes, auth AuthConfig) error {
|
|
if err := requireLoopbackOrOverride(serviceName, addr); err != nil {
|
|
return err
|
|
}
|
|
if err := requireAuthOnNonLoopback(serviceName, addr, auth); err != nil {
|
|
return err
|
|
}
|
|
|
|
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
|
|
Level: slog.LevelInfo,
|
|
}))
|
|
|
|
// Optional Langfuse client from LANGFUSE_URL + LANGFUSE_PUBLIC_KEY
|
|
// + LANGFUSE_SECRET_KEY env vars. Per OPEN item #2: every daemon
|
|
// gets free production-traffic trace visibility when those env
|
|
// vars are set. Missing any of the three → nil client → middleware
|
|
// becomes a passthrough.
|
|
lf := loadLangfuseFromEnv()
|
|
if lf != nil {
|
|
// Make sure pending events flush on graceful shutdown so the
|
|
// last few requests' traces don't get lost.
|
|
defer func() { _ = lf.Close() }()
|
|
}
|
|
|
|
r := chi.NewRouter()
|
|
r.Use(middleware.RequestID)
|
|
r.Use(middleware.RealIP)
|
|
r.Use(middleware.Recoverer)
|
|
r.Use(slogRequest(logger))
|
|
|
|
// /health stays on the outer router — public, no auth. Operators
|
|
// rely on it for liveness probes that don't carry a token.
|
|
r.Get("/health", func(w http.ResponseWriter, _ *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(HealthResponse{
|
|
Status: "ok",
|
|
Service: serviceName,
|
|
})
|
|
})
|
|
|
|
if register != nil {
|
|
// Registered routes live inside an auth-gated group so
|
|
// RequireAuth applies uniformly without per-handler wiring.
|
|
// Empty auth → middleware is a no-op (group is transparent).
|
|
// Langfuse middleware sits AFTER auth so we don't trace 401s
|
|
// from credential-stuffing attempts (avoids polluting traces
|
|
// with non-real-traffic).
|
|
r.Group(func(authed chi.Router) {
|
|
authed.Use(RequireAuth(auth))
|
|
authed.Use(langfuseMiddleware(serviceName, lf))
|
|
register(authed)
|
|
})
|
|
}
|
|
|
|
srv := &http.Server{
|
|
Addr: addr,
|
|
Handler: r,
|
|
ReadHeaderTimeout: 10 * time.Second,
|
|
}
|
|
|
|
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
|
defer stop()
|
|
|
|
// Race-safe startup: bind the listener synchronously BEFORE
|
|
// returning so a fast bind error (e.g. port already in use) is
|
|
// surfaced as Run's return value rather than racing the select.
|
|
// Per Opus + Qwen BLOCK #1: the prior pattern could drop bind
|
|
// errors when ctx.Done already fired or a fast failure happened
|
|
// during select setup.
|
|
ln, err := newListener(srv.Addr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
logger.Info("listening", "service", serviceName, "addr", addr)
|
|
if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
errCh <- err
|
|
}
|
|
close(errCh)
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Info("shutdown signal received", "service", serviceName)
|
|
case err := <-errCh:
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Server exited cleanly without a signal (unlikely but possible).
|
|
return nil
|
|
}
|
|
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
|
defer cancel()
|
|
if err := srv.Shutdown(shutdownCtx); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Drain errCh so a late error from the listener goroutine
|
|
// surfaces as the return value instead of leaking. After Shutdown
|
|
// the channel will close on graceful exit; if a real error
|
|
// landed first we return it.
|
|
if err := <-errCh; err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// newListener opens the TCP listener for addr ahead of serving, so a
// bind failure reaches Run's caller as an ordinary return value. It
// is a separate function for testability and to keep Run readable.
func newListener(addr string) (net.Listener, error) {
	const network = "tcp"

	ln, err := net.Listen(network, addr)
	return ln, err
}
|
|
|
|
// slogRequest returns a chi middleware that logs each request via slog.
|
|
// Replaces chi's default text logger so all log output stays JSON.
|
|
func slogRequest(logger *slog.Logger) func(http.Handler) http.Handler {
|
|
return func(next http.Handler) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
start := time.Now()
|
|
ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
|
|
defer func() {
|
|
logger.Info("http",
|
|
"method", r.Method,
|
|
"path", r.URL.Path,
|
|
"status", ww.Status(),
|
|
"dur_ms", time.Since(start).Milliseconds(),
|
|
"req_id", middleware.GetReqID(r.Context()),
|
|
)
|
|
}()
|
|
next.ServeHTTP(ww, r)
|
|
})
|
|
}
|
|
}
|