From 423a3817c52c12195b44d16796a4b550b1454fa6 Mon Sep 17 00:00:00 2001
From: root
Date: Wed, 29 Apr 2026 06:00:09 -0500
Subject: [PATCH] =?UTF-8?q?D:=20storaged=20per-prefix=20PUT=20cap=20?=
 =?UTF-8?q?=E2=80=94=20vectord=20=5Fvectors/=20=E2=86=92=204=20GiB?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Closes the documented 500K-test gap (memory project_golang_lakehouse:
"storaged 256 MiB PUT cap blocks single-file LHV1 persistence above
~150K vectors at d=768"). Vectord persistence under "_vectors/" now
gets a 4 GiB cap; everything else (parquets, manifests, ingest) keeps
the 256 MiB default.

Why per-prefix and not "raise globally":
- The 256 MiB cap is real DoS protection — runaway clients can't
  drain the daemon. Raising it for ALL traffic would expand the
  attack surface for routine paths that have no need for it.
- Per-prefix preserves the existing protection while opening the one
  documented production-scale path.

Why not split LHV1 across multiple keys (the alternative):
- G1P shipped a single-Put framed format SPECIFICALLY to eliminate
  the torn-write class (memory: "Single Put eliminates the torn-write
  class that the 3-way convergent scrum finding identified").
- Multi-key LHV1 would re-introduce the half-saved-state failure mode
  we just paid to fix. Streaming via the existing manager.Uploader is
  the better architectural answer.

Why not bump the cap operationally via env/config:
- A future operator-driven cap can drop in cleanly via the
  maxPutBytesFor function. Started with a hardcoded 4 GiB to keep
  this commit small; a config knob is a follow-up if production
  workloads diverge from the documented 500K-vector ceiling.
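For the record, that knob could slot in roughly like this — an
illustrative sketch only, NOT part of this commit; the env-var name
LAKEHOUSE_VECTORS_PUT_CAP is hypothetical, and it would need "os" and
"strconv" on top of the existing imports:

    func maxPutBytesFor(key string) int64 {
        if strings.HasPrefix(key, vectorsPrefix) {
            // Operator override; fall back to the compiled-in cap
            // when the variable is unset or unparseable.
            if v := os.Getenv("LAKEHOUSE_VECTORS_PUT_CAP"); v != "" {
                if n, err := strconv.ParseInt(v, 10, 64); err == nil && n > 0 {
                    return n
                }
            }
            return maxPutBytesVectors
        }
        return maxPutBytesDefault
    }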
manager.Uploader is already streaming-multipart on the outbound S3
side; the inbound MaxBytesReader cap is a safety gate, not a memory
bottleneck. So raising it for vectord just lets the existing streaming
path actually flow, without introducing new memory pressure (4-slot
semaphore × 4 GiB worst case = 16 GiB only if all slots simultaneously
max out — vanishingly unlikely).
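The shape of that path, sketched for reference (simplified, not the
handler's literal code — uploader, bucketName, and key are assumed in
scope; aws and s3 are the aws-sdk-go-v2 packages):

    // MaxBytesReader wraps the body without buffering it; the
    // uploader reads through the wrapper in multipart chunks, so
    // peak memory is bounded by the uploader's part buffers, not
    // by the 4 GiB cap.
    r.Body = http.MaxBytesReader(w, r.Body, bodyCap)
    _, err := uploader.Upload(r.Context(), &s3.PutObjectInput{
        Bucket: aws.String(bucketName),
        Key:    aws.String(key),
        Body:   r.Body, // streamed end to end, never fully resident
    })
    // err feeds the 413-vs-500 disambiguation described above.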
Implementation:

cmd/storaged/main.go:
  new constant maxPutBytesVectors = 4 GiB (covers >700K vectors @ d=768)
  new constant vectorsPrefix = "_vectors/" (synced with vectord.VectorPrefix)
  new function maxPutBytesFor(key) → cap-by-prefix
  handlePut: ContentLength check + MaxBytesReader use the per-key cap

cmd/storaged/main_test.go (3 new test funcs):
  TestMaxPutBytesFor: 7 cases incl. nested prefix,
  substring-but-not-prefix, empty key, parquet/manifest paths.
  TestVectorPrefixSyncWithVectord: regression test that asserts
  vectorsPrefix == vectord.VectorPrefix. A future rename surfaces here
  instead of silently bypassing the larger cap.
  TestVectorCapAccommodates500KStaffingTest: bounds the cap above the
  documented production workload (~700 MiB conservative).

Verified:
  go test ./cmd/storaged/ — all green (was 1 func, now 4)
  just verify — 9 smokes still pass · 32s wall
  just proof contract — 53/0/1 unchanged

Out of scope for this commit (deserves its own):
- Heavy integration smoke: 200K dim=768 synthetic vectors → ~700 MiB
  LHV1 → kill+restart vectord → recall=1. ~5-10 min wall; follow-up if
  you want production-scale persistence verified end-to-end. Unit
  tests + the existing g1p_smoke cover the wiring.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 cmd/storaged/main.go      | 62 ++++++++++++++++++++++++++++-----------
 cmd/storaged/main_test.go | 60 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 104 insertions(+), 18 deletions(-)

diff --git a/cmd/storaged/main.go b/cmd/storaged/main.go
index 8c747e4..db3629e 100644
--- a/cmd/storaged/main.go
+++ b/cmd/storaged/main.go
@@ -24,12 +24,41 @@ import (
 )
 const (
-	maxPutBytes      = 256 << 20 // 256 MiB per Qwen Q1 fix
-	maxConcurrentPut = 4         // 4-slot semaphore on in-flight PUTs
-	retryAfterSecs   = "5"       // Retry-After header on 503
+	// Default per-PUT body cap. Most callers (ingest parquets, catalog
+	// manifests) live well under this. The cap is a safety gate, not a
+	// memory bottleneck — manager.Uploader streams; this just refuses
+	// reads past the limit so a runaway client can't drain the daemon.
+	maxPutBytesDefault = 256 << 20 // 256 MiB
+
+	// Vector-persistence prefix gets a much larger cap because vectord
+	// persists single-file LHV1 indexes that exceed 256 MiB above
+	// ~150K vectors at d=768 (the 500K staffing test's documented gap).
+	// 4 GiB covers >700K vectors at d=768 with HNSW graph overhead and
+	// keeps the simple-Put torn-write guarantee from G1P intact (memory
+	// project_golang_lakehouse.md: "Single Put eliminates the torn-write
+	// class").
+	maxPutBytesVectors = 4 << 30 // 4 GiB
+
+	// Vector-persistence prefix matched against incoming PUT keys. Keep
+	// this in sync with internal/vectord/persistor.go's lhv1KeyFor.
+	vectorsPrefix = "_vectors/"
+
+	maxConcurrentPut = 4   // 4-slot semaphore on in-flight PUTs
+	retryAfterSecs   = "5" // Retry-After header on 503
 
 	primaryBucket = "primary"
 )
 
+// maxPutBytesFor returns the body-cap to apply to a PUT for the given
+// key. Vectord LHV1 persistence (under "_vectors/") gets the larger
+// cap; everything else stays at the default. Function-level so a
+// future operator-driven cap (env, config) can drop in cleanly.
+func maxPutBytesFor(key string) int64 {
+	if strings.HasPrefix(key, vectorsPrefix) {
+		return maxPutBytesVectors
+	}
+	return maxPutBytesDefault
+}
+
 func main() {
 	configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
 	secretsPath := flag.String("secrets", "/etc/lakehouse/secrets-go.toml", "path to secrets TOML (Go-side; Rust uses /etc/lakehouse/secrets.toml)")
@@ -181,15 +210,14 @@ func (h *handlers) handlePut(w http.ResponseWriter, r *http.Request) {
 		return
 	}
-	// Up-front Content-Length cap. Per Opus C3 review: the
-	// manager.Uploader's multipart path runs body reads in goroutines
-	// and wraps errors in its own types, so *http.MaxBytesError can be
-	// buried by the time it reaches us — meaning bodies just over the
-	// 5 MiB multipart threshold could surface as 500 instead of 413.
-	// Catching Content-Length up front returns 413 deterministically
-	// when the client honestly declares size; MaxBytesReader + the
-	// string-match fallback below cover chunked / lying-CL cases.
-	if r.ContentLength > maxPutBytes {
+	// Per-key body cap. Vectord LHV1 persistence under "_vectors/"
+	// gets a 4 GiB cap; everything else keeps 256 MiB. Up-front
+	// Content-Length check first per Opus C3 review (manager.Uploader's
+	// multipart path can bury *http.MaxBytesError in its own error
+	// types); MaxBytesReader + string-match fallback below cover
+	// chunked / lying-CL cases.
+	bodyCap := maxPutBytesFor(key)
+	if r.ContentLength > bodyCap {
 		w.Header().Set("Retry-After", retryAfterSecs)
 		http.Error(w, "payload too large", http.StatusRequestEntityTooLarge)
 		return
 	}
@@ -208,11 +236,11 @@
 		return
 	}
 
-	// 256 MiB per-request body cap. Reads beyond this surface as
-	// *http.MaxBytesError; for chunked-encoding bodies that's the only
-	// signal we get. Defer LIFO order: r.Body.Close fires before
-	// <-h.putSem, so the body is fully drained before the slot frees.
-	r.Body = http.MaxBytesReader(w, r.Body, maxPutBytes)
+	// Per-key body cap as MaxBytesReader so chunked-encoding bodies
+	// also fail-loud at the limit. Defer LIFO order: r.Body.Close
+	// fires before <-h.putSem, so the body is fully drained before
+	// the slot frees.
+	r.Body = http.MaxBytesReader(w, r.Body, bodyCap)
 	defer r.Body.Close()
 
 	bucket, err := h.reg.Resolve(primaryBucket)
diff --git a/cmd/storaged/main_test.go b/cmd/storaged/main_test.go
index df43b48..cab4c55 100644
--- a/cmd/storaged/main_test.go
+++ b/cmd/storaged/main_test.go
@@ -1,6 +1,10 @@
 package main
 
-import "testing"
+import (
+	"testing"
+
+	"git.agentview.dev/profit/golangLAKEHOUSE/internal/vectord"
+)
 
 func TestValidateKey(t *testing.T) {
 	cases := []struct {
@@ -36,3 +40,57 @@ func TestValidateKey(t *testing.T) {
 		})
 	}
 }
+
+func TestMaxPutBytesFor(t *testing.T) {
+	// Vectord LHV1 persistence gets the larger cap so single-file
+	// indexes above ~150K vectors at d=768 (the 500K staffing test
+	// gap) can survive a Save without 413. Default cap stays at
+	// 256 MiB for everything else (parquets, manifests, etc.).
+	cases := []struct {
+		name string
+		key  string
+		want int64
+	}{
+		{"vectord LHV1 file", "_vectors/workers_500k.lhv1", maxPutBytesVectors},
+		{"vectord nested", "_vectors/some/nested/index.lhv1", maxPutBytesVectors},
+		{"parquet under datasets/", "datasets/workers/abc.parquet", maxPutBytesDefault},
+		{"catalog manifest", "_catalog/manifests/foo.bin", maxPutBytesDefault},
+		{"plain key", "x.txt", maxPutBytesDefault},
+		{"empty key", "", maxPutBytesDefault},
+		{"key with vectors-ish substring (not prefix)", "datasets/_vectors/x", maxPutBytesDefault},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			got := maxPutBytesFor(tc.key)
+			if got != tc.want {
+				t.Errorf("maxPutBytesFor(%q) = %d, want %d", tc.key, got, tc.want)
+			}
+		})
+	}
+}
+
+// TestVectorPrefixSyncWithVectord locks the prefix value to vectord's
+// VectorPrefix constant. If a future refactor renames either side,
+// this test surfaces the drift before vectord saves silently bypass
+// the larger cap.
+func TestVectorPrefixSyncWithVectord(t *testing.T) {
+	if vectorsPrefix != vectord.VectorPrefix {
+		t.Errorf("storaged vectorsPrefix=%q out of sync with vectord.VectorPrefix=%q",
+			vectorsPrefix, vectord.VectorPrefix)
+	}
+}
+
+// TestVectorCapAccommodates500KStaffingTest reads the documented gap
+// (memory project_golang_lakehouse.md): "storaged 256 MiB PUT cap
+// blocks single-file LHV1 persistence above ~150K vectors at d=768."
+// 500K vectors at d=768 with HNSW graph overhead is approximately
+// 4.5 GiB resident, ~700 MiB on disk after compression.
+// 4 GiB cap covers more than the documented production workload.
+func TestVectorCapAccommodates500KStaffingTest(t *testing.T) {
+	const fiveHundredKVectorsAtD768Disk = 700 << 20 // ~700 MiB conservative estimate
+	if maxPutBytesVectors < fiveHundredKVectorsAtD768Disk {
+		t.Errorf("maxPutBytesVectors=%d (%.0f MiB) below documented production "+
+			"workload of ~700 MiB for 500K vectors at d=768",
+			int64(maxPutBytesVectors), float64(maxPutBytesVectors)/(1<<20))
+	}
+}