G0 D2: storaged S3 GET/PUT/LIST/DELETE · 3-lineage scrum · 4 fixes applied

Phase G0 Day 2 ships storaged: aws-sdk-go-v2 wrapper + chi routes
binding 127.0.0.1:3211 with 256 MiB MaxBytesReader, Content-Length
up-front 413, and a 4-slot non-blocking semaphore returning 503 +
Retry-After:5 when full. Acceptance smoke (6/6 probes) PASSES against
the dedicated MinIO bucket lakehouse-go-primary, isolated from the
Rust system's lakehouse bucket during coexistence.

Cross-lineage scrum on the shipped code:
  - Opus 4.7 (opencode): 1 BLOCK + 3 WARN + 3 INFO
  - Qwen3-coder (openrouter): 2 BLOCK + 1 WARN + 1 INFO (3 false positives)
  - Kimi K2-0905 (openrouter, after route-shopping past opencode's 4k
    cap and the direct adapter's empty-content reasoning bug):
    1 BLOCK + 2 WARN + 1 INFO

Fixed:
  C1 buildRegistry ctx cancel footgun → context.Background()
     (Opus + Kimi convergent; future credential refresh chains)
  C2 MaxBytesReader unwrap through manager.Uploader multipart
     goroutines → Content-Length up-front 413 + string-suffix fallback
     (Opus + Kimi convergent; latent 500-instead-of-413 for oversize
     bodies routed through the multipart (>5 MiB) upload path)
  C3 Bucket.List unbounded accumulation → MaxListResults=10_000 cap
     (Opus + Kimi convergent; OOM guard)
  S1 PUT response Content-Type: application/json (Opus single-reviewer)

Strict validateKey policy (J approved): rejects empty, >1024B, NUL,
leading "/", ".." path components, CR/LF/tab control characters.
DELETE exposed at HTTP layer (J approved option A) for symmetry +
smoke ergonomics.

Build clean, vet clean, all unit tests pass, smoke 6/6 PASS after
every fix round. go.mod 1.23 → 1.24 (required by aws-sdk-go-v2).

Process finding worth recording: opencode caps non-streaming Kimi at
max_tokens=4096; the direct kimi.com adapter consumed 8192 tokens of
reasoning but surfaced empty content; openrouter/moonshotai/kimi-k2-0905
delivered structured output in ~33s. Future Kimi scrums should default
to that route.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-28 23:23:03 -05:00
parent ad2ec1aca9
commit 8cfcdb8e5f
13 changed files with 1199 additions and 12 deletions

1
.gitignore vendored
View File

@ -39,3 +39,4 @@ vendor/
# Secrets — never commit. Resolved via SecretsProvider per ADR-001 §1.x.
*.env
secrets.toml
secrets-go.toml

View File

@ -1,18 +1,38 @@
// storaged is the object I/O service. D2 wires the actual S3
// GET/PUT/LIST routes; D1 just stands up the binary with /health.
// storaged is the object I/O service. D2 wires GET / PUT / LIST /
// DELETE routes against a single bucket ("primary") in the registry.
// Bind is 127.0.0.1 only (G0 dev — no auth on the wire); body cap is
// 256 MiB; concurrent in-flight PUTs are capped at 4 with non-blocking
// try-acquire (503 + Retry-After when full).
package main
import (
"context"
"encoding/json"
"errors"
"flag"
"io"
"log/slog"
"net/http"
"os"
"strings"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
"github.com/go-chi/chi/v5"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/secrets"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/storaged"
)
// Service-wide tunables for storaged's HTTP layer. Values trace to the
// PHASE_G0_KICKOFF D2.4 spec and the D1 scrum (Qwen Q1: body cap).
const (
	maxPutBytes = 256 << 20 // 256 MiB per-request body cap, per Qwen Q1 fix
	maxConcurrentPut = 4 // slots in the non-blocking in-flight PUT semaphore
	retryAfterSecs = "5" // Retry-After value sent with 503 (and 413) responses
	primaryBucket = "primary" // the single G0 bucket name in the registry
)
func main() {
configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
secretsPath := flag.String("secrets", "/etc/lakehouse/secrets-go.toml", "path to secrets TOML (Go-side; Rust uses /etc/lakehouse/secrets.toml)")
flag.Parse()
cfg, err := shared.LoadConfig(*configPath)
@ -21,12 +41,240 @@ func main() {
os.Exit(1)
}
if err := shared.Run("storaged", cfg.Storaged.Bind, func(_ chi.Router) {
// D2.4 wires GET/PUT/LIST routes here with localhost-only
// bind, 256 MiB MaxBytesReader, and a 4-slot semaphore on
// in-flight PUTs (per Qwen Q1 fix).
}); err != nil {
registry, err := buildRegistry(cfg, *secretsPath)
if err != nil {
slog.Error("bucket registry", "err", err)
os.Exit(1)
}
h := newHandlers(registry)
if err := shared.Run("storaged", cfg.Storaged.Bind, h.register); err != nil {
slog.Error("server", "err", err)
os.Exit(1)
}
}
// buildRegistry constructs the single-bucket (G0) bucket registry.
// The registry indirection exists now so that G2 multi-bucket
// federation can land without an HTTP-layer refactor.
func buildRegistry(cfg shared.Config, secretsPath string) (*storaged.BucketRegistry, error) {
	credsProvider, err := secrets.NewFileProvider(secretsPath, secrets.S3Credentials{
		AccessKeyID:     cfg.S3.AccessKeyID,
		SecretAccessKey: cfg.S3.SecretAccessKey,
	})
	if err != nil {
		return nil, err
	}
	// C1 (Opus + Kimi convergent review): never tie the AWS config-load
	// context to a canceller that fires when this function returns.
	// Static creds don't care today, but refresh-capable credential
	// providers (EC2 IMDS / SSO / AssumeRole, G2+) capture the load ctx
	// and would silently fail at the next refresh if it were cancelled.
	// context.Background() here; per-request lifetimes flow through
	// r.Context() in the handlers.
	bucket, err := storaged.NewBucket(context.Background(), cfg.S3, credsProvider, primaryBucket)
	if err != nil {
		return nil, err
	}
	registry := storaged.NewRegistry()
	if err := registry.Register(bucket); err != nil {
		return nil, err
	}
	return registry, nil
}
// handlers carries the bucket registry plus the PUT concurrency
// semaphore. One instance per process; chi route closures share it.
type handlers struct {
	reg *storaged.BucketRegistry // bucket-name → Bucket resolution
	putSem chan struct{} // buffered to maxConcurrentPut; try-acquired in handlePut
}

// newHandlers builds the process-wide handler set with an empty
// (all-slots-free) PUT semaphore.
func newHandlers(reg *storaged.BucketRegistry) *handlers {
	return &handlers{
		reg: reg,
		putSem: make(chan struct{}, maxConcurrentPut),
	}
}
// register wires the storaged HTTP surface onto the router. Paths are
// verb-prefixed per the PHASE_G0_KICKOFF D2.4 spec; the REST-style
// refactor (method-dispatched single /storage/{key}) is deferred to G2.
func (h *handlers) register(r chi.Router) {
	r.Get("/storage/list", h.handleList)
	r.Get("/storage/get/*", h.handleGet)
	r.Put("/storage/put/*", h.handlePut)
	r.Delete("/storage/delete/*", h.handleDelete)
}
// extractKey returns the wildcard remainder of a `/storage/<verb>/*`
// route — everything after the registered prefix, embedded slashes
// included.
func extractKey(r *http.Request) string {
	// chi exposes the splat segment under the "*" parameter name.
	const wildcard = "*"
	return chi.URLParam(r, wildcard)
}
// validateKey rejects keys that would be unsafe to round-trip through
// the system. The exact policy is a G0 design choice — see the focused
// decision request in the D2 PR description (validateKey-policy).
func validateKey(key string) error {
if key == "" {
return errors.New("empty key")
}
if len(key) > 1024 {
return errors.New("key too long (>1024 bytes)")
}
if strings.ContainsRune(key, 0) {
return errors.New("key contains NUL byte")
}
if strings.HasPrefix(key, "/") {
return errors.New("key starts with /")
}
for _, part := range strings.Split(key, "/") {
if part == ".." {
return errors.New("key contains .. component")
}
}
if strings.ContainsAny(key, "\r\n\t") {
return errors.New("key contains control char")
}
return nil
}
// handleGet streams the object at the wildcard key back to the client
// as application/octet-stream, setting ETag when the store reports one.
// Missing keys map to 404; all other store failures are logged and
// surfaced as an opaque 500.
func (h *handlers) handleGet(w http.ResponseWriter, r *http.Request) {
	key := extractKey(r)
	if err := validateKey(key); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	bucket, err := h.reg.Resolve(primaryBucket)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	stream, info, err := bucket.Get(r.Context(), key)
	switch {
	case errors.Is(err, storaged.ErrKeyNotFound):
		http.Error(w, "not found", http.StatusNotFound)
		return
	case err != nil:
		slog.Error("storage get", "key", key, "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	defer stream.Close()
	if etag := info.ETag; etag != "" {
		w.Header().Set("ETag", etag)
	}
	w.Header().Set("Content-Type", "application/octet-stream")
	// Once the copy starts the status line is committed; a mid-stream
	// failure can only be logged.
	if _, err := io.Copy(w, stream); err != nil {
		slog.Warn("storage get copy", "key", key, "err", err)
	}
}
// handlePut stores the request body under the validated wildcard key.
// Gate order: key validation (400) → declared Content-Length cap (413)
// → non-blocking concurrency semaphore (503) → enforced 256 MiB body
// cap via MaxBytesReader (413 surfaced from the read path).
func (h *handlers) handlePut(w http.ResponseWriter, r *http.Request) {
	key := extractKey(r)
	if err := validateKey(key); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// Up-front Content-Length cap — convergent finding C2 (Opus + Kimi;
	// the original comment said "C3", but C3 is the List result cap):
	// the manager.Uploader's multipart path runs body reads in goroutines
	// and wraps errors in its own types, so *http.MaxBytesError can be
	// buried by the time it reaches us — meaning oversize bodies routed
	// through the multipart path (past the ~5 MiB threshold) could
	// surface as 500 instead of 413. Catching Content-Length up front
	// returns 413 deterministically when the client honestly declares
	// size; MaxBytesReader + the string-match fallback below cover
	// chunked / lying-CL cases.
	// NOTE(review): Retry-After on a 413 for a fixed cap is unusual —
	// RFC 9110 intends it for temporary conditions; confirm intent.
	if r.ContentLength > maxPutBytes {
		w.Header().Set("Retry-After", retryAfterSecs)
		http.Error(w, "payload too large", http.StatusRequestEntityTooLarge)
		return
	}
	// Non-blocking try-acquire: if the 4-slot semaphore is full, return
	// 503 + Retry-After:5 instantly rather than holding the connection.
	// Per PHASE_G0_KICKOFF D2.4: "PUTs blocked on the semaphore → 503
	// with Retry-After: 5".
	select {
	case h.putSem <- struct{}{}:
		defer func() { <-h.putSem }()
	default:
		w.Header().Set("Retry-After", retryAfterSecs)
		http.Error(w, "storaged: put concurrency cap reached", http.StatusServiceUnavailable)
		return
	}
	// 256 MiB per-request body cap. Reads beyond it surface as
	// *http.MaxBytesError; for chunked-encoding bodies that's the only
	// signal we get. Defer LIFO order matters: r.Body.Close (registered
	// last) fires before <-h.putSem above, so the body is fully drained
	// before the semaphore slot frees.
	r.Body = http.MaxBytesReader(w, r.Body, maxPutBytes)
	defer r.Body.Close()
	bucket, err := h.reg.Resolve(primaryBucket)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if err := bucket.Put(r.Context(), key, r.Body); err != nil {
		// Two-layer detect: errors.As catches the typed error when it
		// survives unwrapping; the substring check catches cases where
		// manager.Uploader's multipart path wraps the body-read failure
		// in its own aggregate type.
		var maxErr *http.MaxBytesError
		if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
			w.Header().Set("Retry-After", retryAfterSecs)
			http.Error(w, "payload too large", http.StatusRequestEntityTooLarge)
			return
		}
		slog.Error("storage put", "key", key, "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(`{"status":"ok"}`))
}
// handleList returns a JSON envelope {prefix, objects} for the given
// ?prefix= query parameter (an empty prefix lists from the bucket root,
// subject to the store-side result cap).
func (h *handlers) handleList(w http.ResponseWriter, r *http.Request) {
	prefix := r.URL.Query().Get("prefix")
	bucket, err := h.reg.Resolve(primaryBucket)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	objs, err := bucket.List(r.Context(), prefix)
	if err != nil {
		slog.Error("storage list", "prefix", prefix, "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	envelope := map[string]any{
		"prefix":  prefix,
		"objects": objs,
	}
	w.Header().Set("Content-Type", "application/json")
	// An encode failure here is mid-response and unrecoverable;
	// deliberately ignored.
	_ = json.NewEncoder(w).Encode(envelope)
}
// handleDelete removes the object at the wildcard key and replies 204.
// There is no 404 branch: the store's delete is idempotent (per the S5
// scrum disposition), so deleting a missing key succeeds.
func (h *handlers) handleDelete(w http.ResponseWriter, r *http.Request) {
	key := extractKey(r)
	if err := validateKey(key); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	bucket, resolveErr := h.reg.Resolve(primaryBucket)
	if resolveErr != nil {
		http.Error(w, resolveErr.Error(), http.StatusInternalServerError)
		return
	}
	if err := bucket.Delete(r.Context(), key); err != nil {
		slog.Error("storage delete", "key", key, "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

38
cmd/storaged/main_test.go Normal file
View File

@ -0,0 +1,38 @@
package main
import (
	"strings"
	"testing"
)
// TestValidateKey pins the strict G0 key policy: reject empty keys,
// keys over 1024 bytes, NUL bytes, a leading "/", ".." path
// components, and CR/LF/tab control characters; accept everything else
// (including unicode and non-component "..").
func TestValidateKey(t *testing.T) {
	cases := []struct {
		name    string
		key     string
		wantErr bool
	}{
		{"happy path", "data/workers_500k.parquet", false},
		{"deeply nested", "a/b/c/d/e/f/g.txt", false},
		{"empty", "", true},
		// strings.Repeat, not string(make([]byte, 1025)): the latter is
		// 1025 NUL bytes and trips the NUL check first, so the length
		// branch was never actually exercised.
		{"too long", strings.Repeat("a", 1025), true},
		{"at limit is fine", strings.Repeat("a", 1024), false},
		{"NUL byte", "foo\x00bar", true},
		{"leading slash", "/foo/bar", true},
		{"dotdot component", "data/../etc/passwd", true},
		{"dotdot at end", "data/..", true},
		{"dotdot at start", "../etc", true},
		{"single dot is fine", "data/./x.txt", false},
		{"dotdot embedded is fine", "data/.. /x.txt", false}, // not a path component
		{"newline", "foo\nbar", true},
		{"carriage return", "foo\rbar", true},
		{"tab", "foo\tbar", true},
		{"unicode is fine", "résumé/data.txt", false},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := validateKey(tc.key)
			if tc.wantErr && err == nil {
				t.Errorf("expected error for %q, got nil", tc.key)
			}
			if !tc.wantErr && err != nil {
				t.Errorf("expected ok for %q, got %v", tc.key, err)
			}
		})
	}
}

View File

@ -435,3 +435,125 @@ D1 ships harder than it would have without the scrum: bind-error
handling is now race-free, smoke is deterministic, log volume on
/health is acknowledged, secrets handling has a flagged path
forward.
---
## D2 — actual run results (2026-04-28 evening)
Phase G0 Day 2 executed end-to-end. Output of `scripts/d2_smoke.sh`:
```
[d2-smoke] PUT round-trip:
✓ PUT d2-smoke/<ts>.bin → 200
[d2-smoke] GET echoes bytes:
✓ GET d2-smoke/<ts>.bin → bytes match
[d2-smoke] LIST includes key:
✓ LIST prefix=d2-smoke/ → contains d2-smoke/<ts>.bin
[d2-smoke] DELETE then GET → 404:
✓ DELETE then GET → 404
[d2-smoke] 256 MiB cap → 413:
✓ PUT 257 MiB → 413
[d2-smoke] semaphore: 5th concurrent PUT → 503 + Retry-After:5
✓ 5th concurrent PUT → 503 + Retry-After: 5
[d2-smoke] D2 acceptance gate: PASSED
```
What landed:
- `internal/secrets/provider.go``Provider` interface, `FileProvider`
reading `/etc/lakehouse/secrets-go.toml` with inline-fallback for
G0 dev convenience, `StaticProvider` test helper
- `internal/storaged/bucket.go``aws-sdk-go-v2/service/s3` wrapper:
Get/Put/List/Delete; `manager.Uploader` for multipart-streaming PUT;
`MaxListResults=10_000` cap with `...truncated...` sentinel
- `internal/storaged/registry.go``BucketRegistry` (single bucket
in G0; G2 multi-bucket federation extends this)
- `cmd/storaged/main.go` — verb-paths `GET/PUT/LIST/DELETE`, strict
`validateKey` (rejects empty, >1024B, NUL, leading-`/`, `..`,
CR/LF/tab), Content-Length up-front 413, `MaxBytesReader` 256 MiB
body cap, 4-slot non-blocking semaphore (503+Retry-After:5)
- `scripts/d2_smoke.sh` — 6 acceptance probes; uses `curl -T --limit-rate`
for true streaming uploads (`--data-binary @-` first attempt
buffered client-side, semaphore never engaged)
- Per-package unit tests for provider + registry + validateKey
- New bucket `lakehouse-go-primary` on the existing MinIO at `:9000`,
isolated from the Rust `lakehouse` bucket during coexistence
Out of spec but added: DELETE was exposed at the HTTP layer (the
kickoff D2.4 listed only GET/PUT/LIST in routes; `bucket.Delete` was
in the wrapper). J approved option **A** (DELETE exposed) for symmetry
+ smoke ergonomics.
`validateKey` policy: J approved the strict stance — leading `/`,
`..` components, and CR/LF/tab control characters all rejected at the
HTTP boundary. Costs ~5 lines, propagates safety to every downstream
consumer.
Next: D3 — catalogd Parquet manifests with idempotent register.
---
## D2 — code scrum review (3-lineage parallel pass)
After D2 shipped, the actual code went through the same 3-lineage
parallel scrum as D1.
| Pass | Reviewer | Latency | Findings |
|---|---|---|---|
| 1 | `opencode/claude-opus-4-7` | ~30s | 1 BLOCK + 3 WARN + 3 INFO = 7 |
| 2 | `openrouter/qwen/qwen3-coder` | ~32s | 2 BLOCK + 1 WARN + 1 INFO = 4 |
| 3a | `opencode/kimi-k2.6` (max_tokens=4096) | 18s | discursive — finish_reason=length, no structured output (model spent budget thinking, never reached BLOCK/WARN format). opencode rejected `max_tokens>4096` without `stream=true` |
| 3b | `kimi/kimi-k2-turbo` (direct adapter) | 124s | empty content, finish_reason=length (8192 reasoning tokens, no surfaced output) |
| 3c | `openrouter/moonshotai/kimi-k2-0905` | 33s | 1 BLOCK + 2 WARN + 1 INFO = 4 (used as the K-lineage representative) |
The Kimi route shopping (3a → 3c) was a process finding worth
recording: opencode caps non-streaming Kimi calls at 4096 output
tokens, the direct kimi.com adapter consumed 8192 tokens of
reasoning but surfaced empty `content`, and openrouter's
`moonshotai/kimi-k2-0905` route delivered structured output in
~33s. For future scrums on Kimi, default to
`openrouter/moonshotai/kimi-k2-0905`.
### Convergent findings (≥2 reviewers — high confidence)
| # | Severity | Finding | Reviewers | Disposition |
|---|---|---|---|---|
| C1 | BLOCK | `buildRegistry` `defer cancel()` cancels the ctx that the AWS SDK was loaded with — fine today (static creds, sync call) but breaks future credential refresh chains (EC2 IMDS, SSO, AssumeRole) | Opus + Kimi | **Fixed** — switched to `context.Background()` for SDK construction. Per-request lifetimes flow through `r.Context()` |
| C2 | WARN→BLOCK | `*http.MaxBytesError` may not unwrap through `manager.Uploader`'s multipart goroutines for bodies >5 MiB; smoke covers 257 MiB single-PutObject path only — bodies in the multipart range that exceed 256 MiB could surface as 500 instead of 413 | Opus (WARN) + Kimi (BLOCK) | **Fixed** — added Content-Length up-front 413 (deterministic for honest clients) + string-suffix fallback (`"http: request body too large"`) on the Put error path for chunked / lying-CL cases |
| C3 | INFO→WARN | `Bucket.List` accumulates unbounded results into a slice; stray `prefix=""` against a large bucket OOMs the daemon | Opus (INFO) + Kimi (WARN) | **Fixed**`MaxListResults=10_000` cap; truncated responses append a sentinel `ObjectInfo{Key: "...truncated..."}` so callers see they didn't get everything |
### Single-reviewer findings (lineage-specific catches)
| # | Severity | Finding | Reviewer | Disposition |
|---|---|---|---|---|
| S1 | WARN | PUT 200 response missing `Content-Type: application/json` — Go's content-sniffing won't catch the JSON prefix, clients may see `text/plain` | Opus | **Fixed** — explicit header set before `WriteHeader` |
| S2 | WARN | `Bucket.Get` body close ordering fragile — currently correct but the contract is implicit | Opus | **Accepted** — current order is correct (early returns on errors where body is nil); no fix required |
| S3 | INFO | `extractKey` uses chi `*` wildcard — works correctly, just noting empty-key path is covered by `validateKey` | Opus | **Accepted** — already covered |
| S4 | INFO | `FileProvider.mu sync.RWMutex` is unused given the immutable-after-construction design | Opus | **Accepted** — drop the mutex when SIGHUP reload lands in G1 (S6 below); keep it for now as a placeholder |
| S5 | INFO | `Bucket.Delete` doesn't translate not-found to `ErrKeyNotFound` | (Kimi noted, not flagged) | **Accepted** — S3 DeleteObject is idempotent by spec; non-error on missing key is the correct behavior |
| S6 | INFO | `FileProvider` never reloads (no SIGHUP handler) | Kimi | **Deferred to G1** — reload-on-SIGHUP is on the G1 list per `internal/shared/server.go` opening comment |
| S7 | INFO | Error messages on no-creds-found could specify which source (file vs fallback) was missing | Qwen | **Deferred** — minor debug ergonomics, no production impact |
| F1 | BLOCK | "Body close happens after semaphore release" | Qwen | **Dismissed** — false. Defer order is LIFO; `r.Body.Close()` was registered AFTER `<-h.putSem`, so it fires FIRST. Body closes before slot frees |
| F2 | BLOCK | "ObjectInfo fields can be nil-dereferenced in List" | Qwen | **Dismissed** — false. `bucket.go:147-160` checks `if o.Key != nil` etc. before dereferencing every field |
| F3 | WARN | "MaxBytesReader not applied to semaphore-protected path" | Qwen | **Dismissed** — false. Semaphore is non-blocking try-acquire (`select { default }`); there is no waiting state where pre-cap MaxBytesReader matters |
### Cumulative D2 disposition
- 3-lineage parallel pass: **15 distinct findings** (3 convergent + 7 single-reviewer real + 3 false + 2 absorbed by other findings)
- **Fixed: 5** (3 convergent + 2 single-reviewer Opus)
- **Accepted-with-rationale: 5**
- **Deferred to G1+: 2**
- **Dismissed (false positives): 3** (all from Qwen — its strength on D1 doesn't reproduce on D2 code; the lineage caught real C1/C2 issues there but misread defer order + nil-checks here)
- Build clean, vet clean, all tests pass, smoke 6/6 PASS after every fix round.
The Kimi/Opus convergence on **C1 (ctx cancel)** is the highest-signal
finding of the round: two completely different lineages flagged the
same architectural footgun. C2 (MaxBytesReader unwrap) was the most
*consequential* — the smoke test would have stayed green while
production multi-MiB uploads silently returned 500 on oversize. C3
was a latent OOM that's a 5-line fix.
The Qwen lineage delivered three false BLOCKs on D2 — different from
its D1 contribution where it caught two real BLOCKs that Opus missed.
Lineage rotation is real; on a given PR, one lineage may be the only
one finding bugs and another may be confidently wrong. The convergence
filter (≥2 reviewers) is the right gate.

24
go.mod
View File

@ -1,8 +1,30 @@
module git.agentview.dev/profit/golangLAKEHOUSE
go 1.23
go 1.24
require (
github.com/go-chi/chi/v5 v5.2.5
github.com/pelletier/go-toml/v2 v2.3.0
)
require (
github.com/aws/aws-sdk-go-v2 v1.41.6 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.9 // indirect
github.com/aws/aws-sdk-go-v2/config v1.32.16 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.19.15 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.22 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.16 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.23 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.8 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.14 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.22 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.22 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.100.0 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.10 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.16 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 // indirect
github.com/aws/smithy-go v1.25.0 // indirect
)

38
go.sum
View File

@ -1,3 +1,41 @@
github.com/aws/aws-sdk-go-v2 v1.41.6 h1:1AX0AthnBQzMx1vbmir3Y4WsnJgiydmnJjiLu+LvXOg=
github.com/aws/aws-sdk-go-v2 v1.41.6/go.mod h1:dy0UzBIfwSeot4grGvY1AqFWN5zgziMmWGzysDnHFcQ=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.9 h1:adBsCIIpLbLmYnkQU+nAChU5yhVTvu5PerROm+/Kq2A=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.9/go.mod h1:uOYhgfgThm/ZyAuJGNQ5YgNyOlYfqnGpTHXvk3cpykg=
github.com/aws/aws-sdk-go-v2/config v1.32.16 h1:Q0iQ7quUgJP0F/SCRTieScnaMdXr9h/2+wze1u3cNeM=
github.com/aws/aws-sdk-go-v2/config v1.32.16/go.mod h1:duCCnJEFqpt2RC6no1iK6q+8HpwOAkiUua0pY507dQc=
github.com/aws/aws-sdk-go-v2/credentials v1.19.15 h1:fyvgWTszojq8hEnMi8PPBTvZdTtEVmAVyo+NFLHBhH4=
github.com/aws/aws-sdk-go-v2/credentials v1.19.15/go.mod h1:gJiYyMOjNg8OEdRWOf3CrFQxM2a98qmrtjx1zuiQfB8=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.22 h1:IOGsJ1xVWhsi+ZO7/NW8OuZZBtMJLZbk4P5HDjJO0jQ=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.22/go.mod h1:b+hYdbU+jGKfXE8kKM6g1+h+L/Go3vMvzlxBsiuGsxg=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.16 h1:QkX8xXGmX81xuFrXNqU7NChFXVuKOl9EFrlSjy4RDfg=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.16/go.mod h1:CI+oguch+yROmJLFO0/wp8oRXmtUBibAQCis7lKQ95g=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.22 h1:GmLa5Kw1ESqtFpXsx5MmC84QWa/ZrLZvlJGa2y+4kcQ=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.22/go.mod h1:6sW9iWm9DK9YRpRGga/qzrzNLgKpT2cIxb7Vo2eNOp0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.22 h1:dY4kWZiSaXIzxnKlj17nHnBcXXBfac6UlsAx2qL6XrU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.22/go.mod h1:KIpEUx0JuRZLO7U6cbV204cWAEco2iC3l061IxlwLtI=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.23 h1:FPXsW9+gMuIeKmz7j6ENWcWtBGTe1kH8r9thNt5Uxx4=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.23/go.mod h1:7J8iGMdRKk6lw2C+cMIphgAnT8uTwBwNOsGkyOCm80U=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.8 h1:HtOTYcbVcGABLOVuPYaIihj6IlkqubBwFj10K5fxRek=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.8/go.mod h1:VsK9abqQeGlzPgUr+isNWzPlK2vKe9INMLWnY65f5Xs=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.14 h1:xnvDEnw+pnj5mctWiYuFbigrEzSm35x7k4KS/ZkCANg=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.14/go.mod h1:yS5rNogD8e0Wu9+l3MUwr6eENBzEeGejvINpN5PAYfY=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.22 h1:PUmZeJU6Y1Lbvt9WFuJ0ugUK2xn6hIWUBBbKuOWF30s=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.22/go.mod h1:nO6egFBoAaoXze24a2C0NjQCvdpk8OueRoYimvEB9jo=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.22 h1:SE+aQ4DEqG53RRCAIHlCf//B2ycxGH7jFkpnAh/kKPM=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.22/go.mod h1:ES3ynECd7fYeJIL6+oax+uIEljmfps0S70BaQzbMd/o=
github.com/aws/aws-sdk-go-v2/service/s3 v1.100.0 h1:7G26Sae6PMKn4kMcU5JzNfrm1YrKwyOhowXPYR2WiWY=
github.com/aws/aws-sdk-go-v2/service/s3 v1.100.0/go.mod h1:Fw9aqhJicIVee1VytBBjH+l+5ov6/PhbtIK/u3rt/ls=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.10 h1:a1Fq/KXn75wSzoJaPQTgZO0wHGqE9mjFnylnqEPTchA=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.10/go.mod h1:p6+MXNxW7IA6dMgHfTAzljuwSKD0NCm/4lbS4t6+7vI=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.16 h1:x6bKbmDhsgSZwv6q19wY/u3rLk/3FGjJWyqKcIRufpE=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.16/go.mod h1:CudnEVKRtLn0+3uMV0yEXZ+YZOKnAtUJ5DmDhilVnIw=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20 h1:oK/njaL8GtyEihkWMD4k3VgHCT64RQKkZwh0DG5j8ak=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20/go.mod h1:JHs8/y1f3zY7U5WcuzoJ/yAYGYtNIVPKLIbp61euvmg=
github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 h1:ks8KBcZPh3PYISr5dAiXCM5/Thcuxk8l+PG4+A0exds=
github.com/aws/aws-sdk-go-v2/service/sts v1.42.0/go.mod h1:pFw33T0WLvXU3rw1WBkpMlkgIn54eCB5FYLhjDc9Foo=
github.com/aws/smithy-go v1.25.0 h1:Sz/XJ64rwuiKtB6j98nDIPyYrV1nVNJ4YU74gttcl5U=
github.com/aws/smithy-go v1.25.0/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
github.com/pelletier/go-toml/v2 v2.3.0 h1:k59bC/lIZREW0/iVaQR8nDHxVq8OVlIzYCOJf421CaM=

View File

@ -0,0 +1,108 @@
// Package secrets resolves credentials for storaged + future bucket
// federation (G2). The G0 surface is one method — S3Credentials —
// looked up by logical bucket name. Multi-bucket lands in G2; until
// then every lookup returns the same credentials, but callers already
// pass the name so the API doesn't need to change later.
//
// FileProvider reads /etc/lakehouse/secrets.toml. If that file is
// absent OR doesn't contain credentials for the requested bucket, the
// provider falls back to the values supplied via the inline config
// (lakehouse.toml [s3] block). G0 is dev-only so the inline fallback
// is convenient; G1 will tighten this to "secrets file required".
package secrets
import (
"errors"
"fmt"
"io/fs"
"os"
"sync"
"github.com/pelletier/go-toml/v2"
)
// S3Credentials is what storaged hands to aws-sdk-go-v2 to sign
// requests. Region/endpoint/bucket are configuration (non-secret) and
// live on shared.S3Config — those don't pass through this provider.
// The TOML tags match both the secrets file sections and the inline
// lakehouse.toml [s3] fallback block.
type S3Credentials struct {
	AccessKeyID string `toml:"access_key_id"`
	SecretAccessKey string `toml:"secret_access_key"`
}
// Provider is the interface storaged depends on for credential lookup
// by logical bucket name. Keeping it to a single method is deliberate —
// every method here is a future migration point when secrets move to
// Vault / SOPS / SSM.
type Provider interface {
	S3Credentials(bucket string) (S3Credentials, error)
}
// FileProvider is the G0 Provider implementation. The secrets file is
// loaded once at construction; reload-on-SIGHUP is a G1 concern.
type FileProvider struct {
	path string // secrets file path, retained for error messages
	parsed secretsFile // parsed at construction; zero value if the file was absent
	fallback S3Credentials // inline [s3] credentials from lakehouse.toml
	// mu guards parsed/fallback. NOTE(review): state is immutable after
	// construction today, so the lock is effectively a placeholder for
	// the G1 SIGHUP-reload work (scrum finding S4).
	mu sync.RWMutex
}
// secretsFile mirrors the on-disk TOML shape: a map of logical bucket
// name → credentials under the top-level [s3] table, e.g.:
//
// [s3.primary]
// access_key_id = "..."
// secret_access_key = "..."
//
// [s3.archive] # G2 multi-bucket
// access_key_id = "..."
type secretsFile struct {
	S3 map[string]S3Credentials `toml:"s3"`
}
// NewFileProvider loads the secrets file at path. A missing file is
// not an error — the provider stays usable with the inline fallback
// (a deliberate G0 dev affordance). Any other read or parse failure
// is fatal.
func NewFileProvider(path string, fallback S3Credentials) (*FileProvider, error) {
	provider := &FileProvider{path: path, fallback: fallback}
	raw, readErr := os.ReadFile(path)
	switch {
	case errors.Is(readErr, fs.ErrNotExist):
		return provider, nil
	case readErr != nil:
		return nil, fmt.Errorf("read secrets %q: %w", path, readErr)
	}
	if err := toml.Unmarshal(raw, &provider.parsed); err != nil {
		return nil, fmt.Errorf("parse secrets %q: %w", path, err)
	}
	return provider, nil
}
// S3Credentials resolves bucket → credentials, preferring the secrets
// file's [s3.<bucket>] section and falling back to the inline config
// (lakehouse.toml [s3]).
//
// If neither source yields a non-empty AccessKeyID, an error is
// returned so a misconfigured deployment fails loud instead of
// attempting anonymous S3.
func (p *FileProvider) S3Credentials(bucket string) (S3Credentials, error) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	fromFile, found := p.parsed.S3[bucket]
	if found && fromFile.AccessKeyID != "" {
		return fromFile, nil
	}
	if p.fallback.AccessKeyID != "" {
		return p.fallback, nil
	}
	return S3Credentials{}, fmt.Errorf("no credentials for bucket %q (file=%q)", bucket, p.path)
}
// StaticProvider is a test/dev helper that returns one fixed set of
// credentials for every bucket name. Production code paths use
// NewFileProvider.
type StaticProvider struct {
	Creds S3Credentials
}

// S3Credentials returns the fixed credentials regardless of bucket, or
// an error when no AccessKeyID was set (mirroring FileProvider's
// fail-loud contract).
func (p StaticProvider) S3Credentials(_ string) (S3Credentials, error) {
	if p.Creds.AccessKeyID != "" {
		return p.Creds, nil
	}
	return S3Credentials{}, errors.New("StaticProvider: no AccessKeyID")
}

View File

@ -0,0 +1,122 @@
package secrets
import (
"errors"
"os"
"path/filepath"
"testing"
)
// TestFileProvider_ParsesSection verifies the [s3.<bucket>] section of
// the secrets file wins when present.
func TestFileProvider_ParsesSection(t *testing.T) {
	secretsPath := filepath.Join(t.TempDir(), "secrets.toml")
	content := []byte(`
[s3.primary]
access_key_id = "AK"
secret_access_key = "SK"
`)
	if err := os.WriteFile(secretsPath, content, 0o600); err != nil {
		t.Fatal(err)
	}
	provider, err := NewFileProvider(secretsPath, S3Credentials{})
	if err != nil {
		t.Fatalf("NewFileProvider: %v", err)
	}
	creds, err := provider.S3Credentials("primary")
	if err != nil {
		t.Fatalf("S3Credentials: %v", err)
	}
	if creds.AccessKeyID != "AK" || creds.SecretAccessKey != "SK" {
		t.Errorf("got %+v, want {AK, SK}", creds)
	}
}
func TestFileProvider_FallbackWhenSectionMissing(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "secrets.toml")
// File exists, but doesn't have an [s3.primary] block.
if err := os.WriteFile(path, []byte(`
[s3.archive]
access_key_id = "OTHER"
secret_access_key = "OTHER_SK"
`), 0o600); err != nil {
t.Fatal(err)
}
p, err := NewFileProvider(path, S3Credentials{
AccessKeyID: "FALLBACK",
SecretAccessKey: "FALLBACK_SK",
})
if err != nil {
t.Fatal(err)
}
got, err := p.S3Credentials("primary")
if err != nil {
t.Fatalf("S3Credentials: %v", err)
}
if got.AccessKeyID != "FALLBACK" {
t.Errorf("expected fallback, got %+v", got)
}
}
func TestFileProvider_MissingFileIsOK(t *testing.T) {
p, err := NewFileProvider("/no/such/path", S3Credentials{
AccessKeyID: "FALLBACK",
SecretAccessKey: "FALLBACK_SK",
})
if err != nil {
t.Fatalf("NewFileProvider should not error on missing file: %v", err)
}
got, err := p.S3Credentials("primary")
if err != nil {
t.Fatalf("S3Credentials: %v", err)
}
if got.AccessKeyID != "FALLBACK" {
t.Errorf("expected fallback, got %+v", got)
}
}
// TestFileProvider_NoCredsAtAll verifies the lookup fails when neither
// the file nor the fallback supplies credentials.
func TestFileProvider_NoCredsAtAll(t *testing.T) {
	provider, err := NewFileProvider("/no/such/path", S3Credentials{})
	if err != nil {
		t.Fatal(err)
	}
	_, err = provider.S3Credentials("primary")
	if err == nil {
		t.Fatal("expected error when no creds in file or fallback")
	}
}
// TestFileProvider_ParseError verifies malformed TOML surfaces as a
// construction error rather than being silently ignored.
func TestFileProvider_ParseError(t *testing.T) {
	badPath := filepath.Join(t.TempDir(), "bad.toml")
	if err := os.WriteFile(badPath, []byte("not valid toml ===\n"), 0o600); err != nil {
		t.Fatal(err)
	}
	_, err := NewFileProvider(badPath, S3Credentials{})
	if err == nil {
		t.Fatal("expected parse error")
	}
}
// TestStaticProvider covers both paths: populated creds are returned
// for any bucket name; an empty provider errors.
func TestStaticProvider(t *testing.T) {
	populated := StaticProvider{Creds: S3Credentials{AccessKeyID: "X", SecretAccessKey: "Y"}}
	creds, err := populated.S3Credentials("any-bucket")
	if err != nil {
		t.Fatal(err)
	}
	if creds.AccessKeyID != "X" {
		t.Errorf("got %+v", creds)
	}
	if _, err := (StaticProvider{}).S3Credentials("any"); err == nil {
		t.Error("expected error from empty StaticProvider")
	}
}
// TestStaticProvider_ErrorIsExported sanity-checks the empty-creds
// path: it must yield a real, non-nil error value.
//
// Fix: the original asserted `!errors.Is(err, err)`, which is
// tautologically false for any err (errors.Is(x, x) is always true),
// so the second half of the condition was dead code. We now check
// non-nilness and, via errors.Is with a nil target, guard against a
// typed-nil pointer boxed into the error interface.
func TestStaticProvider_ErrorIsExported(t *testing.T) {
	_, err := StaticProvider{}.S3Credentials("x")
	if err == nil {
		t.Fatal("expected non-nil error")
	}
	// errors.Is(err, nil) reports err == nil; it must be false here.
	if errors.Is(err, nil) {
		t.Fatal("error compares equal to nil")
	}
}

208
internal/storaged/bucket.go Normal file
View File

@ -0,0 +1,208 @@
// Package storaged is the object I/O layer for Lakehouse-Go. Bucket
// wraps aws-sdk-go-v2's s3.Client + s3 manager.Uploader and exposes
// the four operations the rest of the system actually needs:
// Get, Put, List, Delete. Path-style addressing is forced on for
// MinIO; AWS deployments override it via shared.S3Config.UsePathStyle.
package storaged
import (
"context"
"errors"
"fmt"
"io"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/secrets"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
)
// ErrKeyNotFound is returned by Get/Delete when the underlying S3
// response is NoSuchKey or 404. Callers translate it to HTTP 404.
// Match it with errors.Is.
var ErrKeyNotFound = errors.New("storaged: key not found")

// ObjectInfo is the per-key data List returns. We don't surface the
// full s3.Object — most fields are noise for a single-bucket G0.
// Fields left at their zero value mean the server omitted them.
type ObjectInfo struct {
	Key          string    // object key within the bucket
	Size         int64     // object size in bytes, as reported by S3
	ETag         string    // entity tag exactly as returned by S3
	LastModified time.Time // server-reported last-modified timestamp
}
// Bucket is the runtime handle over one S3 bucket. The logical name
// is what callers reference (e.g. "primary"); the physical bucket on
// the server is held in s3Bucket.
type Bucket struct {
	name     string            // logical name callers use (e.g. "primary")
	s3Bucket string            // physical bucket name on the S3 server
	client   *s3.Client        // low-level SDK client used by Get/List/Delete
	uploader *manager.Uploader // multipart-capable uploader used by Put
}
// NewBucket builds a Bucket from the shared S3 config + a credentials
// lookup against the provider. Region/endpoint/path-style come from
// the inline config; access keys come from the provider so they can
// migrate to Vault / SSM later without a config schema change.
func NewBucket(ctx context.Context, cfg shared.S3Config, prov secrets.Provider, logicalName string) (*Bucket, error) {
	creds, err := prov.S3Credentials(logicalName)
	if err != nil {
		return nil, err
	}
	// Static credentials: the provider already resolved the key pair,
	// so the SDK never hits its default credential chain.
	staticCreds := credentials.NewStaticCredentialsProvider(creds.AccessKeyID, creds.SecretAccessKey, "")
	awsCfg, err := awsconfig.LoadDefaultConfig(ctx,
		awsconfig.WithRegion(cfg.Region),
		awsconfig.WithCredentialsProvider(staticCreds),
	)
	if err != nil {
		return nil, fmt.Errorf("aws config: %w", err)
	}
	client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
		// An empty endpoint means "use the SDK default" (real AWS).
		if cfg.Endpoint != "" {
			o.BaseEndpoint = aws.String(cfg.Endpoint)
		}
		o.UsePathStyle = cfg.UsePathStyle
	})
	b := &Bucket{
		name:     logicalName,
		s3Bucket: cfg.Bucket,
		client:   client,
		uploader: manager.NewUploader(client),
	}
	return b, nil
}
// Name returns the logical name (e.g. "primary"). Useful for logs.
func (b *Bucket) Name() string {
	return b.name
}
// Get streams an object back to the caller. The returned ReadCloser
// must be closed; on ErrKeyNotFound it is nil.
func (b *Bucket) Get(ctx context.Context, key string) (io.ReadCloser, *ObjectInfo, error) {
	in := &s3.GetObjectInput{
		Bucket: aws.String(b.s3Bucket),
		Key:    aws.String(key),
	}
	out, err := b.client.GetObject(ctx, in)
	if err != nil {
		if isNotFound(err) {
			return nil, nil, ErrKeyNotFound
		}
		return nil, nil, fmt.Errorf("s3 get %q: %w", key, err)
	}
	// The SDK surfaces metadata via pointers; nil means the server
	// omitted the field, so we leave the zero value in place.
	info := &ObjectInfo{Key: key}
	if cl := out.ContentLength; cl != nil {
		info.Size = *cl
	}
	if et := out.ETag; et != nil {
		info.ETag = *et
	}
	if lm := out.LastModified; lm != nil {
		info.LastModified = *lm
	}
	return out.Body, info, nil
}
// Put uploads via the s3 manager.Uploader so bodies above the SDK's
// part threshold (~5 MiB) auto-multipart without buffering. Caller
// is responsible for capping body size before calling — Bucket itself
// is a thin pipe; the 256 MiB MaxBytesReader gate lives in cmd/storaged.
func (b *Bucket) Put(ctx context.Context, key string, body io.Reader) error {
	input := &s3.PutObjectInput{
		Bucket: aws.String(b.s3Bucket),
		Key:    aws.String(key),
		Body:   body,
	}
	if _, err := b.uploader.Upload(ctx, input); err != nil {
		return fmt.Errorf("s3 put %q: %w", key, err)
	}
	return nil
}
// MaxListResults caps a single List call so a stray prefix="" against
// a large bucket can't OOM the daemon. Per Opus + Kimi convergent
// scrum review (D2 round). G2 multi-bucket federation will introduce
// continuation tokens so callers can paginate explicitly.
const MaxListResults = 10_000
// List returns up to MaxListResults objects whose key starts with
// prefix. If the bucket holds more than the cap, the result is
// truncated and a sentinel ObjectInfo with Key="...truncated..." is
// appended so callers see they didn't get everything. G0 single-bucket
// dev workloads are far below the cap; the cap protects against
// production bucket sizes during the migration.
func (b *Bucket) List(ctx context.Context, prefix string) ([]ObjectInfo, error) {
	pager := s3.NewListObjectsV2Paginator(b.client, &s3.ListObjectsV2Input{
		Bucket: aws.String(b.s3Bucket),
		Prefix: aws.String(prefix),
	})
	results := make([]ObjectInfo, 0, 64)
	for pager.HasMorePages() {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("s3 list prefix=%q: %w", prefix, err)
		}
		for _, obj := range page.Contents {
			if len(results) >= MaxListResults {
				// Over the cap: mark truncation and stop paging.
				return append(results, ObjectInfo{Key: "...truncated..."}), nil
			}
			results = append(results, objectInfoFrom(obj))
		}
	}
	return results, nil
}

// objectInfoFrom flattens the SDK's pointer-heavy types.Object into a
// value-type ObjectInfo, treating nil fields as zero values.
func objectInfoFrom(o types.Object) ObjectInfo {
	var oi ObjectInfo
	if o.Key != nil {
		oi.Key = *o.Key
	}
	if o.Size != nil {
		oi.Size = *o.Size
	}
	if o.ETag != nil {
		oi.ETag = *o.ETag
	}
	if o.LastModified != nil {
		oi.LastModified = *o.LastModified
	}
	return oi
}
// Delete removes one object. Idempotent: deleting a missing key is
// not an error on S3, so we don't translate that to ErrKeyNotFound.
func (b *Bucket) Delete(ctx context.Context, key string) error {
	if _, err := b.client.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(b.s3Bucket),
		Key:    aws.String(key),
	}); err != nil {
		return fmt.Errorf("s3 delete %q: %w", key, err)
	}
	return nil
}
// isNotFound checks both the typed s3 NoSuchKey error and the smithy
// generic 404 — different code paths in the SDK surface different
// shapes depending on whether HEAD-then-GET happens.
func isNotFound(err error) bool {
	var noSuchKey *types.NoSuchKey
	if errors.As(err, &noSuchKey) {
		return true
	}
	var apiErr smithy.APIError
	if !errors.As(err, &apiErr) {
		return false
	}
	code := apiErr.ErrorCode()
	return code == "NoSuchKey" || code == "NotFound"
}

View File

@ -0,0 +1,64 @@
// BucketRegistry resolves logical bucket names → *Bucket. G0 holds
// exactly one bucket ("primary"); G2 (multi-bucket federation, per
// Rust ADR-017) extends this to many. Keeping the registry as a
// distinct type now means cmd/storaged never has to be refactored
// when G2 lands — every handler already routes through Resolve.
package storaged
import (
"errors"
"fmt"
"sync"
)
// ErrBucketUnknown is the sentinel wrapped by Resolve when a logical
// name has no registered bucket; match it with errors.Is.
var ErrBucketUnknown = errors.New("storaged: unknown bucket")

// BucketRegistry is a thread-safe map of logical name → *Bucket.
// In G0 only "primary" is registered; the registry exists so that
// when G2 multi-bucket lands we add buckets in registry.go and the
// HTTP layer is unchanged.
type BucketRegistry struct {
	mu      sync.RWMutex       // guards buckets
	buckets map[string]*Bucket // logical name → runtime handle
}
// NewRegistry returns an empty registry ready for Register calls.
func NewRegistry() *BucketRegistry {
	return &BucketRegistry{
		buckets: map[string]*Bucket{},
	}
}
// Register adds a bucket under its logical name. Re-registering the
// same name is rejected — G0 services build the registry once at
// startup and a duplicate is almost always a config bug.
func (r *BucketRegistry) Register(b *Bucket) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.buckets[b.name]; dup {
		return fmt.Errorf("bucket %q already registered", b.name)
	}
	r.buckets[b.name] = b
	return nil
}
// Resolve returns the bucket for a logical name. G0 callers pass
// "primary" since that's the only one registered. Unknown names yield
// an error wrapping ErrBucketUnknown.
func (r *BucketRegistry) Resolve(name string) (*Bucket, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if b, ok := r.buckets[name]; ok {
		return b, nil
	}
	return nil, fmt.Errorf("%w: %q", ErrBucketUnknown, name)
}
// Names returns the registered logical names (sorted-order not
// guaranteed). Useful for /health surface or debug dumps.
func (r *BucketRegistry) Names() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.buckets))
	for logical := range r.buckets {
		names = append(names, logical)
	}
	return names
}

