root fa56134b90 ADR-003 wiring: Bearer token + IP allowlist middleware
Implements the auth posture from ADR-003 (commit 0d18ffa). Two
independent layers — Bearer token (constant-time compare via
crypto/subtle) and IP allowlist (CIDR set) — composed in shared.Run
so every binary inherits the same gate without per-binary wiring.

Together with the bind-gate from commit 6af0520, this mechanically
closes audit risks R-001 + R-007:
  - non-loopback bind without auth.token = startup refuse
  - non-loopback bind WITH auth.token + override env = allowed
  - loopback bind = all gates open (G0 dev unchanged)

internal/shared/auth.go (NEW)
  RequireAuth(cfg AuthConfig) returns chi-compatible middleware.
  Empty Token + empty AllowedIPs → pass-through (G0 dev mode).
  Token-only → 401 Bearer mismatch.
  AllowedIPs-only → 403 source IP not in CIDR set.
  Both → both gates apply.
  /health bypasses both layers (load-balancer / liveness probes
  shouldn't carry tokens).

  CIDR parsing pre-runs at boot; bare IP (no /N) treated as /32 (or
  /128 for IPv6). Invalid entries log warn and drop, fail-loud-but-
  not-fatal so a typo doesn't kill the binary.

  Token comparison: subtle.ConstantTimeCompare on the full
  "Bearer <token>" wire-format string. Length-mismatch returns 0
  (per stdlib spec), so wrong-length tokens reject without timing
  leak. Pre-encoded comparison slice stored in the middleware
  closure — one allocation per request.

  Source-IP extraction prefers net.SplitHostPort fallback to
  RemoteAddr-as-is for httptest compatibility. X-Forwarded-For
  support is a follow-up when a trusted proxy fronts the gateway
  (config knob TBD per ADR-003 §"Future").

internal/shared/server.go
  Run signature: gained AuthConfig parameter (4th arg).
  /health stays mounted on the outer router (public).
  Registered routes go inside chi.Group with RequireAuth applied —
  empty config = transparent group.
  Added requireAuthOnNonLoopback startup check: non-loopback bind
  with empty Token = refuse to start (cites R-001 + R-007 by name).

internal/shared/config.go
  AuthConfig type added with TOML tags. Fields: Token, AllowedIPs.
  Composed into Config under [auth].

cmd/<svc>/main.go × 7 (catalogd, embedd, gateway, ingestd, queryd,
storaged, vectord, mcpd is unaffected — stdio doesn't bind a port)
  Each call site adds cfg.Auth as the 4th arg to shared.Run. No
  other changes — middleware applies via shared.Run uniformly.

internal/shared/auth_test.go (12 test funcs)
  Empty config pass-through, missing-token 401, wrong-token 401,
  correct-token 200, raw-token-without-Bearer-prefix 401, /health
  always public, IP allowlist allow + reject, bare IP /32, both
  layers when both configured, invalid CIDR drop-with-warn, RemoteAddr
  shape extraction. The constant-time comparison is verified by
  inspection (comments in auth.go) plus the existence of the
  passthrough test (length-mismatch case).

Verified:
  go test -count=1 ./internal/shared/  — all green (was 21, now 33 funcs)
  just verify                            — vet + test + 9 smokes 33s
  just proof contract                    — 53/0/1 unchanged

Smokes + proof harness keep working without any token configuration:
default Auth is empty struct → middleware is no-op → existing tests
pass unchanged. To exercise the gate, operators set [auth].token in
lakehouse.toml (or, per the "future" note in the ADR, via env var).

Closes audit findings:
  R-001 HIGH — fully mechanically closed (was: partial via bind gate)
  R-007 MED  — fully mechanically closed (was: design-only ADR-003)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 07:11:34 -05:00

208 lines
6.5 KiB
Go

// ingestd is the data on-ramp: CSV in (multipart form), Parquet out
// (to storaged), manifest registered (with catalogd). One round trip
// of HTTP, two service hops. The interesting glue lives in
// internal/ingestd/{schema,csv}.go and internal/catalogclient/;
// main.go just wires the route and threads the upstream URLs through.
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/go-chi/chi/v5"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogclient"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/ingestd"
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
)
const (
defaultMaxIngestBytes = 256 << 20 // floor; configurable via [ingestd].max_ingest_bytes
// Per scrum C-DRIFT (Opus WARN): parquet keys are content-addressed
// by schema fingerprint — datasets/<name>/<fp_hex>.parquet — so a
// schema-drift ingest writes to a different key from the live one.
// catalogd's 409 fires AFTER the PUT but the live parquet is
// untouched (the new file is an orphan we GC in G2). Same-fingerprint
// re-ingest still overwrites the same key, idempotent.
parquetKeyPath = "datasets/%s/%s.parquet"
)
func main() {
configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
flag.Parse()
cfg, err := shared.LoadConfig(*configPath)
if err != nil {
slog.Error("config", "err", err)
os.Exit(1)
}
if cfg.Ingestd.StoragedURL == "" || cfg.Ingestd.CatalogdURL == "" {
slog.Error("config", "err", "ingestd.storaged_url and ingestd.catalogd_url are required")
os.Exit(1)
}
maxBytes := cfg.Ingestd.MaxIngestBytes
if maxBytes <= 0 {
maxBytes = defaultMaxIngestBytes
}
h := &handlers{
storagedURL: strings.TrimRight(cfg.Ingestd.StoragedURL, "/"),
catalogd: catalogclient.New(cfg.Ingestd.CatalogdURL),
hc: &http.Client{Timeout: 5 * time.Minute},
maxBytes: maxBytes,
}
if err := shared.Run("ingestd", cfg.Ingestd.Bind, h.register, cfg.Auth); err != nil {
slog.Error("server", "err", err)
os.Exit(1)
}
}
type handlers struct {
storagedURL string
catalogd *catalogclient.Client
hc *http.Client
maxBytes int64
}
func (h *handlers) register(r chi.Router) {
r.Post("/ingest", h.handleIngest)
}
// ingestResponse is the 200 body shape — manifest as registered by
// catalogd plus a few stats from the parquet write. Existing flips
// true on idempotent re-ingest of the same name+fingerprint.
type ingestResponse struct {
Manifest *catalogd.Manifest `json:"manifest"`
Existing bool `json:"existing"`
RowCount int64 `json:"row_count"`
ParquetSize int64 `json:"parquet_size"`
ParquetKey string `json:"parquet_key"`
}
func (h *handlers) handleIngest(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
name := r.URL.Query().Get("name")
if name == "" {
http.Error(w, "name query param required", http.StatusBadRequest)
return
}
// Multipart parse with a body cap that matches storaged's. We
// don't stream straight from the form because the CSV needs to
// be re-readable — the schema infer pre-reads N rows, and the
// pqarrow writer needs all rows including those samples. We
// take the simple path: the whole upload in memory, capped.
r.Body = http.MaxBytesReader(w, r.Body, h.maxBytes)
if err := r.ParseMultipartForm(64 << 20); err != nil {
var maxErr *http.MaxBytesError
if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
http.Error(w, "payload too large", http.StatusRequestEntityTooLarge)
return
}
http.Error(w, "parse multipart: "+err.Error(), http.StatusBadRequest)
return
}
file, _, err := r.FormFile("file")
if err != nil {
http.Error(w, "form file: "+err.Error(), http.StatusBadRequest)
return
}
defer file.Close()
// 1. CSV → schema → Parquet
res, err := ingestd.IngestCSV(file, 0, 0)
if err != nil {
slog.Error("ingest csv", "name", name, "err", err)
http.Error(w, "ingest csv: "+err.Error(), http.StatusBadRequest)
return
}
// 2. PUT Parquet to storaged at a content-addressed path.
fingerprint := res.Schema.Fingerprint()
fpHex := strings.TrimPrefix(fingerprint, "sha256:")
parquetKey := fmt.Sprintf(parquetKeyPath, name, fpHex)
if err := h.putToStoraged(r.Context(), parquetKey, res.Parquet); err != nil {
slog.Error("put parquet", "key", parquetKey, "err", err)
http.Error(w, "put parquet: "+err.Error(), http.StatusBadGateway)
return
}
// 3. Register with catalogd
rc := res.RowCount
regResp, err := h.catalogd.Register(r.Context(), &catalogclient.RegisterRequest{
Name: name,
SchemaFingerprint: fingerprint,
Objects: []catalogd.Object{{Key: parquetKey, Size: int64(len(res.Parquet))}},
RowCount: &rc,
})
if errors.Is(err, catalogclient.ErrFingerprintConflict) {
http.Error(w, err.Error(), http.StatusConflict)
return
}
if err != nil {
slog.Error("catalog register", "name", name, "err", err)
http.Error(w, "catalog register: "+err.Error(), http.StatusBadGateway)
return
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(ingestResponse{
Manifest: regResp.Manifest,
Existing: regResp.Existing,
RowCount: res.RowCount,
ParquetSize: int64(len(res.Parquet)),
ParquetKey: parquetKey,
})
}
// putToStoraged streams the parquet bytes to storaged at /storage/put/<key>.
// Mirrors the catalogd store_client shape — drain on error for keep-alive
// hygiene. Uses ContentLength up-front so storaged's 413 path fires
// deterministically on oversize.
func (h *handlers) putToStoraged(ctx context.Context, key string, body []byte) error {
u := h.storagedURL + "/storage/put/" + escapeKeyPath(key)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, u, bytes.NewReader(body))
if err != nil {
return err
}
req.ContentLength = int64(len(body))
resp, err := h.hc.Do(req)
if err != nil {
return err
}
defer drainAndClose(resp.Body)
if resp.StatusCode != http.StatusOK {
preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
return fmt.Errorf("put %s: status %d: %s", key, resp.StatusCode, string(preview))
}
return nil
}
func escapeKeyPath(key string) string {
parts := strings.Split(key, "/")
for i, p := range parts {
parts[i] = url.PathEscape(p)
}
return strings.Join(parts, "/")
}
func drainAndClose(body io.ReadCloser) {
_, _ = io.Copy(io.Discard, io.LimitReader(body, 64<<10))
_ = body.Close()
}