diff --git a/cmd/catalogd/main.go b/cmd/catalogd/main.go index 35ff9f9..da0303d 100644 --- a/cmd/catalogd/main.go +++ b/cmd/catalogd/main.go @@ -1,16 +1,23 @@ -// catalogd is the metadata control plane — dataset registry, -// Parquet manifest persistence, idempotent registration with -// schema-fingerprint gate (per Rust ADR-020). D3 wires the -// register/list/manifest routes; D1 just stands up the binary. +// catalogd is the metadata authority — registers Parquet datasets, +// persists manifests in storaged, rehydrates them on startup, and +// answers GET/list queries. ADR-020 idempotency contract enforced +// by internal/catalogd/registry.go. package main import ( + "context" + "encoding/json" + "errors" "flag" "log/slog" + "net/http" "os" + "time" - "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" "github.com/go-chi/chi/v5" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd" + "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" ) func main() { @@ -22,14 +29,103 @@ func main() { slog.Error("config", "err", err) os.Exit(1) } + if cfg.Catalogd.StoragedURL == "" { + slog.Error("config", "err", "catalogd.storaged_url is required") + os.Exit(1) + } - if err := shared.Run("catalogd", cfg.Catalogd.Bind, func(_ chi.Router) { - // D3 wires: - // POST /catalog/register (idempotent by name + fingerprint, 409 on drift) - // GET /catalog/manifest/{name} - // GET /catalog/list - }); err != nil { + store := catalogd.NewStoreClient(cfg.Catalogd.StoragedURL) + registry := catalogd.NewRegistry(store) + + rehydrateCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + n, err := registry.Rehydrate(rehydrateCtx) + cancel() + if err != nil { + slog.Error("rehydrate", "err", err) + os.Exit(1) + } + slog.Info("rehydrated", "manifests", n) + + h := newHandlers(registry) + + if err := shared.Run("catalogd", cfg.Catalogd.Bind, h.register); err != nil { slog.Error("server", "err", err) os.Exit(1) } } + +type handlers struct { + reg 
*catalogd.Registry +} + +func newHandlers(r *catalogd.Registry) *handlers { return &handlers{reg: r} } + +func (h *handlers) register(r chi.Router) { + r.Post("/catalog/register", h.handleRegister) + r.Get("/catalog/manifest/*", h.handleGetManifest) + r.Get("/catalog/list", h.handleList) +} + +// registerRequest mirrors POST body shape. +type registerRequest struct { + Name string `json:"name"` + SchemaFingerprint string `json:"schema_fingerprint"` + Objects []catalogd.Object `json:"objects"` + RowCount *int64 `json:"row_count,omitempty"` +} + +// registerResponse adds the existing flag so callers can distinguish +// fresh registration from idempotent re-register. +type registerResponse struct { + Manifest *catalogd.Manifest `json:"manifest"` + Existing bool `json:"existing"` +} + +func (h *handlers) handleRegister(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + r.Body = http.MaxBytesReader(w, r.Body, 4<<20) // 4 MiB cap on register payloads + var req registerRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest) + return + } + m, existing, err := h.reg.Register(r.Context(), req.Name, req.SchemaFingerprint, req.Objects, req.RowCount) + if errors.Is(err, catalogd.ErrFingerprintConflict) { + http.Error(w, err.Error(), http.StatusConflict) + return + } + if errors.Is(err, catalogd.ErrEmptyName) || errors.Is(err, catalogd.ErrEmptyFingerprint) { + // Per scrum S2 (Opus): sentinel-based detection, not substring match. 
+ http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if err != nil { + slog.Error("register", "name", req.Name, "err", err) + http.Error(w, "internal", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(registerResponse{Manifest: m, Existing: existing}) +} + +func (h *handlers) handleGetManifest(w http.ResponseWriter, r *http.Request) { + name := chi.URLParam(r, "*") + m, err := h.reg.Get(name) + if errors.Is(err, catalogd.ErrManifestNotFound) { + http.Error(w, "not found", http.StatusNotFound) + return + } + if err != nil { + slog.Error("get manifest", "name", name, "err", err) + http.Error(w, "internal", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(m) +} + +func (h *handlers) handleList(w http.ResponseWriter, _ *http.Request) { + items := h.reg.List() + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{"manifests": items, "count": len(items)}) +} diff --git a/docs/PHASE_G0_KICKOFF.md b/docs/PHASE_G0_KICKOFF.md index b022819..f185f30 100644 --- a/docs/PHASE_G0_KICKOFF.md +++ b/docs/PHASE_G0_KICKOFF.md @@ -557,3 +557,127 @@ its D1 contribution where it caught two real BLOCKs that Opus missed. Lineage rotation is real; on a given PR, one lineage may be the only one finding bugs and another may be confidently wrong. The convergence filter (≥2 reviewers) is the right gate. + +--- + +## D3 — actual run results (2026-04-28 evening) + +Phase G0 Day 3 executed end-to-end. Output of `scripts/d3_smoke.sh`: + +``` +[d3-smoke] POST /catalog/register (fresh): + ✓ fresh register → existing=false, dataset_id=200a05a8-... 
+[d3-smoke] GET /catalog/manifest/: + ✓ manifest dataset_id matches +[d3-smoke] GET /catalog/list (1 entry): + ✓ list count=1 +[d3-smoke] restart catalogd → rehydrate from Parquet: + ✓ rehydrated dataset_id matches across restart +[d3-smoke] re-register (same name + same fingerprint) → existing=true: + ✓ existing=true, same dataset_id, objects replaced (count=2) +[d3-smoke] re-register (different fingerprint) → 409: + ✓ different fingerprint → 409 Conflict +[d3-smoke] D3 acceptance gate: PASSED +``` + +What landed: +- `internal/catalogd/manifest.go` — Arrow Parquet codec for the + Manifest type (dataset_id/name/schema_fingerprint/objects-as-list-of- + struct/created_at_ns/updated_at_ns/row_count). One row per Parquet + file. `DatasetIDForName` derives a deterministic UUIDv5 from name + (namespace `a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e`); same name on + any box yields the same dataset_id. +- `internal/catalogd/registry.go` — in-memory `map[name]*Manifest` + with the ADR-020 contract: same name+fingerprint reuses + dataset_id, replaces objects, bumps `updated_at`; different + fingerprint → `ErrFingerprintConflict`. Single mutex serializes + Register across persistence to close the check→insert TOCTOU. + Rehydrate runs storaged I/O OUTSIDE the lock, swaps in the new + map under the lock. +- `internal/catalogd/store_client.go` — HTTP client over + storaged's GET/PUT/DELETE/list. `safeKey` URL-escapes path + segments while preserving `/`. Error paths drain body before + close to keep the keep-alive pool healthy. +- `cmd/catalogd/main.go` — POST /catalog/register, GET + /catalog/manifest/{name}, GET /catalog/list. Sentinel-error + detection (`errors.Is(err, ErrEmptyName)`) for 400s. 4 MiB body + cap on register payloads. +- New `[catalogd]` config: `bind` + `storaged_url`. Default + `http://127.0.0.1:3211`. +- `scripts/d3_smoke.sh` — 6 acceptance probes including + rehydrate-across-restart (the load-bearing ADR-020 contract test). 
+- `arrow-go/v18` v18.6.0 + `google/uuid` v1.6.0 added; Go 1.24 → 1.25 + forced by arrow-go's minimum. + +UUIDv5 vs Rust v4: the Go rewrite picks deterministic-from-name. Same +dataset name on any box converges to the same dataset_id; rehydrate +after disk loss can't silently issue new IDs and break downstream +cross-references. Rust's surrogate-UUIDv4 is what the prior system +used; the divergence is intentional and documented at +`internal/catalogd/manifest.go:34`. + +Next: D4 — ingestd CSV → Parquet → catalogd register loop. + +--- + +## D3 — code scrum review (3-lineage parallel pass) + +After D3 shipped, the actual code went through the same 3-lineage +parallel scrum as D1/D2. + +| Pass | Reviewer | Latency | Findings | +|---|---|---|---| +| 1 | `opencode/claude-opus-4-7` | ~30s | 1 BLOCK + 5 WARN + 3 INFO = 9 | +| 2 | `openrouter/moonshotai/kimi-k2-0905` | ~30s | 2 BLOCK + 2 WARN + 1 INFO = 5 | +| 3 | `openrouter/qwen/qwen3-coder` | ~25s | 2 BLOCK + 2 WARN + 2 INFO = 6 | + +Total: 20 distinct findings (some convergent across reviewers). +Kimi route shopping from D2 paid off — `openrouter/moonshotai/kimi-k2-0905` +delivered structured output on first attempt, no max_tokens cap, no +empty-content reasoning trap. 
+ +### Convergent findings (≥2 reviewers — high confidence) + +| # | Severity | Finding | Reviewers | Disposition | +|---|---|---|---|---| +| C1 | BLOCK×3 | `Decode` indexes `listArr.Offsets()[0]/[1]` directly — fragile under array slicing/non-zero offset; panics on malformed Parquet (multi-row reader chunks, single-offset corrupt files) | Opus + Kimi + Qwen | **Fixed** — switched to `listArr.ValueOffsets(0)` (Arrow API that accounts for array offset) + bounds check on `start/end` against `structArr.Len()` | +| C2 | WARN×2 | `Rehydrate` holds the registry mutex across network I/O to storaged — slow storaged blocks all `Register/Get/List`; future re-sync endpoints stall | Opus + Kimi | **Fixed** — list/get/decode happen outside the lock; the completed map is swapped into `r.byKey` under a brief lock | + +### Single-reviewer findings (lineage-specific catches) + +| # | Severity | Finding | Reviewer | Disposition | +|---|---|---|---|---| +| S1 | WARN | Idempotent re-register mutates the in-memory manifest BEFORE persist succeeds — split-brain on storaged failure (in-memory advances, disk holds old, restart silently rolls back) | Opus | **Fixed** — build candidate copy of prior, persist candidate, swap into `r.byKey` only on persist success | +| S2 | WARN | `cmd/catalogd` detects empty-input via `strings.Contains(err.Error(), "empty name")` — brittle, anyone reformatting the registry error silently demotes a 400 to a 500 | Opus | **Fixed** — exported `ErrEmptyName` / `ErrEmptyFingerprint` sentinels, HTTP layer uses `errors.Is` | +| S3 | WARN | `Get/List` `cp := *m` aliases the `Objects` slice + `RowCount` pointer — caller mutation corrupts registry state under the lock-free read | Opus | **Fixed** — `cloneManifest` deep-copies Objects + dereferences RowCount into a fresh `*int64` | +| S4 | WARN | error paths in `store_client` preview body but don't drain before close → HTTP/1.1 keep-alive pool reuse breaks → slow socket leak | Qwen | **Fixed** — `drainAndClose` helper 
reads up to 64 KiB before close on every defer | +| S5 | INFO×3 | name-validation regex / per-call deadline / Snappy compression on manifest writes | Opus | **Deferred** — small, no risk if skipped | +| S6 | WARN | `Rehydrate` aborts on first decode error → partial state | Kimi | **Accepted** — fail-loud > silent-partial is the design; cmd/catalogd `os.Exit(1)`s on rehydrate error so a corrupt manifest is operator-visible at startup | +| S7 | WARN | `store_client.List` ignores pagination — if storaged gains continuation tokens, the client silently drops everything past page 1 | Opus | **Accepted** — storaged caps lists at MaxListResults=10_000 with a `...truncated...` sentinel; no continuation token in the wire format yet (deferred to D7.5+ alongside multi-bucket federation) | +| F1 | BLOCK | `Decode` crashes on empty Parquet (NumRows==0) — `Value(0)` panics | Kimi | **Dismissed** — already handled. `tbl.NumRows() != 1` returns error before any column access; NumRows==0 also fails the subsequent `rr.Next()` check | +| F2 | INFO | `safeKey` double-escapes slashes; should use `url.PathEscape(key)` directly | Kimi | **Dismissed** — `url.PathEscape("a/b")` returns `"a%2Fb"`. Splitting on `/` first is **necessary** to preserve the path separator while escaping segment content. The current code is correct; Kimi's suggested fix would break storaged's chi wildcard match | +| F3 | INFO | `rb.NewRecord()` error unchecked | Qwen | **Dismissed** — false signature. 
`RecordBuilder.NewRecord()` returns `arrow.Record` only; no error to check | + +### Cumulative D3 disposition + +- **3-lineage parallel pass: 20 distinct findings** +- **Fixed: 6** (2 convergent + 4 single-reviewer) +- **Accepted-with-rationale: 5** (3 INFO + 2 WARN with deferred-by-design rationale) +- **Dismissed (false positives): 3** (1 Kimi BLOCK on already-handled empty case, 1 Kimi INFO on safeKey, 1 Qwen INFO on Arrow API signature) +- Build clean, vet clean, all tests pass, smoke 6/6 PASS after every fix round. + +The C1 triple-confirmation (Opus + Kimi + Qwen all flagging the same +list-offset indexing) is the strongest convergence signal across the +three days. All three correctly diagnosed the underlying issue +(direct `Offsets()` indexing is fragile under array slicing); Opus +named the right Arrow API (`ValueOffsets(0)`), Kimi and Qwen named +the bounds-check shape. Fix uses Opus's API + the others' bounds +discipline together. + +S1 (split-brain on persist failure) was the most *consequential* +finding — D3's smoke would have stayed green through every test case +because none of them simulate storaged-down mid-register. Opus alone +caught it; the fix is a 4-line candidate-then-swap pattern but the +class of bug (in-memory advances ahead of disk) is exactly the +distributed-systems hazard ADR-020 was added to prevent at a +different layer. 
diff --git a/go.mod b/go.mod index 6c31364..ace06eb 100644 --- a/go.mod +++ b/go.mod @@ -1,19 +1,25 @@ module git.agentview.dev/profit/golangLAKEHOUSE -go 1.24 +go 1.25.0 require ( + github.com/apache/arrow-go/v18 v18.6.0 + github.com/aws/aws-sdk-go-v2 v1.41.6 + github.com/aws/aws-sdk-go-v2/config v1.32.16 + github.com/aws/aws-sdk-go-v2/credentials v1.19.15 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.16 + github.com/aws/aws-sdk-go-v2/service/s3 v1.100.0 + github.com/aws/smithy-go v1.25.0 github.com/go-chi/chi/v5 v5.2.5 + github.com/google/uuid v1.6.0 github.com/pelletier/go-toml/v2 v2.3.0 ) require ( - github.com/aws/aws-sdk-go-v2 v1.41.6 // indirect + github.com/andybalholm/brotli v1.2.1 // indirect + github.com/apache/thrift v0.22.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.9 // indirect - github.com/aws/aws-sdk-go-v2/config v1.32.16 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.19.15 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.22 // indirect - github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.16 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.22 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.22 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.23 // indirect @@ -21,10 +27,23 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.14 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.22 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.22 // indirect - github.com/aws/aws-sdk-go-v2/service/s3 v1.100.0 // indirect github.com/aws/aws-sdk-go-v2/service/signin v1.0.10 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.30.16 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 // indirect - github.com/aws/smithy-go v1.25.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + 
github.com/goccy/go-json v0.10.6 // indirect + github.com/google/flatbuffers v25.12.19+incompatible // indirect + github.com/klauspost/compress v1.18.5 // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect + github.com/pierrec/lz4/v4 v4.1.26 // indirect + github.com/zeebo/xxh3 v1.1.0 // indirect + golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect + golang.org/x/net v0.52.0 // indirect + golang.org/x/sync v0.20.0 // indirect + golang.org/x/sys v0.43.0 // indirect + golang.org/x/text v0.35.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect + google.golang.org/grpc v1.80.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect ) diff --git a/go.sum b/go.sum index a1904f7..883cf49 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,9 @@ +github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eTWro= +github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= +github.com/apache/arrow-go/v18 v18.6.0 h1:GX/Jyd3R7mCLiECAwY9FWbbaYblie2WXBSz4Sw8fNpM= +github.com/apache/arrow-go/v18 v18.6.0/go.mod h1:gm3MiPpY82fLYK5VKPB3WoJbsiLVDfT7flD5/vHReKw= +github.com/apache/thrift v0.22.0 h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc= +github.com/apache/thrift v0.22.0/go.mod h1:1e7J/O1Ae6ZQMTYdy9xa3w9k+XHWPfRvdPyJeynQ+/g= github.com/aws/aws-sdk-go-v2 v1.41.6 h1:1AX0AthnBQzMx1vbmir3Y4WsnJgiydmnJjiLu+LvXOg= github.com/aws/aws-sdk-go-v2 v1.41.6/go.mod h1:dy0UzBIfwSeot4grGvY1AqFWN5zgziMmWGzysDnHFcQ= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.9 h1:adBsCIIpLbLmYnkQU+nAChU5yhVTvu5PerROm+/Kq2A= @@ -36,7 +42,75 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 h1:ks8KBcZPh3PYISr5dAiXCM5/Thcu github.com/aws/aws-sdk-go-v2/service/sts v1.42.0/go.mod h1:pFw33T0WLvXU3rw1WBkpMlkgIn54eCB5FYLhjDc9Foo= github.com/aws/smithy-go v1.25.0 h1:Sz/XJ64rwuiKtB6j98nDIPyYrV1nVNJ4YU74gttcl5U= github.com/aws/smithy-go v1.25.0/go.mod 
h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/goccy/go-json v0.10.6 h1:p8HrPJzOakx/mn/bQtjgNjdTcN+/S6FcG2CTtQOrHVU= +github.com/goccy/go-json v0.10.6/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/flatbuffers v25.12.19+incompatible h1:haMV2JRRJCe1998HeW/p0X9UaMTK6SDo0ffLn2+DbLs= +github.com/google/flatbuffers v25.12.19+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= +github.com/klauspost/compress v1.18.5/go.mod 
h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/pelletier/go-toml/v2 v2.3.0 h1:k59bC/lIZREW0/iVaQR8nDHxVq8OVlIzYCOJf421CaM= github.com/pelletier/go-toml/v2 v2.3.0/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY= +github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs= +github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= +go.opentelemetry.io/otel v1.39.0/go.mod 
h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= +go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= +go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= +go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= +go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= +go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= +go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= +go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= +go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +golang.org/x/exp v0.0.0-20260112195511-716be5621a96 h1:Z/6YuSHTLOHfNFdb8zVZomZr7cqNgTJvA8+Qz75D8gU= +golang.org/x/exp v0.0.0-20260112195511-716be5621a96/go.mod h1:nzimsREAkjBCIEFtHiYkrJyT+2uy9YZJB7H1k68CXZU= +golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= +golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= +golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= +gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= +gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 h1:sNrWoksmOyF5bvJUcnmbeAmQi8baNhqg5IWaI3llQqU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516/go.mod 
h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= +google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/catalogd/manifest.go b/internal/catalogd/manifest.go new file mode 100644 index 0000000..1f25b2b --- /dev/null +++ b/internal/catalogd/manifest.go @@ -0,0 +1,200 @@ +// Package catalogd is the metadata authority — manifests for every +// registered dataset. A Manifest is the catalog row Rust calls +// `Manifest`: dataset_id (deterministic from name via UUIDv5), +// schema_fingerprint (caller-supplied schema hash), the object keys +// that physically back the dataset in storaged, plus timestamps and +// optional row_count. +// +// G0 stores one Manifest per Parquet file at +// primary://_catalog/manifests/.parquet. One row per file — +// catalog manifests are written rarely and read on startup, so the +// per-file shape favors atomic register over storage density. +package catalogd + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/apache/arrow-go/v18/parquet" + "github.com/apache/arrow-go/v18/parquet/file" + "github.com/apache/arrow-go/v18/parquet/pqarrow" + "github.com/google/uuid" +) + +// catalogNamespace is the v5 UUID namespace for dataset_id derivation. +// Same name → same dataset_id across boxes / cold starts. Don't change +// — every dataset_id ever issued depends on this byte sequence. 
+var catalogNamespace = uuid.MustParse("a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e") + +// Object is one storaged key contributing to a dataset. +type Object struct { + Key string `json:"key"` + Size int64 `json:"size"` +} + +// Manifest is the catalog row for one dataset. +type Manifest struct { + DatasetID string `json:"dataset_id"` + Name string `json:"name"` + SchemaFingerprint string `json:"schema_fingerprint"` + Objects []Object `json:"objects"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + RowCount *int64 `json:"row_count,omitempty"` +} + +// DatasetIDForName returns the deterministic UUIDv5 dataset_id for a +// logical dataset name. Idempotent on the same name across boxes. +func DatasetIDForName(name string) string { + return uuid.NewSHA1(catalogNamespace, []byte(name)).String() +} + +// manifestArrowSchema is the Arrow schema for the on-disk Parquet. +// Field order matters — codec builders rely on it. +var manifestArrowSchema = arrow.NewSchema([]arrow.Field{ + {Name: "dataset_id", Type: arrow.BinaryTypes.String}, + {Name: "name", Type: arrow.BinaryTypes.String}, + {Name: "schema_fingerprint", Type: arrow.BinaryTypes.String}, + {Name: "objects", Type: arrow.ListOf(arrow.StructOf( + arrow.Field{Name: "key", Type: arrow.BinaryTypes.String}, + arrow.Field{Name: "size", Type: arrow.PrimitiveTypes.Int64}, + ))}, + {Name: "created_at_unix_ns", Type: arrow.PrimitiveTypes.Int64}, + {Name: "updated_at_unix_ns", Type: arrow.PrimitiveTypes.Int64}, + {Name: "row_count", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, +}, nil) + +// Encode writes a single Manifest to a Parquet byte slice. Memory +// allocations are bounded — manifests have tens of objects, not +// millions. 
+func Encode(m *Manifest) ([]byte, error) { + mem := memory.NewGoAllocator() + + rb := array.NewRecordBuilder(mem, manifestArrowSchema) + defer rb.Release() + + rb.Field(0).(*array.StringBuilder).Append(m.DatasetID) + rb.Field(1).(*array.StringBuilder).Append(m.Name) + rb.Field(2).(*array.StringBuilder).Append(m.SchemaFingerprint) + + listB := rb.Field(3).(*array.ListBuilder) + listB.Append(true) + structB := listB.ValueBuilder().(*array.StructBuilder) + keyB := structB.FieldBuilder(0).(*array.StringBuilder) + sizeB := structB.FieldBuilder(1).(*array.Int64Builder) + for _, o := range m.Objects { + structB.Append(true) + keyB.Append(o.Key) + sizeB.Append(o.Size) + } + + rb.Field(4).(*array.Int64Builder).Append(m.CreatedAt.UnixNano()) + rb.Field(5).(*array.Int64Builder).Append(m.UpdatedAt.UnixNano()) + if m.RowCount != nil { + rb.Field(6).(*array.Int64Builder).Append(*m.RowCount) + } else { + rb.Field(6).(*array.Int64Builder).AppendNull() + } + + rec := rb.NewRecord() + defer rec.Release() + + var buf bytes.Buffer + props := parquet.NewWriterProperties() + arrowProps := pqarrow.NewArrowWriterProperties() + w, err := pqarrow.NewFileWriter(manifestArrowSchema, &buf, props, arrowProps) + if err != nil { + return nil, fmt.Errorf("pqarrow writer: %w", err) + } + if err := w.Write(rec); err != nil { + return nil, fmt.Errorf("pqarrow write: %w", err) + } + if err := w.Close(); err != nil { + return nil, fmt.Errorf("pqarrow close: %w", err) + } + return buf.Bytes(), nil +} + +// Decode reads a single-row Parquet manifest back into a Manifest. 
+func Decode(b []byte) (*Manifest, error) { + rdr, err := file.NewParquetReader(bytes.NewReader(b)) + if err != nil { + return nil, fmt.Errorf("parquet reader: %w", err) + } + defer rdr.Close() + + pr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.NewGoAllocator()) + if err != nil { + return nil, fmt.Errorf("pqarrow reader: %w", err) + } + tbl, err := pr.ReadTable(context.Background()) + if err != nil { + return nil, fmt.Errorf("read table: %w", err) + } + defer tbl.Release() + + if tbl.NumRows() != 1 { + return nil, fmt.Errorf("manifest parquet: expected 1 row, got %d", tbl.NumRows()) + } + + rr := array.NewTableReader(tbl, 1) + defer rr.Release() + if !rr.Next() { + return nil, errors.New("manifest parquet: no record batch") + } + rec := rr.Record() + + m := &Manifest{ + DatasetID: rec.Column(0).(*array.String).Value(0), + Name: rec.Column(1).(*array.String).Value(0), + SchemaFingerprint: rec.Column(2).(*array.String).Value(0), + } + + // Per scrum C1 (3-way convergent): use ValueOffsets which accounts + // for the array's own offset (non-zero under slicing) and is bounds- + // safe by API contract. Direct Offsets()[0]/[1] indexing is fragile + // under multi-row reads and panics on malformed offset buffers. 
+ listArr := rec.Column(3).(*array.List) + structArr := listArr.ListValues().(*array.Struct) + keyArr := structArr.Field(0).(*array.String) + sizeArr := structArr.Field(1).(*array.Int64) + start, end := listArr.ValueOffsets(0) + if start < 0 || end < start || end > int64(structArr.Len()) { + return nil, fmt.Errorf("manifest: bad list offsets [%d, %d] for struct len %d", + start, end, structArr.Len()) + } + for i := start; i < end; i++ { + m.Objects = append(m.Objects, Object{ + Key: keyArr.Value(int(i)), + Size: sizeArr.Value(int(i)), + }) + } + + m.CreatedAt = time.Unix(0, rec.Column(4).(*array.Int64).Value(0)) + m.UpdatedAt = time.Unix(0, rec.Column(5).(*array.Int64).Value(0)) + rcArr := rec.Column(6).(*array.Int64) + if rcArr.IsValid(0) { + v := rcArr.Value(0) + m.RowCount = &v + } + + return m, nil +} + +// EncodeReader is a small convenience for callers that want an +// io.Reader over the encoded bytes (matches the storaged HTTP PUT +// signature). +func EncodeReader(m *Manifest) (io.Reader, int64, error) { + b, err := Encode(m) + if err != nil { + return nil, 0, err + } + return bytes.NewReader(b), int64(len(b)), nil +} diff --git a/internal/catalogd/manifest_test.go b/internal/catalogd/manifest_test.go new file mode 100644 index 0000000..fa26a7a --- /dev/null +++ b/internal/catalogd/manifest_test.go @@ -0,0 +1,103 @@ +package catalogd + +import ( + "testing" + "time" +) + +func TestEncodeDecode_RoundTrip(t *testing.T) { + rc := int64(500_000) + now := time.Unix(1777435000, 123456789) + want := &Manifest{ + DatasetID: DatasetIDForName("workers_500k"), + Name: "workers_500k", + SchemaFingerprint: "sha256:abcdef", + Objects: []Object{ + {Key: "datasets/workers_500k/part-001.parquet", Size: 75 * 1024 * 1024}, + {Key: "datasets/workers_500k/part-002.parquet", Size: 12 * 1024 * 1024}, + }, + CreatedAt: now, + UpdatedAt: now.Add(time.Minute), + RowCount: &rc, + } + + b, err := Encode(want) + if err != nil { + t.Fatalf("Encode: %v", err) + } + if len(b) == 0 { + 
t.Fatal("Encode returned 0 bytes") + } + + got, err := Decode(b) + if err != nil { + t.Fatalf("Decode: %v", err) + } + + if got.DatasetID != want.DatasetID { + t.Errorf("DatasetID: got %q, want %q", got.DatasetID, want.DatasetID) + } + if got.Name != want.Name { + t.Errorf("Name: got %q, want %q", got.Name, want.Name) + } + if got.SchemaFingerprint != want.SchemaFingerprint { + t.Errorf("SchemaFingerprint: got %q, want %q", got.SchemaFingerprint, want.SchemaFingerprint) + } + if len(got.Objects) != 2 { + t.Fatalf("Objects: got %d, want 2", len(got.Objects)) + } + for i, o := range got.Objects { + if o.Key != want.Objects[i].Key || o.Size != want.Objects[i].Size { + t.Errorf("Objects[%d]: got %+v, want %+v", i, o, want.Objects[i]) + } + } + // Times round-trip via UnixNano so equality is on the nanosecond. + if !got.CreatedAt.Equal(want.CreatedAt) { + t.Errorf("CreatedAt: got %v, want %v", got.CreatedAt, want.CreatedAt) + } + if !got.UpdatedAt.Equal(want.UpdatedAt) { + t.Errorf("UpdatedAt: got %v, want %v", got.UpdatedAt, want.UpdatedAt) + } + if got.RowCount == nil || *got.RowCount != *want.RowCount { + t.Errorf("RowCount: got %v, want %v", got.RowCount, want.RowCount) + } +} + +func TestEncodeDecode_NoRowCount(t *testing.T) { + want := &Manifest{ + DatasetID: DatasetIDForName("unknown_size"), + Name: "unknown_size", + SchemaFingerprint: "sha256:zero", + Objects: []Object{}, + CreatedAt: time.Unix(0, 0), + UpdatedAt: time.Unix(0, 0), + RowCount: nil, + } + + b, err := Encode(want) + if err != nil { + t.Fatal(err) + } + got, err := Decode(b) + if err != nil { + t.Fatal(err) + } + if got.RowCount != nil { + t.Errorf("RowCount: got %v, want nil", got.RowCount) + } + if len(got.Objects) != 0 { + t.Errorf("Objects: got %d, want 0", len(got.Objects)) + } +} + +func TestDatasetIDForName_Deterministic(t *testing.T) { + a := DatasetIDForName("workers_500k") + b := DatasetIDForName("workers_500k") + if a != b { + t.Errorf("DatasetIDForName not deterministic: %q vs %q", a, b) 
+ } + c := DatasetIDForName("different") + if a == c { + t.Errorf("DatasetIDForName collided across distinct names") + } +} diff --git a/internal/catalogd/registry.go b/internal/catalogd/registry.go new file mode 100644 index 0000000..037c0c0 --- /dev/null +++ b/internal/catalogd/registry.go @@ -0,0 +1,219 @@ +// registry.go — the in-memory catalog index plus the ADR-020 +// idempotent register contract. The register path holds a single +// mutex across (lookup → fingerprint check → storage write → +// in-memory update) to close the check→insert TOCTOU window. The +// historical Rust bug (308× duplicate manifests on re-register) is +// the prior art — don't loosen this lock. +package catalogd + +import ( + "context" + "errors" + "fmt" + "sort" + "strings" + "sync" + "time" +) + +// ManifestPrefix is the storaged key prefix that holds catalog +// manifests. Objects under this prefix are NOT user data and never +// surface through ingest/query paths. +const ManifestPrefix = "_catalog/manifests/" + +// ErrFingerprintConflict is returned by Register when a manifest with +// the same name already exists under a different schema fingerprint. +// HTTP layer maps this to 409 Conflict; gRPC will map to FAILED_PRECONDITION. +var ErrFingerprintConflict = errors.New("catalogd: schema fingerprint conflict") + +// ErrManifestNotFound is returned by Get when the requested name has +// no manifest registered. +var ErrManifestNotFound = errors.New("catalogd: manifest not found") + +// ErrEmptyName / ErrEmptyFingerprint are returned by Register on +// missing required inputs. HTTP layer maps both to 400. Per scrum S2 +// (Opus): sentinel errors so the HTTP boundary uses errors.Is rather +// than substring matching err.Error(). +var ( + ErrEmptyName = errors.New("catalogd: empty name") + ErrEmptyFingerprint = errors.New("catalogd: empty schema_fingerprint") +) + +// Store abstracts the storaged HTTP wire so registry can be unit- +// tested with an in-memory fake. 
+type Store interface { + Put(ctx context.Context, key string, body []byte) error + Get(ctx context.Context, key string) ([]byte, error) + List(ctx context.Context, prefix string) ([]string, error) +} + +// Registry is the in-memory authority. Persistence lives in storaged +// at ManifestPrefix; Registry is rehydrated on startup. +type Registry struct { + mu sync.Mutex + byKey map[string]*Manifest // name → manifest + store Store + now func() time.Time // injectable for tests +} + +// NewRegistry builds an empty registry bound to a Store. Call +// Rehydrate after construction to pick up persisted manifests. +func NewRegistry(store Store) *Registry { + return &Registry{ + byKey: make(map[string]*Manifest), + store: store, + now: time.Now, + } +} + +// Rehydrate lists ManifestPrefix in storaged and decodes every entry +// into the in-memory map. Returns the count of manifests recovered. +// On any per-file decode error, returns immediately so a corrupt +// catalog doesn't half-load and silently lose state. +// +// Per scrum C2 (Opus + Kimi convergent): network I/O happens OUTSIDE +// the registry mutex so a slow storaged doesn't block concurrent +// Register/Get/List. The completed map is swapped in under the lock. 
+func (r *Registry) Rehydrate(ctx context.Context) (int, error) { + keys, err := r.store.List(ctx, ManifestPrefix) + if err != nil { + return 0, fmt.Errorf("list manifests: %w", err) + } + loaded := make(map[string]*Manifest) + for _, k := range keys { + if !strings.HasPrefix(k, ManifestPrefix) || !strings.HasSuffix(k, ".parquet") { + continue + } + body, err := r.store.Get(ctx, k) + if err != nil { + return len(loaded), fmt.Errorf("get manifest %s: %w", k, err) + } + m, err := Decode(body) + if err != nil { + return len(loaded), fmt.Errorf("decode manifest %s: %w", k, err) + } + loaded[m.Name] = m + } + r.mu.Lock() + defer r.mu.Unlock() + r.byKey = loaded + return len(loaded), nil +} + +// Register applies the ADR-020 idempotency contract: +// +// - No prior manifest for name → create (returns existing=false) +// - Prior manifest, same fingerprint → update objects + bump +// updated_at, reuse dataset_id (returns existing=true) +// - Prior manifest, different fingerprint → ErrFingerprintConflict +// +// The mutex is held across the storage write so concurrent calls for +// the same name serialize through the persistence layer (low TPS, +// correctness > throughput). +func (r *Registry) Register(ctx context.Context, name, fingerprint string, objects []Object, rowCount *int64) (*Manifest, bool, error) { + if name == "" { + return nil, false, ErrEmptyName + } + if fingerprint == "" { + return nil, false, ErrEmptyFingerprint + } + + r.mu.Lock() + defer r.mu.Unlock() + + now := r.now().UTC() + prior, exists := r.byKey[name] + if exists { + if prior.SchemaFingerprint != fingerprint { + return nil, true, fmt.Errorf("%w: name=%q have=%s got=%s", + ErrFingerprintConflict, name, prior.SchemaFingerprint, fingerprint) + } + // Same fingerprint — reuse dataset_id, replace objects, bump updated_at. + // Per scrum S1 (Opus): build candidate, persist, then swap in. 
Mutating + // `prior` before persist succeeds creates split-brain if storaged is + // down — in-memory advances, disk holds the old state, restart loses + // what callers were told didn't happen. + candidate := *prior + candidate.Objects = objects + candidate.UpdatedAt = now + candidate.RowCount = rowCount + if err := r.persist(ctx, &candidate); err != nil { + return nil, true, err + } + r.byKey[name] = &candidate + return &candidate, true, nil + } + + m := &Manifest{ + DatasetID: DatasetIDForName(name), + Name: name, + SchemaFingerprint: fingerprint, + Objects: objects, + CreatedAt: now, + UpdatedAt: now, + RowCount: rowCount, + } + if err := r.persist(ctx, m); err != nil { + return nil, false, err + } + r.byKey[name] = m + return m, false, nil +} + +// Get returns a deep copy of the manifest for name, or ErrManifestNotFound. +func (r *Registry) Get(name string) (*Manifest, error) { + r.mu.Lock() + defer r.mu.Unlock() + m, ok := r.byKey[name] + if !ok { + return nil, ErrManifestNotFound + } + return cloneManifest(m), nil +} + +// List returns deep copies of every manifest, sorted by name. +// Callers may mutate the returned slice and the underlying Manifest +// values without affecting registry state. +func (r *Registry) List() []*Manifest { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]*Manifest, 0, len(r.byKey)) + for _, m := range r.byKey { + out = append(out, cloneManifest(m)) + } + sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name }) + return out +} + +// cloneManifest deep-copies the Objects slice and dereferences +// RowCount into a fresh pointer so a returned manifest cannot alias +// registry state. Per scrum S3 (Opus): the prior `cp := *m` shape +// shared the Objects backing array — caller-side index writes +// corrupted registry state without holding the mutex. 
+func cloneManifest(m *Manifest) *Manifest { + cp := *m + if m.Objects != nil { + cp.Objects = make([]Object, len(m.Objects)) + copy(cp.Objects, m.Objects) + } + if m.RowCount != nil { + v := *m.RowCount + cp.RowCount = &v + } + return &cp +} + +// persist encodes the manifest and writes it to storaged at the +// canonical path. Caller MUST hold r.mu — this function does not +// take the lock itself. +func (r *Registry) persist(ctx context.Context, m *Manifest) error { + body, err := Encode(m) + if err != nil { + return fmt.Errorf("encode manifest %s: %w", m.Name, err) + } + key := ManifestPrefix + m.Name + ".parquet" + if err := r.store.Put(ctx, key, body); err != nil { + return fmt.Errorf("persist manifest %s: %w", m.Name, err) + } + return nil +} diff --git a/internal/catalogd/registry_test.go b/internal/catalogd/registry_test.go new file mode 100644 index 0000000..6f2534a --- /dev/null +++ b/internal/catalogd/registry_test.go @@ -0,0 +1,160 @@ +package catalogd + +import ( + "context" + "errors" + "sync" + "testing" + "time" +) + +// memStore is an in-memory Store fake for unit tests. 
+type memStore struct { + mu sync.Mutex + data map[string][]byte +} + +func newMemStore() *memStore { return &memStore{data: map[string][]byte{}} } + +func (m *memStore) Put(_ context.Context, key string, body []byte) error { + m.mu.Lock() + defer m.mu.Unlock() + cp := make([]byte, len(body)) + copy(cp, body) + m.data[key] = cp + return nil +} + +func (m *memStore) Get(_ context.Context, key string) ([]byte, error) { + m.mu.Lock() + defer m.mu.Unlock() + b, ok := m.data[key] + if !ok { + return nil, ErrKeyNotFound + } + return b, nil +} + +func (m *memStore) List(_ context.Context, prefix string) ([]string, error) { + m.mu.Lock() + defer m.mu.Unlock() + out := []string{} + for k := range m.data { + if len(k) >= len(prefix) && k[:len(prefix)] == prefix { + out = append(out, k) + } + } + return out, nil +} + +func mkRegistry(t *testing.T) (*Registry, *memStore) { + t.Helper() + s := newMemStore() + r := NewRegistry(s) + r.now = func() time.Time { return time.Unix(1777435000, 0).UTC() } + return r, s +} + +func TestRegister_NewManifest(t *testing.T) { + r, _ := mkRegistry(t) + rc := int64(100) + m, existing, err := r.Register(context.Background(), "workers", "sha256:abc", + []Object{{Key: "datasets/workers/p1.parquet", Size: 1024}}, &rc) + if err != nil { + t.Fatalf("Register: %v", err) + } + if existing { + t.Error("expected existing=false for new manifest") + } + if m.DatasetID != DatasetIDForName("workers") { + t.Errorf("DatasetID: got %q, want UUIDv5(workers)", m.DatasetID) + } +} + +func TestRegister_SameFingerprint_Idempotent(t *testing.T) { + r, _ := mkRegistry(t) + rc := int64(100) + first, _, _ := r.Register(context.Background(), "workers", "sha256:abc", + []Object{{Key: "p1.parquet", Size: 1024}}, &rc) + + // Re-register same name + fingerprint with new objects. 
+ rc2 := int64(200) + second, existing, err := r.Register(context.Background(), "workers", "sha256:abc", + []Object{{Key: "p1.parquet", Size: 1024}, {Key: "p2.parquet", Size: 2048}}, &rc2) + if err != nil { + t.Fatalf("Register (idempotent): %v", err) + } + if !existing { + t.Error("expected existing=true on idempotent re-register") + } + if second.DatasetID != first.DatasetID { + t.Errorf("DatasetID changed: %q → %q", first.DatasetID, second.DatasetID) + } + if len(second.Objects) != 2 { + t.Errorf("Objects not replaced: got %d, want 2", len(second.Objects)) + } + if second.RowCount == nil || *second.RowCount != 200 { + t.Errorf("RowCount not bumped: got %v, want 200", second.RowCount) + } +} + +func TestRegister_DifferentFingerprint_Conflict(t *testing.T) { + r, _ := mkRegistry(t) + _, _, _ = r.Register(context.Background(), "workers", "sha256:abc", + []Object{{Key: "p1.parquet", Size: 1024}}, nil) + + _, _, err := r.Register(context.Background(), "workers", "sha256:DIFFERENT", + []Object{{Key: "p1.parquet", Size: 1024}}, nil) + if !errors.Is(err, ErrFingerprintConflict) { + t.Fatalf("expected ErrFingerprintConflict, got %v", err) + } +} + +func TestRehydrate_RecoversManifests(t *testing.T) { + // Build first registry, register 2 manifests. + r1, store := mkRegistry(t) + _, _, _ = r1.Register(context.Background(), "workers", "sha256:a", nil, nil) + _, _, _ = r1.Register(context.Background(), "candidates", "sha256:b", nil, nil) + + // Build a second registry against the same store + rehydrate. 
+ r2 := NewRegistry(store) + n, err := r2.Rehydrate(context.Background()) + if err != nil { + t.Fatalf("Rehydrate: %v", err) + } + if n != 2 { + t.Errorf("recovered %d, want 2", n) + } + if _, err := r2.Get("workers"); err != nil { + t.Errorf("Get(workers): %v", err) + } + if _, err := r2.Get("candidates"); err != nil { + t.Errorf("Get(candidates): %v", err) + } +} + +func TestList_Sorted(t *testing.T) { + r, _ := mkRegistry(t) + _, _, _ = r.Register(context.Background(), "zoo", "fp", nil, nil) + _, _, _ = r.Register(context.Background(), "alpha", "fp", nil, nil) + _, _, _ = r.Register(context.Background(), "midway", "fp", nil, nil) + got := r.List() + want := []string{"alpha", "midway", "zoo"} + for i, m := range got { + if m.Name != want[i] { + t.Errorf("List[%d]: got %q, want %q", i, m.Name, want[i]) + } + } +} + +func TestRegister_RejectsEmptyInputs(t *testing.T) { + r, _ := mkRegistry(t) + _, _, err := r.Register(context.Background(), "", "fp", nil, nil) + if err == nil { + t.Error("expected error on empty name") + } + _, _, err = r.Register(context.Background(), "x", "", nil, nil) + if err == nil { + t.Error("expected error on empty fingerprint") + } +} diff --git a/internal/catalogd/store_client.go b/internal/catalogd/store_client.go new file mode 100644 index 0000000..8d70a97 --- /dev/null +++ b/internal/catalogd/store_client.go @@ -0,0 +1,140 @@ +// store_client.go — thin HTTP client to storaged. catalogd needs to +// PUT manifest Parquets, GET them on startup, and LIST the manifests +// directory. Staying inside an HTTP boundary (rather than reaching +// into storaged's package directly) preserves the service-boundary +// shape that gRPC will eventually formalize at G1+. +package catalogd + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" +) + +// StoreClient talks HTTP to the storaged service. 
+type StoreClient struct { + baseURL string + hc *http.Client +} + +// listResponse mirrors storaged's GET /storage/list shape: +// +// {"prefix":"_catalog/manifests/","objects":[{Key,Size,ETag,LastModified}, ...]} +type listResponse struct { + Prefix string `json:"prefix"` + Objects []struct { + Key string `json:"Key"` + Size int64 `json:"Size"` + } `json:"objects"` +} + +// NewStoreClient builds a client against the given storaged base URL +// (e.g. "http://127.0.0.1:3211"). Timeout covers manifest read-write +// only; rehydration of many manifests at startup runs sequentially +// and each call gets its own timeout window. +func NewStoreClient(baseURL string) *StoreClient { + return &StoreClient{ + baseURL: strings.TrimRight(baseURL, "/"), + hc: &http.Client{Timeout: 30 * time.Second}, + } +} + +// Put writes raw bytes at key. body is the encoded Parquet manifest. +func (c *StoreClient) Put(ctx context.Context, key string, body []byte) error { + u := c.baseURL + "/storage/put/" + safeKey(key) + req, err := http.NewRequestWithContext(ctx, http.MethodPut, u, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("put req: %w", err) + } + req.ContentLength = int64(len(body)) + resp, err := c.hc.Do(req) + if err != nil { + return fmt.Errorf("put do: %w", err) + } + defer drainAndClose(resp.Body) + if resp.StatusCode != http.StatusOK { + preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) + return fmt.Errorf("put %s: status %d: %s", key, resp.StatusCode, string(preview)) + } + return nil +} + +// Get reads the bytes at key. ErrKeyNotFound on 404. 
+func (c *StoreClient) Get(ctx context.Context, key string) ([]byte, error) { + u := c.baseURL + "/storage/get/" + safeKey(key) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) + if err != nil { + return nil, fmt.Errorf("get req: %w", err) + } + resp, err := c.hc.Do(req) + if err != nil { + return nil, fmt.Errorf("get do: %w", err) + } + defer drainAndClose(resp.Body) + if resp.StatusCode == http.StatusNotFound { + return nil, ErrKeyNotFound + } + if resp.StatusCode != http.StatusOK { + preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) + return nil, fmt.Errorf("get %s: status %d: %s", key, resp.StatusCode, string(preview)) + } + return io.ReadAll(resp.Body) +} + +// List returns the keys under prefix. Object metadata beyond Key is +// ignored — catalogd only needs the keys to drive rehydration. +func (c *StoreClient) List(ctx context.Context, prefix string) ([]string, error) { + u := c.baseURL + "/storage/list?prefix=" + url.QueryEscape(prefix) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) + if err != nil { + return nil, fmt.Errorf("list req: %w", err) + } + resp, err := c.hc.Do(req) + if err != nil { + return nil, fmt.Errorf("list do: %w", err) + } + defer drainAndClose(resp.Body) + if resp.StatusCode != http.StatusOK { + preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) + return nil, fmt.Errorf("list %s: status %d: %s", prefix, resp.StatusCode, string(preview)) + } + var lr listResponse + if err := json.NewDecoder(resp.Body).Decode(&lr); err != nil { + return nil, fmt.Errorf("list decode: %w", err) + } + out := make([]string, 0, len(lr.Objects)) + for _, o := range lr.Objects { + out = append(out, o.Key) + } + return out, nil +} + +// ErrKeyNotFound mirrors storaged's not-found semantics on the catalogd +// side without exposing storaged's package types. +var ErrKeyNotFound = fmt.Errorf("catalogd store: key not found") + +// safeKey URL-escapes path segments while preserving "/". 
storaged's +// chi `/storage/put/*` and `/storage/get/*` routes accept literal slashes in the +// wildcard match, so we only escape the segments, not the separators. +func safeKey(key string) string { + parts := strings.Split(key, "/") + for i, p := range parts { + parts[i] = url.PathEscape(p) + } + return strings.Join(parts, "/") +} + +// drainAndClose reads any remaining body bytes (capped at 64 KiB) and +// closes the body. Per scrum S6 (Qwen): preview-then-close on error +// paths leaves bytes in the kernel buffer, breaking HTTP/1.1 keep- +// alive reuse and slowly leaking sockets. +func drainAndClose(body io.ReadCloser) { + _, _ = io.Copy(io.Discard, io.LimitReader(body, 64<<10)) + _ = body.Close() +} diff --git a/internal/shared/config.go b/internal/shared/config.go index a91f203..0519f84 100644 --- a/internal/shared/config.go +++ b/internal/shared/config.go @@ -21,13 +21,21 @@ import ( type Config struct { Gateway ServiceConfig `toml:"gateway"` Storaged ServiceConfig `toml:"storaged"` - Catalogd ServiceConfig `toml:"catalogd"` + Catalogd CatalogConfig `toml:"catalogd"` Ingestd ServiceConfig `toml:"ingestd"` Queryd ServiceConfig `toml:"queryd"` S3 S3Config `toml:"s3"` Log LogConfig `toml:"log"` } + +// CatalogConfig adds catalogd-specific knobs on top of the standard +// bind. StoragedURL points at the storaged service for manifest +// persistence; G0 defaults to the localhost bind. +type CatalogConfig struct { + Bind string `toml:"bind"` + StoragedURL string `toml:"storaged_url"` +} + // ServiceConfig is the per-binary bind config. Default Bind "" // means "use the service's hardcoded G0 default" — see DefaultConfig. 
type ServiceConfig struct { @@ -57,7 +65,7 @@ func DefaultConfig() Config { return Config{ Gateway: ServiceConfig{Bind: "127.0.0.1:3110"}, Storaged: ServiceConfig{Bind: "127.0.0.1:3211"}, - Catalogd: ServiceConfig{Bind: "127.0.0.1:3212"}, + Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"}, Ingestd: ServiceConfig{Bind: "127.0.0.1:3213"}, Queryd: ServiceConfig{Bind: "127.0.0.1:3214"}, S3: S3Config{ diff --git a/lakehouse.toml b/lakehouse.toml index 9f8d51d..fd7eaba 100644 --- a/lakehouse.toml +++ b/lakehouse.toml @@ -12,6 +12,7 @@ bind = "127.0.0.1:3211" [catalogd] bind = "127.0.0.1:3212" +storaged_url = "http://127.0.0.1:3211" [ingestd] bind = "127.0.0.1:3213" diff --git a/scripts/d3_smoke.sh b/scripts/d3_smoke.sh new file mode 100755 index 0000000..9ae727a --- /dev/null +++ b/scripts/d3_smoke.sh @@ -0,0 +1,156 @@ +#!/usr/bin/env bash +# D3 smoke — proves the Day 3 acceptance gate end-to-end. +# +# Validates: +# - Register a fresh dataset → 200 with existing=false, dataset_id from UUIDv5(name) +# - GET /catalog/manifest/ → manifest matches what we registered +# - GET /catalog/list → manifest listed +# - Restart catalogd → /catalog/list still shows it (Parquet-backed rehydrate) +# - Re-register same name + same fingerprint → 200, existing=true, same dataset_id +# - Re-register same name + different fingerprint → 409 Conflict +# +# Requires storaged (D2) running on :3211 and reachable. +# +# Usage: ./scripts/d3_smoke.sh + +set -euo pipefail +cd "$(dirname "$0")/.." + +export PATH="$PATH:/usr/local/go/bin" + +echo "[d3-smoke] building storaged + catalogd..." +go build -o bin/ ./cmd/storaged ./cmd/catalogd + +# Cleanup any prior processes on D3 ports. 
+pkill -f "bin/storaged" 2>/dev/null || true +pkill -f "bin/catalogd" 2>/dev/null || true +sleep 0.2 + +STORAGED_PID="" +CATALOGD_PID="" +TMP="$(mktemp -d)" +cleanup() { + echo "[d3-smoke] cleanup" + if [ -n "$CATALOGD_PID" ]; then kill "$CATALOGD_PID" 2>/dev/null || true; fi + if [ -n "$STORAGED_PID" ]; then kill "$STORAGED_PID" 2>/dev/null || true; fi + rm -rf "$TMP" +} +trap cleanup EXIT INT TERM + +# --- launch storaged --- +echo "[d3-smoke] launching storaged..." +./bin/storaged > /tmp/storaged.log 2>&1 & +STORAGED_PID=$! +deadline=$(($(date +%s) + 5)) +while [ "$(date +%s)" -lt "$deadline" ]; do + if curl -sS --max-time 1 http://127.0.0.1:3211/health >/dev/null 2>&1; then break; fi + sleep 0.05 +done +if ! curl -sS --max-time 1 http://127.0.0.1:3211/health >/dev/null 2>&1; then + echo " [d3-smoke] storaged failed to bind"; tail -10 /tmp/storaged.log; exit 1 +fi + +# --- clean any prior catalog manifests for a fresh smoke --- +for k in $(curl -sS "http://127.0.0.1:3211/storage/list?prefix=_catalog/manifests/" | jq -r '.objects[]?.Key // empty' 2>/dev/null); do + curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/$k" || true +done + +# --- launch catalogd (round 1) --- +launch_catalogd() { + ./bin/catalogd > /tmp/catalogd.log 2>&1 & + CATALOGD_PID=$! + local deadline=$(($(date +%s) + 5)) + while [ "$(date +%s)" -lt "$deadline" ]; do + if curl -sS --max-time 1 http://127.0.0.1:3212/health >/dev/null 2>&1; then return 0; fi + sleep 0.05 + done + echo " [d3-smoke] catalogd failed to bind"; tail -10 /tmp/catalogd.log; return 1 +} + +echo "[d3-smoke] launching catalogd (first start, empty catalog)..." 
+launch_catalogd + +FAILED=0 +NAME="d3_smoke_dataset" +FP1="sha256:fingerprint-A" +FP2="sha256:fingerprint-B" + +echo "[d3-smoke] POST /catalog/register (fresh):" +RESP="$(curl -sS -X POST http://127.0.0.1:3212/catalog/register \ + -H 'Content-Type: application/json' \ + -d "{\"name\":\"$NAME\",\"schema_fingerprint\":\"$FP1\",\"objects\":[{\"key\":\"datasets/$NAME/p1.parquet\",\"size\":1024}],\"row_count\":42}")" +EXISTING="$(echo "$RESP" | jq -r '.existing')" +DATASET_ID="$(echo "$RESP" | jq -r '.manifest.dataset_id')" +if [ "$EXISTING" = "false" ] && [ -n "$DATASET_ID" ] && [ "$DATASET_ID" != "null" ]; then + echo " ✓ fresh register → existing=false, dataset_id=$DATASET_ID" +else + echo " ✗ fresh register → $RESP" + FAILED=1 +fi + +echo "[d3-smoke] GET /catalog/manifest/$NAME:" +GOT="$(curl -sS "http://127.0.0.1:3212/catalog/manifest/$NAME" | jq -r '.dataset_id')" +if [ "$GOT" = "$DATASET_ID" ]; then + echo " ✓ manifest dataset_id matches" +else + echo " ✗ manifest dataset_id: got $GOT, want $DATASET_ID" + FAILED=1 +fi + +echo "[d3-smoke] GET /catalog/list (1 entry):" +COUNT="$(curl -sS http://127.0.0.1:3212/catalog/list | jq -r '.count')" +if [ "$COUNT" = "1" ]; then + echo " ✓ list count=1" +else + echo " ✗ list count=$COUNT (want 1)" + FAILED=1 +fi + +echo "[d3-smoke] restart catalogd → rehydrate from Parquet:" +kill "$CATALOGD_PID" 2>/dev/null || true; wait "$CATALOGD_PID" 2>/dev/null || true +launch_catalogd + +REHYDRATED_ID="$(curl -sS "http://127.0.0.1:3212/catalog/manifest/$NAME" | jq -r '.dataset_id')" +if [ "$REHYDRATED_ID" = "$DATASET_ID" ]; then + echo " ✓ rehydrated dataset_id matches across restart" +else + echo " ✗ rehydrated dataset_id: got $REHYDRATED_ID, want $DATASET_ID" + FAILED=1 +fi + +echo "[d3-smoke] re-register (same name + same fingerprint) → existing=true:" +RESP2="$(curl -sS -X POST http://127.0.0.1:3212/catalog/register \ + -H 'Content-Type: application/json' \ + -d 
"{\"name\":\"$NAME\",\"schema_fingerprint\":\"$FP1\",\"objects\":[{\"key\":\"datasets/$NAME/p1.parquet\",\"size\":1024},{\"key\":\"datasets/$NAME/p2.parquet\",\"size\":2048}],\"row_count\":84}")" +EXISTING2="$(echo "$RESP2" | jq -r '.existing')" +DATASET_ID2="$(echo "$RESP2" | jq -r '.manifest.dataset_id')" +OBJ_COUNT="$(echo "$RESP2" | jq -r '.manifest.objects | length')" +if [ "$EXISTING2" = "true" ] && [ "$DATASET_ID2" = "$DATASET_ID" ] && [ "$OBJ_COUNT" = "2" ]; then + echo " ✓ existing=true, same dataset_id, objects replaced (count=2)" +else + echo " ✗ idempotent re-register: existing=$EXISTING2 id=$DATASET_ID2 objs=$OBJ_COUNT → $RESP2" + FAILED=1 +fi + +echo "[d3-smoke] re-register (different fingerprint) → 409:" +HTTP="$(curl -sS -o "$TMP/conflict.out" -w '%{http_code}' -X POST http://127.0.0.1:3212/catalog/register \ + -H 'Content-Type: application/json' \ + -d "{\"name\":\"$NAME\",\"schema_fingerprint\":\"$FP2\",\"objects\":[]}")" +if [ "$HTTP" = "409" ]; then + echo " ✓ different fingerprint → 409 Conflict" +else + echo " ✗ different fingerprint → $HTTP (want 409)" + cat "$TMP/conflict.out" + FAILED=1 +fi + +# Cleanup smoke manifests. +curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/_catalog/manifests/$NAME.parquet" || true + +if [ "$FAILED" -eq 0 ]; then + echo "[d3-smoke] D3 acceptance gate: PASSED" + exit 0 +else + echo "[d3-smoke] D3 acceptance gate: FAILED" + exit 1 +fi