G0 D3: catalogd Parquet manifests + ADR-020 idempotent register · 6 scrum fixes

Phase G0 Day 3 ships catalogd: Arrow Parquet manifest codec, in-memory
registry with the ADR-020 idempotency contract (same name+fingerprint
reuses dataset_id; different fingerprint → 409 Conflict), HTTP client
to storaged for persistence, and rehydration on startup. Acceptance
smoke 6/6 PASSES end-to-end including rehydrate-across-restart — the
load-bearing test that the catalog/storaged service split actually
preserves state.

dataset_id derivation diverges from Rust: UUIDv5(namespace, name)
instead of v4 surrogate. Same name on any box generates the same
dataset_id; rehydrate after disk loss converges to the same identity
rather than silently re-issuing. Namespace pinned at
a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e — every dataset_id ever issued
depends on these bytes.

Cross-lineage scrum on shipped code:
  - Opus 4.7 (opencode):                       1 BLOCK + 5 WARN + 3 INFO
  - Kimi K2-0905 (openrouter, validated D2):   2 BLOCK + 2 WARN + 1 INFO
  - Qwen3-coder (openrouter):                  2 BLOCK + 2 WARN + 2 INFO

Fixed:
  C1 list-offsets BLOCK (3-way convergent) → ValueOffsets(0) + bounds
  C2 Rehydrate mutex held across I/O → swap-under-brief-lock pattern
  S1 split-brain on persist failure → candidate-then-swap
  S2 brittle string-match for 400 vs 500 → ErrEmptyName/ErrEmptyFingerprint sentinels
  S3 Get/List shallow-copy aliasing → cloneManifest deep copy
  S4 keep-alive socket leak on error paths → drainAndClose helper

Dismissed (false positives, all single-reviewer):
  Kimi BLOCK "Decode crashes on empty Parquet" — already handled
  Kimi INFO "safeKey double-escapes" — wrong, splitting before escape is required
  Qwen INFO "rb.NewRecord() error unchecked" — API returns no error

Deferred to G1+: name validation regex, per-call deadlines, Snappy
compression, list pagination continuation tokens (storaged caps at
10k with sentinel for now).

Build clean, vet clean, all tests pass, smoke 6/6 PASS after every
fix round. arrow-go/v18 + google/uuid added; Go 1.24 → 1.25 forced
by arrow-go's minimum.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-28 23:36:57 -05:00
parent 8cfcdb8e5f
commit 66a704ca3e
12 changed files with 1320 additions and 20 deletions

View File

@ -1,16 +1,23 @@
// catalogd is the metadata control plane — dataset registry, // catalogd is the metadata authority — registers Parquet datasets,
// Parquet manifest persistence, idempotent registration with // persists manifests in storaged, rehydrates them on startup, and
// schema-fingerprint gate (per Rust ADR-020). D3 wires the // answers GET/list queries. ADR-020 idempotency contract enforced
// register/list/manifest routes; D1 just stands up the binary. // by internal/catalogd/registry.go.
package main package main
import ( import (
"context"
"encoding/json"
"errors"
"flag" "flag"
"log/slog" "log/slog"
"net/http"
"os" "os"
"time"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
) )
func main() { func main() {
@ -22,14 +29,103 @@ func main() {
slog.Error("config", "err", err) slog.Error("config", "err", err)
os.Exit(1) os.Exit(1)
} }
if cfg.Catalogd.StoragedURL == "" {
slog.Error("config", "err", "catalogd.storaged_url is required")
os.Exit(1)
}
if err := shared.Run("catalogd", cfg.Catalogd.Bind, func(_ chi.Router) { store := catalogd.NewStoreClient(cfg.Catalogd.StoragedURL)
// D3 wires: registry := catalogd.NewRegistry(store)
// POST /catalog/register (idempotent by name + fingerprint, 409 on drift)
// GET /catalog/manifest/{name} rehydrateCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
// GET /catalog/list n, err := registry.Rehydrate(rehydrateCtx)
}); err != nil { cancel()
if err != nil {
slog.Error("rehydrate", "err", err)
os.Exit(1)
}
slog.Info("rehydrated", "manifests", n)
h := newHandlers(registry)
if err := shared.Run("catalogd", cfg.Catalogd.Bind, h.register); err != nil {
slog.Error("server", "err", err) slog.Error("server", "err", err)
os.Exit(1) os.Exit(1)
} }
} }
// handlers bundles the catalog HTTP endpoints over one shared registry.
type handlers struct {
	reg *catalogd.Registry
}

// newHandlers wraps a registry in the HTTP handler set.
func newHandlers(r *catalogd.Registry) *handlers { return &handlers{reg: r} }

// register mounts the catalog routes on the service router. The
// manifest route uses a chi wildcard (`*`) rather than `{name}` so
// dataset names containing '/' resolve as a single path remainder.
func (h *handlers) register(r chi.Router) {
	r.Post("/catalog/register", h.handleRegister)
	r.Get("/catalog/manifest/*", h.handleGetManifest)
	r.Get("/catalog/list", h.handleList)
}
// registerRequest mirrors POST body shape.
type registerRequest struct {
	Name              string            `json:"name"`              // logical dataset name (registry key)
	SchemaFingerprint string            `json:"schema_fingerprint"` // caller-supplied schema hash (ADR-020 idempotency gate)
	Objects           []catalogd.Object `json:"objects"`            // storaged keys backing the dataset
	RowCount          *int64            `json:"row_count,omitempty"` // optional; nil means unknown
}

