Implements the auth posture from ADR-003 (commit 0d18ffa). Two independent layers — Bearer token (constant-time compare via crypto/subtle) and IP allowlist (CIDR set) — composed in shared.Run so every binary inherits the same gate without per-binary wiring. Together with the bind-gate from commit 6af0520, this mechanically closes audit risks R-001 + R-007:

- non-loopback bind without auth.token = startup refuse
- non-loopback bind WITH auth.token + override env = allowed
- loopback bind = all gates open (G0 dev unchanged)

internal/shared/auth.go (NEW)
  RequireAuth(cfg AuthConfig) returns chi-compatible middleware. Empty Token + empty AllowedIPs → pass-through (G0 dev mode). Token-only → 401 on Bearer mismatch. AllowedIPs-only → 403 when the source IP is not in the CIDR set. Both → both gates apply. /health bypasses both layers (load-balancer / liveness probes shouldn't carry tokens).
  CIDR parsing runs once at boot; a bare IP (no /N) is treated as /32 (or /128 for IPv6). Invalid entries log a warning and are dropped, fail-loud-but-not-fatal, so a typo doesn't kill the binary.
  Token comparison: subtle.ConstantTimeCompare on the full "Bearer <token>" wire-format string. A length mismatch returns 0 (per the stdlib spec), so wrong-length tokens are rejected without a timing leak. The pre-encoded comparison slice is stored in the middleware closure — one allocation per request.
  Source-IP extraction prefers net.SplitHostPort, falling back to RemoteAddr-as-is for httptest compatibility. X-Forwarded-For support is a follow-up for when a trusted proxy fronts the gateway (config knob TBD per ADR-003 §"Future").

internal/shared/server.go
  Run signature gained an AuthConfig parameter (4th arg). /health stays mounted on the outer router (public). Registered routes go inside a chi.Group with RequireAuth applied — empty config = transparent group. Added a requireAuthOnNonLoopback startup check: non-loopback bind with an empty Token = refuse to start (cites R-001 + R-007 by name).

internal/shared/config.go
  AuthConfig type added with TOML tags. Fields: Token, AllowedIPs. Composed into Config under [auth].

cmd/<svc>/main.go × 7 (catalogd, embedd, gateway, ingestd, queryd, storaged, vectord; mcpd is unaffected — stdio doesn't bind a port)
  Each call site adds cfg.Auth as the 4th arg to shared.Run. No other changes — the middleware applies via shared.Run uniformly.

internal/shared/auth_test.go (12 test funcs)
  Empty-config pass-through, missing-token 401, wrong-token 401, correct-token 200, raw-token-without-Bearer-prefix 401, /health always public, IP allowlist allow + reject, bare IP /32, both layers when both configured, invalid CIDR drop-with-warn, RemoteAddr shape extraction. The constant-time comparison is verified by inspection (comments in auth.go) plus the existence of the passthrough test (length-mismatch case).

Verified:
- go test -count=1 ./internal/shared/ — all green (was 21, now 33 funcs)
- just verify — vet + test + 9 smokes, 33s
- just proof contract — 53/0/1 unchanged

Smokes + proof harness keep working without any token configuration: the default Auth is an empty struct → the middleware is a no-op → existing tests pass unchanged. To exercise the gate, operators set [auth].token in lakehouse.toml (or, per the "future" note in the ADR, via env var).

Closes audit findings:
- R-001 HIGH — fully mechanically closed (was: partial via bind gate)
- R-007 MED — fully mechanically closed (was: design-only ADR-003)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
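For reviewers without internal/shared/auth.go open, here is a minimal sketch of the shape described above. It is illustrative only: the TOML key "allowed_ips", the exact control flow, and helper names are assumptions; the token compare, bare-IP handling, /health bypass, and SplitHostPort fallback follow the description.

// Sketch of the middleware described above (illustrative only; not the
// real internal/shared/auth.go).
package shared

import (
	"crypto/subtle"
	"net"
	"net/http"
	"strings"
)

// AuthConfig lives in config.go under [auth]; only the "token" key name is
// confirmed above, "allowed_ips" is a guess:
//
//	[auth]
//	token       = "long-random-string"
//	allowed_ips = ["10.0.0.0/8", "192.168.1.7"]
type AuthConfig struct {
	Token      string   `toml:"token"`
	AllowedIPs []string `toml:"allowed_ips"`
}

// RequireAuth returns chi-compatible middleware composing both layers.
// Empty config is a pass-through; /health stays public.
func RequireAuth(cfg AuthConfig) func(http.Handler) http.Handler {
	expected := []byte("Bearer " + cfg.Token) // pre-encoded once, kept in the closure

	var allowed []*net.IPNet
	for _, entry := range cfg.AllowedIPs {
		if !strings.Contains(entry, "/") { // bare IP → /32 (or /128 for IPv6)
			if strings.Contains(entry, ":") {
				entry += "/128"
			} else {
				entry += "/32"
			}
		}
		if _, ipnet, err := net.ParseCIDR(entry); err == nil {
			allowed = append(allowed, ipnet) // the real code warns and drops invalid entries
		}
	}

	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			if r.URL.Path == "/health" { // probes stay public
				next.ServeHTTP(w, r)
				return
			}
			if len(allowed) > 0 {
				host, _, err := net.SplitHostPort(r.RemoteAddr)
				if err != nil {
					host = r.RemoteAddr // httptest-style addresses
				}
				ip := net.ParseIP(host)
				inSet := false
				for _, n := range allowed {
					if ip != nil && n.Contains(ip) {
						inSet = true
						break
					}
				}
				if !inSet {
					http.Error(w, "forbidden", http.StatusForbidden)
					return
				}
			}
			if cfg.Token != "" &&
				subtle.ConstantTimeCompare([]byte(r.Header.Get("Authorization")), expected) != 1 {
				http.Error(w, "unauthorized", http.StatusUnauthorized)
				return
			}
			next.ServeHTTP(w, r)
		})
	}
}

The storaged wiring itself is unchanged beyond the cfg.Auth argument visible in main() below.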
309 lines · 10 KiB · Go
// storaged is the object I/O service. D2 wires GET / PUT / LIST /
// DELETE routes against a single bucket ("primary") in the registry.
// Bind is 127.0.0.1 only (G0 dev — no auth on the wire); body cap is
// 256 MiB; concurrent in-flight PUTs are capped at 4 with non-blocking
// try-acquire (503 + Retry-After when full).
package main

import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"io"
	"log/slog"
	"net/http"
	"os"
	"strings"

	"github.com/go-chi/chi/v5"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/secrets"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/storaged"
)

const (
	// Default per-PUT body cap. Most callers (ingest parquets, catalog
	// manifests) live well under this. The cap is a safety gate, not a
	// memory bottleneck — manager.Uploader streams; this just refuses
	// reads past the limit so a runaway client can't drain the daemon.
	maxPutBytesDefault = 256 << 20 // 256 MiB

	// Vector-persistence prefix gets a much larger cap because vectord
	// persists single-file LHV1 indexes that exceed 256 MiB above
	// ~150K vectors at d=768 (the 500K staffing test's documented gap).
	// 4 GiB covers >700K vectors at d=768 with HNSW graph overhead and
	// keeps the simple-Put torn-write guarantee from G1P intact (memory
	// project_golang_lakehouse.md: "Single Put eliminates the torn-write
	// class").
	maxPutBytesVectors = 4 << 30 // 4 GiB

	// Vector-persistence prefix matched against incoming PUT keys. Keep
	// this in sync with internal/vectord/persistor.go's lhv1KeyFor.
	vectorsPrefix = "_vectors/"

	maxConcurrentPut = 4   // 4-slot semaphore on in-flight PUTs
	retryAfterSecs   = "5" // Retry-After header on 503
	primaryBucket    = "primary"
)

// maxPutBytesFor returns the body-cap to apply to a PUT for the given
// key. Vectord LHV1 persistence (under "_vectors/") gets the larger
// cap; everything else stays at the default. Function-level so a
// future operator-driven cap (env, config) can drop in cleanly.
func maxPutBytesFor(key string) int64 {
	if strings.HasPrefix(key, vectorsPrefix) {
		return maxPutBytesVectors
	}
	return maxPutBytesDefault
}

func main() {
	configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
	secretsPath := flag.String("secrets", "/etc/lakehouse/secrets-go.toml", "path to secrets TOML (Go-side; Rust uses /etc/lakehouse/secrets.toml)")
	flag.Parse()

	cfg, err := shared.LoadConfig(*configPath)
	if err != nil {
		slog.Error("config", "err", err)
		os.Exit(1)
	}

	registry, err := buildRegistry(cfg, *secretsPath)
	if err != nil {
		slog.Error("bucket registry", "err", err)
		os.Exit(1)
	}

	h := newHandlers(registry)

	if err := shared.Run("storaged", cfg.Storaged.Bind, h.register, cfg.Auth); err != nil {
		slog.Error("server", "err", err)
		os.Exit(1)
	}
}

// buildRegistry constructs the (single, G0) bucket registry. Multi-bucket
// federation lands in G2; the registry shape is in place so that arrives
// without an HTTP-layer refactor.
func buildRegistry(cfg shared.Config, secretsPath string) (*storaged.BucketRegistry, error) {
	prov, err := secrets.NewFileProvider(secretsPath, secrets.S3Credentials{
		AccessKeyID:     cfg.S3.AccessKeyID,
		SecretAccessKey: cfg.S3.SecretAccessKey,
	})
	if err != nil {
		return nil, err
	}

	// Per Opus C1 review: don't tie the AWS config-load context to a
	// canceller that fires when buildRegistry returns. With static creds
	// it's fine today, but EC2 IMDS / SSO / AssumeRole credential
	// providers (G2+) capture the load ctx for refresh — a cancelled
	// ctx silently fails them at the next refresh. Use Background here;
	// per-request lifetimes flow through r.Context() in handlers.
	bucket, err := storaged.NewBucket(context.Background(), cfg.S3, prov, primaryBucket)
	if err != nil {
		return nil, err
	}

	reg := storaged.NewRegistry()
	if err := reg.Register(bucket); err != nil {
		return nil, err
	}
	return reg, nil
}

// handlers carries the registry + the PUT semaphore. One instance
// per process; chi routes close over it.
type handlers struct {
	reg    *storaged.BucketRegistry
	putSem chan struct{}
}

func newHandlers(reg *storaged.BucketRegistry) *handlers {
	return &handlers{
		reg:    reg,
		putSem: make(chan struct{}, maxConcurrentPut),
	}
}

func (h *handlers) register(r chi.Router) {
	// Verb-paths per PHASE_G0_KICKOFF D2.4 spec. REST-style refactor
	// (GET/PUT/DELETE on a single /storage/{key}) deferred until G2.
	r.Get("/storage/get/*", h.handleGet)
	r.Put("/storage/put/*", h.handlePut)
	r.Get("/storage/list", h.handleList)
	r.Delete("/storage/delete/*", h.handleDelete)
}
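
// Illustrative requests against the routes above; 127.0.0.1:9301 is a
// made-up address (the real one comes from cfg.Storaged.Bind in
// lakehouse.toml), and with [auth].token set each call also needs
// -H "Authorization: Bearer <token>":
//
//	curl -T part.parquet http://127.0.0.1:9301/storage/put/tables/t1/part.parquet
//	curl -o part.parquet http://127.0.0.1:9301/storage/get/tables/t1/part.parquet
//	curl "http://127.0.0.1:9301/storage/list?prefix=tables/t1/"
//	curl -X DELETE http://127.0.0.1:9301/storage/delete/tables/t1/part.parquet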

// extractKey pulls the wildcard key out of a chi route. Routes are
// registered as `/storage/<verb>/*` so the wildcard captures the
// remainder, including embedded slashes.
func extractKey(r *http.Request) string {
	// chi.URLParam with "*" returns everything after the route prefix.
	return chi.URLParam(r, "*")
}

// validateKey rejects keys that would be unsafe to round-trip through
// the system. The exact policy is a G0 design choice — see the focused
// decision request in the D2 PR description (validateKey-policy).
func validateKey(key string) error {
	if key == "" {
		return errors.New("empty key")
	}
	if len(key) > 1024 {
		return errors.New("key too long (>1024 bytes)")
	}
	if strings.ContainsRune(key, 0) {
		return errors.New("key contains NUL byte")
	}
	if strings.HasPrefix(key, "/") {
		return errors.New("key starts with /")
	}
	for _, part := range strings.Split(key, "/") {
		if part == ".." {
			return errors.New("key contains .. component")
		}
	}
	if strings.ContainsAny(key, "\r\n\t") {
		return errors.New("key contains control char")
	}
	return nil
}

func (h *handlers) handleGet(w http.ResponseWriter, r *http.Request) {
	key := extractKey(r)
	if err := validateKey(key); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	bucket, err := h.reg.Resolve(primaryBucket)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	body, info, err := bucket.Get(r.Context(), key)
	if errors.Is(err, storaged.ErrKeyNotFound) {
		http.Error(w, "not found", http.StatusNotFound)
		return
	}
	if err != nil {
		slog.Error("storage get", "key", key, "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	defer body.Close()
	if info.ETag != "" {
		w.Header().Set("ETag", info.ETag)
	}
	w.Header().Set("Content-Type", "application/octet-stream")
	if _, err := io.Copy(w, body); err != nil {
		slog.Warn("storage get copy", "key", key, "err", err)
	}
}

func (h *handlers) handlePut(w http.ResponseWriter, r *http.Request) {
	key := extractKey(r)
	if err := validateKey(key); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Per-key body cap. Vectord LHV1 persistence under "_vectors/"
	// gets a 4 GiB cap; everything else keeps 256 MiB. Up-front
	// Content-Length check first per Opus C3 review (manager.Uploader's
	// multipart path can bury *http.MaxBytesError in its own error
	// types); MaxBytesReader + string-match fallback below cover
	// chunked / lying-CL cases.
	limit := maxPutBytesFor(key)
	if r.ContentLength > limit {
		w.Header().Set("Retry-After", retryAfterSecs)
		http.Error(w, "payload too large", http.StatusRequestEntityTooLarge)
		return
	}

	// Non-blocking try-acquire: if the 4-slot semaphore is full, return
	// 503 + Retry-After:5 instantly rather than holding the connection.
	// Per PHASE_G0_KICKOFF D2.4: "PUTs blocked on the semaphore → 503
	// with Retry-After: 5".
	select {
	case h.putSem <- struct{}{}:
		defer func() { <-h.putSem }()
	default:
		w.Header().Set("Retry-After", retryAfterSecs)
		http.Error(w, "storaged: put concurrency cap reached", http.StatusServiceUnavailable)
		return
	}

	// Per-key body cap as MaxBytesReader so chunked-encoding bodies
	// also fail-loud at the limit. Defer LIFO order: r.Body.Close
	// fires before <-h.putSem, so the body is closed before the
	// slot frees.
	r.Body = http.MaxBytesReader(w, r.Body, limit)
	defer r.Body.Close()

	bucket, err := h.reg.Resolve(primaryBucket)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if err := bucket.Put(r.Context(), key, r.Body); err != nil {
		// Two-layer detect: errors.As catches the typed error when it
		// survives unwrapping; the string-match check catches cases
		// where manager.Uploader's multipart path wraps the body-read
		// failure in its own aggregate type.
		var maxErr *http.MaxBytesError
		if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
			w.Header().Set("Retry-After", retryAfterSecs)
			http.Error(w, "payload too large", http.StatusRequestEntityTooLarge)
			return
		}
		slog.Error("storage put", "key", key, "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(`{"status":"ok"}`))
}

func (h *handlers) handleList(w http.ResponseWriter, r *http.Request) {
	prefix := r.URL.Query().Get("prefix")
	bucket, err := h.reg.Resolve(primaryBucket)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	objs, err := bucket.List(r.Context(), prefix)
	if err != nil {
		slog.Error("storage list", "prefix", prefix, "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(map[string]any{
		"prefix":  prefix,
		"objects": objs,
	})
}

func (h *handlers) handleDelete(w http.ResponseWriter, r *http.Request) {
	key := extractKey(r)
	if err := validateKey(key); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	bucket, err := h.reg.Resolve(primaryBucket)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if err := bucket.Delete(r.Context(), key); err != nil {
		slog.Error("storage delete", "key", key, "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}