View File

@ -0,0 +1,49 @@
package storaged
import (
"errors"
"testing"
)
// TestRegistry_RegisterAndResolve verifies Resolve returns the exact
// pointer that was registered.
func TestRegistry_RegisterAndResolve(t *testing.T) {
	reg := NewRegistry()
	want := &Bucket{name: "primary"}
	if err := reg.Register(want); err != nil {
		t.Fatalf("Register: %v", err)
	}
	got, err := reg.Resolve("primary")
	if err != nil {
		t.Fatalf("Resolve: %v", err)
	}
	if got != want {
		t.Errorf("Resolve returned different bucket")
	}
}
// TestRegistry_DuplicateRegisterRejected verifies a second Register
// under the same logical name fails.
func TestRegistry_DuplicateRegisterRejected(t *testing.T) {
	reg := NewRegistry()
	first := &Bucket{name: "primary"}
	if err := reg.Register(first); err != nil {
		t.Fatal(err)
	}
	second := &Bucket{name: "primary"}
	if err := reg.Register(second); err == nil {
		t.Fatal("expected error on duplicate register")
	}
}
// TestRegistry_UnknownReturnsErrBucketUnknown verifies the sentinel is
// reachable through errors.Is on the wrapped Resolve error.
func TestRegistry_UnknownReturnsErrBucketUnknown(t *testing.T) {
	_, err := NewRegistry().Resolve("nope")
	if !errors.Is(err, ErrBucketUnknown) {
		t.Fatalf("expected ErrBucketUnknown, got %v", err)
	}
}
// TestRegistry_Names verifies Names reports every registered logical
// name. Order is unspecified, so membership — not position — is
// asserted.
//
// Fixes vs. the original: Register errors were discarded with `_ =`
// (a failed registration would make the test lie about Names), and
// only len(names) was checked, which passes even if Names returned
// the wrong strings.
func TestRegistry_Names(t *testing.T) {
	r := NewRegistry()
	if err := r.Register(&Bucket{name: "primary"}); err != nil {
		t.Fatal(err)
	}
	if err := r.Register(&Bucket{name: "archive"}); err != nil {
		t.Fatal(err)
	}
	names := r.Names()
	if len(names) != 2 {
		t.Fatalf("expected 2 names, got %d", len(names))
	}
	seen := make(map[string]bool, len(names))
	for _, n := range names {
		seen[n] = true
	}
	if !seen["primary"] || !seen["archive"] {
		t.Errorf("missing expected names in %v", names)
	}
}

