// csv.go — single-pass CSV → Arrow → Parquet streaming pipeline.
// Buffers the first N data rows for schema inference, then writes
// those rows + every subsequent row directly into the pqarrow writer
// in record batches of BatchSize. Output is the encoded Parquet
// payload + the inferred schema + total row_count.
package ingestd

import (
	"bytes"
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"strconv"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet"
	"github.com/apache/arrow-go/v18/parquet/compress"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

// DefaultBatchSize is the per-record-batch row count fed to the
// pqarrow writer. 8192 is the Arrow community default — large
// enough to amortize per-batch overhead, small enough to keep peak
// memory bounded for very wide CSVs.
const DefaultBatchSize = 8192

// IngestResult is what IngestCSV returns to the caller. Parquet
// holds the full encoded payload — fits in memory by design (G0
// CSVs are <1 GB; storaged caps PUT at 256 MiB; the gap will
// resurface if/when we hit it).
type IngestResult struct {
	Schema   Schema
	Parquet  []byte
	RowCount int64
}

// IngestCSV reads CSV from r, infers the schema from the first
// sampleRows data rows, and streams the entire CSV (sample + rest)
// into a Parquet payload using the inferred schema.
//
// sampleRows ≤ 0 → DefaultSampleRows; batchSize ≤ 0 → DefaultBatchSize.
//
// The CSV is read once. The sample rows are held in memory only
// until they're flushed to the writer; subsequent rows stream
// through the per-column Arrow builders one batch at a time.
func IngestCSV(r io.Reader, sampleRows, batchSize int) (*IngestResult, error) {
	if sampleRows <= 0 {
		sampleRows = DefaultSampleRows
	}
	if batchSize <= 0 {
		batchSize = DefaultBatchSize
	}

	cr := csv.NewReader(r)
	cr.FieldsPerRecord = -1 // tolerate ragged rows; missing trailing cells are handled per-column in appendRowToBuilders

	header, err := cr.Read()
	if err != nil {
		if errors.Is(err, io.EOF) {
			return nil, errors.New("ingestd: CSV is empty (no header)")
		}
		return nil, fmt.Errorf("read header: %w", err)
	}

	// Buffer up to sampleRows for schema inference. Buffered rows
	// are also written to the Parquet output — no double-read.
	buffered := make([][]string, 0, sampleRows)
	for len(buffered) < sampleRows {
		row, err := cr.Read()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("read row %d: %w", len(buffered)+1, err)
		}
		buffered = append(buffered, row)
	}

	schema, err := InferSchema(header, buffered)
	if err != nil {
		return nil, err
	}
	arrowSchema := schema.ArrowSchema()

	mem := memory.NewGoAllocator()
	var buf bytes.Buffer
	props := parquet.NewWriterProperties(
		parquet.WithCompression(compress.Codecs.Snappy),
	)
	w, err := pqarrow.NewFileWriter(arrowSchema, &buf, props, pqarrow.NewArrowWriterProperties())
	if err != nil {
		return nil, fmt.Errorf("pqarrow writer: %w", err)
	}

	// Per scrum C-WCLOSE (Opus WARN): the writer holds buffered column
	// data + OS resources; every error return path past this point
	// must close it, otherwise long-running ingestd leaks per failed
	// ingest. wClosed keeps this deferred cleanup from closing the
	// writer a second time after the success-path explicit Close.
	wClosed := false
	defer func() {
		if !wClosed {
			_ = w.Close()
		}
	}()

	// Build a fresh RecordBuilder for each batch — flush, release, repeat.
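	// (Worked example with illustrative numbers, not from a real run:
	// with batchSize = 8192 and 20,000 data rows, appendRow flushes
	// full batches after rows 8192 and 16384, and the remaining 3,616
	// rows go out in the trailing partial-batch flush near the end of
	// this function.)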
	rowsTotal := int64(0)

	flush := func(rb *array.RecordBuilder) error {
		rec := rb.NewRecord()
		defer rec.Release()
		return w.Write(rec)
	}

	rb := array.NewRecordBuilder(mem, arrowSchema)
	// rb is reassigned after every flushed batch, so release whichever
	// builder is live when we return; a plain "defer rb.Release()" would
	// bind the first builder and release it twice.
	defer func() { rb.Release() }()

	rowsInBatch := 0
	appendRow := func(row []string) error {
		if err := appendRowToBuilders(rb, schema, row); err != nil {
			return err
		}
		rowsInBatch++
		rowsTotal++
		if rowsInBatch >= batchSize {
			if err := flush(rb); err != nil {
				return err
			}
			rb.Release()
			rb = array.NewRecordBuilder(mem, arrowSchema)
			rowsInBatch = 0
		}
		return nil
	}

	// Replay buffered (already-read) rows into the writer first.
	for _, row := range buffered {
		if err := appendRow(row); err != nil {
			return nil, fmt.Errorf("buffered row: %w", err)
		}
	}

	// Then drain the rest of the CSV.
	for {
		row, err := cr.Read()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("read row %d: %w", rowsTotal+1, err)
		}
		if err := appendRow(row); err != nil {
			return nil, err
		}
	}

	// Flush any partial trailing batch.
	if rowsInBatch > 0 {
		if err := flush(rb); err != nil {
			return nil, fmt.Errorf("flush trailing batch: %w", err)
		}
	}

	if err := w.Close(); err != nil {
		return nil, fmt.Errorf("pqarrow close: %w", err)
	}
	wClosed = true

	return &IngestResult{
		Schema:   schema,
		Parquet:  buf.Bytes(),
		RowCount: rowsTotal,
	}, nil
}

// appendRowToBuilders writes one CSV row's cells into the per-column
// builders held by rb. Cells beyond the row's width are treated as
// empty (ragged-row tolerance). Empty cells append nulls on
// numeric/bool columns and empty strings on string columns.
func appendRowToBuilders(rb *array.RecordBuilder, schema Schema, row []string) error {
	for i, col := range schema {
		var cell string
		if i < len(row) {
			cell = row[i]
		}
		fb := rb.Field(i)
		switch col.Type {
		case TypeInt64:
			b := fb.(*array.Int64Builder)
			if cell == "" {
				b.AppendNull()
				continue
			}
			v, err := strconv.ParseInt(cell, 10, 64)
			if err != nil {
				return fmt.Errorf("col %q: %q is not int64: %w", col.Name, cell, err)
			}
			b.Append(v)
		case TypeFloat64:
			b := fb.(*array.Float64Builder)
			if cell == "" {
				b.AppendNull()
				continue
			}
			v, err := strconv.ParseFloat(cell, 64)
			if err != nil {
				return fmt.Errorf("col %q: %q is not float64: %w", col.Name, cell, err)
			}
			b.Append(v)
		case TypeBool:
			b := fb.(*array.BooleanBuilder)
			if cell == "" {
				b.AppendNull()
				continue
			}
			if !isBoolLiteral(cell) {
				return fmt.Errorf("col %q: %q is not bool", col.Name, cell)
			}
			b.Append(cell == "true" || cell == "True" || cell == "TRUE")
		default: // TypeString
			b := fb.(*array.StringBuilder)
			b.Append(cell)
		}
	}
	return nil
}

// Nothing in this file names the arrow package directly (the schema
// only flows through pqarrow's signatures), so without this reference
// the compiler would reject the import as unused.
var _ = arrow.NewSchema
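// Usage sketch (illustrative only; the file names, the error handling,
// and the assumption that the caller imports this package from outside
// are not part of the pipeline):
//
//	f, err := os.Open("events.csv")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer f.Close()
//
//	// 0, 0 → DefaultSampleRows and DefaultBatchSize.
//	res, err := ingestd.IngestCSV(f, 0, 0)
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := os.WriteFile("events.parquet", res.Parquet, 0o644); err != nil {
//		log.Fatal(err)
//	}
//	log.Printf("ingested %d rows into %d bytes of Parquet", res.RowCount, len(res.Parquet))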