Phase G0 Day 4 ships ingestd: multipart CSV upload, Arrow schema
inference per ADR-010 (default-to-string on ambiguity), single-pass
streaming CSV → Parquet via pqarrow batched writer (Snappy compressed,
8192 rows per batch), PUT to storaged at content-addressed key
datasets/<name>/<fp_hex>.parquet, register manifest with catalogd.
Acceptance smoke 6/6 PASS including idempotent re-ingest (proves
inference is deterministic — same CSV always produces same fingerprint)
and schema-drift → 409 (proves catalogd's gate fires on ingest traffic).
Schema fingerprint is SHA-256 over (name, type) tuples in header order
using ASCII record/unit separators (0x1e/0x1f) so column names with
commas can't collide. Nullability intentionally NOT in the fingerprint
— a column gaining nulls isn't a schema change.
Cross-lineage scrum on shipped code:
- Opus 4.7 (opencode): 4 WARN + 3 INFO (after 2 self-retracted BLOCKs)
- Kimi K2-0905 (openrouter): 1 BLOCK + 2 WARN + 1 INFO
- Qwen3-coder (openrouter): 2 BLOCK + 2 WARN + 2 INFO
Fixed (2, both Opus single-reviewer):
C-DRIFT: PUT-then-register on fixed datasets/<name>/data.parquet
meant a schema-drift ingest overwrote the live parquet BEFORE
catalogd's 409 fired → storaged inconsistent with manifest.
Fix: content-addressed key datasets/<name>/<fp_hex>.parquet.
Drift writes to a different file (orphan in G2 GC scope); the
live data is never corrupted.
C-WCLOSE: pqarrow.NewFileWriter not Closed on error paths leaks
buffered column data + OS resources per failed ingest.
Fix: deferred guarded close with wClosed flag.
Dismissed (5, all false positives):
Qwen BLOCK "csv.Reader needs LazyQuotes=true for multi-line" — false,
Go csv handles RFC 4180 multi-line quoted fields by default
Qwen BLOCK "row[i] OOB" — already bounds-checked at schema.go:73
and csv.go:201
Kimi BLOCK "type assertion panic if pqarrow reorders fields" —
speculative, no real path
Kimi WARN + Qwen WARN×2 "RecordBuilder leak on early error" —
false convergent. Outer defer rb.Release() captures the current
builder; in-loop release runs before reassignment. No leak.
Deferred (6 INFO + accepted-with-rationale on 3 WARN): sample
boundary type mismatch (G0 cap bounds peak), string-match
paranoia on http.MaxBytesError, multipart double-buffer (G2 spool-
to-disk), separator validation, body close ordering, etc.
The D4 scrum produced fewer real findings than D3 (2 vs 6) — both
were architectural hazards smoke wouldn't catch because the smoke's
"schema drift → 409" assertion was passing even in the corrupted-
state world. The 409 fires correctly; what was wrong was the PUT
having already mutated the live parquet before the validation check.
Opus's PUT-then-register read of the order is exactly the kind of
architectural insight the cross-lineage scrum is designed to surface.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
129 lines
3.3 KiB
Go
package ingestd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/apache/arrow-go/v18/arrow/array"
|
|
"github.com/apache/arrow-go/v18/arrow/memory"
|
|
"github.com/apache/arrow-go/v18/parquet/file"
|
|
"github.com/apache/arrow-go/v18/parquet/pqarrow"
|
|
)
|
|
|
|
func TestIngestCSV_Basic(t *testing.T) {
|
|
csvText := strings.Join([]string{
|
|
"id,name,salary,active",
|
|
"1,Alice,50000,true",
|
|
"2,Bob,60000,false",
|
|
"3,Carol,,true",
|
|
"4,Dave,75000,",
|
|
}, "\n")
|
|
|
|
res, err := IngestCSV(strings.NewReader(csvText), 0, 0)
|
|
if err != nil {
|
|
t.Fatalf("IngestCSV: %v", err)
|
|
}
|
|
|
|
if res.RowCount != 4 {
|
|
t.Errorf("RowCount: got %d, want 4", res.RowCount)
|
|
}
|
|
if len(res.Schema) != 4 {
|
|
t.Fatalf("schema cols: got %d, want 4", len(res.Schema))
|
|
}
|
|
|
|
want := []ColumnSpec{
|
|
{Name: "id", Type: TypeInt64, Nullable: false},
|
|
{Name: "name", Type: TypeString, Nullable: false},
|
|
{Name: "salary", Type: TypeInt64, Nullable: true}, // empty cell on row 3
|
|
{Name: "active", Type: TypeBool, Nullable: true}, // empty cell on row 4
|
|
}
|
|
for i, w := range want {
|
|
if res.Schema[i] != w {
|
|
t.Errorf("col %d: got %+v, want %+v", i, res.Schema[i], w)
|
|
}
|
|
}
|
|
|
|
// Round-trip through the pqarrow reader.
|
|
tbl, err := readParquetTable(res.Parquet)
|
|
if err != nil {
|
|
t.Fatalf("read parquet: %v", err)
|
|
}
|
|
defer tbl.Release()
|
|
if tbl.NumRows() != 4 {
|
|
t.Errorf("parquet rows: got %d, want 4", tbl.NumRows())
|
|
}
|
|
}
|
|
|
|
func TestIngestCSV_StringFallback(t *testing.T) {
|
|
// Per ADR-010: "salary" with mixed values → string.
|
|
csvText := "id,salary\n1,50000\n2,N/A\n3,60000\n"
|
|
res, err := IngestCSV(strings.NewReader(csvText), 0, 0)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if res.Schema[1].Type != TypeString {
|
|
t.Errorf("salary fell to %s, want string", res.Schema[1].Type)
|
|
}
|
|
}
|
|
|
|
func TestIngestCSV_BatchBoundary(t *testing.T) {
|
|
// 17 rows, batch size 5 → 4 batches (5+5+5+2). Tests the trailing-
|
|
// partial-batch flush + the schema sample being smaller than rows
|
|
// in the file.
|
|
var sb strings.Builder
|
|
sb.WriteString("id\n")
|
|
for i := 0; i < 17; i++ {
|
|
sb.WriteString("1\n")
|
|
}
|
|
res, err := IngestCSV(strings.NewReader(sb.String()), 5, 5)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if res.RowCount != 17 {
|
|
t.Errorf("RowCount: got %d, want 17", res.RowCount)
|
|
}
|
|
}
|
|
|
|
func TestIngestCSV_EmptyFile(t *testing.T) {
|
|
if _, err := IngestCSV(strings.NewReader(""), 0, 0); err == nil {
|
|
t.Error("empty CSV should error")
|
|
}
|
|
}
|
|
|
|
func TestIngestCSV_HeaderOnly(t *testing.T) {
|
|
res, err := IngestCSV(strings.NewReader("a,b,c\n"), 0, 0)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if res.RowCount != 0 {
|
|
t.Errorf("RowCount: got %d, want 0", res.RowCount)
|
|
}
|
|
// All-empty samples → all string columns per inferColumnType.
|
|
for _, c := range res.Schema {
|
|
if c.Type != TypeString {
|
|
t.Errorf("col %q with no samples: got %s, want string", c.Name, c.Type)
|
|
}
|
|
}
|
|
}
|
|
|
|
// readParquetTable is a small test helper.
|
|
func readParquetTable(b []byte) (interface{ NumRows() int64; Release() }, error) {
|
|
rdr, err := file.NewParquetReader(bytes.NewReader(b))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.NewGoAllocator())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tbl, err := pr.ReadTable(context.Background())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return tbl, nil
|
|
}
|
|
|
|
// Anchor the array package so goimports keeps its import even though
// no test in this file references it directly; removing the import
// would require a coordinated edit to the import block above.
var _ = array.NewRecordBuilder // keep import for the symbol path
|