G0 D3: catalogd Parquet manifests + ADR-020 idempotent register · 6 scrum fixes

Phase G0 Day 3 ships catalogd: Arrow Parquet manifest codec, in-memory
registry with the ADR-020 idempotency contract (same name+fingerprint
reuses dataset_id; different fingerprint → 409 Conflict), HTTP client
to storaged for persistence, and rehydration on startup. Acceptance
smoke 6/6 PASSES end-to-end including rehydrate-across-restart — the
load-bearing test that the catalog/storaged service split actually
preserves state.

dataset_id derivation diverges from Rust: UUIDv5(namespace, name)
instead of v4 surrogate. Same name on any box generates the same
dataset_id; rehydrate after disk loss converges to the same identity
rather than silently re-issuing. Namespace pinned at
a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e — every dataset_id ever issued
depends on these bytes.

Cross-lineage scrum on shipped code:
  - Opus 4.7 (opencode):                       1 BLOCK + 5 WARN + 3 INFO
  - Kimi K2-0905 (openrouter, validated D2):   2 BLOCK + 2 WARN + 1 INFO
  - Qwen3-coder (openrouter):                  2 BLOCK + 2 WARN + 2 INFO

Fixed:
  C1 list-offsets BLOCK (3-way convergent) → ValueOffsets(0) + bounds
  C2 Rehydrate mutex held across I/O → swap-under-brief-lock pattern
  S1 split-brain on persist failure → candidate-then-swap
  S2 brittle string-match for 400 vs 500 → ErrEmptyName/ErrEmptyFingerprint sentinels
  S3 Get/List shallow-copy aliasing → cloneManifest deep copy
  S4 keep-alive socket leak on error paths → drainAndClose helper

Dismissed (false positives, all single-reviewer):
  Kimi BLOCK "Decode crashes on empty Parquet" — already handled
  Kimi INFO "safeKey double-escapes" — wrong, splitting before escape is required
  Qwen INFO "rb.NewRecord() error unchecked" — API returns no error

Deferred to G1+: name validation regex, per-call deadlines, Snappy
compression, list pagination continuation tokens (storaged caps at
10k with sentinel for now).

Build clean, vet clean, all tests pass, smoke 6/6 PASS after every
fix round. arrow-go/v18 + google/uuid added; Go 1.24 → 1.25 forced
by arrow-go's minimum.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
root 2026-04-28 23:36:57 -05:00
parent 8cfcdb8e5f
commit 66a704ca3e
12 changed files with 1320 additions and 20 deletions

cmd/catalogd/main.go

@ -1,16 +1,23 @@
// catalogd is the metadata authority — registers Parquet datasets,
// persists manifests in storaged, rehydrates them on startup, and
// answers GET/list queries. ADR-020 idempotency contract enforced
// by internal/catalogd/registry.go.
package main
import (
"context"
"encoding/json"
"errors"
"flag"
"log/slog"
"net/http"
"os"
"time"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
"github.com/go-chi/chi/v5"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
)
func main() {
@ -22,14 +29,103 @@ func main() {
slog.Error("config", "err", err)
os.Exit(1)
}
if cfg.Catalogd.StoragedURL == "" {
slog.Error("config", "err", "catalogd.storaged_url is required")
os.Exit(1)
}
store := catalogd.NewStoreClient(cfg.Catalogd.StoragedURL)
registry := catalogd.NewRegistry(store)
rehydrateCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
n, err := registry.Rehydrate(rehydrateCtx)
cancel()
if err != nil {
slog.Error("rehydrate", "err", err)
os.Exit(1)
}
slog.Info("rehydrated", "manifests", n)
h := newHandlers(registry)
if err := shared.Run("catalogd", cfg.Catalogd.Bind, h.register); err != nil {
slog.Error("server", "err", err)
os.Exit(1)
}
}
type handlers struct {
reg *catalogd.Registry
}
func newHandlers(r *catalogd.Registry) *handlers { return &handlers{reg: r} }
func (h *handlers) register(r chi.Router) {
r.Post("/catalog/register", h.handleRegister)
r.Get("/catalog/manifest/*", h.handleGetManifest)
r.Get("/catalog/list", h.handleList)
}
// registerRequest mirrors POST body shape.
type registerRequest struct {
Name string `json:"name"`
SchemaFingerprint string `json:"schema_fingerprint"`
Objects []catalogd.Object `json:"objects"`
RowCount *int64 `json:"row_count,omitempty"`
}
// registerResponse adds the existing flag so callers can distinguish
// fresh registration from idempotent re-register.
type registerResponse struct {
Manifest *catalogd.Manifest `json:"manifest"`
Existing bool `json:"existing"`
}
func (h *handlers) handleRegister(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
r.Body = http.MaxBytesReader(w, r.Body, 4<<20) // 4 MiB cap on register payloads
var req registerRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest)
return
}
m, existing, err := h.reg.Register(r.Context(), req.Name, req.SchemaFingerprint, req.Objects, req.RowCount)
if errors.Is(err, catalogd.ErrFingerprintConflict) {
http.Error(w, err.Error(), http.StatusConflict)
return
}
if errors.Is(err, catalogd.ErrEmptyName) || errors.Is(err, catalogd.ErrEmptyFingerprint) {
// Per scrum S2 (Opus): sentinel-based detection, not substring match.
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err != nil {
slog.Error("register", "name", req.Name, "err", err)
http.Error(w, "internal", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(registerResponse{Manifest: m, Existing: existing})
}
func (h *handlers) handleGetManifest(w http.ResponseWriter, r *http.Request) {
name := chi.URLParam(r, "*")
m, err := h.reg.Get(name)
if errors.Is(err, catalogd.ErrManifestNotFound) {
http.Error(w, "not found", http.StatusNotFound)
return
}
if err != nil {
slog.Error("get manifest", "name", name, "err", err)
http.Error(w, "internal", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(m)
}
func (h *handlers) handleList(w http.ResponseWriter, _ *http.Request) {
items := h.reg.List()
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]any{"manifests": items, "count": len(items)})
}


@ -557,3 +557,127 @@ its D1 contribution where it caught two real BLOCKs that Opus missed.
Lineage rotation is real; on a given PR, one lineage may be the only
one finding bugs and another may be confidently wrong. The convergence
filter (≥2 reviewers) is the right gate.
---
## D3 — actual run results (2026-04-28 evening)
Phase G0 Day 3 executed end-to-end. Output of `scripts/d3_smoke.sh`:
```
[d3-smoke] POST /catalog/register (fresh):
✓ fresh register → existing=false, dataset_id=200a05a8-...
[d3-smoke] GET /catalog/manifest/<name>:
✓ manifest dataset_id matches
[d3-smoke] GET /catalog/list (1 entry):
✓ list count=1
[d3-smoke] restart catalogd → rehydrate from Parquet:
✓ rehydrated dataset_id matches across restart
[d3-smoke] re-register (same name + same fingerprint) → existing=true:
✓ existing=true, same dataset_id, objects replaced (count=2)
[d3-smoke] re-register (different fingerprint) → 409:
✓ different fingerprint → 409 Conflict
[d3-smoke] D3 acceptance gate: PASSED
```
What landed:
- `internal/catalogd/manifest.go` — Arrow Parquet codec for the
Manifest type (dataset_id/name/schema_fingerprint/objects-as-list-of-
struct/created_at_ns/updated_at_ns/row_count). One row per Parquet
file. `DatasetIDForName` derives a deterministic UUIDv5 from name
(namespace `a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e`); same name on
any box yields the same dataset_id.
- `internal/catalogd/registry.go` — in-memory `map[name]*Manifest`
with the ADR-020 contract: same name+fingerprint reuses
dataset_id, replaces objects, bumps `updated_at`; different
fingerprint → `ErrFingerprintConflict`. Single mutex serializes
Register across persistence to close the check→insert TOCTOU.
Rehydrate runs storaged I/O OUTSIDE the lock, swaps in the new
map under the lock.
- `internal/catalogd/store_client.go` — HTTP client over
storaged's GET/PUT/DELETE/list. `safeKey` URL-escapes path
segments while preserving `/`. Error paths drain body before
close to keep the keep-alive pool healthy.
- `cmd/catalogd/main.go` — POST /catalog/register, GET
/catalog/manifest/{name}, GET /catalog/list. Sentinel-error
detection (`errors.Is(err, ErrEmptyName)`) for 400s. 4 MiB body
cap on register payloads. Caller-side sketch just after this list.
- New `[catalogd]` config: `bind` + `storaged_url`. Default
`http://127.0.0.1:3211`.
- `scripts/d3_smoke.sh` — 6 acceptance probes including
rehydrate-across-restart (the load-bearing ADR-020 contract test).
- `arrow-go/v18` v18.6.0 + `google/uuid` v1.6.0 added; Go 1.24 → 1.25
forced by arrow-go's minimum.
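Condensed caller-side sketch of the register contract — `registerDataset`
is a hypothetical helper, not part of this commit; only the route, JSON
shapes, and status codes come from the shipped handlers:
```
// Assumes imports: bytes, context, encoding/json, fmt, net/http,
// and the internal/catalogd package for Object/Manifest.
func registerDataset(ctx context.Context, base, name, fp string, objs []catalogd.Object) (*catalogd.Manifest, bool, error) {
	body, err := json.Marshal(map[string]any{
		"name": name, "schema_fingerprint": fp, "objects": objs,
	})
	if err != nil {
		return nil, false, err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		base+"/catalog/register", bytes.NewReader(body))
	if err != nil {
		return nil, false, err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, false, err
	}
	defer resp.Body.Close()
	switch resp.StatusCode {
	case http.StatusConflict: // ADR-020: same name, different fingerprint
		return nil, false, fmt.Errorf("register %s: schema fingerprint drift (409)", name)
	case http.StatusOK:
	default:
		return nil, false, fmt.Errorf("register %s: status %d", name, resp.StatusCode)
	}
	var out struct {
		Manifest *catalogd.Manifest `json:"manifest"`
		Existing bool               `json:"existing"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, false, err
	}
	// Existing=true means idempotent re-register: same dataset_id,
	// objects replaced, updated_at bumped.
	return out.Manifest, out.Existing, nil
}
```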
UUIDv5 vs Rust v4: the Go rewrite picks deterministic-from-name. Same
dataset name on any box converges to the same dataset_id; rehydrate
after disk loss can't silently issue new IDs and break downstream
cross-references. Rust's surrogate-UUIDv4 is what the prior system
used; the divergence is intentional and documented at
`internal/catalogd/manifest.go:34`.
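The property in a runnable snippet — `google/uuid` is the shipped
dependency and the namespace literal is the pinned one; the rest is
illustration:
```
package main

import (
	"fmt"

	"github.com/google/uuid"
)

func main() {
	ns := uuid.MustParse("a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e")
	a := uuid.NewSHA1(ns, []byte("workers_500k")) // v5: derived from name
	b := uuid.NewSHA1(ns, []byte("workers_500k")) // same bytes, any box, any day
	fmt.Println(a == b)     // true — rehydrate after disk loss converges
	fmt.Println(uuid.New()) // v4, the prior Rust shape: fresh surrogate per call
}
```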
Next: D4 — ingestd CSV → Parquet → catalogd register loop.
---
## D3 — code scrum review (3-lineage parallel pass)
After D3 shipped, the actual code went through the same 3-lineage
parallel scrum as D1/D2.
| Pass | Reviewer | Latency | Findings |
|---|---|---|---|
| 1 | `opencode/claude-opus-4-7` | ~30s | 1 BLOCK + 5 WARN + 3 INFO = 9 |
| 2 | `openrouter/moonshotai/kimi-k2-0905` | ~30s | 2 BLOCK + 2 WARN + 1 INFO = 5 |
| 3 | `openrouter/qwen/qwen3-coder` | ~25s | 2 BLOCK + 2 WARN + 2 INFO = 6 |
Total: 20 findings across the three passes (some convergent across
reviewers, so fewer distinct issues).
Kimi route shopping from D2 paid off — `openrouter/moonshotai/kimi-k2-0905`
delivered structured output on first attempt, no max_tokens cap, no
empty-content reasoning trap.
### Convergent findings (≥2 reviewers — high confidence)
| # | Severity | Finding | Reviewers | Disposition |
|---|---|---|---|---|
| C1 | BLOCK×3 | `Decode` indexes `listArr.Offsets()[0]/[1]` directly — fragile under array slicing/non-zero offset; panics on malformed Parquet (multi-row reader chunks, single-offset corrupt files) | Opus + Kimi + Qwen | **Fixed** — switched to `listArr.ValueOffsets(0)` (Arrow API that accounts for array offset) + bounds check on `start/end` against `structArr.Len()` |
| C2 | WARN×2 | `Rehydrate` holds the registry mutex across network I/O to storaged — slow storaged blocks all `Register/Get/List`; future re-sync endpoints stall | Opus + Kimi | **Fixed** — list/get/decode happen outside the lock; the completed map is swapped into `r.byKey` under a brief lock |
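Condensed shape of the C2 fix — `rehydrateShape` and `fetchAndDecodeAll`
are hypothetical stand-ins; the shipped version is `Registry.Rehydrate`
further down this commit:
```
// rehydrateShape: storaged I/O runs with no lock held; only the
// O(1) map swap takes the mutex.
func (r *Registry) rehydrateShape(ctx context.Context) (int, error) {
	loaded, err := r.fetchAndDecodeAll(ctx) // hypothetical: list + get + Decode loop
	if err != nil {
		return 0, err // nothing swapped; the old map stays intact
	}
	r.mu.Lock()
	r.byKey = loaded // Register/Get/List block only for this assignment
	r.mu.Unlock()
	return len(loaded), nil
}
```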
### Single-reviewer findings (lineage-specific catches)
| # | Severity | Finding | Reviewer | Disposition |
|---|---|---|---|---|
| S1 | WARN | Idempotent re-register mutates the in-memory manifest BEFORE persist succeeds — split-brain on storaged failure (in-memory advances, disk holds old, restart silently rolls back) | Opus | **Fixed** — build candidate copy of prior, persist candidate, swap into `r.byKey` only on persist success |
| S2 | WARN | `cmd/catalogd` detects empty-input via `strings.Contains(err.Error(), "empty name")` — brittle, anyone reformatting the registry error silently demotes a 400 to a 500 | Opus | **Fixed** — exported `ErrEmptyName` / `ErrEmptyFingerprint` sentinels, HTTP layer uses `errors.Is` |
| S3 | WARN | `Get/List` `cp := *m` aliases the `Objects` slice + `RowCount` pointer — caller mutation corrupts registry state under the lock-free read | Opus | **Fixed** — `cloneManifest` deep-copies Objects + dereferences RowCount into a fresh `*int64` |
| S4 | WARN | error paths in `store_client` preview body but don't drain before close → HTTP/1.1 keep-alive pool reuse breaks → slow socket leak | Qwen | **Fixed** — `drainAndClose` helper reads up to 64 KiB before close on every defer |
| S5 | INFO×3 | name-validation regex / per-call deadline / Snappy compression on manifest writes | Opus | **Deferred** — small, no risk if skipped |
| S6 | WARN | `Rehydrate` aborts on first decode error → partial state | Kimi | **Accepted** — fail-loud > silent-partial is the design; cmd/catalogd `os.Exit(1)`s on rehydrate error so a corrupt manifest is operator-visible at startup |
| S7 | WARN | `store_client.List` ignores pagination — if storaged gains continuation tokens, the client silently drops everything past page 1 | Opus | **Accepted** — storaged caps lists at MaxListResults=10_000 with a `...truncated...` sentinel; no continuation token in the wire format yet (deferred to D7.5+ alongside multi-bucket federation) |
| F1 | BLOCK | `Decode` crashes on empty Parquet (NumRows==0) — `Value(0)` panics | Kimi | **Dismissed** — already handled. `tbl.NumRows() != 1` returns error before any column access; NumRows==0 also fails the subsequent `rr.Next()` check |
| F2 | INFO | `safeKey` double-escapes slashes; should use `url.PathEscape(key)` directly | Kimi | **Dismissed** — `url.PathEscape("a/b")` returns `"a%2Fb"`. Splitting on `/` first is **necessary** to preserve the path separator while escaping segment content. The current code is correct; Kimi's suggested fix would break storaged's chi wildcard match |
| F3 | INFO | `rb.NewRecord()` error unchecked | Qwen | **Dismissed** — false signature. `RecordBuilder.NewRecord()` returns `arrow.Record` only; no error to check |
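The F2 dismissal in two runnable lines — `safeKey` below is copied from
the shipped `store_client.go`:
```
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// safeKey as shipped: escape segments, preserve "/" separators.
func safeKey(key string) string {
	parts := strings.Split(key, "/")
	for i, p := range parts {
		parts[i] = url.PathEscape(p)
	}
	return strings.Join(parts, "/")
}

func main() {
	fmt.Println(url.PathEscape("a/b"))    // "a%2Fb" — kills the chi wildcard match
	fmt.Println(safeKey("a b/c.parquet")) // "a%20b/c.parquet" — separator survives
}
```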
### Cumulative D3 disposition
- **3-lineage parallel pass: 20 findings**
- **Fixed: 6** (2 convergent + 4 single-reviewer)
- **Accepted-with-rationale: 5** (3 INFO + 2 WARN with deferred-by-design rationale)
- **Dismissed (false positives): 3** (1 Kimi BLOCK on already-handled empty case, 1 Kimi INFO on safeKey, 1 Qwen INFO on Arrow API signature)
- Build clean, vet clean, all tests pass, smoke 6/6 PASS after every fix round.
The C1 triple-confirmation (Opus + Kimi + Qwen all flagging the same
list-offset indexing) is the strongest convergence signal across the
three days. All three correctly diagnosed the underlying issue
(direct `Offsets()` indexing is fragile under array slicing); Opus
named the right Arrow API (`ValueOffsets(0)`), Kimi and Qwen named
the bounds-check shape. Fix uses Opus's API + the others' bounds
discipline together.
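The contrast in miniature — `objectsRange` is a hypothetical extraction
(assumes `fmt` and arrow-go's `array` imports); the shipped fix is inline
in `Decode`:
```
// objectsRange shows the C1 fix shape against arrow-go v18's list API.
func objectsRange(listArr *array.List, structLen int) (int64, int64, error) {
	// Pre-fix shape, for contrast — raw buffer indexing that ignores the
	// array's own offset and panics on short offset buffers:
	//   start, end := int64(listArr.Offsets()[0]), int64(listArr.Offsets()[1])
	start, end := listArr.ValueOffsets(0) // logical accessor, offset-aware
	if start < 0 || end < start || end > int64(structLen) {
		return 0, 0, fmt.Errorf("bad list offsets [%d, %d] for len %d", start, end, structLen)
	}
	return start, end, nil
}
```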
S1 (split-brain on persist failure) was the most *consequential*
finding — D3's smoke would have stayed green through every test case
because none of them simulate storaged-down mid-register. Opus alone
caught it; the fix is a 4-line candidate-then-swap pattern but the
class of bug (in-memory advances ahead of disk) is exactly the
distributed-systems hazard ADR-020 was added to prevent at a
different layer.
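The fix shape, condensed — `updateExisting` is a hypothetical extraction
of the shipped `Register` branch (caller holds `r.mu`):
```
// updateExisting: copy, persist, then swap. In-memory state never
// advances ahead of disk.
func (r *Registry) updateExisting(ctx context.Context, prior *Manifest, objects []Object, now time.Time) (*Manifest, error) {
	candidate := *prior // private copy — prior is never mutated pre-persist
	candidate.Objects = objects
	candidate.UpdatedAt = now
	if err := r.persist(ctx, &candidate); err != nil {
		return nil, err // disk write failed; in-memory map untouched
	}
	r.byKey[prior.Name] = &candidate // visible only after persist succeeds
	return &candidate, nil
}
```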

go.mod

@ -1,19 +1,25 @@
module git.agentview.dev/profit/golangLAKEHOUSE
go 1.25.0
require (
github.com/apache/arrow-go/v18 v18.6.0
github.com/aws/aws-sdk-go-v2 v1.41.6
github.com/aws/aws-sdk-go-v2/config v1.32.16
github.com/aws/aws-sdk-go-v2/credentials v1.19.15
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.16
github.com/aws/aws-sdk-go-v2/service/s3 v1.100.0
github.com/aws/smithy-go v1.25.0
github.com/go-chi/chi/v5 v5.2.5
github.com/google/uuid v1.6.0
github.com/pelletier/go-toml/v2 v2.3.0
)
require (
github.com/andybalholm/brotli v1.2.1 // indirect
github.com/apache/thrift v0.22.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.9 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.23 // indirect
@ -21,10 +27,23 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.14 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.22 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.22 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.10 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.16 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/goccy/go-json v0.10.6 // indirect
github.com/google/flatbuffers v25.12.19+incompatible // indirect
github.com/klauspost/compress v1.18.5 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/pierrec/lz4/v4 v4.1.26 // indirect
github.com/zeebo/xxh3 v1.1.0 // indirect
golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.43.0 // indirect
golang.org/x/text v0.35.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect
google.golang.org/grpc v1.80.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
)

go.sum

@ -1,3 +1,9 @@
github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eTWro=
github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/apache/arrow-go/v18 v18.6.0 h1:GX/Jyd3R7mCLiECAwY9FWbbaYblie2WXBSz4Sw8fNpM=
github.com/apache/arrow-go/v18 v18.6.0/go.mod h1:gm3MiPpY82fLYK5VKPB3WoJbsiLVDfT7flD5/vHReKw=
github.com/apache/thrift v0.22.0 h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc=
github.com/apache/thrift v0.22.0/go.mod h1:1e7J/O1Ae6ZQMTYdy9xa3w9k+XHWPfRvdPyJeynQ+/g=
github.com/aws/aws-sdk-go-v2 v1.41.6 h1:1AX0AthnBQzMx1vbmir3Y4WsnJgiydmnJjiLu+LvXOg=
github.com/aws/aws-sdk-go-v2 v1.41.6/go.mod h1:dy0UzBIfwSeot4grGvY1AqFWN5zgziMmWGzysDnHFcQ=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.9 h1:adBsCIIpLbLmYnkQU+nAChU5yhVTvu5PerROm+/Kq2A=
@ -36,7 +42,75 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 h1:ks8KBcZPh3PYISr5dAiXCM5/Thcu
github.com/aws/aws-sdk-go-v2/service/sts v1.42.0/go.mod h1:pFw33T0WLvXU3rw1WBkpMlkgIn54eCB5FYLhjDc9Foo=
github.com/aws/smithy-go v1.25.0 h1:Sz/XJ64rwuiKtB6j98nDIPyYrV1nVNJ4YU74gttcl5U=
github.com/aws/smithy-go v1.25.0/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/goccy/go-json v0.10.6 h1:p8HrPJzOakx/mn/bQtjgNjdTcN+/S6FcG2CTtQOrHVU=
github.com/goccy/go-json v0.10.6/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/flatbuffers v25.12.19+incompatible h1:haMV2JRRJCe1998HeW/p0X9UaMTK6SDo0ffLn2+DbLs=
github.com/google/flatbuffers v25.12.19+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE=
github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/pelletier/go-toml/v2 v2.3.0 h1:k59bC/lIZREW0/iVaQR8nDHxVq8OVlIzYCOJf421CaM=
github.com/pelletier/go-toml/v2 v2.3.0/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY=
github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8=
go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0=
go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs=
go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18=
go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE=
go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8=
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
golang.org/x/exp v0.0.0-20260112195511-716be5621a96 h1:Z/6YuSHTLOHfNFdb8zVZomZr7cqNgTJvA8+Qz75D8gU=
golang.org/x/exp v0.0.0-20260112195511-716be5621a96/go.mod h1:nzimsREAkjBCIEFtHiYkrJyT+2uy9YZJB7H1k68CXZU=
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 h1:sNrWoksmOyF5bvJUcnmbeAmQi8baNhqg5IWaI3llQqU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/catalogd/manifest.go

@ -0,0 +1,200 @@
// Package catalogd is the metadata authority — manifests for every
// registered dataset. A Manifest is the catalog row Rust calls
// `Manifest`: dataset_id (deterministic from name via UUIDv5),
// schema_fingerprint (caller-supplied schema hash), the object keys
// that physically back the dataset in storaged, plus timestamps and
// optional row_count.
//
// G0 stores one Manifest per Parquet file at
// primary://_catalog/manifests/<name>.parquet. One row per file —
// catalog manifests are written rarely and read on startup, so the
// per-file shape favors atomic register over storage density.
package catalogd
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/file"
"github.com/apache/arrow-go/v18/parquet/pqarrow"
"github.com/google/uuid"
)
// catalogNamespace is the v5 UUID namespace for dataset_id derivation.
// Same name → same dataset_id across boxes / cold starts. Don't change
// — every dataset_id ever issued depends on this byte sequence.
var catalogNamespace = uuid.MustParse("a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e")
// Object is one storaged key contributing to a dataset.
type Object struct {
Key string `json:"key"`
Size int64 `json:"size"`
}
// Manifest is the catalog row for one dataset.
type Manifest struct {
DatasetID string `json:"dataset_id"`
Name string `json:"name"`
SchemaFingerprint string `json:"schema_fingerprint"`
Objects []Object `json:"objects"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
RowCount *int64 `json:"row_count,omitempty"`
}
// DatasetIDForName returns the deterministic UUIDv5 dataset_id for a
// logical dataset name. Idempotent on the same name across boxes.
func DatasetIDForName(name string) string {
return uuid.NewSHA1(catalogNamespace, []byte(name)).String()
}
// manifestArrowSchema is the Arrow schema for the on-disk Parquet.
// Field order matters — codec builders rely on it.
var manifestArrowSchema = arrow.NewSchema([]arrow.Field{
{Name: "dataset_id", Type: arrow.BinaryTypes.String},
{Name: "name", Type: arrow.BinaryTypes.String},
{Name: "schema_fingerprint", Type: arrow.BinaryTypes.String},
{Name: "objects", Type: arrow.ListOf(arrow.StructOf(
arrow.Field{Name: "key", Type: arrow.BinaryTypes.String},
arrow.Field{Name: "size", Type: arrow.PrimitiveTypes.Int64},
))},
{Name: "created_at_unix_ns", Type: arrow.PrimitiveTypes.Int64},
{Name: "updated_at_unix_ns", Type: arrow.PrimitiveTypes.Int64},
{Name: "row_count", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
}, nil)
// Encode writes a single Manifest to a Parquet byte slice. Memory
// allocations are bounded — manifests have tens of objects, not
// millions.
func Encode(m *Manifest) ([]byte, error) {
mem := memory.NewGoAllocator()
rb := array.NewRecordBuilder(mem, manifestArrowSchema)
defer rb.Release()
rb.Field(0).(*array.StringBuilder).Append(m.DatasetID)
rb.Field(1).(*array.StringBuilder).Append(m.Name)
rb.Field(2).(*array.StringBuilder).Append(m.SchemaFingerprint)
listB := rb.Field(3).(*array.ListBuilder)
listB.Append(true)
structB := listB.ValueBuilder().(*array.StructBuilder)
keyB := structB.FieldBuilder(0).(*array.StringBuilder)
sizeB := structB.FieldBuilder(1).(*array.Int64Builder)
for _, o := range m.Objects {
structB.Append(true)
keyB.Append(o.Key)
sizeB.Append(o.Size)
}
rb.Field(4).(*array.Int64Builder).Append(m.CreatedAt.UnixNano())
rb.Field(5).(*array.Int64Builder).Append(m.UpdatedAt.UnixNano())
if m.RowCount != nil {
rb.Field(6).(*array.Int64Builder).Append(*m.RowCount)
} else {
rb.Field(6).(*array.Int64Builder).AppendNull()
}
rec := rb.NewRecord()
defer rec.Release()
var buf bytes.Buffer
props := parquet.NewWriterProperties()
arrowProps := pqarrow.NewArrowWriterProperties()
w, err := pqarrow.NewFileWriter(manifestArrowSchema, &buf, props, arrowProps)
if err != nil {
return nil, fmt.Errorf("pqarrow writer: %w", err)
}
if err := w.Write(rec); err != nil {
return nil, fmt.Errorf("pqarrow write: %w", err)
}
if err := w.Close(); err != nil {
return nil, fmt.Errorf("pqarrow close: %w", err)
}
return buf.Bytes(), nil
}
// Decode reads a single-row Parquet manifest back into a Manifest.
func Decode(b []byte) (*Manifest, error) {
rdr, err := file.NewParquetReader(bytes.NewReader(b))
if err != nil {
return nil, fmt.Errorf("parquet reader: %w", err)
}
defer rdr.Close()
pr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.NewGoAllocator())
if err != nil {
return nil, fmt.Errorf("pqarrow reader: %w", err)
}
tbl, err := pr.ReadTable(context.Background())
if err != nil {
return nil, fmt.Errorf("read table: %w", err)
}
defer tbl.Release()
if tbl.NumRows() != 1 {
return nil, fmt.Errorf("manifest parquet: expected 1 row, got %d", tbl.NumRows())
}
rr := array.NewTableReader(tbl, 1)
defer rr.Release()
if !rr.Next() {
return nil, errors.New("manifest parquet: no record batch")
}
rec := rr.Record()
m := &Manifest{
DatasetID: rec.Column(0).(*array.String).Value(0),
Name: rec.Column(1).(*array.String).Value(0),
SchemaFingerprint: rec.Column(2).(*array.String).Value(0),
}
// Per scrum C1 (3-way convergent): use ValueOffsets which accounts
// for the array's own offset (non-zero under slicing) and is bounds-
// safe by API contract. Direct Offsets()[0]/[1] indexing is fragile
// under multi-row reads and panics on malformed offset buffers.
listArr := rec.Column(3).(*array.List)
structArr := listArr.ListValues().(*array.Struct)
keyArr := structArr.Field(0).(*array.String)
sizeArr := structArr.Field(1).(*array.Int64)
start, end := listArr.ValueOffsets(0)
if start < 0 || end < start || end > int64(structArr.Len()) {
return nil, fmt.Errorf("manifest: bad list offsets [%d, %d] for struct len %d",
start, end, structArr.Len())
}
for i := start; i < end; i++ {
m.Objects = append(m.Objects, Object{
Key: keyArr.Value(int(i)),
Size: sizeArr.Value(int(i)),
})
}
m.CreatedAt = time.Unix(0, rec.Column(4).(*array.Int64).Value(0))
m.UpdatedAt = time.Unix(0, rec.Column(5).(*array.Int64).Value(0))
rcArr := rec.Column(6).(*array.Int64)
if rcArr.IsValid(0) {
v := rcArr.Value(0)
m.RowCount = &v
}
return m, nil
}
// EncodeReader is a small convenience for callers that want an
// io.Reader over the encoded bytes (matches the storaged HTTP PUT
// signature).
func EncodeReader(m *Manifest) (io.Reader, int64, error) {
b, err := Encode(m)
if err != nil {
return nil, 0, err
}
return bytes.NewReader(b), int64(len(b)), nil
}


@ -0,0 +1,103 @@
package catalogd
import (
"testing"
"time"
)
func TestEncodeDecode_RoundTrip(t *testing.T) {
rc := int64(500_000)
now := time.Unix(1777435000, 123456789)
want := &Manifest{
DatasetID: DatasetIDForName("workers_500k"),
Name: "workers_500k",
SchemaFingerprint: "sha256:abcdef",
Objects: []Object{
{Key: "datasets/workers_500k/part-001.parquet", Size: 75 * 1024 * 1024},
{Key: "datasets/workers_500k/part-002.parquet", Size: 12 * 1024 * 1024},
},
CreatedAt: now,
UpdatedAt: now.Add(time.Minute),
RowCount: &rc,
}
b, err := Encode(want)
if err != nil {
t.Fatalf("Encode: %v", err)
}
if len(b) == 0 {
t.Fatal("Encode returned 0 bytes")
}
got, err := Decode(b)
if err != nil {
t.Fatalf("Decode: %v", err)
}
if got.DatasetID != want.DatasetID {
t.Errorf("DatasetID: got %q, want %q", got.DatasetID, want.DatasetID)
}
if got.Name != want.Name {
t.Errorf("Name: got %q, want %q", got.Name, want.Name)
}
if got.SchemaFingerprint != want.SchemaFingerprint {
t.Errorf("SchemaFingerprint: got %q, want %q", got.SchemaFingerprint, want.SchemaFingerprint)
}
if len(got.Objects) != 2 {
t.Fatalf("Objects: got %d, want 2", len(got.Objects))
}
for i, o := range got.Objects {
if o.Key != want.Objects[i].Key || o.Size != want.Objects[i].Size {
t.Errorf("Objects[%d]: got %+v, want %+v", i, o, want.Objects[i])
}
}
// Times round-trip via UnixNano so equality is on the nanosecond.
if !got.CreatedAt.Equal(want.CreatedAt) {
t.Errorf("CreatedAt: got %v, want %v", got.CreatedAt, want.CreatedAt)
}
if !got.UpdatedAt.Equal(want.UpdatedAt) {
t.Errorf("UpdatedAt: got %v, want %v", got.UpdatedAt, want.UpdatedAt)
}
if got.RowCount == nil || *got.RowCount != *want.RowCount {
t.Errorf("RowCount: got %v, want %v", got.RowCount, want.RowCount)
}
}
func TestEncodeDecode_NoRowCount(t *testing.T) {
want := &Manifest{
DatasetID: DatasetIDForName("unknown_size"),
Name: "unknown_size",
SchemaFingerprint: "sha256:zero",
Objects: []Object{},
CreatedAt: time.Unix(0, 0),
UpdatedAt: time.Unix(0, 0),
RowCount: nil,
}
b, err := Encode(want)
if err != nil {
t.Fatal(err)
}
got, err := Decode(b)
if err != nil {
t.Fatal(err)
}
if got.RowCount != nil {
t.Errorf("RowCount: got %v, want nil", got.RowCount)
}
if len(got.Objects) != 0 {
t.Errorf("Objects: got %d, want 0", len(got.Objects))
}
}
func TestDatasetIDForName_Deterministic(t *testing.T) {
a := DatasetIDForName("workers_500k")
b := DatasetIDForName("workers_500k")
if a != b {
t.Errorf("DatasetIDForName not deterministic: %q vs %q", a, b)
}
c := DatasetIDForName("different")
if a == c {
t.Errorf("DatasetIDForName collided across distinct names")
}
}

internal/catalogd/registry.go

@ -0,0 +1,219 @@
// registry.go — the in-memory catalog index plus the ADR-020
// idempotent register contract. The register path holds a single
// mutex across (lookup → fingerprint check → storage write →
// in-memory update) to close the check→insert TOCTOU window. The
// historical Rust bug (308× duplicate manifests on re-register) is
// the prior art — don't loosen this lock.
package catalogd
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"
)
// ManifestPrefix is the storaged key prefix that holds catalog
// manifests. Objects under this prefix are NOT user data and never
// surface through ingest/query paths.
const ManifestPrefix = "_catalog/manifests/"
// ErrFingerprintConflict is returned by Register when a manifest with
// the same name already exists under a different schema fingerprint.
// HTTP layer maps this to 409 Conflict; gRPC will map to FAILED_PRECONDITION.
var ErrFingerprintConflict = errors.New("catalogd: schema fingerprint conflict")
// ErrManifestNotFound is returned by Get when the requested name has
// no manifest registered.
var ErrManifestNotFound = errors.New("catalogd: manifest not found")
// ErrEmptyName / ErrEmptyFingerprint are returned by Register on
// missing required inputs. HTTP layer maps both to 400. Per scrum S2
// (Opus): sentinel errors so the HTTP boundary uses errors.Is rather
// than substring matching err.Error().
var (
ErrEmptyName = errors.New("catalogd: empty name")
ErrEmptyFingerprint = errors.New("catalogd: empty schema_fingerprint")
)
// Store abstracts the storaged HTTP wire so registry can be unit-
// tested with an in-memory fake.
type Store interface {
Put(ctx context.Context, key string, body []byte) error
Get(ctx context.Context, key string) ([]byte, error)
List(ctx context.Context, prefix string) ([]string, error)
}
// Registry is the in-memory authority. Persistence lives in storaged
// at ManifestPrefix; Registry is rehydrated on startup.
type Registry struct {
mu sync.Mutex
byKey map[string]*Manifest // name → manifest
store Store
now func() time.Time // injectable for tests
}
// NewRegistry builds an empty registry bound to a Store. Call
// Rehydrate after construction to pick up persisted manifests.
func NewRegistry(store Store) *Registry {
return &Registry{
byKey: make(map[string]*Manifest),
store: store,
now: time.Now,
}
}
// Rehydrate lists ManifestPrefix in storaged and decodes every entry
// into the in-memory map. Returns the count of manifests recovered.
// On any per-file decode error, returns immediately so a corrupt
// catalog doesn't half-load and silently lose state.
//
// Per scrum C2 (Opus + Kimi convergent): network I/O happens OUTSIDE
// the registry mutex so a slow storaged doesn't block concurrent
// Register/Get/List. The completed map is swapped in under the lock.
func (r *Registry) Rehydrate(ctx context.Context) (int, error) {
keys, err := r.store.List(ctx, ManifestPrefix)
if err != nil {
return 0, fmt.Errorf("list manifests: %w", err)
}
loaded := make(map[string]*Manifest)
for _, k := range keys {
if !strings.HasPrefix(k, ManifestPrefix) || !strings.HasSuffix(k, ".parquet") {
continue
}
body, err := r.store.Get(ctx, k)
if err != nil {
return len(loaded), fmt.Errorf("get manifest %s: %w", k, err)
}
m, err := Decode(body)
if err != nil {
return len(loaded), fmt.Errorf("decode manifest %s: %w", k, err)
}
loaded[m.Name] = m
}
r.mu.Lock()
defer r.mu.Unlock()
r.byKey = loaded
return len(loaded), nil
}
// Register applies the ADR-020 idempotency contract:
//
// - No prior manifest for name → create (returns existing=false)
// - Prior manifest, same fingerprint → update objects + bump
// updated_at, reuse dataset_id (returns existing=true)
// - Prior manifest, different fingerprint → ErrFingerprintConflict
//
// The mutex is held across the storage write so concurrent calls for
// the same name serialize through the persistence layer (low TPS,
// correctness > throughput).
func (r *Registry) Register(ctx context.Context, name, fingerprint string, objects []Object, rowCount *int64) (*Manifest, bool, error) {
if name == "" {
return nil, false, ErrEmptyName
}
if fingerprint == "" {
return nil, false, ErrEmptyFingerprint
}
r.mu.Lock()
defer r.mu.Unlock()
now := r.now().UTC()
prior, exists := r.byKey[name]
if exists {
if prior.SchemaFingerprint != fingerprint {
return nil, true, fmt.Errorf("%w: name=%q have=%s got=%s",
ErrFingerprintConflict, name, prior.SchemaFingerprint, fingerprint)
}
// Same fingerprint — reuse dataset_id, replace objects, bump updated_at.
// Per scrum S1 (Opus): build candidate, persist, then swap in. Mutating
// `prior` before persist succeeds creates split-brain if storaged is
// down — in-memory advances, disk holds the old state, restart loses
// what callers were told didn't happen.
candidate := *prior
candidate.Objects = objects
candidate.UpdatedAt = now
candidate.RowCount = rowCount
if err := r.persist(ctx, &candidate); err != nil {
return nil, true, err
}
r.byKey[name] = &candidate
return &candidate, true, nil
}
m := &Manifest{
DatasetID: DatasetIDForName(name),
Name: name,
SchemaFingerprint: fingerprint,
Objects: objects,
CreatedAt: now,
UpdatedAt: now,
RowCount: rowCount,
}
if err := r.persist(ctx, m); err != nil {
return nil, false, err
}
r.byKey[name] = m
return m, false, nil
}
// Get returns a deep copy of the manifest for name, or ErrManifestNotFound.
func (r *Registry) Get(name string) (*Manifest, error) {
r.mu.Lock()
defer r.mu.Unlock()
m, ok := r.byKey[name]
if !ok {
return nil, ErrManifestNotFound
}
return cloneManifest(m), nil
}
// List returns deep copies of every manifest, sorted by name.
// Callers may mutate the returned slice and the underlying Manifest
// values without affecting registry state.
func (r *Registry) List() []*Manifest {
r.mu.Lock()
defer r.mu.Unlock()
out := make([]*Manifest, 0, len(r.byKey))
for _, m := range r.byKey {
out = append(out, cloneManifest(m))
}
sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
return out
}
// cloneManifest deep-copies the Objects slice and dereferences
// RowCount into a fresh pointer so a returned manifest cannot alias
// registry state. Per scrum S3 (Opus): the prior `cp := *m` shape
// shared the Objects backing array — caller-side index writes
// corrupted registry state without holding the mutex.
func cloneManifest(m *Manifest) *Manifest {
cp := *m
if m.Objects != nil {
cp.Objects = make([]Object, len(m.Objects))
copy(cp.Objects, m.Objects)
}
if m.RowCount != nil {
v := *m.RowCount
cp.RowCount = &v
}
return &cp
}
// persist encodes the manifest and writes it to storaged at the
// canonical path. Caller MUST hold r.mu — this function does not
// take the lock itself.
func (r *Registry) persist(ctx context.Context, m *Manifest) error {
body, err := Encode(m)
if err != nil {
return fmt.Errorf("encode manifest %s: %w", m.Name, err)
}
key := ManifestPrefix + m.Name + ".parquet"
if err := r.store.Put(ctx, key, body); err != nil {
return fmt.Errorf("persist manifest %s: %w", m.Name, err)
}
return nil
}


@ -0,0 +1,160 @@
package catalogd
import (
"context"
"errors"
"sync"
"testing"
"time"
)
// memStore is an in-memory Store fake for unit tests.
type memStore struct {
mu sync.Mutex
data map[string][]byte
}
func newMemStore() *memStore { return &memStore{data: map[string][]byte{}} }
func (m *memStore) Put(_ context.Context, key string, body []byte) error {
m.mu.Lock()
defer m.mu.Unlock()
cp := make([]byte, len(body))
copy(cp, body)
m.data[key] = cp
return nil
}
func (m *memStore) Get(_ context.Context, key string) ([]byte, error) {
m.mu.Lock()
defer m.mu.Unlock()
b, ok := m.data[key]
if !ok {
return nil, ErrKeyNotFound
}
return b, nil
}
func (m *memStore) List(_ context.Context, prefix string) ([]string, error) {
m.mu.Lock()
defer m.mu.Unlock()
out := []string{}
for k := range m.data {
if len(k) >= len(prefix) && k[:len(prefix)] == prefix {
out = append(out, k)
}
}
return out, nil
}
func mkRegistry(t *testing.T) (*Registry, *memStore) {
t.Helper()
s := newMemStore()
r := NewRegistry(s)
r.now = func() time.Time { return time.Unix(1777435000, 0).UTC() }
return r, s
}
func TestRegister_NewManifest(t *testing.T) {
r, _ := mkRegistry(t)
rc := int64(100)
m, existing, err := r.Register(context.Background(), "workers", "sha256:abc",
[]Object{{Key: "datasets/workers/p1.parquet", Size: 1024}}, &rc)
if err != nil {
t.Fatalf("Register: %v", err)
}
if existing {
t.Error("expected existing=false for new manifest")
}
if m.DatasetID != DatasetIDForName("workers") {
t.Errorf("DatasetID: got %q, want UUIDv5(workers)", m.DatasetID)
}
}
func TestRegister_SameFingerprint_Idempotent(t *testing.T) {
r, _ := mkRegistry(t)
rc := int64(100)
first, _, _ := r.Register(context.Background(), "workers", "sha256:abc",
[]Object{{Key: "p1.parquet", Size: 1024}}, &rc)
// Re-register same name + fingerprint with new objects.
rc2 := int64(200)
second, existing, err := r.Register(context.Background(), "workers", "sha256:abc",
[]Object{{Key: "p1.parquet", Size: 1024}, {Key: "p2.parquet", Size: 2048}}, &rc2)
if err != nil {
t.Fatalf("Register (idempotent): %v", err)
}
if !existing {
t.Error("expected existing=true on idempotent re-register")
}
if second.DatasetID != first.DatasetID {
t.Errorf("DatasetID changed: %q → %q", first.DatasetID, second.DatasetID)
}
if len(second.Objects) != 2 {
t.Errorf("Objects not replaced: got %d, want 2", len(second.Objects))
}
if second.RowCount == nil || *second.RowCount != 200 {
t.Errorf("RowCount not bumped: got %v, want 200", second.RowCount)
}
}
func TestRegister_DifferentFingerprint_Conflict(t *testing.T) {
r, _ := mkRegistry(t)
_, _, _ = r.Register(context.Background(), "workers", "sha256:abc",
[]Object{{Key: "p1.parquet", Size: 1024}}, nil)
_, _, err := r.Register(context.Background(), "workers", "sha256:DIFFERENT",
[]Object{{Key: "p1.parquet", Size: 1024}}, nil)
if !errors.Is(err, ErrFingerprintConflict) {
t.Fatalf("expected ErrFingerprintConflict, got %v", err)
}
}
func TestRehydrate_RecoversManifests(t *testing.T) {
// Build first registry, register 2 manifests.
r1, store := mkRegistry(t)
_, _, _ = r1.Register(context.Background(), "workers", "sha256:a", nil, nil)
_, _, _ = r1.Register(context.Background(), "candidates", "sha256:b", nil, nil)
// Build a second registry against the same store + rehydrate.
r2 := NewRegistry(store)
n, err := r2.Rehydrate(context.Background())
if err != nil {
t.Fatalf("Rehydrate: %v", err)
}
if n != 2 {
t.Errorf("recovered %d, want 2", n)
}
if _, err := r2.Get("workers"); err != nil {
t.Errorf("Get(workers): %v", err)
}
if _, err := r2.Get("candidates"); err != nil {
t.Errorf("Get(candidates): %v", err)
}
}
func TestList_Sorted(t *testing.T) {
r, _ := mkRegistry(t)
_, _, _ = r.Register(context.Background(), "zoo", "fp", nil, nil)
_, _, _ = r.Register(context.Background(), "alpha", "fp", nil, nil)
_, _, _ = r.Register(context.Background(), "midway", "fp", nil, nil)
got := r.List()
want := []string{"alpha", "midway", "zoo"}
for i, m := range got {
if m.Name != want[i] {
t.Errorf("List[%d]: got %q, want %q", i, m.Name, want[i])
}
}
}
func TestRegister_RejectsEmptyInputs(t *testing.T) {
r, _ := mkRegistry(t)
_, _, err := r.Register(context.Background(), "", "fp", nil, nil)
if err == nil {
t.Error("expected error on empty name")
}
_, _, err = r.Register(context.Background(), "x", "", nil, nil)
if err == nil {
t.Error("expected error on empty fingerprint")
}
}

internal/catalogd/store_client.go

@ -0,0 +1,140 @@
// store_client.go — thin HTTP client to storaged. catalogd needs to
// PUT manifest Parquets, GET them on startup, and LIST the manifests
// directory. Staying inside an HTTP boundary (rather than reaching
// into storaged's package directly) preserves the service-boundary
// shape that gRPC will eventually formalize at G1+.
package catalogd
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)
// StoreClient talks HTTP to the storaged service.
type StoreClient struct {
baseURL string
hc *http.Client
}
// listResponse mirrors storaged's GET /storage/list shape:
//
// {"prefix":"_catalog/manifests/","objects":[{Key,Size,ETag,LastModified}, ...]}
type listResponse struct {
Prefix string `json:"prefix"`
Objects []struct {
Key string `json:"Key"`
Size int64 `json:"Size"`
} `json:"objects"`
}
// NewStoreClient builds a client against the given storaged base URL
// (e.g. "http://127.0.0.1:3211"). Timeout covers manifest read-write
// only; rehydration of many manifests at startup runs sequentially
// and each call gets its own timeout window.
func NewStoreClient(baseURL string) *StoreClient {
return &StoreClient{
baseURL: strings.TrimRight(baseURL, "/"),
hc: &http.Client{Timeout: 30 * time.Second},
}
}
// Put writes raw bytes at key. body is the encoded Parquet manifest.
func (c *StoreClient) Put(ctx context.Context, key string, body []byte) error {
u := c.baseURL + "/storage/put/" + safeKey(key)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, u, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("put req: %w", err)
}
req.ContentLength = int64(len(body))
resp, err := c.hc.Do(req)
if err != nil {
return fmt.Errorf("put do: %w", err)
}
defer drainAndClose(resp.Body)
if resp.StatusCode != http.StatusOK {
preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
return fmt.Errorf("put %s: status %d: %s", key, resp.StatusCode, string(preview))
}
return nil
}
// Get reads the bytes at key. ErrKeyNotFound on 404.
func (c *StoreClient) Get(ctx context.Context, key string) ([]byte, error) {
u := c.baseURL + "/storage/get/" + safeKey(key)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, fmt.Errorf("get req: %w", err)
}
resp, err := c.hc.Do(req)
if err != nil {
return nil, fmt.Errorf("get do: %w", err)
}
defer drainAndClose(resp.Body)
if resp.StatusCode == http.StatusNotFound {
return nil, ErrKeyNotFound
}
if resp.StatusCode != http.StatusOK {
preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
return nil, fmt.Errorf("get %s: status %d: %s", key, resp.StatusCode, string(preview))
}
return io.ReadAll(resp.Body)
}
// List returns the keys under prefix. Object metadata beyond Key is
// ignored — catalogd only needs the keys to drive rehydration.
func (c *StoreClient) List(ctx context.Context, prefix string) ([]string, error) {
u := c.baseURL + "/storage/list?prefix=" + url.QueryEscape(prefix)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, fmt.Errorf("list req: %w", err)
}
resp, err := c.hc.Do(req)
if err != nil {
return nil, fmt.Errorf("list do: %w", err)
}
defer drainAndClose(resp.Body)
if resp.StatusCode != http.StatusOK {
preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
return nil, fmt.Errorf("list %s: status %d: %s", prefix, resp.StatusCode, string(preview))
}
var lr listResponse
if err := json.NewDecoder(resp.Body).Decode(&lr); err != nil {
return nil, fmt.Errorf("list decode: %w", err)
}
out := make([]string, 0, len(lr.Objects))
for _, o := range lr.Objects {
out = append(out, o.Key)
}
return out, nil
}
// ErrKeyNotFound mirrors storaged's not-found semantics on the catalogd
// side without exposing storaged's package types.
var ErrKeyNotFound = fmt.Errorf("catalogd store: key not found")
// safeKey URL-escapes path segments while preserving "/". storaged's
// chi `/storage/<verb>/*` routes accept literal slashes in the
// wildcard match, so we only escape the segments, not the separators.
func safeKey(key string) string {
parts := strings.Split(key, "/")
for i, p := range parts {
parts[i] = url.PathEscape(p)
}
return strings.Join(parts, "/")
}
// drainAndClose reads any remaining body bytes (capped at 64 KiB) and
// closes the body. Per scrum S4 (Qwen): preview-then-close on error
// paths leaves bytes in the kernel buffer, breaking HTTP/1.1 keep-
// alive reuse and slowly leaking sockets.
func drainAndClose(body io.ReadCloser) {
_, _ = io.Copy(io.Discard, io.LimitReader(body, 64<<10))
_ = body.Close()
}


@ -21,13 +21,21 @@ import (
type Config struct {
Gateway ServiceConfig `toml:"gateway"`
Storaged ServiceConfig `toml:"storaged"`
Catalogd CatalogConfig `toml:"catalogd"`
Ingestd ServiceConfig `toml:"ingestd"`
Queryd ServiceConfig `toml:"queryd"`
S3 S3Config `toml:"s3"`
Log LogConfig `toml:"log"`
}
// CatalogConfig adds catalogd-specific knobs on top of the standard
// bind. StoragedURL points at the storaged service for manifest
// persistence; G0 defaults to the localhost bind.
type CatalogConfig struct {
Bind string `toml:"bind"`
StoragedURL string `toml:"storaged_url"`
}
// ServiceConfig is the per-binary bind config. Default Bind ""
// means "use the service's hardcoded G0 default" — see DefaultConfig.
type ServiceConfig struct {
@ -57,7 +65,7 @@ func DefaultConfig() Config {
return Config{
Gateway: ServiceConfig{Bind: "127.0.0.1:3110"},
Storaged: ServiceConfig{Bind: "127.0.0.1:3211"},
Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"},
Ingestd: ServiceConfig{Bind: "127.0.0.1:3213"},
Queryd: ServiceConfig{Bind: "127.0.0.1:3214"},
S3: S3Config{


@ -12,6 +12,7 @@ bind = "127.0.0.1:3211"
[catalogd]
bind = "127.0.0.1:3212"
storaged_url = "http://127.0.0.1:3211"
[ingestd]
bind = "127.0.0.1:3213"

scripts/d3_smoke.sh (executable)

@ -0,0 +1,156 @@
#!/usr/bin/env bash
# D3 smoke — proves the Day 3 acceptance gate end-to-end.
#
# Validates:
# - Register a fresh dataset → 200 with existing=false, dataset_id from UUIDv5(name)
# - GET /catalog/manifest/<name> → manifest matches what we registered
# - GET /catalog/list → manifest listed
# - Restart catalogd → /catalog/list still shows it (Parquet-backed rehydrate)
# - Re-register same name + same fingerprint → 200, existing=true, same dataset_id
# - Re-register same name + different fingerprint → 409 Conflict
#
# Requires storaged (D2) running on :3211 and reachable.
#
# Usage: ./scripts/d3_smoke.sh
set -euo pipefail
cd "$(dirname "$0")/.."
export PATH="$PATH:/usr/local/go/bin"
echo "[d3-smoke] building storaged + catalogd..."
go build -o bin/ ./cmd/storaged ./cmd/catalogd
# Cleanup any prior processes on D3 ports.
pkill -f "bin/storaged" 2>/dev/null || true
pkill -f "bin/catalogd" 2>/dev/null || true
sleep 0.2
STORAGED_PID=""
CATALOGD_PID=""
TMP="$(mktemp -d)"
cleanup() {
echo "[d3-smoke] cleanup"
if [ -n "$CATALOGD_PID" ]; then kill "$CATALOGD_PID" 2>/dev/null || true; fi
if [ -n "$STORAGED_PID" ]; then kill "$STORAGED_PID" 2>/dev/null || true; fi
rm -rf "$TMP"
}
trap cleanup EXIT INT TERM
# --- launch storaged ---
echo "[d3-smoke] launching storaged..."
./bin/storaged > /tmp/storaged.log 2>&1 &
STORAGED_PID=$!
deadline=$(($(date +%s) + 5))
while [ "$(date +%s)" -lt "$deadline" ]; do
if curl -sS --max-time 1 http://127.0.0.1:3211/health >/dev/null 2>&1; then break; fi
sleep 0.05
done
if ! curl -sS --max-time 1 http://127.0.0.1:3211/health >/dev/null 2>&1; then
echo " [d3-smoke] storaged failed to bind"; tail -10 /tmp/storaged.log; exit 1
fi
# --- clean any prior catalog manifests for a fresh smoke ---
for k in $(curl -sS "http://127.0.0.1:3211/storage/list?prefix=_catalog/manifests/" | jq -r '.objects[]?.Key // empty' 2>/dev/null); do
curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/$k" || true
done
# --- launch catalogd (round 1) ---
launch_catalogd() {
./bin/catalogd > /tmp/catalogd.log 2>&1 &
CATALOGD_PID=$!
local deadline=$(($(date +%s) + 5))
while [ "$(date +%s)" -lt "$deadline" ]; do
if curl -sS --max-time 1 http://127.0.0.1:3212/health >/dev/null 2>&1; then return 0; fi
sleep 0.05
done
echo " [d3-smoke] catalogd failed to bind"; tail -10 /tmp/catalogd.log; return 1
}
echo "[d3-smoke] launching catalogd (first start, empty catalog)..."
launch_catalogd
FAILED=0
NAME="d3_smoke_dataset"
FP1="sha256:fingerprint-A"
FP2="sha256:fingerprint-B"
echo "[d3-smoke] POST /catalog/register (fresh):"
RESP="$(curl -sS -X POST http://127.0.0.1:3212/catalog/register \
-H 'Content-Type: application/json' \
-d "{\"name\":\"$NAME\",\"schema_fingerprint\":\"$FP1\",\"objects\":[{\"key\":\"datasets/$NAME/p1.parquet\",\"size\":1024}],\"row_count\":42}")"
EXISTING="$(echo "$RESP" | jq -r '.existing')"
DATASET_ID="$(echo "$RESP" | jq -r '.manifest.dataset_id')"
if [ "$EXISTING" = "false" ] && [ -n "$DATASET_ID" ] && [ "$DATASET_ID" != "null" ]; then
echo " ✓ fresh register → existing=false, dataset_id=$DATASET_ID"
else
echo " ✗ fresh register → $RESP"
FAILED=1
fi
echo "[d3-smoke] GET /catalog/manifest/$NAME:"
GOT="$(curl -sS "http://127.0.0.1:3212/catalog/manifest/$NAME" | jq -r '.dataset_id')"
if [ "$GOT" = "$DATASET_ID" ]; then
echo " ✓ manifest dataset_id matches"
else
echo " ✗ manifest dataset_id: got $GOT, want $DATASET_ID"
FAILED=1
fi
echo "[d3-smoke] GET /catalog/list (1 entry):"
COUNT="$(curl -sS http://127.0.0.1:3212/catalog/list | jq -r '.count')"
if [ "$COUNT" = "1" ]; then
echo " ✓ list count=1"
else
echo " ✗ list count=$COUNT (want 1)"
FAILED=1
fi
echo "[d3-smoke] restart catalogd → rehydrate from Parquet:"
kill "$CATALOGD_PID" 2>/dev/null || true; wait "$CATALOGD_PID" 2>/dev/null || true
launch_catalogd
REHYDRATED_ID="$(curl -sS "http://127.0.0.1:3212/catalog/manifest/$NAME" | jq -r '.dataset_id')"
if [ "$REHYDRATED_ID" = "$DATASET_ID" ]; then
echo " ✓ rehydrated dataset_id matches across restart"
else
echo " ✗ rehydrated dataset_id: got $REHYDRATED_ID, want $DATASET_ID"
FAILED=1
fi
echo "[d3-smoke] re-register (same name + same fingerprint) → existing=true:"
RESP2="$(curl -sS -X POST http://127.0.0.1:3212/catalog/register \
-H 'Content-Type: application/json' \
-d "{\"name\":\"$NAME\",\"schema_fingerprint\":\"$FP1\",\"objects\":[{\"key\":\"datasets/$NAME/p1.parquet\",\"size\":1024},{\"key\":\"datasets/$NAME/p2.parquet\",\"size\":2048}],\"row_count\":84}")"
EXISTING2="$(echo "$RESP2" | jq -r '.existing')"
DATASET_ID2="$(echo "$RESP2" | jq -r '.manifest.dataset_id')"
OBJ_COUNT="$(echo "$RESP2" | jq -r '.manifest.objects | length')"
if [ "$EXISTING2" = "true" ] && [ "$DATASET_ID2" = "$DATASET_ID" ] && [ "$OBJ_COUNT" = "2" ]; then
echo " ✓ existing=true, same dataset_id, objects replaced (count=2)"
else
echo " ✗ idempotent re-register: existing=$EXISTING2 id=$DATASET_ID2 objs=$OBJ_COUNT$RESP2"
FAILED=1
fi
echo "[d3-smoke] re-register (different fingerprint) → 409:"
HTTP="$(curl -sS -o "$TMP/conflict.out" -w '%{http_code}' -X POST http://127.0.0.1:3212/catalog/register \
-H 'Content-Type: application/json' \
-d "{\"name\":\"$NAME\",\"schema_fingerprint\":\"$FP2\",\"objects\":[]}")"
if [ "$HTTP" = "409" ]; then
echo " ✓ different fingerprint → 409 Conflict"
else
echo " ✗ different fingerprint → $HTTP (want 409)"
cat "$TMP/conflict.out"
FAILED=1
fi
# Cleanup smoke manifests.
curl -sS -o /dev/null -X DELETE "http://127.0.0.1:3211/storage/delete/_catalog/manifests/$NAME.parquet" || true
if [ "$FAILED" -eq 0 ]; then
echo "[d3-smoke] D3 acceptance gate: PASSED"
exit 0
else
echo "[d3-smoke] D3 acceptance gate: FAILED"
exit 1
fi