root 66a704ca3e G0 D3: catalogd Parquet manifests + ADR-020 idempotent register · 6 scrum fixes
Phase G0 Day 3 ships catalogd: Arrow Parquet manifest codec, in-memory
registry with the ADR-020 idempotency contract (same name+fingerprint
reuses dataset_id; different fingerprint → 409 Conflict), HTTP client
to storaged for persistence, and rehydration on startup. The acceptance
smoke passes 6/6 end-to-end, including rehydrate-across-restart — the
load-bearing test that the catalogd/storaged service split actually
preserves state.
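
For reference, a minimal sketch of the ADR-020 contract, assuming it sits
next to the Manifest type in package catalogd. The real registry (not in
this file) also persists through storaged before exposing anything, and the
sentinel name here is illustrative, not the shipped one:

    // ErrFingerprintConflict is what the HTTP layer maps to 409 Conflict.
    var ErrFingerprintConflict = errors.New("dataset exists with a different schema_fingerprint")

    // resolveRegister applies the idempotency contract to whatever the
    // catalog already holds for this name (nil if the name is unseen).
    func resolveRegister(existing *Manifest, name, fingerprint string) (*Manifest, error) {
        if existing != nil {
            if existing.SchemaFingerprint == fingerprint {
                return existing, nil // same name+fingerprint: reuse the dataset_id
            }
            return nil, ErrFingerprintConflict // different fingerprint: 409
        }
        now := time.Now().UTC()
        return &Manifest{
            DatasetID:         DatasetIDForName(name), // deterministic UUIDv5
            Name:              name,
            SchemaFingerprint: fingerprint,
            CreatedAt:         now,
            UpdatedAt:         now,
        }, nil
    }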

dataset_id derivation diverges from Rust: UUIDv5(namespace, name)
instead of v4 surrogate. Same name on any box generates the same
dataset_id; rehydrate after disk loss converges to the same identity
rather than silently re-issuing a new one. Namespace pinned at
a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e — every dataset_id ever issued
depends on these bytes.
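
Concretely, inside package catalogd (the dataset name below is made up for
illustration):

    a := DatasetIDForName("events.page_views")
    b := DatasetIDForName("events.page_views")
    fmt.Println(a == b) // true on any box, after any restart: UUIDv5 is a
                        // pure function of (catalogNamespace, name)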

Cross-lineage scrum on shipped code:
  - Opus 4.7 (opencode):                       1 BLOCK + 5 WARN + 3 INFO
  - Kimi K2-0905 (openrouter, validated D2):   2 BLOCK + 2 WARN + 1 INFO
  - Qwen3-coder (openrouter):                  2 BLOCK + 2 WARN + 2 INFO

Fixed:
  C1 list-offsets BLOCK (3-way convergent) → ValueOffsets(0) + bounds
  C2 Rehydrate mutex held across I/O → swap-under-brief-lock pattern
  S1 split-brain on persist failure → candidate-then-swap (C2/S1 sketched below)
  S2 brittle string-match for 400 vs 500 → ErrEmptyName/ErrEmptyFingerprint sentinels
  S3 Get/List shallow-copy aliasing → cloneManifest deep copy
  S4 keep-alive socket leak on error paths → drainAndClose helper
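
The C2/S1 shape, as a minimal sketch; the shipped registry isn't in this
file, so the receiver and field names below are illustrative. The common
move: do the slow or fallible work on a candidate with no lock held, then
take the mutex only long enough to swap the result in.

    // Illustrative registry shape (needs "sync"); not the shipped struct.
    type Registry struct {
        mu     sync.Mutex
        byName map[string]*Manifest
    }

    // rehydrate (C2): build the map from already-fetched manifests, then swap.
    func (r *Registry) rehydrate(manifests []*Manifest) {
        candidate := make(map[string]*Manifest, len(manifests))
        for _, m := range manifests {
            candidate[m.Name] = m
        }
        r.mu.Lock()
        r.byName = candidate // brief critical section: pointer swap, no I/O
        r.mu.Unlock()
    }

    // commit (S1): nothing becomes visible in memory until storaged has
    // acknowledged the candidate, so a failed persist can't split-brain.
    func (r *Registry) commit(m *Manifest, persist func(*Manifest) error) error {
        if err := persist(m); err != nil {
            return err // memory untouched; it still matches storaged
        }
        r.mu.Lock()
        r.byName[m.Name] = m
        r.mu.Unlock()
        return nil
    }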

Dismissed (false positives, all single-reviewer):
  Kimi BLOCK "Decode crashes on empty Parquet" — already handled
  Kimi INFO "safeKey double-escapes" — wrong, splitting before escape is required
  Qwen INFO "rb.NewRecord() error unchecked" — API returns no error

Deferred to G1+: name validation regex, per-call deadlines, Snappy
compression, list pagination continuation tokens (storaged caps at
10k with sentinel for now).

Build clean, vet clean, all tests pass, smoke 6/6 PASS after every
fix round. arrow-go/v18 + google/uuid added; Go 1.24 → 1.25 forced
by arrow-go's minimum.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 23:36:57 -05:00

// Package catalogd is the metadata authority — manifests for every
// registered dataset. A Manifest is the catalog row Rust calls
// `Manifest`: dataset_id (deterministic from name via UUIDv5),
// schema_fingerprint (caller-supplied schema hash), the object keys
// that physically back the dataset in storaged, plus timestamps and
// optional row_count.
//
// G0 stores one Manifest per Parquet file at
// primary://_catalog/manifests/<name>.parquet. One row per file —
// catalog manifests are written rarely and read on startup, so the
// per-file shape favors atomic register over storage density.
package catalogd

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet"
	"github.com/apache/arrow-go/v18/parquet/file"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
	"github.com/google/uuid"
)

// catalogNamespace is the v5 UUID namespace for dataset_id derivation.
// Same name → same dataset_id across boxes / cold starts. Don't change
// — every dataset_id ever issued depends on this byte sequence.
var catalogNamespace = uuid.MustParse("a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e")

// Object is one storaged key contributing to a dataset.
type Object struct {
	Key  string `json:"key"`
	Size int64  `json:"size"`
}

// Manifest is the catalog row for one dataset.
type Manifest struct {
	DatasetID         string    `json:"dataset_id"`
	Name              string    `json:"name"`
	SchemaFingerprint string    `json:"schema_fingerprint"`
	Objects           []Object  `json:"objects"`
	CreatedAt         time.Time `json:"created_at"`
	UpdatedAt         time.Time `json:"updated_at"`
	RowCount          *int64    `json:"row_count,omitempty"`
}

// DatasetIDForName returns the deterministic UUIDv5 dataset_id for a
// logical dataset name. Idempotent on the same name across boxes.
func DatasetIDForName(name string) string {
	return uuid.NewSHA1(catalogNamespace, []byte(name)).String()
}

// manifestArrowSchema is the Arrow schema for the on-disk Parquet.
// Field order matters — codec builders rely on it.
var manifestArrowSchema = arrow.NewSchema([]arrow.Field{
	{Name: "dataset_id", Type: arrow.BinaryTypes.String},
	{Name: "name", Type: arrow.BinaryTypes.String},
	{Name: "schema_fingerprint", Type: arrow.BinaryTypes.String},
	{Name: "objects", Type: arrow.ListOf(arrow.StructOf(
		arrow.Field{Name: "key", Type: arrow.BinaryTypes.String},
		arrow.Field{Name: "size", Type: arrow.PrimitiveTypes.Int64},
	))},
	{Name: "created_at_unix_ns", Type: arrow.PrimitiveTypes.Int64},
	{Name: "updated_at_unix_ns", Type: arrow.PrimitiveTypes.Int64},
	{Name: "row_count", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
}, nil)

// Encode writes a single Manifest to a Parquet byte slice. Memory
// allocations are bounded — manifests have tens of objects, not
// millions.
func Encode(m *Manifest) ([]byte, error) {
	mem := memory.NewGoAllocator()
	rb := array.NewRecordBuilder(mem, manifestArrowSchema)
	defer rb.Release()

	rb.Field(0).(*array.StringBuilder).Append(m.DatasetID)
	rb.Field(1).(*array.StringBuilder).Append(m.Name)
	rb.Field(2).(*array.StringBuilder).Append(m.SchemaFingerprint)

	// One list entry for this single row; each Object becomes one struct
	// element appended to the list's child builders.
	listB := rb.Field(3).(*array.ListBuilder)
	listB.Append(true)
	structB := listB.ValueBuilder().(*array.StructBuilder)
	keyB := structB.FieldBuilder(0).(*array.StringBuilder)
	sizeB := structB.FieldBuilder(1).(*array.Int64Builder)
	for _, o := range m.Objects {
		structB.Append(true)
		keyB.Append(o.Key)
		sizeB.Append(o.Size)
	}

	rb.Field(4).(*array.Int64Builder).Append(m.CreatedAt.UnixNano())
	rb.Field(5).(*array.Int64Builder).Append(m.UpdatedAt.UnixNano())
	if m.RowCount != nil {
		rb.Field(6).(*array.Int64Builder).Append(*m.RowCount)
	} else {
		rb.Field(6).(*array.Int64Builder).AppendNull()
	}

	rec := rb.NewRecord()
	defer rec.Release()

	var buf bytes.Buffer
	props := parquet.NewWriterProperties()
	arrowProps := pqarrow.NewArrowWriterProperties()
	w, err := pqarrow.NewFileWriter(manifestArrowSchema, &buf, props, arrowProps)
	if err != nil {
		return nil, fmt.Errorf("pqarrow writer: %w", err)
	}
	if err := w.Write(rec); err != nil {
		return nil, fmt.Errorf("pqarrow write: %w", err)
	}
	if err := w.Close(); err != nil {
		return nil, fmt.Errorf("pqarrow close: %w", err)
	}
	return buf.Bytes(), nil
}

// Decode reads a single-row Parquet manifest back into a Manifest.
func Decode(b []byte) (*Manifest, error) {
	rdr, err := file.NewParquetReader(bytes.NewReader(b))
	if err != nil {
		return nil, fmt.Errorf("parquet reader: %w", err)
	}
	defer rdr.Close()

	pr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.NewGoAllocator())
	if err != nil {
		return nil, fmt.Errorf("pqarrow reader: %w", err)
	}
	tbl, err := pr.ReadTable(context.Background())
	if err != nil {
		return nil, fmt.Errorf("read table: %w", err)
	}
	defer tbl.Release()

	if tbl.NumRows() != 1 {
		return nil, fmt.Errorf("manifest parquet: expected 1 row, got %d", tbl.NumRows())
	}
	rr := array.NewTableReader(tbl, 1)
	defer rr.Release()
	if !rr.Next() {
		return nil, errors.New("manifest parquet: no record batch")
	}
	rec := rr.Record()

	m := &Manifest{
		DatasetID:         rec.Column(0).(*array.String).Value(0),
		Name:              rec.Column(1).(*array.String).Value(0),
		SchemaFingerprint: rec.Column(2).(*array.String).Value(0),
	}

	// Per scrum C1 (3-way convergent): use ValueOffsets which accounts
	// for the array's own offset (non-zero under slicing) and is bounds-
	// safe by API contract. Direct Offsets()[0]/[1] indexing is fragile
	// under multi-row reads and panics on malformed offset buffers.
	listArr := rec.Column(3).(*array.List)
	structArr := listArr.ListValues().(*array.Struct)
	keyArr := structArr.Field(0).(*array.String)
	sizeArr := structArr.Field(1).(*array.Int64)
	start, end := listArr.ValueOffsets(0)
	if start < 0 || end < start || end > int64(structArr.Len()) {
		return nil, fmt.Errorf("manifest: bad list offsets [%d, %d] for struct len %d",
			start, end, structArr.Len())
	}
	for i := start; i < end; i++ {
		m.Objects = append(m.Objects, Object{
			Key:  keyArr.Value(int(i)),
			Size: sizeArr.Value(int(i)),
		})
	}

	m.CreatedAt = time.Unix(0, rec.Column(4).(*array.Int64).Value(0))
	m.UpdatedAt = time.Unix(0, rec.Column(5).(*array.Int64).Value(0))
	rcArr := rec.Column(6).(*array.Int64)
	if rcArr.IsValid(0) {
		v := rcArr.Value(0)
		m.RowCount = &v
	}
	return m, nil
}

// EncodeReader is a small convenience for callers that want an
// io.Reader over the encoded bytes (matches the storaged HTTP PUT
// signature).
func EncodeReader(m *Manifest) (io.Reader, int64, error) {
	b, err := Encode(m)
	if err != nil {
		return nil, 0, err
	}
	return bytes.NewReader(b), int64(len(b)), nil
}