From 4205ecd0f07655ecf91a8ffb2e302671c9661dd9 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 28 Apr 2026 23:58:34 -0500 Subject: [PATCH] Pre-D5: extract CatalogClient to internal/catalogclient/ + add List MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit queryd (D5) needs the same HTTP client to catalogd that ingestd uses, but the client lived in internal/ingestd — having queryd import from ingestd would invert the data-flow direction (ingestd is upstream of queryd; the package dep should not point back). Extract to a shared internal/catalogclient/ package now, before D5 forces it under implementation pressure. Adds the List(ctx) method queryd will need for view registration. Unit tests cover Register success/conflict and List success/error paths against an httptest.Server fake. ingestd's import flips from internal/ingestd → internal/catalogclient; the wire format and behavior are unchanged. All four smokes (D1/D2/D3/ D4) PASS unchanged. DuckDB cgo path re-verified with the official github.com/duckdb/duckdb-go/v2 (per ADR-001) on Go 1.25 + arrow-go. Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/ingestd/main.go | 13 +-- .../client.go} | 65 +++++++++++--- internal/catalogclient/client_test.go | 90 +++++++++++++++++++ 3 files changed, 148 insertions(+), 20 deletions(-) rename internal/{ingestd/catalog_client.go => catalogclient/client.go} (53%) create mode 100644 internal/catalogclient/client_test.go diff --git a/cmd/ingestd/main.go b/cmd/ingestd/main.go index f7398fe..ce02ab3 100644 --- a/cmd/ingestd/main.go +++ b/cmd/ingestd/main.go @@ -1,8 +1,8 @@ // ingestd is the data on-ramp: CSV in (multipart form), Parquet out // (to storaged), manifest registered (with catalogd). One round trip // of HTTP, two service hops. The interesting glue lives in -// internal/ingestd/{schema,csv,catalog_client}.go; main.go just wires -// the route and threads the upstream URLs through. +// internal/ingestd/{schema,csv}.go and internal/catalogclient/; +// main.go just wires the route and threads the upstream URLs through. package main import ( @@ -22,6 +22,7 @@ import ( "github.com/go-chi/chi/v5" + "git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogclient" "git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd" "git.agentview.dev/profit/golangLAKEHOUSE/internal/ingestd" "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" @@ -54,7 +55,7 @@ func main() { h := &handlers{ storagedURL: strings.TrimRight(cfg.Ingestd.StoragedURL, "/"), - catalogd: ingestd.NewCatalogClient(cfg.Ingestd.CatalogdURL), + catalogd: catalogclient.New(cfg.Ingestd.CatalogdURL), hc: &http.Client{Timeout: 5 * time.Minute}, } @@ -66,7 +67,7 @@ func main() { type handlers struct { storagedURL string - catalogd *ingestd.CatalogClient + catalogd *catalogclient.Client hc *http.Client } @@ -136,13 +137,13 @@ func (h *handlers) handleIngest(w http.ResponseWriter, r *http.Request) { // 3. Register with catalogd rc := res.RowCount - regResp, err := h.catalogd.Register(r.Context(), &ingestd.RegisterRequest{ + regResp, err := h.catalogd.Register(r.Context(), &catalogclient.RegisterRequest{ Name: name, SchemaFingerprint: fingerprint, Objects: []catalogd.Object{{Key: parquetKey, Size: int64(len(res.Parquet))}}, RowCount: &rc, }) - if errors.Is(err, ingestd.ErrFingerprintConflict) { + if errors.Is(err, catalogclient.ErrFingerprintConflict) { http.Error(w, err.Error(), http.StatusConflict) return } diff --git a/internal/ingestd/catalog_client.go b/internal/catalogclient/client.go similarity index 53% rename from internal/ingestd/catalog_client.go rename to internal/catalogclient/client.go index be0485f..5261229 100644 --- a/internal/ingestd/catalog_client.go +++ b/internal/catalogclient/client.go @@ -1,9 +1,9 @@ -// catalog_client.go — HTTP client to catalogd. ingestd ships -// manifests through here after writing the Parquet to storaged. +// Package catalogclient is the shared HTTP client to catalogd. +// ingestd uses Register; queryd (D5+) uses List for view registration. // Symmetric in shape with internal/catalogd/store_client.go: thin // wrapper, drain-and-close discipline, sentinel errors for 4xx // classes that the handler maps back to HTTP. -package ingestd +package catalogclient import ( "bytes" @@ -19,20 +19,20 @@ import ( "git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd" ) -// CatalogClient talks HTTP to catalogd's /catalog/* routes. -type CatalogClient struct { +// Client talks HTTP to catalogd's /catalog/* routes. +type Client struct { baseURL string hc *http.Client } // ErrFingerprintConflict mirrors catalogd's 409 — same name, // different schema fingerprint. -var ErrFingerprintConflict = errors.New("ingestd: catalogd reports schema fingerprint conflict (409)") +var ErrFingerprintConflict = errors.New("catalogclient: catalogd reports schema fingerprint conflict (409)") -// NewCatalogClient builds a client against catalogd's base URL +// New builds a client against catalogd's base URL // (e.g. "http://127.0.0.1:3212"). -func NewCatalogClient(baseURL string) *CatalogClient { - return &CatalogClient{ +func New(baseURL string) *Client { + return &Client{ baseURL: strings.TrimRight(baseURL, "/"), hc: &http.Client{Timeout: 30 * time.Second}, } @@ -40,10 +40,10 @@ func NewCatalogClient(baseURL string) *CatalogClient { // RegisterRequest mirrors catalogd's POST /catalog/register body. type RegisterRequest struct { - Name string `json:"name"` - SchemaFingerprint string `json:"schema_fingerprint"` - Objects []catalogd.Object `json:"objects"` - RowCount *int64 `json:"row_count,omitempty"` + Name string `json:"name"` + SchemaFingerprint string `json:"schema_fingerprint"` + Objects []catalogd.Object `json:"objects"` + RowCount *int64 `json:"row_count,omitempty"` } // RegisterResponse mirrors catalogd's 200/conflict response. @@ -52,9 +52,17 @@ type RegisterResponse struct { Existing bool `json:"existing"` } +// listResponse mirrors catalogd's GET /catalog/list shape: +// +// {"manifests": [...], "count": N} +type listResponse struct { + Manifests []*catalogd.Manifest `json:"manifests"` + Count int `json:"count"` +} + // Register POSTs to /catalog/register. Returns ErrFingerprintConflict // on 409, the decoded response on 200, an error on anything else. -func (c *CatalogClient) Register(ctx context.Context, req *RegisterRequest) (*RegisterResponse, error) { +func (c *Client) Register(ctx context.Context, req *RegisterRequest) (*RegisterResponse, error) { body, err := json.Marshal(req) if err != nil { return nil, fmt.Errorf("marshal register: %w", err) @@ -87,6 +95,35 @@ func (c *CatalogClient) Register(ctx context.Context, req *RegisterRequest) (*Re return &out, nil } +// List GETs /catalog/list and returns every registered manifest. +// queryd (D5+) uses this for view registration; future operator +// tooling can use it for catalog inspection. +// +// Manifests come back sorted by name (catalogd's contract). The +// response also carries a count, which is dropped here — callers +// can take len(manifests) instead of paying for both. +func (c *Client) List(ctx context.Context) ([]*catalogd.Manifest, error) { + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/catalog/list", nil) + if err != nil { + return nil, fmt.Errorf("list req: %w", err) + } + resp, err := c.hc.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("list do: %w", err) + } + defer drainAndClose(resp.Body) + + if resp.StatusCode != http.StatusOK { + preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) + return nil, fmt.Errorf("list status %d: %s", resp.StatusCode, string(preview)) + } + var out listResponse + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("list decode: %w", err) + } + return out.Manifests, nil +} + // drainAndClose mirrors the catalogd store_client helper — drain a // bounded amount of body bytes before close so HTTP/1.1 keep-alive // pool reuse stays healthy on error paths. diff --git a/internal/catalogclient/client_test.go b/internal/catalogclient/client_test.go new file mode 100644 index 0000000..2888614 --- /dev/null +++ b/internal/catalogclient/client_test.go @@ -0,0 +1,90 @@ +package catalogclient + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd" +) + +func TestRegister_Success(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/catalog/register" || r.Method != http.MethodPost { + http.Error(w, "wrong route", http.StatusNotFound) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(RegisterResponse{ + Manifest: &catalogd.Manifest{Name: "x", DatasetID: "id-1"}, + Existing: false, + }) + })) + defer srv.Close() + + c := New(srv.URL) + resp, err := c.Register(context.Background(), &RegisterRequest{ + Name: "x", + SchemaFingerprint: "sha256:abc", + }) + if err != nil { + t.Fatal(err) + } + if resp.Manifest.DatasetID != "id-1" { + t.Errorf("dataset_id: got %s, want id-1", resp.Manifest.DatasetID) + } +} + +func TestRegister_ConflictMapsToSentinel(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, "fingerprint conflict", http.StatusConflict) + })) + defer srv.Close() + + c := New(srv.URL) + _, err := c.Register(context.Background(), &RegisterRequest{ + Name: "x", + SchemaFingerprint: "sha256:abc", + }) + if !errors.Is(err, ErrFingerprintConflict) { + t.Fatalf("expected ErrFingerprintConflict, got %v", err) + } +} + +func TestList_Success(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/catalog/list" || r.Method != http.MethodGet { + http.Error(w, "wrong route", http.StatusNotFound) + return + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"manifests":[{"name":"a"},{"name":"b"}],"count":2}`)) + })) + defer srv.Close() + + c := New(srv.URL) + got, err := c.List(context.Background()) + if err != nil { + t.Fatal(err) + } + if len(got) != 2 || got[0].Name != "a" || got[1].Name != "b" { + t.Errorf("List: got %+v, want [a b]", got) + } +} + +func TestList_Non200Errors(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, "boom", http.StatusInternalServerError) + })) + defer srv.Close() + + c := New(srv.URL) + _, err := c.List(context.Background()) + if err == nil || !strings.Contains(err.Error(), "list status 500") { + t.Fatalf("expected status 500 error, got %v", err) + } +}