Pre-D5: extract CatalogClient to internal/catalogclient/ + add List
queryd (D5) needs the same HTTP client to catalogd that ingestd uses, but the client lived in internal/ingestd — having queryd import from ingestd would invert the data-flow direction (ingestd is upstream of queryd; the package dep should not point back). Extract to a shared internal/catalogclient/ package now, before D5 forces it under implementation pressure. Adds the List(ctx) method queryd will need for view registration. Unit tests cover Register success/conflict and List success/error paths against an httptest.Server fake. ingestd's import flips from internal/ingestd to internal/catalogclient; the wire format and behavior are unchanged. All four smoke tests (D1/D2/D3/D4) PASS unchanged. DuckDB cgo path re-verified with the official github.com/duckdb/duckdb-go/v2 (per ADR-001) on Go 1.25 + arrow-go. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
c1e411347a
commit
4205ecd0f0
@ -1,8 +1,8 @@
|
|||||||
// ingestd is the data on-ramp: CSV in (multipart form), Parquet out
|
// ingestd is the data on-ramp: CSV in (multipart form), Parquet out
|
||||||
// (to storaged), manifest registered (with catalogd). One round trip
|
// (to storaged), manifest registered (with catalogd). One round trip
|
||||||
// of HTTP, two service hops. The interesting glue lives in
|
// of HTTP, two service hops. The interesting glue lives in
|
||||||
// internal/ingestd/{schema,csv,catalog_client}.go; main.go just wires
|
// internal/ingestd/{schema,csv}.go and internal/catalogclient/;
|
||||||
// the route and threads the upstream URLs through.
|
// main.go just wires the route and threads the upstream URLs through.
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -22,6 +22,7 @@ import (
|
|||||||
|
|
||||||
"github.com/go-chi/chi/v5"
|
"github.com/go-chi/chi/v5"
|
||||||
|
|
||||||
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogclient"
|
||||||
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
|
||||||
"git.agentview.dev/profit/golangLAKEHOUSE/internal/ingestd"
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/ingestd"
|
||||||
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
|
||||||
@ -54,7 +55,7 @@ func main() {
|
|||||||
|
|
||||||
h := &handlers{
|
h := &handlers{
|
||||||
storagedURL: strings.TrimRight(cfg.Ingestd.StoragedURL, "/"),
|
storagedURL: strings.TrimRight(cfg.Ingestd.StoragedURL, "/"),
|
||||||
catalogd: ingestd.NewCatalogClient(cfg.Ingestd.CatalogdURL),
|
catalogd: catalogclient.New(cfg.Ingestd.CatalogdURL),
|
||||||
hc: &http.Client{Timeout: 5 * time.Minute},
|
hc: &http.Client{Timeout: 5 * time.Minute},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -66,7 +67,7 @@ func main() {
|
|||||||
|
|
||||||
type handlers struct {
|
type handlers struct {
|
||||||
storagedURL string
|
storagedURL string
|
||||||
catalogd *ingestd.CatalogClient
|
catalogd *catalogclient.Client
|
||||||
hc *http.Client
|
hc *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -136,13 +137,13 @@ func (h *handlers) handleIngest(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
// 3. Register with catalogd
|
// 3. Register with catalogd
|
||||||
rc := res.RowCount
|
rc := res.RowCount
|
||||||
regResp, err := h.catalogd.Register(r.Context(), &ingestd.RegisterRequest{
|
regResp, err := h.catalogd.Register(r.Context(), &catalogclient.RegisterRequest{
|
||||||
Name: name,
|
Name: name,
|
||||||
SchemaFingerprint: fingerprint,
|
SchemaFingerprint: fingerprint,
|
||||||
Objects: []catalogd.Object{{Key: parquetKey, Size: int64(len(res.Parquet))}},
|
Objects: []catalogd.Object{{Key: parquetKey, Size: int64(len(res.Parquet))}},
|
||||||
RowCount: &rc,
|
RowCount: &rc,
|
||||||
})
|
})
|
||||||
if errors.Is(err, ingestd.ErrFingerprintConflict) {
|
if errors.Is(err, catalogclient.ErrFingerprintConflict) {
|
||||||
http.Error(w, err.Error(), http.StatusConflict)
|
http.Error(w, err.Error(), http.StatusConflict)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,9 +1,9 @@
|
|||||||
// catalog_client.go — HTTP client to catalogd. ingestd ships
|
// Package catalogclient is the shared HTTP client to catalogd.
|
||||||
// manifests through here after writing the Parquet to storaged.
|
// ingestd uses Register; queryd (D5+) uses List for view registration.
|
||||||
// Symmetric in shape with internal/catalogd/store_client.go: thin
|
// Symmetric in shape with internal/catalogd/store_client.go: thin
|
||||||
// wrapper, drain-and-close discipline, sentinel errors for 4xx
|
// wrapper, drain-and-close discipline, sentinel errors for 4xx
|
||||||
// classes that the handler maps back to HTTP.
|
// classes that the handler maps back to HTTP.
|
||||||
package ingestd
|
package catalogclient
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
@ -19,20 +19,20 @@ import (
|
|||||||
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CatalogClient talks HTTP to catalogd's /catalog/* routes.
|
// Client talks HTTP to catalogd's /catalog/* routes.
|
||||||
type CatalogClient struct {
|
type Client struct {
|
||||||
baseURL string
|
baseURL string
|
||||||
hc *http.Client
|
hc *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
// ErrFingerprintConflict mirrors catalogd's 409 — same name,
|
// ErrFingerprintConflict mirrors catalogd's 409 — same name,
|
||||||
// different schema fingerprint.
|
// different schema fingerprint.
|
||||||
var ErrFingerprintConflict = errors.New("ingestd: catalogd reports schema fingerprint conflict (409)")
|
var ErrFingerprintConflict = errors.New("catalogclient: catalogd reports schema fingerprint conflict (409)")
|
||||||
|
|
||||||
// NewCatalogClient builds a client against catalogd's base URL
|
// New builds a client against catalogd's base URL
|
||||||
// (e.g. "http://127.0.0.1:3212").
|
// (e.g. "http://127.0.0.1:3212").
|
||||||
func NewCatalogClient(baseURL string) *CatalogClient {
|
func New(baseURL string) *Client {
|
||||||
return &CatalogClient{
|
return &Client{
|
||||||
baseURL: strings.TrimRight(baseURL, "/"),
|
baseURL: strings.TrimRight(baseURL, "/"),
|
||||||
hc: &http.Client{Timeout: 30 * time.Second},
|
hc: &http.Client{Timeout: 30 * time.Second},
|
||||||
}
|
}
|
||||||
@ -52,9 +52,17 @@ type RegisterResponse struct {
|
|||||||
Existing bool `json:"existing"`
|
Existing bool `json:"existing"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// listResponse mirrors catalogd's GET /catalog/list shape:
|
||||||
|
//
|
||||||
|
// {"manifests": [...], "count": N}
|
||||||
|
type listResponse struct {
|
||||||
|
Manifests []*catalogd.Manifest `json:"manifests"`
|
||||||
|
Count int `json:"count"`
|
||||||
|
}
|
||||||
|
|
||||||
// Register POSTs to /catalog/register. Returns ErrFingerprintConflict
|
// Register POSTs to /catalog/register. Returns ErrFingerprintConflict
|
||||||
// on 409, the decoded response on 200, an error on anything else.
|
// on 409, the decoded response on 200, an error on anything else.
|
||||||
func (c *CatalogClient) Register(ctx context.Context, req *RegisterRequest) (*RegisterResponse, error) {
|
func (c *Client) Register(ctx context.Context, req *RegisterRequest) (*RegisterResponse, error) {
|
||||||
body, err := json.Marshal(req)
|
body, err := json.Marshal(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("marshal register: %w", err)
|
return nil, fmt.Errorf("marshal register: %w", err)
|
||||||
@ -87,6 +95,35 @@ func (c *CatalogClient) Register(ctx context.Context, req *RegisterRequest) (*Re
|
|||||||
return &out, nil
|
return &out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// List GETs /catalog/list and returns every registered manifest.
|
||||||
|
// queryd (D5+) uses this for view registration; future operator
|
||||||
|
// tooling can use it for catalog inspection.
|
||||||
|
//
|
||||||
|
// Manifests come back sorted by name (catalogd's contract). The
|
||||||
|
// response also carries a count, which is dropped here — callers
|
||||||
|
// can take len(manifests) instead of paying for both.
|
||||||
|
func (c *Client) List(ctx context.Context) ([]*catalogd.Manifest, error) {
|
||||||
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/catalog/list", nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("list req: %w", err)
|
||||||
|
}
|
||||||
|
resp, err := c.hc.Do(httpReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("list do: %w", err)
|
||||||
|
}
|
||||||
|
defer drainAndClose(resp.Body)
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
|
||||||
|
return nil, fmt.Errorf("list status %d: %s", resp.StatusCode, string(preview))
|
||||||
|
}
|
||||||
|
var out listResponse
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||||||
|
return nil, fmt.Errorf("list decode: %w", err)
|
||||||
|
}
|
||||||
|
return out.Manifests, nil
|
||||||
|
}
|
||||||
|
|
||||||
// drainAndClose mirrors the catalogd store_client helper — drain a
|
// drainAndClose mirrors the catalogd store_client helper — drain a
|
||||||
// bounded amount of body bytes before close so HTTP/1.1 keep-alive
|
// bounded amount of body bytes before close so HTTP/1.1 keep-alive
|
||||||
// pool reuse stays healthy on error paths.
|
// pool reuse stays healthy on error paths.
|
||||||
90
internal/catalogclient/client_test.go
Normal file
90
internal/catalogclient/client_test.go
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
package catalogclient
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRegister_Success(t *testing.T) {
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.URL.Path != "/catalog/register" || r.Method != http.MethodPost {
|
||||||
|
http.Error(w, "wrong route", http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_ = json.NewEncoder(w).Encode(RegisterResponse{
|
||||||
|
Manifest: &catalogd.Manifest{Name: "x", DatasetID: "id-1"},
|
||||||
|
Existing: false,
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
c := New(srv.URL)
|
||||||
|
resp, err := c.Register(context.Background(), &RegisterRequest{
|
||||||
|
Name: "x",
|
||||||
|
SchemaFingerprint: "sha256:abc",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if resp.Manifest.DatasetID != "id-1" {
|
||||||
|
t.Errorf("dataset_id: got %s, want id-1", resp.Manifest.DatasetID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegister_ConflictMapsToSentinel(t *testing.T) {
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
http.Error(w, "fingerprint conflict", http.StatusConflict)
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
c := New(srv.URL)
|
||||||
|
_, err := c.Register(context.Background(), &RegisterRequest{
|
||||||
|
Name: "x",
|
||||||
|
SchemaFingerprint: "sha256:abc",
|
||||||
|
})
|
||||||
|
if !errors.Is(err, ErrFingerprintConflict) {
|
||||||
|
t.Fatalf("expected ErrFingerprintConflict, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestList_Success(t *testing.T) {
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.URL.Path != "/catalog/list" || r.Method != http.MethodGet {
|
||||||
|
http.Error(w, "wrong route", http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_, _ = w.Write([]byte(`{"manifests":[{"name":"a"},{"name":"b"}],"count":2}`))
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
c := New(srv.URL)
|
||||||
|
got, err := c.List(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(got) != 2 || got[0].Name != "a" || got[1].Name != "b" {
|
||||||
|
t.Errorf("List: got %+v, want [a b]", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestList_Non200Errors(t *testing.T) {
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
http.Error(w, "boom", http.StatusInternalServerError)
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
c := New(srv.URL)
|
||||||
|
_, err := c.List(context.Background())
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "list status 500") {
|
||||||
|
t.Fatalf("expected status 500 error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user