View File

@ -22,9 +22,9 @@ bind = "127.0.0.1:3214"
[s3]
endpoint = "http://localhost:9000"
region = "us-east-1"
bucket = "lakehouse-primary"
access_key_id = "" # filled by SecretsProvider in D2.3
secret_access_key = "" # ditto
bucket = "lakehouse-go-primary" # G0 dedicated bucket so Rust + Go coexist
access_key_id = "" # populated by SecretsProvider from /etc/lakehouse/secrets-go.toml
secret_access_key = "" # ditto
use_path_style = true
[log]

167
scripts/d2_smoke.sh Executable file
View File

@ -0,0 +1,167 @@
#!/usr/bin/env bash
# D2 smoke — proves the Day 2 acceptance gate end-to-end against the
# live MinIO at :9000 and the dedicated bucket "lakehouse-go-primary".
#
# Validates:
#  - PUT a small file → 200, body roundtrips on GET, appears in LIST
#  - DELETE removes the key, subsequent GET → 404
#  - PUT exceeding 256 MiB → 413 Payload Too Large
#  - 5th concurrent PUT (4-slot semaphore full) → 503 + Retry-After:5
#
# Usage: ./scripts/d2_smoke.sh
set -euo pipefail
# Always run from the repo root, regardless of invocation directory.
cd "$(dirname "$0")/.."
export PATH="$PATH:/usr/local/go/bin"
echo "[d2-smoke] building storaged..."
go build -o bin/ ./cmd/storaged
# Cleanup any prior storaged process on :3211 before launching.
pkill -f "bin/storaged" 2>/dev/null || true
# Globals referenced by the cleanup trap below; declared up front so
# the trap never touches an unset variable under `set -u`.
STORAGED_PID=""
SLOW_PIDS=()
# Scratch dir for request/response bodies; removed on exit.
TMP="$(mktemp -d)"
# cleanup tears down everything the smoke run created: the storaged
# daemon, any slow-upload curls still in flight, and the scratch dir.
# Every kill is best-effort so the trap never aborts mid-teardown.
cleanup() {
    echo "[d2-smoke] cleanup"
    if [ -n "$STORAGED_PID" ]; then
        kill "$STORAGED_PID" 2>/dev/null || true
    fi
    if [ "${#SLOW_PIDS[@]}" -gt 0 ]; then
        kill "${SLOW_PIDS[@]}" 2>/dev/null || true
    fi
    rm -rf "$TMP"
}
trap cleanup EXIT INT TERM
echo "[d2-smoke] launching storaged..."
./bin/storaged > /tmp/storaged.log 2>&1 &
STORAGED_PID=$!
# Poll /health up to 5s — same discipline as d1_smoke.
deadline=$(($(date +%s) + 5))
while [ "$(date +%s)" -lt "$deadline" ]; do
    if curl -sS --max-time 1 http://127.0.0.1:3211/health >/dev/null 2>&1; then
        break
    fi
    sleep 0.05
