Compare commits
2 Commits
125e1c80b9
...
423a3817c5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
423a3817c5 | ||
|
|
6af0520ed2 |
@ -24,12 +24,41 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
maxPutBytes = 256 << 20 // 256 MiB per Qwen Q1 fix
|
||||
maxConcurrentPut = 4 // 4-slot semaphore on in-flight PUTs
|
||||
retryAfterSecs = "5" // Retry-After header on 503
|
||||
// Default per-PUT body cap. Most callers (ingest parquets, catalog
|
||||
// manifests) live well under this. The cap is a safety gate, not a
|
||||
// memory bottleneck — manager.Uploader streams; this just refuses
|
||||
// reads past the limit so a runaway client can't drain the daemon.
|
||||
maxPutBytesDefault = 256 << 20 // 256 MiB
|
||||
|
||||
// Vector-persistence prefix gets a much larger cap because vectord
|
||||
// persists single-file LHV1 indexes that exceed 256 MiB above
|
||||
// ~150K vectors at d=768 (the 500K staffing test's documented gap).
|
||||
// 4 GiB covers >700K vectors at d=768 with HNSW graph overhead and
|
||||
// keeps the simple-Put torn-write guarantee from G1P intact (memory
|
||||
// project_golang_lakehouse.md: "Single Put eliminates the torn-write
|
||||
// class").
|
||||
maxPutBytesVectors = 4 << 30 // 4 GiB
|
||||
|
||||
// Vector-persistence prefix matched against incoming PUT keys. Keep
|
||||
// this in sync with internal/vectord/persistor.go's lhv1KeyFor.
|
||||
vectorsPrefix = "_vectors/"
|
||||
|
||||
maxConcurrentPut = 4 // 4-slot semaphore on in-flight PUTs
|
||||
retryAfterSecs = "5" // Retry-After header on 503
|
||||
primaryBucket = "primary"
|
||||
)
|
||||
|
||||
// maxPutBytesFor returns the body-cap to apply to a PUT for the given
|
||||
// key. Vectord LHV1 persistence (under "_vectors/") gets the larger
|
||||
// cap; everything else stays at the default. Function-level so a
|
||||
// future operator-driven cap (env, config) can drop in cleanly.
|
||||
func maxPutBytesFor(key string) int64 {
|
||||
if strings.HasPrefix(key, vectorsPrefix) {
|
||||
return maxPutBytesVectors
|
||||
}
|
||||
return maxPutBytesDefault
|
||||
}
|
||||
|
||||
func main() {
|
||||
configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
|
||||
secretsPath := flag.String("secrets", "/etc/lakehouse/secrets-go.toml", "path to secrets TOML (Go-side; Rust uses /etc/lakehouse/secrets.toml)")
|
||||
@ -181,15 +210,14 @@ func (h *handlers) handlePut(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Up-front Content-Length cap. Per Opus C3 review: the
|
||||
// manager.Uploader's multipart path runs body reads in goroutines
|
||||
// and wraps errors in its own types, so *http.MaxBytesError can be
|
||||
// buried by the time it reaches us — meaning bodies just over the
|
||||
// 5 MiB multipart threshold could surface as 500 instead of 413.
|
||||
// Catching Content-Length up front returns 413 deterministically
|
||||
// when the client honestly declares size; MaxBytesReader + the
|
||||
// string-match fallback below cover chunked / lying-CL cases.
|
||||
if r.ContentLength > maxPutBytes {
|
||||
// Per-key body cap. Vectord LHV1 persistence under "_vectors/"
|
||||
// gets a 4 GiB cap; everything else keeps 256 MiB. Up-front
|
||||
// Content-Length check first per Opus C3 review (manager.Uploader's
|
||||
// multipart path can bury *http.MaxBytesError in its own error
|
||||
// types); MaxBytesReader + string-match fallback below cover
|
||||
// chunked / lying-CL cases.
|
||||
cap := maxPutBytesFor(key)
|
||||
if r.ContentLength > cap {
|
||||
w.Header().Set("Retry-After", retryAfterSecs)
|
||||
http.Error(w, "payload too large", http.StatusRequestEntityTooLarge)
|
||||
return
|
||||
@ -208,11 +236,11 @@ func (h *handlers) handlePut(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// 256 MiB per-request body cap. Reads beyond this surface as
|
||||
// *http.MaxBytesError; for chunked-encoding bodies that's the only
|
||||
// signal we get. Defer LIFO order: r.Body.Close fires before
|
||||
// <-h.putSem, so the body is fully drained before the slot frees.
|
||||
r.Body = http.MaxBytesReader(w, r.Body, maxPutBytes)
|
||||
// Per-key body cap as MaxBytesReader so chunked-encoding bodies
|
||||
// also fail-loud at the limit. Defer LIFO order: r.Body.Close
|
||||
// fires before <-h.putSem, so the body is fully drained before
|
||||
// the slot frees.
|
||||
r.Body = http.MaxBytesReader(w, r.Body, cap)
|
||||
defer r.Body.Close()
|
||||
|
||||
bucket, err := h.reg.Resolve(primaryBucket)
|
||||
|
||||
@ -1,6 +1,10 @@
|
||||
package main
|
||||
|
||||
import "testing"
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.agentview.dev/profit/golangLAKEHOUSE/internal/vectord"
|
||||
)
|
||||
|
||||
func TestValidateKey(t *testing.T) {
|
||||
cases := []struct {
|
||||
@ -36,3 +40,57 @@ func TestValidateKey(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMaxPutBytesFor(t *testing.T) {
|
||||
// Vectord LHV1 persistence gets the larger cap so single-file
|
||||
// indexes above ~150K vectors at d=768 (the 500K staffing test
|
||||
// gap) can survive a Save without 413. Default cap stays at
|
||||
// 256 MiB for everything else (parquets, manifests, etc.).
|
||||
cases := []struct {
|
||||
name string
|
||||
key string
|
||||
want int64
|
||||
}{
|
||||
{"vectord LHV1 file", "_vectors/workers_500k.lhv1", maxPutBytesVectors},
|
||||
{"vectord nested", "_vectors/some/nested/index.lhv1", maxPutBytesVectors},
|
||||
{"parquet under datasets/", "datasets/workers/abc.parquet", maxPutBytesDefault},
|
||||
{"catalog manifest", "_catalog/manifests/foo.bin", maxPutBytesDefault},
|
||||
{"plain key", "x.txt", maxPutBytesDefault},
|
||||
{"empty key", "", maxPutBytesDefault},
|
||||
{"key with vectors-ish substring (not prefix)", "datasets/_vectors/x", maxPutBytesDefault},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := maxPutBytesFor(tc.key)
|
||||
if got != tc.want {
|
||||
t.Errorf("maxPutBytesFor(%q) = %d, want %d", tc.key, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestVectorPrefixSyncWithVectord locks the prefix value to vectord's
|
||||
// VectorPrefix constant. If a future refactor renames either side,
|
||||
// this test surfaces the drift before vectord saves silently bypass
|
||||
// the larger cap.
|
||||
func TestVectorPrefixSyncWithVectord(t *testing.T) {
|
||||
if vectorsPrefix != vectord.VectorPrefix {
|
||||
t.Errorf("storaged vectorsPrefix=%q out of sync with vectord.VectorPrefix=%q",
|
||||
vectorsPrefix, vectord.VectorPrefix)
|
||||
}
|
||||
}
|
||||
|
||||
// TestVectorCapAccommodates500KStaffingTest reads the documented gap
|
||||
// (memory project_golang_lakehouse.md): "storaged 256 MiB PUT cap
|
||||
// blocks single-file LHV1 persistence above ~150K vectors at d=768."
|
||||
// 500K vectors at d=768 with HNSW graph overhead is approximately
|
||||
// 4.5 GiB resident, ~700 MiB on disk after compression.
|
||||
// 4 GiB cap covers more than the documented production workload.
|
||||
func TestVectorCapAccommodates500KStaffingTest(t *testing.T) {
|
||||
const fiveHundredKVectorsAtD768Disk = 700 << 20 // ~700 MiB conservative estimate
|
||||
if maxPutBytesVectors < fiveHundredKVectorsAtD768Disk {
|
||||
t.Errorf("maxPutBytesVectors=%d (%.0f MiB) below documented production "+
|
||||
"workload of ~700 MiB for 500K vectors at d=768",
|
||||
maxPutBytesVectors, float64(maxPutBytesVectors)/(1<<20))
|
||||
}
|
||||
}
|
||||
|
||||
75
internal/shared/bind.go
Normal file
75
internal/shared/bind.go
Normal file
@ -0,0 +1,75 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// requireLoopbackOrOverride enforces that the bind address is on the
|
||||
// loopback interface unless an explicit env override is set. Closes
|
||||
// the worst case of audit risk R-001 (queryd /sql + DuckDB + non-
|
||||
// loopback bind = RCE-equivalent for anyone who can reach the port)
|
||||
// without committing to an auth model.
|
||||
//
|
||||
// Override env: LH_<UPPER(serviceName)>_ALLOW_NONLOOPBACK=1.
|
||||
// When the override fires, we log a structured warn so the choice is
|
||||
// auditable in production logs.
|
||||
//
|
||||
// Cases that pass:
|
||||
// - 127.0.0.1, 127.x.y.z (the /8), [::1], localhost
|
||||
// - explicit-override env set to "1"
|
||||
//
|
||||
// Cases that fail-loud:
|
||||
// - 0.0.0.0, [::], any non-loopback IP
|
||||
// - empty host ":port" (listens on all interfaces)
|
||||
// - unparseable addr
|
||||
//
|
||||
// The function is also useful as a unit-testable predicate; callers
|
||||
// that want to gate something other than Run can call it directly.
|
||||
func requireLoopbackOrOverride(serviceName, addr string) error {
|
||||
if isLoopbackAddr(addr) {
|
||||
return nil
|
||||
}
|
||||
envKey := "LH_" + strings.ToUpper(serviceName) + "_ALLOW_NONLOOPBACK"
|
||||
if os.Getenv(envKey) == "1" {
|
||||
slog.Warn("non-loopback bind allowed by env override",
|
||||
"service", serviceName,
|
||||
"addr", addr,
|
||||
"env", envKey,
|
||||
"hint", "audit risk R-001 — see reports/scrum/risk-register.md")
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("refusing non-loopback bind %q for %q "+
|
||||
"(set %s=1 to override; see audit R-001)", addr, serviceName, envKey)
|
||||
}
|
||||
|
||||
// isLoopbackAddr reports whether the host part of a "host:port" address is
// a loopback destination: an IPv4 address in 127.0.0.0/8, the IPv6 address
// ::1, or the literal name "localhost". Everything else yields false,
// including the empty string, an empty host (":port"), an address that
// net.SplitHostPort cannot parse, and any hostname other than "localhost".
func isLoopbackAddr(addr string) bool {
	host, _, splitErr := net.SplitHostPort(addr)
	switch {
	case splitErr != nil:
		// Not a host:port shape (bare hostname, missing port, garbage).
		// Refusing here means newly accepted shapes never slip through
		// unnoticed.
		return false
	case host == "":
		// ":port" binds every interface, which is the opposite of loopback.
		return false
	case host == "localhost":
		return true
	}

	// Any remaining hostname is rejected without a DNS lookup (slow and
	// misleading); deployments have to spell out the address explicitly.
	parsed := net.ParseIP(host)
	return parsed != nil && parsed.IsLoopback()
}
|
||||
129
internal/shared/bind_test.go
Normal file
129
internal/shared/bind_test.go
Normal file
@ -0,0 +1,129 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Closes audit R-001's worst case (accidental non-loopback deploy)
|
||||
// at the predicate layer. Run integration coverage lives in the
|
||||
// existing smoke chain — this file proves the rules.
|
||||
|
||||
func TestIsLoopbackAddr(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
addr string
|
||||
want bool
|
||||
}{
|
||||
// Pass cases — every shape we accept.
|
||||
{"127.0.0.1 standard", "127.0.0.1:3214", true},
|
||||
{"127.0.0.0/8 mid-range", "127.5.6.7:3214", true},
|
||||
{"127.255.255.254 edge", "127.255.255.254:3214", true},
|
||||
{"IPv6 loopback", "[::1]:3214", true},
|
||||
{"localhost hostname", "localhost:3214", true},
|
||||
|
||||
// Reject cases — every shape that should fail-loud.
|
||||
{"empty addr", "", false},
|
||||
{"empty host (all interfaces)", ":3214", false},
|
||||
{"explicit any IPv4", "0.0.0.0:3214", false},
|
||||
{"explicit any IPv6", "[::]:3214", false},
|
||||
{"public IPv4", "8.8.8.8:3214", false},
|
||||
{"private LAN IPv4", "192.168.1.176:3214", false},
|
||||
{"hostname (not localhost)", "myhost.example.com:3214", false},
|
||||
{"missing port", "127.0.0.1", false},
|
||||
{"garbage", "not an addr", false},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := isLoopbackAddr(tc.addr)
|
||||
if got != tc.want {
|
||||
t.Errorf("isLoopbackAddr(%q) = %v, want %v", tc.addr, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequireLoopback_AcceptsLoopback(t *testing.T) {
|
||||
if err := requireLoopbackOrOverride("queryd", "127.0.0.1:3214"); err != nil {
|
||||
t.Errorf("loopback should pass without env, got %v", err)
|
||||
}
|
||||
if err := requireLoopbackOrOverride("vectord", "[::1]:3215"); err != nil {
|
||||
t.Errorf("IPv6 loopback should pass, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequireLoopback_RejectsNonLoopback(t *testing.T) {
|
||||
cases := []string{
|
||||
"0.0.0.0:3214",
|
||||
":3214",
|
||||
"192.168.1.176:3214",
|
||||
}
|
||||
for _, addr := range cases {
|
||||
t.Run(addr, func(t *testing.T) {
|
||||
err := requireLoopbackOrOverride("queryd", addr)
|
||||
if err == nil {
|
||||
t.Fatalf("expected error on %q without override, got nil", addr)
|
||||
}
|
||||
// Error message should cite the override env so operators
|
||||
// can quickly see how to opt in if intentional.
|
||||
if !strings.Contains(err.Error(), "LH_QUERYD_ALLOW_NONLOOPBACK") {
|
||||
t.Errorf("error should cite override env, got %q", err.Error())
|
||||
}
|
||||
// And reference R-001 so the audit trail is explicit.
|
||||
if !strings.Contains(err.Error(), "R-001") {
|
||||
t.Errorf("error should cite R-001, got %q", err.Error())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequireLoopback_OverrideEnvAllowsNonLoopback(t *testing.T) {
|
||||
t.Setenv("LH_QUERYD_ALLOW_NONLOOPBACK", "1")
|
||||
if err := requireLoopbackOrOverride("queryd", "0.0.0.0:3214"); err != nil {
|
||||
t.Errorf("override should permit non-loopback, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequireLoopback_OverrideEnvOnlyApplies_ExactValue1(t *testing.T) {
|
||||
// "true", "yes", anything else != "1" should NOT trigger the
|
||||
// override. Strict matching prevents silent acceptance of typos.
|
||||
cases := []string{"true", "yes", "TRUE", "01", " 1", ""}
|
||||
for _, val := range cases {
|
||||
t.Run("val="+val, func(t *testing.T) {
|
||||
t.Setenv("LH_QUERYD_ALLOW_NONLOOPBACK", val)
|
||||
err := requireLoopbackOrOverride("queryd", "0.0.0.0:3214")
|
||||
if err == nil {
|
||||
t.Fatalf("override value %q should NOT permit non-loopback", val)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequireLoopback_EnvIsPerService(t *testing.T) {
|
||||
// Setting queryd's override should NOT affect vectord. Each binary
|
||||
// must opt in explicitly so a single-service exposure decision
|
||||
// doesn't silently apply to others.
|
||||
t.Setenv("LH_QUERYD_ALLOW_NONLOOPBACK", "1")
|
||||
// queryd allowed:
|
||||
if err := requireLoopbackOrOverride("queryd", "0.0.0.0:3214"); err != nil {
|
||||
t.Errorf("queryd should be allowed, got %v", err)
|
||||
}
|
||||
// vectord still rejected:
|
||||
if err := requireLoopbackOrOverride("vectord", "0.0.0.0:3215"); err == nil {
|
||||
t.Error("vectord should still be rejected — env is per-service")
|
||||
}
|
||||
}
|
||||
|
||||
// Sanity: the env override variable name composition. If anyone ever
|
||||
// renames the prefix or casing, every cmd/<svc>/main.go behavior breaks.
|
||||
func TestRequireLoopback_OverrideEnvName(t *testing.T) {
|
||||
// Make sure the env we expect users to set actually triggers the
|
||||
// path. Helps catch a refactor that changes the prefix without
|
||||
// updating docs.
|
||||
defer os.Unsetenv("LH_GATEWAY_ALLOW_NONLOOPBACK")
|
||||
os.Setenv("LH_GATEWAY_ALLOW_NONLOOPBACK", "1")
|
||||
if err := requireLoopbackOrOverride("gateway", "0.0.0.0:3110"); err != nil {
|
||||
t.Errorf("gateway override env should work, got %v", err)
|
||||
}
|
||||
}
|
||||
@ -47,7 +47,16 @@ type RegisterRoutes func(r chi.Router)
|
||||
// want their own slog.Default() should set it before calling Run.
|
||||
// (Per Kimi review #4: shared library functions shouldn't silently
|
||||
// mutate package globals.)
|
||||
//
|
||||
// Refuses to bind a non-loopback address unless the
|
||||
// LH_<SERVICE>_ALLOW_NONLOOPBACK=1 env is set — closes the accidental
|
||||
// 0.0.0.0 deploy path for R-001 (queryd /sql is RCE-equivalent off
|
||||
// loopback, but the gate applies to every binary uniformly).
|
||||
func Run(serviceName, addr string, register RegisterRoutes) error {
|
||||
if err := requireLoopbackOrOverride(serviceName, addr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
|
||||
Level: slog.LevelInfo,
|
||||
}))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user