lakehouse/crates/ingestd/src/json_ingest.rs
root bb05c4412e Phase 6: Ingest pipeline — CSV, JSON, PDF, text file support
- ingestd crate: detect file type → parse → schema detection → Parquet → catalog
- CSV: auto-detect column types (int, float, bool, string), handles $, %, commas
  Strips dollar signs from amounts, flexible row parsing, sanitized column names
- JSON: array or newline-delimited, nested object flattening (a.b.c → a_b_c)
- PDF: text extraction via lopdf, one row per page (source_file, page_number, text)
- Text/SMS: line-based ingestion with line numbers
- Dedup: SHA-256 content hash, re-ingest same file = no-op
- Gateway: POST /ingest/file multipart upload, 256MB body limit
- Schema detection per ADR-010: ambiguous types default to String
- 12 unit tests passing (CSV parsing, JSON flattening, type inference, dedup)
- Tested: messy CSV with missing data, dollar amounts, N/A values → queryable

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 08:07:31 -05:00

168 lines
6.0 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray, BooleanArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::collections::BTreeMap;
use std::sync::Arc;
/// Parse JSON (array of objects or newline-delimited) into Arrow RecordBatches.
/// Nested objects are flattened: {"a": {"b": 1}} → column "a_b".
///
/// Accepts either a top-level JSON array (non-object elements are skipped)
/// or newline-delimited JSON, one object per line (a non-object line is an
/// error). Columns missing from a given row are filled with nulls, so every
/// column ends up with exactly one value per row.
///
/// # Errors
/// Returns `Err` on invalid UTF-8, malformed JSON, an NDJSON line that is
/// not an object, or input containing zero records.
pub fn parse_json(content: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
    let text = std::str::from_utf8(content).map_err(|e| format!("invalid UTF-8: {e}"))?;
    let text = text.trim();
    let rows = parse_rows(text)?;
    if rows.is_empty() {
        return Err("JSON has no records".into());
    }
    // Flatten each row and accumulate values column-by-column. BTreeMap keeps
    // column order deterministic (sorted by name), so the schema is stable
    // across re-ingests of the same data.
    let mut columns: BTreeMap<String, Vec<serde_json::Value>> = BTreeMap::new();
    let n_rows = rows.len();
    for (row_idx, row) in rows.iter().enumerate() {
        let flat = flatten_object(row, "");
        for (key, val) in flat {
            let col = columns.entry(key).or_default();
            // A column first seen after row 0 is back-filled with nulls for
            // every earlier row; for columns present in all prior rows this
            // resize is a no-op.
            col.resize(row_idx, serde_json::Value::Null);
            col.push(val);
        }
        // Columns seen in earlier rows but absent from this one get a null,
        // keeping every column at exactly row_idx + 1 values.
        for col in columns.values_mut() {
            if col.len() <= row_idx {
                col.push(serde_json::Value::Null);
            }
        }
    }
    tracing::info!("parsed JSON: {n_rows} rows × {} columns", columns.len());
    // Infer a type per column (ADR-010: ambiguous → String) and build arrays.
    let mut fields = Vec::new();
    let mut arrays: Vec<ArrayRef> = Vec::new();
    for (name, values) in &columns {
        let (dt, array) = json_column_to_arrow(values);
        fields.push(Field::new(name, dt, true));
        arrays.push(array);
    }
    let schema = Arc::new(Schema::new(fields));
    let batch = RecordBatch::try_new(schema.clone(), arrays)
        .map_err(|e| format!("RecordBatch error: {e}"))?;
    Ok((schema, vec![batch]))
}