done
# Re-probe after the loop: reaching here via deadline expiry (rather
# than break) means the daemon never came up, so confirm explicitly.
if ! curl -sS --max-time 1 http://127.0.0.1:3211/health >/dev/null 2>&1; then
    echo " [d2-smoke] storaged failed to bind within 5s — log:"
    tail -10 /tmp/storaged.log | sed 's/^/ /'
    exit 1
fi
FAILED=0
# Timestamped key so back-to-back runs never collide on a stale object.
KEY="d2-smoke/$(date +%s).bin"
echo "[d2-smoke] PUT round-trip:"
printf "hello-d2-smoke" > "$TMP/sample.bin"
HTTP="$(curl -sS -o "$TMP/put.out" -w '%{http_code}' -X PUT --data-binary @"$TMP/sample.bin" "http://127.0.0.1:3211/storage/put/$KEY")"
if [ "$HTTP" = "200" ]; then
    echo " ✓ PUT $KEY → 200"
else
    # Fix: this line previously printed "$KEY$HTTP" with no separator,
    # garbling the key and status together in the failure output.
    echo " ✗ PUT $KEY → $HTTP (body=$(cat "$TMP/put.out"))"
    FAILED=1
fi
echo "[d2-smoke] GET echoes bytes:"
curl -sS -o "$TMP/get.out" "http://127.0.0.1:3211/storage/get/$KEY"
# cmp -s: silent byte-for-byte comparison; exit 0 only when identical.
if cmp -s "$TMP/sample.bin" "$TMP/get.out"; then
    echo " ✓ GET $KEY → bytes match"
