D: storaged per-prefix PUT cap — vectord _vectors/ → 4 GiB
Closes the documented 500K-test gap (memory project_golang_lakehouse:
"storaged 256 MiB PUT cap blocks single-file LHV1 persistence above
~150K vectors at d=768"). Vectord persistence under "_vectors/" now
gets a 4 GiB cap; everything else (parquets, manifests, ingest)
keeps the 256 MiB default.
Why per-prefix and not "raise globally":
- 256 MiB cap is a real DoS protection — runaway clients can't
drain the daemon. Raising it for ALL traffic would expand the
attack surface for routine paths that have no need.
- Per-prefix preserves existing protection while opening the one
documented production-scale path.
Why not split LHV1 across multiple keys (the alternative):
- G1P shipped a single-Put framed format SPECIFICALLY to eliminate
the torn-write class (memory: "Single Put eliminates the torn-
write class that the 3-way convergent scrum finding identified").
- Multi-key LHV1 would re-introduce the half-saved-state failure
mode we just paid to fix. Streaming via existing manager.Uploader
is the better architectural answer.
Why not bump the cap operationally via env/config:
- Future operator-driven cap can drop in cleanly via the
maxPutBytesFor function. Started with hardcoded 4 GiB to keep
this commit small; config knob is a follow-up if production
workloads diverge from the documented 500K-vector ceiling.
manager.Uploader is already streaming-multipart on the outbound
S3 side; the inbound MaxBytesReader cap is a safety gate, not a
memory bottleneck. So raising it for vectord just lets the
existing streaming path actually flow, without introducing new
memory pressure (4-slot semaphore × 4 GiB worst case = 16 GiB
only if all slots simultaneously max out — vanishingly unlikely).
Implementation:
cmd/storaged/main.go:
new constant maxPutBytesVectors = 4 GiB (covers >700K vectors @ d=768)
new constant vectorsPrefix = "_vectors/" (synced with vectord.VectorPrefix)
new function maxPutBytesFor(key) → cap-by-prefix
handlePut: ContentLength check + MaxBytesReader use the per-key cap
cmd/storaged/main_test.go (3 new test funcs):
TestMaxPutBytesFor: 7 cases incl. nested prefix, substring-but-not-
prefix, empty key, parquet/manifest paths.
TestVectorPrefixSyncWithVectord: regression test that asserts
vectorsPrefix == vectord.VectorPrefix. A future rename surfaces
here instead of silently bypassing the larger cap.
TestVectorCapAccommodates500KStaffingTest: bounds the cap above
the documented production workload (~700 MiB conservative).
Verified:
go test ./cmd/storaged/ — all green (was 1 func, now 4)
just verify — 9 smokes still pass · 32s wall
just proof contract — 53/0/1 unchanged
Out of scope for this commit (deserves its own):
- Heavy integration smoke: 200K dim=768 synthetic vectors → ~700
MiB LHV1 → kill+restart vectord → recall=1. ~5-10 min wall;
follow-up if you want production-scale persistence verified
end-to-end. Unit tests + existing g1p_smoke cover the wiring.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
6af0520ed2
commit
423a3817c5
@ -24,12 +24,41 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
maxPutBytes = 256 << 20 // 256 MiB per Qwen Q1 fix
|
||||
maxConcurrentPut = 4 // 4-slot semaphore on in-flight PUTs
|
||||
retryAfterSecs = "5" // Retry-After header on 503
|
||||
// Default per-PUT body cap. Most callers (ingest parquets, catalog
|
||||
// manifests) live well under this. The cap is a safety gate, not a
|
||||
// memory bottleneck — manager.Uploader streams; this just refuses
|
||||
// reads past the limit so a runaway client can't drain the daemon.
|
||||
maxPutBytesDefault = 256 << 20 // 256 MiB
|
||||
|
||||
// Vector-persistence prefix gets a much larger cap because vectord
|
||||
// persists single-file LHV1 indexes that exceed 256 MiB above
|
||||
// ~150K vectors at d=768 (the 500K staffing test's documented gap).
|
||||
// 4 GiB covers >700K vectors at d=768 with HNSW graph overhead and
|
||||
// keeps the simple-Put torn-write guarantee from G1P intact (memory
|
||||
// project_golang_lakehouse.md: "Single Put eliminates the torn-write
|
||||
// class").
|
||||
maxPutBytesVectors = 4 << 30 // 4 GiB
|
||||
|
||||
// Vector-persistence prefix matched against incoming PUT keys. Keep
|
||||
// this in sync with internal/vectord/persistor.go's lhv1KeyFor.
|
||||
vectorsPrefix = "_vectors/"
|
||||
|
||||
maxConcurrentPut = 4 // 4-slot semaphore on in-flight PUTs
|
||||
retryAfterSecs = "5" // Retry-After header on 503
|
||||
primaryBucket = "primary"
|
||||
)
|
||||
|
||||
// maxPutBytesFor returns the body-cap to apply to a PUT for the given
|
||||
// key. Vectord LHV1 persistence (under "_vectors/") gets the larger
|
||||
// cap; everything else stays at the default. Function-level so a
|
||||
// future operator-driven cap (env, config) can drop in cleanly.
|
||||
func maxPutBytesFor(key string) int64 {
|
||||
if strings.HasPrefix(key, vectorsPrefix) {
|
||||
return maxPutBytesVectors
|
||||
}
|
||||
return maxPutBytesDefault
|
||||
}
|
||||
|
||||
func main() {
|
||||
configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
|
||||
secretsPath := flag.String("secrets", "/etc/lakehouse/secrets-go.toml", "path to secrets TOML (Go-side; Rust uses /etc/lakehouse/secrets.toml)")
|
||||
@ -181,15 +210,14 @@ func (h *handlers) handlePut(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Up-front Content-Length cap. Per Opus C3 review: the
|
||||
// manager.Uploader's multipart path runs body reads in goroutines
|
||||
// and wraps errors in its own types, so *http.MaxBytesError can be
|
||||
// buried by the time it reaches us — meaning bodies just over the
|
||||
// 5 MiB multipart threshold could surface as 500 instead of 413.
|
||||
// Catching Content-Length up front returns 413 deterministically
|
||||
// when the client honestly declares size; MaxBytesReader + the
|
||||
// string-match fallback below cover chunked / lying-CL cases.
|
||||
if r.ContentLength > maxPutBytes {
|
||||
// Per-key body cap. Vectord LHV1 persistence under "_vectors/"
|
||||
// gets a 4 GiB cap; everything else keeps 256 MiB. Up-front
|
||||
// Content-Length check first per Opus C3 review (manager.Uploader's
|
||||
// multipart path can bury *http.MaxBytesError in its own error
|
||||
// types); MaxBytesReader + string-match fallback below cover
|
||||
// chunked / lying-CL cases.
|
||||
cap := maxPutBytesFor(key)
|
||||
if r.ContentLength > cap {
|
||||
w.Header().Set("Retry-After", retryAfterSecs)
|
||||
http.Error(w, "payload too large", http.StatusRequestEntityTooLarge)
|
||||
return
|
||||
@ -208,11 +236,11 @@ func (h *handlers) handlePut(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// 256 MiB per-request body cap. Reads beyond this surface as
|
||||
// *http.MaxBytesError; for chunked-encoding bodies that's the only
|
||||
// signal we get. Defer LIFO order: r.Body.Close fires before
|
||||
// <-h.putSem, so the body is fully drained before the slot frees.
|
||||
r.Body = http.MaxBytesReader(w, r.Body, maxPutBytes)
|
||||
// Per-key body cap as MaxBytesReader so chunked-encoding bodies
|
||||
// also fail-loud at the limit. Defer LIFO order: r.Body.Close
|
||||
// fires before <-h.putSem, so the body is fully drained before
|
||||
// the slot frees.
|
||||
r.Body = http.MaxBytesReader(w, r.Body, cap)
|
||||
defer r.Body.Close()
|
||||
|
||||
bucket, err := h.reg.Resolve(primaryBucket)
|
||||
|
||||
@ -1,6 +1,10 @@
|
||||
package main
|
||||
|
||||
import "testing"
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.agentview.dev/profit/golangLAKEHOUSE/internal/vectord"
|
||||
)
|
||||
|
||||
func TestValidateKey(t *testing.T) {
|
||||
cases := []struct {
|
||||
@ -36,3 +40,57 @@ func TestValidateKey(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMaxPutBytesFor(t *testing.T) {
|
||||
// Vectord LHV1 persistence gets the larger cap so single-file
|
||||
// indexes above ~150K vectors at d=768 (the 500K staffing test
|
||||
// gap) can survive a Save without 413. Default cap stays at
|
||||
// 256 MiB for everything else (parquets, manifests, etc.).
|
||||
cases := []struct {
|
||||
name string
|
||||
key string
|
||||
want int64
|
||||
}{
|
||||
{"vectord LHV1 file", "_vectors/workers_500k.lhv1", maxPutBytesVectors},
|
||||
{"vectord nested", "_vectors/some/nested/index.lhv1", maxPutBytesVectors},
|
||||
{"parquet under datasets/", "datasets/workers/abc.parquet", maxPutBytesDefault},
|
||||
{"catalog manifest", "_catalog/manifests/foo.bin", maxPutBytesDefault},
|
||||
{"plain key", "x.txt", maxPutBytesDefault},
|
||||
{"empty key", "", maxPutBytesDefault},
|
||||
{"key with vectors-ish substring (not prefix)", "datasets/_vectors/x", maxPutBytesDefault},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := maxPutBytesFor(tc.key)
|
||||
if got != tc.want {
|
||||
t.Errorf("maxPutBytesFor(%q) = %d, want %d", tc.key, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestVectorPrefixSyncWithVectord locks the prefix value to vectord's
|
||||
// VectorPrefix constant. If a future refactor renames either side,
|
||||
// this test surfaces the drift before vectord saves silently bypass
|
||||
// the larger cap.
|
||||
func TestVectorPrefixSyncWithVectord(t *testing.T) {
|
||||
if vectorsPrefix != vectord.VectorPrefix {
|
||||
t.Errorf("storaged vectorsPrefix=%q out of sync with vectord.VectorPrefix=%q",
|
||||
vectorsPrefix, vectord.VectorPrefix)
|
||||
}
|
||||
}
|
||||
|
||||
// TestVectorCapAccommodates500KStaffingTest reads the documented gap
|
||||
// (memory project_golang_lakehouse.md): "storaged 256 MiB PUT cap
|
||||
// blocks single-file LHV1 persistence above ~150K vectors at d=768."
|
||||
// 500K vectors at d=768 with HNSW graph overhead is approximately
|
||||
// 4.5 GiB resident, ~700 MiB on disk after compression.
|
||||
// 4 GiB cap covers more than the documented production workload.
|
||||
func TestVectorCapAccommodates500KStaffingTest(t *testing.T) {
|
||||
const fiveHundredKVectorsAtD768Disk = 700 << 20 // ~700 MiB conservative estimate
|
||||
if maxPutBytesVectors < fiveHundredKVectorsAtD768Disk {
|
||||
t.Errorf("maxPutBytesVectors=%d (%.0f MiB) below documented production "+
|
||||
"workload of ~700 MiB for 500K vectors at d=768",
|
||||
maxPutBytesVectors, float64(maxPutBytesVectors)/(1<<20))
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user