Phase G0 Day 2 ships storaged: an aws-sdk-go-v2 wrapper plus chi routes
bound to 127.0.0.1:3211, with a 256 MiB MaxBytesReader, an up-front
Content-Length 413 check, and a 4-slot non-blocking semaphore that
returns 503 + Retry-After: 5 when full. Acceptance smoke (6/6 probes)
PASSES against the dedicated MinIO bucket lakehouse-go-primary, kept
isolated from the Rust system's lakehouse bucket during coexistence.
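
The admission gate lives in cmd/storaged, not in the Bucket wrapper
pasted at the end of this note, so here is only a rough sketch of its
shape. Everything in it is illustrative: limitConcurrency, capBody, and
maxBodyBytes are made-up names, the handler is stubbed, and plain
net/http middleware stands in for the chi wiring.

package main

import (
	"log"
	"net/http"
)

const maxBodyBytes = 256 << 20 // 256 MiB body cap described above

// limitConcurrency is a non-blocking N-slot semaphore: when the channel
// is full the request is rejected immediately with 503 + Retry-After: 5
// instead of queueing.
func limitConcurrency(slots int, next http.Handler) http.Handler {
	sem := make(chan struct{}, slots)
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case sem <- struct{}{}:
			defer func() { <-sem }()
			next.ServeHTTP(w, r)
		default:
			w.Header().Set("Retry-After", "5")
			http.Error(w, "too many in-flight requests", http.StatusServiceUnavailable)
		}
	})
}

// capBody rejects oversized uploads up front when Content-Length is
// declared (413 before reading the body) and wraps the body in
// MaxBytesReader so chunked requests hit the same cap.
func capBody(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.ContentLength > maxBodyBytes {
			http.Error(w, "body too large", http.StatusRequestEntityTooLarge)
			return
		}
		r.Body = http.MaxBytesReader(w, r.Body, maxBodyBytes)
		next.ServeHTTP(w, r)
	})
}

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/objects/", func(w http.ResponseWriter, r *http.Request) {
		// real GET/PUT/LIST/DELETE handlers elided in this sketch
	})
	log.Fatal(http.ListenAndServe("127.0.0.1:3211", limitConcurrency(4, capBody(mux))))
}
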
Cross-lineage scrum on the shipped code:
- Opus 4.7 (opencode): 1 BLOCK + 3 WARN + 3 INFO
- Qwen3-coder (openrouter): 2 BLOCK + 1 WARN + 1 INFO (3 false positives)
- Kimi K2-0905 (openrouter, after route-shopping past opencode's 4k
cap and the direct adapter's empty-content reasoning bug):
1 BLOCK + 2 WARN + 1 INFO
Fixed:
C1 buildRegistry ctx cancel footgun → context.Background()
(Opus + Kimi convergent; future credential refresh chains)
C2 MaxBytesReader's error loses its type when it surfaces through
   manager.Uploader's multipart goroutines → up-front Content-Length 413
   check + string-suffix fallback, sketched after this list
   (Opus + Kimi convergent; latent 500-instead-of-413 for 5-256 MiB bodies)
C3 Bucket.List unbounded accumulation → MaxListResults=10_000 cap
(Opus + Kimi convergent; OOM guard)
S1 PUT response Content-Type: application/json (Opus single-reviewer)
Strict validateKey policy (J approved), sketched after this list: rejects
empty keys, keys over 1024 bytes, NUL bytes, a leading "/", ".." path
components, and CR/LF/tab control characters.
DELETE exposed at HTTP layer (J approved option A) for symmetry +
smoke ergonomics.
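
Neither helper below is the shipped code (both live in cmd/storaged,
outside the wrapper pasted at the end of this note); they are hedged
sketches of the C2 fallback and of the approved key policy, with
illustrative names, messages, and package placement.

package storagedhttp // hypothetical package name, only so the sketch compiles

import (
	"errors"
	"net/http"
	"strings"
)

// statusForPutError maps a non-nil upload error to an HTTP status.
// MaxBytesReader's error can lose its concrete type once it has crossed
// manager.Uploader's multipart goroutines, so after the errors.As check
// we fall back to matching the known message suffix instead of letting
// an oversized body surface as a 500.
func statusForPutError(err error) int {
	var mbe *http.MaxBytesError
	if errors.As(err, &mbe) ||
		strings.HasSuffix(err.Error(), "http: request body too large") {
		return http.StatusRequestEntityTooLarge // 413
	}
	return http.StatusInternalServerError
}

// validateKey enforces the approved policy: non-empty, at most 1024
// bytes, no NUL bytes, no leading "/", no ".." path components, and no
// CR/LF/tab control characters.
func validateKey(key string) error {
	switch {
	case key == "":
		return errors.New("empty key")
	case len(key) > 1024:
		return errors.New("key longer than 1024 bytes")
	case strings.HasPrefix(key, "/"):
		return errors.New("key must not start with '/'")
	case strings.ContainsAny(key, "\x00\r\n\t"):
		return errors.New("key contains NUL or control characters")
	}
	for _, part := range strings.Split(key, "/") {
		if part == ".." {
			return errors.New("key contains a '..' path component")
		}
	}
	return nil
}
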
Build clean, vet clean, all unit tests pass, smoke 6/6 PASS after
every fix round. go.mod 1.23 → 1.24 (required by aws-sdk-go-v2).
Process finding worth recording: opencode caps non-streaming Kimi at
max_tokens=4096; the direct kimi.com adapter consumed 8192 tokens of
reasoning but surfaced empty content; openrouter/moonshotai/kimi-k2-0905
delivered structured output in ~33s. Future Kimi scrums should default
to that route.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Shipped wrapper source (209 lines, 6.3 KiB, Go):
// Package storaged is the object I/O layer for Lakehouse-Go. Bucket
// wraps aws-sdk-go-v2's s3.Client + s3 manager.Uploader and exposes
// the four operations the rest of the system actually needs:
// Get, Put, List, Delete. Path-style addressing is forced on for
// MinIO; AWS deployments override it via shared.S3Config.UsePathStyle.
package storaged

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
	"github.com/aws/smithy-go"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/secrets"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
)

// ErrKeyNotFound is returned by Get/Delete when the underlying S3
// response is NoSuchKey or 404. Callers translate it to HTTP 404.
var ErrKeyNotFound = errors.New("storaged: key not found")

// ObjectInfo is the per-key data List returns. We don't surface the
// full s3.Object — most fields are noise for a single-bucket G0.
type ObjectInfo struct {
	Key          string
	Size         int64
	ETag         string
	LastModified time.Time
}

// Bucket is the runtime handle over one S3 bucket. The logical name
// is what callers reference (e.g. "primary"); the physical bucket on
// the server is held in s3Bucket.
type Bucket struct {
	name     string
	s3Bucket string
	client   *s3.Client
	uploader *manager.Uploader
}

// NewBucket builds a Bucket from the shared S3 config + a credentials
// lookup against the provider. Region/endpoint/path-style come from
// the inline config; access keys come from the provider so they can
// migrate to Vault / SSM later without a config schema change.
func NewBucket(ctx context.Context, cfg shared.S3Config, prov secrets.Provider, logicalName string) (*Bucket, error) {
	creds, err := prov.S3Credentials(logicalName)
	if err != nil {
		return nil, err
	}

	awsCfg, err := awsconfig.LoadDefaultConfig(ctx,
		awsconfig.WithRegion(cfg.Region),
		awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
			creds.AccessKeyID, creds.SecretAccessKey, "",
		)),
	)
	if err != nil {
		return nil, fmt.Errorf("aws config: %w", err)
	}

	client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
		if cfg.Endpoint != "" {
			o.BaseEndpoint = aws.String(cfg.Endpoint)
		}
		o.UsePathStyle = cfg.UsePathStyle
	})

	return &Bucket{
		name:     logicalName,
		s3Bucket: cfg.Bucket,
		client:   client,
		uploader: manager.NewUploader(client),
	}, nil
}

// Name returns the logical name (e.g. "primary"). Useful for logs.
func (b *Bucket) Name() string { return b.name }

// Get streams an object back to the caller. The returned ReadCloser
// must be closed; on ErrKeyNotFound it is nil.
func (b *Bucket) Get(ctx context.Context, key string) (io.ReadCloser, *ObjectInfo, error) {
	out, err := b.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(b.s3Bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		if isNotFound(err) {
			return nil, nil, ErrKeyNotFound
		}
		return nil, nil, fmt.Errorf("s3 get %q: %w", key, err)
	}
	info := &ObjectInfo{
		Key: key,
	}
	if out.ContentLength != nil {
		info.Size = *out.ContentLength
	}
	if out.ETag != nil {
		info.ETag = *out.ETag
	}
	if out.LastModified != nil {
		info.LastModified = *out.LastModified
	}
	return out.Body, info, nil
}

// Put uploads via the s3 manager.Uploader so bodies above the SDK's
// part threshold (~5 MiB) auto-multipart without buffering. Caller
// is responsible for capping body size before calling — Bucket itself
// is a thin pipe; the 256 MiB MaxBytesReader gate lives in cmd/storaged.
func (b *Bucket) Put(ctx context.Context, key string, body io.Reader) error {
	_, err := b.uploader.Upload(ctx, &s3.PutObjectInput{
		Bucket: aws.String(b.s3Bucket),
		Key:    aws.String(key),
		Body:   body,
	})
	if err != nil {
		return fmt.Errorf("s3 put %q: %w", key, err)
	}
	return nil
}

// MaxListResults caps a single List call so a stray prefix="" against
// a large bucket can't OOM the daemon. Per Opus + Kimi convergent
// scrum review (D2 round). G2 multi-bucket federation will introduce
// continuation tokens so callers can paginate explicitly.
const MaxListResults = 10_000

// List returns up to MaxListResults objects whose key starts with
// prefix. If the bucket holds more than the cap, the result is
// truncated and a sentinel ObjectInfo with Key="...truncated..." is
// appended so callers see they didn't get everything. G0 single-bucket
// dev workloads are far below the cap; the cap protects against
// production bucket sizes during the migration.
func (b *Bucket) List(ctx context.Context, prefix string) ([]ObjectInfo, error) {
	out := make([]ObjectInfo, 0, 64)
	pager := s3.NewListObjectsV2Paginator(b.client, &s3.ListObjectsV2Input{
		Bucket: aws.String(b.s3Bucket),
		Prefix: aws.String(prefix),
	})
	for pager.HasMorePages() {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("s3 list prefix=%q: %w", prefix, err)
		}
		for _, o := range page.Contents {
			if len(out) >= MaxListResults {
				out = append(out, ObjectInfo{Key: "...truncated..."})
				return out, nil
			}
			oi := ObjectInfo{}
			if o.Key != nil {
				oi.Key = *o.Key
			}
			if o.Size != nil {
				oi.Size = *o.Size
			}
			if o.ETag != nil {
				oi.ETag = *o.ETag
			}
			if o.LastModified != nil {
				oi.LastModified = *o.LastModified
			}
			out = append(out, oi)
		}
	}
	return out, nil
}

// Delete removes one object. Idempotent: deleting a missing key is
// not an error on S3, so we don't translate that to ErrKeyNotFound.
func (b *Bucket) Delete(ctx context.Context, key string) error {
	_, err := b.client.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(b.s3Bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return fmt.Errorf("s3 delete %q: %w", key, err)
	}
	return nil
}

// isNotFound checks both the typed s3 NoSuchKey error and the smithy
// generic 404 — different code paths in the SDK surface different
// shapes depending on whether HEAD-then-GET happens.
func isNotFound(err error) bool {
	var nsk *types.NoSuchKey
	if errors.As(err, &nsk) {
		return true
	}
	var apiErr smithy.APIError
	if errors.As(err, &apiErr) {
		switch apiErr.ErrorCode() {
		case "NoSuchKey", "NotFound":
			return true
		}
	}
	return false
}
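
For orientation, a wiring sketch of the call shape, not taken from the
repo: the shared.S3Config field names match what NewBucket reads above,
but the region, endpoint, the storaged import path, and the existence
of a ready secrets.Provider are all assumptions.

package wiring // hypothetical package, only so the sketch compiles

import (
	"context"

	"git.agentview.dev/profit/golangLAKEHOUSE/internal/secrets"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
	"git.agentview.dev/profit/golangLAKEHOUSE/internal/storaged" // path assumed
)

// newPrimaryBucket shows only how NewBucket is called. Region and
// endpoint are placeholders for a local MinIO setup, not repo config.
func newPrimaryBucket(ctx context.Context, prov secrets.Provider) (*storaged.Bucket, error) {
	cfg := shared.S3Config{
		Region:       "us-east-1",             // placeholder
		Endpoint:     "http://127.0.0.1:9000", // placeholder MinIO endpoint
		UsePathStyle: true,                    // path-style for MinIO, per the package comment
		Bucket:       "lakehouse-go-primary",
	}
	return storaged.NewBucket(ctx, cfg, prov, "primary")
}
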