else
    echo " ✗ GET $KEY → bytes differ"
    FAILED=1
fi
echo "[d2-smoke] LIST includes key:"
# grep -o with || true: a missing key must not trip `set -e`; an empty
# LISTED is the failure signal instead.
LISTED="$(curl -sS "http://127.0.0.1:3211/storage/list?prefix=d2-smoke/" | grep -o "\"$KEY\"" || true)"
if [ -n "$LISTED" ]; then
    echo " ✓ LIST prefix=d2-smoke/ → contains $KEY"
else
    echo " ✗ LIST prefix=d2-smoke/ → missing $KEY"
    FAILED=1
fi
echo "[d2-smoke] DELETE then GET → 404:"
curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/$KEY"
HTTP="$(curl -sS -o /dev/null -w '%{http_code}' "http://127.0.0.1:3211/storage/get/$KEY")"
if [ "$HTTP" = "404" ]; then
    echo " ✓ DELETE then GET → 404"
else
    echo " ✗ DELETE then GET → $HTTP (expected 404)"
    FAILED=1
fi
echo "[d2-smoke] 256 MiB cap → 413:"
# 257 MiB of zeros — one MiB past the 256 MiB gate in cmd/storaged.
dd if=/dev/zero of="$TMP/big.bin" bs=1M count=257 status=none
HTTP="$(curl -sS -o /dev/null -w '%{http_code}' -X PUT --data-binary @"$TMP/big.bin" "http://127.0.0.1:3211/storage/put/d2-smoke/oversize.bin")"
if [ "$HTTP" = "413" ]; then
    echo " ✓ PUT 257 MiB → 413"