// registerResponse adds the existing flag so callers can distinguish
// fresh registration from idempotent re-register.
type registerResponse struct {
	Manifest *catalogd.Manifest `json:"manifest"`
	Existing bool               `json:"existing"`
}
// handleRegister decodes a register request and applies the ADR-020
// idempotency contract via the registry. Status mapping:
//
//	409 — same name, different schema fingerprint (ErrFingerprintConflict)
//	400 — empty name/fingerprint (sentinel-based per scrum S2) or malformed JSON
//	413 — request body exceeds the 4 MiB cap
//	500 — any other registry/persistence failure (details logged, not leaked)
func (h *handlers) handleRegister(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close()
	r.Body = http.MaxBytesReader(w, r.Body, 4<<20) // 4 MiB cap on register payloads

	var req registerRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		// Distinguish an oversized payload (413) from malformed JSON
		// (400) — MaxBytesReader surfaces a *http.MaxBytesError.
		var mbe *http.MaxBytesError
		if errors.As(err, &mbe) {
			http.Error(w, "request body exceeds 4 MiB limit", http.StatusRequestEntityTooLarge)
			return
		}
		http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest)
		return
	}
	m, existing, err := h.reg.Register(r.Context(), req.Name, req.SchemaFingerprint, req.Objects, req.RowCount)
	if errors.Is(err, catalogd.ErrFingerprintConflict) {
		http.Error(w, err.Error(), http.StatusConflict)
		return
	}
	if errors.Is(err, catalogd.ErrEmptyName) || errors.Is(err, catalogd.ErrEmptyFingerprint) {
		// Per scrum S2 (Opus): sentinel-based detection, not substring match.
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	if err != nil {
		slog.Error("register", "name", req.Name, "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	// Encode error ignored deliberately: headers are already written,
	// nothing actionable remains.
	_ = json.NewEncoder(w).Encode(registerResponse{Manifest: m, Existing: existing})
}
// handleGetManifest serves GET /catalog/manifest/* — the wildcard
// remainder is the dataset name. 404 for unknown names, 500 (logged)
// for anything else, otherwise the manifest as JSON.
func (h *handlers) handleGetManifest(w http.ResponseWriter, r *http.Request) {
	name := chi.URLParam(r, "*")
	m, err := h.reg.Get(name)
	switch {
	case errors.Is(err, catalogd.ErrManifestNotFound):
		http.Error(w, "not found", http.StatusNotFound)
	case err != nil:
		slog.Error("get manifest", "name", name, "err", err)
		http.Error(w, "internal", http.StatusInternalServerError)
	default:
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(m)
	}
}
// handleList serves GET /catalog/list: every registered manifest plus
// a count, as one JSON object.
func (h *handlers) handleList(w http.ResponseWriter, _ *http.Request) {
	manifests := h.reg.List()
	payload := map[string]any{"manifests": manifests, "count": len(manifests)}
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(payload)
}

View File

@ -557,3 +557,127 @@ its D1 contribution where it caught two real BLOCKs that Opus missed.
Lineage rotation is real; on a given PR, one lineage may be the only Lineage rotation is real; on a given PR, one lineage may be the only
one finding bugs and another may be confidently wrong. The convergence one finding bugs and another may be confidently wrong. The convergence
filter (≥2 reviewers) is the right gate. filter (≥2 reviewers) is the right gate.
---
## D3 — actual run results (2026-04-28 evening)
Phase G0 Day 3 executed end-to-end. Output of `scripts/d3_smoke.sh`:
```
[d3-smoke] POST /catalog/register (fresh):
✓ fresh register → existing=false, dataset_id=200a05a8-...
[d3-smoke] GET /catalog/manifest/<name>:
✓ manifest dataset_id matches
[d3-smoke] GET /catalog/list (1 entry):
✓ list count=1
[d3-smoke] restart catalogd → rehydrate from Parquet:
✓ rehydrated dataset_id matches across restart
[d3-smoke] re-register (same name + same fingerprint) → existing=true:
✓ existing=true, same dataset_id, objects replaced (count=2)
[d3-smoke] re-register (different fingerprint) → 409:
✓ different fingerprint → 409 Conflict
[d3-smoke] D3 acceptance gate: PASSED
```
What landed:
- `internal/catalogd/manifest.go` — Arrow Parquet codec for the
Manifest type (dataset_id/name/schema_fingerprint/objects-as-list-of-
struct/created_at_ns/updated_at_ns/row_count). One row per Parquet
file. `DatasetIDForName` derives a deterministic UUIDv5 from name
(namespace `a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e`); same name on
any box yields the same dataset_id.
- `internal/catalogd/registry.go` — in-memory `map[name]*Manifest`
with the ADR-020 contract: same name+fingerprint reuses
dataset_id, replaces objects, bumps `updated_at`; different
fingerprint → `ErrFingerprintConflict`. Single mutex serializes
Register across persistence to close the check→insert TOCTOU.
Rehydrate runs storaged I/O OUTSIDE the lock, swaps in the new
map under the lock.
- `internal/catalogd/store_client.go` — HTTP client over
storaged's GET/PUT/DELETE/list. `safeKey` URL-escapes path
segments while preserving `/`. Error paths drain body before
close to keep the keep-alive pool healthy.
- `cmd/catalogd/main.go` — POST /catalog/register, GET
/catalog/manifest/{name}, GET /catalog/list. Sentinel-error
detection (`errors.Is(err, ErrEmptyName)`) for 400s. 4 MiB body
cap on register payloads.
- New `[catalogd]` config: `bind` + `storaged_url`. Default
`http://127.0.0.1:3211`.
- `scripts/d3_smoke.sh` — 6 acceptance probes including
rehydrate-across-restart (the load-bearing ADR-020 contract test).
- `arrow-go/v18` v18.6.0 + `google/uuid` v1.6.0 added; Go 1.24 → 1.25
forced by arrow-go's minimum.
UUIDv5 vs Rust v4: the Go rewrite picks deterministic-from-name. Same
dataset name on any box converges to the same dataset_id; rehydrate
after disk loss can't silently issue new IDs and break downstream
cross-references. Rust's surrogate-UUIDv4 is what the prior system
used; the divergence is intentional and documented at
`internal/catalogd/manifest.go:34`.
Next: D4 — ingestd CSV → Parquet → catalogd register loop.
---
## D3 — code scrum review (3-lineage parallel pass)
After D3 shipped, the actual code went through the same 3-lineage
parallel scrum as D1/D2.
| Pass | Reviewer | Latency | Findings |
|---|---|---|---|
| 1 | `opencode/claude-opus-4-7` | ~30s | 1 BLOCK + 5 WARN + 3 INFO = 9 |
| 2 | `openrouter/moonshotai/kimi-k2-0905` | ~30s | 2 BLOCK + 2 WARN + 1 INFO = 5 |
| 3 | `openrouter/qwen/qwen3-coder` | ~25s | 2 BLOCK + 2 WARN + 2 INFO = 6 |
Total: 20 distinct findings (some convergent across reviewers).
Kimi route shopping from D2 paid off — `openrouter/moonshotai/kimi-k2-0905`
delivered structured output on first attempt, no max_tokens cap, no
empty-content reasoning trap.
### Convergent findings (≥2 reviewers — high confidence)
| # | Severity | Finding | Reviewers | Disposition |
|---|---|---|---|---|
| C1 | BLOCK×3 | `Decode` indexes `listArr.Offsets()[0]/[1]` directly — fragile under array slicing/non-zero offset; panics on malformed Parquet (multi-row reader chunks, single-offset corrupt files) | Opus + Kimi + Qwen | **Fixed** — switched to `listArr.ValueOffsets(0)` (Arrow API that accounts for array offset) + bounds check on `start/end` against `structArr.Len()` |
| C2 | WARN×2 | `Rehydrate` holds the registry mutex across network I/O to storaged — slow storaged blocks all `Register/Get/List`; future re-sync endpoints stall | Opus + Kimi | **Fixed** — list/get/decode happen outside the lock; the completed map is swapped into `r.byKey` under a brief lock |
### Single-reviewer findings (lineage-specific catches)
| # | Severity | Finding | Reviewer | Disposition |
|---|---|---|---|---|
| S1 | WARN | Idempotent re-register mutates the in-memory manifest BEFORE persist succeeds — split-brain on storaged failure (in-memory advances, disk holds old, restart silently rolls back) | Opus | **Fixed** — build candidate copy of prior, persist candidate, swap into `r.byKey` only on persist success |
| S2 | WARN | `cmd/catalogd` detects empty-input via `strings.Contains(err.Error(), "empty name")` — brittle, anyone reformatting the registry error silently demotes a 400 to a 500 | Opus | **Fixed** — exported `ErrEmptyName` / `ErrEmptyFingerprint` sentinels, HTTP layer uses `errors.Is` |
| S3 | WARN | `Get/List` `cp := *m` aliases the `Objects` slice + `RowCount` pointer — caller mutation corrupts registry state under the lock-free read | Opus | **Fixed** — `cloneManifest` deep-copies Objects + dereferences RowCount into a fresh `*int64` |
| S4 | WARN | error paths in `store_client` preview body but don't drain before close → HTTP/1.1 keep-alive pool reuse breaks → slow socket leak | Qwen | **Fixed** — `drainAndClose` helper reads up to 64 KiB before close on every defer |
| S5 | INFO×3 | name-validation regex / per-call deadline / Snappy compression on manifest writes | Opus | **Deferred** — small, no risk if skipped |
| S6 | WARN | `Rehydrate` aborts on first decode error → partial state | Kimi | **Accepted** — fail-loud > silent-partial is the design; cmd/catalogd `os.Exit(1)`s on rehydrate error so a corrupt manifest is operator-visible at startup |
| S7 | WARN | `store_client.List` ignores pagination — if storaged gains continuation tokens, the client silently drops everything past page 1 | Opus | **Accepted** — storaged caps lists at MaxListResults=10_000 with a `...truncated...` sentinel; no continuation token in the wire format yet (deferred to D7.5+ alongside multi-bucket federation) |
| F1 | BLOCK | `Decode` crashes on empty Parquet (NumRows==0) — `Value(0)` panics | Kimi | **Dismissed** — already handled. `tbl.NumRows() != 1` returns error before any column access; NumRows==0 also fails the subsequent `rr.Next()` check |
| F2 | INFO | `safeKey` double-escapes slashes; should use `url.PathEscape(key)` directly | Kimi | **Dismissed** — `url.PathEscape("a/b")` returns `"a%2Fb"`. Splitting on `/` first is **necessary** to preserve the path separator while escaping segment content. The current code is correct; Kimi's suggested fix would break storaged's chi wildcard match |
| F3 | INFO | `rb.NewRecord()` error unchecked | Qwen | **Dismissed** — false signature. `RecordBuilder.NewRecord()` returns `arrow.Record` only; no error to check |
### Cumulative D3 disposition
- **3-lineage parallel pass: 20 distinct findings**
- **Fixed: 6** (2 convergent + 4 single-reviewer)
- **Accepted-with-rationale: 5** (3 INFO + 2 WARN with deferred-by-design rationale)
- **Dismissed (false positives): 3** (1 Kimi BLOCK on already-handled empty case, 1 Kimi INFO on safeKey, 1 Qwen INFO on Arrow API signature)
- Build clean, vet clean, all tests pass, smoke 6/6 PASS after every fix round.
The C1 triple-confirmation (Opus + Kimi + Qwen all flagging the same
list-offset indexing) is the strongest convergence signal across the
three days. All three correctly diagnosed the underlying issue
(direct `Offsets()` indexing is fragile under array slicing); Opus
named the right Arrow API (`ValueOffsets(0)`), Kimi and Qwen named
the bounds-check shape. Fix uses Opus's API + the others' bounds
discipline together.
S1 (split-brain on persist failure) was the most *consequential*
finding — D3's smoke would have stayed green through every test case
because none of them simulate storaged-down mid-register. Opus alone
caught it; the fix is a 4-line candidate-then-swap pattern but the
class of bug (in-memory advances ahead of disk) is exactly the
distributed-systems hazard ADR-020 was added to prevent at a
different layer.

33
go.mod
View File

@ -1,19 +1,25 @@
module git.agentview.dev/profit/golangLAKEHOUSE module git.agentview.dev/profit/golangLAKEHOUSE
go 1.24 go 1.25.0
require ( require (
github.com/apache/arrow-go/v18 v18.6.0
github.com/aws/aws-sdk-go-v2 v1.41.6
github.com/aws/aws-sdk-go-v2/config v1.32.16
github.com/aws/aws-sdk-go-v2/credentials v1.19.15
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.16
github.com/aws/aws-sdk-go-v2/service/s3 v1.100.0
github.com/aws/smithy-go v1.25.0
github.com/go-chi/chi/v5 v5.2.5 github.com/go-chi/chi/v5 v5.2.5
github.com/google/uuid v1.6.0
github.com/pelletier/go-toml/v2 v2.3.0 github.com/pelletier/go-toml/v2 v2.3.0
) )
require ( require (
github.com/aws/aws-sdk-go-v2 v1.41.6 // indirect github.com/andybalholm/brotli v1.2.1 // indirect
github.com/apache/thrift v0.22.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.9 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.9 // indirect
github.com/aws/aws-sdk-go-v2/config v1.32.16 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.19.15 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.22 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.22 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.16 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.22 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.22 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.23 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.23 // indirect
@ -21,10 +27,23 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.14 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.14 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.22 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.22 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.22 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.22 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.100.0 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.10 // indirect github.com/aws/aws-sdk-go-v2/service/signin v1.0.10 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.16 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.30.16 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 // indirect
github.com/aws/smithy-go v1.25.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/goccy/go-json v0.10.6 // indirect
github.com/google/flatbuffers v25.12.19+incompatible // indirect
github.com/klauspost/compress v1.18.5 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/pierrec/lz4/v4 v4.1.26 // indirect
github.com/zeebo/xxh3 v1.1.0 // indirect
golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.43.0 // indirect
golang.org/x/text v0.35.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect
google.golang.org/grpc v1.80.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
) )

74
go.sum
View File

@ -1,3 +1,9 @@
github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eTWro=
github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/apache/arrow-go/v18 v18.6.0 h1:GX/Jyd3R7mCLiECAwY9FWbbaYblie2WXBSz4Sw8fNpM=
github.com/apache/arrow-go/v18 v18.6.0/go.mod h1:gm3MiPpY82fLYK5VKPB3WoJbsiLVDfT7flD5/vHReKw=
github.com/apache/thrift v0.22.0 h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc=
github.com/apache/thrift v0.22.0/go.mod h1:1e7J/O1Ae6ZQMTYdy9xa3w9k+XHWPfRvdPyJeynQ+/g=
github.com/aws/aws-sdk-go-v2 v1.41.6 h1:1AX0AthnBQzMx1vbmir3Y4WsnJgiydmnJjiLu+LvXOg= github.com/aws/aws-sdk-go-v2 v1.41.6 h1:1AX0AthnBQzMx1vbmir3Y4WsnJgiydmnJjiLu+LvXOg=
github.com/aws/aws-sdk-go-v2 v1.41.6/go.mod h1:dy0UzBIfwSeot4grGvY1AqFWN5zgziMmWGzysDnHFcQ= github.com/aws/aws-sdk-go-v2 v1.41.6/go.mod h1:dy0UzBIfwSeot4grGvY1AqFWN5zgziMmWGzysDnHFcQ=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.9 h1:adBsCIIpLbLmYnkQU+nAChU5yhVTvu5PerROm+/Kq2A= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.9 h1:adBsCIIpLbLmYnkQU+nAChU5yhVTvu5PerROm+/Kq2A=
@ -36,7 +42,75 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 h1:ks8KBcZPh3PYISr5dAiXCM5/Thcu
github.com/aws/aws-sdk-go-v2/service/sts v1.42.0/go.mod h1:pFw33T0WLvXU3rw1WBkpMlkgIn54eCB5FYLhjDc9Foo= github.com/aws/aws-sdk-go-v2/service/sts v1.42.0/go.mod h1:pFw33T0WLvXU3rw1WBkpMlkgIn54eCB5FYLhjDc9Foo=
github.com/aws/smithy-go v1.25.0 h1:Sz/XJ64rwuiKtB6j98nDIPyYrV1nVNJ4YU74gttcl5U= github.com/aws/smithy-go v1.25.0 h1:Sz/XJ64rwuiKtB6j98nDIPyYrV1nVNJ4YU74gttcl5U=
github.com/aws/smithy-go v1.25.0/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= github.com/aws/smithy-go v1.25.0/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/goccy/go-json v0.10.6 h1:p8HrPJzOakx/mn/bQtjgNjdTcN+/S6FcG2CTtQOrHVU=
github.com/goccy/go-json v0.10.6/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/flatbuffers v25.12.19+incompatible h1:haMV2JRRJCe1998HeW/p0X9UaMTK6SDo0ffLn2+DbLs=
github.com/google/flatbuffers v25.12.19+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE=
github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/pelletier/go-toml/v2 v2.3.0 h1:k59bC/lIZREW0/iVaQR8nDHxVq8OVlIzYCOJf421CaM= github.com/pelletier/go-toml/v2 v2.3.0 h1:k59bC/lIZREW0/iVaQR8nDHxVq8OVlIzYCOJf421CaM=
github.com/pelletier/go-toml/v2 v2.3.0/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= github.com/pelletier/go-toml/v2 v2.3.0/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY=
github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8=
go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0=
go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs=
go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18=
go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE=
go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8=
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
golang.org/x/exp v0.0.0-20260112195511-716be5621a96 h1:Z/6YuSHTLOHfNFdb8zVZomZr7cqNgTJvA8+Qz75D8gU=
golang.org/x/exp v0.0.0-20260112195511-716be5621a96/go.mod h1:nzimsREAkjBCIEFtHiYkrJyT+2uy9YZJB7H1k68CXZU=
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 h1:sNrWoksmOyF5bvJUcnmbeAmQi8baNhqg5IWaI3llQqU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -0,0 +1,200 @@
// Package catalogd is the metadata authority — manifests for every
// registered dataset. A Manifest is the catalog row Rust calls
// `Manifest`: dataset_id (deterministic from name via UUIDv5),
// schema_fingerprint (caller-supplied schema hash), the object keys
// that physically back the dataset in storaged, plus timestamps and
// optional row_count.
//
// G0 stores one Manifest per Parquet file at
// primary://_catalog/manifests/<name>.parquet. One row per file —
// catalog manifests are written rarely and read on startup, so the
// per-file shape favors atomic register over storage density.
package catalogd
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/file"
"github.com/apache/arrow-go/v18/parquet/pqarrow"
"github.com/google/uuid"
)
// catalogNamespace is the v5 UUID namespace for dataset_id derivation.
// Same name → same dataset_id across boxes / cold starts. Don't change
// — every dataset_id ever issued depends on this byte sequence, so any
// edit silently re-keys every existing dataset.
var catalogNamespace = uuid.MustParse("a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e")
// Object is one storaged key contributing to a dataset.
type Object struct {
	Key  string `json:"key"`  // storaged object key
	Size int64  `json:"size"` // object size in bytes
}

// Manifest is the catalog row for one dataset.
type Manifest struct {
	DatasetID         string    `json:"dataset_id"`         // deterministic UUIDv5 of Name (see DatasetIDForName)
	Name              string    `json:"name"`               // logical dataset name (registry key)
	SchemaFingerprint string    `json:"schema_fingerprint"` // caller-supplied schema hash (ADR-020 gate)
	Objects           []Object  `json:"objects"`            // storaged keys physically backing the dataset
	CreatedAt         time.Time `json:"created_at"`
	UpdatedAt         time.Time `json:"updated_at"`
	RowCount          *int64    `json:"row_count,omitempty"` // optional; nil means unknown
}
// DatasetIDForName returns the deterministic UUIDv5 dataset_id for a
// logical dataset name. Idempotent on the same name across boxes.
func DatasetIDForName(name string) string {
	id := uuid.NewSHA1(catalogNamespace, []byte(name))
	return id.String()
}
// manifestArrowSchema is the Arrow schema for the on-disk Parquet.
// Field order matters — the Encode/Decode codecs index columns by
// position (Field(0) … Field(6)), so reordering fields here silently
// corrupts round-trips.
var manifestArrowSchema = arrow.NewSchema([]arrow.Field{
	{Name: "dataset_id", Type: arrow.BinaryTypes.String},
	{Name: "name", Type: arrow.BinaryTypes.String},
	{Name: "schema_fingerprint", Type: arrow.BinaryTypes.String},
	// objects is a list<struct<key: string, size: int64>> — one list
	// entry per manifest row, one struct per backing object.
	{Name: "objects", Type: arrow.ListOf(arrow.StructOf(
		arrow.Field{Name: "key", Type: arrow.BinaryTypes.String},
		arrow.Field{Name: "size", Type: arrow.PrimitiveTypes.Int64},
	))},
	{Name: "created_at_unix_ns", Type: arrow.PrimitiveTypes.Int64},
	{Name: "updated_at_unix_ns", Type: arrow.PrimitiveTypes.Int64},
	// row_count is the only nullable column; null encodes "unknown".
	{Name: "row_count", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
}, nil)
// Encode writes a single Manifest to a Parquet byte slice. Memory
// allocations are bounded — manifests have tens of objects, not
// millions.
//
// Column positions must match manifestArrowSchema; the Field(i) type
// assertions below rely on that ordering.
func Encode(m *Manifest) ([]byte, error) {
	mem := memory.NewGoAllocator()
	rb := array.NewRecordBuilder(mem, manifestArrowSchema)
	defer rb.Release()

	rb.Field(0).(*array.StringBuilder).Append(m.DatasetID)
	rb.Field(1).(*array.StringBuilder).Append(m.Name)
	rb.Field(2).(*array.StringBuilder).Append(m.SchemaFingerprint)

	// objects: a single list entry, one struct element per object.
	listB := rb.Field(3).(*array.ListBuilder)
	listB.Append(true)
	structB := listB.ValueBuilder().(*array.StructBuilder)
	keyB := structB.FieldBuilder(0).(*array.StringBuilder)
	sizeB := structB.FieldBuilder(1).(*array.Int64Builder)
	for _, o := range m.Objects {
		structB.Append(true)
		keyB.Append(o.Key)
		sizeB.Append(o.Size)
	}

	rb.Field(4).(*array.Int64Builder).Append(m.CreatedAt.UnixNano())
	rb.Field(5).(*array.Int64Builder).Append(m.UpdatedAt.UnixNano())
	if m.RowCount != nil {
		rb.Field(6).(*array.Int64Builder).Append(*m.RowCount)
	} else {
		rb.Field(6).(*array.Int64Builder).AppendNull()
	}

	rec := rb.NewRecord()
	defer rec.Release()

	var buf bytes.Buffer
	props := parquet.NewWriterProperties()
	arrowProps := pqarrow.NewArrowWriterProperties()
	w, err := pqarrow.NewFileWriter(manifestArrowSchema, &buf, props, arrowProps)
	if err != nil {
		return nil, fmt.Errorf("pqarrow writer: %w", err)
	}
	if err := w.Write(rec); err != nil {
		// Close releases writer-held resources even though the output
		// is discarded; the write error is primary, so the close
		// error (if any) is deliberately dropped.
		_ = w.Close()
		return nil, fmt.Errorf("pqarrow write: %w", err)
	}
	if err := w.Close(); err != nil {
		return nil, fmt.Errorf("pqarrow close: %w", err)
	}
	return buf.Bytes(), nil
}
// Decode reads a single-row Parquet manifest back into a Manifest.
// It is the inverse of Encode and enforces the one-row-per-file shape:
// any other row count is rejected as corrupt. Column positions mirror
// manifestArrowSchema.
func Decode(b []byte) (*Manifest, error) {
	rdr, err := file.NewParquetReader(bytes.NewReader(b))
	if err != nil {
		return nil, fmt.Errorf("parquet reader: %w", err)
	}
	defer rdr.Close()
	pr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.NewGoAllocator())
	if err != nil {
		return nil, fmt.Errorf("pqarrow reader: %w", err)
	}
	tbl, err := pr.ReadTable(context.Background())
	if err != nil {
		return nil, fmt.Errorf("read table: %w", err)
	}
	defer tbl.Release()
	// Guards all Value(0) accesses below: exactly one row or error.
	// NumRows == 0 (empty file) is caught here too.
	if tbl.NumRows() != 1 {
		return nil, fmt.Errorf("manifest parquet: expected 1 row, got %d", tbl.NumRows())
	}
	rr := array.NewTableReader(tbl, 1)
	defer rr.Release()
	if !rr.Next() {
		return nil, errors.New("manifest parquet: no record batch")
	}
	// rec is owned by rr; rr.Release is deferred, so rec stays valid
	// for the remainder of this function.
	rec := rr.Record()
	m := &Manifest{
		DatasetID:         rec.Column(0).(*array.String).Value(0),
		Name:              rec.Column(1).(*array.String).Value(0),
		SchemaFingerprint: rec.Column(2).(*array.String).Value(0),
	}
	// Per scrum C1 (3-way convergent): use ValueOffsets which accounts
	// for the array's own offset (non-zero under slicing) and is bounds-
	// safe by API contract. Direct Offsets()[0]/[1] indexing is fragile
	// under multi-row reads and panics on malformed offset buffers.
	listArr := rec.Column(3).(*array.List)
	structArr := listArr.ListValues().(*array.Struct)
	keyArr := structArr.Field(0).(*array.String)
	sizeArr := structArr.Field(1).(*array.Int64)
	start, end := listArr.ValueOffsets(0)
	// Defensive bounds check against a corrupt offsets buffer before
	// the element loop indexes into the child arrays.
	if start < 0 || end < start || end > int64(structArr.Len()) {
		return nil, fmt.Errorf("manifest: bad list offsets [%d, %d] for struct len %d",
			start, end, structArr.Len())
	}
	// Objects stays nil for an empty list (start == end); callers see
	// an empty slice via range/len either way.
	for i := start; i < end; i++ {
		m.Objects = append(m.Objects, Object{
			Key:  keyArr.Value(int(i)),
			Size: sizeArr.Value(int(i)),
		})
	}
	m.CreatedAt = time.Unix(0, rec.Column(4).(*array.Int64).Value(0))
	m.UpdatedAt = time.Unix(0, rec.Column(5).(*array.Int64).Value(0))
	// row_count is nullable: null round-trips back to a nil pointer.
	rcArr := rec.Column(6).(*array.Int64)
	if rcArr.IsValid(0) {
		v := rcArr.Value(0)
		m.RowCount = &v
	}
	return m, nil
}
// EncodeReader is a small convenience for callers that want an
// io.Reader over the encoded bytes (matches the storaged HTTP PUT
// signature).
func EncodeReader(m *Manifest) (io.Reader, int64, error) {
	encoded, err := Encode(m)
	if err != nil {
		return nil, 0, err
	}
	size := int64(len(encoded))
	return bytes.NewReader(encoded), size, nil
}

View File

@ -0,0 +1,103 @@
package catalogd
import (
"testing"
"time"
)
// TestEncodeDecode_RoundTrip proves every Manifest field survives the
// Parquet encode→decode cycle: scalar strings, the nested objects
// list, nanosecond timestamps, and the optional row count.
func TestEncodeDecode_RoundTrip(t *testing.T) {
rc := int64(500_000)
now := time.Unix(1777435000, 123456789)
want := &Manifest{
DatasetID: DatasetIDForName("workers_500k"),
Name: "workers_500k",
SchemaFingerprint: "sha256:abcdef",
Objects: []Object{
{Key: "datasets/workers_500k/part-001.parquet", Size: 75 * 1024 * 1024},
{Key: "datasets/workers_500k/part-002.parquet", Size: 12 * 1024 * 1024},
},
CreatedAt: now,
UpdatedAt: now.Add(time.Minute),
RowCount: &rc,
}
b, err := Encode(want)
if err != nil {
t.Fatalf("Encode: %v", err)
}
if len(b) == 0 {
t.Fatal("Encode returned 0 bytes")
}
got, err := Decode(b)
if err != nil {
t.Fatalf("Decode: %v", err)
}
if got.DatasetID != want.DatasetID {
t.Errorf("DatasetID: got %q, want %q", got.DatasetID, want.DatasetID)
}
if got.Name != want.Name {
t.Errorf("Name: got %q, want %q", got.Name, want.Name)
}
if got.SchemaFingerprint != want.SchemaFingerprint {
t.Errorf("SchemaFingerprint: got %q, want %q", got.SchemaFingerprint, want.SchemaFingerprint)
}
// Fatal (not Error) on count mismatch: the indexed loop below would
// panic on a shorter slice.
if len(got.Objects) != 2 {
t.Fatalf("Objects: got %d, want 2", len(got.Objects))
}
for i, o := range got.Objects {
if o.Key != want.Objects[i].Key || o.Size != want.Objects[i].Size {
t.Errorf("Objects[%d]: got %+v, want %+v", i, o, want.Objects[i])
}
}
// Times round-trip via UnixNano so equality is on the nanosecond.
if !got.CreatedAt.Equal(want.CreatedAt) {
t.Errorf("CreatedAt: got %v, want %v", got.CreatedAt, want.CreatedAt)
}
if !got.UpdatedAt.Equal(want.UpdatedAt) {
t.Errorf("UpdatedAt: got %v, want %v", got.UpdatedAt, want.UpdatedAt)
}
if got.RowCount == nil || *got.RowCount != *want.RowCount {
t.Errorf("RowCount: got %v, want %v", got.RowCount, want.RowCount)
}
}
// TestEncodeDecode_NoRowCount checks that a nil RowCount and an empty
// Objects slice survive the round trip (null column + empty list cell).
func TestEncodeDecode_NoRowCount(t *testing.T) {
	in := &Manifest{
		DatasetID:         DatasetIDForName("unknown_size"),
		Name:              "unknown_size",
		SchemaFingerprint: "sha256:zero",
		Objects:           []Object{},
		CreatedAt:         time.Unix(0, 0),
		UpdatedAt:         time.Unix(0, 0),
		RowCount:          nil,
	}
	encoded, err := Encode(in)
	if err != nil {
		t.Fatal(err)
	}
	out, err := Decode(encoded)
	if err != nil {
		t.Fatal(err)
	}
	if out.RowCount != nil {
		t.Errorf("RowCount: got %v, want nil", out.RowCount)
	}
	if n := len(out.Objects); n != 0 {
		t.Errorf("Objects: got %d, want 0", n)
	}
}
// TestDatasetIDForName_Deterministic pins the two properties catalogd
// relies on: the same name always derives the same id, and distinct
// names derive distinct ids.
func TestDatasetIDForName_Deterministic(t *testing.T) {
	first := DatasetIDForName("workers_500k")
	if again := DatasetIDForName("workers_500k"); again != first {
		t.Errorf("DatasetIDForName not deterministic: %q vs %q", first, again)
	}
	if other := DatasetIDForName("different"); other == first {
		t.Errorf("DatasetIDForName collided across distinct names")
	}
}

View File

@ -0,0 +1,219 @@
// registry.go — the in-memory catalog index plus the ADR-020
// idempotent register contract. The register path holds a single
// mutex across (lookup → fingerprint check → storage write →
// in-memory update) to close the check→insert TOCTOU window. The
// historical Rust bug (308× duplicate manifests on re-register) is
// the prior art — don't loosen this lock.
package catalogd
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"
)
// ManifestPrefix is the storaged key prefix that holds catalog
// manifests. Objects under this prefix are NOT user data and never
// surface through ingest/query paths.
const ManifestPrefix = "_catalog/manifests/"
// ErrFingerprintConflict is returned by Register when a manifest with
// the same name already exists under a different schema fingerprint.
// Register wraps it with name/fingerprint context, so match with
// errors.Is rather than equality.
// HTTP layer maps this to 409 Conflict; gRPC will map to FAILED_PRECONDITION.
var ErrFingerprintConflict = errors.New("catalogd: schema fingerprint conflict")
// ErrManifestNotFound is returned by Get when the requested name has
// no manifest registered.
var ErrManifestNotFound = errors.New("catalogd: manifest not found")
// ErrEmptyName / ErrEmptyFingerprint are returned by Register on
// missing required inputs. HTTP layer maps both to 400. Per scrum S2
// (Opus): sentinel errors so the HTTP boundary uses errors.Is rather
// than substring matching err.Error().
var (
ErrEmptyName = errors.New("catalogd: empty name")
ErrEmptyFingerprint = errors.New("catalogd: empty schema_fingerprint")
)
// Store abstracts the storaged HTTP wire so registry can be unit-
// tested with an in-memory fake.
type Store interface {
// Put writes body at key.
Put(ctx context.Context, key string, body []byte) error
// Get returns the bytes at key, or an error when the key is absent.
Get(ctx context.Context, key string) ([]byte, error)
// List returns the keys under prefix.
List(ctx context.Context, prefix string) ([]string, error)
}
// Registry is the in-memory authority. Persistence lives in storaged
// at ManifestPrefix; Registry is rehydrated on startup.
type Registry struct {
mu sync.Mutex // guards byKey; Register holds it across the storage write (see its doc)
byKey map[string]*Manifest // name → manifest
store Store // persistence backend (storaged over HTTP in production)
now func() time.Time // injectable for tests
}
// NewRegistry builds an empty registry bound to a Store. Call
// Rehydrate after construction to pick up persisted manifests.
func NewRegistry(store Store) *Registry {
	r := &Registry{
		byKey: map[string]*Manifest{},
		store: store,
	}
	r.now = time.Now // tests override this for deterministic timestamps
	return r
}
// Rehydrate lists ManifestPrefix in storaged and decodes every entry
// into the in-memory map. Returns the count of manifests recovered.
// On any per-file decode error, returns immediately so a corrupt
// catalog doesn't half-load and silently lose state. On error the
// existing in-memory map is left untouched (the swap below never runs).
//
// Per scrum C2 (Opus + Kimi convergent): network I/O happens OUTSIDE
// the registry mutex so a slow storaged doesn't block concurrent
// Register/Get/List. The completed map is swapped in under the lock.
func (r *Registry) Rehydrate(ctx context.Context) (int, error) {
keys, err := r.store.List(ctx, ManifestPrefix)
if err != nil {
return 0, fmt.Errorf("list manifests: %w", err)
}
// Build the replacement map off to the side; readers see nothing
// until the swap at the bottom.
loaded := make(map[string]*Manifest)
for _, k := range keys {
// Skip anything that isn't a manifest parquet — defensive, since
// List is already prefix-scoped but the key shape is storaged's contract.
if !strings.HasPrefix(k, ManifestPrefix) || !strings.HasSuffix(k, ".parquet") {
continue
}
body, err := r.store.Get(ctx, k)
if err != nil {
return len(loaded), fmt.Errorf("get manifest %s: %w", k, err)
}
m, err := Decode(body)
if err != nil {
return len(loaded), fmt.Errorf("decode manifest %s: %w", k, err)
}
loaded[m.Name] = m
}
// Brief critical section: swap the fully-built map in.
r.mu.Lock()
defer r.mu.Unlock()
r.byKey = loaded
return len(loaded), nil
}
// Register applies the ADR-020 idempotency contract:
//
//   - No prior manifest for name → create (returns existing=false)
//   - Prior manifest, same fingerprint → update objects + bump
//     updated_at, reuse dataset_id (returns existing=true)
//   - Prior manifest, different fingerprint → ErrFingerprintConflict
//
// The mutex is held across the storage write so concurrent calls for
// the same name serialize through the persistence layer (low TPS,
// correctness > throughput).
//
// The returned *Manifest is a deep copy, matching the Get/List
// contract: callers may mutate it without aliasing registry state.
func (r *Registry) Register(ctx context.Context, name, fingerprint string, objects []Object, rowCount *int64) (*Manifest, bool, error) {
	if name == "" {
		return nil, false, ErrEmptyName
	}
	if fingerprint == "" {
		return nil, false, ErrEmptyFingerprint
	}
	// Defensively copy caller-owned inputs before they become registry
	// state. Same rationale as scrum S3 on the read side: storing the
	// caller's slice/pointer directly would let a caller-side mutation
	// after Register returns corrupt registry state without holding the
	// mutex.
	if objects != nil {
		cp := make([]Object, len(objects))
		copy(cp, objects)
		objects = cp
	}
	if rowCount != nil {
		v := *rowCount
		rowCount = &v
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	now := r.now().UTC()
	prior, exists := r.byKey[name]
	if exists {
		if prior.SchemaFingerprint != fingerprint {
			return nil, true, fmt.Errorf("%w: name=%q have=%s got=%s",
				ErrFingerprintConflict, name, prior.SchemaFingerprint, fingerprint)
		}
		// Same fingerprint — reuse dataset_id, replace objects, bump updated_at.
		// Per scrum S1 (Opus): build candidate, persist, then swap in. Mutating
		// `prior` before persist succeeds creates split-brain if storaged is
		// down — in-memory advances, disk holds the old state, restart loses
		// what callers were told didn't happen.
		candidate := *prior
		candidate.Objects = objects
		candidate.UpdatedAt = now
		candidate.RowCount = rowCount
		if err := r.persist(ctx, &candidate); err != nil {
			return nil, true, err
		}
		r.byKey[name] = &candidate
		return cloneManifest(&candidate), true, nil
	}
	m := &Manifest{
		DatasetID:         DatasetIDForName(name),
		Name:              name,
		SchemaFingerprint: fingerprint,
		Objects:           objects,
		CreatedAt:         now,
		UpdatedAt:         now,
		RowCount:          rowCount,
	}
	if err := r.persist(ctx, m); err != nil {
		return nil, false, err
	}
	r.byKey[name] = m
	return cloneManifest(m), false, nil
}
// Get returns a deep copy of the manifest for name, or ErrManifestNotFound.
func (r *Registry) Get(name string) (*Manifest, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if m, ok := r.byKey[name]; ok {
		return cloneManifest(m), nil
	}
	return nil, ErrManifestNotFound
}
// List returns deep copies of every manifest, sorted by name.
// Callers may mutate the returned slice and the underlying Manifest
// values without affecting registry state.
func (r *Registry) List() []*Manifest {
	r.mu.Lock()
	defer r.mu.Unlock()
	copies := make([]*Manifest, 0, len(r.byKey))
	for _, m := range r.byKey {
		copies = append(copies, cloneManifest(m))
	}
	sort.Slice(copies, func(a, b int) bool {
		return copies[a].Name < copies[b].Name
	})
	return copies
}
// cloneManifest deep-copies the Objects slice and dereferences
// RowCount into a fresh pointer so a returned manifest cannot alias
// registry state. Per scrum S3 (Opus): the prior `cp := *m` shape
// shared the Objects backing array — caller-side index writes
// corrupted registry state without holding the mutex.
func cloneManifest(m *Manifest) *Manifest {
	out := *m
	if m.Objects != nil {
		detached := make([]Object, len(m.Objects))
		copy(detached, m.Objects)
		out.Objects = detached
	}
	if rc := m.RowCount; rc != nil {
		v := *rc
		out.RowCount = &v
	}
	return &out
}
// persist encodes the manifest and writes it to storaged at the
// canonical path. Caller MUST hold r.mu — this function does not
// take the lock itself.
func (r *Registry) persist(ctx context.Context, m *Manifest) error {
	encoded, err := Encode(m)
	if err != nil {
		return fmt.Errorf("encode manifest %s: %w", m.Name, err)
	}
	canonical := ManifestPrefix + m.Name + ".parquet"
	if err := r.store.Put(ctx, canonical, encoded); err != nil {
		return fmt.Errorf("persist manifest %s: %w", m.Name, err)
	}
	return nil
}

View File

@ -0,0 +1,160 @@
package catalogd
import (
"context"
"errors"
"sync"
"testing"
"time"
)
// memStore is an in-memory Store fake for unit tests.
// The mutex guards data so the fake is safe under any concurrent use.
type memStore struct {
mu sync.Mutex
data map[string][]byte
}
// newMemStore returns an empty, ready-to-use fake.
func newMemStore() *memStore { return &memStore{data: map[string][]byte{}} }
// Put stores a private copy of body under key, so later caller-side
// mutations of body cannot change what the fake holds.
func (m *memStore) Put(_ context.Context, key string, body []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	stored := make([]byte, len(body))
	copy(stored, body)
	m.data[key] = stored
	return nil
}
// Get returns the bytes at key, or ErrKeyNotFound. A fresh copy is
// returned: Put copies on the way in, and without the mirror copy on
// the way out a caller could mutate the fake's stored bytes through
// the returned slice, hiding aliasing bugs the tests exist to catch.
func (m *memStore) Get(_ context.Context, key string) ([]byte, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	b, ok := m.data[key]
	if !ok {
		return nil, ErrKeyNotFound
	}
	cp := make([]byte, len(b))
	copy(cp, b)
	return cp, nil
}
// List returns every stored key that begins with prefix, in map
// (i.e. unspecified) order. The prefix match is done by slicing since
// this test file does not import strings.
func (m *memStore) List(_ context.Context, prefix string) ([]string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	matched := []string{}
	for key := range m.data {
		if len(key) < len(prefix) || key[:len(prefix)] != prefix {
			continue
		}
		matched = append(matched, key)
	}
	return matched, nil
}
// mkRegistry builds a Registry over a fresh memStore with a frozen
// clock so timestamps are deterministic across test runs.
func mkRegistry(t *testing.T) (*Registry, *memStore) {
	t.Helper()
	store := newMemStore()
	reg := NewRegistry(store)
	frozen := time.Unix(1777435000, 0).UTC()
	reg.now = func() time.Time { return frozen }
	return reg, store
}
// TestRegister_NewManifest pins the fresh-create path: existing=false
// and a dataset_id derived deterministically from the name.
func TestRegister_NewManifest(t *testing.T) {
	r, _ := mkRegistry(t)
	rows := int64(100)
	objs := []Object{{Key: "datasets/workers/p1.parquet", Size: 1024}}
	m, existing, err := r.Register(context.Background(), "workers", "sha256:abc", objs, &rows)
	if err != nil {
		t.Fatalf("Register: %v", err)
	}
	if existing {
		t.Error("expected existing=false for new manifest")
	}
	if m.DatasetID != DatasetIDForName("workers") {
		t.Errorf("DatasetID: got %q, want UUIDv5(workers)", m.DatasetID)
	}
}
// TestRegister_SameFingerprint_Idempotent pins the ADR-020 reuse path:
// re-registering the same name+fingerprint keeps the dataset_id,
// replaces the object list, and updates row_count.
func TestRegister_SameFingerprint_Idempotent(t *testing.T) {
r, _ := mkRegistry(t)
rc := int64(100)
first, _, _ := r.Register(context.Background(), "workers", "sha256:abc",
[]Object{{Key: "p1.parquet", Size: 1024}}, &rc)
// Re-register same name + fingerprint with new objects.
rc2 := int64(200)
second, existing, err := r.Register(context.Background(), "workers", "sha256:abc",
[]Object{{Key: "p1.parquet", Size: 1024}, {Key: "p2.parquet", Size: 2048}}, &rc2)
if err != nil {
t.Fatalf("Register (idempotent): %v", err)
}
if !existing {
t.Error("expected existing=true on idempotent re-register")
}
if second.DatasetID != first.DatasetID {
t.Errorf("DatasetID changed: %q → %q", first.DatasetID, second.DatasetID)
}
if len(second.Objects) != 2 {
t.Errorf("Objects not replaced: got %d, want 2", len(second.Objects))
}
if second.RowCount == nil || *second.RowCount != 200 {
t.Errorf("RowCount not bumped: got %v, want 200", second.RowCount)
}
}
// TestRegister_DifferentFingerprint_Conflict pins the conflict arm of
// ADR-020: a second fingerprint for an existing name is rejected with
// the ErrFingerprintConflict sentinel (matched via errors.Is, since
// Register wraps it with context).
func TestRegister_DifferentFingerprint_Conflict(t *testing.T) {
	r, _ := mkRegistry(t)
	ctx := context.Background()
	_, _, _ = r.Register(ctx, "workers", "sha256:abc",
		[]Object{{Key: "p1.parquet", Size: 1024}}, nil)
	_, _, err := r.Register(ctx, "workers", "sha256:DIFFERENT",
		[]Object{{Key: "p1.parquet", Size: 1024}}, nil)
	if !errors.Is(err, ErrFingerprintConflict) {
		t.Fatalf("expected ErrFingerprintConflict, got %v", err)
	}
}
// TestRehydrate_RecoversManifests proves the persistence loop: a new
// Registry pointed at the same Store recovers everything the first
// one registered.
func TestRehydrate_RecoversManifests(t *testing.T) {
// Build first registry, register 2 manifests.
r1, store := mkRegistry(t)
_, _, _ = r1.Register(context.Background(), "workers", "sha256:a", nil, nil)
_, _, _ = r1.Register(context.Background(), "candidates", "sha256:b", nil, nil)
// Build a second registry against the same store + rehydrate.
r2 := NewRegistry(store)
n, err := r2.Rehydrate(context.Background())
if err != nil {
t.Fatalf("Rehydrate: %v", err)
}
if n != 2 {
t.Errorf("recovered %d, want 2", n)
}
if _, err := r2.Get("workers"); err != nil {
t.Errorf("Get(workers): %v", err)
}
if _, err := r2.Get("candidates"); err != nil {
t.Errorf("Get(candidates): %v", err)
}
}
// TestList_Sorted registers names out of order and asserts List
// returns them sorted by name.
func TestList_Sorted(t *testing.T) {
	r, _ := mkRegistry(t)
	_, _, _ = r.Register(context.Background(), "zoo", "fp", nil, nil)
	_, _, _ = r.Register(context.Background(), "alpha", "fp", nil, nil)
	_, _, _ = r.Register(context.Background(), "midway", "fp", nil, nil)
	got := r.List()
	want := []string{"alpha", "midway", "zoo"}
	// Pin the count first: the prior range-only comparison silently
	// passed when List returned fewer entries than want, and panicked
	// (index out of range on want[i]) when it returned more.
	if len(got) != len(want) {
		t.Fatalf("List: got %d manifests, want %d", len(got), len(want))
	}
	for i, m := range got {
		if m.Name != want[i] {
			t.Errorf("List[%d]: got %q, want %q", i, m.Name, want[i])
		}
	}
}
// TestRegister_RejectsEmptyInputs asserts the S2 sentinels
// specifically, not just a non-nil error: the HTTP layer branches on
// errors.Is, so a regression returning some other error would still
// pass a bare err != nil check while breaking the 400 mapping.
func TestRegister_RejectsEmptyInputs(t *testing.T) {
	r, _ := mkRegistry(t)
	_, _, err := r.Register(context.Background(), "", "fp", nil, nil)
	if !errors.Is(err, ErrEmptyName) {
		t.Errorf("empty name: got %v, want ErrEmptyName", err)
	}
	_, _, err = r.Register(context.Background(), "x", "", nil, nil)
	if !errors.Is(err, ErrEmptyFingerprint) {
		t.Errorf("empty fingerprint: got %v, want ErrEmptyFingerprint", err)
	}
}

View File

@ -0,0 +1,140 @@
// store_client.go — thin HTTP client to storaged. catalogd needs to
// PUT manifest Parquets, GET them on startup, and LIST the manifests
// directory. Staying inside an HTTP boundary (rather than reaching
// into storaged's package directly) preserves the service-boundary
// shape that gRPC will eventually formalize at G1+.
package catalogd
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)
// StoreClient talks HTTP to the storaged service. Its Put/Get/List
// methods satisfy the registry's Store interface over storaged's
// /storage/{put,get,list} routes.
type StoreClient struct {
baseURL string // storaged base URL with any trailing slash trimmed (see NewStoreClient)
hc *http.Client // shared client; 30s timeout set in NewStoreClient
}
// listResponse mirrors storaged's GET /storage/list shape:
//
// {"prefix":"_catalog/manifests/","objects":[{Key,Size,ETag,LastModified}, ...]}
//
// Only Key and Size are declared; any other per-object fields in the
// payload are silently dropped by encoding/json.
type listResponse struct {
Prefix string `json:"prefix"`
Objects []struct {
Key string `json:"Key"`
Size int64 `json:"Size"`
} `json:"objects"`
}
// NewStoreClient builds a client against the given storaged base URL
// (e.g. "http://127.0.0.1:3211"). Timeout covers manifest read-write
// only; rehydration of many manifests at startup runs sequentially
// and each call gets its own timeout window.
func NewStoreClient(baseURL string) *StoreClient {
	trimmed := strings.TrimRight(baseURL, "/")
	return &StoreClient{
		baseURL: trimmed,
		hc:      &http.Client{Timeout: 30 * time.Second},
	}
}
// Put writes raw bytes at key. body is the encoded Parquet manifest.
func (c *StoreClient) Put(ctx context.Context, key string, body []byte) error {
	endpoint := c.baseURL + "/storage/put/" + safeKey(key)
	req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("put req: %w", err)
	}
	req.ContentLength = int64(len(body))
	resp, err := c.hc.Do(req)
	if err != nil {
		return fmt.Errorf("put do: %w", err)
	}
	// Drain + close after we're done (keep-alive reuse, scrum S4/S6).
	defer drainAndClose(resp.Body)
	if resp.StatusCode == http.StatusOK {
		return nil
	}
	// Non-200: surface a short body preview for diagnostics.
	preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
	return fmt.Errorf("put %s: status %d: %s", key, resp.StatusCode, string(preview))
}
// Get reads the bytes at key. ErrKeyNotFound on 404.
func (c *StoreClient) Get(ctx context.Context, key string) ([]byte, error) {
u := c.baseURL + "/storage/get/" + safeKey(key)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, fmt.Errorf("get req: %w", err)
}
resp, err := c.hc.Do(req)
if err != nil {
return nil, fmt.Errorf("get do: %w", err)
}
defer drainAndClose(resp.Body)
if resp.StatusCode == http.StatusNotFound {
return nil, ErrKeyNotFound
}
if resp.StatusCode != http.StatusOK {
preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
return nil, fmt.Errorf("get %s: status %d: %s", key, resp.StatusCode, string(preview))
}
return io.ReadAll(resp.Body)
}
// List returns the keys under prefix. Object metadata beyond Key is
// ignored — catalogd only needs the keys to drive rehydration.
func (c *StoreClient) List(ctx context.Context, prefix string) ([]string, error) {
	endpoint := c.baseURL + "/storage/list?prefix=" + url.QueryEscape(prefix)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("list req: %w", err)
	}
	resp, err := c.hc.Do(req)
	if err != nil {
		return nil, fmt.Errorf("list do: %w", err)
	}
	// Drain + close after we're done (keep-alive reuse, scrum S4/S6).
	defer drainAndClose(resp.Body)
	if resp.StatusCode != http.StatusOK {
		preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
		return nil, fmt.Errorf("list %s: status %d: %s", prefix, resp.StatusCode, string(preview))
	}
	var decoded listResponse
	if err := json.NewDecoder(resp.Body).Decode(&decoded); err != nil {
		return nil, fmt.Errorf("list decode: %w", err)
	}
	keys := make([]string, 0, len(decoded.Objects))
	for _, obj := range decoded.Objects {
		keys = append(keys, obj.Key)
	}
	return keys, nil
}
// ErrKeyNotFound mirrors storaged's not-found semantics on the catalogd
// side without exposing storaged's package types. Built with fmt.Errorf
// (no format verbs, so it behaves like a plain sentinel) because this
// file does not import errors; intended for errors.Is comparison.
var ErrKeyNotFound = fmt.Errorf("catalogd store: key not found")
// safeKey URL-escapes path segments while preserving "/". storaged's
// chi `/storage/<verb>/*` routes accept literal slashes in the
// wildcard match, so we only escape the segments, not the separators.
func safeKey(key string) string {
	segments := strings.Split(key, "/")
	escaped := make([]string, len(segments))
	for i, seg := range segments {
		escaped[i] = url.PathEscape(seg)
	}
	return strings.Join(escaped, "/")
}
// drainAndClose reads any remaining body bytes (capped at 64 KiB) and
// closes the body. Per scrum S6 (Qwen): preview-then-close on error
// paths leaves bytes in the kernel buffer, breaking HTTP/1.1 keep-
// alive reuse and slowly leaking sockets.
func drainAndClose(body io.ReadCloser) {
	// Best-effort on both steps; there is no caller that could act on
	// either error, so both are deliberately discarded.
	capped := io.LimitReader(body, 64<<10)
	_, _ = io.Copy(io.Discard, capped)
	_ = body.Close()
}

