root 66a704ca3e G0 D3: catalogd Parquet manifests + ADR-020 idempotent register · 6 scrum fixes
Phase G0 Day 3 ships catalogd: Arrow Parquet manifest codec, in-memory
registry with the ADR-020 idempotency contract (same name+fingerprint
reuses dataset_id; different fingerprint → 409 Conflict), HTTP client
to storaged for persistence, and rehydration on startup. Acceptance
smoke 6/6 PASSES end-to-end including rehydrate-across-restart — the
load-bearing test that the catalog/storaged service split actually
preserves state.

dataset_id derivation diverges from Rust: it is UUIDv5(namespace, name)
instead of a random v4 surrogate. The same name on any box generates the
same dataset_id, so a rehydrate after disk loss converges to the same
identity rather than silently re-issuing a new one. The namespace is pinned
at a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e — every dataset_id ever issued
depends on these bytes.

Cross-lineage scrum on shipped code:
  - Opus 4.7 (opencode):                       1 BLOCK + 5 WARN + 3 INFO
  - Kimi K2-0905 (openrouter, validated D2):   2 BLOCK + 2 WARN + 1 INFO
  - Qwen3-coder (openrouter):                  2 BLOCK + 2 WARN + 2 INFO

Fixed:
  C1 list-offsets BLOCK (3-way convergent) → ValueOffsets(0) + bounds
  C2 Rehydrate mutex held across I/O → swap-under-brief-lock pattern
  S1 split-brain on persist failure → candidate-then-swap
  S2 brittle string-match for 400 vs 500 → ErrEmptyName/ErrEmptyFingerprint sentinels
  S3 Get/List shallow-copy aliasing → cloneManifest deep copy
  S4 keep-alive socket leak on error paths → drainAndClose helper

Dismissed (false positives, all single-reviewer):
  Kimi BLOCK "Decode crashes on empty Parquet" — already handled
  Kimi INFO "safeKey double-escapes" — wrong, splitting before escape is required
  Qwen INFO "rb.NewRecord() error unchecked" — API returns no error

Deferred to G1+: name validation regex, per-call deadlines, Snappy
compression, list pagination continuation tokens (storaged caps at
10k with sentinel for now).

Build clean, vet clean, all tests pass, and smoke 6/6 PASS after every
fix round. Added arrow-go/v18 and google/uuid; the Go version moves from
1.24 to 1.25 because arrow-go requires 1.25 as its minimum.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 23:36:57 -05:00

220 lines
7.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// registry.go — the in-memory catalog index plus the ADR-020
// idempotent register contract. The register path holds a single
// mutex across (lookup → fingerprint check → storage write →
// in-memory update) to close the check→insert TOCTOU window. The
// historical Rust bug (308× duplicate manifests on re-register) is
// the prior art — don't loosen this lock.
package catalogd
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"
)
// ManifestPrefix is the storaged key prefix under which catalog
// manifests are written. Objects below this prefix are catalog
// metadata, NOT user data, and never surface through ingest/query paths.
const ManifestPrefix = "_catalog/manifests/"

