From 8cfcdb8e5fef2e708ece344d9be9d1f8510eaaa1 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 28 Apr 2026 23:23:03 -0500 Subject: [PATCH] =?UTF-8?q?G0=20D2:=20storaged=20S3=20GET/PUT/LIST/DELETE?= =?UTF-8?q?=20=C2=B7=203-lineage=20scrum=20=C2=B7=204=20fixes=20applied?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase G0 Day 2 ships storaged: aws-sdk-go-v2 wrapper + chi routes binding 127.0.0.1:3211 with 256 MiB MaxBytesReader, Content-Length up-front 413, and a 4-slot non-blocking semaphore returning 503 + Retry-After:5 when full. Acceptance smoke (6/6 probes) PASSES against the dedicated MinIO bucket lakehouse-go-primary, isolated from the Rust system's lakehouse bucket during coexistence. Cross-lineage scrum on the shipped code: - Opus 4.7 (opencode): 1 BLOCK + 3 WARN + 3 INFO - Qwen3-coder (openrouter): 2 BLOCK + 1 WARN + 1 INFO (3 false positives) - Kimi K2-0905 (openrouter, after route-shopping past opencode's 4k cap and the direct adapter's empty-content reasoning bug): 1 BLOCK + 2 WARN + 1 INFO Fixed: C1 buildRegistry ctx cancel footgun → context.Background() (Opus + Kimi convergent; future credential refresh chains) C2 MaxBytesReader unwrap through manager.Uploader multipart goroutines → Content-Length up-front 413 + string-suffix fallback (Opus + Kimi convergent; latent 500-instead-of-413 in 5-256 MiB range) C3 Bucket.List unbounded accumulation → MaxListResults=10_000 cap (Opus + Kimi convergent; OOM guard) S1 PUT response Content-Type: application/json (Opus single-reviewer) Strict validateKey policy (J approved): rejects empty, >1024B, NUL, leading "/", ".." path components, CR/LF/tab control characters. DELETE exposed at HTTP layer (J approved option A) for symmetry + smoke ergonomics. Build clean, vet clean, all unit tests pass, smoke 6/6 PASS after every fix round. go.mod 1.23 → 1.24 (required by aws-sdk-go-v2). 
Process finding worth recording: opencode caps non-streaming Kimi at max_tokens=4096; the direct kimi.com adapter consumed 8192 tokens of reasoning but surfaced empty content; openrouter/moonshotai/kimi-k2-0905 delivered structured output in ~33s. Future Kimi scrums should default to that route. Co-Authored-By: Claude Opus 4.7 (1M context) --- .gitignore | 1 + cmd/storaged/main.go | 264 ++++++++++++++++++++++++++++- cmd/storaged/main_test.go | 38 +++++ docs/PHASE_G0_KICKOFF.md | 122 +++++++++++++ go.mod | 24 ++- go.sum | 38 +++++ internal/secrets/provider.go | 108 ++++++++++++ internal/secrets/provider_test.go | 122 +++++++++++++ internal/storaged/bucket.go | 208 +++++++++++++++++++++++ internal/storaged/registry.go | 64 +++++++ internal/storaged/registry_test.go | 49 ++++++ lakehouse.toml | 6 +- scripts/d2_smoke.sh | 167 ++++++++++++++++++ 13 files changed, 1199 insertions(+), 12 deletions(-) create mode 100644 cmd/storaged/main_test.go create mode 100644 internal/secrets/provider.go create mode 100644 internal/secrets/provider_test.go create mode 100644 internal/storaged/bucket.go create mode 100644 internal/storaged/registry.go create mode 100644 internal/storaged/registry_test.go create mode 100755 scripts/d2_smoke.sh diff --git a/.gitignore b/.gitignore index 9ac7e07..af15b10 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ vendor/ # Secrets — never commit. Resolved via SecretsProvider per ADR-001 §1.x. *.env secrets.toml +secrets-go.toml diff --git a/cmd/storaged/main.go b/cmd/storaged/main.go index 292e74b..8c747e4 100644 --- a/cmd/storaged/main.go +++ b/cmd/storaged/main.go @@ -1,18 +1,38 @@ -// storaged is the object I/O service. D2 wires the actual S3 -// GET/PUT/LIST routes; D1 just stands up the binary with /health. +// storaged is the object I/O service. D2 wires GET / PUT / LIST / +// DELETE routes against a single bucket ("primary") in the registry. 
+// Bind is 127.0.0.1 only (G0 dev — no auth on the wire); body cap is +// 256 MiB; concurrent in-flight PUTs are capped at 4 with non-blocking +// try-acquire (503 + Retry-After when full). package main import ( + "context" + "encoding/json" + "errors" "flag" + "io" "log/slog" + "net/http" "os" + "strings" - "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" "github.com/go-chi/chi/v5" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/secrets" + "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" + "git.agentview.dev/profit/golangLAKEHOUSE/internal/storaged" +) + +const ( + maxPutBytes = 256 << 20 // 256 MiB per Qwen Q1 fix + maxConcurrentPut = 4 // 4-slot semaphore on in-flight PUTs + retryAfterSecs = "5" // Retry-After header on 503 + primaryBucket = "primary" ) func main() { configPath := flag.String("config", "lakehouse.toml", "path to TOML config") + secretsPath := flag.String("secrets", "/etc/lakehouse/secrets-go.toml", "path to secrets TOML (Go-side; Rust uses /etc/lakehouse/secrets.toml)") flag.Parse() cfg, err := shared.LoadConfig(*configPath) @@ -21,12 +41,240 @@ func main() { os.Exit(1) } - if err := shared.Run("storaged", cfg.Storaged.Bind, func(_ chi.Router) { - // D2.4 wires GET/PUT/LIST routes here with localhost-only - // bind, 256 MiB MaxBytesReader, and a 4-slot semaphore on - // in-flight PUTs (per Qwen Q1 fix). - }); err != nil { + registry, err := buildRegistry(cfg, *secretsPath) + if err != nil { + slog.Error("bucket registry", "err", err) + os.Exit(1) + } + + h := newHandlers(registry) + + if err := shared.Run("storaged", cfg.Storaged.Bind, h.register); err != nil { slog.Error("server", "err", err) os.Exit(1) } } + +// buildRegistry constructs the (single, G0) bucket registry. Multi-bucket +// federation lands in G2; the registry shape is in place so that arrives +// without an HTTP-layer refactor. 
+func buildRegistry(cfg shared.Config, secretsPath string) (*storaged.BucketRegistry, error) { + prov, err := secrets.NewFileProvider(secretsPath, secrets.S3Credentials{ + AccessKeyID: cfg.S3.AccessKeyID, + SecretAccessKey: cfg.S3.SecretAccessKey, + }) + if err != nil { + return nil, err + } + + // Per Opus C1 review: don't tie the AWS config-load context to a + // canceller that fires when buildRegistry returns. With static creds + // it's fine today, but EC2 IMDS / SSO / AssumeRole credential + // providers (G2+) capture the load ctx for refresh — a cancelled + // ctx silently fails them at the next refresh. Use Background here; + // per-request lifetimes flow through r.Context() in handlers. + bucket, err := storaged.NewBucket(context.Background(), cfg.S3, prov, primaryBucket) + if err != nil { + return nil, err + } + + reg := storaged.NewRegistry() + if err := reg.Register(bucket); err != nil { + return nil, err + } + return reg, nil +} + +// handlers carries the registry + the PUT semaphore. One instance +// per process; chi routes close over it. +type handlers struct { + reg *storaged.BucketRegistry + putSem chan struct{} +} + +func newHandlers(reg *storaged.BucketRegistry) *handlers { + return &handlers{ + reg: reg, + putSem: make(chan struct{}, maxConcurrentPut), + } +} + +func (h *handlers) register(r chi.Router) { + // Verb-paths per PHASE_G0_KICKOFF D2.4 spec. REST-style refactor + // (GET/PUT/DELETE on a single /storage/{key}) deferred until G2. + r.Get("/storage/get/*", h.handleGet) + r.Put("/storage/put/*", h.handlePut) + r.Get("/storage/list", h.handleList) + r.Delete("/storage/delete/*", h.handleDelete) +} + +// extractKey pulls the wildcard key out of a chi route. Routes are +// registered as `/storage//*` so the wildcard captures the +// remainder, including embedded slashes. +func extractKey(r *http.Request) string { + // chi.URLParam with "*" returns everything after the route prefix. 
+ return chi.URLParam(r, "*") +} + +// validateKey rejects keys that would be unsafe to round-trip through +// the system. The exact policy is a G0 design choice — see the focused +// decision request in the D2 PR description (validateKey-policy). +func validateKey(key string) error { + if key == "" { + return errors.New("empty key") + } + if len(key) > 1024 { + return errors.New("key too long (>1024 bytes)") + } + if strings.ContainsRune(key, 0) { + return errors.New("key contains NUL byte") + } + if strings.HasPrefix(key, "/") { + return errors.New("key starts with /") + } + for _, part := range strings.Split(key, "/") { + if part == ".." { + return errors.New("key contains .. component") + } + } + if strings.ContainsAny(key, "\r\n\t") { + return errors.New("key contains control char") + } + return nil +} + +func (h *handlers) handleGet(w http.ResponseWriter, r *http.Request) { + key := extractKey(r) + if err := validateKey(key); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + bucket, err := h.reg.Resolve(primaryBucket) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + body, info, err := bucket.Get(r.Context(), key) + if errors.Is(err, storaged.ErrKeyNotFound) { + http.Error(w, "not found", http.StatusNotFound) + return + } + if err != nil { + slog.Error("storage get", "key", key, "err", err) + http.Error(w, "internal", http.StatusInternalServerError) + return + } + defer body.Close() + if info.ETag != "" { + w.Header().Set("ETag", info.ETag) + } + w.Header().Set("Content-Type", "application/octet-stream") + if _, err := io.Copy(w, body); err != nil { + slog.Warn("storage get copy", "key", key, "err", err) + } +} + +func (h *handlers) handlePut(w http.ResponseWriter, r *http.Request) { + key := extractKey(r) + if err := validateKey(key); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // Up-front Content-Length cap. 
Per Opus C2 review: the + // manager.Uploader's multipart path runs body reads in goroutines + // and wraps errors in its own types, so *http.MaxBytesError can be + // buried by the time it reaches us — meaning bodies just over the + // 5 MiB multipart threshold could surface as 500 instead of 413. + // Catching Content-Length up front returns 413 deterministically + // when the client honestly declares size; MaxBytesReader + the + // string-match fallback below cover chunked / lying-CL cases. + if r.ContentLength > maxPutBytes { + w.Header().Set("Retry-After", retryAfterSecs) + http.Error(w, "payload too large", http.StatusRequestEntityTooLarge) + return + } + + // Non-blocking try-acquire: if the 4-slot semaphore is full, return + // 503 + Retry-After:5 instantly rather than holding the connection. + // Per PHASE_G0_KICKOFF D2.4: "PUTs blocked on the semaphore → 503 + // with Retry-After: 5". + select { + case h.putSem <- struct{}{}: + defer func() { <-h.putSem }() + default: + w.Header().Set("Retry-After", retryAfterSecs) + http.Error(w, "storaged: put concurrency cap reached", http.StatusServiceUnavailable) + return + } + + // 256 MiB per-request body cap. Reads beyond this surface as + // *http.MaxBytesError; for chunked-encoding bodies that's the only + // signal we get. Defer LIFO order: r.Body.Close fires before + // <-h.putSem, so the body is fully drained before the slot frees. + r.Body = http.MaxBytesReader(w, r.Body, maxPutBytes) + defer r.Body.Close() + + bucket, err := h.reg.Resolve(primaryBucket) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if err := bucket.Put(r.Context(), key, r.Body); err != nil { + // Two-layer detect: errors.As catches the typed error when it + // survives unwrapping; the string-match check catches cases + // where manager.Uploader's multipart path wraps the body-read + // failure in its own aggregate type. 
+ var maxErr *http.MaxBytesError + if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") { + w.Header().Set("Retry-After", retryAfterSecs) + http.Error(w, "payload too large", http.StatusRequestEntityTooLarge) + return + } + slog.Error("storage put", "key", key, "err", err) + http.Error(w, "internal", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"status":"ok"}`)) +} + +func (h *handlers) handleList(w http.ResponseWriter, r *http.Request) { + prefix := r.URL.Query().Get("prefix") + bucket, err := h.reg.Resolve(primaryBucket) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + objs, err := bucket.List(r.Context(), prefix) + if err != nil { + slog.Error("storage list", "prefix", prefix, "err", err) + http.Error(w, "internal", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "prefix": prefix, + "objects": objs, + }) +} + +func (h *handlers) handleDelete(w http.ResponseWriter, r *http.Request) { + key := extractKey(r) + if err := validateKey(key); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + bucket, err := h.reg.Resolve(primaryBucket) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if err := bucket.Delete(r.Context(), key); err != nil { + slog.Error("storage delete", "key", key, "err", err) + http.Error(w, "internal", http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) +} diff --git a/cmd/storaged/main_test.go b/cmd/storaged/main_test.go new file mode 100644 index 0000000..df43b48 --- /dev/null +++ b/cmd/storaged/main_test.go @@ -0,0 +1,38 @@ +package main + +import "testing" + +func TestValidateKey(t *testing.T) { + cases := []struct { + name string + key string + wantErr 
bool + }{ + {"happy path", "data/workers_500k.parquet", false}, + {"deeply nested", "a/b/c/d/e/f/g.txt", false}, + {"empty", "", true}, + {"too long", string(make([]byte, 1025)), true}, + {"NUL byte", "foo\x00bar", true}, + {"leading slash", "/foo/bar", true}, + {"dotdot component", "data/../etc/passwd", true}, + {"dotdot at end", "data/..", true}, + {"dotdot at start", "../etc", true}, + {"single dot is fine", "data/./x.txt", false}, + {"dotdot embedded is fine", "data/.. /x.txt", false}, // not a path component + {"newline", "foo\nbar", true}, + {"carriage return", "foo\rbar", true}, + {"tab", "foo\tbar", true}, + {"unicode is fine", "résumé/data.txt", false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := validateKey(tc.key) + if tc.wantErr && err == nil { + t.Errorf("expected error for %q, got nil", tc.key) + } + if !tc.wantErr && err != nil { + t.Errorf("expected ok for %q, got %v", tc.key, err) + } + }) + } +} diff --git a/docs/PHASE_G0_KICKOFF.md b/docs/PHASE_G0_KICKOFF.md index a4ecc05..b022819 100644 --- a/docs/PHASE_G0_KICKOFF.md +++ b/docs/PHASE_G0_KICKOFF.md @@ -435,3 +435,125 @@ D1 ships harder than it would have without the scrum: bind-error handling is now race-free, smoke is deterministic, log volume on /health is acknowledged, secrets handling has a flagged path forward. + +--- + +## D2 — actual run results (2026-04-28 evening) + +Phase G0 Day 2 executed end-to-end. 
Output of `scripts/d2_smoke.sh`: + +``` +[d2-smoke] PUT round-trip: + ✓ PUT d2-smoke/.bin → 200 +[d2-smoke] GET echoes bytes: + ✓ GET d2-smoke/.bin → bytes match +[d2-smoke] LIST includes key: + ✓ LIST prefix=d2-smoke/ → contains d2-smoke/.bin +[d2-smoke] DELETE then GET → 404: + ✓ DELETE then GET → 404 +[d2-smoke] 256 MiB cap → 413: + ✓ PUT 257 MiB → 413 +[d2-smoke] semaphore: 5th concurrent PUT → 503 + Retry-After:5 + ✓ 5th concurrent PUT → 503 + Retry-After: 5 +[d2-smoke] D2 acceptance gate: PASSED +``` + +What landed: +- `internal/secrets/provider.go` — `Provider` interface, `FileProvider` + reading `/etc/lakehouse/secrets-go.toml` with inline-fallback for + G0 dev convenience, `StaticProvider` test helper +- `internal/storaged/bucket.go` — `aws-sdk-go-v2/service/s3` wrapper: + Get/Put/List/Delete; `manager.Uploader` for multipart-streaming PUT; + `MaxListResults=10_000` cap with `...truncated...` sentinel +- `internal/storaged/registry.go` — `BucketRegistry` (single bucket + in G0; G2 multi-bucket federation extends this) +- `cmd/storaged/main.go` — verb-paths `GET/PUT/LIST/DELETE`, strict + `validateKey` (rejects empty, >1024B, NUL, leading-`/`, `..`, + CR/LF/tab), Content-Length up-front 413, `MaxBytesReader` 256 MiB + body cap, 4-slot non-blocking semaphore (503+Retry-After:5) +- `scripts/d2_smoke.sh` — 6 acceptance probes; uses `curl -T --limit-rate` + for true streaming uploads (`--data-binary @-` first attempt + buffered client-side, semaphore never engaged) +- Per-package unit tests for provider + registry + validateKey +- New bucket `lakehouse-go-primary` on the existing MinIO at `:9000`, + isolated from the Rust `lakehouse` bucket during coexistence + +Out of spec but added: DELETE was exposed at the HTTP layer (the +kickoff D2.4 listed only GET/PUT/LIST in routes; `bucket.Delete` was +in the wrapper). J approved option **A** (DELETE exposed) for symmetry ++ smoke ergonomics. 
+ +`validateKey` policy: J approved the strict stance — leading `/`, +`..` components, and CR/LF/tab control characters all rejected at the +HTTP boundary. Costs ~5 lines, propagates safety to every downstream +consumer. + +Next: D3 — catalogd Parquet manifests with idempotent register. + +--- + +## D2 — code scrum review (3-lineage parallel pass) + +After D2 shipped, the actual code went through the same 3-lineage +parallel scrum as D1. + +| Pass | Reviewer | Latency | Findings | +|---|---|---|---| +| 1 | `opencode/claude-opus-4-7` | ~30s | 1 BLOCK + 3 WARN + 3 INFO = 7 | +| 2 | `openrouter/qwen/qwen3-coder` | ~32s | 2 BLOCK + 1 WARN + 1 INFO = 4 | +| 3a | `opencode/kimi-k2.6` (max_tokens=4096) | 18s | discursive — finish_reason=length, no structured output (model spent budget thinking, never reached BLOCK/WARN format). opencode rejected `max_tokens>4096` without `stream=true` | +| 3b | `kimi/kimi-k2-turbo` (direct adapter) | 124s | empty content, finish_reason=length (8192 reasoning tokens, no surfaced output) | +| 3c | `openrouter/moonshotai/kimi-k2-0905` | 33s | 1 BLOCK + 2 WARN + 1 INFO = 4 (used as the K-lineage representative) | + +The Kimi route shopping (3a → 3c) was a process finding worth +recording: opencode caps non-streaming Kimi calls at 4096 output +tokens, the direct kimi.com adapter consumed 8192 tokens of +reasoning but surfaced empty `content`, and openrouter's +`moonshotai/kimi-k2-0905` route delivered structured output in +~33s. For future scrums on Kimi, default to +`openrouter/moonshotai/kimi-k2-0905`. + +### Convergent findings (≥2 reviewers — high confidence) + +| # | Severity | Finding | Reviewers | Disposition | +|---|---|---|---|---| +| C1 | BLOCK | `buildRegistry` `defer cancel()` cancels the ctx that the AWS SDK was loaded with — fine today (static creds, sync call) but breaks future credential refresh chains (EC2 IMDS, SSO, AssumeRole) | Opus + Kimi | **Fixed** — switched to `context.Background()` for SDK construction. 
Per-request lifetimes flow through `r.Context()` | +| C2 | WARN→BLOCK | `*http.MaxBytesError` may not unwrap through `manager.Uploader`'s multipart goroutines for bodies >5 MiB; smoke covers 257 MiB single-PutObject path only — bodies in the multipart range that exceed 256 MiB could surface as 500 instead of 413 | Opus (WARN) + Kimi (BLOCK) | **Fixed** — added Content-Length up-front 413 (deterministic for honest clients) + string-suffix fallback (`"http: request body too large"`) on the Put error path for chunked / lying-CL cases | +| C3 | INFO→WARN | `Bucket.List` accumulates unbounded results into a slice; stray `prefix=""` against a large bucket OOMs the daemon | Opus (INFO) + Kimi (WARN) | **Fixed** — `MaxListResults=10_000` cap; truncated responses append a sentinel `ObjectInfo{Key: "...truncated..."}` so callers see they didn't get everything | + +### Single-reviewer findings (lineage-specific catches) + +| # | Severity | Finding | Reviewer | Disposition | +|---|---|---|---|---| +| S1 | WARN | PUT 200 response missing `Content-Type: application/json` — Go's content-sniffing won't catch the JSON prefix, clients may see `text/plain` | Opus | **Fixed** — explicit header set before `WriteHeader` | +| S2 | WARN | `Bucket.Get` body close ordering fragile — currently correct but the contract is implicit | Opus | **Accepted** — current order is correct (early returns on errors where body is nil); no fix required | +| S3 | INFO | `extractKey` uses chi `*` wildcard — works correctly, just noting empty-key path is covered by `validateKey` | Opus | **Accepted** — already covered | +| S4 | INFO | `FileProvider.mu sync.RWMutex` is unused given the immutable-after-construction design | Opus | **Accepted** — drop the mutex when SIGHUP reload lands in G1 (S6 below); keep it for now as a placeholder | +| S5 | INFO | `Bucket.Delete` doesn't translate not-found to `ErrKeyNotFound` | (Kimi noted, not flagged) | **Accepted** — S3 DeleteObject is idempotent by spec; non-error on 
missing key is the correct behavior | +| S6 | INFO | `FileProvider` never reloads (no SIGHUP handler) | Kimi | **Deferred to G1** — reload-on-SIGHUP is on the G1 list per `internal/shared/server.go` opening comment | +| S7 | INFO | Error messages on no-creds-found could specify which source (file vs fallback) was missing | Qwen | **Deferred** — minor debug ergonomics, no production impact | +| F1 | BLOCK | "Body close happens after semaphore release" | Qwen | **Dismissed** — false. Defer order is LIFO; `r.Body.Close()` was registered AFTER `<-h.putSem`, so it fires FIRST. Body closes before slot frees | +| F2 | BLOCK | "ObjectInfo fields can be nil-dereferenced in List" | Qwen | **Dismissed** — false. `bucket.go:147-160` checks `if o.Key != nil` etc. before dereferencing every field | +| F3 | WARN | "MaxBytesReader not applied to semaphore-protected path" | Qwen | **Dismissed** — false. Semaphore is non-blocking try-acquire (`select { default }`); there is no waiting state where pre-cap MaxBytesReader matters | + +### Cumulative D2 disposition + +- 3-lineage parallel pass: **15 distinct findings** (3 convergent + 7 single-reviewer real + 3 false + 2 absorbed by other findings) +- **Fixed: 4** (3 convergent + 1 single-reviewer Opus) +- **Accepted-with-rationale: 4** +- **Deferred to G1+: 2** +- **Dismissed (false positives): 3** (all from Qwen — its strength on D1 doesn't reproduce on D2 code; the lineage caught real C1/C2 issues there but misread defer order + nil-checks here) +- Build clean, vet clean, all tests pass, smoke 6/6 PASS after every fix round. + +The Kimi/Opus convergence on **C1 (ctx cancel)** is the highest-signal +finding of the round: two completely different lineages flagged the +same architectural footgun. C2 (MaxBytesReader unwrap) was the most +*consequential* — the smoke test would have stayed green while +production multi-MiB uploads silently returned 500 on oversize. C3 +was a latent OOM that's a 5-line fix. 
+ +The Qwen lineage delivered three false BLOCKs on D2 — different from +its D1 contribution where it caught two real BLOCKs that Opus missed. +Lineage rotation is real; on a given PR, one lineage may be the only +one finding bugs and another may be confidently wrong. The convergence +filter (≥2 reviewers) is the right gate. diff --git a/go.mod b/go.mod index 2d5dc0f..6c31364 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,30 @@ module git.agentview.dev/profit/golangLAKEHOUSE -go 1.23 +go 1.24 require ( github.com/go-chi/chi/v5 v5.2.5 github.com/pelletier/go-toml/v2 v2.3.0 ) + +require ( + github.com/aws/aws-sdk-go-v2 v1.41.6 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.9 // indirect + github.com/aws/aws-sdk-go-v2/config v1.32.16 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.19.15 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.22 // indirect + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.22 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.22 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.23 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.8 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.14 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.22 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.22 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.100.0 // indirect + github.com/aws/aws-sdk-go-v2/service/signin v1.0.10 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.30.16 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 // indirect + github.com/aws/smithy-go v1.25.0 // indirect +) diff --git a/go.sum b/go.sum index 5dc1359..a1904f7 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,41 @@ 
+github.com/aws/aws-sdk-go-v2 v1.41.6 h1:1AX0AthnBQzMx1vbmir3Y4WsnJgiydmnJjiLu+LvXOg= +github.com/aws/aws-sdk-go-v2 v1.41.6/go.mod h1:dy0UzBIfwSeot4grGvY1AqFWN5zgziMmWGzysDnHFcQ= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.9 h1:adBsCIIpLbLmYnkQU+nAChU5yhVTvu5PerROm+/Kq2A= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.9/go.mod h1:uOYhgfgThm/ZyAuJGNQ5YgNyOlYfqnGpTHXvk3cpykg= +github.com/aws/aws-sdk-go-v2/config v1.32.16 h1:Q0iQ7quUgJP0F/SCRTieScnaMdXr9h/2+wze1u3cNeM= +github.com/aws/aws-sdk-go-v2/config v1.32.16/go.mod h1:duCCnJEFqpt2RC6no1iK6q+8HpwOAkiUua0pY507dQc= +github.com/aws/aws-sdk-go-v2/credentials v1.19.15 h1:fyvgWTszojq8hEnMi8PPBTvZdTtEVmAVyo+NFLHBhH4= +github.com/aws/aws-sdk-go-v2/credentials v1.19.15/go.mod h1:gJiYyMOjNg8OEdRWOf3CrFQxM2a98qmrtjx1zuiQfB8= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.22 h1:IOGsJ1xVWhsi+ZO7/NW8OuZZBtMJLZbk4P5HDjJO0jQ= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.22/go.mod h1:b+hYdbU+jGKfXE8kKM6g1+h+L/Go3vMvzlxBsiuGsxg= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.16 h1:QkX8xXGmX81xuFrXNqU7NChFXVuKOl9EFrlSjy4RDfg= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.16/go.mod h1:CI+oguch+yROmJLFO0/wp8oRXmtUBibAQCis7lKQ95g= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.22 h1:GmLa5Kw1ESqtFpXsx5MmC84QWa/ZrLZvlJGa2y+4kcQ= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.22/go.mod h1:6sW9iWm9DK9YRpRGga/qzrzNLgKpT2cIxb7Vo2eNOp0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.22 h1:dY4kWZiSaXIzxnKlj17nHnBcXXBfac6UlsAx2qL6XrU= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.22/go.mod h1:KIpEUx0JuRZLO7U6cbV204cWAEco2iC3l061IxlwLtI= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.23 h1:FPXsW9+gMuIeKmz7j6ENWcWtBGTe1kH8r9thNt5Uxx4= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.23/go.mod h1:7J8iGMdRKk6lw2C+cMIphgAnT8uTwBwNOsGkyOCm80U= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.8 
h1:HtOTYcbVcGABLOVuPYaIihj6IlkqubBwFj10K5fxRek= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.8/go.mod h1:VsK9abqQeGlzPgUr+isNWzPlK2vKe9INMLWnY65f5Xs= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.14 h1:xnvDEnw+pnj5mctWiYuFbigrEzSm35x7k4KS/ZkCANg= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.14/go.mod h1:yS5rNogD8e0Wu9+l3MUwr6eENBzEeGejvINpN5PAYfY= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.22 h1:PUmZeJU6Y1Lbvt9WFuJ0ugUK2xn6hIWUBBbKuOWF30s= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.22/go.mod h1:nO6egFBoAaoXze24a2C0NjQCvdpk8OueRoYimvEB9jo= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.22 h1:SE+aQ4DEqG53RRCAIHlCf//B2ycxGH7jFkpnAh/kKPM= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.22/go.mod h1:ES3ynECd7fYeJIL6+oax+uIEljmfps0S70BaQzbMd/o= +github.com/aws/aws-sdk-go-v2/service/s3 v1.100.0 h1:7G26Sae6PMKn4kMcU5JzNfrm1YrKwyOhowXPYR2WiWY= +github.com/aws/aws-sdk-go-v2/service/s3 v1.100.0/go.mod h1:Fw9aqhJicIVee1VytBBjH+l+5ov6/PhbtIK/u3rt/ls= +github.com/aws/aws-sdk-go-v2/service/signin v1.0.10 h1:a1Fq/KXn75wSzoJaPQTgZO0wHGqE9mjFnylnqEPTchA= +github.com/aws/aws-sdk-go-v2/service/signin v1.0.10/go.mod h1:p6+MXNxW7IA6dMgHfTAzljuwSKD0NCm/4lbS4t6+7vI= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.16 h1:x6bKbmDhsgSZwv6q19wY/u3rLk/3FGjJWyqKcIRufpE= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.16/go.mod h1:CudnEVKRtLn0+3uMV0yEXZ+YZOKnAtUJ5DmDhilVnIw= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20 h1:oK/njaL8GtyEihkWMD4k3VgHCT64RQKkZwh0DG5j8ak= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20/go.mod h1:JHs8/y1f3zY7U5WcuzoJ/yAYGYtNIVPKLIbp61euvmg= +github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 h1:ks8KBcZPh3PYISr5dAiXCM5/Thcuxk8l+PG4+A0exds= +github.com/aws/aws-sdk-go-v2/service/sts v1.42.0/go.mod h1:pFw33T0WLvXU3rw1WBkpMlkgIn54eCB5FYLhjDc9Foo= +github.com/aws/smithy-go v1.25.0 h1:Sz/XJ64rwuiKtB6j98nDIPyYrV1nVNJ4YU74gttcl5U= 
+github.com/aws/smithy-go v1.25.0/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= github.com/pelletier/go-toml/v2 v2.3.0 h1:k59bC/lIZREW0/iVaQR8nDHxVq8OVlIzYCOJf421CaM= diff --git a/internal/secrets/provider.go b/internal/secrets/provider.go new file mode 100644 index 0000000..475a9c5 --- /dev/null +++ b/internal/secrets/provider.go @@ -0,0 +1,108 @@ +// Package secrets resolves credentials for storaged + future bucket +// federation (G2). The G0 surface is one method — S3Credentials — +// looked up by logical bucket name. Multi-bucket lands in G2; until +// then every lookup returns the same credentials, but callers already +// pass the name so the API doesn't need to change later. +// +// FileProvider reads /etc/lakehouse/secrets-go.toml. If that file is +// absent OR doesn't contain credentials for the requested bucket, the +// provider falls back to the values supplied via the inline config +// (lakehouse.toml [s3] block). G0 is dev-only so the inline fallback +// is convenient; G1 will tighten this to "secrets file required". package secrets + import ( + "errors" + "fmt" + "io/fs" + "os" + "sync" + + "github.com/pelletier/go-toml/v2" +) + +// S3Credentials is what storaged hands to aws-sdk-go-v2 to sign +// requests. Region/endpoint/bucket are config (non-secret) and live +// on shared.S3Config — those don't pass through this provider. +type S3Credentials struct { + AccessKeyID string `toml:"access_key_id"` + SecretAccessKey string `toml:"secret_access_key"` +} + +// Provider is the interface storaged depends on. Keeping this small +// is deliberate — every method here is a future migration point when +// secrets move to Vault / SOPS / SSM. +type Provider interface { + S3Credentials(bucket string) (S3Credentials, error) +} + +// FileProvider is the G0 implementation. 
It loads the file once on +// construction; reload-on-SIGHUP is a G1 concern. +type FileProvider struct { + path string + parsed secretsFile + fallback S3Credentials + mu sync.RWMutex +} + +// secretsFile mirrors the on-disk TOML shape: +// +// [s3.primary] +// access_key_id = "..." +// secret_access_key = "..." +// +// [s3.archive] # G2 multi-bucket +// access_key_id = "..." +type secretsFile struct { + S3 map[string]S3Credentials `toml:"s3"` +} + +// NewFileProvider loads `path`. If the file is missing the provider +// is still usable with the inline fallback — that's a deliberate +// G0 affordance. Any other read/parse error is fatal. +func NewFileProvider(path string, fallback S3Credentials) (*FileProvider, error) { + p := &FileProvider{path: path, fallback: fallback} + b, err := os.ReadFile(path) + if errors.Is(err, fs.ErrNotExist) { + return p, nil + } + if err != nil { + return nil, fmt.Errorf("read secrets %q: %w", path, err) + } + if err := toml.Unmarshal(b, &p.parsed); err != nil { + return nil, fmt.Errorf("parse secrets %q: %w", path, err) + } + return p, nil +} + +// S3Credentials resolves bucket → credentials. Lookup order: +// 1. secrets file [s3.] section +// 2. inline fallback (lakehouse.toml [s3]) +// +// If neither produces a non-empty AccessKeyID, returns an error so a +// misconfigured deployment fails loud instead of trying anonymous S3. +func (p *FileProvider) S3Credentials(bucket string) (S3Credentials, error) { + p.mu.RLock() + defer p.mu.RUnlock() + + if creds, ok := p.parsed.S3[bucket]; ok && creds.AccessKeyID != "" { + return creds, nil + } + if p.fallback.AccessKeyID != "" { + return p.fallback, nil + } + return S3Credentials{}, fmt.Errorf("no credentials for bucket %q (file=%q)", bucket, p.path) +} + +// StaticProvider is a test/dev helper that returns the same creds for +// every bucket. Use NewFileProvider in production code paths. 
+type StaticProvider struct {
+	Creds S3Credentials
+}
+
+func (p StaticProvider) S3Credentials(_ string) (S3Credentials, error) {
+	if p.Creds.AccessKeyID == "" {
+		return S3Credentials{}, errors.New("StaticProvider: no AccessKeyID")
+	}
+	return p.Creds, nil
+}
diff --git a/internal/secrets/provider_test.go b/internal/secrets/provider_test.go
new file mode 100644
index 0000000..24b6b62
--- /dev/null
+++ b/internal/secrets/provider_test.go
@@ -0,0 +1,122 @@
+package secrets
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+)
+
+func TestFileProvider_ParsesSection(t *testing.T) {
+	dir := t.TempDir()
+	path := filepath.Join(dir, "secrets.toml")
+	if err := os.WriteFile(path, []byte(`
+[s3.primary]
+access_key_id = "AK"
+secret_access_key = "SK"
+`), 0o600); err != nil {
+		t.Fatal(err)
+	}
+
+	p, err := NewFileProvider(path, S3Credentials{})
+	if err != nil {
+		t.Fatalf("NewFileProvider: %v", err)
+	}
+	got, err := p.S3Credentials("primary")
+	if err != nil {
+		t.Fatalf("S3Credentials: %v", err)
+	}
+	if got.AccessKeyID != "AK" || got.SecretAccessKey != "SK" {
+		t.Errorf("got %+v, want {AK, SK}", got)
+	}
+}
+
+func TestFileProvider_FallbackWhenSectionMissing(t *testing.T) {
+	dir := t.TempDir()
+	path := filepath.Join(dir, "secrets.toml")
+	// File exists, but doesn't have an [s3.primary] block.
+	if err := os.WriteFile(path, []byte(`
+[s3.archive]
+access_key_id = "OTHER"
+secret_access_key = "OTHER_SK"
+`), 0o600); err != nil {
+		t.Fatal(err)
+	}
+
+	p, err := NewFileProvider(path, S3Credentials{
+		AccessKeyID:     "FALLBACK",
+		SecretAccessKey: "FALLBACK_SK",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	got, err := p.S3Credentials("primary")
+	if err != nil {
+		t.Fatalf("S3Credentials: %v", err)
+	}
+	if got.AccessKeyID != "FALLBACK" {
+		t.Errorf("expected fallback, got %+v", got)
+	}
+}
+
+func TestFileProvider_MissingFileIsOK(t *testing.T) {
+	p, err := NewFileProvider("/no/such/path", S3Credentials{
+		AccessKeyID:     "FALLBACK",
+		SecretAccessKey: "FALLBACK_SK",
+	})
+	if err != nil {
+		t.Fatalf("NewFileProvider should not error on missing file: %v", err)
+	}
+	got, err := p.S3Credentials("primary")
+	if err != nil {
+		t.Fatalf("S3Credentials: %v", err)
+	}
+	if got.AccessKeyID != "FALLBACK" {
+		t.Errorf("expected fallback, got %+v", got)
+	}
+}
+
+func TestFileProvider_NoCredsAtAll(t *testing.T) {
+	p, err := NewFileProvider("/no/such/path", S3Credentials{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if _, err := p.S3Credentials("primary"); err == nil {
+		t.Fatal("expected error when no creds in file or fallback")
+	}
+}
+
+func TestFileProvider_ParseError(t *testing.T) {
+	dir := t.TempDir()
+	path := filepath.Join(dir, "bad.toml")
+	if err := os.WriteFile(path, []byte("not valid toml ===\n"), 0o600); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := NewFileProvider(path, S3Credentials{}); err == nil {
+		t.Fatal("expected parse error")
+	}
+}
+
+func TestStaticProvider(t *testing.T) {
+	p := StaticProvider{Creds: S3Credentials{AccessKeyID: "X", SecretAccessKey: "Y"}}
+	got, err := p.S3Credentials("any-bucket")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got.AccessKeyID != "X" {
+		t.Errorf("got %+v", got)
+	}
+
+	empty := StaticProvider{}
+	if _, err := empty.S3Credentials("any"); err == nil {
+		t.Error("expected error from empty StaticProvider")
+	}
+}
+
+func TestStaticProvider_ErrorIsExported(t *testing.T) {
+	// Sanity: the empty-creds path must surface a real error that
+	// callers can match through a %w chain. The previous check,
+	// `!errors.Is(err, err)`, was a tautology — errors.Is(err, err) is
+	// true for every err (including nil), so the clause could never
+	// fire and the test silently reduced to a nil check.
+	_, err := StaticProvider{}.S3Credentials("x")
+	if err == nil {
+		t.Fatal("expected non-nil error")
+	}
+	if !errors.Is(fmt.Errorf("wrap: %w", err), err) {
+		t.Fatal("error should be matchable through a %w chain via errors.Is")
+	}
+}
diff --git a/internal/storaged/bucket.go b/internal/storaged/bucket.go
new file mode 100644
index 0000000..6120bbd
--- /dev/null
+++ b/internal/storaged/bucket.go
@@ -0,0 +1,208 @@
+// Package storaged is the object I/O layer for Lakehouse-Go. Bucket
+// wraps aws-sdk-go-v2's s3.Client + s3 manager.Uploader and exposes
+// the four operations the rest of the system actually needs:
+// Get, Put, List, Delete. Path-style addressing is forced on for
+// MinIO; AWS deployments override it via shared.S3Config.UsePathStyle.
+package storaged
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	awsconfig "github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/credentials"
+	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
+	"github.com/aws/smithy-go"
+
+	"git.agentview.dev/profit/golangLAKEHOUSE/internal/secrets"
+	"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
+)
+
+// ErrKeyNotFound is returned by Get/Delete when the underlying S3
+// response is NoSuchKey or 404. Callers translate it to HTTP 404.
+var ErrKeyNotFound = errors.New("storaged: key not found")
+
+// ObjectInfo is the per-key data List returns. We don't surface the
+// full s3.Object — most fields are noise for a single-bucket G0.
+type ObjectInfo struct {
+	Key          string
+	Size         int64
+	ETag         string
+	LastModified time.Time
+}
+
+// Bucket is the runtime handle over one S3 bucket. The logical name
+// is what callers reference (e.g. "primary"); the physical bucket on
+// the server is held in s3Bucket.
+type Bucket struct {
+	name     string
+	s3Bucket string
+	client   *s3.Client
+	uploader *manager.Uploader
+}
+
+// NewBucket builds a Bucket from the shared S3 config + a credentials
+// lookup against the provider. Region/endpoint/path-style come from
+// the inline config; access keys come from the provider so they can
+// migrate to Vault / SSM later without a config schema change.
+//
+// NOTE(review): the keys are baked into a static credentials provider
+// here — there is no refresh path, so a rotated key needs a daemon
+// restart. Confirm that's acceptable for G0.
+func NewBucket(ctx context.Context, cfg shared.S3Config, prov secrets.Provider, logicalName string) (*Bucket, error) {
+	creds, err := prov.S3Credentials(logicalName)
+	if err != nil {
+		return nil, err
+	}
+
+	awsCfg, err := awsconfig.LoadDefaultConfig(ctx,
+		awsconfig.WithRegion(cfg.Region),
+		awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
+			creds.AccessKeyID, creds.SecretAccessKey, "",
+		)),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("aws config: %w", err)
+	}
+
+	client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
+		// Empty endpoint means "use the SDK default (real AWS)";
+		// MinIO deployments set it to e.g. http://localhost:9000.
+		if cfg.Endpoint != "" {
+			o.BaseEndpoint = aws.String(cfg.Endpoint)
+		}
+		o.UsePathStyle = cfg.UsePathStyle
+	})
+
+	return &Bucket{
+		name:     logicalName,
+		s3Bucket: cfg.Bucket,
+		client:   client,
+		uploader: manager.NewUploader(client),
+	}, nil
+}
+
+// Name returns the logical name (e.g. "primary"). Useful for logs.
+func (b *Bucket) Name() string { return b.name }
+
+// Get streams an object back to the caller. The returned ReadCloser
+// must be closed; on ErrKeyNotFound it is nil.
+func (b *Bucket) Get(ctx context.Context, key string) (io.ReadCloser, *ObjectInfo, error) {
+	out, err := b.client.GetObject(ctx, &s3.GetObjectInput{
+		Bucket: aws.String(b.s3Bucket),
+		Key:    aws.String(key),
+	})
+	if err != nil {
+		if isNotFound(err) {
+			return nil, nil, ErrKeyNotFound
+		}
+		return nil, nil, fmt.Errorf("s3 get %q: %w", key, err)
+	}
+	// Optional response fields keep their zero values when the server
+	// omits them (e.g. Size stays 0 without a Content-Length).
+	info := &ObjectInfo{
+		Key: key,
+	}
+	if out.ContentLength != nil {
+		info.Size = *out.ContentLength
+	}
+	if out.ETag != nil {
+		info.ETag = *out.ETag
+	}
+	if out.LastModified != nil {
+		info.LastModified = *out.LastModified
+	}
+	return out.Body, info, nil
+}
+
+// Put uploads via the s3 manager.Uploader so bodies above the SDK's
+// part threshold (~5 MiB) auto-multipart without buffering. Caller
+// is responsible for capping body size before calling — Bucket itself
+// is a thin pipe; the 256 MiB MaxBytesReader gate lives in cmd/storaged.
+func (b *Bucket) Put(ctx context.Context, key string, body io.Reader) error {
+	_, err := b.uploader.Upload(ctx, &s3.PutObjectInput{
+		Bucket: aws.String(b.s3Bucket),
+		Key:    aws.String(key),
+		Body:   body,
+	})
+	if err != nil {
+		return fmt.Errorf("s3 put %q: %w", key, err)
+	}
+	return nil
+}
+
+// MaxListResults caps a single List call so a stray prefix="" against
+// a large bucket can't OOM the daemon. Per Opus + Kimi convergent
+// scrum review (D2 round). G2 multi-bucket federation will introduce
+// continuation tokens so callers can paginate explicitly.
+const MaxListResults = 10_000
+
+// List returns up to MaxListResults objects whose key starts with
+// prefix. If the bucket holds more than the cap, the result is
+// truncated and a sentinel ObjectInfo with Key="...truncated..." is
+// appended so callers see they didn't get everything. G0 single-bucket
+// dev workloads are far below the cap; the cap protects against
+// production bucket sizes during the migration.
+func (b *Bucket) List(ctx context.Context, prefix string) ([]ObjectInfo, error) { + out := make([]ObjectInfo, 0, 64) + pager := s3.NewListObjectsV2Paginator(b.client, &s3.ListObjectsV2Input{ + Bucket: aws.String(b.s3Bucket), + Prefix: aws.String(prefix), + }) + for pager.HasMorePages() { + page, err := pager.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("s3 list prefix=%q: %w", prefix, err) + } + for _, o := range page.Contents { + if len(out) >= MaxListResults { + out = append(out, ObjectInfo{Key: "...truncated..."}) + return out, nil + } + oi := ObjectInfo{} + if o.Key != nil { + oi.Key = *o.Key + } + if o.Size != nil { + oi.Size = *o.Size + } + if o.ETag != nil { + oi.ETag = *o.ETag + } + if o.LastModified != nil { + oi.LastModified = *o.LastModified + } + out = append(out, oi) + } + } + return out, nil +} + +// Delete removes one object. Idempotent: deleting a missing key is +// not an error on S3, so we don't translate that to ErrKeyNotFound. +func (b *Bucket) Delete(ctx context.Context, key string) error { + _, err := b.client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(b.s3Bucket), + Key: aws.String(key), + }) + if err != nil { + return fmt.Errorf("s3 delete %q: %w", key, err) + } + return nil +} + +// isNotFound checks both the typed s3 NoSuchKey error and the smithy +// generic 404 — different code paths in the SDK surface different +// shapes depending on whether HEAD-then-GET happens. +func isNotFound(err error) bool { + var nsk *types.NoSuchKey + if errors.As(err, &nsk) { + return true + } + var apiErr smithy.APIError + if errors.As(err, &apiErr) { + switch apiErr.ErrorCode() { + case "NoSuchKey", "NotFound": + return true + } + } + return false +} diff --git a/internal/storaged/registry.go b/internal/storaged/registry.go new file mode 100644 index 0000000..148af03 --- /dev/null +++ b/internal/storaged/registry.go @@ -0,0 +1,64 @@ +// BucketRegistry resolves logical bucket names → *Bucket. 
G0 holds
+// exactly one bucket ("primary"); G2 (multi-bucket federation, per
+// Rust ADR-017) extends this to many. Keeping the registry as a
+// distinct type now means cmd/storaged never has to be refactored
+// when G2 lands — every handler already routes through Resolve.
+//
+// NOTE(review): this comment sits above the package clause and so
+// reads as a second package comment (bucket.go already carries one);
+// consider moving it onto the BucketRegistry type.
+package storaged
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+)
+
+// ErrBucketUnknown is the sentinel wrapped by Resolve for unknown
+// logical names; match it with errors.Is.
+var ErrBucketUnknown = errors.New("storaged: unknown bucket")
+
+// BucketRegistry is a thread-safe map of logical name → *Bucket.
+// In G0 only "primary" is registered; the registry exists so that
+// when G2 multi-bucket lands we add buckets in registry.go and the
+// HTTP layer is unchanged.
+type BucketRegistry struct {
+	mu      sync.RWMutex
+	buckets map[string]*Bucket
+}
+
+// NewRegistry returns an empty, ready-to-use registry.
+func NewRegistry() *BucketRegistry {
+	return &BucketRegistry{buckets: make(map[string]*Bucket)}
+}
+
+// Register adds a bucket under its logical name. Re-registering the
+// same name is rejected — G0 services build the registry once at
+// startup and a duplicate is almost always a config bug.
+func (r *BucketRegistry) Register(b *Bucket) error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if _, exists := r.buckets[b.name]; exists {
+		return fmt.Errorf("bucket %q already registered", b.name)
+	}
+	r.buckets[b.name] = b
+	return nil
+}
+
+// Resolve returns the bucket for a logical name. G0 callers pass
+// "primary" since that's the only one registered.
+func (r *BucketRegistry) Resolve(name string) (*Bucket, error) {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	b, ok := r.buckets[name]
+	if !ok {
+		return nil, fmt.Errorf("%w: %q", ErrBucketUnknown, name)
+	}
+	return b, nil
+}
+
+// Names returns the registered logical names (sorted-order not
+// guaranteed). Useful for /health surface or debug dumps.
+func (r *BucketRegistry) Names() []string {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	out := make([]string, 0, len(r.buckets))
+	for name := range r.buckets {
+		out = append(out, name)
+	}
+	return out
+}
diff --git a/internal/storaged/registry_test.go b/internal/storaged/registry_test.go
new file mode 100644
index 0000000..609d0dd
--- /dev/null
+++ b/internal/storaged/registry_test.go
@@ -0,0 +1,49 @@
+package storaged
+
+import (
+	"errors"
+	"testing"
+)
+
+func TestRegistry_RegisterAndResolve(t *testing.T) {
+	r := NewRegistry()
+	b := &Bucket{name: "primary"}
+	if err := r.Register(b); err != nil {
+		t.Fatalf("Register: %v", err)
+	}
+	got, err := r.Resolve("primary")
+	if err != nil {
+		t.Fatalf("Resolve: %v", err)
+	}
+	if got != b {
+		t.Errorf("Resolve returned different bucket")
+	}
+}
+
+func TestRegistry_DuplicateRegisterRejected(t *testing.T) {
+	r := NewRegistry()
+	if err := r.Register(&Bucket{name: "primary"}); err != nil {
+		t.Fatal(err)
+	}
+	if err := r.Register(&Bucket{name: "primary"}); err == nil {
+		t.Fatal("expected error on duplicate register")
+	}
+}
+
+func TestRegistry_UnknownReturnsErrBucketUnknown(t *testing.T) {
+	r := NewRegistry()
+	_, err := r.Resolve("nope")
+	if !errors.Is(err, ErrBucketUnknown) {
+		t.Fatalf("expected ErrBucketUnknown, got %v", err)
+	}
+}
+
+func TestRegistry_Names(t *testing.T) {
+	r := NewRegistry()
+	_ = r.Register(&Bucket{name: "primary"})
+	_ = r.Register(&Bucket{name: "archive"})
+	names := r.Names()
+	if len(names) != 2 {
+		t.Fatalf("expected 2 names, got %d", len(names))
+	}
+}
diff --git a/lakehouse.toml b/lakehouse.toml
index 8112312..9f8d51d 100644
--- a/lakehouse.toml
+++ b/lakehouse.toml
@@ -22,9 +22,9 @@ bind = "127.0.0.1:3214"
[s3]
endpoint = "http://localhost:9000"
region = "us-east-1"
-bucket = "lakehouse-primary"
-access_key_id = "" # filled by SecretsProvider in D2.3
-secret_access_key = "" # ditto
+bucket = "lakehouse-go-primary" # G0 dedicated bucket so Rust + Go coexist
+access_key_id = "" # 
populated by SecretsProvider from /etc/lakehouse/secrets-go.toml +secret_access_key = "" # ditto use_path_style = true [log] diff --git a/scripts/d2_smoke.sh b/scripts/d2_smoke.sh new file mode 100755 index 0000000..d1a8e25 --- /dev/null +++ b/scripts/d2_smoke.sh @@ -0,0 +1,167 @@ +#!/usr/bin/env bash +# D2 smoke — proves the Day 2 acceptance gate end-to-end against the +# live MinIO at :9000 and the dedicated bucket "lakehouse-go-primary". +# +# Validates: +# - PUT a small file → 200, body roundtrips on GET, appears in LIST +# - DELETE removes the key, subsequent GET → 404 +# - PUT exceeding 256 MiB → 413 Payload Too Large +# - 5th concurrent PUT (4-slot semaphore full) → 503 + Retry-After:5 +# +# Usage: ./scripts/d2_smoke.sh + +set -euo pipefail +cd "$(dirname "$0")/.." + +export PATH="$PATH:/usr/local/go/bin" + +echo "[d2-smoke] building storaged..." +go build -o bin/ ./cmd/storaged + +# Cleanup any prior storaged process on :3211 before launching. +pkill -f "bin/storaged" 2>/dev/null || true + +STORAGED_PID="" +SLOW_PIDS=() +TMP="$(mktemp -d)" +cleanup() { + echo "[d2-smoke] cleanup" + if [ -n "$STORAGED_PID" ]; then + kill "$STORAGED_PID" 2>/dev/null || true + fi + if [ ${#SLOW_PIDS[@]} -gt 0 ]; then + kill "${SLOW_PIDS[@]}" 2>/dev/null || true + fi + rm -rf "$TMP" +} +trap cleanup EXIT INT TERM + +echo "[d2-smoke] launching storaged..." +./bin/storaged > /tmp/storaged.log 2>&1 & +STORAGED_PID=$! + +# Poll /health up to 5s — same discipline as d1_smoke. +deadline=$(($(date +%s) + 5)) +while [ "$(date +%s)" -lt "$deadline" ]; do + if curl -sS --max-time 1 http://127.0.0.1:3211/health >/dev/null 2>&1; then + break + fi + sleep 0.05 +done +if ! 
curl -sS --max-time 1 http://127.0.0.1:3211/health >/dev/null 2>&1; then + echo " [d2-smoke] storaged failed to bind within 5s — log:" + tail -10 /tmp/storaged.log | sed 's/^/ /' + exit 1 +fi + +FAILED=0 +KEY="d2-smoke/$(date +%s).bin" + +echo "[d2-smoke] PUT round-trip:" +printf "hello-d2-smoke" > "$TMP/sample.bin" +HTTP="$(curl -sS -o "$TMP/put.out" -w '%{http_code}' -X PUT --data-binary @"$TMP/sample.bin" "http://127.0.0.1:3211/storage/put/$KEY")" +if [ "$HTTP" = "200" ]; then + echo " ✓ PUT $KEY → 200" +else + echo " ✗ PUT $KEY → $HTTP (body=$(cat "$TMP/put.out"))" + FAILED=1 +fi + +echo "[d2-smoke] GET echoes bytes:" +curl -sS -o "$TMP/get.out" "http://127.0.0.1:3211/storage/get/$KEY" +if cmp -s "$TMP/sample.bin" "$TMP/get.out"; then + echo " ✓ GET $KEY → bytes match" +else + echo " ✗ GET $KEY → bytes differ" + FAILED=1 +fi + +echo "[d2-smoke] LIST includes key:" +LISTED="$(curl -sS "http://127.0.0.1:3211/storage/list?prefix=d2-smoke/" | grep -o "\"$KEY\"" || true)" +if [ -n "$LISTED" ]; then + echo " ✓ LIST prefix=d2-smoke/ → contains $KEY" +else + echo " ✗ LIST prefix=d2-smoke/ → missing $KEY" + FAILED=1 +fi + +echo "[d2-smoke] DELETE then GET → 404:" +curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/$KEY" +HTTP="$(curl -sS -o /dev/null -w '%{http_code}' "http://127.0.0.1:3211/storage/get/$KEY")" +if [ "$HTTP" = "404" ]; then + echo " ✓ DELETE then GET → 404" +else + echo " ✗ DELETE then GET → $HTTP (expected 404)" + FAILED=1 +fi + +echo "[d2-smoke] 256 MiB cap → 413:" +dd if=/dev/zero of="$TMP/big.bin" bs=1M count=257 status=none +HTTP="$(curl -sS -o /dev/null -w '%{http_code}' -X PUT --data-binary @"$TMP/big.bin" "http://127.0.0.1:3211/storage/put/d2-smoke/oversize.bin")" +if [ "$HTTP" = "413" ]; then + echo " ✓ PUT 257 MiB → 413" +else + echo " ✗ PUT 257 MiB → $HTTP (expected 413)" + FAILED=1 +fi + +echo "[d2-smoke] semaphore: 5th concurrent PUT → 503 + Retry-After:5" +# Streaming uploads: -T file with --limit-rate makes curl actually 
+# pace the body chunks as bytes ship — unlike --data-binary @- which
+# buffers stdin before opening the connection.
+dd if=/dev/zero of="$TMP/slow.bin" bs=1M count=100 status=none
+
+slow_put() {
+  local idx="$1"
+  local out="$TMP/slow_${idx}.code"
+  curl -sS -o /dev/null -w "%{http_code}" -X PUT \
+    -T "$TMP/slow.bin" --limit-rate 5M \
+    "http://127.0.0.1:3211/storage/put/d2-smoke-cap-${idx}.bin" > "$out" 2>&1 || echo CURL_FAIL > "$out"
+}
+
+for i in 1 2 3 4; do
+  slow_put "$i" &
+  SLOW_PIDS+=($!)
+done
+
+# Give the 4 slow uploads ~750ms to actually start streaming + sit
+# on the semaphore. At 5 MiB/s for 100 MiB they'll each run ~20s,
+# so a brief warmup is plenty.
+sleep 0.75
+
+# Capture full headers on the 5th request to read Retry-After.
+# (-i writes status line + headers into blocked.out, while the -w
+# http_code goes to stdout, redirected into blocked.code.)
+curl -sS -i -o "$TMP/blocked.out" -w '%{http_code}' -X PUT \
+  --data-binary "blocked" \
+  "http://127.0.0.1:3211/storage/put/d2-smoke-blocked.bin" > "$TMP/blocked.code" 2>/dev/null || true
+HTTP_BLOCKED="$(cat "$TMP/blocked.code")"
+# tr -d '\r': HTTP header lines end CRLF; strip the CR before compare.
+RA="$(grep -i '^Retry-After:' "$TMP/blocked.out" | awk '{print $2}' | tr -d '\r' || true)"
+
+if [ "$HTTP_BLOCKED" = "503" ] && [ "$RA" = "5" ]; then
+  echo " ✓ 5th concurrent PUT → 503 + Retry-After: 5"
+else
+  echo " ✗ 5th concurrent PUT → code=$HTTP_BLOCKED retry-after=$RA"
+  echo " (slow PUT codes: $(for i in 1 2 3 4; do printf '%s ' "$(cat "$TMP/slow_${i}.code" 2>/dev/null || echo ?)"; done))"
+  FAILED=1
+fi
+
+# Drain the 4 slow PUTs cleanly. Don't wait on STORAGED_PID — that
+# only exits on signal. Wait only on the slow PUT PIDs we spawned.
+for pid in "${SLOW_PIDS[@]}"; do
+  wait "$pid" 2>/dev/null || true
+done
+SLOW_PIDS=()
+
+# Cleanup smoke keys regardless of pass/fail.
+for i in 1 2 3 4; do
+  curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/d2-smoke-cap-${i}.bin" || true
+done
+curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/d2-smoke-blocked.bin" || true
+curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/d2-smoke/oversize.bin" || true
+
+if [ "$FAILED" -eq 0 ]; then
+  echo "[d2-smoke] D2 acceptance gate: PASSED"
+  exit 0
+else
+  echo "[d2-smoke] D2 acceptance gate: FAILED"
+  exit 1
+fi