Phase 6: Ingest pipeline — CSV, JSON, PDF, text file support

- ingestd crate: detect file type → parse → schema detection → Parquet → catalog
- CSV: auto-detects column types (int, float, bool, string); strips $, %, and
  thousands-separator commas from numeric values; flexible row parsing; sanitized column names
- JSON: array or newline-delimited, nested object flattening (a.b.c → a_b_c)
- PDF: text extraction via lopdf, one row per page (source_file, page_number, text)
- Text/SMS: line-based ingestion with line numbers
- Dedup: SHA-256 content hash, re-ingest same file = no-op
- Gateway: POST /ingest/file multipart upload, 256MB body limit (client sketch below)
- Schema detection per ADR-010: ambiguous types default to String
- 12 unit tests passing (CSV parsing, JSON flattening, type inference, dedup)
- Tested: messy CSV with missing data, dollar amounts, N/A values → queryable

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Authored by root on 2026-03-27 08:07:31 -05:00
parent 6740a017c7
commit bb05c4412e
25 changed files with 1261 additions and 78 deletions
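
For reference, a minimal client sketch for the new endpoint (see the gateway bullet above). Hedged: this is not part of the commit. It assumes the gateway listens on localhost:3100, the address scripts/generate_demo.py uses, and it pulls in reqwest (with the multipart feature) and tokio; reqwest is not a workspace dependency, and expenses.csv is a placeholder input.

// Illustrative client sketch; reqwest is an assumption, not a workspace dependency.
use reqwest::multipart::{Form, Part};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let bytes = std::fs::read("expenses.csv")?; // hypothetical local file
    // The handler reads the first multipart field, whatever its name, as the upload.
    let form = Form::new().part("file", Part::bytes(bytes).file_name("expenses.csv"));
    // Optional ?name= overrides the dataset name otherwise derived from the filename.
    let resp = reqwest::Client::new()
        .post("http://localhost:3100/ingest/file?name=expenses")
        .multipart(form)
        .send()
        .await?;
    // 201 Created on first ingest; 200 OK when the content hash dedups.
    println!("{}: {}", resp.status(), resp.text().await?);
    Ok(())
}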

Cargo.lock (generated)

@@ -8,6 +8,17 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "aes"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
dependencies = [
"cfg-if",
"cipher",
"cpufeatures",
]
[[package]]
name = "ahash"
version = "0.8.12"
@@ -508,6 +519,15 @@ dependencies = [
"generic-array",
]
[[package]]
name = "block-padding"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93"
dependencies = [
"generic-array",
]
[[package]]
name = "brotli"
version = "8.0.2"
@@ -535,6 +555,12 @@ version = "3.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb"
[[package]]
name = "bytecount"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e"
[[package]]
name = "byteorder"
version = "1.5.0"
@@ -588,6 +614,15 @@ dependencies = [
"uuid",
]
[[package]]
name = "cbc"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6"
dependencies = [
"cipher",
]
[[package]]
name = "cc"
version = "1.2.58"
@@ -690,6 +725,16 @@ dependencies = [
"half",
]
[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
dependencies = [
"crypto-common",
"inout",
]
[[package]]
name = "combine"
version = "4.6.7"
@@ -894,6 +939,25 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
@@ -2311,6 +2375,7 @@ dependencies = [
"aibridge",
"axum",
"catalogd",
"ingestd",
"object_store",
"opentelemetry",
"opentelemetry-stdout",
@@ -2820,6 +2885,38 @@ dependencies = [
"cfb",
]
[[package]]
name = "ingestd"
version = "0.1.0"
dependencies = [
"arrow",
"axum",
"bytes",
"catalogd",
"chrono",
"csv",
"lopdf",
"object_store",
"parquet",
"serde",
"serde_json",
"sha2",
"shared",
"storaged",
"tokio",
"tracing",
]
[[package]]
name = "inout"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01"
dependencies = [
"block-padding",
"generic-array",
]
[[package]]
name = "integer-encoding"
version = "3.0.4"
@@ -3075,6 +3172,30 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3bd0dd2cd90571056fdb71f6275fada10131182f84899f4b2a916e565d81d86"
[[package]]
name = "lopdf"
version = "0.35.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c7c1d3350d071cb86987a6bcb205c7019a0eb70dcad92b454fec722cca8d68b"
dependencies = [
"aes",
"cbc",
"chrono",
"encoding_rs",
"flate2",
"indexmap",
"itoa",
"log",
"md-5",
"nom",
"nom_locate",
"rangemap",
"rayon",
"thiserror 2.0.18",
"time",
"weezl",
]
[[package]]
name = "lru-slab"
version = "0.1.2"
@@ -3217,6 +3338,12 @@ dependencies = [
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.8.9"
@@ -3291,6 +3418,27 @@ dependencies = [
"jni-sys 0.3.1",
]
[[package]]
name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
name = "nom_locate"
version = "4.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e3c83c053b0713da60c5b8de47fe8e494fe3ece5267b2f23090a07a053ba8f3"
dependencies = [
"bytecount",
"memchr",
"nom",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
@@ -3971,12 +4119,38 @@ dependencies = [
"getrandom 0.3.4",
]
[[package]]
name = "rangemap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "973443cf09a9c8656b574a866ab68dfa19f0867d0340648c7d2f6a71b8a8ea68"
[[package]]
name = "raw-window-handle"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539"
[[package]]
name = "rayon"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]]
name = "recursive"
version = "0.1.1"
@@ -5439,6 +5613,12 @@ dependencies = [
"rustls-pki-types",
]
[[package]]
name = "weezl"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a28ac98ddc8b9274cb41bb4d9d4d5c425b6020c50c46f25559911905610b4a88"
[[package]]
name = "winapi-util"
version = "0.1.11"

Cargo.toml (workspace root)

@@ -7,6 +7,7 @@ members = [
"crates/catalogd",
"crates/queryd",
"crates/aibridge",
"crates/ingestd",
"crates/gateway",
"crates/ui",
]
@@ -38,3 +39,6 @@ opentelemetry_sdk = { version = "0.28", features = ["rt-tokio"] }
opentelemetry-stdout = { version = "0.28", features = ["trace"] }
tracing-opentelemetry = "0.29"
toml = "0.8"
csv = "1"
lopdf = "0.35"
encoding_rs = "0.8"

crates/gateway/Cargo.toml

@@ -9,6 +9,7 @@ storaged = { path = "../storaged" }
catalogd = { path = "../catalogd" }
queryd = { path = "../queryd" }
aibridge = { path = "../aibridge" }
ingestd = { path = "../ingestd" }
tokio = { workspace = true }
axum = { workspace = true }
serde = { workspace = true }

crates/gateway/src/main.rs

@@ -1,7 +1,7 @@
mod auth;
mod observability;
use axum::{Router, routing::get};
use axum::{Router, extract::DefaultBodyLimit, routing::get};
use proto::lakehouse::catalog_service_server::CatalogServiceServer;
use shared::config::Config;
use tower_http::cors::{Any, CorsLayer};
@@ -39,10 +39,14 @@ async fn main() {
// HTTP router
let mut app = Router::new()
.route("/health", get(health))
.nest("/storage", storaged::service::router(store))
.nest("/storage", storaged::service::router(store.clone()))
.nest("/catalog", catalogd::service::router(registry.clone()))
.nest("/query", queryd::service::router(engine))
.nest("/ai", aibridge::service::router(ai_client));
.nest("/ai", aibridge::service::router(ai_client))
.nest("/ingest", ingestd::service::router(ingestd::service::IngestState {
store: store.clone(),
registry: registry.clone(),
}));
// Auth middleware (if enabled)
if config.auth.enabled {
@@ -58,6 +62,7 @@
}
app = app
.layer(DefaultBodyLimit::max(256 * 1024 * 1024)) // 256MB
.layer(CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)

crates/ingestd/Cargo.toml (new file)

@@ -0,0 +1,22 @@
[package]
name = "ingestd"
version = "0.1.0"
edition = "2024"
[dependencies]
shared = { path = "../shared" }
storaged = { path = "../storaged" }
catalogd = { path = "../catalogd" }
tokio = { workspace = true }
axum = { workspace = true, features = ["multipart"] }
lopdf = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
arrow = { workspace = true }
parquet = { workspace = true }
bytes = { workspace = true }
sha2 = { workspace = true }
csv = { workspace = true }
chrono = { workspace = true }
object_store = { workspace = true }

crates/ingestd/src/csv_ingest.rs (new file)

@@ -0,0 +1,203 @@
use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray, BooleanArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
/// Inferred column type from sampling data.
#[derive(Debug, Clone, PartialEq)]
enum InferredType {
Integer,
Float,
Boolean,
String,
}
/// Parse CSV bytes into Arrow RecordBatches with automatic schema detection.
/// Per ADR-010: ambiguous types default to String.
pub fn parse_csv(content: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
let mut reader = csv::ReaderBuilder::new()
.flexible(true) // allow varying column counts
.trim(csv::Trim::All)
.from_reader(content);
let headers: Vec<String> = reader.headers()
.map_err(|e| format!("CSV header error: {e}"))?
.iter()
.enumerate()
.map(|(i, h)| {
let h = h.trim().to_string();
if h.is_empty() { format!("column_{i}") } else { sanitize_column_name(&h) }
})
.collect();
let n_cols = headers.len();
if n_cols == 0 {
return Err("CSV has no columns".into());
}
// Read all rows into string columns
let mut columns: Vec<Vec<String>> = vec![vec![]; n_cols];
let mut row_count = 0;
for result in reader.records() {
let record = result.map_err(|e| format!("CSV row error: {e}"))?;
for (i, field) in record.iter().enumerate() {
if i < n_cols {
columns[i].push(field.trim().to_string());
}
}
// Pad short rows with empty strings
for col in columns.iter_mut().skip(record.len().min(n_cols)) {
col.push(String::new());
}
row_count += 1;
}
if row_count == 0 {
return Err("CSV has no data rows".into());
}
tracing::info!("parsed CSV: {row_count} rows × {n_cols} columns");
// Infer types by sampling (look at all values)
let types: Vec<InferredType> = columns.iter().map(|col| infer_column_type(col)).collect();
// Build Arrow schema
let fields: Vec<Field> = headers.iter().zip(types.iter()).map(|(name, typ)| {
let dt = match typ {
InferredType::Integer => DataType::Int64,
InferredType::Float => DataType::Float64,
InferredType::Boolean => DataType::Boolean,
InferredType::String => DataType::Utf8,
};
Field::new(name, dt, true) // all nullable
}).collect();
let schema = Arc::new(Schema::new(fields));
// Build arrays
let arrays: Vec<ArrayRef> = columns.iter().zip(types.iter()).map(|(col, typ)| {
match typ {
InferredType::Integer => {
let vals: Vec<Option<i64>> = col.iter().map(|v| {
if v.is_empty() { None } else { v.replace(',', "").parse().ok() }
}).collect();
Arc::new(Int64Array::from(vals)) as ArrayRef
}
InferredType::Float => {
let vals: Vec<Option<f64>> = col.iter().map(|v| {
if v.is_empty() { None }
else { v.replace(',', "").replace('$', "").replace('%', "").parse().ok() }
}).collect();
Arc::new(Float64Array::from(vals)) as ArrayRef
}
InferredType::Boolean => {
let vals: Vec<Option<bool>> = col.iter().map(|v| {
match v.to_lowercase().as_str() {
"true" | "yes" | "1" | "y" | "t" => Some(true),
"false" | "no" | "0" | "n" | "f" => Some(false),
_ => None,
}
}).collect();
Arc::new(BooleanArray::from(vals)) as ArrayRef
}
InferredType::String => {
Arc::new(StringArray::from(col.clone())) as ArrayRef
}
}
}).collect();
let batch = RecordBatch::try_new(schema.clone(), arrays)
.map_err(|e| format!("RecordBatch error: {e}"))?;
Ok((schema, vec![batch]))
}
/// Infer column type from values. Conservative: defaults to String on ambiguity.
fn infer_column_type(values: &[String]) -> InferredType {
let non_empty: Vec<&str> = values.iter()
.map(|v| v.as_str())
.filter(|v| !v.is_empty() && *v != "NULL" && *v != "null" && *v != "N/A" && *v != "n/a")
.collect();
if non_empty.is_empty() {
return InferredType::String;
}
// Check boolean
let all_bool = non_empty.iter().all(|v| {
matches!(v.to_lowercase().as_str(), "true" | "false" | "yes" | "no" | "1" | "0" | "y" | "n" | "t" | "f")
});
if all_bool && non_empty.len() >= 2 {
// Make sure it's not just all "1" and "0" which could be integers
let has_text_bool = non_empty.iter().any(|v| {
matches!(v.to_lowercase().as_str(), "true" | "false" | "yes" | "no" | "y" | "n" | "t" | "f")
});
if has_text_bool {
return InferredType::Boolean;
}
}
// Check integer (allow commas as thousands separator)
let int_rate = non_empty.iter()
.filter(|v| v.replace(',', "").parse::<i64>().is_ok())
.count() as f64 / non_empty.len() as f64;
if int_rate > 0.95 {
return InferredType::Integer;
}
// Check float (allow $, %, commas)
let float_rate = non_empty.iter()
.filter(|v| v.replace(',', "").replace('$', "").replace('%', "").parse::<f64>().is_ok())
.count() as f64 / non_empty.len() as f64;
if float_rate > 0.95 {
return InferredType::Float;
}
InferredType::String
}
/// Sanitize column name for SQL compatibility.
fn sanitize_column_name(name: &str) -> String {
name.chars()
.map(|c| if c.is_alphanumeric() || c == '_' { c.to_ascii_lowercase() } else { '_' })
.collect::<String>()
.trim_matches('_')
.to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_simple_csv() {
let csv = b"Name,Age,Salary\nAlice,30,50000\nBob,25,45000\n";
let (schema, batches) = parse_csv(csv).unwrap();
assert_eq!(schema.fields().len(), 3);
assert_eq!(batches[0].num_rows(), 2);
assert_eq!(schema.field(1).data_type(), &DataType::Int64);
assert_eq!(schema.field(2).data_type(), &DataType::Int64);
}
#[test]
fn parse_csv_with_mixed_types() {
let csv = b"id,value\n1,hello\n2,world\n3,N/A\n";
let (schema, _) = parse_csv(csv).unwrap();
assert_eq!(schema.field(0).data_type(), &DataType::Int64);
assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
}
#[test]
fn parse_csv_with_dollar_amounts() {
let csv = b"item,price\nWidget,$29.99\nGadget,$149.50\n";
let (schema, _) = parse_csv(csv).unwrap();
assert_eq!(schema.field(1).data_type(), &DataType::Float64);
}
#[test]
fn sanitize_names() {
assert_eq!(sanitize_column_name("First Name"), "first_name");
assert_eq!(sanitize_column_name("Bill Rate ($)"), "bill_rate");
}
}
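
A hedged extra test sketch (not part of the commit): it exercises the combination the commit message claims, currency symbols, thousands separators, and N/A together, and assumes it sits inside the same tests module as above.

#[test]
fn parse_csv_messy_currency_and_na() {
    // "N/A" is excluded from type sampling, so both numeric columns still
    // clear the 95% parse-rate threshold and infer as Float64.
    let csv = b"employee,bill_rate,utilization\nAlice,\"$1,250.00\",85%\nBob,N/A,N/A\nCarol,$980.50,72%\n";
    let (schema, batches) = parse_csv(csv).unwrap();
    assert_eq!(schema.field(1).data_type(), &DataType::Float64);
    assert_eq!(schema.field(2).data_type(), &DataType::Float64);
    // The N/A cells fail the numeric parse and land as NULLs.
    assert_eq!(batches[0].num_rows(), 3);
}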

crates/ingestd/src/detect.rs (new file)

@@ -0,0 +1,103 @@
use sha2::{Digest, Sha256};
/// Detected file type from content inspection.
#[derive(Debug, Clone, PartialEq)]
pub enum FileType {
Csv,
Json,
NdJson, // newline-delimited JSON
Pdf,
Text, // plain text, SMS logs, etc.
Unknown,
}
/// Detect file type from filename extension and content sniffing.
pub fn detect_file_type(filename: &str, content: &[u8]) -> FileType {
// Extension-based first
let lower = filename.to_lowercase();
if lower.ends_with(".csv") || lower.ends_with(".tsv") {
return FileType::Csv;
}
if lower.ends_with(".json") {
// Check if it's newline-delimited JSON
if content.iter().take(4096).filter(|&&b| b == b'\n').count() > 2 {
let first_line = content.split(|&b| b == b'\n').next().unwrap_or(b"");
if first_line.starts_with(b"{") {
return FileType::NdJson;
}
}
return FileType::Json;
}
if lower.ends_with(".ndjson") || lower.ends_with(".jsonl") {
return FileType::NdJson;
}
if lower.ends_with(".pdf") {
return FileType::Pdf;
}
if lower.ends_with(".txt") || lower.ends_with(".log") || lower.ends_with(".sms") {
return FileType::Text;
}
// Content sniffing fallback
if content.starts_with(b"%PDF") {
return FileType::Pdf;
}
if content.starts_with(b"[") || content.starts_with(b"{") {
return FileType::Json;
}
// Check if it looks like CSV (has commas and newlines in first chunk)
let sample = &content[..content.len().min(4096)];
let comma_count = sample.iter().filter(|&&b| b == b',').count();
let newline_count = sample.iter().filter(|&&b| b == b'\n').count();
if comma_count > 3 && newline_count > 1 {
return FileType::Csv;
}
// If it's valid UTF-8, treat as text
if std::str::from_utf8(sample).is_ok() {
return FileType::Text;
}
FileType::Unknown
}
/// Compute SHA-256 hash of content for deduplication.
pub fn content_hash(content: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(content);
format!("{:x}", hasher.finalize())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn detect_csv_by_extension() {
assert_eq!(detect_file_type("data.csv", b"a,b,c\n1,2,3"), FileType::Csv);
}
#[test]
fn detect_json_by_extension() {
assert_eq!(detect_file_type("data.json", b"[{\"a\":1}]"), FileType::Json);
}
#[test]
fn detect_pdf_by_magic() {
assert_eq!(detect_file_type("unknown", b"%PDF-1.4 blah"), FileType::Pdf);
}
#[test]
fn detect_csv_by_content() {
let csv = b"name,age,city\nAlice,30,NYC\nBob,25,LA\n";
assert_eq!(detect_file_type("unknown.dat", csv), FileType::Csv);
}
#[test]
fn content_hash_deterministic() {
let h1 = content_hash(b"hello world");
let h2 = content_hash(b"hello world");
assert_eq!(h1, h2);
}
}
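
A hedged companion sketch (not in the commit) for the NdJson branch: a .json extension whose body has more than two newlines in the first 4 KB and a first line starting with '{' is reclassified as newline-delimited.

#[test]
fn detect_ndjson_inside_json_extension() {
    let body = b"{\"x\":1}\n{\"x\":2}\n{\"x\":3}\n{\"x\":4}\n"; // 4 newlines, first line starts with '{'
    assert_eq!(detect_file_type("events.json", body), FileType::NdJson);
}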

crates/ingestd/src/json_ingest.rs (new file)

@@ -0,0 +1,167 @@
use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray, BooleanArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::collections::BTreeMap;
use std::sync::Arc;
/// Parse JSON (array of objects or newline-delimited) into Arrow RecordBatches.
/// Nested objects are flattened: {"a": {"b": 1}} → column "a_b".
pub fn parse_json(content: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
let text = std::str::from_utf8(content).map_err(|e| format!("invalid UTF-8: {e}"))?;
let text = text.trim();
// Parse into Vec<Map>
let rows: Vec<serde_json::Map<String, serde_json::Value>> = if text.starts_with('[') {
// JSON array
let arr: Vec<serde_json::Value> = serde_json::from_str(text)
.map_err(|e| format!("JSON parse error: {e}"))?;
arr.into_iter()
.filter_map(|v| v.as_object().cloned())
.collect()
} else {
// Newline-delimited JSON
text.lines()
.filter(|l| !l.trim().is_empty())
.map(|l| {
let v: serde_json::Value = serde_json::from_str(l)
.map_err(|e| format!("NDJSON line parse error: {e}"))?;
v.as_object().cloned().ok_or_else(|| "NDJSON line is not an object".into())
})
.collect::<Result<Vec<_>, String>>()?
};
if rows.is_empty() {
return Err("JSON has no records".into());
}
// Flatten and collect all columns
let mut columns: BTreeMap<String, Vec<serde_json::Value>> = BTreeMap::new();
let n_rows = rows.len();
for (row_idx, row) in rows.iter().enumerate() {
let flat = flatten_object(row, "");
// Pad existing columns that aren't in this row
for key in columns.keys().cloned().collect::<Vec<_>>() {
if !flat.contains_key(&key) {
columns.get_mut(&key).unwrap().push(serde_json::Value::Null);
}
}
// Add new columns or append values
for (key, val) in flat {
let col = columns.entry(key).or_insert_with(Vec::new);
// Pad with nulls if this column is new and we've seen prior rows
while col.len() < row_idx {
col.push(serde_json::Value::Null);
}
col.push(val);
}
}
// Pad short columns
for col in columns.values_mut() {
while col.len() < n_rows {
col.push(serde_json::Value::Null);
}
}
tracing::info!("parsed JSON: {n_rows} rows × {} columns", columns.len());
// Infer types and build schema
let mut fields = Vec::new();
let mut arrays: Vec<ArrayRef> = Vec::new();
for (name, values) in &columns {
let (dt, array) = json_column_to_arrow(values);
fields.push(Field::new(name, dt, true));
arrays.push(array);
}
let schema = Arc::new(Schema::new(fields));
let batch = RecordBatch::try_new(schema.clone(), arrays)
.map_err(|e| format!("RecordBatch error: {e}"))?;
Ok((schema, vec![batch]))
}
/// Flatten nested JSON object: {"a": {"b": 1}} → {"a_b": 1}
fn flatten_object(obj: &serde_json::Map<String, serde_json::Value>, prefix: &str) -> BTreeMap<String, serde_json::Value> {
let mut result = BTreeMap::new();
for (key, val) in obj {
let full_key = if prefix.is_empty() { key.clone() } else { format!("{prefix}_{key}") };
match val {
serde_json::Value::Object(inner) => {
result.extend(flatten_object(inner, &full_key));
}
other => {
result.insert(full_key, other.clone());
}
}
}
result
}
/// Convert a column of JSON values to an Arrow array.
fn json_column_to_arrow(values: &[serde_json::Value]) -> (DataType, ArrayRef) {
// Check if all non-null values are the same type
let non_null: Vec<&serde_json::Value> = values.iter()
.filter(|v| !v.is_null())
.collect();
if non_null.is_empty() {
return (DataType::Utf8, Arc::new(StringArray::from(vec![None::<&str>; values.len()])));
}
let all_bool = non_null.iter().all(|v| v.is_boolean());
let all_i64 = non_null.iter().all(|v| v.is_i64() || v.is_u64());
let all_f64 = non_null.iter().all(|v| v.is_number());
if all_bool {
let arr: Vec<Option<bool>> = values.iter().map(|v| v.as_bool()).collect();
(DataType::Boolean, Arc::new(BooleanArray::from(arr)))
} else if all_i64 {
let arr: Vec<Option<i64>> = values.iter().map(|v| v.as_i64()).collect();
(DataType::Int64, Arc::new(Int64Array::from(arr)))
} else if all_f64 {
let arr: Vec<Option<f64>> = values.iter().map(|v| v.as_f64()).collect();
(DataType::Float64, Arc::new(Float64Array::from(arr)))
} else {
// Default to string
let arr: Vec<Option<String>> = values.iter().map(|v| {
match v {
serde_json::Value::Null => None,
serde_json::Value::String(s) => Some(s.clone()),
other => Some(other.to_string()),
}
}).collect();
(DataType::Utf8, Arc::new(StringArray::from(arr)))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_json_array() {
let json = br#"[{"name":"Alice","age":30},{"name":"Bob","age":25}]"#;
let (schema, batches) = parse_json(json).unwrap();
assert_eq!(batches[0].num_rows(), 2);
assert!(schema.field_with_name("age").unwrap().data_type() == &DataType::Int64);
}
#[test]
fn parse_ndjson() {
let json = b"{\"x\":1}\n{\"x\":2}\n{\"x\":3}\n";
let (_, batches) = parse_json(json).unwrap();
assert_eq!(batches[0].num_rows(), 3);
}
#[test]
fn flatten_nested() {
let json = br#"[{"user":{"name":"Alice","address":{"city":"NYC"}},"score":9.5}]"#;
let (schema, _) = parse_json(json).unwrap();
let names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
assert!(names.contains(&"user_name"));
assert!(names.contains(&"user_address_city"));
}
}
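
One more hedged sketch (not in the commit) for the ragged-row padding above: rows with disjoint key sets get NULL-padded on both sides.

#[test]
fn ragged_rows_pad_with_nulls() {
    let json = br#"[{"a":1},{"b":2}]"#;
    let (schema, batches) = parse_json(json).unwrap();
    assert_eq!(schema.fields().len(), 2); // columns a and b
    assert_eq!(batches[0].num_rows(), 2);
    // a = [1, NULL] and b = [NULL, 2]; each infers Int64 from its non-null values.
    assert_eq!(schema.field_with_name("a").unwrap().data_type(), &DataType::Int64);
}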

crates/ingestd/src/lib.rs (new file)

@@ -0,0 +1,6 @@
pub mod detect;
pub mod csv_ingest;
pub mod json_ingest;
pub mod pdf_ingest;
pub mod pipeline;
pub mod service;

crates/ingestd/src/pdf_ingest.rs (new file)

@@ -0,0 +1,52 @@
use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
/// Extract text from a PDF file.
/// Returns one row per page: (page_number, text_content).
/// This handles text-based PDFs. Scanned/image PDFs need OCR (not implemented yet).
pub fn parse_pdf(content: &[u8], source_filename: &str) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
let doc = lopdf::Document::load_mem(content)
.map_err(|e| format!("PDF load error: {e}"))?;
let pages = doc.get_pages();
let mut page_numbers: Vec<i32> = Vec::new();
let mut page_texts: Vec<String> = Vec::new();
let mut sources: Vec<String> = Vec::new();
for (&page_num, _) in pages.iter() {
let text = doc.extract_text(&[page_num]).unwrap_or_default();
let text = text.trim().to_string();
if !text.is_empty() {
page_numbers.push(page_num as i32);
page_texts.push(text);
sources.push(source_filename.to_string());
}
}
if page_numbers.is_empty() {
// PDF has no extractable text — likely scanned/image
return Err("PDF contains no extractable text (may be scanned/image — OCR not yet supported)".into());
}
tracing::info!("extracted {} pages with text from PDF '{}'", page_numbers.len(), source_filename);
let schema = Arc::new(Schema::new(vec![
Field::new("source_file", DataType::Utf8, false),
Field::new("page_number", DataType::Int32, false),
Field::new("text_content", DataType::Utf8, false),
]));
let arrays: Vec<ArrayRef> = vec![
Arc::new(StringArray::from(sources)),
Arc::new(Int32Array::from(page_numbers)),
Arc::new(StringArray::from(page_texts)),
];
let batch = RecordBatch::try_new(schema.clone(), arrays)
.map_err(|e| format!("RecordBatch error: {e}"))?;
Ok((schema, vec![batch]))
}
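
pdf_ingest.rs lands without tests; here is a hedged error-path sketch (not in the commit). The happy path would need a real PDF fixture, which is deliberately not fabricated here.

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rejects_non_pdf_bytes() {
        // Garbage bytes must fail at the lopdf load step.
        let err = parse_pdf(b"definitely not a pdf", "junk.pdf").unwrap_err();
        assert!(err.starts_with("PDF load error"));
    }
}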

crates/ingestd/src/pipeline.rs (new file)

@@ -0,0 +1,162 @@
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use object_store::ObjectStore;
use std::sync::Arc;
use catalogd::registry::Registry;
use shared::arrow_helpers::{fingerprint_schema, record_batch_to_parquet};
use shared::types::{ObjectRef, SchemaFingerprint};
use storaged::ops;
use crate::detect::{FileType, content_hash, detect_file_type};
use crate::csv_ingest;
use crate::json_ingest;
use crate::pdf_ingest;
/// Result of an ingest operation.
#[derive(Debug, Clone, serde::Serialize)]
pub struct IngestResult {
pub dataset_name: String,
pub file_type: String,
pub rows: usize,
pub columns: usize,
pub storage_key: String,
pub content_hash: String,
pub schema_fingerprint: String,
pub deduplicated: bool,
}
/// Full ingest pipeline: detect → parse → dedup → store → register.
pub async fn ingest_file(
filename: &str,
content: &[u8],
dataset_name: Option<&str>,
store: &Arc<dyn ObjectStore>,
registry: &Registry,
) -> Result<IngestResult, String> {
// 1. Detect file type
let file_type = detect_file_type(filename, content);
tracing::info!("ingesting '{}' as {:?} ({} bytes)", filename, file_type, content.len());
// 2. Parse into Arrow
let (schema, batches) = match file_type {
FileType::Csv => csv_ingest::parse_csv(content)?,
FileType::Json | FileType::NdJson => json_ingest::parse_json(content)?,
FileType::Pdf => pdf_ingest::parse_pdf(content, filename)?,
FileType::Text => parse_text_file(content, filename)?,
FileType::Unknown => return Err(format!("unknown file type for '{filename}'")),
};
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
let total_cols = schema.fields().len();
if total_rows == 0 {
return Err("file contains no data".into());
}
// 3. Content hash for dedup
let hash = content_hash(content);
// 4. Check if already ingested (same content hash)
let existing = registry.list().await;
if existing.iter().any(|d| d.schema_fingerprint.0 == hash) {
tracing::info!("file already ingested (hash: {}), skipping", &hash[..12]);
return Ok(IngestResult {
dataset_name: dataset_name.unwrap_or(filename).to_string(),
file_type: format!("{:?}", file_type),
rows: total_rows,
columns: total_cols,
storage_key: String::new(),
content_hash: hash,
schema_fingerprint: String::new(),
deduplicated: true,
});
}
// 5. Convert to Parquet
let mut all_parquet = Vec::new();
for batch in &batches {
let pq = record_batch_to_parquet(batch)?;
all_parquet.extend_from_slice(&pq);
}
let parquet_bytes = Bytes::from(all_parquet);
let parquet_size = parquet_bytes.len() as u64;
// 6. Store in object storage
let name = dataset_name.unwrap_or_else(|| {
filename.rsplit('/').next().unwrap_or(filename)
.rsplit('.').last().unwrap_or(filename)
});
let safe_name = sanitize_dataset_name(name);
let storage_key = format!("datasets/{}.parquet", safe_name);
ops::put(store, &storage_key, parquet_bytes).await?;
tracing::info!("stored {} as {} ({} bytes)", filename, storage_key, parquet_size);
// 7. Register in catalog
let schema_fp = fingerprint_schema(&schema);
let now = chrono::Utc::now();
let obj_ref = ObjectRef {
bucket: "data".to_string(),
key: storage_key.clone(),
size_bytes: parquet_size,
created_at: now,
};
// Use content hash as fingerprint for dedup tracking
registry.register(
safe_name.clone(),
SchemaFingerprint(hash.clone()),
vec![obj_ref],
).await?;
Ok(IngestResult {
dataset_name: safe_name,
file_type: format!("{:?}", file_type),
rows: total_rows,
columns: total_cols,
storage_key,
content_hash: hash,
schema_fingerprint: schema_fp.0,
deduplicated: false,
})
}
/// Parse plain text / SMS logs into a simple table.
fn parse_text_file(content: &[u8], filename: &str) -> Result<(SchemaRef, Vec<RecordBatch>), String> {
let text = String::from_utf8_lossy(content);
let lines: Vec<&str> = text.lines().filter(|l| !l.trim().is_empty()).collect();
if lines.is_empty() {
return Err("text file is empty".into());
}
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
arrow::datatypes::Field::new("source_file", arrow::datatypes::DataType::Utf8, false),
arrow::datatypes::Field::new("line_number", arrow::datatypes::DataType::Int32, false),
arrow::datatypes::Field::new("text", arrow::datatypes::DataType::Utf8, false),
]));
let sources: Vec<&str> = vec![filename; lines.len()];
let line_nums: Vec<i32> = (1..=lines.len() as i32).collect();
let arrays: Vec<arrow::array::ArrayRef> = vec![
Arc::new(arrow::array::StringArray::from(sources)),
Arc::new(arrow::array::Int32Array::from(line_nums)),
Arc::new(arrow::array::StringArray::from(lines)),
];
let batch = RecordBatch::try_new(schema.clone(), arrays)
.map_err(|e| format!("RecordBatch error: {e}"))?;
Ok((schema, vec![batch]))
}
fn sanitize_dataset_name(name: &str) -> String {
let clean: String = name.chars()
.map(|c| if c.is_alphanumeric() || c == '_' { c.to_ascii_lowercase() } else { '_' })
.collect();
let trimmed = clean.trim_matches('_').to_string();
if trimmed.is_empty() { "unnamed_dataset".to_string() } else { trimmed }
}
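
A hedged sketch (not in the commit) for the text path; it would have to live inside this module, since parse_text_file is private.

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn text_lines_become_rows() {
        let (schema, batches) = parse_text_file(b"hello\n\nworld\n", "chat.txt").unwrap();
        assert_eq!(schema.fields().len(), 3); // source_file, line_number, text
        // Note: line numbers count the kept (non-empty) lines, so "world" is line 2 here.
        assert_eq!(batches[0].num_rows(), 2);
    }
}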

crates/ingestd/src/service.rs (new file)

@@ -0,0 +1,69 @@
use axum::{
Json, Router,
extract::{Multipart, Query, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
};
use object_store::ObjectStore;
use serde::Deserialize;
use std::sync::Arc;
use catalogd::registry::Registry;
use crate::pipeline;
#[derive(Clone)]
pub struct IngestState {
pub store: Arc<dyn ObjectStore>,
pub registry: Registry,
}
pub fn router(state: IngestState) -> Router {
Router::new()
.route("/health", get(health))
.route("/file", post(ingest_file))
.with_state(state)
}
async fn health() -> &'static str {
"ingestd ok"
}
#[derive(Deserialize)]
struct IngestQuery {
/// Override dataset name (otherwise derived from filename)
name: Option<String>,
}
/// Upload a file for ingestion. Accepts multipart/form-data with a "file" field.
async fn ingest_file(
State(state): State<IngestState>,
Query(query): Query<IngestQuery>,
mut multipart: Multipart,
) -> impl IntoResponse {
// Read the first file field
let field = match multipart.next_field().await {
Ok(Some(f)) => f,
Ok(None) => return Err((StatusCode::BAD_REQUEST, "no file uploaded".to_string())),
Err(e) => return Err((StatusCode::BAD_REQUEST, format!("multipart error: {e}"))),
};
let filename = field.file_name().unwrap_or("unknown").to_string();
let content = field.bytes().await
.map_err(|e| (StatusCode::BAD_REQUEST, format!("read error: {e}")))?;
tracing::info!("received file '{}' ({} bytes) for ingest", filename, content.len());
let dataset_name = query.name.as_deref();
match pipeline::ingest_file(&filename, &content, dataset_name, &state.store, &state.registry).await {
Ok(result) => {
if result.deduplicated {
Ok((StatusCode::OK, Json(result)))
} else {
Ok((StatusCode::CREATED, Json(result)))
}
}
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
}
}

Catalog entry: sms_messages (new file)

@@ -0,0 +1,15 @@
{
"id": "1ca61945-d151-490b-81fd-2ca0397b68fa",
"name": "sms_messages",
"schema_fingerprint": "e1d079cbb2b7eedae5019767a886bd9a3396e291aa03630b9db69e9864948c09",
"objects": [
{
"bucket": "data",
"key": "datasets/sms_messages.parquet",
"size_bytes": 2018,
"created_at": "2026-03-27T13:07:14.253881797Z"
}
],
"created_at": "2026-03-27T13:07:14.253886027Z",
"updated_at": "2026-03-27T13:07:14.253886027Z"
}

Catalog entry: employees (deleted)

@@ -1,15 +0,0 @@
{
"id": "1f81445a-404f-48ea-bd72-00f6721ee18b",
"name": "employees",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/employees.parquet",
"size_bytes": 1424,
"created_at": "2026-03-27T11:55:55.013996293Z"
}
],
"created_at": "2026-03-27T11:55:55.014010258Z",
"updated_at": "2026-03-27T11:55:55.014010258Z"
}

Catalog entry: test_ingest (new file)

@@ -0,0 +1,15 @@
{
"id": "478072c3-0c95-46a2-9193-f4b3ac4085ab",
"name": "test_ingest",
"schema_fingerprint": "4bdc4e5baeddc1187aecd4bfb788654f26145c2ba346b4bec6ca8ab950e1c133",
"objects": [
{
"bucket": "data",
"key": "datasets/test_ingest.parquet",
"size_bytes": 3129,
"created_at": "2026-03-27T13:06:57.437484309Z"
}
],
"created_at": "2026-03-27T13:06:57.437488259Z",
"updated_at": "2026-03-27T13:06:57.437488259Z"
}

Catalog entry: u5 (deleted)

@@ -1,15 +0,0 @@
{
"id": "50d402a0-6fb4-4849-ac75-3541b8530aba",
"name": "u5",
"schema_fingerprint": "from-ui",
"objects": [
{
"bucket": "data",
"key": "datasets/events.parquet",
"size_bytes": 0,
"created_at": "2026-03-27T12:16:57.520054490Z"
}
],
"created_at": "2026-03-27T12:16:57.520066991Z",
"updated_at": "2026-03-27T12:16:57.520066991Z"
}

Catalog entry: events (deleted)

@@ -1,15 +0,0 @@
{
"id": "6c8dabf2-a6b3-4e23-b7d7-dc3b37a5e1a8",
"name": "events",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/events.parquet",
"size_bytes": 1127,
"created_at": "2026-03-27T11:55:55.017003417Z"
}
],
"created_at": "2026-03-27T11:55:55.017011224Z",
"updated_at": "2026-03-27T11:55:55.017011224Z"
}

Catalog entry: products (deleted)

@@ -1,15 +0,0 @@
{
"id": "f5281277-afe9-456b-b5fa-d667c2b0f41f",
"name": "products",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/products.parquet",
"size_bytes": 1341,
"created_at": "2026-03-27T11:55:55.019096056Z"
}
],
"created_at": "2026-03-27T11:55:55.019098965Z",
"updated_at": "2026-03-27T11:55:55.019098965Z"
}

Catalog entry: u (deleted)

@@ -1,15 +0,0 @@
{
"id": "fc046131-8f44-428a-af90-ebf6347d4cc6",
"name": "u",
"schema_fingerprint": "from-ui",
"objects": [
{
"bucket": "data",
"key": "datasets/employees.parquet",
"size_bytes": 0,
"created_at": "2026-03-27T12:16:51.462058823Z"
}
],
"created_at": "2026-03-27T12:16:51.462071311Z",
"updated_at": "2026-03-27T12:16:51.462071311Z"
}

5 binary files not shown.

scripts/generate_demo.py (new file)

@@ -0,0 +1,254 @@
#!/usr/bin/env python3
"""Generate realistic demo datasets that actually stress the stack."""
import pyarrow as pa
import pyarrow.parquet as pq
import random
import json
import urllib.request
from datetime import datetime, timedelta
API = "http://localhost:3100"
def upload_and_register(name, table):
"""Upload Parquet to storage and register in catalog."""
path = f"/tmp/{name}.parquet"
pq.write_table(table, path, compression="snappy")
with open(path, "rb") as f:
data = f.read()
key = f"datasets/{name}.parquet"
req = urllib.request.Request(f"{API}/storage/objects/{key}", data=data, method="PUT")
urllib.request.urlopen(req)
body = json.dumps({
"name": name,
"schema_fingerprint": "auto",
"objects": [{"bucket": "data", "key": key, "size_bytes": len(data)}]
}).encode()
req = urllib.request.Request(
f"{API}/catalog/datasets", data=body, method="POST",
headers={"Content-Type": "application/json"}
)
urllib.request.urlopen(req)
print(f" {name}: {table.num_rows:,} rows, {len(data):,} bytes ({len(data)/1024/1024:.1f} MB)")
# ============================================================
# Dataset 1: web_events — 500K rows of web analytics
# Shows: DataFusion scanning half a million rows in milliseconds
# ============================================================
print("Generating web_events (500K rows)...")
N_EVENTS = 500_000
countries = ["US", "UK", "DE", "FR", "JP", "BR", "IN", "AU", "CA", "KR", "MX", "NG", "ZA", "SE", "IT"]
pages = ["/", "/pricing", "/docs", "/blog", "/about", "/login", "/signup", "/dashboard", "/settings", "/api",
"/docs/getting-started", "/docs/api-reference", "/docs/tutorials", "/blog/rust-performance",
"/blog/ai-lakehouse", "/blog/parquet-vs-csv", "/products", "/products/enterprise", "/contact", "/careers"]
actions = ["pageview", "pageview", "pageview", "pageview", "click", "click", "scroll", "form_submit", "download", "signup"]
browsers = ["Chrome", "Firefox", "Safari", "Edge", "Arc"]
devices = ["desktop", "desktop", "desktop", "mobile", "mobile", "tablet"]
base_time = datetime(2026, 1, 1)
random.seed(42)
timestamps = []
user_ids = []
page_list = []
action_list = []
duration_list = []
country_list = []
browser_list = []
device_list = []
session_ids = []
for i in range(N_EVENTS):
ts = base_time + timedelta(seconds=random.randint(0, 86400 * 90)) # 90 days
timestamps.append(ts.isoformat())
user_ids.append(random.randint(1, 50000))
page_list.append(random.choice(pages))
action_list.append(random.choice(actions))
duration_list.append(random.randint(100, 300000)) # ms
country_list.append(random.choice(countries))
browser_list.append(random.choice(browsers))
device_list.append(random.choice(devices))
session_ids.append(f"sess_{random.randint(1, 200000):06d}")
web_events = pa.table({
"timestamp": timestamps,
"user_id": user_ids,
"session_id": session_ids,
"page": page_list,
"action": action_list,
"duration_ms": duration_list,
"country": country_list,
"browser": browser_list,
"device": device_list,
})
upload_and_register("web_events", web_events)
# ============================================================
# Dataset 2: products — 5K products with real-ish descriptions
# Shows: AI can read and understand product data, semantic search
# ============================================================
print("Generating products (5K rows)...")
categories = ["SaaS", "API", "Database", "Analytics", "Security", "DevOps", "AI/ML", "Storage", "Networking", "Monitoring"]
adjectives = ["Enterprise", "Cloud-Native", "Open-Source", "Serverless", "Real-Time", "Distributed", "Scalable", "Lightweight", "High-Performance", "Self-Hosted"]
nouns = ["Platform", "Engine", "Gateway", "Toolkit", "Framework", "Suite", "Service", "Connector", "Pipeline", "Hub"]
features = [
"with built-in authentication and RBAC",
"featuring automatic horizontal scaling",
"with zero-config deployment",
"supporting 100+ integrations",
"with sub-millisecond latency",
"featuring end-to-end encryption",
"with real-time dashboards",
"supporting multi-region replication",
"with built-in CI/CD pipelines",
"featuring AI-powered anomaly detection",
"with comprehensive audit logging",
"supporting GraphQL and REST APIs",
"with automated backup and recovery",
"featuring smart caching layers",
"with native Kubernetes support",
]
product_ids = []
product_names = []
product_categories = []
product_prices = []
product_descriptions = []
product_ratings = []
product_reviews_count = []
product_created = []
for i in range(5000):
cat = random.choice(categories)
adj = random.choice(adjectives)
noun = random.choice(nouns)
feat = random.choice(features)
name = f"{adj} {cat} {noun}"
desc = f"{name} — a {cat.lower()} solution {feat}. Built for teams that need reliable {cat.lower()} infrastructure without the complexity."
product_ids.append(i + 1)
product_names.append(name)
product_categories.append(cat)
product_prices.append(round(random.uniform(9.99, 2999.99), 2))
product_descriptions.append(desc)
product_ratings.append(round(random.uniform(2.5, 5.0), 1))
product_reviews_count.append(random.randint(0, 5000))
product_created.append((base_time - timedelta(days=random.randint(0, 730))).strftime("%Y-%m-%d"))
products = pa.table({
"product_id": product_ids,
"name": product_names,
"category": product_categories,
"price": product_prices,
"description": product_descriptions,
"rating": product_ratings,
"review_count": product_reviews_count,
"created_date": product_created,
})
upload_and_register("products", products)
# ============================================================
# Dataset 3: transactions — 200K purchase records
# Shows: JOINs across datasets, aggregation at scale
# ============================================================
print("Generating transactions (200K rows)...")
N_TXN = 200_000
txn_ids = []
txn_user_ids = []
txn_product_ids = []
txn_quantities = []
txn_amounts = []
txn_timestamps = []
txn_statuses = []
txn_payment_methods = []
statuses = ["completed", "completed", "completed", "completed", "pending", "refunded", "failed"]
payments = ["credit_card", "credit_card", "credit_card", "debit_card", "paypal", "crypto", "wire_transfer"]
for i in range(N_TXN):
pid = random.randint(1, 5000)
qty = random.randint(1, 10)
price = product_prices[pid - 1]
txn_ids.append(f"TXN-{i+1:07d}")
txn_user_ids.append(random.randint(1, 50000))
txn_product_ids.append(pid)
txn_quantities.append(qty)
txn_amounts.append(round(price * qty, 2))
txn_timestamps.append((base_time + timedelta(seconds=random.randint(0, 86400 * 90))).isoformat())
txn_statuses.append(random.choice(statuses))
txn_payment_methods.append(random.choice(payments))
transactions = pa.table({
"txn_id": txn_ids,
"user_id": txn_user_ids,
"product_id": txn_product_ids,
"quantity": txn_quantities,
"amount": txn_amounts,
"timestamp": txn_timestamps,
"status": txn_statuses,
"payment_method": txn_payment_methods,
})
upload_and_register("transactions", transactions)
# ============================================================
# Dataset 4: server_metrics — 1M rows of infrastructure telemetry
# Shows: time-series analytics, the kind of data you'd put in a lakehouse
# ============================================================
print("Generating server_metrics (1M rows)...")
N_METRICS = 1_000_000
hosts = [f"prod-{i:03d}" for i in range(100)]
metrics_names = ["cpu_usage", "memory_usage", "disk_io", "network_in", "network_out", "request_latency", "error_rate", "gc_pause"]
regions = ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"]
m_timestamps = []
m_hosts = []
m_metrics = []
m_values = []
m_regions = []
for i in range(N_METRICS):
metric = random.choice(metrics_names)
if metric == "cpu_usage":
val = round(random.gauss(45, 20), 2)
elif metric == "memory_usage":
val = round(random.gauss(60, 15), 2)
elif metric in ("network_in", "network_out"):
val = round(random.expovariate(1/1000), 2)
elif metric == "request_latency":
val = round(random.expovariate(1/50), 2)
elif metric == "error_rate":
val = round(random.expovariate(1/2), 4)
else:
val = round(random.uniform(0, 100), 2)
m_timestamps.append((base_time + timedelta(seconds=random.randint(0, 86400 * 90))).isoformat())
m_hosts.append(random.choice(hosts))
m_metrics.append(metric)
m_values.append(max(0, val))
m_regions.append(random.choice(regions))
server_metrics = pa.table({
"timestamp": m_timestamps,
"host": m_hosts,
"metric": m_metrics,
"value": m_values,
"region": m_regions,
})
upload_and_register("server_metrics", server_metrics)
print(f"\nDone — 4 datasets, {N_EVENTS + 5000 + N_TXN + N_METRICS:,} total rows")
print("\nDemo queries to try:")
print(' "How many page views per country, sorted by volume?"')
print(' "What are the top 10 products by total revenue?"')
print(' "Show average CPU usage per host in us-east-1"')
print(' "Which payment method has the highest failure rate?"')
print(' "What are the busiest hours for web traffic?"')