Phase G0 Day 3 ships catalogd: Arrow Parquet manifest codec, in-memory registry with the ADR-020 idempotency contract (same name+fingerprint reuses dataset_id; different fingerprint → 409 Conflict), HTTP client to storaged for persistence, and rehydration on startup.

Acceptance smoke 6/6 PASSES end-to-end, including rehydrate-across-restart — the load-bearing test that the catalog/storaged service split actually preserves state.

dataset_id derivation diverges from Rust: UUIDv5(namespace, name) instead of a v4 surrogate. The same name on any box generates the same dataset_id; rehydrate after disk loss converges to the same identity rather than silently re-issuing. Namespace pinned at a8f3c1d2-4e5b-5a6c-9d8e-7f0a1b2c3d4e — every dataset_id ever issued depends on these bytes.

Cross-lineage scrum on shipped code:
- Opus 4.7 (opencode): 1 BLOCK + 5 WARN + 3 INFO
- Kimi K2-0905 (openrouter, validated D2): 2 BLOCK + 2 WARN + 1 INFO
- Qwen3-coder (openrouter): 2 BLOCK + 2 WARN + 2 INFO

Fixed:
- C1 list-offsets BLOCK (3-way convergent) → ValueOffsets(0) + bounds
- C2 Rehydrate mutex held across I/O → swap-under-brief-lock pattern
- S1 split-brain on persist failure → candidate-then-swap
- S2 brittle string-match for 400 vs 500 → ErrEmptyName/ErrEmptyFingerprint sentinels
- S3 Get/List shallow-copy aliasing → cloneManifest deep copy
- S4 keep-alive socket leak on error paths → drainAndClose helper

Dismissed (false positives, all single-reviewer):
- Kimi BLOCK "Decode crashes on empty Parquet" — already handled
- Kimi INFO "safeKey double-escapes" — wrong, splitting before escape is required
- Qwen INFO "rb.NewRecord() error unchecked" — API returns no error

Deferred to G1+: name validation regex, per-call deadlines, Snappy compression, list pagination continuation tokens (storaged caps at 10k with sentinel for now).

Build clean, vet clean, all tests pass, smoke 6/6 PASS after every fix round. arrow-go/v18 + google/uuid added; Go 1.24 → 1.25 forced by arrow-go's minimum.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
132 lines
3.9 KiB
Go
132 lines
3.9 KiB
Go
// catalogd is the metadata authority — registers Parquet datasets,
|
|
// persists manifests in storaged, rehydrates them on startup, and
|
|
// answers GET/list queries. ADR-020 idempotency contract enforced
|
|
// by internal/catalogd/registry.go.
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"flag"
|
|
"log/slog"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
|
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
|
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
|
|
)
|
|
|
|
func main() {
|
|
configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
|
|
flag.Parse()
|
|
|
|
cfg, err := shared.LoadConfig(*configPath)
|
|
if err != nil {
|
|
slog.Error("config", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
if cfg.Catalogd.StoragedURL == "" {
|
|
slog.Error("config", "err", "catalogd.storaged_url is required")
|
|
os.Exit(1)
|
|
}
|
|
|
|
store := catalogd.NewStoreClient(cfg.Catalogd.StoragedURL)
|
|
registry := catalogd.NewRegistry(store)
|
|
|
|
rehydrateCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
n, err := registry.Rehydrate(rehydrateCtx)
|
|
cancel()
|
|
if err != nil {
|
|
slog.Error("rehydrate", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
slog.Info("rehydrated", "manifests", n)
|
|
|
|
h := newHandlers(registry)
|
|
|
|
if err := shared.Run("catalogd", cfg.Catalogd.Bind, h.register); err != nil {
|
|
slog.Error("server", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
type handlers struct {
|
|
reg *catalogd.Registry
|
|
}
|
|
|
|
// newHandlers returns a handlers value whose methods operate on r.
func newHandlers(r *catalogd.Registry) *handlers {
	h := new(handlers)
	h.reg = r
	return h
}
|
|
|
|
func (h *handlers) register(r chi.Router) {
|
|
r.Post("/catalog/register", h.handleRegister)
|
|
r.Get("/catalog/manifest/*", h.handleGetManifest)
|
|
r.Get("/catalog/list", h.handleList)
|
|
}
|
|
|
|
// registerRequest mirrors POST body shape.
|
|
type registerRequest struct {
|
|
Name string `json:"name"`
|
|
SchemaFingerprint string `json:"schema_fingerprint"`
|
|
Objects []catalogd.Object `json:"objects"`
|
|
RowCount *int64 `json:"row_count,omitempty"`
|
|
}
|
|
|
|
// registerResponse adds the existing flag so callers can distinguish
|
|
// fresh registration from idempotent re-register.
|
|
type registerResponse struct {
|
|
Manifest *catalogd.Manifest `json:"manifest"`
|
|
Existing bool `json:"existing"`
|
|
}
|
|
|
|
func (h *handlers) handleRegister(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
r.Body = http.MaxBytesReader(w, r.Body, 4<<20) // 4 MiB cap on register payloads
|
|
var req registerRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
http.Error(w, "decode body: "+err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
m, existing, err := h.reg.Register(r.Context(), req.Name, req.SchemaFingerprint, req.Objects, req.RowCount)
|
|
if errors.Is(err, catalogd.ErrFingerprintConflict) {
|
|
http.Error(w, err.Error(), http.StatusConflict)
|
|
return
|
|
}
|
|
if errors.Is(err, catalogd.ErrEmptyName) || errors.Is(err, catalogd.ErrEmptyFingerprint) {
|
|
// Per scrum S2 (Opus): sentinel-based detection, not substring match.
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
if err != nil {
|
|
slog.Error("register", "name", req.Name, "err", err)
|
|
http.Error(w, "internal", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(registerResponse{Manifest: m, Existing: existing})
|
|
}
|
|
|
|
func (h *handlers) handleGetManifest(w http.ResponseWriter, r *http.Request) {
|
|
name := chi.URLParam(r, "*")
|
|
m, err := h.reg.Get(name)
|
|
if errors.Is(err, catalogd.ErrManifestNotFound) {
|
|
http.Error(w, "not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
if err != nil {
|
|
slog.Error("get manifest", "name", name, "err", err)
|
|
http.Error(w, "internal", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(m)
|
|
}
|
|
|
|
func (h *handlers) handleList(w http.ResponseWriter, _ *http.Request) {
|
|
items := h.reg.List()
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(map[string]any{"manifests": items, "count": len(items)})
|
|
}
|