// Package ingestd holds the CSV → Arrow → Parquet → catalogd pipeline.
//
// Schema inference per ADR-010: when a column is ambiguous or mixed,
// fall back to string. Legacy CRM data is messy — a column with
// "123", "N/A", "" is a string, not an integer. Downstream queries
// CAST as needed.
package ingestd

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"strconv"
	"strings"

	"github.com/apache/arrow-go/v18/arrow"
)

// ColumnType is the inferred logical type of a CSV column. Maps to
// an Arrow primitive in ArrowSchema().
type ColumnType string

const (
	TypeString  ColumnType = "string"
	TypeInt64   ColumnType = "int64"
	TypeFloat64 ColumnType = "float64"
	TypeBool    ColumnType = "bool"
)

// ColumnSpec is one column's name + inferred type + nullability.
// Nullable is true when at least one sampled cell was empty or when
// a sampled row was shorter than the header (the cell is missing).
type ColumnSpec struct {
	Name     string     `json:"name"`
	Type     ColumnType `json:"type"`
	Nullable bool       `json:"nullable"`
}

// Schema is the inferred CSV schema in column order. Order is
// preserved from the CSV header — the fingerprint and Arrow schema
// both depend on it.
type Schema []ColumnSpec

// DefaultSampleRows is how many data rows InferSchema looks at when
// no explicit cap is supplied. 1000 is enough to catch ambiguity in
// most real CSVs without making header inference O(file size).
const DefaultSampleRows = 1000

// InferSchema returns a Schema for the given CSV headers + sampled
// rows. The sample is interpreted as "the first N data rows, in
// header order"; columns shorter than the header are treated as
// missing (nullable). Empty header names are rejected.
func InferSchema(headers []string, samples [][]string) (Schema, error) { if len(headers) == 0 { return nil, errors.New("ingestd: empty CSV header") } for i, h := range headers { if strings.TrimSpace(h) == "" { return nil, fmt.Errorf("ingestd: empty header name at column %d", i) } } cols := make(Schema, len(headers)) for i, name := range headers { // Per-column sample slice — skip cells beyond the row's width. colSamples := make([]string, 0, len(samples)) nullable := false for _, row := range samples { if i >= len(row) { nullable = true continue } cell := row[i] if cell == "" { nullable = true continue } colSamples = append(colSamples, cell) } cols[i] = ColumnSpec{ Name: name, Type: inferColumnType(colSamples), Nullable: nullable, } } return cols, nil } // inferColumnType walks the non-empty samples once per candidate // type, returning the strictest match. ADR-010: any ambiguity → // string. Order matters: int → float → bool → string. A column with // only "1"/"0" parses as int64, NOT bool — bool only when values are // the literal text "true"/"false". func inferColumnType(samples []string) ColumnType { if len(samples) == 0 { // All empty cells. Conservative default — string lets future // inserts of any shape land without a schema-conflict 409. return TypeString } allInt := true allFloat := true allBool := true for _, s := range samples { if allInt { if _, err := strconv.ParseInt(s, 10, 64); err != nil { allInt = false } } if allFloat { if _, err := strconv.ParseFloat(s, 64); err != nil { allFloat = false } } if allBool { if !isBoolLiteral(s) { allBool = false } } if !allInt && !allFloat && !allBool { break } } switch { case allInt: return TypeInt64 case allFloat: return TypeFloat64 case allBool: return TypeBool default: return TypeString } } // isBoolLiteral matches the strict set of strings that count as // boolean. Excluding "1"/"0" so they remain int64 — keeps the type // system honest for staffing data where 0/1 columns are typically // counts, not flags. 
func isBoolLiteral(s string) bool { switch s { case "true", "True", "TRUE", "false", "False", "FALSE": return true } return false } // Fingerprint is a deterministic SHA-256 of the schema's column // (name, type) tuples in header order. Stable across runs; flips // the moment a column is added, removed, renamed, or retyped — which // catalogd's ADR-020 register turns into a 409 Conflict so schema // drift becomes loud instead of silent. // // Nullability is intentionally NOT part of the fingerprint: a column // that gained a null in iteration N+1 isn't "the schema changed," // it's "the data got a missing value." The Arrow schema still marks // the column nullable via the spec's flag — the on-wire shape just // doesn't gate idempotency on it. func (s Schema) Fingerprint() string { h := sha256.New() for _, c := range s { h.Write([]byte(c.Name)) h.Write([]byte{0x1e}) // ASCII record separator — can't appear in a CSV header h.Write([]byte(c.Type)) h.Write([]byte{0x1f}) // ASCII unit separator — between columns } return "sha256:" + hex.EncodeToString(h.Sum(nil)) } // ArrowSchema converts the inferred Schema to an Arrow schema for // use with the Parquet writer. Every field carries its Nullable // flag so empty cells round-trip as nulls on numeric columns. func (s Schema) ArrowSchema() *arrow.Schema { fields := make([]arrow.Field, len(s)) for i, c := range s { fields[i] = arrow.Field{ Name: c.Name, Type: arrowTypeFor(c.Type), Nullable: c.Nullable, } } return arrow.NewSchema(fields, nil) } func arrowTypeFor(t ColumnType) arrow.DataType { switch t { case TypeInt64: return arrow.PrimitiveTypes.Int64 case TypeFloat64: return arrow.PrimitiveTypes.Float64 case TypeBool: return arrow.FixedWidthTypes.Boolean default: return arrow.BinaryTypes.String } }