use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray, BooleanArray}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; use std::sync::Arc; /// Inferred column type from sampling data. #[derive(Debug, Clone, PartialEq)] enum InferredType { Integer, Float, Boolean, String, } /// Parse CSV bytes into Arrow RecordBatches with automatic schema detection. /// Per ADR-010: ambiguous types default to String. pub fn parse_csv(content: &[u8]) -> Result<(Arc, Vec), String> { let mut reader = csv::ReaderBuilder::new() .flexible(true) // allow varying column counts .trim(csv::Trim::All) .from_reader(content); let headers: Vec = reader.headers() .map_err(|e| format!("CSV header error: {e}"))? .iter() .enumerate() .map(|(i, h)| { let h = h.trim().to_string(); if h.is_empty() { format!("column_{i}") } else { sanitize_column_name(&h) } }) .collect(); let n_cols = headers.len(); if n_cols == 0 { return Err("CSV has no columns".into()); } // Read all rows into string columns let mut columns: Vec> = vec![vec![]; n_cols]; let mut row_count = 0; for result in reader.records() { let record = result.map_err(|e| format!("CSV row error: {e}"))?; for (i, field) in record.iter().enumerate() { if i < n_cols { columns[i].push(field.trim().to_string()); } } // Pad short rows with empty strings for col in columns.iter_mut().skip(record.len().min(n_cols)) { col.push(String::new()); } row_count += 1; } if row_count == 0 { return Err("CSV has no data rows".into()); } tracing::info!("parsed CSV: {row_count} rows × {n_cols} columns"); // Infer types by sampling (look at all values) let types: Vec = columns.iter().map(|col| infer_column_type(col)).collect(); // Build Arrow schema let fields: Vec = headers.iter().zip(types.iter()).map(|(name, typ)| { let dt = match typ { InferredType::Integer => DataType::Int64, InferredType::Float => DataType::Float64, InferredType::Boolean => DataType::Boolean, InferredType::String => DataType::Utf8, }; 
Field::new(name, dt, true) // all nullable }).collect(); let schema = Arc::new(Schema::new(fields)); // Build arrays let arrays: Vec = columns.iter().zip(types.iter()).map(|(col, typ)| { match typ { InferredType::Integer => { let vals: Vec> = col.iter().map(|v| { if v.is_empty() { None } else { v.replace(',', "").parse().ok() } }).collect(); Arc::new(Int64Array::from(vals)) as ArrayRef } InferredType::Float => { let vals: Vec> = col.iter().map(|v| { if v.is_empty() { None } else { v.replace(',', "").replace('$', "").replace('%', "").parse().ok() } }).collect(); Arc::new(Float64Array::from(vals)) as ArrayRef } InferredType::Boolean => { let vals: Vec> = col.iter().map(|v| { match v.to_lowercase().as_str() { "true" | "yes" | "1" | "y" | "t" => Some(true), "false" | "no" | "0" | "n" | "f" => Some(false), _ => None, } }).collect(); Arc::new(BooleanArray::from(vals)) as ArrayRef } InferredType::String => { Arc::new(StringArray::from(col.clone())) as ArrayRef } } }).collect(); let batch = RecordBatch::try_new(schema.clone(), arrays) .map_err(|e| format!("RecordBatch error: {e}"))?; Ok((schema, vec![batch])) } /// Infer column type from values. Conservative: defaults to String on ambiguity. 
fn infer_column_type(values: &[String]) -> InferredType { let non_empty: Vec<&str> = values.iter() .map(|v| v.as_str()) .filter(|v| !v.is_empty() && *v != "NULL" && *v != "null" && *v != "N/A" && *v != "n/a") .collect(); if non_empty.is_empty() { return InferredType::String; } // Check boolean let all_bool = non_empty.iter().all(|v| { matches!(v.to_lowercase().as_str(), "true" | "false" | "yes" | "no" | "1" | "0" | "y" | "n" | "t" | "f") }); if all_bool && non_empty.len() >= 2 { // Make sure it's not just all "1" and "0" which could be integers let has_text_bool = non_empty.iter().any(|v| { matches!(v.to_lowercase().as_str(), "true" | "false" | "yes" | "no" | "y" | "n" | "t" | "f") }); if has_text_bool { return InferredType::Boolean; } } // Check integer (allow commas as thousands separator) let int_rate = non_empty.iter() .filter(|v| v.replace(',', "").parse::().is_ok()) .count() as f64 / non_empty.len() as f64; if int_rate > 0.95 { return InferredType::Integer; } // Check float (allow $, %, commas) let float_rate = non_empty.iter() .filter(|v| v.replace(',', "").replace('$', "").replace('%', "").parse::().is_ok()) .count() as f64 / non_empty.len() as f64; if float_rate > 0.95 { return InferredType::Float; } InferredType::String } /// Sanitize column name for SQL compatibility. 
/// Sanitize a column name for SQL compatibility: alphanumerics and `_` are
/// kept (ASCII letters lowercased; non-ASCII alphanumerics pass through
/// unchanged by `to_ascii_lowercase`), everything else becomes `_`, and
/// leading/trailing underscores are trimmed.
///
/// NOTE(review): a name containing no alphanumerics sanitizes to the empty
/// string — callers should guard against that if it matters downstream.
fn sanitize_column_name(name: &str) -> String {
    name.chars()
        .map(|c| {
            if c.is_alphanumeric() || c == '_' {
                c.to_ascii_lowercase()
            } else {
                '_'
            }
        })
        .collect::<String>()
        .trim_matches('_')
        .to_string()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_simple_csv() {
        let csv = b"Name,Age,Salary\nAlice,30,50000\nBob,25,45000\n";
        let (schema, batches) = parse_csv(csv).unwrap();
        assert_eq!(schema.fields().len(), 3);
        assert_eq!(batches[0].num_rows(), 2);
        assert_eq!(schema.field(1).data_type(), &DataType::Int64);
        assert_eq!(schema.field(2).data_type(), &DataType::Int64);
    }

    #[test]
    fn parse_csv_with_mixed_types() {
        let csv = b"id,value\n1,hello\n2,world\n3,N/A\n";
        let (schema, _) = parse_csv(csv).unwrap();
        assert_eq!(schema.field(0).data_type(), &DataType::Int64);
        assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
    }

    #[test]
    fn parse_csv_with_dollar_amounts() {
        let csv = b"item,price\nWidget,$29.99\nGadget,$149.50\n";
        let (schema, _) = parse_csv(csv).unwrap();
        assert_eq!(schema.field(1).data_type(), &DataType::Float64);
    }

    #[test]
    fn sanitize_names() {
        assert_eq!(sanitize_column_name("First Name"), "first_name");
        assert_eq!(sanitize_column_name("Bill Rate ($)"), "bill_rate");
    }
}