else
    echo " ✗ PUT 257 MiB → $HTTP (expected 413)"
    FAILED=1
fi
echo "[d2-smoke] semaphore: 5th concurrent PUT → 503 + Retry-After:5"
# Streaming uploads: -T file with --limit-rate makes curl actually
# pace the body chunks as bytes ship — unlike --data-binary @- which
# buffers stdin before opening the connection.
dd if=/dev/zero of="$TMP/slow.bin" bs=1M count=100 status=none
# slow_put launches one rate-limited PUT and records its final HTTP
# code (or CURL_FAIL) in $TMP/slow_<idx>.code for the diagnostics below.
slow_put() {
    local idx="$1"
    local out="$TMP/slow_${idx}.code"
    curl -sS -o /dev/null -w "%{http_code}" -X PUT \
        -T "$TMP/slow.bin" --limit-rate 5M \
        "http://127.0.0.1:3211/storage/put/d2-smoke-cap-${idx}.bin" > "$out" 2>&1 || echo CURL_FAIL > "$out"
}
for i in 1 2 3 4; do
    slow_put "$i" &
    SLOW_PIDS+=($!)
done
# Give the 4 slow uploads ~750ms to actually start streaming + sit
# on the semaphore. At 5 MiB/s for 100 MiB they'll each run ~20s,
# so a brief warmup is plenty.
sleep 0.75
# Capture full headers on the 5th request to read Retry-After.
curl -sS -i -o "$TMP/blocked.out" -w '%{http_code}' -X PUT \
    --data-binary "blocked" \
    "http://127.0.0.1:3211/storage/put/d2-smoke-blocked.bin" > "$TMP/blocked.code" 2>/dev/null || true
