root 4205ecd0f0 Pre-D5: extract CatalogClient to internal/catalogclient/ + add List
queryd (D5) needs the same HTTP client to catalogd that ingestd uses,
but the client lived in internal/ingestd — having queryd import from
ingestd would invert the data-flow direction (ingestd is upstream of
queryd; the package dep should not point back). Extract to a shared
internal/catalogclient/ package now, before D5 forces it under
implementation pressure.

Adds the List(ctx) method queryd will need for view registration.
Unit tests cover Register success/conflict and List success/error
paths against an httptest.Server fake.

ingestd's import flips from internal/ingestd → internal/catalogclient;
the wire format and behavior are unchanged. All four smokes (D1/D2/D3/
D4) PASS unchanged. DuckDB cgo path re-verified with the official
github.com/duckdb/duckdb-go/v2 (per ADR-001) on Go 1.25 + arrow-go.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 23:58:34 -05:00

134 lines
4.4 KiB
Go

// Package catalogclient is the shared HTTP client to catalogd.
// ingestd uses Register; queryd (D5+) uses List for view registration.
// Symmetric in shape with internal/catalogd/store_client.go: thin
// wrapper, drain-and-close discipline, sentinel errors for 4xx
// classes that the handler maps back to HTTP.
package catalogclient
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
)
// Client talks HTTP to catalogd's /catalog/* routes.
// Build one with New: the methods in this file use both fields on
// every call, and a zero-value Client has a nil hc.
type Client struct {
// baseURL is the catalogd root that route paths such as
// "/catalog/register" are appended to. New stores it with trailing
// slashes removed.
baseURL string
// hc is the HTTP client every request is sent through.
hc *http.Client
}
// ErrFingerprintConflict mirrors catalogd's 409 — same name,
// different schema fingerprint.
//
// Register never returns this value bare: it wraps it together with a
// short preview of the response body, so callers should test for it
// with errors.Is rather than ==.
var ErrFingerprintConflict = errors.New("catalogclient: catalogd reports schema fingerprint conflict (409)")
// New returns a Client for the catalogd instance rooted at baseURL
// (e.g. "http://127.0.0.1:3212").
//
// Trailing slashes on baseURL are stripped so route paths can be
// appended directly. The underlying HTTP client applies a 30-second
// timeout to each request.
func New(baseURL string) *Client {
	client := new(Client)
	client.baseURL = strings.TrimRight(baseURL, "/")
	client.hc = &http.Client{Timeout: 30 * time.Second}
	return client
}
// RegisterRequest mirrors catalogd's POST /catalog/register body.
// Register marshals it to JSON unchanged; no client-side validation
// is applied to any field.
type RegisterRequest struct {
// Name is sent as "name". NOTE(review): presumably the dataset name
// that catalogd keys manifests by — confirm against catalogd.
Name string `json:"name"`
// SchemaFingerprint is sent as "schema_fingerprint". A 409 from
// catalogd (see ErrFingerprintConflict) means this value differs from
// the one already recorded under the same name.
SchemaFingerprint string `json:"schema_fingerprint"`
// Objects is sent as "objects"; the element type is defined by the
// catalogd package.
Objects []catalogd.Object `json:"objects"`
// RowCount is optional: a nil pointer omits "row_count" from the
// encoded body entirely, while a pointer to 0 still sends 0.
RowCount *int64 `json:"row_count,omitempty"`
}
// RegisterResponse mirrors catalogd's 200/conflict response.
// Within this package only a 200 body is decoded into it; a 409 is
// reported through ErrFingerprintConflict instead.
type RegisterResponse struct {
// Manifest is decoded from "manifest"; it stays nil if the key is
// absent or null in the response body.
Manifest *catalogd.Manifest `json:"manifest"`
// Existing is decoded from "existing". NOTE(review): presumably true
// when the name was already registered — confirm against catalogd's
// handler.
Existing bool `json:"existing"`
}
// listResponse mirrors catalogd's GET /catalog/list shape:
//
// {"manifests": [...], "count": N}
//
// It is unexported because List returns only the Manifests slice.
type listResponse struct {
// Manifests is decoded from "manifests" and handed back by List as-is.
Manifests []*catalogd.Manifest `json:"manifests"`
// Count is decoded from "count" but not surfaced by List.
Count int `json:"count"`
}
// Register submits req as JSON to catalogd's POST /catalog/register.
//
// Outcomes by response status:
//   - 200: the body is decoded and returned as a *RegisterResponse.
//   - 409: an error wrapping ErrFingerprintConflict (match it with
//     errors.Is).
//   - anything else: a plain error naming the status code.
//
// Both error forms carry up to 256 bytes of the response body as a
// diagnostic preview. Marshal, request-construction, transport and
// decode failures are returned wrapped with a short stage prefix.
func (c *Client) Register(ctx context.Context, req *RegisterRequest) (*RegisterResponse, error) {
	payload, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("marshal register: %w", err)
	}

	endpoint := c.baseURL + "/catalog/register"
	post, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return nil, fmt.Errorf("register req: %w", err)
	}
	post.ContentLength = int64(len(payload))
	post.Header.Set("Content-Type", "application/json")

	resp, err := c.hc.Do(post)
	if err != nil {
		return nil, fmt.Errorf("register do: %w", err)
	}
	// Whatever the branches below leave unread is drained on the way out.
	defer drainAndClose(resp.Body)

	switch resp.StatusCode {
	case http.StatusOK:
		result := new(RegisterResponse)
		if err := json.NewDecoder(resp.Body).Decode(result); err != nil {
			return nil, fmt.Errorf("register decode: %w", err)
		}
		return result, nil

	case http.StatusConflict:
		// The preview is best-effort: a read error just shortens it.
		snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
		return nil, fmt.Errorf("%w: %s", ErrFingerprintConflict, string(snippet))

	default:
		// The preview is best-effort: a read error just shortens it.
		snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
		return nil, fmt.Errorf("register status %d: %s", resp.StatusCode, string(snippet))
	}
}
// List fetches every registered manifest from catalogd's
// GET /catalog/list. queryd (D5+) uses this for view registration;
// future operator tooling can use it for catalog inspection.
//
// Ordering is whatever catalogd sends (its contract is sorted by
// name); this client does not re-sort. The "count" field of the
// response is discarded — callers can use len() on the result.
//
// Any status other than 200 is returned as an error that includes the
// status code and up to 256 bytes of the response body.
func (c *Client) List(ctx context.Context) ([]*catalogd.Manifest, error) {
	endpoint := c.baseURL + "/catalog/list"
	get, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("list req: %w", err)
	}

	resp, err := c.hc.Do(get)
	if err != nil {
		return nil, fmt.Errorf("list do: %w", err)
	}
	// Whatever is left unread below is drained on the way out.
	defer drainAndClose(resp.Body)

	if status := resp.StatusCode; status != http.StatusOK {
		// The preview is best-effort: a read error just shortens it.
		snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
		return nil, fmt.Errorf("list status %d: %s", status, string(snippet))
	}

	listing := new(listResponse)
	if err := json.NewDecoder(resp.Body).Decode(listing); err != nil {
		return nil, fmt.Errorf("list decode: %w", err)
	}
	return listing.Manifests, nil
}
// drainAndClose discards at most 64 KiB of whatever is still unread in
// body and then closes it, mirroring the catalogd store_client helper.
// Consuming the leftover bytes before Close lets the HTTP/1.1
// keep-alive pool reuse the connection on error paths; the cap keeps a
// very large body from turning cleanup into an unbounded read.
//
// Both the copy and the close errors are deliberately ignored: this
// runs as deferred cleanup and has no caller to report them to.
func drainAndClose(body io.ReadCloser) {
	const maxDrain = 64 << 10 // 64 KiB

	_, _ = io.CopyN(io.Discard, body, maxDrain)
	_ = body.Close()
}