root 66a704ca3e G0 D3: catalogd Parquet manifests + ADR-020 idempotent register · 6 scrum fixes
Phase G0 Day 3 ships catalogd: Arrow Parquet manifest codec, in-memory
registry with the ADR-020 idempotency contract (same name+fingerprint
reuses dataset_id; different fingerprint → 409 Conflict), HTTP client
to storaged for persistence, and rehydration on startup. Acceptance
smoke 6/6 PASSES end-to-end including rehydrate-across-restart — the
load-bearing test that the catalog/storaged service split actually
preserves state.

dataset_id derivation diverges from Rust: UUIDv5(namespace, name)
instead of v4 surrogate. Same name on any box generates the same
dataset_id; rehydrate after disk loss converges to the same identity
rather than silently re-issuing. Namespace pinned at
a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e — every dataset_id ever issued
depends on these bytes.

Cross-lineage scrum on shipped code:
  - Opus 4.7 (opencode):                       1 BLOCK + 5 WARN + 3 INFO
  - Kimi K2-0905 (openrouter, validated D2):   2 BLOCK + 2 WARN + 1 INFO
  - Qwen3-coder (openrouter):                  2 BLOCK + 2 WARN + 2 INFO

Fixed:
  C1 list-offsets BLOCK (3-way convergent) → ValueOffsets(0) + bounds
  C2 Rehydrate mutex held across I/O → swap-under-brief-lock pattern
  S1 split-brain on persist failure → candidate-then-swap
  S2 brittle string-match for 400 vs 500 → ErrEmptyName/ErrEmptyFingerprint sentinels
  S3 Get/List shallow-copy aliasing → cloneManifest deep copy
  S4 keep-alive socket leak on error paths → drainAndClose helper

Dismissed (false positives, all single-reviewer):
  Kimi BLOCK "Decode crashes on empty Parquet" — already handled
  Kimi INFO "safeKey double-escapes" — wrong, splitting before escape is required
  Qwen INFO "rb.NewRecord() error unchecked" — API returns no error

Deferred to G1+: name validation regex, per-call deadlines, Snappy
compression, list pagination continuation tokens (storaged caps at
10k with sentinel for now).

Build clean, vet clean, all tests pass, smoke 6/6 PASS after every
fix round. arrow-go/v18 + google/uuid added; Go 1.24 → 1.25 forced
by arrow-go's minimum.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 23:36:57 -05:00

141 lines
4.6 KiB
Go

// store_client.go — thin HTTP client to storaged. catalogd needs to
// PUT manifest Parquets, GET them on startup, and LIST the manifests
// directory. Staying inside an HTTP boundary (rather than reaching
// into storaged's package directly) preserves the service-boundary
// shape that gRPC will eventually formalize at G1+.
package catalogd
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)
// StoreClient talks HTTP to the storaged service.
type StoreClient struct {
baseURL string
hc *http.Client
}
// listResponse mirrors storaged's GET /storage/list shape:
//
// {"prefix":"_catalog/manifests/","objects":[{Key,Size,ETag,LastModified}, ...]}
type listResponse struct {
Prefix string `json:"prefix"`
Objects []struct {
Key string `json:"Key"`
Size int64 `json:"Size"`
} `json:"objects"`
}
// NewStoreClient builds a client against the given storaged base URL
// (e.g. "http://127.0.0.1:3211"). Timeout covers manifest read-write
// only; rehydration of many manifests at startup runs sequentially
// and each call gets its own timeout window.
func NewStoreClient(baseURL string) *StoreClient {
return &StoreClient{
baseURL: strings.TrimRight(baseURL, "/"),
hc: &http.Client{Timeout: 30 * time.Second},
}
}
// Put writes raw bytes at key. body is the encoded Parquet manifest.
func (c *StoreClient) Put(ctx context.Context, key string, body []byte) error {
u := c.baseURL + "/storage/put/" + safeKey(key)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, u, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("put req: %w", err)
}
req.ContentLength = int64(len(body))
resp, err := c.hc.Do(req)
if err != nil {
return fmt.Errorf("put do: %w", err)
}
defer drainAndClose(resp.Body)
if resp.StatusCode != http.StatusOK {
preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
return fmt.Errorf("put %s: status %d: %s", key, resp.StatusCode, string(preview))
}
return nil
}
// Get reads the bytes at key. ErrKeyNotFound on 404.
func (c *StoreClient) Get(ctx context.Context, key string) ([]byte, error) {
u := c.baseURL + "/storage/get/" + safeKey(key)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, fmt.Errorf("get req: %w", err)
}
resp, err := c.hc.Do(req)
if err != nil {
return nil, fmt.Errorf("get do: %w", err)
}
defer drainAndClose(resp.Body)
if resp.StatusCode == http.StatusNotFound {
return nil, ErrKeyNotFound
}
if resp.StatusCode != http.StatusOK {
preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
return nil, fmt.Errorf("get %s: status %d: %s", key, resp.StatusCode, string(preview))
}
return io.ReadAll(resp.Body)
}
// List returns the keys under prefix. Object metadata beyond Key is
// ignored — catalogd only needs the keys to drive rehydration.
func (c *StoreClient) List(ctx context.Context, prefix string) ([]string, error) {
u := c.baseURL + "/storage/list?prefix=" + url.QueryEscape(prefix)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, fmt.Errorf("list req: %w", err)
}
resp, err := c.hc.Do(req)
if err != nil {
return nil, fmt.Errorf("list do: %w", err)
}
defer drainAndClose(resp.Body)
if resp.StatusCode != http.StatusOK {
preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
return nil, fmt.Errorf("list %s: status %d: %s", prefix, resp.StatusCode, string(preview))
}
var lr listResponse
if err := json.NewDecoder(resp.Body).Decode(&lr); err != nil {
return nil, fmt.Errorf("list decode: %w", err)
}
out := make([]string, 0, len(lr.Objects))
for _, o := range lr.Objects {
out = append(out, o.Key)
}
return out, nil
}
// ErrKeyNotFound mirrors storaged's not-found semantics on the catalogd
// side without exposing storaged's package types.
var ErrKeyNotFound = fmt.Errorf("catalogd store: key not found")
// safeKey URL-escapes path segments while preserving "/". storaged's
// chi `/storage/<verb>/*` routes accept literal slashes in the
// wildcard match, so we only escape the segments, not the separators.
func safeKey(key string) string {
parts := strings.Split(key, "/")
for i, p := range parts {
parts[i] = url.PathEscape(p)
}
return strings.Join(parts, "/")
}
// drainAndClose reads any remaining body bytes (capped at 64 KiB) and
// closes the body. Per scrum S6 (Qwen): preview-then-close on error
// paths leaves bytes in the kernel buffer, breaking HTTP/1.1 keep-
// alive reuse and slowly leaking sockets.
func drainAndClose(body io.ReadCloser) {
_, _ = io.Copy(io.Discard, io.LimitReader(body, 64<<10))
_ = body.Close()
}