Pre-D5: extract CatalogClient to internal/catalogclient/ + add List

queryd (D5) needs the same HTTP client to catalogd that ingestd uses,
but the client lived in internal/ingestd — having queryd import from
ingestd would invert the data-flow direction (ingestd is upstream of
queryd; the package dep should not point back). Extract to a shared
internal/catalogclient/ package now, before D5 forces it under
implementation pressure.

Adds the List(ctx) method queryd will need for view registration.
Unit tests cover Register success/conflict and List success/error
paths against an httptest.Server fake.

ingestd's client import flips from internal/ingestd → internal/catalogclient
(internal/ingestd is still imported for the schema/CSV code);
the wire format and behavior are unchanged. All four smokes (D1/D2/D3/
D4) PASS unchanged. DuckDB cgo path re-verified with the official
github.com/duckdb/duckdb-go/v2 (per ADR-001) on Go 1.25 + arrow-go.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-04-28 23:58:34 -05:00
parent c1e411347a
commit 4205ecd0f0
3 changed files with 148 additions and 20 deletions

View File

@ -1,8 +1,8 @@
// ingestd is the data on-ramp: CSV in (multipart form), Parquet out // ingestd is the data on-ramp: CSV in (multipart form), Parquet out
// (to storaged), manifest registered (with catalogd). One round trip // (to storaged), manifest registered (with catalogd). One round trip
// of HTTP, two service hops. The interesting glue lives in // of HTTP, two service hops. The interesting glue lives in
// internal/ingestd/{schema,csv,catalog_client}.go; main.go just wires // internal/ingestd/{schema,csv}.go and internal/catalogclient/;
// the route and threads the upstream URLs through. // main.go just wires the route and threads the upstream URLs through.
package main package main
import ( import (
@ -22,6 +22,7 @@ import (
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogclient"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd" "git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/ingestd" "git.agentview.dev/profit/golangLAKEHOUSE/internal/ingestd"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
@ -54,7 +55,7 @@ func main() {
h := &handlers{ h := &handlers{
storagedURL: strings.TrimRight(cfg.Ingestd.StoragedURL, "/"), storagedURL: strings.TrimRight(cfg.Ingestd.StoragedURL, "/"),
catalogd: ingestd.NewCatalogClient(cfg.Ingestd.CatalogdURL), catalogd: catalogclient.New(cfg.Ingestd.CatalogdURL),
hc: &http.Client{Timeout: 5 * time.Minute}, hc: &http.Client{Timeout: 5 * time.Minute},
} }
@ -66,7 +67,7 @@ func main() {
type handlers struct { type handlers struct {
storagedURL string storagedURL string
catalogd *ingestd.CatalogClient catalogd *catalogclient.Client
hc *http.Client hc *http.Client
} }
@ -136,13 +137,13 @@ func (h *handlers) handleIngest(w http.ResponseWriter, r *http.Request) {
// 3. Register with catalogd // 3. Register with catalogd
rc := res.RowCount rc := res.RowCount
regResp, err := h.catalogd.Register(r.Context(), &ingestd.RegisterRequest{ regResp, err := h.catalogd.Register(r.Context(), &catalogclient.RegisterRequest{
Name: name, Name: name,
SchemaFingerprint: fingerprint, SchemaFingerprint: fingerprint,
Objects: []catalogd.Object{{Key: parquetKey, Size: int64(len(res.Parquet))}}, Objects: []catalogd.Object{{Key: parquetKey, Size: int64(len(res.Parquet))}},
RowCount: &rc, RowCount: &rc,
}) })
if errors.Is(err, ingestd.ErrFingerprintConflict) { if errors.Is(err, catalogclient.ErrFingerprintConflict) {
http.Error(w, err.Error(), http.StatusConflict) http.Error(w, err.Error(), http.StatusConflict)
return return
} }

View File

@ -1,9 +1,9 @@
// catalog_client.go — HTTP client to catalogd. ingestd ships // Package catalogclient is the shared HTTP client to catalogd.
// manifests through here after writing the Parquet to storaged. // ingestd uses Register; queryd (D5+) uses List for view registration.
// Symmetric in shape with internal/catalogd/store_client.go: thin // Symmetric in shape with internal/catalogd/store_client.go: thin
// wrapper, drain-and-close discipline, sentinel errors for 4xx // wrapper, drain-and-close discipline, sentinel errors for 4xx
// classes that the handler maps back to HTTP. // classes that the handler maps back to HTTP.
package ingestd package catalogclient
import ( import (
"bytes" "bytes"
@ -19,20 +19,20 @@ import (
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd" "git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
) )
// CatalogClient talks HTTP to catalogd's /catalog/* routes. // Client talks HTTP to catalogd's /catalog/* routes.
type CatalogClient struct { type Client struct {
baseURL string baseURL string
hc *http.Client hc *http.Client
} }
// ErrFingerprintConflict mirrors catalogd's 409 — same name, // ErrFingerprintConflict mirrors catalogd's 409 — same name,
// different schema fingerprint. // different schema fingerprint.
var ErrFingerprintConflict = errors.New("ingestd: catalogd reports schema fingerprint conflict (409)") var ErrFingerprintConflict = errors.New("catalogclient: catalogd reports schema fingerprint conflict (409)")
// NewCatalogClient builds a client against catalogd's base URL // New builds a client against catalogd's base URL
// (e.g. "http://127.0.0.1:3212"). // (e.g. "http://127.0.0.1:3212").
func NewCatalogClient(baseURL string) *CatalogClient { func New(baseURL string) *Client {
return &CatalogClient{ return &Client{
baseURL: strings.TrimRight(baseURL, "/"), baseURL: strings.TrimRight(baseURL, "/"),
hc: &http.Client{Timeout: 30 * time.Second}, hc: &http.Client{Timeout: 30 * time.Second},
} }
@ -52,9 +52,17 @@ type RegisterResponse struct {
Existing bool `json:"existing"` Existing bool `json:"existing"`
} }
// listResponse is the decode target for catalogd's GET /catalog/list
// response body, which has the shape:
//
// {"manifests": [...], "count": N}
//
// It is unexported: List returns only the Manifests slice to callers.
type listResponse struct {
// Manifests holds the decoded "manifests" array, one entry per manifest.
Manifests []*catalogd.Manifest `json:"manifests"`
// Count holds the "count" field; it is decoded but not returned by List.
Count int `json:"count"`
}
// Register POSTs to /catalog/register. Returns ErrFingerprintConflict // Register POSTs to /catalog/register. Returns ErrFingerprintConflict
// on 409, the decoded response on 200, an error on anything else. // on 409, the decoded response on 200, an error on anything else.
func (c *CatalogClient) Register(ctx context.Context, req *RegisterRequest) (*RegisterResponse, error) { func (c *Client) Register(ctx context.Context, req *RegisterRequest) (*RegisterResponse, error) {
body, err := json.Marshal(req) body, err := json.Marshal(req)
if err != nil { if err != nil {
return nil, fmt.Errorf("marshal register: %w", err) return nil, fmt.Errorf("marshal register: %w", err)
@ -87,6 +95,35 @@ func (c *CatalogClient) Register(ctx context.Context, req *RegisterRequest) (*Re
return &out, nil return &out, nil
} }
// List issues GET /catalog/list against catalogd and returns the
// manifests found in the response. queryd (D5+) is the intended caller
// (view registration); operator tooling may use it for inspection.
//
// Ordering is whatever catalogd sends — sorted by name per catalogd's
// contract. The "count" field in the response is decoded but not
// returned; callers can use len() on the result instead.
//
// Errors: request construction and transport failures are wrapped with
// "list req" / "list do"; any non-200 status yields an error carrying
// the status code and up to 256 bytes of the body; a malformed body
// yields a wrapped "list decode" error.
func (c *Client) List(ctx context.Context) ([]*catalogd.Manifest, error) {
	endpoint := c.baseURL + "/catalog/list"
	request, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("list req: %w", err)
	}

	response, err := c.hc.Do(request)
	if err != nil {
		return nil, fmt.Errorf("list do: %w", err)
	}
	defer drainAndClose(response.Body)

	switch response.StatusCode {
	case http.StatusOK:
		payload := new(listResponse)
		if decodeErr := json.NewDecoder(response.Body).Decode(payload); decodeErr != nil {
			return nil, fmt.Errorf("list decode: %w", decodeErr)
		}
		return payload.Manifests, nil
	default:
		// Best-effort body preview for the error message; a read failure
		// just means a shorter (or empty) preview, so it is ignored.
		snippet, _ := io.ReadAll(io.LimitReader(response.Body, 256))
		return nil, fmt.Errorf("list status %d: %s", response.StatusCode, snippet)
	}
}
// drainAndClose mirrors the catalogd store_client helper — drain a // drainAndClose mirrors the catalogd store_client helper — drain a
// bounded amount of body bytes before close so HTTP/1.1 keep-alive // bounded amount of body bytes before close so HTTP/1.1 keep-alive
// pool reuse stays healthy on error paths. // pool reuse stays healthy on error paths.

View File

@ -0,0 +1,90 @@
package catalogclient
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"strings"
"testing"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
)
// TestRegister_Success verifies that a 200 reply from POST
// /catalog/register is decoded into a RegisterResponse whose manifest
// carries the dataset ID the fake server sent.
func TestRegister_Success(t *testing.T) {
	fake := func(w http.ResponseWriter, r *http.Request) {
		isRegister := r.URL.Path == "/catalog/register" && r.Method == http.MethodPost
		if !isRegister {
			http.Error(w, "wrong route", http.StatusNotFound)
			return
		}
		reply := RegisterResponse{
			Manifest: &catalogd.Manifest{Name: "x", DatasetID: "id-1"},
			Existing: false,
		}
		w.Header().Set("Content-Type", "application/json")
		// Best-effort write; the assertions below catch a bad response.
		_ = json.NewEncoder(w).Encode(reply)
	}
	server := httptest.NewServer(http.HandlerFunc(fake))
	defer server.Close()

	client := New(server.URL)
	request := &RegisterRequest{Name: "x", SchemaFingerprint: "sha256:abc"}
	got, err := client.Register(context.Background(), request)
	if err != nil {
		t.Fatal(err)
	}
	if id := got.Manifest.DatasetID; id != "id-1" {
		t.Errorf("dataset_id: got %s, want id-1", id)
	}
}
func TestRegister_ConflictMapsToSentinel(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
http.Error(w, "fingerprint conflict", http.StatusConflict)
}))
defer srv.Close()
c := New(srv.URL)
_, err := c.Register(context.Background(), &RegisterRequest{
Name: "x",
SchemaFingerprint: "sha256:abc",
})
if !errors.Is(err, ErrFingerprintConflict) {
t.Fatalf("expected ErrFingerprintConflict, got %v", err)
}
}
// TestList_Success verifies that a 200 reply from GET /catalog/list is
// decoded and that List returns the manifests in the order the server
// sent them.
func TestList_Success(t *testing.T) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/catalog/list" || r.Method != http.MethodGet {
			http.Error(w, "wrong route", http.StatusNotFound)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		// Best-effort write; a short write shows up as a decode error below.
		_, _ = w.Write([]byte(`{"manifests":[{"name":"a"},{"name":"b"}],"count":2}`))
	}))
	defer srv.Close()

	c := New(srv.URL)
	got, err := c.List(context.Background())
	if err != nil {
		t.Fatal(err)
	}

	// Compare and report by name. Formatting the []*Manifest slice
	// directly would print pointer addresses, which makes a failure
	// message useless; a nil entry is reported instead of dereferenced.
	names := make([]string, 0, len(got))
	for _, m := range got {
		if m == nil {
			names = append(names, "<nil>")
			continue
		}
		names = append(names, m.Name)
	}
	if len(names) != 2 || names[0] != "a" || names[1] != "b" {
		t.Errorf("List: got names %q, want [\"a\" \"b\"]", names)
	}
}
// TestList_Non200Errors verifies that a non-200 reply makes List return
// an error whose text includes the status code ("list status 500").
func TestList_Non200Errors(t *testing.T) {
	alwaysFail := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		http.Error(w, "boom", http.StatusInternalServerError)
	})
	server := httptest.NewServer(alwaysFail)
	defer server.Close()

	_, err := New(server.URL).List(context.Background())
	if err == nil {
		t.Fatalf("expected status 500 error, got %v", err)
	}
	if !strings.Contains(err.Error(), "list status 500") {
		t.Fatalf("expected status 500 error, got %v", err)
	}
}