diff --git a/cmd/ingestd/main.go b/cmd/ingestd/main.go
index ce02ab3..11894d2 100644
--- a/cmd/ingestd/main.go
+++ b/cmd/ingestd/main.go
@@ -29,7 +29,7 @@ import (
 )
 
 const (
-	maxIngestBytes = 256 << 20 // matches storaged PUT cap
+	defaultMaxIngestBytes = 256 << 20 // default when unset; configurable via [ingestd].max_ingest_bytes
 	// Per scrum C-DRIFT (Opus WARN): parquet keys are content-addressed
 	// by schema fingerprint — datasets//.parquet — so a
 	// schema-drift ingest writes to a different key from the live one.
@@ -53,10 +53,16 @@ func main() {
 		os.Exit(1)
 	}
 
+	maxBytes := cfg.Ingestd.MaxIngestBytes
+	if maxBytes <= 0 {
+		maxBytes = defaultMaxIngestBytes
+	}
+
 	h := &handlers{
 		storagedURL: strings.TrimRight(cfg.Ingestd.StoragedURL, "/"),
 		catalogd:    catalogclient.New(cfg.Ingestd.CatalogdURL),
 		hc:          &http.Client{Timeout: 5 * time.Minute},
+		maxBytes:    maxBytes,
 	}
 
 	if err := shared.Run("ingestd", cfg.Ingestd.Bind, h.register); err != nil {
@@ -69,6 +75,7 @@ type handlers struct {
 	storagedURL string
 	catalogd    *catalogclient.Client
 	hc          *http.Client
+	maxBytes    int64
 }
 
 func (h *handlers) register(r chi.Router) {
@@ -100,7 +107,7 @@ func (h *handlers) handleIngest(w http.ResponseWriter, r *http.Request) {
 	// be re-readable — the schema infer pre-reads N rows, and the
 	// pqarrow writer needs all rows including those samples. We
 	// take the simple path: the whole upload in memory, capped.
-	r.Body = http.MaxBytesReader(w, r.Body, maxIngestBytes)
+	r.Body = http.MaxBytesReader(w, r.Body, h.maxBytes)
 	if err := r.ParseMultipartForm(64 << 20); err != nil {
 		var maxErr *http.MaxBytesError
 		if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
diff --git a/docs/PHASE_G0_KICKOFF.md b/docs/PHASE_G0_KICKOFF.md
index aa76166..e9b3ee9 100644
--- a/docs/PHASE_G0_KICKOFF.md
+++ b/docs/PHASE_G0_KICKOFF.md
@@ -1173,3 +1173,103 @@ Real architectural choices that proved out:
 Next: G1+. The substrate is now in place to build gRPC adapters,
 vector indices (Lance/HNSW), the Go MCP SDK port, distillation
 rebuild, and observer/Langfuse integration on top.
+
+---
+
+## Real-scale validation (2026-04-29 post-G0)
+
+After G0 shipped, we ran a half-day validation against the real
+production dataset (`workers_500k.parquet` from the Rust system,
+18 cols × 500,000 rows of staffing data with quoted-comma fields,
+multi-line text, mixed numeric/categorical). The smokes use 5-row
+CSVs; this exercises the substrate at production scale.
+
+### Method
+
+Converted `/home/profit/lakehouse/data/datasets/workers_500k.parquet`
+→ CSV via DuckDB (`COPY (SELECT * FROM read_parquet) TO 'csv'`).
+Drove the resulting CSV through the gateway's `/v1/ingest` route,
+queried via `/v1/sql`. Captured wall time, parquet output size,
+and ingestd peak RSS.
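+
+For reference, a minimal Go sketch of the upload step (not the
+harness actually used; the gateway address and the `dataset`/`file`
+multipart field names are assumptions here, not the verified API
+shape):
+
+```go
+package main
+
+import (
+	"fmt"
+	"io"
+	"mime/multipart"
+	"net/http"
+	"os"
+	"time"
+)
+
+func main() {
+	// Stream the CSV through a pipe so the client never buffers the
+	// full 344 MiB body; ingestd's server-side cap still applies.
+	pr, pw := io.Pipe()
+	mw := multipart.NewWriter(pw)
+
+	go func() {
+		defer pw.Close()
+		f, err := os.Open("workers_500k.csv")
+		if err != nil {
+			pw.CloseWithError(err)
+			return
+		}
+		defer f.Close()
+		_ = mw.WriteField("dataset", "workers_500k")             // assumed field name
+		part, _ := mw.CreateFormFile("file", "workers_500k.csv") // assumed field name
+		_, _ = io.Copy(part, f)
+		_ = mw.Close() // writes the closing multipart boundary
+	}()
+
+	start := time.Now()
+	// 127.0.0.1:3210 is an assumed gateway bind; substitute the real one.
+	resp, err := http.Post("http://127.0.0.1:3210/v1/ingest", mw.FormDataContentType(), pr)
+	if err != nil {
+		panic(err)
+	}
+	defer resp.Body.Close()
+	reply, _ := io.ReadAll(resp.Body)
+	fmt.Println(resp.Status, time.Since(start), string(reply))
+}
+```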
+
+### Results
+
+| Dataset | CSV size | Parquet | Ingest wall | ingestd peak RSS |
+|---|---|---|---|---|
+| 100K × 18 cols | 68 MiB | 14 MiB | 0.93s | 98 MiB |
+| 500K × 18 cols | 344 MiB | 71 MiB | 3.12s | 209 MiB |
+
+| Query (500K rows) | Latency |
+|---|---|
+| `SELECT count(*)` | 24ms |
+| `SELECT col, name FROM x ORDER BY id LIMIT 5` | 56ms |
+| `SELECT state, count(*) GROUP BY state ORDER BY n DESC LIMIT 10` | 45ms |
+| `SELECT round(avg(reliability), 4), min, max` | 47ms |
+| `SELECT count(*) WHERE state='CA' AND reliability > 0.8` | 34ms |
+| `SELECT column_name, data_type FROM duckdb_columns()` | 25ms |
+
+Memory at rest after ingest:
+- storaged: 55 MiB
+- catalogd: 28 MiB
+- ingestd: 209 MiB (held the post-ingest heap; not released)
+- gateway: 10 MiB
+- queryd: 112 MiB
+
+### Findings
+
+**Finding #1 — ingestd's hardcoded cap is the binding constraint
+for big CSVs.** A 500K-row staffing CSV is 344 MiB. The previous
+hardcoded `maxIngestBytes = 256 << 20` rejected it with a 413 in
+360ms — the cap fired correctly, no OOM, the server stayed alive,
+but the dataset couldn't ship. **Fixed**: extracted to a
+`[ingestd].max_ingest_bytes` config field with a 256 MiB default.
+Operators can bump it for known-large workloads (validated at
+512 MiB for the 500K dataset). The cap is on the multipart upload
+body, not on the resulting Parquet (which compresses ~5× via
+Snappy and is well under storaged's 256 MiB PUT cap).
+
+**Finding #2 — ingestd doesn't release memory between ingests.**
+Peak RSS stayed at 209 MiB after the 500K ingest finished, even
+though the CSV bytes + Arrow builders were eligible for GC. Go's
+runtime is conservative about returning heap memory to the OS.
+For a long-running daemon this is fine (the next ingest reuses the
+heap); for a short-lived ingest CLI tool it would matter.
+**Deferred** — an operational note, not a correctness bug.
+
+**Finding #3 — DuckDB-via-httpfs latency is healthy at 500K.**
+A GROUP BY across 500K rows runs in 45ms, and scaling is
+sub-linear vs 100K (36ms → 45ms for 5× the rows). The
+`read_parquet('s3://...')` path through MinIO is not a bottleneck
+at this scale.
+
+**Finding #4 — Real-data type inference works.** ADR-010's
+default-to-string-on-ambiguity correctly typed `worker_id` as
+BIGINT, numeric scores as DOUBLE, and multi-line `resume_text` as
+VARCHAR. No silent type errors. The 1000-row sample was sufficient.
+
+**Finding #5 — Multipart parsing handles complex CSVs.** The real
+data has quoted-comma fields (skills: `"bilingual, cold storage,
+hazmat"`) and multi-line text inside quotes (`resume_text` with
+embedded newlines). Go's `encoding/csv` defaults handle RFC 4180
+correctly without `LazyQuotes` — confirming the D4 scrum dismissal
+of Qwen's BLOCK on this point.
+
+### Decisions
+
+- **Configurable `max_ingest_bytes`**: applied. Default 256 MiB,
+  override via `[ingestd].max_ingest_bytes` in TOML.
+- **Streaming-spool multipart parser**: deferred. The current
+  ParseMultipartForm call keeps at most 64 MiB of the body in
+  memory and spills larger bodies to /tmp automatically, which is
+  good enough for production workloads up to ~1 GiB CSV. True
+  streaming Parquet generation (read CSV row-by-row from the
+  multipart stream without ever holding the full body) is a G2+
+  refinement.
+- **Memory release**: deferred. `runtime/debug.FreeOSMemory()`
+  after each ingest would aggressively release; not worth the
+  complexity for a long-running daemon.
+
+### Net assessment
+
+G0's substrate handles real production-scale data with one config
+knob bumped.
+No correctness issues, no OOMs, no silent type errors. Query
+latency is fast enough for ad-hoc analytics. The substrate is
+ready for G1+ work to build on top.
diff --git a/internal/shared/config.go b/internal/shared/config.go
index 8d58472..6632575 100644
--- a/internal/shared/config.go
+++ b/internal/shared/config.go
@@ -31,10 +31,18 @@ type Config struct {
 // IngestConfig adds ingestd-specific knobs. ingestd needs to PUT
 // parquet to storaged AND register manifests with catalogd, so it
 // holds two upstream URLs in addition to its own bind.
+//
+// MaxIngestBytes caps the multipart body size. CSVs are typically
+// 4-6× larger than the resulting Snappy-compressed Parquet, so 256
+// MiB CSV → ~50 MiB Parquet — well under storaged's 256 MiB PUT
+// cap. Real-scale validation (2026-04-29) showed 500K workers ×
+// 18 cols = 344 MiB CSV → 71 MiB Parquet; bumping this knob to
+// 512 MiB is the documented path for that workload.
 type IngestConfig struct {
-	Bind        string `toml:"bind"`
-	StoragedURL string `toml:"storaged_url"`
-	CatalogdURL string `toml:"catalogd_url"`
+	Bind           string `toml:"bind"`
+	StoragedURL    string `toml:"storaged_url"`
+	CatalogdURL    string `toml:"catalogd_url"`
+	MaxIngestBytes int64  `toml:"max_ingest_bytes"`
 }
 
 // GatewayConfig adds the upstream URLs the reverse proxy fronts.
@@ -106,9 +114,10 @@ func DefaultConfig() Config {
 		Storaged: ServiceConfig{Bind: "127.0.0.1:3211"},
 		Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"},
 		Ingestd: IngestConfig{
-			Bind:        "127.0.0.1:3213",
-			StoragedURL: "http://127.0.0.1:3211",
-			CatalogdURL: "http://127.0.0.1:3212",
+			Bind:           "127.0.0.1:3213",
+			StoragedURL:    "http://127.0.0.1:3211",
+			CatalogdURL:    "http://127.0.0.1:3212",
+			MaxIngestBytes: 256 << 20, // 256 MiB; bump per deployment via lakehouse.toml
 		},
 		Queryd: QuerydConfig{
 			Bind: "127.0.0.1:3214",
diff --git a/lakehouse.toml b/lakehouse.toml
index 4c82bbe..fa0aea8 100644
--- a/lakehouse.toml
+++ b/lakehouse.toml
@@ -22,6 +22,10 @@ storaged_url = "http://127.0.0.1:3211"
 bind = "127.0.0.1:3213"
 storaged_url = "http://127.0.0.1:3211"
 catalogd_url = "http://127.0.0.1:3212"
+# CSV uploads are ~4-6× the resulting Parquet. 256 MiB cap keeps the in-memory
+# parse + Arrow + Parquet output footprint bounded. Bump for known large
+# datasets (e.g. workers_500k → 344 MiB CSV needs 512 MiB).
+max_ingest_bytes = 268435456
 
 [queryd]
 bind = "127.0.0.1:3214"
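
A standalone sketch of the deferred memory-release option from the
Decisions list above (`runtime/debug.FreeOSMemory()` after each
ingest). Illustration only: the 344 MiB buffer stands in for an
ingest body, and nothing here is wired into ingestd.

```go
package main

import (
	"fmt"
	"runtime"
	"runtime/debug"
)

// heapInUseMiB reports bytes in in-use heap spans, in MiB.
func heapInUseMiB() uint64 {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return m.HeapInuse >> 20
}

func main() {
	// Stand-in for a large ingest body held on the heap.
	buf := make([]byte, 344<<20)
	for i := range buf {
		buf[i] = byte(i)
	}
	fmt.Println("held (MiB):", heapInUseMiB())

	// Drop the reference, then force a GC and hand freed pages back
	// to the OS, as the deferred per-ingest release would do.
	buf = nil
	debug.FreeOSMemory()
	fmt.Println("after FreeOSMemory (MiB):", heapInUseMiB())
}
```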