root 423a3817c5 D: storaged per-prefix PUT cap — vectord _vectors/ → 4 GiB
Closes the documented 500K-test gap (memory project_golang_lakehouse:
"storaged 256 MiB PUT cap blocks single-file LHV1 persistence above
~150K vectors at d=768"). Vectord persistence under "_vectors/" now
gets a 4 GiB cap; everything else (parquets, manifests, ingest)
keeps the 256 MiB default.

Why per-prefix and not "raise globally":
  - 256 MiB cap is a real DoS protection — runaway clients can't
    drain the daemon. Raising it for ALL traffic would expand the
    attack surface for routine paths that have no need.
  - Per-prefix preserves existing protection while opening the one
    documented production-scale path.

Why not split LHV1 across multiple keys (the alternative):
  - G1P shipped a single-Put framed format SPECIFICALLY to eliminate
    the torn-write class (memory: "Single Put eliminates the torn-
    write class that the 3-way convergent scrum finding identified").
  - Multi-key LHV1 would re-introduce the half-saved-state failure
    mode we just paid to fix. Streaming via existing manager.Uploader
    is the better architectural answer.
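
  For context, the outbound streaming shape this leans on — a minimal
  sketch of a manager.Uploader-backed put (free function with invented
  names; the real code is bucket.Put in internal/storaged):

    import (
        "context"
        "io"

        "github.com/aws/aws-sdk-go-v2/aws"
        "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
        "github.com/aws/aws-sdk-go-v2/service/s3"
    )

    // put streams body to S3 without buffering it whole: the uploader
    // reads part-sized chunks off the io.Reader and multiparts them
    // out, so resident memory tracks part size × upload concurrency,
    // not object size.
    func put(ctx context.Context, client *s3.Client, bucket, key string, body io.Reader) error {
        up := manager.NewUploader(client)
        _, err := up.Upload(ctx, &s3.PutObjectInput{
            Bucket: aws.String(bucket),
            Key:    aws.String(key),
            Body:   body,
        })
        return err
    }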

Why not bump the cap operationally via env/config:
  - Future operator-driven cap can drop in cleanly via the
    maxPutBytesFor function. Started with hardcoded 4 GiB to keep
    this commit small; config knob is a follow-up if production
    workloads diverge from the documented 500K-vector ceiling.
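
  A possible shape for that follow-up (the env knob name is invented
  here, not a shipped interface — maxPutBytesFor stays the single
  decision point):

    import (
        "os"
        "strconv"
    )

    // vectorsCap returns the operator override when set, else the
    // hardcoded 4 GiB default. Hypothetical sketch only.
    func vectorsCap() int64 {
        if s := os.Getenv("STORAGED_VECTORS_PUT_CAP_BYTES"); s != "" {
            if n, err := strconv.ParseInt(s, 10, 64); err == nil && n > 0 {
                return n
            }
        }
        return maxPutBytesVectors
    }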

manager.Uploader is already streaming-multipart on the outbound
S3 side; the inbound MaxBytesReader cap is a safety gate, not a
memory bottleneck. Raising it for vectord therefore just lets the
existing streaming path flow, without new memory pressure: bodies
are never buffered whole, and even the pathological full-buffer
bound (4-slot semaphore × 4 GiB = 16 GiB, all slots maxed out at
once) is vanishingly unlikely.

Implementation:
  cmd/storaged/main.go:
    new constant maxPutBytesVectors = 4 GiB (covers >700K vectors @ d=768:
      at 4 B/dim (f32), 700K × 768 ≈ 2.0 GiB raw, leaving ~2 GiB of
      headroom for HNSW graph overhead)
    new constant vectorsPrefix = "_vectors/" (synced with vectord.VectorPrefix)
    new function maxPutBytesFor(key) → cap-by-prefix
    handlePut: ContentLength check + MaxBytesReader use the per-key cap

  cmd/storaged/main_test.go (3 new test funcs):
    TestMaxPutBytesFor: 7 cases incl. nested prefix, substring-but-not-
      prefix, empty key, parquet/manifest paths.
    TestVectorPrefixSyncWithVectord: regression test that asserts
      vectorsPrefix == vectord.VectorPrefix. A future rename surfaces
      here instead of silently bypassing the larger cap (sketched
      below, alongside the cap-bound test).
    TestVectorCapAccommodates500KStaffingTest: bounds the cap above
      the documented production workload (~700 MiB conservative).
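
  A sketch of those two guard tests (the prefix assertion is as
  described; the 2× headroom factor in the second is illustrative —
  the committed test's exact bound may differ):

    import (
        "testing"

        "git.agentview.dev/profit/golangLAKEHOUSE/internal/vectord"
    )

    func TestVectorPrefixSyncWithVectord(t *testing.T) {
        // A rename on either side fails here instead of silently
        // routing vector PUTs back to the 256 MiB default cap.
        if vectorsPrefix != vectord.VectorPrefix {
            t.Fatalf("prefix drift: storaged %q != vectord %q",
                vectorsPrefix, vectord.VectorPrefix)
        }
    }

    func TestVectorCapAccommodates500KStaffingTest(t *testing.T) {
        // Documented workload: ~700 MiB (conservative) single-file
        // LHV1 for the 500K staffing test. Require clear headroom.
        const workloadBytes int64 = 700 << 20
        if int64(maxPutBytesVectors) < 2*workloadBytes {
            t.Fatalf("vector cap %d leaves <2x headroom over %d",
                int64(maxPutBytesVectors), workloadBytes)
        }
    }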

Verified:
  go test ./cmd/storaged/ — all green (was 1 func, now 4)
  just verify             — 9 smokes still pass · 32s wall
  just proof contract     — 53/0/1 unchanged

Out of scope for this commit (deserves its own):
  - Heavy integration smoke: 200K dim=768 synthetic vectors → ~700
    MiB LHV1 → kill+restart vectord → recall=1. ~5-10 min wall;
    follow-up if you want production-scale persistence verified
    end-to-end. Unit tests + existing g1p_smoke cover the wiring.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 06:00:09 -05:00

cmd/storaged/main.go

// storaged is the object I/O service. D2 wires GET / PUT / LIST /
// DELETE routes against a single bucket ("primary") in the registry.
// Bind is 127.0.0.1 only (G0 dev — no auth on the wire); the per-PUT
// body cap is 256 MiB by default and 4 GiB under "_vectors/";
// concurrent in-flight PUTs are capped at 4 with non-blocking
// try-acquire (503 + Retry-After when full).
package main

import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"io"
	"log/slog"
	"net/http"
	"os"
	"strings"

	"github.com/go-chi/chi/v5"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/secrets"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/storaged"
)

const (
	// Default per-PUT body cap. Most callers (ingest parquets, catalog
	// manifests) live well under this. The cap is a safety gate, not a
	// memory bottleneck — manager.Uploader streams; this just refuses
	// reads past the limit so a runaway client can't drain the daemon.
	maxPutBytesDefault = 256 << 20 // 256 MiB

	// Vector-persistence prefix gets a much larger cap because vectord
	// persists single-file LHV1 indexes that exceed 256 MiB above
	// ~150K vectors at d=768 (the 500K staffing test's documented gap).
	// 4 GiB covers >700K vectors at d=768 with HNSW graph overhead and
	// keeps the single-Put torn-write guarantee from G1P intact (memory
	// project_golang_lakehouse.md: "Single Put eliminates the torn-write
	// class").
	maxPutBytesVectors = 4 << 30 // 4 GiB

	// Vector-persistence prefix matched against incoming PUT keys. Keep
	// this in sync with internal/vectord/persistor.go's lhv1KeyFor.
	vectorsPrefix = "_vectors/"

	maxConcurrentPut = 4   // 4-slot semaphore on in-flight PUTs
	retryAfterSecs   = "5" // Retry-After header on 503
	primaryBucket    = "primary"
)

// maxPutBytesFor returns the body cap to apply to a PUT for the given
// key. Vectord LHV1 persistence (under "_vectors/") gets the larger
// cap; everything else stays at the default. Function-level so a
// future operator-driven cap (env, config) can drop in cleanly.
func maxPutBytesFor(key string) int64 {
	if strings.HasPrefix(key, vectorsPrefix) {
		return maxPutBytesVectors
	}
	return maxPutBytesDefault
}

func main() {
	configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
	secretsPath := flag.String("secrets", "/etc/lakehouse/secrets-go.toml", "path to secrets TOML (Go-side; Rust uses /etc/lakehouse/secrets.toml)")
	flag.Parse()

	cfg, err := shared.LoadConfig(*configPath)
	if err != nil {
		slog.Error("config", "err", err)
		os.Exit(1)
	}
	registry, err := buildRegistry(cfg, *secretsPath)
	if err != nil {
		slog.Error("bucket registry", "err", err)
		os.Exit(1)
	}
	h := newHandlers(registry)
	if err := shared.Run("storaged", cfg.Storaged.Bind, h.register); err != nil {
		slog.Error("server", "err", err)
		os.Exit(1)
	}
}

// buildRegistry constructs the (single, G0) bucket registry. Multi-bucket
// federation lands in G2; the registry shape is in place so that arrives
// without an HTTP-layer refactor.
func buildRegistry(cfg shared.Config, secretsPath string) (*storaged.BucketRegistry, error) {
	prov, err := secrets.NewFileProvider(secretsPath, secrets.S3Credentials{
		AccessKeyID:     cfg.S3.AccessKeyID,
		SecretAccessKey: cfg.S3.SecretAccessKey,
	})
	if err != nil {
		return nil, err
	}
	// Per Opus C1 review: don't tie the AWS config-load context to a
	// canceller that fires when buildRegistry returns. With static creds
	// it's fine today, but EC2 IMDS / SSO / AssumeRole credential
	// providers (G2+) capture the load ctx for refresh — a cancelled
	// ctx silently fails them at the next refresh. Use Background here;
	// per-request lifetimes flow through r.Context() in handlers.
	bucket, err := storaged.NewBucket(context.Background(), cfg.S3, prov, primaryBucket)
	if err != nil {
		return nil, err
	}
	reg := storaged.NewRegistry()
	if err := reg.Register(bucket); err != nil {
		return nil, err
	}
	return reg, nil
}

// handlers carries the registry + the PUT semaphore. One instance
// per process; chi routes close over it.
type handlers struct {
	reg    *storaged.BucketRegistry
	putSem chan struct{}
}

func newHandlers(reg *storaged.BucketRegistry) *handlers {
	return &handlers{
		reg:    reg,
		putSem: make(chan struct{}, maxConcurrentPut),
	}
}

func (h *handlers) register(r chi.Router) {
	// Verb-paths per PHASE_G0_KICKOFF D2.4 spec. REST-style refactor
	// (GET/PUT/DELETE on a single /storage/{key}) deferred until G2.
	r.Get("/storage/get/*", h.handleGet)
	r.Put("/storage/put/*", h.handlePut)
	r.Get("/storage/list", h.handleList)
	r.Delete("/storage/delete/*", h.handleDelete)
}

// extractKey pulls the wildcard key out of a chi route. Routes are
// registered as `/storage/<verb>/*` so the wildcard captures the
// remainder, including embedded slashes.
func extractKey(r *http.Request) string {
	// chi.URLParam with "*" returns everything after the route prefix.
	return chi.URLParam(r, "*")
}

// validateKey rejects keys that would be unsafe to round-trip through
// the system. The exact policy is a G0 design choice — see the focused
// decision request in the D2 PR description (validateKey-policy).
func validateKey(key string) error {
	if key == "" {
		return errors.New("empty key")
	}
	if len(key) > 1024 {
		return errors.New("key too long (>1024 bytes)")
	}
	if strings.ContainsRune(key, 0) {
		return errors.New("key contains NUL byte")
	}
	if strings.HasPrefix(key, "/") {
		return errors.New("key starts with /")
	}
	for _, part := range strings.Split(key, "/") {
		if part == ".." {
			return errors.New("key contains .. component")
		}
	}
	if strings.ContainsAny(key, "\r\n\t") {
		return errors.New("key contains control char")
	}
	return nil
}

func (h *handlers) handleGet(w http.ResponseWriter, r *http.Request) {
	key := extractKey(r)
	if err := validateKey(key); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	bucket, err := h.reg.Resolve(primaryBucket)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	body, info, err := bucket.Get(r.Context(), key)
	if errors.Is(err, storaged.ErrKeyNotFound) {
		http.Error(w, "not found", http.StatusNotFound)
		return
	}
	if err != nil {
		slog.Error("storage get", "key", key, "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	defer body.Close()

	if info.ETag != "" {
		w.Header().Set("ETag", info.ETag)
	}
	w.Header().Set("Content-Type", "application/octet-stream")
	if _, err := io.Copy(w, body); err != nil {
		slog.Warn("storage get copy", "key", key, "err", err)
	}
}

func (h *handlers) handlePut(w http.ResponseWriter, r *http.Request) {
	key := extractKey(r)
	if err := validateKey(key); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Per-key body cap. Vectord LHV1 persistence under "_vectors/"
	// gets a 4 GiB cap; everything else keeps 256 MiB. Up-front
	// Content-Length check first per Opus C3 review (manager.Uploader's
	// multipart path can bury *http.MaxBytesError in its own error
	// types); MaxBytesReader + substring-match fallback below cover
	// chunked / lying-CL cases.
	maxBytes := maxPutBytesFor(key)
	if r.ContentLength > maxBytes {
		w.Header().Set("Retry-After", retryAfterSecs)
		http.Error(w, "payload too large", http.StatusRequestEntityTooLarge)
		return
	}

	// Non-blocking try-acquire: if the 4-slot semaphore is full, return
	// 503 + Retry-After:5 instantly rather than holding the connection.
	// Per PHASE_G0_KICKOFF D2.4: "PUTs blocked on the semaphore → 503
	// with Retry-After: 5".
	select {
	case h.putSem <- struct{}{}:
		defer func() { <-h.putSem }()
	default:
		w.Header().Set("Retry-After", retryAfterSecs)
		http.Error(w, "storaged: put concurrency cap reached", http.StatusServiceUnavailable)
		return
	}

	// Per-key body cap as MaxBytesReader so chunked-encoding bodies
	// also fail loud at the limit. Defer LIFO order: r.Body.Close
	// fires before <-h.putSem, so the body is fully drained before
	// the slot frees.
	r.Body = http.MaxBytesReader(w, r.Body, maxBytes)
	defer r.Body.Close()

	bucket, err := h.reg.Resolve(primaryBucket)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if err := bucket.Put(r.Context(), key, r.Body); err != nil {
		// Two-layer detect: errors.As catches the typed error when it
		// survives unwrapping; the substring check catches cases where
		// manager.Uploader's multipart path wraps the body-read
		// failure in its own aggregate type.
		var maxErr *http.MaxBytesError
		if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
			w.Header().Set("Retry-After", retryAfterSecs)
			http.Error(w, "payload too large", http.StatusRequestEntityTooLarge)
			return
		}
		slog.Error("storage put", "key", key, "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(`{"status":"ok"}`))
}

func (h *handlers) handleList(w http.ResponseWriter, r *http.Request) {
	prefix := r.URL.Query().Get("prefix")
	bucket, err := h.reg.Resolve(primaryBucket)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	objs, err := bucket.List(r.Context(), prefix)
	if err != nil {
		slog.Error("storage list", "prefix", prefix, "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(map[string]any{
		"prefix":  prefix,
		"objects": objs,
	})
}

func (h *handlers) handleDelete(w http.ResponseWriter, r *http.Request) {
	key := extractKey(r)
	if err := validateKey(key); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	bucket, err := h.reg.Resolve(primaryBucket)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if err := bucket.Delete(r.Context(), key); err != nil {
		slog.Error("storage delete", "key", key, "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}