// ingestd is the data on-ramp: CSV in (multipart form), Parquet out // (to storaged), manifest registered (with catalogd). One round trip // of HTTP, two service hops. The interesting glue lives in // internal/ingestd/{schema,csv}.go and internal/catalogclient/; // main.go just wires the route and threads the upstream URLs through. package main import ( "bytes" "context" "encoding/json" "errors" "flag" "fmt" "io" "log/slog" "net/http" "net/url" "os" "strings" "time" "github.com/go-chi/chi/v5" "git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogclient" "git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd" "git.agentview.dev/profit/golangLAKEHOUSE/internal/ingestd" "git.agentview.dev/profit/golangLAKEHOUSE/internal/shared" ) const ( defaultMaxIngestBytes = 256 << 20 // floor; configurable via [ingestd].max_ingest_bytes // Per scrum C-DRIFT (Opus WARN): parquet keys are content-addressed // by schema fingerprint — datasets//.parquet — so a // schema-drift ingest writes to a different key from the live one. // catalogd's 409 fires AFTER the PUT but the live parquet is // untouched (the new file is an orphan we GC in G2). Same-fingerprint // re-ingest still overwrites the same key, idempotent. parquetKeyPath = "datasets/%s/%s.parquet" ) func main() { configPath := flag.String("config", "lakehouse.toml", "path to TOML config") flag.Parse() cfg, err := shared.LoadConfig(*configPath) if err != nil { slog.Error("config", "err", err) os.Exit(1) } if cfg.Ingestd.StoragedURL == "" || cfg.Ingestd.CatalogdURL == "" { slog.Error("config", "err", "ingestd.storaged_url and ingestd.catalogd_url are required") os.Exit(1) } maxBytes := cfg.Ingestd.MaxIngestBytes if maxBytes <= 0 { maxBytes = defaultMaxIngestBytes } h := &handlers{ storagedURL: strings.TrimRight(cfg.Ingestd.StoragedURL, "/"), catalogd: catalogclient.New(cfg.Ingestd.CatalogdURL), hc: &http.Client{Timeout: 5 * time.Minute}, maxBytes: maxBytes, } if err := shared.Run("ingestd", cfg.Ingestd.Bind, h.register); err != nil { slog.Error("server", "err", err) os.Exit(1) } } type handlers struct { storagedURL string catalogd *catalogclient.Client hc *http.Client maxBytes int64 } func (h *handlers) register(r chi.Router) { r.Post("/ingest", h.handleIngest) } // ingestResponse is the 200 body shape — manifest as registered by // catalogd plus a few stats from the parquet write. Existing flips // true on idempotent re-ingest of the same name+fingerprint. type ingestResponse struct { Manifest *catalogd.Manifest `json:"manifest"` Existing bool `json:"existing"` RowCount int64 `json:"row_count"` ParquetSize int64 `json:"parquet_size"` ParquetKey string `json:"parquet_key"` } func (h *handlers) handleIngest(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() name := r.URL.Query().Get("name") if name == "" { http.Error(w, "name query param required", http.StatusBadRequest) return } // Multipart parse with a body cap that matches storaged's. We // don't stream straight from the form because the CSV needs to // be re-readable — the schema infer pre-reads N rows, and the // pqarrow writer needs all rows including those samples. We // take the simple path: the whole upload in memory, capped. r.Body = http.MaxBytesReader(w, r.Body, h.maxBytes) if err := r.ParseMultipartForm(64 << 20); err != nil { var maxErr *http.MaxBytesError if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") { http.Error(w, "payload too large", http.StatusRequestEntityTooLarge) return } http.Error(w, "parse multipart: "+err.Error(), http.StatusBadRequest) return } file, _, err := r.FormFile("file") if err != nil { http.Error(w, "form file: "+err.Error(), http.StatusBadRequest) return } defer file.Close() // 1. CSV → schema → Parquet res, err := ingestd.IngestCSV(file, 0, 0) if err != nil { slog.Error("ingest csv", "name", name, "err", err) http.Error(w, "ingest csv: "+err.Error(), http.StatusBadRequest) return } // 2. PUT Parquet to storaged at a content-addressed path. fingerprint := res.Schema.Fingerprint() fpHex := strings.TrimPrefix(fingerprint, "sha256:") parquetKey := fmt.Sprintf(parquetKeyPath, name, fpHex) if err := h.putToStoraged(r.Context(), parquetKey, res.Parquet); err != nil { slog.Error("put parquet", "key", parquetKey, "err", err) http.Error(w, "put parquet: "+err.Error(), http.StatusBadGateway) return } // 3. Register with catalogd rc := res.RowCount regResp, err := h.catalogd.Register(r.Context(), &catalogclient.RegisterRequest{ Name: name, SchemaFingerprint: fingerprint, Objects: []catalogd.Object{{Key: parquetKey, Size: int64(len(res.Parquet))}}, RowCount: &rc, }) if errors.Is(err, catalogclient.ErrFingerprintConflict) { http.Error(w, err.Error(), http.StatusConflict) return } if err != nil { slog.Error("catalog register", "name", name, "err", err) http.Error(w, "catalog register: "+err.Error(), http.StatusBadGateway) return } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(ingestResponse{ Manifest: regResp.Manifest, Existing: regResp.Existing, RowCount: res.RowCount, ParquetSize: int64(len(res.Parquet)), ParquetKey: parquetKey, }) } // putToStoraged streams the parquet bytes to storaged at /storage/put/. // Mirrors the catalogd store_client shape — drain on error for keep-alive // hygiene. Uses ContentLength up-front so storaged's 413 path fires // deterministically on oversize. func (h *handlers) putToStoraged(ctx context.Context, key string, body []byte) error { u := h.storagedURL + "/storage/put/" + escapeKeyPath(key) req, err := http.NewRequestWithContext(ctx, http.MethodPut, u, bytes.NewReader(body)) if err != nil { return err } req.ContentLength = int64(len(body)) resp, err := h.hc.Do(req) if err != nil { return err } defer drainAndClose(resp.Body) if resp.StatusCode != http.StatusOK { preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256)) return fmt.Errorf("put %s: status %d: %s", key, resp.StatusCode, string(preview)) } return nil } func escapeKeyPath(key string) string { parts := strings.Split(key, "/") for i, p := range parts { parts[i] = url.PathEscape(p) } return strings.Join(parts, "/") } func drainAndClose(body io.ReadCloser) { _, _ = io.Copy(io.Discard, io.LimitReader(body, 64<<10)) _ = body.Close() }