diff --git a/Cargo.lock b/Cargo.lock index d7c9d45..70cf55c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,17 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "ahash" version = "0.8.12" @@ -508,6 +519,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "brotli" version = "8.0.2" @@ -535,6 +555,12 @@ version = "3.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" +[[package]] +name = "bytecount" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e" + [[package]] name = "byteorder" version = "1.5.0" @@ -588,6 +614,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.2.58" @@ -690,6 +725,16 @@ dependencies = [ "half", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "combine" version = "4.6.7" @@ -894,6 +939,25 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -2311,6 +2375,7 @@ dependencies = [ "aibridge", "axum", "catalogd", + "ingestd", "object_store", "opentelemetry", "opentelemetry-stdout", @@ -2820,6 +2885,38 @@ dependencies = [ "cfb", ] +[[package]] +name = "ingestd" +version = "0.1.0" +dependencies = [ + "arrow", + "axum", + "bytes", + "catalogd", + "chrono", + "csv", + "lopdf", + "object_store", + "parquet", + "serde", + "serde_json", + "sha2", + "shared", + "storaged", + "tokio", + "tracing", +] + +[[package]] +name = "inout" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" +dependencies = [ + "block-padding", + "generic-array", +] + [[package]] name = "integer-encoding" version = "3.0.4" @@ -3075,6 +3172,30 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3bd0dd2cd90571056fdb71f6275fada10131182f84899f4b2a916e565d81d86" +[[package]] +name = "lopdf" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c7c1d3350d071cb86987a6bcb205c7019a0eb70dcad92b454fec722cca8d68b" +dependencies = [ + "aes", + "cbc", + "chrono", + "encoding_rs", + "flate2", + "indexmap", + "itoa", + "log", + "md-5", + "nom", + "nom_locate", + "rangemap", + "rayon", + "thiserror 2.0.18", + "time", + "weezl", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -3217,6 +3338,12 @@ dependencies = [ "unicase", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -3291,6 +3418,27 @@ dependencies = [ "jni-sys 0.3.1", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] +name = "nom_locate" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e3c83c053b0713da60c5b8de47fe8e494fe3ece5267b2f23090a07a053ba8f3" +dependencies = [ + "bytecount", + "memchr", + "nom", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -3971,12 +4119,38 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rangemap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "973443cf09a9c8656b574a866ab68dfa19f0867d0340648c7d2f6a71b8a8ea68" + [[package]] name = "raw-window-handle" version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539" +[[package]] +name = "rayon" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "recursive" version = "0.1.1" @@ -5439,6 +5613,12 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "weezl" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28ac98ddc8b9274cb41bb4d9d4d5c425b6020c50c46f25559911905610b4a88" + [[package]] name = "winapi-util" version = "0.1.11" diff --git a/Cargo.toml b/Cargo.toml index b7ade62..695759b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/catalogd", "crates/queryd", "crates/aibridge", + "crates/ingestd", "crates/gateway", "crates/ui", ] @@ -38,3 +39,6 @@ opentelemetry_sdk = { version = "0.28", features = ["rt-tokio"] } opentelemetry-stdout = { version = "0.28", features = ["trace"] } tracing-opentelemetry = "0.29" toml = "0.8" +csv = "1" +lopdf = "0.35" +encoding_rs = "0.8" diff --git a/crates/gateway/Cargo.toml b/crates/gateway/Cargo.toml index db6c48f..0c3dfd5 100644 --- a/crates/gateway/Cargo.toml +++ b/crates/gateway/Cargo.toml @@ -9,6 +9,7 @@ storaged = { path = "../storaged" } catalogd = { path = "../catalogd" } queryd = { path = "../queryd" } aibridge = { path = "../aibridge" } +ingestd = { path = "../ingestd" } tokio = { workspace = true } axum = { workspace = true } serde = { workspace = true } diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs index c4eb7c6..490f049 100644 --- a/crates/gateway/src/main.rs +++ b/crates/gateway/src/main.rs @@ -1,7 +1,7 @@ mod auth; mod observability; -use axum::{Router, routing::get}; +use axum::{Router, extract::DefaultBodyLimit, routing::get}; use proto::lakehouse::catalog_service_server::CatalogServiceServer; use shared::config::Config; use tower_http::cors::{Any, CorsLayer}; @@ -39,10 +39,14 @@ async fn main() { // HTTP router let mut app = Router::new() .route("/health", get(health)) - .nest("/storage", storaged::service::router(store)) + .nest("/storage", storaged::service::router(store.clone())) .nest("/catalog", catalogd::service::router(registry.clone())) .nest("/query", queryd::service::router(engine)) - .nest("/ai", aibridge::service::router(ai_client)); + .nest("/ai", aibridge::service::router(ai_client)) + .nest("/ingest", ingestd::service::router(ingestd::service::IngestState { + store: store.clone(), + registry: registry.clone(), + })); // Auth middleware (if enabled) if config.auth.enabled { @@ -58,6 +62,7 @@ async fn main() { } app = app + .layer(DefaultBodyLimit::max(256 * 1024 * 1024)) // 256MB .layer(CorsLayer::new() .allow_origin(Any) .allow_methods(Any) diff --git a/crates/ingestd/Cargo.toml b/crates/ingestd/Cargo.toml new file mode 100644 index 0000000..c1963e8 --- /dev/null +++ b/crates/ingestd/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "ingestd" +version = "0.1.0" +edition = "2024" + +[dependencies] +shared = { path = "../shared" } +storaged = { path = "../storaged" } +catalogd = { path = "../catalogd" } +tokio = { workspace = true } +axum = { workspace = true, features = ["multipart"] } +lopdf = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tracing = { workspace = true } +arrow = { workspace = true } +parquet = { workspace = true } +bytes = { workspace = true } +sha2 = { workspace = true } +csv = { workspace = true } +chrono = { workspace = true } +object_store = { workspace = true } diff --git a/crates/ingestd/src/csv_ingest.rs b/crates/ingestd/src/csv_ingest.rs new file mode 100644 index 0000000..68cf932 --- /dev/null +++ b/crates/ingestd/src/csv_ingest.rs @@ -0,0 +1,203 @@ +use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray, BooleanArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use std::sync::Arc; + +/// Inferred column type from sampling data. +#[derive(Debug, Clone, PartialEq)] +enum InferredType { + Integer, + Float, + Boolean, + String, +} + +/// Parse CSV bytes into Arrow RecordBatches with automatic schema detection. +/// Per ADR-010: ambiguous types default to String. +pub fn parse_csv(content: &[u8]) -> Result<(Arc, Vec), String> { + let mut reader = csv::ReaderBuilder::new() + .flexible(true) // allow varying column counts + .trim(csv::Trim::All) + .from_reader(content); + + let headers: Vec = reader.headers() + .map_err(|e| format!("CSV header error: {e}"))? + .iter() + .enumerate() + .map(|(i, h)| { + let h = h.trim().to_string(); + if h.is_empty() { format!("column_{i}") } else { sanitize_column_name(&h) } + }) + .collect(); + + let n_cols = headers.len(); + if n_cols == 0 { + return Err("CSV has no columns".into()); + } + + // Read all rows into string columns + let mut columns: Vec> = vec![vec![]; n_cols]; + let mut row_count = 0; + + for result in reader.records() { + let record = result.map_err(|e| format!("CSV row error: {e}"))?; + for (i, field) in record.iter().enumerate() { + if i < n_cols { + columns[i].push(field.trim().to_string()); + } + } + // Pad short rows with empty strings + for col in columns.iter_mut().skip(record.len().min(n_cols)) { + col.push(String::new()); + } + row_count += 1; + } + + if row_count == 0 { + return Err("CSV has no data rows".into()); + } + + tracing::info!("parsed CSV: {row_count} rows × {n_cols} columns"); + + // Infer types by sampling (look at all values) + let types: Vec = columns.iter().map(|col| infer_column_type(col)).collect(); + + // Build Arrow schema + let fields: Vec = headers.iter().zip(types.iter()).map(|(name, typ)| { + let dt = match typ { + InferredType::Integer => DataType::Int64, + InferredType::Float => DataType::Float64, + InferredType::Boolean => DataType::Boolean, + InferredType::String => DataType::Utf8, + }; + Field::new(name, dt, true) // all nullable + }).collect(); + + let schema = Arc::new(Schema::new(fields)); + + // Build arrays + let arrays: Vec = columns.iter().zip(types.iter()).map(|(col, typ)| { + match typ { + InferredType::Integer => { + let vals: Vec> = col.iter().map(|v| { + if v.is_empty() { None } else { v.replace(',', "").parse().ok() } + }).collect(); + Arc::new(Int64Array::from(vals)) as ArrayRef + } + InferredType::Float => { + let vals: Vec> = col.iter().map(|v| { + if v.is_empty() { None } + else { v.replace(',', "").replace('$', "").replace('%', "").parse().ok() } + }).collect(); + Arc::new(Float64Array::from(vals)) as ArrayRef + } + InferredType::Boolean => { + let vals: Vec> = col.iter().map(|v| { + match v.to_lowercase().as_str() { + "true" | "yes" | "1" | "y" | "t" => Some(true), + "false" | "no" | "0" | "n" | "f" => Some(false), + _ => None, + } + }).collect(); + Arc::new(BooleanArray::from(vals)) as ArrayRef + } + InferredType::String => { + Arc::new(StringArray::from(col.clone())) as ArrayRef + } + } + }).collect(); + + let batch = RecordBatch::try_new(schema.clone(), arrays) + .map_err(|e| format!("RecordBatch error: {e}"))?; + + Ok((schema, vec![batch])) +} + +/// Infer column type from values. Conservative: defaults to String on ambiguity. +fn infer_column_type(values: &[String]) -> InferredType { + let non_empty: Vec<&str> = values.iter() + .map(|v| v.as_str()) + .filter(|v| !v.is_empty() && *v != "NULL" && *v != "null" && *v != "N/A" && *v != "n/a") + .collect(); + + if non_empty.is_empty() { + return InferredType::String; + } + + // Check boolean + let all_bool = non_empty.iter().all(|v| { + matches!(v.to_lowercase().as_str(), "true" | "false" | "yes" | "no" | "1" | "0" | "y" | "n" | "t" | "f") + }); + if all_bool && non_empty.len() >= 2 { + // Make sure it's not just all "1" and "0" which could be integers + let has_text_bool = non_empty.iter().any(|v| { + matches!(v.to_lowercase().as_str(), "true" | "false" | "yes" | "no" | "y" | "n" | "t" | "f") + }); + if has_text_bool { + return InferredType::Boolean; + } + } + + // Check integer (allow commas as thousands separator) + let int_rate = non_empty.iter() + .filter(|v| v.replace(',', "").parse::().is_ok()) + .count() as f64 / non_empty.len() as f64; + if int_rate > 0.95 { + return InferredType::Integer; + } + + // Check float (allow $, %, commas) + let float_rate = non_empty.iter() + .filter(|v| v.replace(',', "").replace('$', "").replace('%', "").parse::().is_ok()) + .count() as f64 / non_empty.len() as f64; + if float_rate > 0.95 { + return InferredType::Float; + } + + InferredType::String +} + +/// Sanitize column name for SQL compatibility. +fn sanitize_column_name(name: &str) -> String { + name.chars() + .map(|c| if c.is_alphanumeric() || c == '_' { c.to_ascii_lowercase() } else { '_' }) + .collect::() + .trim_matches('_') + .to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_simple_csv() { + let csv = b"Name,Age,Salary\nAlice,30,50000\nBob,25,45000\n"; + let (schema, batches) = parse_csv(csv).unwrap(); + assert_eq!(schema.fields().len(), 3); + assert_eq!(batches[0].num_rows(), 2); + assert_eq!(schema.field(1).data_type(), &DataType::Int64); + assert_eq!(schema.field(2).data_type(), &DataType::Int64); + } + + #[test] + fn parse_csv_with_mixed_types() { + let csv = b"id,value\n1,hello\n2,world\n3,N/A\n"; + let (schema, _) = parse_csv(csv).unwrap(); + assert_eq!(schema.field(0).data_type(), &DataType::Int64); + assert_eq!(schema.field(1).data_type(), &DataType::Utf8); + } + + #[test] + fn parse_csv_with_dollar_amounts() { + let csv = b"item,price\nWidget,$29.99\nGadget,$149.50\n"; + let (schema, _) = parse_csv(csv).unwrap(); + assert_eq!(schema.field(1).data_type(), &DataType::Float64); + } + + #[test] + fn sanitize_names() { + assert_eq!(sanitize_column_name("First Name"), "first_name"); + assert_eq!(sanitize_column_name("Bill Rate ($)"), "bill_rate"); + } +} diff --git a/crates/ingestd/src/detect.rs b/crates/ingestd/src/detect.rs new file mode 100644 index 0000000..fff0e57 --- /dev/null +++ b/crates/ingestd/src/detect.rs @@ -0,0 +1,103 @@ +use sha2::{Digest, Sha256}; + +/// Detected file type from content inspection. +#[derive(Debug, Clone, PartialEq)] +pub enum FileType { + Csv, + Json, + NdJson, // newline-delimited JSON + Pdf, + Text, // plain text, SMS logs, etc. + Unknown, +} + +/// Detect file type from filename extension and content sniffing. +pub fn detect_file_type(filename: &str, content: &[u8]) -> FileType { + // Extension-based first + let lower = filename.to_lowercase(); + if lower.ends_with(".csv") || lower.ends_with(".tsv") { + return FileType::Csv; + } + if lower.ends_with(".json") { + // Check if it's newline-delimited JSON + if content.iter().take(4096).filter(|&&b| b == b'\n').count() > 2 { + let first_line = content.split(|&b| b == b'\n').next().unwrap_or(b""); + if first_line.starts_with(b"{") { + return FileType::NdJson; + } + } + return FileType::Json; + } + if lower.ends_with(".ndjson") || lower.ends_with(".jsonl") { + return FileType::NdJson; + } + if lower.ends_with(".pdf") { + return FileType::Pdf; + } + if lower.ends_with(".txt") || lower.ends_with(".log") || lower.ends_with(".sms") { + return FileType::Text; + } + + // Content sniffing fallback + if content.starts_with(b"%PDF") { + return FileType::Pdf; + } + if content.starts_with(b"[") || content.starts_with(b"{") { + return FileType::Json; + } + + // Check if it looks like CSV (has commas and newlines in first chunk) + let sample = &content[..content.len().min(4096)]; + let comma_count = sample.iter().filter(|&&b| b == b',').count(); + let newline_count = sample.iter().filter(|&&b| b == b'\n').count(); + if comma_count > 3 && newline_count > 1 { + return FileType::Csv; + } + + // If it's valid UTF-8, treat as text + if std::str::from_utf8(sample).is_ok() { + return FileType::Text; + } + + FileType::Unknown +} + +/// Compute SHA-256 hash of content for deduplication. +pub fn content_hash(content: &[u8]) -> String { + let mut hasher = Sha256::new(); + hasher.update(content); + format!("{:x}", hasher.finalize()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn detect_csv_by_extension() { + assert_eq!(detect_file_type("data.csv", b"a,b,c\n1,2,3"), FileType::Csv); + } + + #[test] + fn detect_json_by_extension() { + assert_eq!(detect_file_type("data.json", b"[{\"a\":1}]"), FileType::Json); + } + + #[test] + fn detect_pdf_by_magic() { + assert_eq!(detect_file_type("unknown", b"%PDF-1.4 blah"), FileType::Pdf); + } + + #[test] + fn detect_csv_by_content() { + let csv = b"name,age,city\nAlice,30,NYC\nBob,25,LA\n"; + assert_eq!(detect_file_type("unknown.dat", csv), FileType::Csv); + } + + #[test] + fn content_hash_deterministic() { + let h1 = content_hash(b"hello world"); + let h2 = content_hash(b"hello world"); + assert_eq!(h1, h2); + } +} diff --git a/crates/ingestd/src/json_ingest.rs b/crates/ingestd/src/json_ingest.rs new file mode 100644 index 0000000..aa91140 --- /dev/null +++ b/crates/ingestd/src/json_ingest.rs @@ -0,0 +1,167 @@ +use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray, BooleanArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use std::collections::BTreeMap; +use std::sync::Arc; + +/// Parse JSON (array of objects or newline-delimited) into Arrow RecordBatches. +/// Nested objects are flattened: {"a": {"b": 1}} → column "a_b". +pub fn parse_json(content: &[u8]) -> Result<(Arc, Vec), String> { + let text = std::str::from_utf8(content).map_err(|e| format!("invalid UTF-8: {e}"))?; + let text = text.trim(); + + // Parse into Vec + let rows: Vec> = if text.starts_with('[') { + // JSON array + let arr: Vec = serde_json::from_str(text) + .map_err(|e| format!("JSON parse error: {e}"))?; + arr.into_iter() + .filter_map(|v| v.as_object().cloned()) + .collect() + } else { + // Newline-delimited JSON + text.lines() + .filter(|l| !l.trim().is_empty()) + .map(|l| { + let v: serde_json::Value = serde_json::from_str(l) + .map_err(|e| format!("NDJSON line parse error: {e}"))?; + v.as_object().cloned().ok_or_else(|| "NDJSON line is not an object".into()) + }) + .collect::, String>>()? + }; + + if rows.is_empty() { + return Err("JSON has no records".into()); + } + + // Flatten and collect all columns + let mut columns: BTreeMap> = BTreeMap::new(); + let n_rows = rows.len(); + + for (row_idx, row) in rows.iter().enumerate() { + let flat = flatten_object(row, ""); + // Pad existing columns that aren't in this row + for key in columns.keys().cloned().collect::>() { + if !flat.contains_key(&key) { + columns.get_mut(&key).unwrap().push(serde_json::Value::Null); + } + } + // Add new columns or append values + for (key, val) in flat { + let col = columns.entry(key).or_insert_with(Vec::new); + // Pad with nulls if this column is new and we've seen prior rows + while col.len() < row_idx { + col.push(serde_json::Value::Null); + } + col.push(val); + } + } + + // Pad short columns + for col in columns.values_mut() { + while col.len() < n_rows { + col.push(serde_json::Value::Null); + } + } + + tracing::info!("parsed JSON: {n_rows} rows × {} columns", columns.len()); + + // Infer types and build schema + let mut fields = Vec::new(); + let mut arrays: Vec = Vec::new(); + + for (name, values) in &columns { + let (dt, array) = json_column_to_arrow(values); + fields.push(Field::new(name, dt, true)); + arrays.push(array); + } + + let schema = Arc::new(Schema::new(fields)); + let batch = RecordBatch::try_new(schema.clone(), arrays) + .map_err(|e| format!("RecordBatch error: {e}"))?; + + Ok((schema, vec![batch])) +} + +/// Flatten nested JSON object: {"a": {"b": 1}} → {"a_b": 1} +fn flatten_object(obj: &serde_json::Map, prefix: &str) -> BTreeMap { + let mut result = BTreeMap::new(); + for (key, val) in obj { + let full_key = if prefix.is_empty() { key.clone() } else { format!("{prefix}_{key}") }; + match val { + serde_json::Value::Object(inner) => { + result.extend(flatten_object(inner, &full_key)); + } + other => { + result.insert(full_key, other.clone()); + } + } + } + result +} + +/// Convert a column of JSON values to an Arrow array. +fn json_column_to_arrow(values: &[serde_json::Value]) -> (DataType, ArrayRef) { + // Check if all non-null values are the same type + let non_null: Vec<&serde_json::Value> = values.iter() + .filter(|v| !v.is_null()) + .collect(); + + if non_null.is_empty() { + return (DataType::Utf8, Arc::new(StringArray::from(vec![None::<&str>; values.len()]))); + } + + let all_bool = non_null.iter().all(|v| v.is_boolean()); + let all_i64 = non_null.iter().all(|v| v.is_i64() || v.is_u64()); + let all_f64 = non_null.iter().all(|v| v.is_number()); + + if all_bool { + let arr: Vec> = values.iter().map(|v| v.as_bool()).collect(); + (DataType::Boolean, Arc::new(BooleanArray::from(arr))) + } else if all_i64 { + let arr: Vec> = values.iter().map(|v| v.as_i64()).collect(); + (DataType::Int64, Arc::new(Int64Array::from(arr))) + } else if all_f64 { + let arr: Vec> = values.iter().map(|v| v.as_f64()).collect(); + (DataType::Float64, Arc::new(Float64Array::from(arr))) + } else { + // Default to string + let arr: Vec> = values.iter().map(|v| { + match v { + serde_json::Value::Null => None, + serde_json::Value::String(s) => Some(s.clone()), + other => Some(other.to_string()), + } + }).collect(); + (DataType::Utf8, Arc::new(StringArray::from(arr))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_json_array() { + let json = br#"[{"name":"Alice","age":30},{"name":"Bob","age":25}]"#; + let (schema, batches) = parse_json(json).unwrap(); + assert_eq!(batches[0].num_rows(), 2); + assert!(schema.field_with_name("age").unwrap().data_type() == &DataType::Int64); + } + + #[test] + fn parse_ndjson() { + let json = b"{\"x\":1}\n{\"x\":2}\n{\"x\":3}\n"; + let (_, batches) = parse_json(json).unwrap(); + assert_eq!(batches[0].num_rows(), 3); + } + + #[test] + fn flatten_nested() { + let json = br#"[{"user":{"name":"Alice","address":{"city":"NYC"}},"score":9.5}]"#; + let (schema, _) = parse_json(json).unwrap(); + let names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert!(names.contains(&"user_name")); + assert!(names.contains(&"user_address_city")); + } +} diff --git a/crates/ingestd/src/lib.rs b/crates/ingestd/src/lib.rs new file mode 100644 index 0000000..c507d30 --- /dev/null +++ b/crates/ingestd/src/lib.rs @@ -0,0 +1,6 @@ +pub mod detect; +pub mod csv_ingest; +pub mod json_ingest; +pub mod pdf_ingest; +pub mod pipeline; +pub mod service; diff --git a/crates/ingestd/src/pdf_ingest.rs b/crates/ingestd/src/pdf_ingest.rs new file mode 100644 index 0000000..c087e80 --- /dev/null +++ b/crates/ingestd/src/pdf_ingest.rs @@ -0,0 +1,52 @@ +use arrow::array::{ArrayRef, Int32Array, StringArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use std::sync::Arc; + +/// Extract text from a PDF file. +/// Returns one row per page: (page_number, text_content). +/// This handles text-based PDFs. Scanned/image PDFs need OCR (not implemented yet). +pub fn parse_pdf(content: &[u8], source_filename: &str) -> Result<(Arc, Vec), String> { + let doc = lopdf::Document::load_mem(content) + .map_err(|e| format!("PDF load error: {e}"))?; + + let pages = doc.get_pages(); + let mut page_numbers: Vec = Vec::new(); + let mut page_texts: Vec = Vec::new(); + let mut sources: Vec = Vec::new(); + + for (&page_num, _) in pages.iter() { + let text = doc.extract_text(&[page_num]).unwrap_or_default(); + let text = text.trim().to_string(); + + if !text.is_empty() { + page_numbers.push(page_num as i32); + page_texts.push(text); + sources.push(source_filename.to_string()); + } + } + + if page_numbers.is_empty() { + // PDF has no extractable text — likely scanned/image + return Err("PDF contains no extractable text (may be scanned/image — OCR not yet supported)".into()); + } + + tracing::info!("extracted {} pages with text from PDF '{}'", page_numbers.len(), source_filename); + + let schema = Arc::new(Schema::new(vec![ + Field::new("source_file", DataType::Utf8, false), + Field::new("page_number", DataType::Int32, false), + Field::new("text_content", DataType::Utf8, false), + ])); + + let arrays: Vec = vec![ + Arc::new(StringArray::from(sources)), + Arc::new(Int32Array::from(page_numbers)), + Arc::new(StringArray::from(page_texts)), + ]; + + let batch = RecordBatch::try_new(schema.clone(), arrays) + .map_err(|e| format!("RecordBatch error: {e}"))?; + + Ok((schema, vec![batch])) +} diff --git a/crates/ingestd/src/pipeline.rs b/crates/ingestd/src/pipeline.rs new file mode 100644 index 0000000..2267064 --- /dev/null +++ b/crates/ingestd/src/pipeline.rs @@ -0,0 +1,162 @@ +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use bytes::Bytes; +use object_store::ObjectStore; +use std::sync::Arc; + +use catalogd::registry::Registry; +use shared::arrow_helpers::{fingerprint_schema, record_batch_to_parquet}; +use shared::types::{ObjectRef, SchemaFingerprint}; +use storaged::ops; + +use crate::detect::{FileType, content_hash, detect_file_type}; +use crate::csv_ingest; +use crate::json_ingest; +use crate::pdf_ingest; + +/// Result of an ingest operation. +#[derive(Debug, Clone, serde::Serialize)] +pub struct IngestResult { + pub dataset_name: String, + pub file_type: String, + pub rows: usize, + pub columns: usize, + pub storage_key: String, + pub content_hash: String, + pub schema_fingerprint: String, + pub deduplicated: bool, +} + +/// Full ingest pipeline: detect → parse → dedup → store → register. +pub async fn ingest_file( + filename: &str, + content: &[u8], + dataset_name: Option<&str>, + store: &Arc, + registry: &Registry, +) -> Result { + // 1. Detect file type + let file_type = detect_file_type(filename, content); + tracing::info!("ingesting '{}' as {:?} ({} bytes)", filename, file_type, content.len()); + + // 2. Parse into Arrow + let (schema, batches) = match file_type { + FileType::Csv => csv_ingest::parse_csv(content)?, + FileType::Json | FileType::NdJson => json_ingest::parse_json(content)?, + FileType::Pdf => pdf_ingest::parse_pdf(content, filename)?, + FileType::Text => parse_text_file(content, filename)?, + FileType::Unknown => return Err(format!("unknown file type for '{filename}'")), + }; + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + let total_cols = schema.fields().len(); + + if total_rows == 0 { + return Err("file contains no data".into()); + } + + // 3. Content hash for dedup + let hash = content_hash(content); + + // 4. Check if already ingested (same content hash) + let existing = registry.list().await; + if existing.iter().any(|d| d.schema_fingerprint.0 == hash) { + tracing::info!("file already ingested (hash: {}), skipping", &hash[..12]); + return Ok(IngestResult { + dataset_name: dataset_name.unwrap_or(filename).to_string(), + file_type: format!("{:?}", file_type), + rows: total_rows, + columns: total_cols, + storage_key: String::new(), + content_hash: hash, + schema_fingerprint: String::new(), + deduplicated: true, + }); + } + + // 5. Convert to Parquet + let mut all_parquet = Vec::new(); + for batch in &batches { + let pq = record_batch_to_parquet(batch)?; + all_parquet.extend_from_slice(&pq); + } + let parquet_bytes = Bytes::from(all_parquet); + let parquet_size = parquet_bytes.len() as u64; + + // 6. Store in object storage + let name = dataset_name.unwrap_or_else(|| { + filename.rsplit('/').next().unwrap_or(filename) + .rsplit('.').last().unwrap_or(filename) + }); + let safe_name = sanitize_dataset_name(name); + let storage_key = format!("datasets/{}.parquet", safe_name); + + ops::put(store, &storage_key, parquet_bytes).await?; + tracing::info!("stored {} as {} ({} bytes)", filename, storage_key, parquet_size); + + // 7. Register in catalog + let schema_fp = fingerprint_schema(&schema); + let now = chrono::Utc::now(); + let obj_ref = ObjectRef { + bucket: "data".to_string(), + key: storage_key.clone(), + size_bytes: parquet_size, + created_at: now, + }; + + // Use content hash as fingerprint for dedup tracking + registry.register( + safe_name.clone(), + SchemaFingerprint(hash.clone()), + vec![obj_ref], + ).await?; + + Ok(IngestResult { + dataset_name: safe_name, + file_type: format!("{:?}", file_type), + rows: total_rows, + columns: total_cols, + storage_key, + content_hash: hash, + schema_fingerprint: schema_fp.0, + deduplicated: false, + }) +} + +/// Parse plain text / SMS logs into a simple table. +fn parse_text_file(content: &[u8], filename: &str) -> Result<(SchemaRef, Vec), String> { + let text = String::from_utf8_lossy(content); + let lines: Vec<&str> = text.lines().filter(|l| !l.trim().is_empty()).collect(); + + if lines.is_empty() { + return Err("text file is empty".into()); + } + + let schema = Arc::new(arrow::datatypes::Schema::new(vec![ + arrow::datatypes::Field::new("source_file", arrow::datatypes::DataType::Utf8, false), + arrow::datatypes::Field::new("line_number", arrow::datatypes::DataType::Int32, false), + arrow::datatypes::Field::new("text", arrow::datatypes::DataType::Utf8, false), + ])); + + let sources: Vec<&str> = vec![filename; lines.len()]; + let line_nums: Vec = (1..=lines.len() as i32).collect(); + + let arrays: Vec = vec![ + Arc::new(arrow::array::StringArray::from(sources)), + Arc::new(arrow::array::Int32Array::from(line_nums)), + Arc::new(arrow::array::StringArray::from(lines)), + ]; + + let batch = RecordBatch::try_new(schema.clone(), arrays) + .map_err(|e| format!("RecordBatch error: {e}"))?; + + Ok((schema, vec![batch])) +} + +fn sanitize_dataset_name(name: &str) -> String { + let clean: String = name.chars() + .map(|c| if c.is_alphanumeric() || c == '_' { c.to_ascii_lowercase() } else { '_' }) + .collect(); + let trimmed = clean.trim_matches('_').to_string(); + if trimmed.is_empty() { "unnamed_dataset".to_string() } else { trimmed } +} diff --git a/crates/ingestd/src/service.rs b/crates/ingestd/src/service.rs new file mode 100644 index 0000000..1ac3666 --- /dev/null +++ b/crates/ingestd/src/service.rs @@ -0,0 +1,69 @@ +use axum::{ + Json, Router, + extract::{Multipart, Query, State}, + http::StatusCode, + response::IntoResponse, + routing::{get, post}, +}; +use object_store::ObjectStore; +use serde::Deserialize; +use std::sync::Arc; + +use catalogd::registry::Registry; +use crate::pipeline; + +#[derive(Clone)] +pub struct IngestState { + pub store: Arc, + pub registry: Registry, +} + +pub fn router(state: IngestState) -> Router { + Router::new() + .route("/health", get(health)) + .route("/file", post(ingest_file)) + .with_state(state) +} + +async fn health() -> &'static str { + "ingestd ok" +} + +#[derive(Deserialize)] +struct IngestQuery { + /// Override dataset name (otherwise derived from filename) + name: Option, +} + +/// Upload a file for ingestion. Accepts multipart/form-data with a "file" field. +async fn ingest_file( + State(state): State, + Query(query): Query, + mut multipart: Multipart, +) -> impl IntoResponse { + // Read the first file field + let field = match multipart.next_field().await { + Ok(Some(f)) => f, + Ok(None) => return Err((StatusCode::BAD_REQUEST, "no file uploaded".to_string())), + Err(e) => return Err((StatusCode::BAD_REQUEST, format!("multipart error: {e}"))), + }; + + let filename = field.file_name().unwrap_or("unknown").to_string(); + let content = field.bytes().await + .map_err(|e| (StatusCode::BAD_REQUEST, format!("read error: {e}")))?; + + tracing::info!("received file '{}' ({} bytes) for ingest", filename, content.len()); + + let dataset_name = query.name.as_deref(); + + match pipeline::ingest_file(&filename, &content, dataset_name, &state.store, &state.registry).await { + Ok(result) => { + if result.deduplicated { + Ok((StatusCode::OK, Json(result))) + } else { + Ok((StatusCode::CREATED, Json(result))) + } + } + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)), + } +} diff --git a/data/_catalog/manifests/1ca61945-d151-490b-81fd-2ca0397b68fa.json b/data/_catalog/manifests/1ca61945-d151-490b-81fd-2ca0397b68fa.json new file mode 100644 index 0000000..0d33132 --- /dev/null +++ b/data/_catalog/manifests/1ca61945-d151-490b-81fd-2ca0397b68fa.json @@ -0,0 +1,15 @@ +{ + "id": "1ca61945-d151-490b-81fd-2ca0397b68fa", + "name": "sms_messages", + "schema_fingerprint": "e1d079cbb2b7eedae5019767a886bd9a3396e291aa03630b9db69e9864948c09", + "objects": [ + { + "bucket": "data", + "key": "datasets/sms_messages.parquet", + "size_bytes": 2018, + "created_at": "2026-03-27T13:07:14.253881797Z" + } + ], + "created_at": "2026-03-27T13:07:14.253886027Z", + "updated_at": "2026-03-27T13:07:14.253886027Z" +} \ No newline at end of file diff --git a/data/_catalog/manifests/1f81445a-404f-48ea-bd72-00f6721ee18b.json b/data/_catalog/manifests/1f81445a-404f-48ea-bd72-00f6721ee18b.json deleted file mode 100644 index be47193..0000000 --- a/data/_catalog/manifests/1f81445a-404f-48ea-bd72-00f6721ee18b.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "id": "1f81445a-404f-48ea-bd72-00f6721ee18b", - "name": "employees", - "schema_fingerprint": "auto", - "objects": [ - { - "bucket": "data", - "key": "datasets/employees.parquet", - "size_bytes": 1424, - "created_at": "2026-03-27T11:55:55.013996293Z" - } - ], - "created_at": "2026-03-27T11:55:55.014010258Z", - "updated_at": "2026-03-27T11:55:55.014010258Z" -} \ No newline at end of file diff --git a/data/_catalog/manifests/478072c3-0c95-46a2-9193-f4b3ac4085ab.json b/data/_catalog/manifests/478072c3-0c95-46a2-9193-f4b3ac4085ab.json new file mode 100644 index 0000000..c30c875 --- /dev/null +++ b/data/_catalog/manifests/478072c3-0c95-46a2-9193-f4b3ac4085ab.json @@ -0,0 +1,15 @@ +{ + "id": "478072c3-0c95-46a2-9193-f4b3ac4085ab", + "name": "test_ingest", + "schema_fingerprint": "4bdc4e5baeddc1187aecd4bfb788654f26145c2ba346b4bec6ca8ab950e1c133", + "objects": [ + { + "bucket": "data", + "key": "datasets/test_ingest.parquet", + "size_bytes": 3129, + "created_at": "2026-03-27T13:06:57.437484309Z" + } + ], + "created_at": "2026-03-27T13:06:57.437488259Z", + "updated_at": "2026-03-27T13:06:57.437488259Z" +} \ No newline at end of file diff --git a/data/_catalog/manifests/50d402a0-6fb4-4849-ac75-3541b8530aba.json b/data/_catalog/manifests/50d402a0-6fb4-4849-ac75-3541b8530aba.json deleted file mode 100644 index 9e89f03..0000000 --- a/data/_catalog/manifests/50d402a0-6fb4-4849-ac75-3541b8530aba.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "id": "50d402a0-6fb4-4849-ac75-3541b8530aba", - "name": "u5", - "schema_fingerprint": "from-ui", - "objects": [ - { - "bucket": "data", - "key": "datasets/events.parquet", - "size_bytes": 0, - "created_at": "2026-03-27T12:16:57.520054490Z" - } - ], - "created_at": "2026-03-27T12:16:57.520066991Z", - "updated_at": "2026-03-27T12:16:57.520066991Z" -} \ No newline at end of file diff --git a/data/_catalog/manifests/6c8dabf2-a6b3-4e23-b7d7-dc3b37a5e1a8.json b/data/_catalog/manifests/6c8dabf2-a6b3-4e23-b7d7-dc3b37a5e1a8.json deleted file mode 100644 index f140ce9..0000000 --- a/data/_catalog/manifests/6c8dabf2-a6b3-4e23-b7d7-dc3b37a5e1a8.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "id": "6c8dabf2-a6b3-4e23-b7d7-dc3b37a5e1a8", - "name": "events", - "schema_fingerprint": "auto", - "objects": [ - { - "bucket": "data", - "key": "datasets/events.parquet", - "size_bytes": 1127, - "created_at": "2026-03-27T11:55:55.017003417Z" - } - ], - "created_at": "2026-03-27T11:55:55.017011224Z", - "updated_at": "2026-03-27T11:55:55.017011224Z" -} \ No newline at end of file diff --git a/data/_catalog/manifests/f5281277-afe9-456b-b5fa-d667c2b0f41f.json b/data/_catalog/manifests/f5281277-afe9-456b-b5fa-d667c2b0f41f.json deleted file mode 100644 index cd2a93e..0000000 --- a/data/_catalog/manifests/f5281277-afe9-456b-b5fa-d667c2b0f41f.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "id": "f5281277-afe9-456b-b5fa-d667c2b0f41f", - "name": "products", - "schema_fingerprint": "auto", - "objects": [ - { - "bucket": "data", - "key": "datasets/products.parquet", - "size_bytes": 1341, - "created_at": "2026-03-27T11:55:55.019096056Z" - } - ], - "created_at": "2026-03-27T11:55:55.019098965Z", - "updated_at": "2026-03-27T11:55:55.019098965Z" -} \ No newline at end of file diff --git a/data/_catalog/manifests/fc046131-8f44-428a-af90-ebf6347d4cc6.json b/data/_catalog/manifests/fc046131-8f44-428a-af90-ebf6347d4cc6.json deleted file mode 100644 index 265200f..0000000 --- a/data/_catalog/manifests/fc046131-8f44-428a-af90-ebf6347d4cc6.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "id": "fc046131-8f44-428a-af90-ebf6347d4cc6", - "name": "u", - "schema_fingerprint": "from-ui", - "objects": [ - { - "bucket": "data", - "key": "datasets/employees.parquet", - "size_bytes": 0, - "created_at": "2026-03-27T12:16:51.462058823Z" - } - ], - "created_at": "2026-03-27T12:16:51.462071311Z", - "updated_at": "2026-03-27T12:16:51.462071311Z" -} \ No newline at end of file diff --git a/data/datasets/employees.parquet b/data/datasets/employees.parquet deleted file mode 100644 index e830c57..0000000 Binary files a/data/datasets/employees.parquet and /dev/null differ diff --git a/data/datasets/events.parquet b/data/datasets/events.parquet deleted file mode 100644 index c9d827d..0000000 Binary files a/data/datasets/events.parquet and /dev/null differ diff --git a/data/datasets/products.parquet b/data/datasets/products.parquet deleted file mode 100644 index 0006892..0000000 Binary files a/data/datasets/products.parquet and /dev/null differ diff --git a/data/datasets/sms_messages.parquet b/data/datasets/sms_messages.parquet new file mode 100644 index 0000000..eedb46c Binary files /dev/null and b/data/datasets/sms_messages.parquet differ diff --git a/data/datasets/test_ingest.parquet b/data/datasets/test_ingest.parquet new file mode 100644 index 0000000..cc7b161 Binary files /dev/null and b/data/datasets/test_ingest.parquet differ diff --git a/scripts/generate_demo.py b/scripts/generate_demo.py new file mode 100644 index 0000000..6effe8d --- /dev/null +++ b/scripts/generate_demo.py @@ -0,0 +1,254 @@ +#!/usr/bin/env python3 +"""Generate realistic demo datasets that actually stress the stack.""" + +import pyarrow as pa +import pyarrow.parquet as pq +import random +import json +import urllib.request +import time +from datetime import datetime, timedelta + +API = "http://localhost:3100" + +def upload_and_register(name, table): + """Upload Parquet to storage and register in catalog.""" + path = f"/tmp/{name}.parquet" + pq.write_table(table, path, compression="snappy") + size = len(open(path, "rb").read()) + + with open(path, "rb") as f: + data = f.read() + key = f"datasets/{name}.parquet" + req = urllib.request.Request(f"{API}/storage/objects/{key}", data=data, method="PUT") + urllib.request.urlopen(req) + + body = json.dumps({ + "name": name, + "schema_fingerprint": "auto", + "objects": [{"bucket": "data", "key": key, "size_bytes": len(data)}] + }).encode() + req = urllib.request.Request( + f"{API}/catalog/datasets", data=body, method="POST", + headers={"Content-Type": "application/json"} + ) + urllib.request.urlopen(req) + print(f" {name}: {table.num_rows:,} rows, {len(data):,} bytes ({len(data)/1024/1024:.1f} MB)") + +# ============================================================ +# Dataset 1: web_events — 500K rows of web analytics +# Shows: DataFusion scanning half a million rows in milliseconds +# ============================================================ +print("Generating web_events (500K rows)...") + +N_EVENTS = 500_000 +countries = ["US", "UK", "DE", "FR", "JP", "BR", "IN", "AU", "CA", "KR", "MX", "NG", "ZA", "SE", "IT"] +pages = ["/", "/pricing", "/docs", "/blog", "/about", "/login", "/signup", "/dashboard", "/settings", "/api", + "/docs/getting-started", "/docs/api-reference", "/docs/tutorials", "/blog/rust-performance", + "/blog/ai-lakehouse", "/blog/parquet-vs-csv", "/products", "/products/enterprise", "/contact", "/careers"] +actions = ["pageview", "pageview", "pageview", "pageview", "click", "click", "scroll", "form_submit", "download", "signup"] +browsers = ["Chrome", "Firefox", "Safari", "Edge", "Arc"] +devices = ["desktop", "desktop", "desktop", "mobile", "mobile", "tablet"] + +base_time = datetime(2026, 1, 1) +random.seed(42) + +timestamps = [] +user_ids = [] +page_list = [] +action_list = [] +duration_list = [] +country_list = [] +browser_list = [] +device_list = [] +session_ids = [] + +for i in range(N_EVENTS): + ts = base_time + timedelta(seconds=random.randint(0, 86400 * 90)) # 90 days + timestamps.append(ts.isoformat()) + user_ids.append(random.randint(1, 50000)) + page_list.append(random.choice(pages)) + action_list.append(random.choice(actions)) + duration_list.append(random.randint(100, 300000)) # ms + country_list.append(random.choice(countries)) + browser_list.append(random.choice(browsers)) + device_list.append(random.choice(devices)) + session_ids.append(f"sess_{random.randint(1, 200000):06d}") + +web_events = pa.table({ + "timestamp": timestamps, + "user_id": user_ids, + "session_id": session_ids, + "page": page_list, + "action": action_list, + "duration_ms": duration_list, + "country": country_list, + "browser": browser_list, + "device": device_list, +}) +upload_and_register("web_events", web_events) + +# ============================================================ +# Dataset 2: products — 5K products with real-ish descriptions +# Shows: AI can read and understand product data, semantic search +# ============================================================ +print("Generating products (5K rows)...") + +categories = ["SaaS", "API", "Database", "Analytics", "Security", "DevOps", "AI/ML", "Storage", "Networking", "Monitoring"] +adjectives = ["Enterprise", "Cloud-Native", "Open-Source", "Serverless", "Real-Time", "Distributed", "Scalable", "Lightweight", "High-Performance", "Self-Hosted"] +nouns = ["Platform", "Engine", "Gateway", "Toolkit", "Framework", "Suite", "Service", "Connector", "Pipeline", "Hub"] +features = [ + "with built-in authentication and RBAC", + "featuring automatic horizontal scaling", + "with zero-config deployment", + "supporting 100+ integrations", + "with sub-millisecond latency", + "featuring end-to-end encryption", + "with real-time dashboards", + "supporting multi-region replication", + "with built-in CI/CD pipelines", + "featuring AI-powered anomaly detection", + "with comprehensive audit logging", + "supporting GraphQL and REST APIs", + "with automated backup and recovery", + "featuring smart caching layers", + "with native Kubernetes support", +] + +product_ids = [] +product_names = [] +product_categories = [] +product_prices = [] +product_descriptions = [] +product_ratings = [] +product_reviews_count = [] +product_created = [] + +for i in range(5000): + cat = random.choice(categories) + adj = random.choice(adjectives) + noun = random.choice(nouns) + feat = random.choice(features) + name = f"{adj} {cat} {noun}" + desc = f"{name} — a {cat.lower()} solution {feat}. Built for teams that need reliable {cat.lower()} infrastructure without the complexity." + + product_ids.append(i + 1) + product_names.append(name) + product_categories.append(cat) + product_prices.append(round(random.uniform(9.99, 2999.99), 2)) + product_descriptions.append(desc) + product_ratings.append(round(random.uniform(2.5, 5.0), 1)) + product_reviews_count.append(random.randint(0, 5000)) + product_created.append((base_time - timedelta(days=random.randint(0, 730))).strftime("%Y-%m-%d")) + +products = pa.table({ + "product_id": product_ids, + "name": product_names, + "category": product_categories, + "price": product_prices, + "description": product_descriptions, + "rating": product_ratings, + "review_count": product_reviews_count, + "created_date": product_created, +}) +upload_and_register("products", products) + +# ============================================================ +# Dataset 3: transactions — 200K purchase records +# Shows: JOINs across datasets, aggregation at scale +# ============================================================ +print("Generating transactions (200K rows)...") + +N_TXN = 200_000 +txn_ids = [] +txn_user_ids = [] +txn_product_ids = [] +txn_quantities = [] +txn_amounts = [] +txn_timestamps = [] +txn_statuses = [] +txn_payment_methods = [] + +statuses = ["completed", "completed", "completed", "completed", "pending", "refunded", "failed"] +payments = ["credit_card", "credit_card", "credit_card", "debit_card", "paypal", "crypto", "wire_transfer"] + +for i in range(N_TXN): + pid = random.randint(1, 5000) + qty = random.randint(1, 10) + price = product_prices[pid - 1] + + txn_ids.append(f"TXN-{i+1:07d}") + txn_user_ids.append(random.randint(1, 50000)) + txn_product_ids.append(pid) + txn_quantities.append(qty) + txn_amounts.append(round(price * qty, 2)) + txn_timestamps.append((base_time + timedelta(seconds=random.randint(0, 86400 * 90))).isoformat()) + txn_statuses.append(random.choice(statuses)) + txn_payment_methods.append(random.choice(payments)) + +transactions = pa.table({ + "txn_id": txn_ids, + "user_id": txn_user_ids, + "product_id": txn_product_ids, + "quantity": txn_quantities, + "amount": txn_amounts, + "timestamp": txn_timestamps, + "status": txn_statuses, + "payment_method": txn_payment_methods, +}) +upload_and_register("transactions", transactions) + +# ============================================================ +# Dataset 4: server_metrics — 1M rows of infrastructure telemetry +# Shows: time-series analytics, the kind of data you'd put in a lakehouse +# ============================================================ +print("Generating server_metrics (1M rows)...") + +N_METRICS = 1_000_000 +hosts = [f"prod-{i:03d}" for i in range(100)] +metrics_names = ["cpu_usage", "memory_usage", "disk_io", "network_in", "network_out", "request_latency", "error_rate", "gc_pause"] +regions = ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"] + +m_timestamps = [] +m_hosts = [] +m_metrics = [] +m_values = [] +m_regions = [] + +for i in range(N_METRICS): + metric = random.choice(metrics_names) + if metric == "cpu_usage": + val = round(random.gauss(45, 20), 2) + elif metric == "memory_usage": + val = round(random.gauss(60, 15), 2) + elif metric in ("network_in", "network_out"): + val = round(random.expovariate(1/1000), 2) + elif metric == "request_latency": + val = round(random.expovariate(1/50), 2) + elif metric == "error_rate": + val = round(random.expovariate(1/2), 4) + else: + val = round(random.uniform(0, 100), 2) + + m_timestamps.append((base_time + timedelta(seconds=random.randint(0, 86400 * 90))).isoformat()) + m_hosts.append(random.choice(hosts)) + m_metrics.append(metric) + m_values.append(max(0, val)) + m_regions.append(random.choice(regions)) + +server_metrics = pa.table({ + "timestamp": m_timestamps, + "host": m_hosts, + "metric": m_metrics, + "value": m_values, + "region": m_regions, +}) +upload_and_register("server_metrics", server_metrics) + +print(f"\nDone — 4 datasets, {N_EVENTS + 5000 + N_TXN + N_METRICS:,} total rows") +print("\nDemo queries to try:") +print(' "How many page views per country, sorted by volume?"') +print(' "What are the top 10 products by total revenue?"') +print(' "Show average CPU usage per host in us-east-1"') +print(' "Which payment method has the highest failure rate?"') +print(' "What are the busiest hours for web traffic?"')