/// Parse `text` as either a JSON array of objects or NDJSON into row maps.
/// Object maps are moved out of the parsed `Value`s rather than cloned.
fn parse_rows(text: &str) -> Result<Vec<serde_json::Map<String, serde_json::Value>>, String> {
    if text.starts_with('[') {
        // JSON array: non-object elements are skipped (best-effort ingest).
        let arr: Vec<serde_json::Value> = serde_json::from_str(text)
            .map_err(|e| format!("JSON parse error: {e}"))?;
        Ok(arr
            .into_iter()
            .filter_map(|v| match v {
                serde_json::Value::Object(m) => Some(m),
                _ => None,
            })
            .collect())
    } else {
        // Newline-delimited JSON: every non-empty line must be an object.
        text.lines()
            .filter(|l| !l.trim().is_empty())
            .map(|l| {
                let v: serde_json::Value = serde_json::from_str(l)
                    .map_err(|e| format!("NDJSON line parse error: {e}"))?;
                match v {
                    serde_json::Value::Object(m) => Ok(m),
                    _ => Err("NDJSON line is not an object".into()),
                }
            })
            .collect()
    }
}
/// Flatten nested JSON object: {"a": {"b": 1}} → {"a_b": 1}
/// Nested keys are joined to their parent's key with '_'; non-object values
/// are copied through unchanged under their flattened name.
fn flatten_object(obj: &serde_json::Map<String, serde_json::Value>, prefix: &str) -> BTreeMap<String, serde_json::Value> {
    obj.iter().fold(BTreeMap::new(), |mut acc, (key, val)| {
        let name = if prefix.is_empty() {
            key.clone()
        } else {
            format!("{prefix}_{key}")
        };
        match val {
            // Recurse into nested objects, merging their flattened entries.
            serde_json::Value::Object(inner) => acc.extend(flatten_object(inner, &name)),
            leaf => {
                acc.insert(name, leaf.clone());
            }
        }
        acc
    })
}
/// Convert a column of JSON values to an Arrow array.
///
/// Type resolution order (per ADR-010, ambiguous defaults to String):
/// all-null → Utf8 (fully-null column), all booleans → Boolean, all integers
/// that fit in i64 → Int64, all numbers → Float64, otherwise → Utf8 (non-string
/// scalars are serialized with `to_string`).
fn json_column_to_arrow(values: &[serde_json::Value]) -> (DataType, ArrayRef) {
    // Type inference looks only at non-null values; nulls are allowed in any column.
    let non_null: Vec<&serde_json::Value> = values.iter()
        .filter(|v| !v.is_null())
        .collect();
    if non_null.is_empty() {
        // No values at all: default to a fully-null string column.
        return (DataType::Utf8, Arc::new(StringArray::from(vec![None::<&str>; values.len()])));
    }
    let all_bool = non_null.iter().all(|v| v.is_boolean());
    // Use as_i64().is_some() rather than is_i64() || is_u64(): a u64 above
    // i64::MAX would pass the type check but make as_i64() return None below,
    // silently turning the value into a null. With this guard such values fall
    // through to the Float64 branch instead (lossy precision, but not dropped).
    let all_i64 = non_null.iter().all(|v| v.as_i64().is_some());
    let all_f64 = non_null.iter().all(|v| v.is_number());
    if all_bool {
        let arr: Vec<Option<bool>> = values.iter().map(|v| v.as_bool()).collect();
        (DataType::Boolean, Arc::new(BooleanArray::from(arr)))
    } else if all_i64 {
        let arr: Vec<Option<i64>> = values.iter().map(|v| v.as_i64()).collect();
        (DataType::Int64, Arc::new(Int64Array::from(arr)))
    } else if all_f64 {
        let arr: Vec<Option<f64>> = values.iter().map(|v| v.as_f64()).collect();
        (DataType::Float64, Arc::new(Float64Array::from(arr)))
    } else {
        // Mixed or string-typed column: default to string per ADR-010.
        let arr: Vec<Option<String>> = values.iter().map(|v| {
            match v {
                serde_json::Value::Null => None,
                serde_json::Value::String(s) => Some(s.clone()),
                other => Some(other.to_string()),
            }
        }).collect();
        (DataType::Utf8, Arc::new(StringArray::from(arr)))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A JSON array of objects parses into one batch with inferred Int64 column.
    #[test]
    fn parse_json_array() {
        let json = br#"[{"name":"Alice","age":30},{"name":"Bob","age":25}]"#;
        let (schema, batches) = parse_json(json).unwrap();
        assert_eq!(batches[0].num_rows(), 2);
        // assert_eq! (not assert!(a == b)) so a failure prints both types.
        assert_eq!(schema.field_with_name("age").unwrap().data_type(), &DataType::Int64);
    }

    /// Newline-delimited JSON yields one row per non-empty line.
    #[test]
    fn parse_ndjson() {
        let json = b"{\"x\":1}\n{\"x\":2}\n{\"x\":3}\n";
        let (_, batches) = parse_json(json).unwrap();
        assert_eq!(batches[0].num_rows(), 3);
    }

    /// Nested objects flatten into underscore-joined column names.
    #[test]
    fn flatten_nested() {
        let json = br#"[{"user":{"name":"Alice","address":{"city":"NYC"}},"score":9.5}]"#;
        let (schema, _) = parse_json(json).unwrap();
        let names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert!(names.contains(&"user_name"));
        assert!(names.contains(&"user_address_city"));
    }
}