Phase G0 Day 3 ships catalogd: Arrow Parquet manifest codec, in-memory registry with the ADR-020 idempotency contract (same name+fingerprint reuses dataset_id; different fingerprint → 409 Conflict), HTTP client to storaged for persistence, and rehydration on startup. Acceptance smoke: 6/6 PASS end-to-end, including rehydrate-across-restart, the load-bearing test that the catalogd/storaged service split actually preserves state.

dataset_id derivation diverges from Rust: UUIDv5(namespace, name) instead of a v4 surrogate. The same name on any box generates the same dataset_id, and rehydrate after disk loss converges to the same identity rather than silently re-issuing. The namespace is pinned at a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e; every dataset_id ever issued depends on these bytes.

Cross-lineage scrum on shipped code:
- Opus 4.7 (opencode): 1 BLOCK + 5 WARN + 3 INFO
- Kimi K2-0905 (openrouter, validated D2): 2 BLOCK + 2 WARN + 1 INFO
- Qwen3-coder (openrouter): 2 BLOCK + 2 WARN + 2 INFO

Fixed:
- C1 list-offsets BLOCK (3-way convergent) → ValueOffsets(0) + bounds
- C2 Rehydrate mutex held across I/O → swap-under-brief-lock pattern
- S1 split-brain on persist failure → candidate-then-swap
- S2 brittle string-match for 400 vs 500 → ErrEmptyName/ErrEmptyFingerprint sentinels
- S3 Get/List shallow-copy aliasing → cloneManifest deep copy
- S4 keep-alive socket leak on error paths → drainAndClose helper

Dismissed (false positives, all single-reviewer):
- Kimi BLOCK "Decode crashes on empty Parquet": already handled
- Kimi INFO "safeKey double-escapes": wrong, splitting before escaping is required
- Qwen INFO "rb.NewRecord() error unchecked": the API returns no error

Deferred to G1+: name validation regex, per-call deadlines, Snappy compression, list pagination continuation tokens (storaged caps at 10k with a sentinel for now).

Build clean, vet clean, all tests pass, smoke 6/6 PASS after every fix round. arrow-go/v18 + google/uuid added; Go 1.24 → 1.25 forced by arrow-go's minimum.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
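The derivation itself is one call deep. A minimal sketch, assuming it sits on google/uuid's NewSHA1 (the version-5 constructor) and that DatasetID travels as a string; DatasetIDForName matches the call site in registry.go below, but this is an illustration of the technique, not the shipped file:

// Assumed sketch, not the shipped source. Shows the UUIDv5 derivation
// described above: same name always yields the same dataset_id.
package catalogd

import "github.com/google/uuid"

// datasetNamespace is the pinned namespace quoted in the commit message.
// Changing these bytes changes every dataset_id ever issued.
var datasetNamespace = uuid.MustParse("a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e")

// DatasetIDForName derives a deterministic version-5 (SHA-1 based) UUID
// from the dataset name, so rehydrate after disk loss converges to the
// same identity instead of minting a fresh surrogate.
func DatasetIDForName(name string) string {
	return uuid.NewSHA1(datasetNamespace, []byte(name)).String()
}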
// registry.go — the in-memory catalog index plus the ADR-020
// idempotent register contract. The register path holds a single
// mutex across (lookup → fingerprint check → storage write →
// in-memory update) to close the check→insert TOCTOU window. The
// historical Rust bug (308× duplicate manifests on re-register) is
// the prior art — don't loosen this lock.
package catalogd

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"
)

// ManifestPrefix is the storaged key prefix that holds catalog
// manifests. Objects under this prefix are NOT user data and never
// surface through ingest/query paths.
const ManifestPrefix = "_catalog/manifests/"

// ErrFingerprintConflict is returned by Register when a manifest with
// the same name already exists under a different schema fingerprint.
// The HTTP layer maps this to 409 Conflict; gRPC will map it to
// FAILED_PRECONDITION.
var ErrFingerprintConflict = errors.New("catalogd: schema fingerprint conflict")

// ErrManifestNotFound is returned by Get when the requested name has
// no manifest registered.
var ErrManifestNotFound = errors.New("catalogd: manifest not found")

// ErrEmptyName / ErrEmptyFingerprint are returned by Register on
// missing required inputs. The HTTP layer maps both to 400. Per scrum
// S2 (Opus): sentinel errors so the HTTP boundary uses errors.Is
// rather than substring-matching err.Error().
var (
	ErrEmptyName        = errors.New("catalogd: empty name")
	ErrEmptyFingerprint = errors.New("catalogd: empty schema_fingerprint")
)

// Store abstracts the storaged HTTP wire so Registry can be unit-
// tested with an in-memory fake.
type Store interface {
	Put(ctx context.Context, key string, body []byte) error
	Get(ctx context.Context, key string) ([]byte, error)
	List(ctx context.Context, prefix string) ([]string, error)
}

// Registry is the in-memory authority. Persistence lives in storaged
// at ManifestPrefix; Registry is rehydrated on startup.
type Registry struct {
	mu    sync.Mutex
	byKey map[string]*Manifest // name → manifest
	store Store
	now   func() time.Time // injectable for tests
}

// NewRegistry builds an empty registry bound to a Store. Call
// Rehydrate after construction to pick up persisted manifests.
func NewRegistry(store Store) *Registry {
	return &Registry{
		byKey: make(map[string]*Manifest),
		store: store,
		now:   time.Now,
	}
}

// Rehydrate lists ManifestPrefix in storaged and decodes every entry
// into the in-memory map. Returns the count of manifests recovered.
// On any per-file decode error, it returns immediately so a corrupt
// catalog doesn't half-load and silently lose state.
//
// Per scrum C2 (Opus + Kimi convergent): network I/O happens OUTSIDE
// the registry mutex so a slow storaged doesn't block concurrent
// Register/Get/List. The completed map is swapped in under the lock.
func (r *Registry) Rehydrate(ctx context.Context) (int, error) {
	keys, err := r.store.List(ctx, ManifestPrefix)
	if err != nil {
		return 0, fmt.Errorf("list manifests: %w", err)
	}
	loaded := make(map[string]*Manifest)
	for _, k := range keys {
		if !strings.HasPrefix(k, ManifestPrefix) || !strings.HasSuffix(k, ".parquet") {
			continue
		}
		body, err := r.store.Get(ctx, k)
		if err != nil {
			return len(loaded), fmt.Errorf("get manifest %s: %w", k, err)
		}
		m, err := Decode(body)
		if err != nil {
			return len(loaded), fmt.Errorf("decode manifest %s: %w", k, err)
		}
		loaded[m.Name] = m
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.byKey = loaded
	return len(loaded), nil
}

// Register applies the ADR-020 idempotency contract:
//
//   - No prior manifest for name → create (returns existing=false)
//   - Prior manifest, same fingerprint → update objects + bump
//     updated_at, reuse dataset_id (returns existing=true)
//   - Prior manifest, different fingerprint → ErrFingerprintConflict
//
// The mutex is held across the storage write so concurrent calls for
// the same name serialize through the persistence layer (low TPS,
// correctness > throughput).
func (r *Registry) Register(ctx context.Context, name, fingerprint string, objects []Object, rowCount *int64) (*Manifest, bool, error) {
	if name == "" {
		return nil, false, ErrEmptyName
	}
	if fingerprint == "" {
		return nil, false, ErrEmptyFingerprint
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	now := r.now().UTC()
	prior, exists := r.byKey[name]
	if exists {
		if prior.SchemaFingerprint != fingerprint {
			return nil, true, fmt.Errorf("%w: name=%q have=%s got=%s",
				ErrFingerprintConflict, name, prior.SchemaFingerprint, fingerprint)
		}
		// Same fingerprint — reuse dataset_id, replace objects, bump updated_at.
		// Per scrum S1 (Opus): build a candidate, persist, then swap in. Mutating
		// `prior` before persist succeeds creates split-brain if storaged is
		// down — in-memory advances, disk holds the old state, and a restart
		// loses what callers were told didn't happen.
		candidate := *prior
		candidate.Objects = objects
		candidate.UpdatedAt = now
		candidate.RowCount = rowCount
		if err := r.persist(ctx, &candidate); err != nil {
			return nil, true, err
		}
		r.byKey[name] = &candidate
		return &candidate, true, nil
	}

	m := &Manifest{
		DatasetID:         DatasetIDForName(name),
		Name:              name,
		SchemaFingerprint: fingerprint,
		Objects:           objects,
		CreatedAt:         now,
		UpdatedAt:         now,
		RowCount:          rowCount,
	}
	if err := r.persist(ctx, m); err != nil {
		return nil, false, err
	}
	r.byKey[name] = m
	return m, false, nil
}

// Get returns a deep copy of the manifest for name, or ErrManifestNotFound.
func (r *Registry) Get(name string) (*Manifest, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	m, ok := r.byKey[name]
	if !ok {
		return nil, ErrManifestNotFound
	}
	return cloneManifest(m), nil
}

// List returns deep copies of every manifest, sorted by name.
// Callers may mutate the returned slice and the underlying Manifest
// values without affecting registry state.
func (r *Registry) List() []*Manifest {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]*Manifest, 0, len(r.byKey))
	for _, m := range r.byKey {
		out = append(out, cloneManifest(m))
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}

// cloneManifest deep-copies the Objects slice and dereferences
// RowCount into a fresh pointer so a returned manifest cannot alias
// registry state. Per scrum S3 (Opus): the prior `cp := *m` shape
// shared the Objects backing array — caller-side index writes
// corrupted registry state without holding the mutex.
func cloneManifest(m *Manifest) *Manifest {
	cp := *m
	if m.Objects != nil {
		cp.Objects = make([]Object, len(m.Objects))
		copy(cp.Objects, m.Objects)
	}
	if m.RowCount != nil {
		v := *m.RowCount
		cp.RowCount = &v
	}
	return &cp
}

// persist encodes the manifest and writes it to storaged at the
// canonical path. The caller MUST hold r.mu — this function does not
// take the lock itself.
func (r *Registry) persist(ctx context.Context, m *Manifest) error {
	body, err := Encode(m)
	if err != nil {
		return fmt.Errorf("encode manifest %s: %w", m.Name, err)
	}
	key := ManifestPrefix + m.Name + ".parquet"
	if err := r.store.Put(ctx, key, body); err != nil {
		return fmt.Errorf("persist manifest %s: %w", m.Name, err)
	}
	return nil
}
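The Store interface above exists precisely so this file can be exercised without a live storaged. A sketch of what that looks like, under stated assumptions: mapStore and the example body are hypothetical test code, not shipped, and they lean on the package's Encode/Decode being tolerant of nil Objects (which the scrum notes say is already handled). It walks all three arms of the ADR-020 contract:

// Assumed sketch — an in-memory Store fake plus a walk through the
// Register idempotency contract. Illustrative only.
package catalogd

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
)

// mapStore is a hypothetical in-memory Store for tests.
type mapStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

func (s *mapStore) Put(ctx context.Context, key string, body []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.data == nil {
		s.data = make(map[string][]byte)
	}
	s.data[key] = append([]byte(nil), body...) // copy, don't alias caller's buffer
	return nil
}

func (s *mapStore) Get(ctx context.Context, key string) ([]byte, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	b, ok := s.data[key]
	if !ok {
		return nil, fmt.Errorf("mapStore: not found: %s", key)
	}
	return b, nil
}

func (s *mapStore) List(ctx context.Context, prefix string) ([]string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var keys []string
	for k := range s.data {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	return keys, nil
}

func ExampleRegistry_idempotency() {
	ctx := context.Background()
	r := NewRegistry(&mapStore{})

	// Arm 1: no prior manifest → create.
	a, existing, _ := r.Register(ctx, "events", "fp-v1", nil, nil)
	fmt.Println(existing) // false

	// Arm 2: same name + same fingerprint → reuse dataset_id.
	b, existing, _ := r.Register(ctx, "events", "fp-v1", nil, nil)
	fmt.Println(existing, a.DatasetID == b.DatasetID) // true true

	// Arm 3: same name + different fingerprint → conflict
	// (the HTTP layer turns this sentinel into 409).
	_, _, err := r.Register(ctx, "events", "fp-v2", nil, nil)
	fmt.Println(errors.Is(err, ErrFingerprintConflict)) // true
}

Because Register persists through the Store before swapping in-memory state, the same fake also covers the S1 candidate-then-swap behavior: make Put return an error and the registry should be observably unchanged afterward.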