// registry.go — the in-memory catalog index plus the ADR-020
// idempotent register contract. The register path holds a single
// mutex across (lookup → fingerprint check → storage write →
// in-memory update) to close the check→insert TOCTOU window. The
// historical Rust bug (308× duplicate manifests on re-register) is
// the prior art — don't loosen this lock.

package catalogd

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"
)

// ManifestPrefix is the storaged key prefix that holds catalog
// manifests. Objects under this prefix are NOT user data and never
// surface through ingest/query paths.
const ManifestPrefix = "_catalog/manifests/"

// ErrFingerprintConflict is returned by Register when a manifest with
// the same name already exists under a different schema fingerprint.
// The HTTP layer maps this to 409 Conflict; gRPC will map it to
// FAILED_PRECONDITION.
var ErrFingerprintConflict = errors.New("catalogd: schema fingerprint conflict")

// ErrManifestNotFound is returned by Get when the requested name has
// no manifest registered.
var ErrManifestNotFound = errors.New("catalogd: manifest not found")

// ErrEmptyName and ErrEmptyFingerprint are returned by Register when a
// required input is missing. The HTTP layer maps both to 400. Per
// scrum S2 (Opus): these are sentinel errors so the HTTP boundary can
// use errors.Is rather than substring-matching err.Error().
var (
	ErrEmptyName        = errors.New("catalogd: empty name")
	ErrEmptyFingerprint = errors.New("catalogd: empty schema_fingerprint")
)

// Store abstracts the storaged HTTP wire so the registry can be unit-
// tested with an in-memory fake.
type Store interface {
	// Put is called by the registry with the encoded manifest as body and
	// ManifestPrefix + name + ".parquet" as key.
	Put(ctx context.Context, key string, body []byte) error
	// Get is called by Rehydrate for each listed manifest key; the returned
	// bytes are handed to Decode.
	Get(ctx context.Context, key string) ([]byte, error)
	// List is called by Rehydrate with ManifestPrefix. The registry expects
	// full keys back and skips any key that lacks the prefix or the
	// ".parquet" suffix.
	List(ctx context.Context, prefix string) ([]string, error)
}

// Registry is the in-memory authority. Persistence lives in storaged
// at ManifestPrefix; Registry is rehydrated on startup. Every method
// takes the registry mutex before touching the in-memory map.
type Registry struct { mu sync.Mutex byKey map[string]*Manifest // name → manifest store Store now func() time.Time // injectable for tests } // NewRegistry builds an empty registry bound to a Store. Call // Rehydrate after construction to pick up persisted manifests. func NewRegistry(store Store) *Registry { return &Registry{ byKey: make(map[string]*Manifest), store: store, now: time.Now, } } // Rehydrate lists ManifestPrefix in storaged and decodes every entry // into the in-memory map. Returns the count of manifests recovered. // On any per-file decode error, returns immediately so a corrupt // catalog doesn't half-load and silently lose state. // // Per scrum C2 (Opus + Kimi convergent): network I/O happens OUTSIDE // the registry mutex so a slow storaged doesn't block concurrent // Register/Get/List. The completed map is swapped in under the lock. func (r *Registry) Rehydrate(ctx context.Context) (int, error) { keys, err := r.store.List(ctx, ManifestPrefix) if err != nil { return 0, fmt.Errorf("list manifests: %w", err) } loaded := make(map[string]*Manifest) for _, k := range keys { if !strings.HasPrefix(k, ManifestPrefix) || !strings.HasSuffix(k, ".parquet") { continue } body, err := r.store.Get(ctx, k) if err != nil { return len(loaded), fmt.Errorf("get manifest %s: %w", k, err) } m, err := Decode(body) if err != nil { return len(loaded), fmt.Errorf("decode manifest %s: %w", k, err) } loaded[m.Name] = m } r.mu.Lock() defer r.mu.Unlock() r.byKey = loaded return len(loaded), nil } // Register applies the ADR-020 idempotency contract: // // - No prior manifest for name → create (returns existing=false) // - Prior manifest, same fingerprint → update objects + bump // updated_at, reuse dataset_id (returns existing=true) // - Prior manifest, different fingerprint → ErrFingerprintConflict // // The mutex is held across the storage write so concurrent calls for // the same name serialize through the persistence layer (low TPS, // correctness > throughput). 
func (r *Registry) Register(ctx context.Context, name, fingerprint string, objects []Object, rowCount *int64) (*Manifest, bool, error) { if name == "" { return nil, false, ErrEmptyName } if fingerprint == "" { return nil, false, ErrEmptyFingerprint } r.mu.Lock() defer r.mu.Unlock() now := r.now().UTC() prior, exists := r.byKey[name] if exists { if prior.SchemaFingerprint != fingerprint { return nil, true, fmt.Errorf("%w: name=%q have=%s got=%s", ErrFingerprintConflict, name, prior.SchemaFingerprint, fingerprint) } // Same fingerprint — reuse dataset_id, replace objects, bump updated_at. // Per scrum S1 (Opus): build candidate, persist, then swap in. Mutating // `prior` before persist succeeds creates split-brain if storaged is // down — in-memory advances, disk holds the old state, restart loses // what callers were told didn't happen. candidate := *prior candidate.Objects = objects candidate.UpdatedAt = now candidate.RowCount = rowCount if err := r.persist(ctx, &candidate); err != nil { return nil, true, err } r.byKey[name] = &candidate return &candidate, true, nil } m := &Manifest{ DatasetID: DatasetIDForName(name), Name: name, SchemaFingerprint: fingerprint, Objects: objects, CreatedAt: now, UpdatedAt: now, RowCount: rowCount, } if err := r.persist(ctx, m); err != nil { return nil, false, err } r.byKey[name] = m return m, false, nil } // Get returns a deep copy of the manifest for name, or ErrManifestNotFound. func (r *Registry) Get(name string) (*Manifest, error) { r.mu.Lock() defer r.mu.Unlock() m, ok := r.byKey[name] if !ok { return nil, ErrManifestNotFound } return cloneManifest(m), nil } // List returns deep copies of every manifest, sorted by name. // Callers may mutate the returned slice and the underlying Manifest // values without affecting registry state. 
func (r *Registry) List() []*Manifest { r.mu.Lock() defer r.mu.Unlock() out := make([]*Manifest, 0, len(r.byKey)) for _, m := range r.byKey { out = append(out, cloneManifest(m)) } sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name }) return out } // cloneManifest deep-copies the Objects slice and dereferences // RowCount into a fresh pointer so a returned manifest cannot alias // registry state. Per scrum S3 (Opus): the prior `cp := *m` shape // shared the Objects backing array — caller-side index writes // corrupted registry state without holding the mutex. func cloneManifest(m *Manifest) *Manifest { cp := *m if m.Objects != nil { cp.Objects = make([]Object, len(m.Objects)) copy(cp.Objects, m.Objects) } if m.RowCount != nil { v := *m.RowCount cp.RowCount = &v } return &cp } // persist encodes the manifest and writes it to storaged at the // canonical path. Caller MUST hold r.mu — this function does not // take the lock itself. func (r *Registry) persist(ctx context.Context, m *Manifest) error { body, err := Encode(m) if err != nil { return fmt.Errorf("encode manifest %s: %w", m.Name, err) } key := ManifestPrefix + m.Name + ".parquet" if err := r.store.Put(ctx, key, body); err != nil { return fmt.Errorf("persist manifest %s: %w", m.Name, err) } return nil }