Phase G0 Day 4 ships ingestd: multipart CSV upload, Arrow schema
inference per ADR-010 (default-to-string on ambiguity), single-pass
streaming CSV → Parquet via pqarrow batched writer (Snappy compressed,
8192 rows per batch), PUT to storaged at content-addressed key
datasets/<name>/<fp_hex>.parquet, register manifest with catalogd.
Acceptance smoke 6/6 PASS including idempotent re-ingest (proves
inference is deterministic — same CSV always produces same fingerprint)
and schema-drift → 409 (proves catalogd's gate fires on ingest traffic).
Schema fingerprint is SHA-256 over (name, type) tuples in header order
using ASCII record/unit separators (0x1e/0x1f) so column names with
commas can't collide. Nullability intentionally NOT in the fingerprint
— a column gaining nulls isn't a schema change.
Cross-lineage scrum on shipped code:
- Opus 4.7 (opencode): 4 WARN + 3 INFO (after 2 self-retracted BLOCKs)
- Kimi K2-0905 (openrouter): 1 BLOCK + 2 WARN + 1 INFO
- Qwen3-coder (openrouter): 2 BLOCK + 2 WARN + 2 INFO
Fixed (2, both Opus single-reviewer):
C-DRIFT: PUT-then-register on fixed datasets/<name>/data.parquet
meant a schema-drift ingest overwrote the live parquet BEFORE
catalogd's 409 fired → storaged inconsistent with manifest.
Fix: content-addressed key datasets/<name>/<fp_hex>.parquet.
Drift writes to a different file (orphan in G2 GC scope); the
live data is never corrupted.
C-WCLOSE: pqarrow.NewFileWriter not Closed on error paths leaks
buffered column data + OS resources per failed ingest.
Fix: deferred guarded close with wClosed flag.
Dismissed (5, all false positives):
Qwen BLOCK "csv.Reader needs LazyQuotes=true for multi-line" — false,
Go csv handles RFC 4180 multi-line quoted fields by default
Qwen BLOCK "row[i] OOB" — already bounds-checked at schema.go:73
and csv.go:201
Kimi BLOCK "type assertion panic if pqarrow reorders fields" —
speculative, no real path
Kimi WARN + Qwen WARN×2 "RecordBuilder leak on early error" —
false convergent. Outer defer rb.Release() captures the current
builder; in-loop release runs before reassignment. No leak.
Deferred (6 INFO + accepted-with-rationale on 3 WARN): sample
boundary type mismatch (G0 cap bounds peak), string-match
paranoia on http.MaxBytesError, multipart double-buffer (G2 spool-
to-disk), separator validation, body close ordering, etc.
The D4 scrum produced fewer real findings than D3 (2 vs 6). Both were
architectural hazards the smoke test wouldn't catch: its
"schema drift → 409" assertion passed even in the corrupted-state
world. The 409 fired correctly; what was wrong was that the PUT had
already mutated the live parquet before the validation check ran.
Opus's reading of the PUT-then-register ordering is exactly the kind of
architectural insight the cross-lineage scrum is designed to surface.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
200 lines
6.2 KiB
Go
200 lines
6.2 KiB
Go
// ingestd is the data on-ramp: CSV in (multipart form), Parquet out
|
|
// (to storaged), manifest registered (with catalogd). One round trip
|
|
// of HTTP, two service hops. The interesting glue lives in
|
|
// internal/ingestd/{schema,csv,catalog_client}.go; main.go just wires
|
|
// the route and threads the upstream URLs through.
|
|
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
|
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/catalogd"
|
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/ingestd"
|
|
"git.agentview.dev/profit/golangLAKEHOUSE/internal/shared"
|
|
)
|
|
|
|
const (
	// maxIngestBytes caps an ingest request body at 256 MiB, the same
	// limit storaged applies to a PUT.
	maxIngestBytes = 256 * (1 << 20)

	// parquetKeyPath is the fmt template for a dataset's object key,
	// datasets/<name>/<fp_hex>.parquet. Keys are content-addressed by
	// schema fingerprint (scrum C-DRIFT, Opus WARN): an ingest whose
	// schema drifted lands on a different key than the live object, so
	// although catalogd's 409 only fires AFTER the PUT, the live parquet
	// is never touched (the stray file is an orphan to be GC'd in G2).
	// Re-ingesting with the same fingerprint overwrites the same key,
	// which keeps that path idempotent.
	parquetKeyPath = "datasets/%s/%s.parquet"
)
|
|
|
|
func main() {
|
|
configPath := flag.String("config", "lakehouse.toml", "path to TOML config")
|
|
flag.Parse()
|
|
|
|
cfg, err := shared.LoadConfig(*configPath)
|
|
if err != nil {
|
|
slog.Error("config", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
if cfg.Ingestd.StoragedURL == "" || cfg.Ingestd.CatalogdURL == "" {
|
|
slog.Error("config", "err", "ingestd.storaged_url and ingestd.catalogd_url are required")
|
|
os.Exit(1)
|
|
}
|
|
|
|
h := &handlers{
|
|
storagedURL: strings.TrimRight(cfg.Ingestd.StoragedURL, "/"),
|
|
catalogd: ingestd.NewCatalogClient(cfg.Ingestd.CatalogdURL),
|
|
hc: &http.Client{Timeout: 5 * time.Minute},
|
|
}
|
|
|
|
if err := shared.Run("ingestd", cfg.Ingestd.Bind, h.register); err != nil {
|
|
slog.Error("server", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
type handlers struct {
|
|
storagedURL string
|
|
catalogd *ingestd.CatalogClient
|
|
hc *http.Client
|
|
}
|
|
|
|
func (h *handlers) register(r chi.Router) {
|
|
r.Post("/ingest", h.handleIngest)
|
|
}
|
|
|
|
// ingestResponse is the 200 body shape — manifest as registered by
|
|
// catalogd plus a few stats from the parquet write. Existing flips
|
|
// true on idempotent re-ingest of the same name+fingerprint.
|
|
type ingestResponse struct {
|
|
Manifest *catalogd.Manifest `json:"manifest"`
|
|
Existing bool `json:"existing"`
|
|
RowCount int64 `json:"row_count"`
|
|
ParquetSize int64 `json:"parquet_size"`
|
|
ParquetKey string `json:"parquet_key"`
|
|
}
|
|
|
|
func (h *handlers) handleIngest(w http.ResponseWriter, r *http.Request) {
|
|
defer r.Body.Close()
|
|
|
|
name := r.URL.Query().Get("name")
|
|
if name == "" {
|
|
http.Error(w, "name query param required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Multipart parse with a body cap that matches storaged's. We
|
|
// don't stream straight from the form because the CSV needs to
|
|
// be re-readable — the schema infer pre-reads N rows, and the
|
|
// pqarrow writer needs all rows including those samples. We
|
|
// take the simple path: the whole upload in memory, capped.
|
|
r.Body = http.MaxBytesReader(w, r.Body, maxIngestBytes)
|
|
if err := r.ParseMultipartForm(64 << 20); err != nil {
|
|
var maxErr *http.MaxBytesError
|
|
if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
|
|
http.Error(w, "payload too large", http.StatusRequestEntityTooLarge)
|
|
return
|
|
}
|
|
http.Error(w, "parse multipart: "+err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
file, _, err := r.FormFile("file")
|
|
if err != nil {
|
|
http.Error(w, "form file: "+err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
defer file.Close()
|
|
|
|
// 1. CSV → schema → Parquet
|
|
res, err := ingestd.IngestCSV(file, 0, 0)
|
|
if err != nil {
|
|
slog.Error("ingest csv", "name", name, "err", err)
|
|
http.Error(w, "ingest csv: "+err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// 2. PUT Parquet to storaged at a content-addressed path.
|
|
fingerprint := res.Schema.Fingerprint()
|
|
fpHex := strings.TrimPrefix(fingerprint, "sha256:")
|
|
parquetKey := fmt.Sprintf(parquetKeyPath, name, fpHex)
|
|
if err := h.putToStoraged(r.Context(), parquetKey, res.Parquet); err != nil {
|
|
slog.Error("put parquet", "key", parquetKey, "err", err)
|
|
http.Error(w, "put parquet: "+err.Error(), http.StatusBadGateway)
|
|
return
|
|
}
|
|
|
|
// 3. Register with catalogd
|
|
rc := res.RowCount
|
|
regResp, err := h.catalogd.Register(r.Context(), &ingestd.RegisterRequest{
|
|
Name: name,
|
|
SchemaFingerprint: fingerprint,
|
|
Objects: []catalogd.Object{{Key: parquetKey, Size: int64(len(res.Parquet))}},
|
|
RowCount: &rc,
|
|
})
|
|
if errors.Is(err, ingestd.ErrFingerprintConflict) {
|
|
http.Error(w, err.Error(), http.StatusConflict)
|
|
return
|
|
}
|
|
if err != nil {
|
|
slog.Error("catalog register", "name", name, "err", err)
|
|
http.Error(w, "catalog register: "+err.Error(), http.StatusBadGateway)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(ingestResponse{
|
|
Manifest: regResp.Manifest,
|
|
Existing: regResp.Existing,
|
|
RowCount: res.RowCount,
|
|
ParquetSize: int64(len(res.Parquet)),
|
|
ParquetKey: parquetKey,
|
|
})
|
|
}
|
|
|
|
// putToStoraged streams the parquet bytes to storaged at /storage/put/<key>.
|
|
// Mirrors the catalogd store_client shape — drain on error for keep-alive
|
|
// hygiene. Uses ContentLength up-front so storaged's 413 path fires
|
|
// deterministically on oversize.
|
|
func (h *handlers) putToStoraged(ctx context.Context, key string, body []byte) error {
|
|
u := h.storagedURL + "/storage/put/" + escapeKeyPath(key)
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPut, u, bytes.NewReader(body))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.ContentLength = int64(len(body))
|
|
resp, err := h.hc.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer drainAndClose(resp.Body)
|
|
if resp.StatusCode != http.StatusOK {
|
|
preview, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
|
|
return fmt.Errorf("put %s: status %d: %s", key, resp.StatusCode, string(preview))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// escapeKeyPath percent-escapes every "/"-separated segment of key with
// url.PathEscape and joins the segments back with "/", so the separators
// survive while reserved characters inside a segment are encoded.
func escapeKeyPath(key string) string {
	segments := strings.Split(key, "/")
	escaped := make([]string, 0, len(segments))
	for _, seg := range segments {
		escaped = append(escaped, url.PathEscape(seg))
	}
	return strings.Join(escaped, "/")
}
|
|
|
|
// drainAndClose discards at most 64 KiB of what is left in body and then
// closes it. Read and close errors are deliberately ignored: this is
// best-effort cleanup on a response whose outcome is already decided.
func drainAndClose(body io.ReadCloser) {
	const drainLimit = 64 << 10
	_, _ = io.CopyN(io.Discard, body, drainLimit)
	_ = body.Close()
}
|