HTTP_BLOCKED="$(cat "$TMP/blocked.code")"
# Header lines end in \r\n; strip the CR so the string compare works.
RA="$(grep -i '^Retry-After:' "$TMP/blocked.out" | awk '{print $2}' | tr -d '\r' || true)"
if [ "$HTTP_BLOCKED" = "503" ] && [ "$RA" = "5" ]; then
    echo " ✓ 5th concurrent PUT → 503 + Retry-After: 5"
else
    echo " ✗ 5th concurrent PUT → code=$HTTP_BLOCKED retry-after=$RA"
    echo " (slow PUT codes: $(for i in 1 2 3 4; do printf '%s ' "$(cat "$TMP/slow_${i}.code" 2>/dev/null || echo ?)"; done))"
    FAILED=1
fi
# Drain the 4 slow PUTs cleanly. Don't wait on STORAGED_PID — that
# only exits on signal. Wait only on the slow PUT PIDs we spawned.
for pid in "${SLOW_PIDS[@]}"; do
    wait "$pid" 2>/dev/null || true
done
SLOW_PIDS=()
# Best-effort removal of every key the smoke created — pass or fail —
# so reruns always start from a clean bucket.
for i in 1 2 3 4; do
    curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/d2-smoke-cap-${i}.bin" || true
done
curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/d2-smoke-blocked.bin" || true
curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/d2-smoke/oversize.bin" || true
# Final verdict: any probe flipping FAILED=1 makes the gate fail.
if [ "$FAILED" -ne 0 ]; then
    echo "[d2-smoke] D2 acceptance gate: FAILED"
    exit 1
fi
echo "[d2-smoke] D2 acceptance gate: PASSED"
exit 0