root c1e411347a G0 D4: ingestd CSV → Parquet → catalogd register · 2 scrum fixes
Phase G0 Day 4 ships ingestd: multipart CSV upload, Arrow schema
inference per ADR-010 (default-to-string on ambiguity), single-pass
streaming CSV → Parquet via pqarrow batched writer (Snappy compressed,
8192 rows per batch), PUT to storaged at content-addressed key
datasets/<name>/<fp_hex>.parquet, register manifest with catalogd.
Acceptance smoke 6/6 PASS including idempotent re-ingest (proves
inference is deterministic — same CSV always produces same fingerprint)
and schema-drift → 409 (proves catalogd's gate fires on ingest traffic).

Schema fingerprint is SHA-256 over (name, type) tuples in header order
using ASCII record/unit separators (0x1e/0x1f) so column names with
commas can't collide. Nullability intentionally NOT in the fingerprint
— a column gaining nulls isn't a schema change.

Cross-lineage scrum on shipped code:
  - Opus 4.7 (opencode):                       4 WARN + 3 INFO (after 2 self-retracted BLOCKs)
  - Kimi K2-0905 (openrouter):                 1 BLOCK + 2 WARN + 1 INFO
  - Qwen3-coder (openrouter):                  2 BLOCK + 2 WARN + 2 INFO

Fixed (2, both Opus single-reviewer):
  C-DRIFT: PUT-then-register on fixed datasets/<name>/data.parquet
    meant a schema-drift ingest overwrote the live parquet BEFORE
    catalogd's 409 fired → storaged inconsistent with manifest.
    Fix: content-addressed key datasets/<name>/<fp_hex>.parquet.
    Drift writes to a different file (orphan in G2 GC scope); the
    live data is never corrupted.
  C-WCLOSE: pqarrow.NewFileWriter not Closed on error paths leaks
    buffered column data + OS resources per failed ingest.
    Fix: deferred guarded close with wClosed flag.

Dismissed (5, all false positives):
  Qwen BLOCK "csv.Reader needs LazyQuotes=true for multi-line" — false,
    Go csv handles RFC 4180 multi-line quoted fields by default
  Qwen BLOCK "row[i] OOB" — already bounds-checked at schema.go:73
    and csv.go:201
  Kimi BLOCK "type assertion panic if pqarrow reorders fields" —
    speculative, no real path
  Kimi WARN + Qwen WARN×2 "RecordBuilder leak on early error" —
    false convergent. Outer defer rb.Release() captures the current
    builder; in-loop release runs before reassignment. No leak.

Deferred (6 INFO + accepted-with-rationale on 3 WARN): sample
boundary type mismatch (G0 cap bounds peak), string-match
paranoia on http.MaxBytesError, multipart double-buffer (G2 spool-
to-disk), separator validation, body close ordering, etc.

The D4 scrum produced fewer real findings than D3 (2 vs 6) — both
were architectural hazards smoke wouldn't catch because the smoke's
"schema drift → 409" assertion was passing even in the corrupted-
state world. The 409 fires correctly; what was wrong was the PUT
having already mutated the live parquet before the validation check.
Opus's PUT-then-register read of the order is exactly the kind of
architectural insight the cross-lineage scrum is designed to surface.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 23:50:10 -05:00

194 lines
5.5 KiB
Go

// Package ingestd holds the CSV → Arrow → Parquet → catalogd pipeline.
// Schema inference per ADR-010: when a column is ambiguous or mixed,
// fall back to string. Legacy CRM data is messy — a column with
// "123", "N/A", "" is a string, not an integer. Downstream queries
// CAST as needed.
package ingestd
import (
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"strconv"
"strings"
"github.com/apache/arrow-go/v18/arrow"
)
// ColumnType is the inferred logical type of a CSV column. Maps to
// an Arrow primitive in ArrowSchema().
type ColumnType string
const (
TypeString ColumnType = "string"
TypeInt64 ColumnType = "int64"
TypeFloat64 ColumnType = "float64"
TypeBool ColumnType = "bool"
)
// ColumnSpec is one column's name + inferred type + nullability.
// Nullable is true when at least one sampled cell was empty.
type ColumnSpec struct {
Name string `json:"name"`
Type ColumnType `json:"type"`
Nullable bool `json:"nullable"`
}
// Schema is the inferred CSV schema in column order. Order is
// preserved from the CSV header — the fingerprint and Arrow schema
// both depend on it.
type Schema []ColumnSpec
// DefaultSampleRows is how many data rows InferSchema looks at when
// no explicit cap is supplied. 1000 is enough to catch ambiguity in
// most real CSVs without making header inference O(file size).
const DefaultSampleRows = 1000
// InferSchema returns a Schema for the given CSV headers + sampled
// rows. The sample is interpreted as "the first N data rows, in
// header order"; columns shorter than the header are treated as
// missing (nullable). Empty header names are rejected.
func InferSchema(headers []string, samples [][]string) (Schema, error) {
if len(headers) == 0 {
return nil, errors.New("ingestd: empty CSV header")
}
for i, h := range headers {
if strings.TrimSpace(h) == "" {
return nil, fmt.Errorf("ingestd: empty header name at column %d", i)
}
}
cols := make(Schema, len(headers))
for i, name := range headers {
// Per-column sample slice — skip cells beyond the row's width.
colSamples := make([]string, 0, len(samples))
nullable := false
for _, row := range samples {
if i >= len(row) {
nullable = true
continue
}
cell := row[i]
if cell == "" {
nullable = true
continue
}
colSamples = append(colSamples, cell)
}
cols[i] = ColumnSpec{
Name: name,
Type: inferColumnType(colSamples),
Nullable: nullable,
}
}
return cols, nil
}
// inferColumnType walks the non-empty samples once per candidate
// type, returning the strictest match. ADR-010: any ambiguity →
// string. Order matters: int → float → bool → string. A column with
// only "1"/"0" parses as int64, NOT bool — bool only when values are
// the literal text "true"/"false".
func inferColumnType(samples []string) ColumnType {
if len(samples) == 0 {
// All empty cells. Conservative default — string lets future
// inserts of any shape land without a schema-conflict 409.
return TypeString
}
allInt := true
allFloat := true
allBool := true
for _, s := range samples {
if allInt {
if _, err := strconv.ParseInt(s, 10, 64); err != nil {
allInt = false
}
}
if allFloat {
if _, err := strconv.ParseFloat(s, 64); err != nil {
allFloat = false
}
}
if allBool {
if !isBoolLiteral(s) {
allBool = false
}
}
if !allInt && !allFloat && !allBool {
break
}
}
switch {
case allInt:
return TypeInt64
case allFloat:
return TypeFloat64
case allBool:
return TypeBool
default:
return TypeString
}
}
// isBoolLiteral matches the strict set of strings that count as
// boolean. Excluding "1"/"0" so they remain int64 — keeps the type
// system honest for staffing data where 0/1 columns are typically
// counts, not flags.
func isBoolLiteral(s string) bool {
switch s {
case "true", "True", "TRUE", "false", "False", "FALSE":
return true
}
return false
}
// Fingerprint is a deterministic SHA-256 of the schema's column
// (name, type) tuples in header order. Stable across runs; flips
// the moment a column is added, removed, renamed, or retyped — which
// catalogd's ADR-020 register turns into a 409 Conflict so schema
// drift becomes loud instead of silent.
//
// Nullability is intentionally NOT part of the fingerprint: a column
// that gained a null in iteration N+1 isn't "the schema changed,"
// it's "the data got a missing value." The Arrow schema still marks
// the column nullable via the spec's flag — the on-wire shape just
// doesn't gate idempotency on it.
func (s Schema) Fingerprint() string {
h := sha256.New()
for _, c := range s {
h.Write([]byte(c.Name))
h.Write([]byte{0x1e}) // ASCII record separator — can't appear in a CSV header
h.Write([]byte(c.Type))
h.Write([]byte{0x1f}) // ASCII unit separator — between columns
}
return "sha256:" + hex.EncodeToString(h.Sum(nil))
}
// ArrowSchema converts the inferred Schema to an Arrow schema for
// use with the Parquet writer. Every field carries its Nullable
// flag so empty cells round-trip as nulls on numeric columns.
func (s Schema) ArrowSchema() *arrow.Schema {
fields := make([]arrow.Field, len(s))
for i, c := range s {
fields[i] = arrow.Field{
Name: c.Name,
Type: arrowTypeFor(c.Type),
Nullable: c.Nullable,
}
}
return arrow.NewSchema(fields, nil)
}
func arrowTypeFor(t ColumnType) arrow.DataType {
switch t {
case TypeInt64:
return arrow.PrimitiveTypes.Int64
case TypeFloat64:
return arrow.PrimitiveTypes.Float64
case TypeBool:
return arrow.FixedWidthTypes.Boolean
default:
return arrow.BinaryTypes.String
}
}