// Sentinel errors returned by Registry. Per scrum S2 (Opus), boundary
// layers classify these with errors.Is instead of substring-matching
// err.Error().
var (
	// ErrFingerprintConflict is returned by Register when a manifest with
	// the same name already exists under a different schema fingerprint.
	// The HTTP layer maps it to 409 Conflict; gRPC will map it to
	// FAILED_PRECONDITION.
	ErrFingerprintConflict = errors.New("catalogd: schema fingerprint conflict")

	// ErrManifestNotFound is returned by Get when no manifest is
	// registered under the requested name.
	ErrManifestNotFound = errors.New("catalogd: manifest not found")

	// ErrEmptyName is returned by Register when name is empty. The HTTP
	// layer maps it to 400.
	ErrEmptyName = errors.New("catalogd: empty name")

	// ErrEmptyFingerprint is returned by Register when the schema
	// fingerprint is empty. The HTTP layer maps it to 400.
	ErrEmptyFingerprint = errors.New("catalogd: empty schema_fingerprint")
)
// Store is the subset of the storaged wire API that Registry relies on.
// It is an interface so registry tests can swap the HTTP client for an
// in-memory fake.
type Store interface {
	// List returns object keys for prefix. Registry.Rehydrate still
	// re-checks the prefix and ".parquet" suffix on every key it gets.
	List(ctx context.Context, prefix string) ([]string, error)

	// Get fetches the raw bytes stored under key.
	Get(ctx context.Context, key string) ([]byte, error)

	// Put writes body under key.
	Put(ctx context.Context, key string, body []byte) error
}
// Registry is the in-memory authority. Persistence lives in storaged
// at ManifestPrefix; Registry is rehydrated on startup.
//
// All access to byKey goes through mu. Register keeps mu held across
// the storage write; Rehydrate does its storage I/O without mu and
// only takes the lock to install the result.
type Registry struct {
// mu guards byKey.
mu sync.Mutex
// byKey maps manifest name to its current manifest. Values handed to
// callers are always deep copies (see cloneManifest).
byKey map[string]*Manifest // name → manifest
// store is the persistence backend (storaged in production).
store Store
// now supplies the timestamps written into CreatedAt/UpdatedAt.
now func() time.Time // injectable for tests
}
// NewRegistry returns an empty Registry that persists through store and
// timestamps manifests with time.Now. The registry holds no manifests
// until Rehydrate is called to load what storaged already has.
func NewRegistry(store Store) *Registry {
	r := &Registry{store: store, now: time.Now}
	r.byKey = make(map[string]*Manifest)
	return r
}
// Rehydrate lists ManifestPrefix in storaged, decodes every
// "<ManifestPrefix>…parquet" entry, and installs the result as the
// in-memory index. It returns the number of manifests decoded from
// storage.
//
// Any list, fetch, or decode failure aborts before the in-memory map is
// touched, so a corrupt catalog never half-loads and silently loses
// state. On error the returned count is how many manifests were decoded
// before the failure; none of them are installed.
//
// Per scrum C2 (Opus + Kimi convergent): network I/O happens OUTSIDE
// the registry mutex so a slow storaged doesn't block concurrent
// Register/Get/List. Because a Register can therefore complete while
// the I/O is in flight, the decoded snapshot is MERGED rather than
// blindly swapped in. An in-memory entry is kept when the snapshot
// lacks it, or when it has a later UpdatedAt than the snapshot's copy.
// Such an entry was persisted by a concurrent Register after List ran.
// Dropping it would let a later Register with a different fingerprint
// bypass the ADR-020 conflict check and overwrite the persisted
// manifest.
func (r *Registry) Rehydrate(ctx context.Context) (int, error) {
	keys, err := r.store.List(ctx, ManifestPrefix)
	if err != nil {
		return 0, fmt.Errorf("list manifests: %w", err)
	}

	loaded := make(map[string]*Manifest, len(keys))
	for _, k := range keys {
		// Skip anything under the prefix that isn't a manifest object.
		if !strings.HasPrefix(k, ManifestPrefix) || !strings.HasSuffix(k, ".parquet") {
			continue
		}
		body, err := r.store.Get(ctx, k)
		if err != nil {
			return len(loaded), fmt.Errorf("get manifest %s: %w", k, err)
		}
		m, err := Decode(body)
		if err != nil {
			return len(loaded), fmt.Errorf("decode manifest %s: %w", k, err)
		}
		loaded[m.Name] = m
	}
	recovered := len(loaded)

	r.mu.Lock()
	defer r.mu.Unlock()
	// Preserve registrations that raced with the unlocked I/O above.
	for name, cur := range r.byKey {
		if disk, ok := loaded[name]; !ok || cur.UpdatedAt.After(disk.UpdatedAt) {
			loaded[name] = cur
		}
	}
	r.byKey = loaded
	return recovered, nil
}
// Register applies the ADR-020 idempotency contract:
//
//   - No prior manifest for name → create (returns existing=false)
//   - Prior manifest, same fingerprint → replace objects and row count,
//     bump updated_at, reuse dataset_id and created_at (returns existing=true)
//   - Prior manifest, different fingerprint → ErrFingerprintConflict
//     (returns existing=true)
//
// Empty name or fingerprint yields ErrEmptyName / ErrEmptyFingerprint.
//
// The mutex is held across the storage write so concurrent calls for
// the same name serialize through the persistence layer (low TPS,
// correctness > throughput). In-memory state only changes after the
// write succeeds, so a storaged failure leaves the registry untouched.
//
// Registry state never aliases caller memory. objects and rowCount are
// deep-copied before being stored, and the returned manifest is a
// separate deep copy. Without this, caller-side writes to objects[i] or
// *rowCount, or to the returned manifest, would mutate registry state
// without the lock. This is the same hazard scrum S3 fixed for Get/List.
func (r *Registry) Register(ctx context.Context, name, fingerprint string, objects []Object, rowCount *int64) (*Manifest, bool, error) {
	if name == "" {
		return nil, false, ErrEmptyName
	}
	if fingerprint == "" {
		return nil, false, ErrEmptyFingerprint
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	now := r.now().UTC()
	prior, exists := r.byKey[name]

	var next *Manifest
	if exists {
		if prior.SchemaFingerprint != fingerprint {
			return nil, true, fmt.Errorf("%w: name=%q have=%s got=%s",
				ErrFingerprintConflict, name, prior.SchemaFingerprint, fingerprint)
		}
		// Same fingerprint — reuse dataset_id, replace objects, bump
		// updated_at. Per scrum S1 (Opus): build a candidate and persist it
		// before swapping in. Mutating `prior` first would create
		// split-brain if storaged is down. In-memory state would advance
		// while disk keeps the old state, and a restart would lose what
		// callers were told didn't happen.
		candidate := *prior
		candidate.Objects = objects
		candidate.UpdatedAt = now
		candidate.RowCount = rowCount
		next = &candidate
	} else {
		next = &Manifest{
			DatasetID:         DatasetIDForName(name),
			Name:              name,
			SchemaFingerprint: fingerprint,
			Objects:           objects,
			CreatedAt:         now,
			UpdatedAt:         now,
			RowCount:          rowCount,
		}
	}

	// Detach from the caller's objects slice and rowCount pointer.
	next = cloneManifest(next)
	if err := r.persist(ctx, next); err != nil {
		return nil, exists, err
	}
	r.byKey[name] = next
	return cloneManifest(next), exists, nil
}
// Get looks up name and returns a deep copy of its manifest, so callers
// may modify the result freely. It returns ErrManifestNotFound when no
// manifest is registered under name.
func (r *Registry) Get(name string) (*Manifest, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if m, ok := r.byKey[name]; ok {
		return cloneManifest(m), nil
	}
	return nil, ErrManifestNotFound
}
// List returns a deep copy of every registered manifest, ordered by
// name. The slice is never nil (empty registry → empty slice). Callers
// may mutate both the slice and the manifests it points to without
// affecting registry state.
func (r *Registry) List() []*Manifest {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Map keys are always the manifest's Name (Register and Rehydrate
	// both key by it), so sorting keys orders manifests by name.
	names := make([]string, 0, len(r.byKey))
	for name := range r.byKey {
		names = append(names, name)
	}
	sort.Strings(names)

	snapshot := make([]*Manifest, len(names))
	for i, name := range names {
		snapshot[i] = cloneManifest(r.byKey[name])
	}
	return snapshot
}
// cloneManifest returns a copy of m that shares no mutable memory with
// it through Objects or RowCount. A nil Objects slice stays nil, and a
// non-nil one (even if empty) gets a fresh backing array. A non-nil
// RowCount points at a fresh int64. Per scrum S3 (Opus), a plain
// `cp := *m` shared the Objects backing array, so caller-side index
// writes corrupted registry state without the mutex.
//
// NOTE(review): other fields are copied by value. If Manifest gains
// further slice/map/pointer fields, they need deep-copying here too.
func cloneManifest(m *Manifest) *Manifest {
	out := *m
	if src := m.Objects; src != nil {
		out.Objects = append(make([]Object, 0, len(src)), src...)
	}
	if rc := m.RowCount; rc != nil {
		n := *rc
		out.RowCount = &n
	}
	return &out
}
// persist encodes m and writes it to storaged at the canonical key
// ManifestPrefix + m.Name + ".parquet". Encode and Put failures are
// wrapped with the manifest name. Caller MUST hold r.mu — persist does
// not take the lock itself, so the write and the in-memory update can
// happen under one critical section.
func (r *Registry) persist(ctx context.Context, m *Manifest) error {
	payload, encErr := Encode(m)
	if encErr != nil {
		return fmt.Errorf("encode manifest %s: %w", m.Name, encErr)
	}
	key := ManifestPrefix + m.Name + ".parquet"
	putErr := r.store.Put(ctx, key, payload)
	if putErr == nil {
		return nil
	}
	return fmt.Errorf("persist manifest %s: %w", m.Name, putErr)
}