// Package catalogclient is the shared HTTP client to catalogd. // ingestd uses Register; queryd (D5+) uses List for view registration. // Symmetric in shape with internal/catalogd/store_client.go: thin // wrapper, drain-and-close discipline, sentinel errors for 4xx // classes that the handler maps back to HTTP. package catalogclient import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "net/http" "strings" "time" "git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd" ) // Client talks HTTP to catalogd's /catalog/* routes. type Client struct { baseURL string hc *http.Client } // ErrFingerprintConflict mirrors catalogd's 409 — same name, // different schema fingerprint. var ErrFingerprintConflict = errors.New("catalogclient: catalogd reports schema fingerprint conflict (409)") // New builds a client against catalogd's base URL // (e.g. "http://127.0.0.1:3212"). func New(baseURL string) *Client { return &Client{ baseURL: strings.TrimRight(baseURL, "/"), hc: &http.Client{Timeout: 30 * time.Second}, } } // RegisterRequest mirrors catalogd's POST /catalog/register body. type RegisterRequest struct { Name string `json:"name"` SchemaFingerprint string `json:"schema_fingerprint"` Objects []catalogd.Object `json:"objects"` RowCount *int64 `json:"row_count,omitempty"` } // RegisterResponse mirrors catalogd's 200/conflict response. type RegisterResponse struct { Manifest *catalogd.Manifest `json:"manifest"` Existing bool `json:"existing"` } // listResponse mirrors catalogd's GET /catalog/list shape: // // {"manifests": [...], "count": N} type listResponse struct { Manifests []*catalogd.Manifest `json:"manifests"` Count int `json:"count"` } // Register POSTs to /catalog/register. Returns ErrFingerprintConflict // on 409, the decoded response on 200, an error on anything else. func (c *Client) Register(ctx context.Context, req *RegisterRequest) (*RegisterResponse, error) { body, err := json.Marshal(req) if err != nil { return nil, fmt.Errorf("marshal register: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/catalog/register", bytes.NewReader(body)) if err != nil { return nil, fmt.Errorf("register req: %w", err) } httpReq.Header.Set("Content-Type", "application/json") httpReq.ContentLength = int64(len(body)) resp, err := c.hc.Do(httpReq) if err != nil { return nil, fmt.Errorf("register do: %w", err) } defer drainAndClose(resp.Body) if resp.StatusCode == http.StatusConflict { preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) return nil, fmt.Errorf("%w: %s", ErrFingerprintConflict, string(preview)) } if resp.StatusCode != http.StatusOK { preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) return nil, fmt.Errorf("register status %d: %s", resp.StatusCode, string(preview)) } var out RegisterResponse if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { return nil, fmt.Errorf("register decode: %w", err) } return &out, nil } // List GETs /catalog/list and returns every registered manifest. // queryd (D5+) uses this for view registration; future operator // tooling can use it for catalog inspection. // // Manifests come back sorted by name (catalogd's contract). The // response also carries a count, which is dropped here — callers // can take len(manifests) instead of paying for both. func (c *Client) List(ctx context.Context) ([]*catalogd.Manifest, error) { httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/catalog/list", nil) if err != nil { return nil, fmt.Errorf("list req: %w", err) } resp, err := c.hc.Do(httpReq) if err != nil { return nil, fmt.Errorf("list do: %w", err) } defer drainAndClose(resp.Body) if resp.StatusCode != http.StatusOK { preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) return nil, fmt.Errorf("list status %d: %s", resp.StatusCode, string(preview)) } var out listResponse if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { return nil, fmt.Errorf("list decode: %w", err) } return out.Manifests, nil } // drainAndClose mirrors the catalogd store_client helper — drain a // bounded amount of body bytes before close so HTTP/1.1 keep-alive // pool reuse stays healthy on error paths. func drainAndClose(body io.ReadCloser) { _, _ = io.Copy(io.Discard, io.LimitReader(body, 64<<10)) _ = body.Close() }