Real-scale validation post-G0: configurable ingest cap + workers_500k metrics
Validated G0 substrate against the production workers_500k.parquet dataset (18 cols × 500,000 rows). Findings + one applied fix: Finding #1 (FIXED): ingestd's hardcoded 256 MiB cap rejected the 500K CSV (344 MiB) with 413. Cap fired correctly, no OOM. Extracted to [ingestd].max_ingest_bytes config field; default 256 MiB, override per deployment for known-large workloads. With cap bumped to 512 MiB, 500K ingest succeeds in 3.12s with ingestd peak RSS 209 MiB. Finding #2 (deferred): ingestd doesn't release memory between ingests. Go runtime conservative; long-running daemon, fine. Finding #3: DuckDB-via-httpfs is healthy at 500K. GROUP BY 45ms, count(*) 24ms, AVG 47ms, schema introspection 25ms. Sub-linear scaling vs 100K — the s3:// read path is not a bottleneck. Finding #4: ADR-010 type inference correctly handled real staffing data. worker_id → BIGINT, numeric scores → DOUBLE, multi-line resume_text → VARCHAR. 1000-row sample sufficient. Finding #5: Go's encoding/csv handles RFC 4180 quoted-comma fields and multi-line quoted text without LazyQuotes — confirming the D4 scrum's dismissal of Qwen's BLOCK on this point. Net: substrate handles production-scale data with one config knob. No correctness issues, no OOMs, no silent type errors. All 6 G0 smokes still PASS after the cap-config change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
b1d52306ad
commit
d023b07b30
@ -29,7 +29,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
maxIngestBytes = 256 << 20 // matches storaged PUT cap
|
defaultMaxIngestBytes = 256 << 20 // floor; configurable via [ingestd].max_ingest_bytes
|
||||||
// Per scrum C-DRIFT (Opus WARN): parquet keys are content-addressed
|
// Per scrum C-DRIFT (Opus WARN): parquet keys are content-addressed
|
||||||
// by schema fingerprint — datasets/<name>/<fp_hex>.parquet — so a
|
// by schema fingerprint — datasets/<name>/<fp_hex>.parquet — so a
|
||||||
// schema-drift ingest writes to a different key from the live one.
|
// schema-drift ingest writes to a different key from the live one.
|
||||||
@ -53,10 +53,16 @@ func main() {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
maxBytes := cfg.Ingestd.MaxIngestBytes
|
||||||
|
if maxBytes <= 0 {
|
||||||
|
maxBytes = defaultMaxIngestBytes
|
||||||
|
}
|
||||||
|
|
||||||
h := &handlers{
|
h := &handlers{
|
||||||
storagedURL: strings.TrimRight(cfg.Ingestd.StoragedURL, "/"),
|
storagedURL: strings.TrimRight(cfg.Ingestd.StoragedURL, "/"),
|
||||||
catalogd: catalogclient.New(cfg.Ingestd.CatalogdURL),
|
catalogd: catalogclient.New(cfg.Ingestd.CatalogdURL),
|
||||||
hc: &http.Client{Timeout: 5 * time.Minute},
|
hc: &http.Client{Timeout: 5 * time.Minute},
|
||||||
|
maxBytes: maxBytes,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := shared.Run("ingestd", cfg.Ingestd.Bind, h.register); err != nil {
|
if err := shared.Run("ingestd", cfg.Ingestd.Bind, h.register); err != nil {
|
||||||
@ -69,6 +75,7 @@ type handlers struct {
|
|||||||
storagedURL string
|
storagedURL string
|
||||||
catalogd *catalogclient.Client
|
catalogd *catalogclient.Client
|
||||||
hc *http.Client
|
hc *http.Client
|
||||||
|
maxBytes int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handlers) register(r chi.Router) {
|
func (h *handlers) register(r chi.Router) {
|
||||||
@ -100,7 +107,7 @@ func (h *handlers) handleIngest(w http.ResponseWriter, r *http.Request) {
|
|||||||
// be re-readable — the schema infer pre-reads N rows, and the
|
// be re-readable — the schema infer pre-reads N rows, and the
|
||||||
// pqarrow writer needs all rows including those samples. We
|
// pqarrow writer needs all rows including those samples. We
|
||||||
// take the simple path: the whole upload in memory, capped.
|
// take the simple path: the whole upload in memory, capped.
|
||||||
r.Body = http.MaxBytesReader(w, r.Body, maxIngestBytes)
|
r.Body = http.MaxBytesReader(w, r.Body, h.maxBytes)
|
||||||
if err := r.ParseMultipartForm(64 << 20); err != nil {
|
if err := r.ParseMultipartForm(64 << 20); err != nil {
|
||||||
var maxErr *http.MaxBytesError
|
var maxErr *http.MaxBytesError
|
||||||
if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
|
if errors.As(err, &maxErr) || strings.Contains(err.Error(), "http: request body too large") {
|
||||||
|
|||||||
@ -1173,3 +1173,103 @@ Real architectural choices that proved out:
|
|||||||
Next: G1+. The substrate is now in place to build gRPC adapters,
|
Next: G1+. The substrate is now in place to build gRPC adapters,
|
||||||
vector indices (Lance/HNSW), the Go MCP SDK port, distillation
|
vector indices (Lance/HNSW), the Go MCP SDK port, distillation
|
||||||
rebuild, and observer/Langfuse integration on top.
|
rebuild, and observer/Langfuse integration on top.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Real-scale validation (2026-04-29 post-G0)
|
||||||
|
|
||||||
|
After G0 shipped, we ran a half-day validation against the real production
|
||||||
|
dataset (`workers_500k.parquet` from the Rust system, 18 cols ×
|
||||||
|
500,000 rows of staffing data with quoted-comma fields, multi-line
|
||||||
|
text, mixed numeric/categorical). The smokes use 5-row CSVs; this
|
||||||
|
exercises the substrate at production scale.
|
||||||
|
|
||||||
|
### Method
|
||||||
|
|
||||||
|
Converted `/home/profit/lakehouse/data/datasets/workers_500k.parquet`
|
||||||
|
→ CSV via DuckDB (`COPY (SELECT * FROM read_parquet) TO 'csv'`).
|
||||||
|
Drove the resulting CSV through the gateway's `/v1/ingest` route,
|
||||||
|
queried via `/v1/sql`. Captured wall time, parquet output size,
|
||||||
|
and ingestd peak RSS.
|
||||||
|
|
||||||
|
### Results
|
||||||
|
|
||||||
|
| Dataset | CSV size | Parquet | Ingest wall | ingestd peak RSS |
|
||||||
|
|---|---|---|---|---|
|
||||||
|
| 100K × 18 cols | 68 MiB | 14 MiB | 0.93s | 98 MiB |
|
||||||
|
| 500K × 18 cols | 344 MiB | 71 MiB | 3.12s | 209 MiB |
|
||||||
|
|
||||||
|
| Query (500K rows) | Latency |
|
||||||
|
|---|---|
|
||||||
|
| `SELECT count(*)` | 24ms |
|
||||||
|
| `SELECT col, name FROM x ORDER BY id LIMIT 5` | 56ms |
|
||||||
|
| `SELECT state, count(*) GROUP BY state ORDER BY n DESC LIMIT 10` | 45ms |
|
||||||
|
| `SELECT round(avg(reliability), 4), min, max` | 47ms |
|
||||||
|
| `SELECT count(*) WHERE state='CA' AND reliability > 0.8` | 34ms |
|
||||||
|
| `SELECT column_name, data_type FROM duckdb_columns()` | 25ms |
|
||||||
|
|
||||||
|
Memory at rest after ingest:
|
||||||
|
- storaged: 55 MiB
|
||||||
|
- catalogd: 28 MiB
|
||||||
|
- ingestd: 209 MiB (held the post-ingest heap; not released)
|
||||||
|
- gateway: 10 MiB
|
||||||
|
- queryd: 112 MiB
|
||||||
|
|
||||||
|
### Findings
|
||||||
|
|
||||||
|
**Finding #1 — ingestd's hardcoded cap is the binding constraint
|
||||||
|
for big CSVs.** A 500K-row staffing CSV is 344 MiB. The previous
|
||||||
|
hardcoded `maxIngestBytes = 256 << 20` rejected it with 413 in
|
||||||
|
360ms — the cap fired correctly, no OOM, server stayed alive, but
|
||||||
|
the dataset couldn't ship. **Fixed**: extracted to
|
||||||
|
`[ingestd].max_ingest_bytes` config field with a 256 MiB default.
|
||||||
|
Operators can bump for known-large workloads (validated at 512 MiB
|
||||||
|
for the 500K dataset). The cap is on the multipart upload body,
|
||||||
|
not on the resulting Parquet (which compresses ~5× via Snappy and
|
||||||
|
is well under storaged's 256 MiB PUT cap).
|
||||||
|
|
||||||
|
**Finding #2 — ingestd doesn't release memory between ingests.**
|
||||||
|
Peak RSS stayed at 209 MiB after the 500K ingest finished, even
|
||||||
|
though the CSV bytes + Arrow builders were eligible for GC. Go's
|
||||||
|
runtime is conservative about returning heap memory to the OS.
|
||||||
|
For a long-running daemon this is fine (next ingest reuses the
|
||||||
|
heap); for a short-lived ingest CLI tool it would matter.
|
||||||
|
**Deferred** — operational note, not a correctness bug.
|
||||||
|
|
||||||
|
**Finding #3 — DuckDB-via-httpfs latency is healthy at 500K.**
|
||||||
|
GROUP BY across 500K rows in 45ms. Sub-linear scaling vs 100K
|
||||||
|
(36ms → 45ms for 5× rows). The `read_parquet('s3://...')` path
|
||||||
|
through MinIO is not a bottleneck at this scale.
|
||||||
|
|
||||||
|
**Finding #4 — Real-data type inference works.** ADR-010's
|
||||||
|
default-to-string-on-ambiguity correctly typed `worker_id` as
|
||||||
|
BIGINT, numeric scores as DOUBLE, multi-line `resume_text` as
|
||||||
|
VARCHAR. No silent type errors. The 1000-row sample was sufficient.
|
||||||
|
|
||||||
|
**Finding #5 — CSV parsing handles complex quoting.** The real
|
||||||
|
data has quoted-comma fields (skills: `"bilingual, cold storage,
|
||||||
|
hazmat"`) and multi-line text inside quotes (`resume_text` with
|
||||||
|
embedded newlines). Go's `encoding/csv` defaults handle RFC 4180
|
||||||
|
correctly without `LazyQuotes` — confirming the D4 scrum dismissal
|
||||||
|
of Qwen's BLOCK on this point.
|
||||||
|
|
||||||
|
### Decisions
|
||||||
|
|
||||||
|
- **Configurable `max_ingest_bytes`**: applied. Default 256 MiB,
|
||||||
|
override via `[ingestd].max_ingest_bytes` in TOML.
|
||||||
|
- **Streaming-spool multipart parser**: deferred. The current
|
||||||
|
in-memory ParseMultipartForm with the 64 MiB threshold spills
|
||||||
|
larger bodies to /tmp automatically, which is good enough for
|
||||||
|
production workloads up to ~1 GiB CSV. True streaming Parquet
|
||||||
|
generation (read CSV row-by-row from the multipart stream
|
||||||
|
without ever holding the full body) is a G2+ refinement.
|
||||||
|
- **Memory release**: deferred. `runtime/debug.FreeOSMemory()`
|
||||||
|
after each ingest would aggressively release; not worth the
|
||||||
|
complexity for a long-running daemon.
|
||||||
|
|
||||||
|
### Net assessment
|
||||||
|
|
||||||
|
G0's substrate handles real production-scale data with one config
|
||||||
|
knob bumped. No correctness issues, no OOMs, no silent type
|
||||||
|
errors. Query latency is fast enough for ad-hoc analytics.
|
||||||
|
The substrate is ready for G1+ work to build on top.
|
||||||
|
|||||||
@ -31,10 +31,18 @@ type Config struct {
|
|||||||
// IngestConfig adds ingestd-specific knobs. ingestd needs to PUT
|
// IngestConfig adds ingestd-specific knobs. ingestd needs to PUT
|
||||||
// parquet to storaged AND register manifests with catalogd, so it
|
// parquet to storaged AND register manifests with catalogd, so it
|
||||||
// holds two upstream URLs in addition to its own bind.
|
// holds two upstream URLs in addition to its own bind.
|
||||||
|
//
|
||||||
|
// MaxIngestBytes caps the multipart body size. CSVs are typically
|
||||||
|
// 4-6× larger than the resulting Snappy-compressed Parquet, so 256
|
||||||
|
// MiB CSV → ~50 MiB Parquet — well under storaged's 256 MiB PUT
|
||||||
|
// cap. Real-scale validation (2026-04-29) showed 500K workers ×
|
||||||
|
// 18 cols = 344 MiB CSV → 71 MiB Parquet; bumping this knob to
|
||||||
|
// 512 MiB is the documented path for that workload.
|
||||||
type IngestConfig struct {
|
type IngestConfig struct {
|
||||||
Bind string `toml:"bind"`
|
Bind string `toml:"bind"`
|
||||||
StoragedURL string `toml:"storaged_url"`
|
StoragedURL string `toml:"storaged_url"`
|
||||||
CatalogdURL string `toml:"catalogd_url"`
|
CatalogdURL string `toml:"catalogd_url"`
|
||||||
|
MaxIngestBytes int64 `toml:"max_ingest_bytes"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// GatewayConfig adds the upstream URLs the reverse proxy fronts.
|
// GatewayConfig adds the upstream URLs the reverse proxy fronts.
|
||||||
@ -106,9 +114,10 @@ func DefaultConfig() Config {
|
|||||||
Storaged: ServiceConfig{Bind: "127.0.0.1:3211"},
|
Storaged: ServiceConfig{Bind: "127.0.0.1:3211"},
|
||||||
Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"},
|
Catalogd: CatalogConfig{Bind: "127.0.0.1:3212", StoragedURL: "http://127.0.0.1:3211"},
|
||||||
Ingestd: IngestConfig{
|
Ingestd: IngestConfig{
|
||||||
Bind: "127.0.0.1:3213",
|
Bind: "127.0.0.1:3213",
|
||||||
StoragedURL: "http://127.0.0.1:3211",
|
StoragedURL: "http://127.0.0.1:3211",
|
||||||
CatalogdURL: "http://127.0.0.1:3212",
|
CatalogdURL: "http://127.0.0.1:3212",
|
||||||
|
MaxIngestBytes: 256 << 20, // 256 MiB; bump per deployment via lakehouse.toml
|
||||||
},
|
},
|
||||||
Queryd: QuerydConfig{
|
Queryd: QuerydConfig{
|
||||||
Bind: "127.0.0.1:3214",
|
Bind: "127.0.0.1:3214",
|
||||||
|
|||||||
@ -22,6 +22,10 @@ storaged_url = "http://127.0.0.1:3211"
|
|||||||
bind = "127.0.0.1:3213"
|
bind = "127.0.0.1:3213"
|
||||||
storaged_url = "http://127.0.0.1:3211"
|
storaged_url = "http://127.0.0.1:3211"
|
||||||
catalogd_url = "http://127.0.0.1:3212"
|
catalogd_url = "http://127.0.0.1:3212"
|
||||||
|
# CSV uploads are ~4-6× the resulting Parquet. 256 MiB cap keeps the in-memory
|
||||||
|
# parse + Arrow + Parquet output footprint bounded. Bump for known large
|
||||||
|
# datasets (e.g. workers_500k → 344 MiB CSV needs 512 MiB).
|
||||||
|
max_ingest_bytes = 268435456
|
||||||
|
|
||||||
[queryd]
|
[queryd]
|
||||||
bind = "127.0.0.1:3214"
|
bind = "127.0.0.1:3214"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user