View File

@ -21,13 +21,21 @@ import (
type Config struct { type Config struct {
Gateway ServiceConfig `toml:"gateway"` Gateway ServiceConfig `toml:"gateway"`
Storaged ServiceConfig `toml:"storaged"` Storaged ServiceConfig `toml:"storaged"`
Catalogd ServiceConfig `toml:"catalogd"` Catalogd CatalogConfig `toml:"catalogd"`
Ingestd ServiceConfig `toml:"ingestd"` Ingestd ServiceConfig `toml:"ingestd"`
Queryd ServiceConfig `toml:"queryd"` Queryd ServiceConfig `toml:"queryd"`
S3 S3Config `toml:"s3"` S3 S3Config `toml:"s3"`
Log LogConfig `toml:"log"` Log LogConfig `toml:"log"`
} }
// CatalogConfig adds catalogd-specific knobs on top of the standard
// bind. StoragedURL points at the storaged service for manifest
// persistence; G0 defaults to the localhost bind.
type CatalogConfig struct {
// Bind is the catalogd listen address (host:port).
Bind string `toml:"bind"`
// StoragedURL is the base URL of the storaged service.
StoragedURL string `toml:"storaged_url"`
}
// ServiceConfig is the per-binary bind config. Default Bind "" // ServiceConfig is the per-binary bind config. Default Bind ""
// means "use the service's hardcoded G0 default" — see DefaultConfig. // means "use the service's hardcoded G0 default" — see DefaultConfig.
type ServiceConfig struct { type ServiceConfig struct {
@ -57,7 +65,7 @@ func DefaultConfig() Config {
return Config{ return Config{
Gateway: ServiceConfig{Bind: "127.0.0.1:3110"}, Gateway: ServiceConfig{Bind: "127.0.0.1:3110"},
Storaged: ServiceConfig{Bind: "127.0.0.1:3211"}, Storaged: ServiceConfig{Bind: "127.0.0.1:3211"},
Catalogd: ServiceConfig{Bind: "127.0.0.1:3212"}, Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"},
Ingestd: ServiceConfig{Bind: "127.0.0.1:3213"}, Ingestd: ServiceConfig{Bind: "127.0.0.1:3213"},
Queryd: ServiceConfig{Bind: "127.0.0.1:3214"}, Queryd: ServiceConfig{Bind: "127.0.0.1:3214"},
S3: S3Config{ S3: S3Config{

View File

@ -12,6 +12,7 @@ bind = "127.0.0.1:3211"
[catalogd] [catalogd]
bind = "127.0.0.1:3212" bind = "127.0.0.1:3212"
storaged_url = "http://127.0.0.1:3211"
[ingestd] [ingestd]
bind = "127.0.0.1:3213" bind = "127.0.0.1:3213"

156
scripts/d3_smoke.sh Executable file
View File

@ -0,0 +1,156 @@
#!/usr/bin/env bash
# D3 smoke — proves the Day 3 acceptance gate end-to-end.
#
# Validates:
# - Register a fresh dataset → 200 with existing=false, dataset_id from UUIDv5(name)
# - GET /catalog/manifest/<name> → manifest matches what we registered
# - GET /catalog/list → manifest listed
# - Restart catalogd → /catalog/list still shows it (Parquet-backed rehydrate)
# - Re-register same name + same fingerprint → 200, existing=true, same dataset_id
# - Re-register same name + different fingerprint → 409 Conflict
#
# Requires the storaged (D2) and catalogd binaries to be buildable from
# ./cmd; the script launches its own storaged on :3211 below.
#
# Usage: ./scripts/d3_smoke.sh
set -euo pipefail
cd "$(dirname "$0")/.."
export PATH="$PATH:/usr/local/go/bin"
echo "[d3-smoke] building storaged + catalogd..."
go build -o bin/ ./cmd/storaged ./cmd/catalogd
# Cleanup any prior processes on D3 ports.
pkill -f "bin/storaged" 2>/dev/null || true
pkill -f "bin/catalogd" 2>/dev/null || true
# Brief pause so killed processes release their ports before relaunch.
sleep 0.2
# PIDs tracked for cleanup; TMP holds response bodies for diagnostics.
STORAGED_PID=""
CATALOGD_PID=""
TMP="$(mktemp -d)"
# cleanup tears down catalogd before its storaged dependency and
# removes scratch state. Registered for EXIT/INT/TERM so a failed
# assertion (set -e) still cleans up.
cleanup() {
echo "[d3-smoke] cleanup"
if [ -n "$CATALOGD_PID" ]; then kill "$CATALOGD_PID" 2>/dev/null || true; fi
if [ -n "$STORAGED_PID" ]; then kill "$STORAGED_PID" 2>/dev/null || true; fi
rm -rf "$TMP"
}
trap cleanup EXIT INT TERM
# --- launch storaged ---
echo "[d3-smoke] launching storaged..."
./bin/storaged > /tmp/storaged.log 2>&1 &
STORAGED_PID=$!
# Poll /health for up to 5s instead of sleeping a fixed interval.
deadline=$(($(date +%s) + 5))
while [ "$(date +%s)" -lt "$deadline" ]; do
if curl -sS --max-time 1 http://127.0.0.1:3211/health >/dev/null 2>&1; then break; fi
sleep 0.05
done
if ! curl -sS --max-time 1 http://127.0.0.1:3211/health >/dev/null 2>&1; then
echo " [d3-smoke] storaged failed to bind"; tail -10 /tmp/storaged.log; exit 1
fi
# --- clean any prior catalog manifests for a fresh smoke ---
# Best-effort: failures here are tolerated (|| true) since a dirty
# prefix only affects the list-count assertion later.
for k in $(curl -sS "http://127.0.0.1:3211/storage/list?prefix=_catalog/manifests/" | jq -r '.objects[]?.Key // empty' 2>/dev/null); do
curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/$k" || true
done
# --- launch catalogd (round 1) ---
# launch_catalogd starts the binary and polls /health for up to 5s.
# Defined as a function because it is reused after the mid-smoke
# restart to prove rehydration.
launch_catalogd() {
./bin/catalogd > /tmp/catalogd.log 2>&1 &
CATALOGD_PID=$!
local deadline=$(($(date +%s) + 5))
while [ "$(date +%s)" -lt "$deadline" ]; do
if curl -sS --max-time 1 http://127.0.0.1:3212/health >/dev/null 2>&1; then return 0; fi
sleep 0.05
done
echo " [d3-smoke] catalogd failed to bind"; tail -10 /tmp/catalogd.log; return 1
}
echo "[d3-smoke] launching catalogd (first start, empty catalog)..."
launch_catalogd
# FAILED accumulates across checks so one failure doesn't mask the rest.
FAILED=0
NAME="d3_smoke_dataset"
FP1="sha256:fingerprint-A"
FP2="sha256:fingerprint-B"
echo "[d3-smoke] POST /catalog/register (fresh):"
RESP="$(curl -sS -X POST http://127.0.0.1:3212/catalog/register \
-H 'Content-Type: application/json' \
-d "{\"name\":\"$NAME\",\"schema_fingerprint\":\"$FP1\",\"objects\":[{\"key\":\"datasets/$NAME/p1.parquet\",\"size\":1024}],\"row_count\":42}")"
EXISTING="$(echo "$RESP" | jq -r '.existing')"
DATASET_ID="$(echo "$RESP" | jq -r '.manifest.dataset_id')"
# "null" is jq's rendering of a missing field — treat it as failure.
if [ "$EXISTING" = "false" ] && [ -n "$DATASET_ID" ] && [ "$DATASET_ID" != "null" ]; then
echo " ✓ fresh register → existing=false, dataset_id=$DATASET_ID"
else
echo " ✗ fresh register → $RESP"
FAILED=1
fi
echo "[d3-smoke] GET /catalog/manifest/$NAME:"
GOT="$(curl -sS "http://127.0.0.1:3212/catalog/manifest/$NAME" | jq -r '.dataset_id')"
if [ "$GOT" = "$DATASET_ID" ]; then
echo " ✓ manifest dataset_id matches"
else
echo " ✗ manifest dataset_id: got $GOT, want $DATASET_ID"
FAILED=1
fi
echo "[d3-smoke] GET /catalog/list (1 entry):"
COUNT="$(curl -sS http://127.0.0.1:3212/catalog/list | jq -r '.count')"
if [ "$COUNT" = "1" ]; then
echo " ✓ list count=1"
else
echo " ✗ list count=$COUNT (want 1)"
FAILED=1
fi
# The load-bearing check: kill catalogd, start a fresh process, and
# confirm the manifest survives via Parquet rehydration from storaged.
echo "[d3-smoke] restart catalogd → rehydrate from Parquet:"
kill "$CATALOGD_PID" 2>/dev/null || true; wait "$CATALOGD_PID" 2>/dev/null || true
launch_catalogd
REHYDRATED_ID="$(curl -sS "http://127.0.0.1:3212/catalog/manifest/$NAME" | jq -r '.dataset_id')"
if [ "$REHYDRATED_ID" = "$DATASET_ID" ]; then
echo " ✓ rehydrated dataset_id matches across restart"
else
echo " ✗ rehydrated dataset_id: got $REHYDRATED_ID, want $DATASET_ID"
FAILED=1
fi
# Idempotent path (ADR-020): same name + same fingerprint must reuse
# the dataset_id and replace the object list.
echo "[d3-smoke] re-register (same name + same fingerprint) → existing=true:"
RESP2="$(curl -sS -X POST http://127.0.0.1:3212/catalog/register \
-H 'Content-Type: application/json' \
-d "{\"name\":\"$NAME\",\"schema_fingerprint\":\"$FP1\",\"objects\":[{\"key\":\"datasets/$NAME/p1.parquet\",\"size\":1024},{\"key\":\"datasets/$NAME/p2.parquet\",\"size\":2048}],\"row_count\":84}")"
EXISTING2="$(echo "$RESP2" | jq -r '.existing')"
DATASET_ID2="$(echo "$RESP2" | jq -r '.manifest.dataset_id')"
OBJ_COUNT="$(echo "$RESP2" | jq -r '.manifest.objects | length')"
if [ "$EXISTING2" = "true" ] && [ "$DATASET_ID2" = "$DATASET_ID" ] && [ "$OBJ_COUNT" = "2" ]; then
echo " ✓ existing=true, same dataset_id, objects replaced (count=2)"
else
# Fix: $OBJ_COUNT and $RESP2 previously ran together in the output
# ("objs=2{...json...}"), garbling the diagnostic; label and separate.
echo " ✗ idempotent re-register: existing=$EXISTING2 id=$DATASET_ID2 objs=$OBJ_COUNT resp=$RESP2"
FAILED=1
fi
# Conflict path: a different fingerprint for the same name must map
# to HTTP 409. The body is saved to $TMP for diagnostics on failure.
echo "[d3-smoke] re-register (different fingerprint) → 409:"
HTTP="$(curl -sS -o "$TMP/conflict.out" -w '%{http_code}' -X POST http://127.0.0.1:3212/catalog/register \
-H 'Content-Type: application/json' \
-d "{\"name\":\"$NAME\",\"schema_fingerprint\":\"$FP2\",\"objects\":[]}")"
if [ "$HTTP" = "409" ]; then
echo " ✓ different fingerprint → 409 Conflict"
else
echo " ✗ different fingerprint → $HTTP (want 409)"
cat "$TMP/conflict.out"
FAILED=1
fi
# Cleanup smoke manifests.
curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/_catalog/manifests/$NAME.parquet" || true
if [ "$FAILED" -eq 0 ]; then
echo "[d3-smoke] D3 acceptance gate: PASSED"
exit 0
else
echo "[d3-smoke] D3 acceptance gate: FAILED"
exit 1
fi