From bb05c4412e8127193dc58f6a44aa6517bda8d11f Mon Sep 17 00:00:00 2001 From: root Date: Fri, 27 Mar 2026 08:07:31 -0500 Subject: [PATCH] =?UTF-8?q?Phase=206:=20Ingest=20pipeline=20=E2=80=94=20CS?= =?UTF-8?q?V,=20JSON,=20PDF,=20text=20file=20support?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ingestd crate: detect file type → parse → schema detection → Parquet → catalog - CSV: auto-detect column types (int, float, bool, string), handles $, %, commas Strips dollar signs from amounts, flexible row parsing, sanitized column names - JSON: array or newline-delimited, nested object flattening (a.b.c → a_b_c) - PDF: text extraction via lopdf, one row per page (source_file, page_number, text) - Text/SMS: line-based ingestion with line numbers - Dedup: SHA-256 content hash, re-ingest same file = no-op - Gateway: POST /ingest/file multipart upload, 256MB body limit - Schema detection per ADR-010: ambiguous types default to String - 12 unit tests passing (CSV parsing, JSON flattening, type inference, dedup) - Tested: messy CSV with missing data, dollar amounts, N/A values → queryable Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 180 +++++++++++++ Cargo.toml | 4 + crates/gateway/Cargo.toml | 1 + crates/gateway/src/main.rs | 11 +- crates/ingestd/Cargo.toml | 22 ++ crates/ingestd/src/csv_ingest.rs | 203 ++++++++++++++ crates/ingestd/src/detect.rs | 103 +++++++ crates/ingestd/src/json_ingest.rs | 167 ++++++++++++ crates/ingestd/src/lib.rs | 6 + crates/ingestd/src/pdf_ingest.rs | 52 ++++ crates/ingestd/src/pipeline.rs | 162 +++++++++++ crates/ingestd/src/service.rs | 69 +++++ .../1ca61945-d151-490b-81fd-2ca0397b68fa.json | 15 ++ .../1f81445a-404f-48ea-bd72-00f6721ee18b.json | 15 -- .../478072c3-0c95-46a2-9193-f4b3ac4085ab.json | 15 ++ .../50d402a0-6fb4-4849-ac75-3541b8530aba.json | 15 -- .../6c8dabf2-a6b3-4e23-b7d7-dc3b37a5e1a8.json | 15 -- .../f5281277-afe9-456b-b5fa-d667c2b0f41f.json | 15 -- .../fc046131-8f44-428a-af90-ebf6347d4cc6.json | 15 -- data/datasets/employees.parquet | Bin 1424 -> 0 bytes data/datasets/events.parquet | Bin 1127 -> 0 bytes data/datasets/products.parquet | Bin 1341 -> 0 bytes data/datasets/sms_messages.parquet | Bin 0 -> 2018 bytes data/datasets/test_ingest.parquet | Bin 0 -> 3129 bytes scripts/generate_demo.py | 254 ++++++++++++++++++ 25 files changed, 1261 insertions(+), 78 deletions(-) create mode 100644 crates/ingestd/Cargo.toml create mode 100644 crates/ingestd/src/csv_ingest.rs create mode 100644 crates/ingestd/src/detect.rs create mode 100644 crates/ingestd/src/json_ingest.rs create mode 100644 crates/ingestd/src/lib.rs create mode 100644 crates/ingestd/src/pdf_ingest.rs create mode 100644 crates/ingestd/src/pipeline.rs create mode 100644 crates/ingestd/src/service.rs create mode 100644 data/_catalog/manifests/1ca61945-d151-490b-81fd-2ca0397b68fa.json delete mode 100644 data/_catalog/manifests/1f81445a-404f-48ea-bd72-00f6721ee18b.json create mode 100644 data/_catalog/manifests/478072c3-0c95-46a2-9193-f4b3ac4085ab.json delete mode 100644 data/_catalog/manifests/50d402a0-6fb4-4849-ac75-3541b8530aba.json delete mode 100644 data/_catalog/manifests/6c8dabf2-a6b3-4e23-b7d7-dc3b37a5e1a8.json delete mode 100644 data/_catalog/manifests/f5281277-afe9-456b-b5fa-d667c2b0f41f.json delete mode 100644 data/_catalog/manifests/fc046131-8f44-428a-af90-ebf6347d4cc6.json delete mode 100644 data/datasets/employees.parquet delete mode 100644 data/datasets/events.parquet delete mode 100644 data/datasets/products.parquet create mode 100644 data/datasets/sms_messages.parquet create mode 100644 data/datasets/test_ingest.parquet create mode 100644 scripts/generate_demo.py diff --git a/Cargo.lock b/Cargo.lock index d7c9d45..70cf55c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,17 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "ahash" version = "0.8.12" @@ -508,6 +519,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "brotli" version = "8.0.2" @@ -535,6 +555,12 @@ version = "3.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" +[[package]] +name = "bytecount" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e" + [[package]] name = "byteorder" version = "1.5.0" @@ -588,6 +614,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.2.58" @@ -690,6 +725,16 @@ dependencies = [ "half", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "combine" version = "4.6.7" @@ -894,6 +939,25 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -2311,6 +2375,7 @@ dependencies = [ "aibridge", "axum", "catalogd", + "ingestd", "object_store", "opentelemetry", "opentelemetry-stdout", @@ -2820,6 +2885,38 @@ dependencies = [ "cfb", ] +[[package]] +name = "ingestd" +version = "0.1.0" +dependencies = [ + "arrow", + "axum", + "bytes", + "catalogd", + "chrono", + "csv", + "lopdf", + "object_store", + "parquet", + "serde", + "serde_json", + "sha2", + "shared", + "storaged", + "tokio", + "tracing", +] + +[[package]] +name = "inout" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" +dependencies = [ + "block-padding", + "generic-array", +] + [[package]] name = "integer-encoding" version = "3.0.4" @@ -3075,6 +3172,30 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3bd0dd2cd90571056fdb71f6275fada10131182f84899f4b2a916e565d81d86" +[[package]] +name = "lopdf" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c7c1d3350d071cb86987a6bcb205c7019a0eb70dcad92b454fec722cca8d68b" +dependencies = [ + "aes", + "cbc", + "chrono", + "encoding_rs", + "flate2", + "indexmap", + "itoa", + "log", + "md-5", + "nom", + "nom_locate", + "rangemap", + "rayon", + "thiserror 2.0.18", + "time", + "weezl", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -3217,6 +3338,12 @@ dependencies = [ "unicase", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -3291,6 +3418,27 @@ dependencies = [ "jni-sys 0.3.1", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] +name = "nom_locate" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e3c83c053b0713da60c5b8de47fe8e494fe3ece5267b2f23090a07a053ba8f3" +dependencies = [ + "bytecount", + "memchr", + "nom", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -3971,12 +4119,38 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rangemap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "973443cf09a9c8656b574a866ab68dfa19f0867d0340648c7d2f6a71b8a8ea68" + [[package]] name = "raw-window-handle" version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539" +[[package]] +name = "rayon" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "recursive" version = "0.1.1" @@ -5439,6 +5613,12 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "weezl" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28ac98ddc8b9274cb41bb4d9d4d5c425b6020c50c46f25559911905610b4a88" + [[package]] name = "winapi-util" version = "0.1.11" diff --git a/Cargo.toml b/Cargo.toml index b7ade62..695759b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/catalogd", "crates/queryd", "crates/aibridge", + "crates/ingestd", "crates/gateway", "crates/ui", ] @@ -38,3 +39,6 @@ opentelemetry_sdk = { version = "0.28", features = ["rt-tokio"] } opentelemetry-stdout = { version = "0.28", features = ["trace"] } tracing-opentelemetry = "0.29" toml = "0.8" +csv = "1" +lopdf = "0.35" +encoding_rs = "0.8" diff --git a/crates/gateway/Cargo.toml b/crates/gateway/Cargo.toml index db6c48f..0c3dfd5 100644 --- a/crates/gateway/Cargo.toml +++ b/crates/gateway/Cargo.toml @@ -9,6 +9,7 @@ storaged = { path = "../storaged" } catalogd = { path = "../catalogd" } queryd = { path = "../queryd" } aibridge = { path = "../aibridge" } +ingestd = { path = "../ingestd" } tokio = { workspace = true } axum = { workspace = true } serde = { workspace = true } diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs index c4eb7c6..490f049 100644 --- a/crates/gateway/src/main.rs +++ b/crates/gateway/src/main.rs @@ -1,7 +1,7 @@ mod auth; mod observability; -use axum::{Router, routing::get}; +use axum::{Router, extract::DefaultBodyLimit, routing::get}; use proto::lakehouse::catalog_service_server::CatalogServiceServer; use shared::config::Config; use tower_http::cors::{Any, CorsLayer}; @@ -39,10 +39,14 @@ async fn main() { // HTTP router let mut app = Router::new() .route("/health", get(health)) - .nest("/storage", storaged::service::router(store)) + .nest("/storage", storaged::service::router(store.clone())) .nest("/catalog", catalogd::service::router(registry.clone())) .nest("/query", queryd::service::router(engine)) - .nest("/ai", aibridge::service::router(ai_client)); + .nest("/ai", aibridge::service::router(ai_client)) + .nest("/ingest", ingestd::service::router(ingestd::service::IngestState { + store: store.clone(), + registry: registry.clone(), + })); // Auth middleware (if enabled) if config.auth.enabled { @@ -58,6 +62,7 @@ async fn main() { } app = app + .layer(DefaultBodyLimit::max(256 * 1024 * 1024)) // 256MB .layer(CorsLayer::new() .allow_origin(Any) .allow_methods(Any) diff --git a/crates/ingestd/Cargo.toml b/crates/ingestd/Cargo.toml new file mode 100644 index 0000000..c1963e8 --- /dev/null +++ b/crates/ingestd/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "ingestd" +version = "0.1.0" +edition = "2024" + +[dependencies] +shared = { path = "../shared" } +storaged = { path = "../storaged" } +catalogd = { path = "../catalogd" } +tokio = { workspace = true } +axum = { workspace = true, features = ["multipart"] } +lopdf = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tracing = { workspace = true } +arrow = { workspace = true } +parquet = { workspace = true } +bytes = { workspace = true } +sha2 = { workspace = true } +csv = { workspace = true } +chrono = { workspace = true } +object_store = { workspace = true } diff --git a/crates/ingestd/src/csv_ingest.rs b/crates/ingestd/src/csv_ingest.rs new file mode 100644 index 0000000..68cf932 --- /dev/null +++ b/crates/ingestd/src/csv_ingest.rs @@ -0,0 +1,203 @@ +use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray, BooleanArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use std::sync::Arc; + +/// Inferred column type from sampling data. +#[derive(Debug, Clone, PartialEq)] +enum InferredType { + Integer, + Float, + Boolean, + String, +} + +/// Parse CSV bytes into Arrow RecordBatches with automatic schema detection. +/// Per ADR-010: ambiguous types default to String. +pub fn parse_csv(content: &[u8]) -> Result<(Arc, Vec), String> { + let mut reader = csv::ReaderBuilder::new() + .flexible(true) // allow varying column counts + .trim(csv::Trim::All) + .from_reader(content); + + let headers: Vec = reader.headers() + .map_err(|e| format!("CSV header error: {e}"))? + .iter() + .enumerate() + .map(|(i, h)| { + let h = h.trim().to_string(); + if h.is_empty() { format!("column_{i}") } else { sanitize_column_name(&h) } + }) + .collect(); + + let n_cols = headers.len(); + if n_cols == 0 { + return Err("CSV has no columns".into()); + } + + // Read all rows into string columns + let mut columns: Vec> = vec![vec![]; n_cols]; + let mut row_count = 0; + + for result in reader.records() { + let record = result.map_err(|e| format!("CSV row error: {e}"))?; + for (i, field) in record.iter().enumerate() { + if i < n_cols { + columns[i].push(field.trim().to_string()); + } + } + // Pad short rows with empty strings + for col in columns.iter_mut().skip(record.len().min(n_cols)) { + col.push(String::new()); + } + row_count += 1; + } + + if row_count == 0 { + return Err("CSV has no data rows".into()); + } + + tracing::info!("parsed CSV: {row_count} rows × {n_cols} columns"); + + // Infer types by sampling (look at all values) + let types: Vec = columns.iter().map(|col| infer_column_type(col)).collect(); + + // Build Arrow schema + let fields: Vec = headers.iter().zip(types.iter()).map(|(name, typ)| { + let dt = match typ { + InferredType::Integer => DataType::Int64, + InferredType::Float => DataType::Float64, + InferredType::Boolean => DataType::Boolean, + InferredType::String => DataType::Utf8, + }; + Field::new(name, dt, true) // all nullable + }).collect(); + + let schema = Arc::new(Schema::new(fields)); + + // Build arrays + let arrays: Vec = columns.iter().zip(types.iter()).map(|(col, typ)| { + match typ { + InferredType::Integer => { + let vals: Vec> = col.iter().map(|v| { + if v.is_empty() { None } else { v.replace(',', "").parse().ok() } + }).collect(); + Arc::new(Int64Array::from(vals)) as ArrayRef + } + InferredType::Float => { + let vals: Vec> = col.iter().map(|v| { + if v.is_empty() { None } + else { v.replace(',', "").replace('$', "").replace('%', "").parse().ok() } + }).collect(); + Arc::new(Float64Array::from(vals)) as ArrayRef + } + InferredType::Boolean => { + let vals: Vec> = col.iter().map(|v| { + match v.to_lowercase().as_str() { + "true" | "yes" | "1" | "y" | "t" => Some(true), + "false" | "no" | "0" | "n" | "f" => Some(false), + _ => None, + } + }).collect(); + Arc::new(BooleanArray::from(vals)) as ArrayRef + } + InferredType::String => { + Arc::new(StringArray::from(col.clone())) as ArrayRef + } + } + }).collect(); + + let batch = RecordBatch::try_new(schema.clone(), arrays) + .map_err(|e| format!("RecordBatch error: {e}"))?; + + Ok((schema, vec![batch])) +} + +/// Infer column type from values. Conservative: defaults to String on ambiguity. +fn infer_column_type(values: &[String]) -> InferredType { + let non_empty: Vec<&str> = values.iter() + .map(|v| v.as_str()) + .filter(|v| !v.is_empty() && *v != "NULL" && *v != "null" && *v != "N/A" && *v != "n/a") + .collect(); + + if non_empty.is_empty() { + return InferredType::String; + } + + // Check boolean + let all_bool = non_empty.iter().all(|v| { + matches!(v.to_lowercase().as_str(), "true" | "false" | "yes" | "no" | "1" | "0" | "y" | "n" | "t" | "f") + }); + if all_bool && non_empty.len() >= 2 { + // Make sure it's not just all "1" and "0" which could be integers + let has_text_bool = non_empty.iter().any(|v| { + matches!(v.to_lowercase().as_str(), "true" | "false" | "yes" | "no" | "y" | "n" | "t" | "f") + }); + if has_text_bool { + return InferredType::Boolean; + } + } + + // Check integer (allow commas as thousands separator) + let int_rate = non_empty.iter() + .filter(|v| v.replace(',', "").parse::().is_ok()) + .count() as f64 / non_empty.len() as f64; + if int_rate > 0.95 { + return InferredType::Integer; + } + + // Check float (allow $, %, commas) + let float_rate = non_empty.iter() + .filter(|v| v.replace(',', "").replace('$', "").replace('%', "").parse::().is_ok()) + .count() as f64 / non_empty.len() as f64; + if float_rate > 0.95 { + return InferredType::Float; + } + + InferredType::String +} + +/// Sanitize column name for SQL compatibility. +fn sanitize_column_name(name: &str) -> String { + name.chars() + .map(|c| if c.is_alphanumeric() || c == '_' { c.to_ascii_lowercase() } else { '_' }) + .collect::() + .trim_matches('_') + .to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_simple_csv() { + let csv = b"Name,Age,Salary\nAlice,30,50000\nBob,25,45000\n"; + let (schema, batches) = parse_csv(csv).unwrap(); + assert_eq!(schema.fields().len(), 3); + assert_eq!(batches[0].num_rows(), 2); + assert_eq!(schema.field(1).data_type(), &DataType::Int64); + assert_eq!(schema.field(2).data_type(), &DataType::Int64); + } + + #[test] + fn parse_csv_with_mixed_types() { + let csv = b"id,value\n1,hello\n2,world\n3,N/A\n"; + let (schema, _) = parse_csv(csv).unwrap(); + assert_eq!(schema.field(0).data_type(), &DataType::Int64); + assert_eq!(schema.field(1).data_type(), &DataType::Utf8); + } + + #[test] + fn parse_csv_with_dollar_amounts() { + let csv = b"item,price\nWidget,$29.99\nGadget,$149.50\n"; + let (schema, _) = parse_csv(csv).unwrap(); + assert_eq!(schema.field(1).data_type(), &DataType::Float64); + } + + #[test] + fn sanitize_names() { + assert_eq!(sanitize_column_name("First Name"), "first_name"); + assert_eq!(sanitize_column_name("Bill Rate ($)"), "bill_rate"); + } +} diff --git a/crates/ingestd/src/detect.rs b/crates/ingestd/src/detect.rs new file mode 100644 index 0000000..fff0e57 --- /dev/null +++ b/crates/ingestd/src/detect.rs @@ -0,0 +1,103 @@ +use sha2::{Digest, Sha256}; + +/// Detected file type from content inspection. +#[derive(Debug, Clone, PartialEq)] +pub enum FileType { + Csv, + Json, + NdJson, // newline-delimited JSON + Pdf, + Text, // plain text, SMS logs, etc. + Unknown, +} + +/// Detect file type from filename extension and content sniffing. +pub fn detect_file_type(filename: &str, content: &[u8]) -> FileType { + // Extension-based first + let lower = filename.to_lowercase(); + if lower.ends_with(".csv") || lower.ends_with(".tsv") { + return FileType::Csv; + } + if lower.ends_with(".json") { + // Check if it's newline-delimited JSON + if content.iter().take(4096).filter(|&&b| b == b'\n').count() > 2 { + let first_line = content.split(|&b| b == b'\n').next().unwrap_or(b""); + if first_line.starts_with(b"{") { + return FileType::NdJson; + } + } + return FileType::Json; + } + if lower.ends_with(".ndjson") || lower.ends_with(".jsonl") { + return FileType::NdJson; + } + if lower.ends_with(".pdf") { + return FileType::Pdf; + } + if lower.ends_with(".txt") || lower.ends_with(".log") || lower.ends_with(".sms") { + return FileType::Text; + } + + // Content sniffing fallback + if content.starts_with(b"%PDF") { + return FileType::Pdf; + } + if content.starts_with(b"[") || content.starts_with(b"{") { + return FileType::Json; + } + + // Check if it looks like CSV (has commas and newlines in first chunk) + let sample = &content[..content.len().min(4096)]; + let comma_count = sample.iter().filter(|&&b| b == b',').count(); + let newline_count = sample.iter().filter(|&&b| b == b'\n').count(); + if comma_count > 3 && newline_count > 1 { + return FileType::Csv; + } + + // If it's valid UTF-8, treat as text + if std::str::from_utf8(sample).is_ok() { + return FileType::Text; + } + + FileType::Unknown +} + +/// Compute SHA-256 hash of content for deduplication. +pub fn content_hash(content: &[u8]) -> String { + let mut hasher = Sha256::new(); + hasher.update(content); + format!("{:x}", hasher.finalize()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn detect_csv_by_extension() { + assert_eq!(detect_file_type("data.csv", b"a,b,c\n1,2,3"), FileType::Csv); + } + + #[test] + fn detect_json_by_extension() { + assert_eq!(detect_file_type("data.json", b"[{\"a\":1}]"), FileType::Json); + } + + #[test] + fn detect_pdf_by_magic() { + assert_eq!(detect_file_type("unknown", b"%PDF-1.4 blah"), FileType::Pdf); + } + + #[test] + fn detect_csv_by_content() { + let csv = b"name,age,city\nAlice,30,NYC\nBob,25,LA\n"; + assert_eq!(detect_file_type("unknown.dat", csv), FileType::Csv); + } + + #[test] + fn content_hash_deterministic() { + let h1 = content_hash(b"hello world"); + let h2 = content_hash(b"hello world"); + assert_eq!(h1, h2); + } +} diff --git a/crates/ingestd/src/json_ingest.rs b/crates/ingestd/src/json_ingest.rs new file mode 100644 index 0000000..aa91140 --- /dev/null +++ b/crates/ingestd/src/json_ingest.rs @@ -0,0 +1,167 @@ +use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray, BooleanArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use std::collections::BTreeMap; +use std::sync::Arc; + +/// Parse JSON (array of objects or newline-delimited) into Arrow RecordBatches. +/// Nested objects are flattened: {"a": {"b": 1}} → column "a_b". +pub fn parse_json(content: &[u8]) -> Result<(Arc, Vec), String> { + let text = std::str::from_utf8(content).map_err(|e| format!("invalid UTF-8: {e}"))?; + let text = text.trim(); + + // Parse into Vec + let rows: Vec> = if text.starts_with('[') { + // JSON array + let arr: Vec = serde_json::from_str(text) + .map_err(|e| format!("JSON parse error: {e}"))?; + arr.into_iter() + .filter_map(|v| v.as_object().cloned()) + .collect() + } else { + // Newline-delimited JSON + text.lines() + .filter(|l| !l.trim().is_empty()) + .map(|l| { + let v: serde_json::Value = serde_json::from_str(l) + .map_err(|e| format!("NDJSON line parse error: {e}"))?; + v.as_object().cloned().ok_or_else(|| "NDJSON line is not an object".into()) + }) + .collect::, String>>()? + }; + + if rows.is_empty() { + return Err("JSON has no records".into()); + } + + // Flatten and collect all columns + let mut columns: BTreeMap> = BTreeMap::new(); + let n_rows = rows.len(); + + for (row_idx, row) in rows.iter().enumerate() { + let flat = flatten_object(row, ""); + // Pad existing columns that aren't in this row + for key in columns.keys().cloned().collect::>() { + if !flat.contains_key(&key) { + columns.get_mut(&key).unwrap().push(serde_json::Value::Null); + } + } + // Add new columns or append values + for (key, val) in flat { + let col = columns.entry(key).or_insert_with(Vec::new); + // Pad with nulls if this column is new and we've seen prior rows + while col.len() < row_idx { + col.push(serde_json::Value::Null); + } + col.push(val); + } + } + + // Pad short columns + for col in columns.values_mut() { + while col.len() < n_rows { + col.push(serde_json::Value::Null); + } + } + + tracing::info!("parsed JSON: {n_rows} rows × {} columns", columns.len()); + + // Infer types and build schema + let mut fields = Vec::new(); + let mut arrays: Vec = Vec::new(); + + for (name, values) in &columns { + let (dt, array) = json_column_to_arrow(values); + fields.push(Field::new(name, dt, true)); + arrays.push(array); + } + + let schema = Arc::new(Schema::new(fields)); + let batch = RecordBatch::try_new(schema.clone(), arrays) + .map_err(|e| format!("RecordBatch error: {e}"))?; + + Ok((schema, vec![batch])) +} + +/// Flatten nested JSON object: {"a": {"b": 1}} → {"a_b": 1} +fn flatten_object(obj: &serde_json::Map, prefix: &str) -> BTreeMap { + let mut result = BTreeMap::new(); + for (key, val) in obj { + let full_key = if prefix.is_empty() { key.clone() } else { format!("{prefix}_{key}") }; + match val { + serde_json::Value::Object(inner) => { + result.extend(flatten_object(inner, &full_key)); + } + other => { + result.insert(full_key, other.clone()); + } + } + } + result +} + +/// Convert a column of JSON values to an Arrow array. +fn json_column_to_arrow(values: &[serde_json::Value]) -> (DataType, ArrayRef) { + // Check if all non-null values are the same type + let non_null: Vec<&serde_json::Value> = values.iter() + .filter(|v| !v.is_null()) + .collect(); + + if non_null.is_empty() { + return (DataType::Utf8, Arc::new(StringArray::from(vec![None::<&str>; values.len()]))); + } + + let all_bool = non_null.iter().all(|v| v.is_boolean()); + let all_i64 = non_null.iter().all(|v| v.is_i64() || v.is_u64()); + let all_f64 = non_null.iter().all(|v| v.is_number()); + + if all_bool { + let arr: Vec> = values.iter().map(|v| v.as_bool()).collect(); + (DataType::Boolean, Arc::new(BooleanArray::from(arr))) + } else if all_i64 { + let arr: Vec> = values.iter().map(|v| v.as_i64()).collect(); + (DataType::Int64, Arc::new(Int64Array::from(arr))) + } else if all_f64 { + let arr: Vec> = values.iter().map(|v| v.as_f64()).collect(); + (DataType::Float64, Arc::new(Float64Array::from(arr))) + } else { + // Default to string + let arr: Vec> = values.iter().map(|v| { + match v { + serde_json::Value::Null => None, + serde_json::Value::String(s) => Some(s.clone()), + other => Some(other.to_string()), + } + }).collect(); + (DataType::Utf8, Arc::new(StringArray::from(arr))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_json_array() { + let json = br#"[{"name":"Alice","age":30},{"name":"Bob","age":25}]"#; + let (schema, batches) = parse_json(json).unwrap(); + assert_eq!(batches[0].num_rows(), 2); + assert!(schema.field_with_name("age").unwrap().data_type() == &DataType::Int64); + } + + #[test] + fn parse_ndjson() { + let json = b"{\"x\":1}\n{\"x\":2}\n{\"x\":3}\n"; + let (_, batches) = parse_json(json).unwrap(); + assert_eq!(batches[0].num_rows(), 3); + } + + #[test] + fn flatten_nested() { + let json = br#"[{"user":{"name":"Alice","address":{"city":"NYC"}},"score":9.5}]"#; + let (schema, _) = parse_json(json).unwrap(); + let names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert!(names.contains(&"user_name")); + assert!(names.contains(&"user_address_city")); + } +} diff --git a/crates/ingestd/src/lib.rs b/crates/ingestd/src/lib.rs new file mode 100644 index 0000000..c507d30 --- /dev/null +++ b/crates/ingestd/src/lib.rs @@ -0,0 +1,6 @@ +pub mod detect; +pub mod csv_ingest; +pub mod json_ingest; +pub mod pdf_ingest; +pub mod pipeline; +pub mod service; diff --git a/crates/ingestd/src/pdf_ingest.rs b/crates/ingestd/src/pdf_ingest.rs new file mode 100644 index 0000000..c087e80 --- /dev/null +++ b/crates/ingestd/src/pdf_ingest.rs @@ -0,0 +1,52 @@ +use arrow::array::{ArrayRef, Int32Array, StringArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use std::sync::Arc; + +/// Extract text from a PDF file. +/// Returns one row per page: (page_number, text_content). +/// This handles text-based PDFs. Scanned/image PDFs need OCR (not implemented yet). +pub fn parse_pdf(content: &[u8], source_filename: &str) -> Result<(Arc, Vec), String> { + let doc = lopdf::Document::load_mem(content) + .map_err(|e| format!("PDF load error: {e}"))?; + + let pages = doc.get_pages(); + let mut page_numbers: Vec = Vec::new(); + let mut page_texts: Vec = Vec::new(); + let mut sources: Vec = Vec::new(); + + for (&page_num, _) in pages.iter() { + let text = doc.extract_text(&[page_num]).unwrap_or_default(); + let text = text.trim().to_string(); + + if !text.is_empty() { + page_numbers.push(page_num as i32); + page_texts.push(text); + sources.push(source_filename.to_string()); + } + } + + if page_numbers.is_empty() { + // PDF has no extractable text — likely scanned/image + return Err("PDF contains no extractable text (may be scanned/image — OCR not yet supported)".into()); + } + + tracing::info!("extracted {} pages with text from PDF '{}'", page_numbers.len(), source_filename); + + let schema = Arc::new(Schema::new(vec![ + Field::new("source_file", DataType::Utf8, false), + Field::new("page_number", DataType::Int32, false), + Field::new("text_content", DataType::Utf8, false), + ])); + + let arrays: Vec = vec![ + Arc::new(StringArray::from(sources)), + Arc::new(Int32Array::from(page_numbers)), + Arc::new(StringArray::from(page_texts)), + ]; + + let batch = RecordBatch::try_new(schema.clone(), arrays) + .map_err(|e| format!("RecordBatch error: {e}"))?; + + Ok((schema, vec![batch])) +} diff --git a/crates/ingestd/src/pipeline.rs b/crates/ingestd/src/pipeline.rs new file mode 100644 index 0000000..2267064 --- /dev/null +++ b/crates/ingestd/src/pipeline.rs @@ -0,0 +1,162 @@ +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use bytes::Bytes; +use object_store::ObjectStore; +use std::sync::Arc; + +use catalogd::registry::Registry; +use shared::arrow_helpers::{fingerprint_schema, record_batch_to_parquet}; +use shared::types::{ObjectRef, SchemaFingerprint}; +use storaged::ops; + +use crate::detect::{FileType, content_hash, detect_file_type}; +use crate::csv_ingest; +use crate::json_ingest; +use crate::pdf_ingest; + +/// Result of an ingest operation. +#[derive(Debug, Clone, serde::Serialize)] +pub struct IngestResult { + pub dataset_name: String, + pub file_type: String, + pub rows: usize, + pub columns: usize, + pub storage_key: String, + pub content_hash: String, + pub schema_fingerprint: String, + pub deduplicated: bool, +} + +/// Full ingest pipeline: detect → parse → dedup → store → register. +pub async fn ingest_file( + filename: &str, + content: &[u8], + dataset_name: Option<&str>, + store: &Arc, + registry: &Registry, +) -> Result { + // 1. Detect file type + let file_type = detect_file_type(filename, content); + tracing::info!("ingesting '{}' as {:?} ({} bytes)", filename, file_type, content.len()); + + // 2. Parse into Arrow + let (schema, batches) = match file_type { + FileType::Csv => csv_ingest::parse_csv(content)?, + FileType::Json | FileType::NdJson => json_ingest::parse_json(content)?, + FileType::Pdf => pdf_ingest::parse_pdf(content, filename)?, + FileType::Text => parse_text_file(content, filename)?, + FileType::Unknown => return Err(format!("unknown file type for '{filename}'")), + }; + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + let total_cols = schema.fields().len(); + + if total_rows == 0 { + return Err("file contains no data".into()); + } + + // 3. Content hash for dedup + let hash = content_hash(content); + + // 4. Check if already ingested (same content hash) + let existing = registry.list().await; + if existing.iter().any(|d| d.schema_fingerprint.0 == hash) { + tracing::info!("file already ingested (hash: {}), skipping", &hash[..12]); + return Ok(IngestResult { + dataset_name: dataset_name.unwrap_or(filename).to_string(), + file_type: format!("{:?}", file_type), + rows: total_rows, + columns: total_cols, + storage_key: String::new(), + content_hash: hash, + schema_fingerprint: String::new(), + deduplicated: true, + }); + } + + // 5. Convert to Parquet + let mut all_parquet = Vec::new(); + for batch in &batches { + let pq = record_batch_to_parquet(batch)?; + all_parquet.extend_from_slice(&pq); + } + let parquet_bytes = Bytes::from(all_parquet); + let parquet_size = parquet_bytes.len() as u64; + + // 6. Store in object storage + let name = dataset_name.unwrap_or_else(|| { + filename.rsplit('/').next().unwrap_or(filename) + .rsplit('.').last().unwrap_or(filename) + }); + let safe_name = sanitize_dataset_name(name); + let storage_key = format!("datasets/{}.parquet", safe_name); + + ops::put(store, &storage_key, parquet_bytes).await?; + tracing::info!("stored {} as {} ({} bytes)", filename, storage_key, parquet_size); + + // 7. Register in catalog + let schema_fp = fingerprint_schema(&schema); + let now = chrono::Utc::now(); + let obj_ref = ObjectRef { + bucket: "data".to_string(), + key: storage_key.clone(), + size_bytes: parquet_size, + created_at: now, + }; + + // Use content hash as fingerprint for dedup tracking + registry.register( + safe_name.clone(), + SchemaFingerprint(hash.clone()), + vec![obj_ref], + ).await?; + + Ok(IngestResult { + dataset_name: safe_name, + file_type: format!("{:?}", file_type), + rows: total_rows, + columns: total_cols, + storage_key, + content_hash: hash, + schema_fingerprint: schema_fp.0, + deduplicated: false, + }) +} + +/// Parse plain text / SMS logs into a simple table. +fn parse_text_file(content: &[u8], filename: &str) -> Result<(SchemaRef, Vec), String> { + let text = String::from_utf8_lossy(content); + let lines: Vec<&str> = text.lines().filter(|l| !l.trim().is_empty()).collect(); + + if lines.is_empty() { + return Err("text file is empty".into()); + } + + let schema = Arc::new(arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new("source_file", arrow::datatypes::DataType::Utf8, false), + arrow::datatypes::Field::new("line_number", arrow::datatypes::DataType::Int32, false), + arrow::datatypes::Field::new("text", arrow::datatypes::DataType::Utf8, false), + ])); + + let sources: Vec<&str> = vec![filename; lines.len()]; + let line_nums: Vec = (1..=lines.len() as i32).collect(); + + let arrays: Vec = vec![ + Arc::new(arrow::array::StringArray::from(sources)), + Arc::new(arrow::array::Int32Array::from(line_nums)), + Arc::new(arrow::array::StringArray::from(lines)), + ]; + + let batch = RecordBatch::try_new(schema.clone(), arrays) + .map_err(|e| format!("RecordBatch error: {e}"))?; + + Ok((schema, vec![batch])) +} + +fn sanitize_dataset_name(name: &str) -> String { + let clean: String = name.chars() + .map(|c| if c.is_alphanumeric() || c == '_' { c.to_ascii_lowercase() } else { '_' }) + .collect(); + let trimmed = clean.trim_matches('_').to_string(); + if trimmed.is_empty() { "unnamed_dataset".to_string() } else { trimmed } +} diff --git a/crates/ingestd/src/service.rs b/crates/ingestd/src/service.rs new file mode 100644 index 0000000..1ac3666 --- /dev/null +++ b/crates/ingestd/src/service.rs @@ -0,0 +1,69 @@ +use axum::{ + Json, Router, + extract::{Multipart, Query, State}, + http::StatusCode, + response::IntoResponse, + routing::{get, post}, +}; +use object_store::ObjectStore; +use serde::Deserialize; +use std::sync::Arc; + +use catalogd::registry::Registry; +use crate::pipeline; + +#[derive(Clone)] +pub struct IngestState { + pub store: Arc, + pub registry: Registry, +} + +pub fn router(state: IngestState) -> Router { + Router::new() + .route("/health", get(health)) + .route("/file", post(ingest_file)) + .with_state(state) +} + +async fn health() -> &'static str { + "ingestd ok" +} + +#[derive(Deserialize)] +struct IngestQuery { + /// Override dataset name (otherwise derived from filename) + name: Option, +} + +/// Upload a file for ingestion. Accepts multipart/form-data with a "file" field. +async fn ingest_file( + State(state): State, + Query(query): Query, + mut multipart: Multipart, +) -> impl IntoResponse { + // Read the first file field + let field = match multipart.next_field().await { + Ok(Some(f)) => f, + Ok(None) => return Err((StatusCode::BAD_REQUEST, "no file uploaded".to_string())), + Err(e) => return Err((StatusCode::BAD_REQUEST, format!("multipart error: {e}"))), + }; + + let filename = field.file_name().unwrap_or("unknown").to_string(); + let content = field.bytes().await + .map_err(|e| (StatusCode::BAD_REQUEST, format!("read error: {e}")))?; + + tracing::info!("received file '{}' ({} bytes) for ingest", filename, content.len()); + + let dataset_name = query.name.as_deref(); + + match pipeline::ingest_file(&filename, &content, dataset_name, &state.store, &state.registry).await { + Ok(result) => { + if result.deduplicated { + Ok((StatusCode::OK, Json(result))) + } else { + Ok((StatusCode::CREATED, Json(result))) + } + } + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} diff --git a/data/_catalog/manifests/1ca61945-d151-490b-81fd-2ca0397b68fa.json b/data/_catalog/manifests/1ca61945-d151-490b-81fd-2ca0397b68fa.json new file mode 100644 index 0000000..0d33132 --- /dev/null +++ b/data/_catalog/manifests/1ca61945-d151-490b-81fd-2ca0397b68fa.json @@ -0,0 +1,15 @@ +{ + "id": "1ca61945-d151-490b-81fd-2ca0397b68fa", + "name": "sms_messages", + "schema_fingerprint": "e1d079cbb2b7eedae5019767a886bd9a3396e291aa03630b9db69e9864948c09", + "objects": [ + { + "bucket": "data", + "key": "datasets/sms_messages.parquet", + "size_bytes": 2018, + "created_at": "2026-03-27T13:07:14.253881797Z" + } + ], + "created_at": "2026-03-27T13:07:14.253886027Z", + "updated_at": "2026-03-27T13:07:14.253886027Z" +} \ No newline at end of file diff --git a/data/_catalog/manifests/1f81445a-404f-48ea-bd72-00f6721ee18b.json b/data/_catalog/manifests/1f81445a-404f-48ea-bd72-00f6721ee18b.json deleted file mode 100644 index be47193..0000000 --- a/data/_catalog/manifests/1f81445a-404f-48ea-bd72-00f6721ee18b.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "id": "1f81445a-404f-48ea-bd72-00f6721ee18b", - "name": "employees", - "schema_fingerprint": "auto", - "objects": [ - { - "bucket": "data", - "key": "datasets/employees.parquet", - "size_bytes": 1424, - "created_at": "2026-03-27T11:55:55.013996293Z" - } - ], - "created_at": "2026-03-27T11:55:55.014010258Z", - "updated_at": "2026-03-27T11:55:55.014010258Z" -} \ No newline at end of file diff --git a/data/_catalog/manifests/478072c3-0c95-46a2-9193-f4b3ac4085ab.json b/data/_catalog/manifests/478072c3-0c95-46a2-9193-f4b3ac4085ab.json new file mode 100644 index 0000000..c30c875 --- /dev/null +++ b/data/_catalog/manifests/478072c3-0c95-46a2-9193-f4b3ac4085ab.json @@ -0,0 +1,15 @@ +{ + "id": "478072c3-0c95-46a2-9193-f4b3ac4085ab", + "name": "test_ingest", + "schema_fingerprint": "4bdc4e5baeddc1187aecd4bfb788654f26145c2ba346b4bec6ca8ab950e1c133", + "objects": [ + { + "bucket": "data", + "key": "datasets/test_ingest.parquet", + "size_bytes": 3129, + "created_at": "2026-03-27T13:06:57.437484309Z" + } + ], + "created_at": "2026-03-27T13:06:57.437488259Z", + "updated_at": "2026-03-27T13:06:57.437488259Z" +} \ No newline at end of file diff --git a/data/_catalog/manifests/50d402a0-6fb4-4849-ac75-3541b8530aba.json b/data/_catalog/manifests/50d402a0-6fb4-4849-ac75-3541b8530aba.json deleted file mode 100644 index 9e89f03..0000000 --- a/data/_catalog/manifests/50d402a0-6fb4-4849-ac75-3541b8530aba.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "id": "50d402a0-6fb4-4849-ac75-3541b8530aba", - "name": "u5", - "schema_fingerprint": "from-ui", - "objects": [ - { - "bucket": "data", - "key": "datasets/events.parquet", - "size_bytes": 0, - "created_at": "2026-03-27T12:16:57.520054490Z" - } - ], - "created_at": "2026-03-27T12:16:57.520066991Z", - "updated_at": "2026-03-27T12:16:57.520066991Z" -} \ No newline at end of file diff --git a/data/_catalog/manifests/6c8dabf2-a6b3-4e23-b7d7-dc3b37a5e1a8.json b/data/_catalog/manifests/6c8dabf2-a6b3-4e23-b7d7-dc3b37a5e1a8.json deleted file mode 100644 index f140ce9..0000000 --- a/data/_catalog/manifests/6c8dabf2-a6b3-4e23-b7d7-dc3b37a5e1a8.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "id": "6c8dabf2-a6b3-4e23-b7d7-dc3b37a5e1a8", - "name": "events", - "schema_fingerprint": "auto", - "objects": [ - { - "bucket": "data", - "key": "datasets/events.parquet", - "size_bytes": 1127, - "created_at": "2026-03-27T11:55:55.017003417Z" - } - ], - "created_at": "2026-03-27T11:55:55.017011224Z", - "updated_at": "2026-03-27T11:55:55.017011224Z" -} \ No newline at end of file diff --git a/data/_catalog/manifests/f5281277-afe9-456b-b5fa-d667c2b0f41f.json b/data/_catalog/manifests/f5281277-afe9-456b-b5fa-d667c2b0f41f.json deleted file mode 100644 index cd2a93e..0000000 --- a/data/_catalog/manifests/f5281277-afe9-456b-b5fa-d667c2b0f41f.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "id": "f5281277-afe9-456b-b5fa-d667c2b0f41f", - "name": "products", - "schema_fingerprint": "auto", - "objects": [ - { - "bucket": "data", - "key": "datasets/products.parquet", - "size_bytes": 1341, - "created_at": "2026-03-27T11:55:55.019096056Z" - } - ], - "created_at": "2026-03-27T11:55:55.019098965Z", - "updated_at": "2026-03-27T11:55:55.019098965Z" -} \ No newline at end of file diff --git a/data/_catalog/manifests/fc046131-8f44-428a-af90-ebf6347d4cc6.json b/data/_catalog/manifests/fc046131-8f44-428a-af90-ebf6347d4cc6.json deleted file mode 100644 index 265200f..0000000 --- a/data/_catalog/manifests/fc046131-8f44-428a-af90-ebf6347d4cc6.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "id": "fc046131-8f44-428a-af90-ebf6347d4cc6", - "name": "u", - "schema_fingerprint": "from-ui", - "objects": [ - { - "bucket": "data", - "key": "datasets/employees.parquet", - "size_bytes": 0, - "created_at": "2026-03-27T12:16:51.462058823Z" - } - ], - "created_at": "2026-03-27T12:16:51.462071311Z", - "updated_at": "2026-03-27T12:16:51.462071311Z" -} \ No newline at end of file diff --git a/data/datasets/employees.parquet b/data/datasets/employees.parquet deleted file mode 100644 index e830c578bf813b464917faff4758d621a19d0492..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1424 zcmb7EzfTiU9DjGc9@h%VVSMME>A)bDniQmHl^96+S||{TwoOQpxLn(_EtIxiE0x7C z7{bJ&ad0pWj!usL0VX)|C%CXkB7;ht{C?lH#X=o?>3x5HKi?m(cPW+$Q;F^fbTvs; zN-h%;l?37wi0B)Th-XM45+* z$icB!3GE!7xFL6cM$gZpyNf@@UAC9hS;srkJe18ZURVfTRPRmw^ZqUr z>n=pwS2WdBw66oi2lX;N6=iaM+D&EBNjKvsZX?XA~G1g+SB1fR7G3E@0 diff --git a/data/datasets/events.parquet b/data/datasets/events.parquet deleted file mode 100644 index c9d827dc46bfaf0548eb08f6c2e367a370f330a6..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1127 zcmb7EOK;Oa5MDcp<5X#sBClmjKIBrXh@`~MqoGPA)}iGUN*V!exS-UHl88Kv;}ZH8 zxWR8goHJIQIu|<`*!td5JU^M)tS!+HYpoj~c66G)z5OtI`A|HwY<(1rig8 z7>kBUXk8{@xFhRwSt1jOPFG5(K~rg}&?J?pWN7j%qRGNEMUi ze)>=#1ez$(-6~LF6Gkcm(cZA#9SE_~oAZ``f%RMOyrCaJWsMq(%0;pj;VGIlN#F(I z=}7@G60XZg|6QPi*^sCCHK4Pzb}^I9WwMr?01F|9kVhy0l#2hP?JUm>mDlDZJK93G zmS&XmUo4zauNH_VjoePF-=q>aHbpIZK5F@|x@UyO!IGM8&>apoN!5T3xhocQ`>j#X z?0ZwMs_2GZHi-d*1tKN&IR_P0Mg1(Q??p2(<^OvodKwOiP>}#6VJY8~bdxC11y+hr zD^2_3g)Rm4n_&JH&5xq7>LFZXR6F`A&c%S7_KNp^6*hl{kalsZ%O`xD|A#OypnjE9 z#gx@w5uzJfoYm`3k9Q|0uU(3_QEoAmBaVzcW^BxyHalRf%_?o~bsWZ?pjKh*A+BK0 zRfGrf!oHb2Lgnm!FL-%!m~9+eW3=rWY`XIdpaj8w@0*;*Yx3Q3o@?fWtzS}9H-v4{1LpfDLc2F-L~{Hyt$K${DE(< H2EUlU0pG|Z diff --git a/data/datasets/products.parquet b/data/datasets/products.parquet deleted file mode 100644 index 0006892ac6c546e3c66f3c2aff6b183a561991fd..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1341 zcmb7ETTc@~6h6BxWm&2L!fbZ6#3rVj8UeWk6HSSo1=}i!ZNsG{BxGsV?SgIBQYg{H z5EG5Rz(3$SkH$Cufv+aMkr-p*D^JGTnb{Vwg$F0?T+TW3ecybikfzyj;wNbmpC>Yb z0f4CAf*t`L^>q8ez97X);0>p_5+Dk>G>p}KBtQZoRcd8a$!gXp5IrN>-;e3usED&H z_-q{&foXL6>=LW?t+o0dPlb$?CAue|)8dH*{VPaQrTJovEDBSrs|;?D*Tl7~O8 z#@oqMiI-u8Dz$3ZqdTK#2mo@C3@`)1Gf)sz0q{0od40F)nRJ=>wY4cw1KWj?RW^x) zjzvUOc5Bt*PNBZ;6oCXW;IEivD+2Q&w1~(U=yGdL!2$xfmz>J_cD-8I0!7wB+Pn@S zgq?ywpLQCFh>EQ2i^`Fp7j^0X+Nup>CxM7ah`$OEV%Gwi4l=4?WWgpGSIKE!iOM%Y zKN9tyLg?JSb$}|%+G$w13G2PJCI5w}fAs5LeE5LQW}Mri4z7P509qsf+KG{zYf;JR zU%O&zd-t>un!vm`F+uqpP-IN8#*gR%n!2`CNq4XYG@!VSR#IDX%S=?mPEQh+wwpJSP8@S2jc`(I-$5b4T(&Xcv z+~mU8#>7I6r{^~D()BGS4bKMM z{XW;XQBKrXy_e%TTCLg$?^?BOr&-Yn2d;n?u G$N3ld8W4a0 diff --git a/data/datasets/sms_messages.parquet b/data/datasets/sms_messages.parquet new file mode 100644 index 0000000000000000000000000000000000000000..eedb46c2ad23949dbdf6267b362c1c25e4b86e4f GIT binary patch literal 2018 zcmbtVO-vI(6rSx0scV&=Fq3Z5!^G&Kp}@9viy8tox-qTm6gM^mUFz} zGWBwda>rv+mqBsGvc_{fOyjiZmb@Izz|5VD7p7UaR`OtsfK_t12p&DgS=nRRFwHn6 zT4S`#JX+bJAV_63&uUXQ!F-N02RtN)3k9$#Wo5`L^MGu5j3%7S<_32wfM;?nvq?F7 zR>^ag9m5C{Ou~?8Kkmi7p*XsRyD|@6KArc#05I{v&=ggDF^iy%xAE--SVR>Apcsk7 zB2mk-BGK55x;!xv1KEmC+yp9vc(9F1U?kGi2SIA?Zi%I+8ie?zKtDXOA&3<q`2*!^~3pH9$q18GQlYC9=i99)xjhiTVB=*Q|)ukL?E36WhDe^dxUI(exgJw*0< z@U|4Ug>>M@Z|mC6iqvXHTHyNRZouK(tvxNn%PBh8k8t3sFL*BPW_PZlg%Dh{Iao%NOD8I>oJ4)aSVkk!X5IUocPz2XqVd_Um<@7!i%U>VaoHI*LXb8b|cQGJ{%&? zFXBgSVVd=kvEHlXy=olzjXn7h>bw?ze>KNre^wg7cSVvvF5+ria5#q63^n78c9sl( ziwwV9f(*&e3q&*gu)wEA4+R7E($a&K$@=99e)EmXma?JA3D$eIU9+b*?0EsfYTF5+ zhL!@^wv*x~K!>YPz?fbY_+v3m*Tj5BO!-9!uq|ER7c#U&A|>@wU_xILt`cS%V19~y zN!o8pduJt5$i$X%X&H28RirX2y&n3%Gdm+?G0h}}UZyg)wv&#UQUfHHMKUuR_F|(1 z$wt01pQHV$<-_{GzLYSL`PKc+SJ&7fK6rNo>!5AwzKQr2`tjZTwK?FNLn2@TKyvA`tFsU61Nl>L|?QCL)wKsS- zHsO>*RdGN)_ka-M&_jhPgepXPLOmcmU)arTwcS%kl^5D47P5x~AmuX@-Ba=BKuD+ro3Io1)FVytD>Loi*g8b(#CSw0jY z&*n>WPoVb%k7xjpMyDnwPsUTJ)ai-IsWV(NJw4@UQfH^n;Tjf`YPahQJX~|`%(UY= z4v)`b0uU+z^Uk=p=*ro& z&!(_f2mv3-nL$*WYQOi!=vW>oaU1>Mu9od;Q^$Y{b&byfd58{mY=Vr2Je$<-_;=t6 zfgr%W7U1LUAp&S~4PCS2Ewgc5Z=3+R(9k#Zh83@wje51d8Mi9c4ci)vH}%GDyrDyb zmkjqH!GS2|<63<^Uf8uOW<73QhZA67OI)g27DjJW8g>7$0qy3HrFO@li+EBIL(Y=gMFkf~ z1+Vl06t_Cy;1%qE1MJp6I~{&8X3*~#Au@{-ImRA~^m{HPP1^cZh^=h!_;u6dl9?O`}mlBPT>D%qzTx z6+S`oF~H>NqlRYLJ=(r)=#nnKK7gtkUStpeL$|8iYh`m?Co~M=(Gs-OT0^rzM!tT_ z*6bb2i$GOh-GqC_)v&M}lkX@bMk8{H^hf2+6;i^&?5O}B#ZW@sSHC=lhmDH(1rdee zYZm8SM%bre`UU&-H)K>y~B`A+Y<(YuE?VjY_+>4!Xv40++F}4?_XS=YS zJaOEIB6}z*FN4ZY0mMDz>h2|Ix&*%?!ILPR?o&JP zo!SZPp`h$Vl&8mdPk3j&<8tl+XK6Q+cPSL^-4M-m7moKjM}IG}_oDJ06%=>FHm zA?^3btJhOXzhu7+(*j|CA7^1D1Ot54%P}dUE-ih#{FQUsr8|8|d`_tuRaJ}dhtEY- zZK>xs)df_+sB$Y>iXf}%1)M^FH-dytU;%`pIt#`kn6oDO-N4se3*Y>OL~1^JDw#ZY z1*chj;ZwYF$(g|WBDd#IF4bnnudSddyP9k!lU(UQfo?X}j#sl6(4N7i%rB0wt}IQI zfe}`-jEl`~ss+A4TsL2Yx(c4^JihQLUg9#$=5RlN10#Q6WBh@9sOoCXuu8d^UBEX` zzFdI(-9%EJ1ROJJg>Mz|(?Y7eV{%B*!Ob+!pI$RsE0c@OYjZpuYK7zRQh52uWQrV* zhpwZ&g}xc1w4B>6q4we}P<|$#3*63ASDvdhOErwoUV)&`<#QJ8=W!`>H!J0-rHxW8 zXJ11ZwgO-m}0N`Jb55_>z&PUBRcR>6y4NgYh-HL%EGPn+|17E@&xAc8zRZu oTI0r!Zl8u