// store_client.go — thin HTTP client to storaged. catalogd needs to // PUT manifest Parquets, GET them on startup, and LIST the manifests // directory. Staying inside an HTTP boundary (rather than reaching // into storaged's package directly) preserves the service-boundary // shape that gRPC will eventually formalize at G1+. package catalogd import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "net/url" "strings" "time" ) // StoreClient talks HTTP to the storaged service. type StoreClient struct { baseURL string hc *http.Client } // listResponse mirrors storaged's GET /storage/list shape: // // {"prefix":"_catalog/manifests/","objects":[{Key,Size,ETag,LastModified}, ...]} type listResponse struct { Prefix string `json:"prefix"` Objects []struct { Key string `json:"Key"` Size int64 `json:"Size"` } `json:"objects"` } // NewStoreClient builds a client against the given storaged base URL // (e.g. "http://127.0.0.1:3211"). Timeout covers manifest read-write // only; rehydration of many manifests at startup runs sequentially // and each call gets its own timeout window. func NewStoreClient(baseURL string) *StoreClient { return &StoreClient{ baseURL: strings.TrimRight(baseURL, "/"), hc: &http.Client{Timeout: 30 * time.Second}, } } // Put writes raw bytes at key. body is the encoded Parquet manifest. func (c *StoreClient) Put(ctx context.Context, key string, body []byte) error { u := c.baseURL + "/storage/put/" + safeKey(key) req, err := http.NewRequestWithContext(ctx, http.MethodPut, u, bytes.NewReader(body)) if err != nil { return fmt.Errorf("put req: %w", err) } req.ContentLength = int64(len(body)) resp, err := c.hc.Do(req) if err != nil { return fmt.Errorf("put do: %w", err) } defer drainAndClose(resp.Body) if resp.StatusCode != http.StatusOK { preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) return fmt.Errorf("put %s: status %d: %s", key, resp.StatusCode, string(preview)) } return nil } // Get reads the bytes at key. ErrKeyNotFound on 404. 
func (c *StoreClient) Get(ctx context.Context, key string) ([]byte, error) { u := c.baseURL + "/storage/get/" + safeKey(key) req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) if err != nil { return nil, fmt.Errorf("get req: %w", err) } resp, err := c.hc.Do(req) if err != nil { return nil, fmt.Errorf("get do: %w", err) } defer drainAndClose(resp.Body) if resp.StatusCode == http.StatusNotFound { return nil, ErrKeyNotFound } if resp.StatusCode != http.StatusOK { preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) return nil, fmt.Errorf("get %s: status %d: %s", key, resp.StatusCode, string(preview)) } return io.ReadAll(resp.Body) } // List returns the keys under prefix. Object metadata beyond Key is // ignored — catalogd only needs the keys to drive rehydration. func (c *StoreClient) List(ctx context.Context, prefix string) ([]string, error) { u := c.baseURL + "/storage/list?prefix=" + url.QueryEscape(prefix) req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) if err != nil { return nil, fmt.Errorf("list req: %w", err) } resp, err := c.hc.Do(req) if err != nil { return nil, fmt.Errorf("list do: %w", err) } defer drainAndClose(resp.Body) if resp.StatusCode != http.StatusOK { preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) return nil, fmt.Errorf("list %s: status %d: %s", prefix, resp.StatusCode, string(preview)) } var lr listResponse if err := json.NewDecoder(resp.Body).Decode(&lr); err != nil { return nil, fmt.Errorf("list decode: %w", err) } out := make([]string, 0, len(lr.Objects)) for _, o := range lr.Objects { out = append(out, o.Key) } return out, nil } // ErrKeyNotFound mirrors storaged's not-found semantics on the catalogd // side without exposing storaged's package types. var ErrKeyNotFound = fmt.Errorf("catalogd store: key not found") // safeKey URL-escapes path segments while preserving "/". 
// storaged's chi wildcard routes (/storage/put/*, /storage/get/*) accept
// literal slashes in the wildcard match, so each "/"-separated segment is
// escaped with url.PathEscape and the separators are emitted verbatim.
// Empty segments (leading, trailing, or doubled slashes) are kept, and an
// empty key yields "".
func safeKey(key string) string {
	var sb strings.Builder
	for idx, segment := range strings.Split(key, "/") {
		if idx > 0 {
			sb.WriteByte('/')
		}
		sb.WriteString(url.PathEscape(segment))
	}
	return sb.String()
}

// drainAndClose discards up to 64 KiB of whatever remains unread in body
// and then closes it. Per scrum S6: closing a response body with bytes
// still unread (e.g. after reading only a short error preview) stops
// net/http from returning the HTTP/1.1 keep-alive connection to its pool,
// so the next request has to open a fresh one. The cap bounds how much
// cleanup will read; anything larger is simply closed. Errors are
// deliberately ignored — this is best-effort cleanup run from a defer.
func drainAndClose(body io.ReadCloser) {
	const maxDrain = 64 << 10
	_, _ = io.Copy(io.Discard, io.LimitReader(body, maxDrain))
	_ = body.Close()
}