Compare commits
No commits in common. "bb05c4412e8127193dc58f6a44aa6517bda8d11f" and "b37e171e10f7deb1a5fb8c446e627d17f6be3d13" have entirely different histories.
bb05c4412e
...
b37e171e10
180
Cargo.lock
generated
180
Cargo.lock
generated
@ -8,17 +8,6 @@ version = "2.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
|
||||
|
||||
[[package]]
|
||||
name = "aes"
|
||||
version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cipher",
|
||||
"cpufeatures",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ahash"
|
||||
version = "0.8.12"
|
||||
@ -519,15 +508,6 @@ dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "block-padding"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93"
|
||||
dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "brotli"
|
||||
version = "8.0.2"
|
||||
@ -555,12 +535,6 @@ version = "3.20.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb"
|
||||
|
||||
[[package]]
|
||||
name = "bytecount"
|
||||
version = "0.6.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e"
|
||||
|
||||
[[package]]
|
||||
name = "byteorder"
|
||||
version = "1.5.0"
|
||||
@ -614,15 +588,6 @@ dependencies = [
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cbc"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6"
|
||||
dependencies = [
|
||||
"cipher",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.2.58"
|
||||
@ -725,16 +690,6 @@ dependencies = [
|
||||
"half",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cipher"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
|
||||
dependencies = [
|
||||
"crypto-common",
|
||||
"inout",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "combine"
|
||||
version = "4.6.7"
|
||||
@ -939,25 +894,6 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-deque"
|
||||
version = "0.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
|
||||
dependencies = [
|
||||
"crossbeam-epoch",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-epoch"
|
||||
version = "0.9.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.21"
|
||||
@ -2375,7 +2311,6 @@ dependencies = [
|
||||
"aibridge",
|
||||
"axum",
|
||||
"catalogd",
|
||||
"ingestd",
|
||||
"object_store",
|
||||
"opentelemetry",
|
||||
"opentelemetry-stdout",
|
||||
@ -2885,38 +2820,6 @@ dependencies = [
|
||||
"cfb",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ingestd"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"axum",
|
||||
"bytes",
|
||||
"catalogd",
|
||||
"chrono",
|
||||
"csv",
|
||||
"lopdf",
|
||||
"object_store",
|
||||
"parquet",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"shared",
|
||||
"storaged",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inout"
|
||||
version = "0.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01"
|
||||
dependencies = [
|
||||
"block-padding",
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "integer-encoding"
|
||||
version = "3.0.4"
|
||||
@ -3172,30 +3075,6 @@ version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b3bd0dd2cd90571056fdb71f6275fada10131182f84899f4b2a916e565d81d86"
|
||||
|
||||
[[package]]
|
||||
name = "lopdf"
|
||||
version = "0.35.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c7c1d3350d071cb86987a6bcb205c7019a0eb70dcad92b454fec722cca8d68b"
|
||||
dependencies = [
|
||||
"aes",
|
||||
"cbc",
|
||||
"chrono",
|
||||
"encoding_rs",
|
||||
"flate2",
|
||||
"indexmap",
|
||||
"itoa",
|
||||
"log",
|
||||
"md-5",
|
||||
"nom",
|
||||
"nom_locate",
|
||||
"rangemap",
|
||||
"rayon",
|
||||
"thiserror 2.0.18",
|
||||
"time",
|
||||
"weezl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lru-slab"
|
||||
version = "0.1.2"
|
||||
@ -3338,12 +3217,6 @@ dependencies = [
|
||||
"unicase",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "minimal-lexical"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.8.9"
|
||||
@ -3418,27 +3291,6 @@ dependencies = [
|
||||
"jni-sys 0.3.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "7.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nom_locate"
|
||||
version = "4.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e3c83c053b0713da60c5b8de47fe8e494fe3ece5267b2f23090a07a053ba8f3"
|
||||
dependencies = [
|
||||
"bytecount",
|
||||
"memchr",
|
||||
"nom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.50.3"
|
||||
@ -4119,38 +3971,12 @@ dependencies = [
|
||||
"getrandom 0.3.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rangemap"
|
||||
version = "1.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "973443cf09a9c8656b574a866ab68dfa19f0867d0340648c7d2f6a71b8a8ea68"
|
||||
|
||||
[[package]]
|
||||
name = "raw-window-handle"
|
||||
version = "0.6.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539"
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.11.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f"
|
||||
dependencies = [
|
||||
"either",
|
||||
"rayon-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon-core"
|
||||
version = "1.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91"
|
||||
dependencies = [
|
||||
"crossbeam-deque",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "recursive"
|
||||
version = "0.1.1"
|
||||
@ -5613,12 +5439,6 @@ dependencies = [
|
||||
"rustls-pki-types",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "weezl"
|
||||
version = "0.1.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a28ac98ddc8b9274cb41bb4d9d4d5c425b6020c50c46f25559911905610b4a88"
|
||||
|
||||
[[package]]
|
||||
name = "winapi-util"
|
||||
version = "0.1.11"
|
||||
|
||||
@ -7,7 +7,6 @@ members = [
|
||||
"crates/catalogd",
|
||||
"crates/queryd",
|
||||
"crates/aibridge",
|
||||
"crates/ingestd",
|
||||
"crates/gateway",
|
||||
"crates/ui",
|
||||
]
|
||||
@ -39,6 +38,3 @@ opentelemetry_sdk = { version = "0.28", features = ["rt-tokio"] }
|
||||
opentelemetry-stdout = { version = "0.28", features = ["trace"] }
|
||||
tracing-opentelemetry = "0.29"
|
||||
toml = "0.8"
|
||||
csv = "1"
|
||||
lopdf = "0.35"
|
||||
encoding_rs = "0.8"
|
||||
|
||||
@ -9,7 +9,6 @@ storaged = { path = "../storaged" }
|
||||
catalogd = { path = "../catalogd" }
|
||||
queryd = { path = "../queryd" }
|
||||
aibridge = { path = "../aibridge" }
|
||||
ingestd = { path = "../ingestd" }
|
||||
tokio = { workspace = true }
|
||||
axum = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
mod auth;
|
||||
mod observability;
|
||||
|
||||
use axum::{Router, extract::DefaultBodyLimit, routing::get};
|
||||
use axum::{Router, routing::get};
|
||||
use proto::lakehouse::catalog_service_server::CatalogServiceServer;
|
||||
use shared::config::Config;
|
||||
use tower_http::cors::{Any, CorsLayer};
|
||||
@ -39,14 +39,10 @@ async fn main() {
|
||||
// HTTP router
|
||||
let mut app = Router::new()
|
||||
.route("/health", get(health))
|
||||
.nest("/storage", storaged::service::router(store.clone()))
|
||||
.nest("/storage", storaged::service::router(store))
|
||||
.nest("/catalog", catalogd::service::router(registry.clone()))
|
||||
.nest("/query", queryd::service::router(engine))
|
||||
.nest("/ai", aibridge::service::router(ai_client))
|
||||
.nest("/ingest", ingestd::service::router(ingestd::service::IngestState {
|
||||
store: store.clone(),
|
||||
registry: registry.clone(),
|
||||
}));
|
||||
.nest("/ai", aibridge::service::router(ai_client));
|
||||
|
||||
// Auth middleware (if enabled)
|
||||
if config.auth.enabled {
|
||||
@ -62,7 +58,6 @@ async fn main() {
|
||||
}
|
||||
|
||||
app = app
|
||||
.layer(DefaultBodyLimit::max(256 * 1024 * 1024)) // 256MB
|
||||
.layer(CorsLayer::new()
|
||||
.allow_origin(Any)
|
||||
.allow_methods(Any)
|
||||
|
||||
@ -1,22 +0,0 @@
|
||||
[package]
|
||||
name = "ingestd"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
shared = { path = "../shared" }
|
||||
storaged = { path = "../storaged" }
|
||||
catalogd = { path = "../catalogd" }
|
||||
tokio = { workspace = true }
|
||||
axum = { workspace = true, features = ["multipart"] }
|
||||
lopdf = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
arrow = { workspace = true }
|
||||
parquet = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
sha2 = { workspace = true }
|
||||
csv = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
object_store = { workspace = true }
|
||||
@ -1,203 +0,0 @@
|
||||
use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray, BooleanArray};
|
||||
use arrow::datatypes::{DataType, Field, Schema};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Inferred column type from sampling data.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
enum InferredType {
|
||||
Integer,
|
||||
Float,
|
||||
Boolean,
|
||||
String,
|
||||
}
|
||||
|
||||
/// Parse CSV bytes into Arrow RecordBatches with automatic schema detection.
|
||||
/// Per ADR-010: ambiguous types default to String.
|
||||
pub fn parse_csv(content: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
|
||||
let mut reader = csv::ReaderBuilder::new()
|
||||
.flexible(true) // allow varying column counts
|
||||
.trim(csv::Trim::All)
|
||||
.from_reader(content);
|
||||
|
||||
let headers: Vec<String> = reader.headers()
|
||||
.map_err(|e| format!("CSV header error: {e}"))?
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, h)| {
|
||||
let h = h.trim().to_string();
|
||||
if h.is_empty() { format!("column_{i}") } else { sanitize_column_name(&h) }
|
||||
})
|
||||
.collect();
|
||||
|
||||
let n_cols = headers.len();
|
||||
if n_cols == 0 {
|
||||
return Err("CSV has no columns".into());
|
||||
}
|
||||
|
||||
// Read all rows into string columns
|
||||
let mut columns: Vec<Vec<String>> = vec![vec![]; n_cols];
|
||||
let mut row_count = 0;
|
||||
|
||||
for result in reader.records() {
|
||||
let record = result.map_err(|e| format!("CSV row error: {e}"))?;
|
||||
for (i, field) in record.iter().enumerate() {
|
||||
if i < n_cols {
|
||||
columns[i].push(field.trim().to_string());
|
||||
}
|
||||
}
|
||||
// Pad short rows with empty strings
|
||||
for col in columns.iter_mut().skip(record.len().min(n_cols)) {
|
||||
col.push(String::new());
|
||||
}
|
||||
row_count += 1;
|
||||
}
|
||||
|
||||
if row_count == 0 {
|
||||
return Err("CSV has no data rows".into());
|
||||
}
|
||||
|
||||
tracing::info!("parsed CSV: {row_count} rows × {n_cols} columns");
|
||||
|
||||
// Infer types by sampling (look at all values)
|
||||
let types: Vec<InferredType> = columns.iter().map(|col| infer_column_type(col)).collect();
|
||||
|
||||
// Build Arrow schema
|
||||
let fields: Vec<Field> = headers.iter().zip(types.iter()).map(|(name, typ)| {
|
||||
let dt = match typ {
|
||||
InferredType::Integer => DataType::Int64,
|
||||
InferredType::Float => DataType::Float64,
|
||||
InferredType::Boolean => DataType::Boolean,
|
||||
InferredType::String => DataType::Utf8,
|
||||
};
|
||||
Field::new(name, dt, true) // all nullable
|
||||
}).collect();
|
||||
|
||||
let schema = Arc::new(Schema::new(fields));
|
||||
|
||||
// Build arrays
|
||||
let arrays: Vec<ArrayRef> = columns.iter().zip(types.iter()).map(|(col, typ)| {
|
||||
match typ {
|
||||
InferredType::Integer => {
|
||||
let vals: Vec<Option<i64>> = col.iter().map(|v| {
|
||||
if v.is_empty() { None } else { v.replace(',', "").parse().ok() }
|
||||
}).collect();
|
||||
Arc::new(Int64Array::from(vals)) as ArrayRef
|
||||
}
|
||||
InferredType::Float => {
|
||||
let vals: Vec<Option<f64>> = col.iter().map(|v| {
|
||||
if v.is_empty() { None }
|
||||
else { v.replace(',', "").replace('$', "").replace('%', "").parse().ok() }
|
||||
}).collect();
|
||||
Arc::new(Float64Array::from(vals)) as ArrayRef
|
||||
}
|
||||
InferredType::Boolean => {
|
||||
let vals: Vec<Option<bool>> = col.iter().map(|v| {
|
||||
match v.to_lowercase().as_str() {
|
||||
"true" | "yes" | "1" | "y" | "t" => Some(true),
|
||||
"false" | "no" | "0" | "n" | "f" => Some(false),
|
||||
_ => None,
|
||||
}
|
||||
}).collect();
|
||||
Arc::new(BooleanArray::from(vals)) as ArrayRef
|
||||
}
|
||||
InferredType::String => {
|
||||
Arc::new(StringArray::from(col.clone())) as ArrayRef
|
||||
}
|
||||
}
|
||||
}).collect();
|
||||
|
||||
let batch = RecordBatch::try_new(schema.clone(), arrays)
|
||||
.map_err(|e| format!("RecordBatch error: {e}"))?;
|
||||
|
||||
Ok((schema, vec![batch]))
|
||||
}
|
||||
|
||||
/// Infer column type from values. Conservative: defaults to String on ambiguity.
|
||||
fn infer_column_type(values: &[String]) -> InferredType {
|
||||
let non_empty: Vec<&str> = values.iter()
|
||||
.map(|v| v.as_str())
|
||||
.filter(|v| !v.is_empty() && *v != "NULL" && *v != "null" && *v != "N/A" && *v != "n/a")
|
||||
.collect();
|
||||
|
||||
if non_empty.is_empty() {
|
||||
return InferredType::String;
|
||||
}
|
||||
|
||||
// Check boolean
|
||||
let all_bool = non_empty.iter().all(|v| {
|
||||
matches!(v.to_lowercase().as_str(), "true" | "false" | "yes" | "no" | "1" | "0" | "y" | "n" | "t" | "f")
|
||||
});
|
||||
if all_bool && non_empty.len() >= 2 {
|
||||
// Make sure it's not just all "1" and "0" which could be integers
|
||||
let has_text_bool = non_empty.iter().any(|v| {
|
||||
matches!(v.to_lowercase().as_str(), "true" | "false" | "yes" | "no" | "y" | "n" | "t" | "f")
|
||||
});
|
||||
if has_text_bool {
|
||||
return InferredType::Boolean;
|
||||
}
|
||||
}
|
||||
|
||||
// Check integer (allow commas as thousands separator)
|
||||
let int_rate = non_empty.iter()
|
||||
.filter(|v| v.replace(',', "").parse::<i64>().is_ok())
|
||||
.count() as f64 / non_empty.len() as f64;
|
||||
if int_rate > 0.95 {
|
||||
return InferredType::Integer;
|
||||
}
|
||||
|
||||
// Check float (allow $, %, commas)
|
||||
let float_rate = non_empty.iter()
|
||||
.filter(|v| v.replace(',', "").replace('$', "").replace('%', "").parse::<f64>().is_ok())
|
||||
.count() as f64 / non_empty.len() as f64;
|
||||
if float_rate > 0.95 {
|
||||
return InferredType::Float;
|
||||
}
|
||||
|
||||
InferredType::String
|
||||
}
|
||||
|
||||
/// Sanitize column name for SQL compatibility.
|
||||
fn sanitize_column_name(name: &str) -> String {
|
||||
name.chars()
|
||||
.map(|c| if c.is_alphanumeric() || c == '_' { c.to_ascii_lowercase() } else { '_' })
|
||||
.collect::<String>()
|
||||
.trim_matches('_')
|
||||
.to_string()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parse_simple_csv() {
|
||||
let csv = b"Name,Age,Salary\nAlice,30,50000\nBob,25,45000\n";
|
||||
let (schema, batches) = parse_csv(csv).unwrap();
|
||||
assert_eq!(schema.fields().len(), 3);
|
||||
assert_eq!(batches[0].num_rows(), 2);
|
||||
assert_eq!(schema.field(1).data_type(), &DataType::Int64);
|
||||
assert_eq!(schema.field(2).data_type(), &DataType::Int64);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_csv_with_mixed_types() {
|
||||
let csv = b"id,value\n1,hello\n2,world\n3,N/A\n";
|
||||
let (schema, _) = parse_csv(csv).unwrap();
|
||||
assert_eq!(schema.field(0).data_type(), &DataType::Int64);
|
||||
assert_eq!(schema.field(1).data_type(), &DataType::Utf8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_csv_with_dollar_amounts() {
|
||||
let csv = b"item,price\nWidget,$29.99\nGadget,$149.50\n";
|
||||
let (schema, _) = parse_csv(csv).unwrap();
|
||||
assert_eq!(schema.field(1).data_type(), &DataType::Float64);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sanitize_names() {
|
||||
assert_eq!(sanitize_column_name("First Name"), "first_name");
|
||||
assert_eq!(sanitize_column_name("Bill Rate ($)"), "bill_rate");
|
||||
}
|
||||
}
|
||||
@ -1,103 +0,0 @@
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
/// Detected file type from content inspection.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum FileType {
|
||||
Csv,
|
||||
Json,
|
||||
NdJson, // newline-delimited JSON
|
||||
Pdf,
|
||||
Text, // plain text, SMS logs, etc.
|
||||
Unknown,
|
||||
}
|
||||
|
||||
/// Detect file type from filename extension and content sniffing.
|
||||
pub fn detect_file_type(filename: &str, content: &[u8]) -> FileType {
|
||||
// Extension-based first
|
||||
let lower = filename.to_lowercase();
|
||||
if lower.ends_with(".csv") || lower.ends_with(".tsv") {
|
||||
return FileType::Csv;
|
||||
}
|
||||
if lower.ends_with(".json") {
|
||||
// Check if it's newline-delimited JSON
|
||||
if content.iter().take(4096).filter(|&&b| b == b'\n').count() > 2 {
|
||||
let first_line = content.split(|&b| b == b'\n').next().unwrap_or(b"");
|
||||
if first_line.starts_with(b"{") {
|
||||
return FileType::NdJson;
|
||||
}
|
||||
}
|
||||
return FileType::Json;
|
||||
}
|
||||
if lower.ends_with(".ndjson") || lower.ends_with(".jsonl") {
|
||||
return FileType::NdJson;
|
||||
}
|
||||
if lower.ends_with(".pdf") {
|
||||
return FileType::Pdf;
|
||||
}
|
||||
if lower.ends_with(".txt") || lower.ends_with(".log") || lower.ends_with(".sms") {
|
||||
return FileType::Text;
|
||||
}
|
||||
|
||||
// Content sniffing fallback
|
||||
if content.starts_with(b"%PDF") {
|
||||
return FileType::Pdf;
|
||||
}
|
||||
if content.starts_with(b"[") || content.starts_with(b"{") {
|
||||
return FileType::Json;
|
||||
}
|
||||
|
||||
// Check if it looks like CSV (has commas and newlines in first chunk)
|
||||
let sample = &content[..content.len().min(4096)];
|
||||
let comma_count = sample.iter().filter(|&&b| b == b',').count();
|
||||
let newline_count = sample.iter().filter(|&&b| b == b'\n').count();
|
||||
if comma_count > 3 && newline_count > 1 {
|
||||
return FileType::Csv;
|
||||
}
|
||||
|
||||
// If it's valid UTF-8, treat as text
|
||||
if std::str::from_utf8(sample).is_ok() {
|
||||
return FileType::Text;
|
||||
}
|
||||
|
||||
FileType::Unknown
|
||||
}
|
||||
|
||||
/// Compute SHA-256 hash of content for deduplication.
|
||||
pub fn content_hash(content: &[u8]) -> String {
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(content);
|
||||
format!("{:x}", hasher.finalize())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn detect_csv_by_extension() {
|
||||
assert_eq!(detect_file_type("data.csv", b"a,b,c\n1,2,3"), FileType::Csv);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn detect_json_by_extension() {
|
||||
assert_eq!(detect_file_type("data.json", b"[{\"a\":1}]"), FileType::Json);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn detect_pdf_by_magic() {
|
||||
assert_eq!(detect_file_type("unknown", b"%PDF-1.4 blah"), FileType::Pdf);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn detect_csv_by_content() {
|
||||
let csv = b"name,age,city\nAlice,30,NYC\nBob,25,LA\n";
|
||||
assert_eq!(detect_file_type("unknown.dat", csv), FileType::Csv);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn content_hash_deterministic() {
|
||||
let h1 = content_hash(b"hello world");
|
||||
let h2 = content_hash(b"hello world");
|
||||
assert_eq!(h1, h2);
|
||||
}
|
||||
}
|
||||
@ -1,167 +0,0 @@
|
||||
use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray, BooleanArray};
|
||||
use arrow::datatypes::{DataType, Field, Schema};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Parse JSON (array of objects or newline-delimited) into Arrow RecordBatches.
|
||||
/// Nested objects are flattened: {"a": {"b": 1}} → column "a_b".
|
||||
pub fn parse_json(content: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
|
||||
let text = std::str::from_utf8(content).map_err(|e| format!("invalid UTF-8: {e}"))?;
|
||||
let text = text.trim();
|
||||
|
||||
// Parse into Vec<Map>
|
||||
let rows: Vec<serde_json::Map<String, serde_json::Value>> = if text.starts_with('[') {
|
||||
// JSON array
|
||||
let arr: Vec<serde_json::Value> = serde_json::from_str(text)
|
||||
.map_err(|e| format!("JSON parse error: {e}"))?;
|
||||
arr.into_iter()
|
||||
.filter_map(|v| v.as_object().cloned())
|
||||
.collect()
|
||||
} else {
|
||||
// Newline-delimited JSON
|
||||
text.lines()
|
||||
.filter(|l| !l.trim().is_empty())
|
||||
.map(|l| {
|
||||
let v: serde_json::Value = serde_json::from_str(l)
|
||||
.map_err(|e| format!("NDJSON line parse error: {e}"))?;
|
||||
v.as_object().cloned().ok_or_else(|| "NDJSON line is not an object".into())
|
||||
})
|
||||
.collect::<Result<Vec<_>, String>>()?
|
||||
};
|
||||
|
||||
if rows.is_empty() {
|
||||
return Err("JSON has no records".into());
|
||||
}
|
||||
|
||||
// Flatten and collect all columns
|
||||
let mut columns: BTreeMap<String, Vec<serde_json::Value>> = BTreeMap::new();
|
||||
let n_rows = rows.len();
|
||||
|
||||
for (row_idx, row) in rows.iter().enumerate() {
|
||||
let flat = flatten_object(row, "");
|
||||
// Pad existing columns that aren't in this row
|
||||
for key in columns.keys().cloned().collect::<Vec<_>>() {
|
||||
if !flat.contains_key(&key) {
|
||||
columns.get_mut(&key).unwrap().push(serde_json::Value::Null);
|
||||
}
|
||||
}
|
||||
// Add new columns or append values
|
||||
for (key, val) in flat {
|
||||
let col = columns.entry(key).or_insert_with(Vec::new);
|
||||
// Pad with nulls if this column is new and we've seen prior rows
|
||||
while col.len() < row_idx {
|
||||
col.push(serde_json::Value::Null);
|
||||
}
|
||||
col.push(val);
|
||||
}
|
||||
}
|
||||
|
||||
// Pad short columns
|
||||
for col in columns.values_mut() {
|
||||
while col.len() < n_rows {
|
||||
col.push(serde_json::Value::Null);
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("parsed JSON: {n_rows} rows × {} columns", columns.len());
|
||||
|
||||
// Infer types and build schema
|
||||
let mut fields = Vec::new();
|
||||
let mut arrays: Vec<ArrayRef> = Vec::new();
|
||||
|
||||
for (name, values) in &columns {
|
||||
let (dt, array) = json_column_to_arrow(values);
|
||||
fields.push(Field::new(name, dt, true));
|
||||
arrays.push(array);
|
||||
}
|
||||
|
||||
let schema = Arc::new(Schema::new(fields));
|
||||
let batch = RecordBatch::try_new(schema.clone(), arrays)
|
||||
.map_err(|e| format!("RecordBatch error: {e}"))?;
|
||||
|
||||
Ok((schema, vec![batch]))
|
||||
}
|
||||
|
||||
/// Flatten nested JSON object: {"a": {"b": 1}} → {"a_b": 1}
|
||||
fn flatten_object(obj: &serde_json::Map<String, serde_json::Value>, prefix: &str) -> BTreeMap<String, serde_json::Value> {
|
||||
let mut result = BTreeMap::new();
|
||||
for (key, val) in obj {
|
||||
let full_key = if prefix.is_empty() { key.clone() } else { format!("{prefix}_{key}") };
|
||||
match val {
|
||||
serde_json::Value::Object(inner) => {
|
||||
result.extend(flatten_object(inner, &full_key));
|
||||
}
|
||||
other => {
|
||||
result.insert(full_key, other.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
/// Convert a column of JSON values to an Arrow array.
|
||||
fn json_column_to_arrow(values: &[serde_json::Value]) -> (DataType, ArrayRef) {
|
||||
// Check if all non-null values are the same type
|
||||
let non_null: Vec<&serde_json::Value> = values.iter()
|
||||
.filter(|v| !v.is_null())
|
||||
.collect();
|
||||
|
||||
if non_null.is_empty() {
|
||||
return (DataType::Utf8, Arc::new(StringArray::from(vec![None::<&str>; values.len()])));
|
||||
}
|
||||
|
||||
let all_bool = non_null.iter().all(|v| v.is_boolean());
|
||||
let all_i64 = non_null.iter().all(|v| v.is_i64() || v.is_u64());
|
||||
let all_f64 = non_null.iter().all(|v| v.is_number());
|
||||
|
||||
if all_bool {
|
||||
let arr: Vec<Option<bool>> = values.iter().map(|v| v.as_bool()).collect();
|
||||
(DataType::Boolean, Arc::new(BooleanArray::from(arr)))
|
||||
} else if all_i64 {
|
||||
let arr: Vec<Option<i64>> = values.iter().map(|v| v.as_i64()).collect();
|
||||
(DataType::Int64, Arc::new(Int64Array::from(arr)))
|
||||
} else if all_f64 {
|
||||
let arr: Vec<Option<f64>> = values.iter().map(|v| v.as_f64()).collect();
|
||||
(DataType::Float64, Arc::new(Float64Array::from(arr)))
|
||||
} else {
|
||||
// Default to string
|
||||
let arr: Vec<Option<String>> = values.iter().map(|v| {
|
||||
match v {
|
||||
serde_json::Value::Null => None,
|
||||
serde_json::Value::String(s) => Some(s.clone()),
|
||||
other => Some(other.to_string()),
|
||||
}
|
||||
}).collect();
|
||||
(DataType::Utf8, Arc::new(StringArray::from(arr)))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parse_json_array() {
|
||||
let json = br#"[{"name":"Alice","age":30},{"name":"Bob","age":25}]"#;
|
||||
let (schema, batches) = parse_json(json).unwrap();
|
||||
assert_eq!(batches[0].num_rows(), 2);
|
||||
assert!(schema.field_with_name("age").unwrap().data_type() == &DataType::Int64);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_ndjson() {
|
||||
let json = b"{\"x\":1}\n{\"x\":2}\n{\"x\":3}\n";
|
||||
let (_, batches) = parse_json(json).unwrap();
|
||||
assert_eq!(batches[0].num_rows(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn flatten_nested() {
|
||||
let json = br#"[{"user":{"name":"Alice","address":{"city":"NYC"}},"score":9.5}]"#;
|
||||
let (schema, _) = parse_json(json).unwrap();
|
||||
let names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
|
||||
assert!(names.contains(&"user_name"));
|
||||
assert!(names.contains(&"user_address_city"));
|
||||
}
|
||||
}
|
||||
@ -1,6 +0,0 @@
|
||||
pub mod detect;
|
||||
pub mod csv_ingest;
|
||||
pub mod json_ingest;
|
||||
pub mod pdf_ingest;
|
||||
pub mod pipeline;
|
||||
pub mod service;
|
||||
@ -1,52 +0,0 @@
|
||||
use arrow::array::{ArrayRef, Int32Array, StringArray};
|
||||
use arrow::datatypes::{DataType, Field, Schema};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Extract text from a PDF file.
|
||||
/// Returns one row per page: (page_number, text_content).
|
||||
/// This handles text-based PDFs. Scanned/image PDFs need OCR (not implemented yet).
|
||||
pub fn parse_pdf(content: &[u8], source_filename: &str) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
|
||||
let doc = lopdf::Document::load_mem(content)
|
||||
.map_err(|e| format!("PDF load error: {e}"))?;
|
||||
|
||||
let pages = doc.get_pages();
|
||||
let mut page_numbers: Vec<i32> = Vec::new();
|
||||
let mut page_texts: Vec<String> = Vec::new();
|
||||
let mut sources: Vec<String> = Vec::new();
|
||||
|
||||
for (&page_num, _) in pages.iter() {
|
||||
let text = doc.extract_text(&[page_num]).unwrap_or_default();
|
||||
let text = text.trim().to_string();
|
||||
|
||||
if !text.is_empty() {
|
||||
page_numbers.push(page_num as i32);
|
||||
page_texts.push(text);
|
||||
sources.push(source_filename.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
if page_numbers.is_empty() {
|
||||
// PDF has no extractable text — likely scanned/image
|
||||
return Err("PDF contains no extractable text (may be scanned/image — OCR not yet supported)".into());
|
||||
}
|
||||
|
||||
tracing::info!("extracted {} pages with text from PDF '{}'", page_numbers.len(), source_filename);
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("source_file", DataType::Utf8, false),
|
||||
Field::new("page_number", DataType::Int32, false),
|
||||
Field::new("text_content", DataType::Utf8, false),
|
||||
]));
|
||||
|
||||
let arrays: Vec<ArrayRef> = vec![
|
||||
Arc::new(StringArray::from(sources)),
|
||||
Arc::new(Int32Array::from(page_numbers)),
|
||||
Arc::new(StringArray::from(page_texts)),
|
||||
];
|
||||
|
||||
let batch = RecordBatch::try_new(schema.clone(), arrays)
|
||||
.map_err(|e| format!("RecordBatch error: {e}"))?;
|
||||
|
||||
Ok((schema, vec![batch]))
|
||||
}
|
||||
@ -1,162 +0,0 @@
|
||||
use arrow::datatypes::SchemaRef;
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use bytes::Bytes;
|
||||
use object_store::ObjectStore;
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalogd::registry::Registry;
|
||||
use shared::arrow_helpers::{fingerprint_schema, record_batch_to_parquet};
|
||||
use shared::types::{ObjectRef, SchemaFingerprint};
|
||||
use storaged::ops;
|
||||
|
||||
use crate::detect::{FileType, content_hash, detect_file_type};
|
||||
use crate::csv_ingest;
|
||||
use crate::json_ingest;
|
||||
use crate::pdf_ingest;
|
||||
|
||||
/// Result of an ingest operation.
|
||||
#[derive(Debug, Clone, serde::Serialize)]
|
||||
pub struct IngestResult {
|
||||
pub dataset_name: String,
|
||||
pub file_type: String,
|
||||
pub rows: usize,
|
||||
pub columns: usize,
|
||||
pub storage_key: String,
|
||||
pub content_hash: String,
|
||||
pub schema_fingerprint: String,
|
||||
pub deduplicated: bool,
|
||||
}
|
||||
|
||||
/// Full ingest pipeline: detect → parse → dedup → store → register.
|
||||
pub async fn ingest_file(
|
||||
filename: &str,
|
||||
content: &[u8],
|
||||
dataset_name: Option<&str>,
|
||||
store: &Arc<dyn ObjectStore>,
|
||||
registry: &Registry,
|
||||
) -> Result<IngestResult, String> {
|
||||
// 1. Detect file type
|
||||
let file_type = detect_file_type(filename, content);
|
||||
tracing::info!("ingesting '{}' as {:?} ({} bytes)", filename, file_type, content.len());
|
||||
|
||||
// 2. Parse into Arrow
|
||||
let (schema, batches) = match file_type {
|
||||
FileType::Csv => csv_ingest::parse_csv(content)?,
|
||||
FileType::Json | FileType::NdJson => json_ingest::parse_json(content)?,
|
||||
FileType::Pdf => pdf_ingest::parse_pdf(content, filename)?,
|
||||
FileType::Text => parse_text_file(content, filename)?,
|
||||
FileType::Unknown => return Err(format!("unknown file type for '{filename}'")),
|
||||
};
|
||||
|
||||
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
|
||||
let total_cols = schema.fields().len();
|
||||
|
||||
if total_rows == 0 {
|
||||
return Err("file contains no data".into());
|
||||
}
|
||||
|
||||
// 3. Content hash for dedup
|
||||
let hash = content_hash(content);
|
||||
|
||||
// 4. Check if already ingested (same content hash)
|
||||
let existing = registry.list().await;
|
||||
if existing.iter().any(|d| d.schema_fingerprint.0 == hash) {
|
||||
tracing::info!("file already ingested (hash: {}), skipping", &hash[..12]);
|
||||
return Ok(IngestResult {
|
||||
dataset_name: dataset_name.unwrap_or(filename).to_string(),
|
||||
file_type: format!("{:?}", file_type),
|
||||
rows: total_rows,
|
||||
columns: total_cols,
|
||||
storage_key: String::new(),
|
||||
content_hash: hash,
|
||||
schema_fingerprint: String::new(),
|
||||
deduplicated: true,
|
||||
});
|
||||
}
|
||||
|
||||
// 5. Convert to Parquet
|
||||
let mut all_parquet = Vec::new();
|
||||
for batch in &batches {
|
||||
let pq = record_batch_to_parquet(batch)?;
|
||||
all_parquet.extend_from_slice(&pq);
|
||||
}
|
||||
let parquet_bytes = Bytes::from(all_parquet);
|
||||
let parquet_size = parquet_bytes.len() as u64;
|
||||
|
||||
// 6. Store in object storage
|
||||
let name = dataset_name.unwrap_or_else(|| {
|
||||
filename.rsplit('/').next().unwrap_or(filename)
|
||||
.rsplit('.').last().unwrap_or(filename)
|
||||
});
|
||||
let safe_name = sanitize_dataset_name(name);
|
||||
let storage_key = format!("datasets/{}.parquet", safe_name);
|
||||
|
||||
ops::put(store, &storage_key, parquet_bytes).await?;
|
||||
tracing::info!("stored {} as {} ({} bytes)", filename, storage_key, parquet_size);
|
||||
|
||||
// 7. Register in catalog
|
||||
let schema_fp = fingerprint_schema(&schema);
|
||||
let now = chrono::Utc::now();
|
||||
let obj_ref = ObjectRef {
|
||||
bucket: "data".to_string(),
|
||||
key: storage_key.clone(),
|
||||
size_bytes: parquet_size,
|
||||
created_at: now,
|
||||
};
|
||||
|
||||
// Use content hash as fingerprint for dedup tracking
|
||||
registry.register(
|
||||
safe_name.clone(),
|
||||
SchemaFingerprint(hash.clone()),
|
||||
vec![obj_ref],
|
||||
).await?;
|
||||
|
||||
Ok(IngestResult {
|
||||
dataset_name: safe_name,
|
||||
file_type: format!("{:?}", file_type),
|
||||
rows: total_rows,
|
||||
columns: total_cols,
|
||||
storage_key,
|
||||
content_hash: hash,
|
||||
schema_fingerprint: schema_fp.0,
|
||||
deduplicated: false,
|
||||
})
|
||||
}
|
||||
|
||||
/// Parse plain text / SMS logs into a simple table.
|
||||
fn parse_text_file(content: &[u8], filename: &str) -> Result<(SchemaRef, Vec<RecordBatch>), String> {
|
||||
let text = String::from_utf8_lossy(content);
|
||||
let lines: Vec<&str> = text.lines().filter(|l| !l.trim().is_empty()).collect();
|
||||
|
||||
if lines.is_empty() {
|
||||
return Err("text file is empty".into());
|
||||
}
|
||||
|
||||
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
|
||||
arrow::datatypes::Field::new("source_file", arrow::datatypes::DataType::Utf8, false),
|
||||
arrow::datatypes::Field::new("line_number", arrow::datatypes::DataType::Int32, false),
|
||||
arrow::datatypes::Field::new("text", arrow::datatypes::DataType::Utf8, false),
|
||||
]));
|
||||
|
||||
let sources: Vec<&str> = vec![filename; lines.len()];
|
||||
let line_nums: Vec<i32> = (1..=lines.len() as i32).collect();
|
||||
|
||||
let arrays: Vec<arrow::array::ArrayRef> = vec![
|
||||
Arc::new(arrow::array::StringArray::from(sources)),
|
||||
Arc::new(arrow::array::Int32Array::from(line_nums)),
|
||||
Arc::new(arrow::array::StringArray::from(lines)),
|
||||
];
|
||||
|
||||
let batch = RecordBatch::try_new(schema.clone(), arrays)
|
||||
.map_err(|e| format!("RecordBatch error: {e}"))?;
|
||||
|
||||
Ok((schema, vec![batch]))
|
||||
}
|
||||
|
||||
fn sanitize_dataset_name(name: &str) -> String {
|
||||
let clean: String = name.chars()
|
||||
.map(|c| if c.is_alphanumeric() || c == '_' { c.to_ascii_lowercase() } else { '_' })
|
||||
.collect();
|
||||
let trimmed = clean.trim_matches('_').to_string();
|
||||
if trimmed.is_empty() { "unnamed_dataset".to_string() } else { trimmed }
|
||||
}
|
||||
@ -1,69 +0,0 @@
|
||||
use axum::{
|
||||
Json, Router,
|
||||
extract::{Multipart, Query, State},
|
||||
http::StatusCode,
|
||||
response::IntoResponse,
|
||||
routing::{get, post},
|
||||
};
|
||||
use object_store::ObjectStore;
|
||||
use serde::Deserialize;
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalogd::registry::Registry;
|
||||
use crate::pipeline;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct IngestState {
|
||||
pub store: Arc<dyn ObjectStore>,
|
||||
pub registry: Registry,
|
||||
}
|
||||
|
||||
pub fn router(state: IngestState) -> Router {
|
||||
Router::new()
|
||||
.route("/health", get(health))
|
||||
.route("/file", post(ingest_file))
|
||||
.with_state(state)
|
||||
}
|
||||
|
||||
async fn health() -> &'static str {
|
||||
"ingestd ok"
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct IngestQuery {
|
||||
/// Override dataset name (otherwise derived from filename)
|
||||
name: Option<String>,
|
||||
}
|
||||
|
||||
/// Upload a file for ingestion. Accepts multipart/form-data with a "file" field.
|
||||
async fn ingest_file(
|
||||
State(state): State<IngestState>,
|
||||
Query(query): Query<IngestQuery>,
|
||||
mut multipart: Multipart,
|
||||
) -> impl IntoResponse {
|
||||
// Read the first file field
|
||||
let field = match multipart.next_field().await {
|
||||
Ok(Some(f)) => f,
|
||||
Ok(None) => return Err((StatusCode::BAD_REQUEST, "no file uploaded".to_string())),
|
||||
Err(e) => return Err((StatusCode::BAD_REQUEST, format!("multipart error: {e}"))),
|
||||
};
|
||||
|
||||
let filename = field.file_name().unwrap_or("unknown").to_string();
|
||||
let content = field.bytes().await
|
||||
.map_err(|e| (StatusCode::BAD_REQUEST, format!("read error: {e}")))?;
|
||||
|
||||
tracing::info!("received file '{}' ({} bytes) for ingest", filename, content.len());
|
||||
|
||||
let dataset_name = query.name.as_deref();
|
||||
|
||||
match pipeline::ingest_file(&filename, &content, dataset_name, &state.store, &state.registry).await {
|
||||
Ok(result) => {
|
||||
if result.deduplicated {
|
||||
Ok((StatusCode::OK, Json(result)))
|
||||
} else {
|
||||
Ok((StatusCode::CREATED, Json(result)))
|
||||
}
|
||||
}
|
||||
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
|
||||
}
|
||||
}
|
||||
@ -1,15 +0,0 @@
|
||||
{
|
||||
"id": "1ca61945-d151-490b-81fd-2ca0397b68fa",
|
||||
"name": "sms_messages",
|
||||
"schema_fingerprint": "e1d079cbb2b7eedae5019767a886bd9a3396e291aa03630b9db69e9864948c09",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/sms_messages.parquet",
|
||||
"size_bytes": 2018,
|
||||
"created_at": "2026-03-27T13:07:14.253881797Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T13:07:14.253886027Z",
|
||||
"updated_at": "2026-03-27T13:07:14.253886027Z"
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
{
|
||||
"id": "1f81445a-404f-48ea-bd72-00f6721ee18b",
|
||||
"name": "employees",
|
||||
"schema_fingerprint": "auto",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/employees.parquet",
|
||||
"size_bytes": 1424,
|
||||
"created_at": "2026-03-27T11:55:55.013996293Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T11:55:55.014010258Z",
|
||||
"updated_at": "2026-03-27T11:55:55.014010258Z"
|
||||
}
|
||||
@ -1,15 +0,0 @@
|
||||
{
|
||||
"id": "478072c3-0c95-46a2-9193-f4b3ac4085ab",
|
||||
"name": "test_ingest",
|
||||
"schema_fingerprint": "4bdc4e5baeddc1187aecd4bfb788654f26145c2ba346b4bec6ca8ab950e1c133",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/test_ingest.parquet",
|
||||
"size_bytes": 3129,
|
||||
"created_at": "2026-03-27T13:06:57.437484309Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T13:06:57.437488259Z",
|
||||
"updated_at": "2026-03-27T13:06:57.437488259Z"
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
{
|
||||
"id": "50d402a0-6fb4-4849-ac75-3541b8530aba",
|
||||
"name": "u5",
|
||||
"schema_fingerprint": "from-ui",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/events.parquet",
|
||||
"size_bytes": 0,
|
||||
"created_at": "2026-03-27T12:16:57.520054490Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T12:16:57.520066991Z",
|
||||
"updated_at": "2026-03-27T12:16:57.520066991Z"
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
{
|
||||
"id": "6c8dabf2-a6b3-4e23-b7d7-dc3b37a5e1a8",
|
||||
"name": "events",
|
||||
"schema_fingerprint": "auto",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/events.parquet",
|
||||
"size_bytes": 1127,
|
||||
"created_at": "2026-03-27T11:55:55.017003417Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T11:55:55.017011224Z",
|
||||
"updated_at": "2026-03-27T11:55:55.017011224Z"
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
{
|
||||
"id": "f5281277-afe9-456b-b5fa-d667c2b0f41f",
|
||||
"name": "products",
|
||||
"schema_fingerprint": "auto",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/products.parquet",
|
||||
"size_bytes": 1341,
|
||||
"created_at": "2026-03-27T11:55:55.019096056Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T11:55:55.019098965Z",
|
||||
"updated_at": "2026-03-27T11:55:55.019098965Z"
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
{
|
||||
"id": "fc046131-8f44-428a-af90-ebf6347d4cc6",
|
||||
"name": "u",
|
||||
"schema_fingerprint": "from-ui",
|
||||
"objects": [
|
||||
{
|
||||
"bucket": "data",
|
||||
"key": "datasets/employees.parquet",
|
||||
"size_bytes": 0,
|
||||
"created_at": "2026-03-27T12:16:51.462058823Z"
|
||||
}
|
||||
],
|
||||
"created_at": "2026-03-27T12:16:51.462071311Z",
|
||||
"updated_at": "2026-03-27T12:16:51.462071311Z"
|
||||
}
|
||||
BIN
data/datasets/employees.parquet
Normal file
BIN
data/datasets/employees.parquet
Normal file
Binary file not shown.
BIN
data/datasets/events.parquet
Normal file
BIN
data/datasets/events.parquet
Normal file
Binary file not shown.
BIN
data/datasets/products.parquet
Normal file
BIN
data/datasets/products.parquet
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -34,23 +34,3 @@
|
||||
**Date:** 2026-03-27
|
||||
**Decision:** Gateway serves HTTP on :3100 (external) and gRPC on :3101 (internal). Both run in the same process.
|
||||
**Rationale:** Single binary simplifies deployment. HTTP stays for browser/curl access. gRPC provides typed contracts for service-to-service calls. No premature microservice split.
|
||||
|
||||
## ADR-008: Embeddings stored as Parquet, not a proprietary vector DB
|
||||
**Date:** 2026-03-27
|
||||
**Decision:** Vector embeddings stored as Parquet files (doc_id, chunk_text, vector columns). Vector index (HNSW) serialized as a sidecar file.
|
||||
**Rationale:** Keeps all data in one portable format. No vendor lock-in to Pinecone/Weaviate/Qdrant. Vectors are queryable via DataFusion like any other data. Trade-off: brute-force search is fine up to ~100K vectors; HNSW needed beyond that.
|
||||
|
||||
## ADR-009: Incremental updates via delta files, not Delta Lake
|
||||
**Date:** 2026-03-27
|
||||
**Decision:** Updates append to delta Parquet files. Queries merge base + deltas at read time. Periodic compaction merges deltas into base. Single-writer model (no concurrent writers).
|
||||
**Rationale:** Full ACID over Parquet (Delta Lake/Iceberg) is a multi-year project. Our use case is single-writer (one ingest pipeline) with read-heavy workloads. Merge-on-read with compaction is sufficient and dramatically simpler.
|
||||
|
||||
## ADR-010: Schema detection defaults to string
|
||||
**Date:** 2026-03-27
|
||||
**Decision:** Ingest pipeline infers column types from data. When ambiguous or mixed, defaults to String rather than failing.
|
||||
**Rationale:** Legacy data is messy. A column with "123", "N/A", and "" is a string, not an integer. Downstream queries can CAST as needed. Better to ingest everything than reject on type errors.
|
||||
|
||||
## ADR-011: This is not a CRM replacement
|
||||
**Date:** 2026-03-27
|
||||
**Decision:** The lakehouse is the analytical layer BEHIND operational systems. It ingests exports, not live data. CRM/ATS stays for daily operations.
|
||||
**Rationale:** Operational systems need single-record CRUD, permissions, UI workflows. The lakehouse answers cross-cutting questions that no single operational system can. They complement, not compete.
|
||||
|
||||
240
docs/PRD.md
240
docs/PRD.md
@ -1,6 +1,6 @@
|
||||
# PRD: Lakehouse — Rust-First Object Storage System
|
||||
|
||||
**Status:** Active — Phase 0-5 complete, entering production path
|
||||
**Status:** Active
|
||||
**Created:** 2026-03-27
|
||||
**Owner:** J
|
||||
|
||||
@ -8,23 +8,20 @@
|
||||
|
||||
## Problem
|
||||
|
||||
Legacy data systems silo information across CRMs, databases, spreadsheets, and file shares. Querying across them requires manual ETL, pre-defined schemas, and expensive database licenses. When AI enters the picture, these systems can't handle the dual requirement of fast analytical queries AND semantic retrieval over unstructured text.
|
||||
|
||||
A staffing company (our reference case) has candidate records in an ATS, client data in a CRM, timesheets in billing software, call logs from a phone system, and email records from Exchange. Answering "find every Java developer in Chicago who was called 5+ times but never placed" requires querying across all of them — and no single system can do it.
|
||||
Traditional data platforms couple storage, compute, and metadata into monolithic databases. This creates vendor lock-in, scaling bottlenecks, and opaque data access. AI workloads bolt onto these systems awkwardly, sharing resources with transactional queries.
|
||||
|
||||
We need a system where:
|
||||
- Any data source (CSV, DB export, PDF, JSON) can be ingested without pre-defined schemas
|
||||
- Structured data is queryable via SQL at scale (millions of rows, sub-second)
|
||||
- Unstructured data is searchable via AI embeddings (semantic retrieval)
|
||||
- An LLM can answer natural language questions against all of it
|
||||
- Everything runs locally — no cloud APIs, total data privacy
|
||||
- The system is rebuildable from repository + object storage alone
|
||||
- Object storage is the source of truth (not a database)
|
||||
- Metadata, access, and execution are controlled by Rust services
|
||||
- Queries run directly over object storage via Arrow/Parquet
|
||||
- AI inference is isolated and swappable
|
||||
- The entire system is rebuildable from repository + docs alone
|
||||
|
||||
---
|
||||
|
||||
## Solution
|
||||
|
||||
A modular Rust service mesh over S3-compatible object storage, with a local AI layer for embeddings and generation.
|
||||
A modular Rust service mesh over S3-compatible object storage.
|
||||
|
||||
### Locked Stack
|
||||
|
||||
@ -33,15 +30,14 @@ A modular Rust service mesh over S3-compatible object storage, with a local AI l
|
||||
| Frontend | Dioxus | Yes |
|
||||
| API | Axum + Tokio | Yes |
|
||||
| Object Storage Interface | Apache Arrow `object_store` | Yes |
|
||||
| Storage Backend | LocalFileSystem → RustFS → S3 | Yes |
|
||||
| Storage Backend | RustFS (fallback: SeaweedFS) | Yes |
|
||||
| Query Engine | DataFusion | Yes |
|
||||
| Data Format | Parquet + Arrow | Yes |
|
||||
| RPC (internal) | tonic (gRPC) | Yes |
|
||||
| AI Runtime | Ollama (local models) | Yes |
|
||||
| AI Boundary | Python FastAPI sidecar → Ollama HTTP API | Yes |
|
||||
| Vector Index | TBD — evaluate `hora`, `qdrant` crate, or HNSW from scratch | **Open** |
|
||||
|
||||
No new frameworks without documented ADR.
|
||||
No new frameworks. No exceptions.
|
||||
|
||||
---
|
||||
|
||||
@ -51,160 +47,84 @@ No new frameworks without documented ADR.
|
||||
|
||||
| Service | Responsibility |
|
||||
|---|---|
|
||||
| **gateway** | HTTP/gRPC ingress, routing, auth, CORS, body limits |
|
||||
| **catalogd** | Metadata control plane — dataset registry, schema versions, manifests |
|
||||
| **storaged** | Object I/O — read/write/list/delete via `object_store` |
|
||||
| **queryd** | SQL execution — DataFusion over Parquet, MemTable hot cache |
|
||||
| **ingestd** | *NEW* — Ingest pipeline: CSV/JSON/DB → normalize → Parquet → catalog |
|
||||
| **vectord** | *NEW* — Embedding store + vector index: chunk → embed → index → search |
|
||||
| **gateway** | HTTP ingress, routing, auth envelope, middleware |
|
||||
| **catalogd** | Metadata control plane — dataset registry, schema versions, manifest index |
|
||||
| **storaged** | Object I/O — read/write/list/delete via `object_store` crate |
|
||||
| **queryd** | SQL execution — DataFusion over registered Parquet datasets |
|
||||
| **aibridge** | Rust↔Python boundary — HTTP client to FastAPI sidecar |
|
||||
| **ui** | Dioxus frontend — Ask, Explore, SQL, System tabs |
|
||||
| **shared** | Types, errors, Arrow helpers, config, protobuf definitions |
|
||||
| **ui** | Dioxus frontend — dataset browser, query editor, results viewer |
|
||||
| **shared** | Types, errors, Arrow helpers, protobuf definitions |
|
||||
|
||||
### AI Sidecar
|
||||
|
||||
Python FastAPI process that adapts Ollama's HTTP API into Arrow-compatible formats:
|
||||
- `POST /embed` → `nomic-embed-text` via Ollama
|
||||
- `POST /generate` → configurable model (qwen2.5, mistral, gemma2, llama3.2)
|
||||
- `POST /rerank` → cross-encoder reranking via generate endpoint
|
||||
|
||||
No mocks. No stubs. Real models from day one. Ollama manages model lifecycle, GPU scheduling, caching. Sidecar is stateless passthrough.
|
||||
|
||||
### Data Flow
|
||||
|
||||
```
|
||||
Raw data → ingestd (normalize, chunk, detect schema)
|
||||
├→ storaged (Parquet files to object storage)
|
||||
├→ catalogd (register dataset + schema)
|
||||
├→ vectord (embed text chunks, build index)
|
||||
└→ queryd (auto-register as queryable table)
|
||||
|
||||
User question → gateway
|
||||
├→ vectord (semantic search for relevant chunks) ← RAG path
|
||||
├→ queryd (SQL over structured data) ← Analytics path
|
||||
└→ aibridge → Ollama (generate answer from context)
|
||||
Client → gateway → catalogd (metadata lookup)
|
||||
→ storaged (object read/write)
|
||||
→ queryd (SQL execution over Parquet)
|
||||
→ aibridge → sidecar → Ollama (inference)
|
||||
```
|
||||
|
||||
### Query Paths
|
||||
|
||||
**Analytical (SQL):** "What's the average bill rate for .NET devs in Chicago?"
|
||||
→ DataFusion scans Parquet columnar, returns in <200ms
|
||||
|
||||
**Semantic (RAG):** "Find candidates who could do data engineering work"
|
||||
→ Embed question → vector search across resume embeddings → retrieve top chunks → LLM answers
|
||||
|
||||
**Hybrid:** "Which clients are we losing money on, and why?"
|
||||
→ SQL for margin calculations + RAG over client notes/emails for context → LLM synthesizes
|
||||
|
||||
### Invariants
|
||||
|
||||
1. Object storage = source of truth for all data
|
||||
2. catalogd = sole metadata authority
|
||||
3. No raw data in catalog — only pointers
|
||||
4. vectord stores embeddings AS Parquet (portable, not a proprietary format)
|
||||
5. ingestd is idempotent — re-ingesting the same file is a no-op
|
||||
6. Hot cache is a performance layer, not a source of truth — eviction is safe
|
||||
7. All services modular and independently replaceable
|
||||
2. catalogd = sole metadata authority (datasets, schemas, manifests)
|
||||
3. No raw data stored in catalog — only pointers (bucket, key, schema fingerprint)
|
||||
4. storaged never interprets data — dumb pipe with presigned URLs
|
||||
5. queryd registers tables via catalog pointers, not by scanning storage
|
||||
6. aibridge is stateless — Python sidecar is replaceable without touching Rust
|
||||
7. All services are modular and independently replaceable
|
||||
|
||||
### Dependency Graph
|
||||
|
||||
```
|
||||
shared ← storaged ← catalogd ← queryd
|
||||
shared ← aibridge
|
||||
gateway → {storaged, catalogd, queryd, aibridge}
|
||||
ui → gateway (HTTP only, no crate dependency)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Phases
|
||||
|
||||
### Phase 0-5: Foundation ✅ COMPLETE
|
||||
### Phase 0: Bootstrap
|
||||
Workspace compiles, gateway serves health check, structured logging works.
|
||||
|
||||
- Rust workspace, Axum gateway, object storage, catalog, DataFusion query engine
|
||||
- Python sidecar with real Ollama models (embed, generate, rerank)
|
||||
- Dioxus UI with Ask (NL→SQL), Explore, SQL, System tabs
|
||||
- gRPC, OpenTelemetry, auth middleware, TOML config
|
||||
- Validated with 286K row staffing company dataset across 7 tables
|
||||
- Cross-reference queries (JOINs across candidates, placements, timesheets, calls) in <150ms
|
||||
**Gate:** `cargo build` clean, `GET /health` returns 200, logs on stdout, docs committed.
|
||||
|
||||
### Phase 6: Ingest Pipeline
|
||||
### Phase 1: Storage + Catalog
|
||||
Write Parquet to object storage, register in catalog, read back.
|
||||
|
||||
Build the data on-ramp. Accept messy real-world data, normalize it, make it queryable.
|
||||
**Gate:** Upload Parquet → register dataset → retrieve metadata → read back. All via gateway HTTP.
|
||||
|
||||
| Step | Deliverable | Gate |
|
||||
|---|---|---|
|
||||
| 6.1 | `ingestd` crate with CSV parser → Arrow RecordBatch → Parquet | CSV file → queryable dataset |
|
||||
| 6.2 | JSON ingest (newline-delimited JSON, nested objects) | JSON file → flat Parquet |
|
||||
| 6.3 | Schema detection — infer column types from data | No manual schema definition needed |
|
||||
| 6.4 | Deduplication — detect and skip already-ingested files (content hash) | Re-ingest same file = no-op |
|
||||
| 6.5 | Text chunking — split large text fields for embedding | Long text → overlapping chunks |
|
||||
| 6.6 | Auto-registration — ingest writes to storage AND registers in catalog | Single API call: file in → queryable |
|
||||
| 6.7 | Gateway endpoint: `POST /ingest` with file upload | Upload CSV from browser → query in seconds |
|
||||
### Phase 2: Query Engine
|
||||
SQL queries over registered Parquet datasets via DataFusion.
|
||||
|
||||
**Gate:** Upload a raw CSV or JSON file → auto-detected schema → stored as Parquet → registered → immediately queryable via SQL. No manual steps.
|
||||
**Gate:** `SELECT * FROM dataset LIMIT 10` returns correct results. Resolution goes through catalog.
|
||||
|
||||
**Risk:** Schema detection on messy data (mixed types, nulls, inconsistent formatting). Mitigation: conservative type inference (default to string), let user override.
|
||||
### Phase 3: AI Integration
|
||||
Python sidecar with real Ollama models. Embeddings, generation, reranking.
|
||||
|
||||
### Phase 7: Vector Index + RAG Pipeline
|
||||
**Gate:** Rust sends text → Python → Ollama → real embeddings return as Arrow-compatible floats.
|
||||
|
||||
Make unstructured data searchable by meaning, not just keywords.
|
||||
### Phase 4: Frontend
|
||||
Dioxus UI: dataset browser, query editor, results table.
|
||||
|
||||
| Step | Deliverable | Gate |
|
||||
|---|---|---|
|
||||
| 7.1 | `vectord` crate with embedding storage as Parquet (doc_id, chunk_text, vector) | Embeddings stored as portable Parquet |
|
||||
| 7.2 | Chunking strategy — configurable chunk size + overlap for text columns | Large text fields split into embeddable chunks |
|
||||
| 7.3 | Brute-force vector search via DataFusion (cosine similarity SQL) | Semantic search works, correctness verified |
|
||||
| 7.4 | HNSW index for fast approximate nearest neighbor | Search over 100K+ vectors in <50ms |
|
||||
| 7.5 | RAG endpoint: `POST /rag` — question → embed → search → retrieve → generate | Natural language question → grounded answer |
|
||||
| 7.6 | Auto-embed on ingest — text columns automatically embedded during ingest | No separate embedding step needed |
|
||||
| 7.7 | Hybrid search — combine SQL filters with vector similarity | "Java devs in Chicago" (SQL) + "who could do data engineering" (semantic) |
|
||||
**Gate:** User can browse datasets and run queries from browser.
|
||||
|
||||
**Gate:** Ingest 15K candidate resumes → auto-embed → ask "find someone who could handle our Kubernetes migration" → system returns relevant candidates ranked by semantic match, with LLM explanation.
|
||||
### Phase 5: Hardening
|
||||
gRPC internals, OpenTelemetry, auth, config-driven startup.
|
||||
|
||||
**Risk: HNSW in Rust at scale.** This is the hardest technical problem. Options:
|
||||
- `hora` crate — Rust-native ANN, but less mature than FAISS
|
||||
- Store HNSW index as a serialized file alongside Parquet data
|
||||
- Fallback: brute-force scan is fine up to ~100K vectors; optimize later
|
||||
- Nuclear option: use Qdrant as an external vector store (breaks "no new services" rule)
|
||||
|
||||
**Decision needed:** Evaluate `hora` vs external Qdrant vs brute-force at J's data scale.
|
||||
|
||||
### Phase 8: Hot Cache + Incremental Updates
|
||||
|
||||
Make frequently-accessed data fast, and handle real-time updates without full rewrite.
|
||||
|
||||
| Step | Deliverable | Gate |
|
||||
|---|---|---|
|
||||
| 8.1 | MemTable hot cache — pin active datasets in memory | Queries on hot data: <10ms |
|
||||
| 8.2 | Cache policy — LRU eviction based on access patterns | Memory-bounded, auto-manages |
|
||||
| 8.3 | Incremental writes — append new rows without rewriting entire Parquet file | Update one candidate's phone → no full table rewrite |
|
||||
| 8.4 | Merge-on-read — query combines base Parquet + delta files | Correct results from base + updates |
|
||||
| 8.5 | Compaction — periodic merge of delta files into base Parquet | Prevent delta file proliferation |
|
||||
| 8.6 | Upsert semantics — insert or update by primary key | Same candidate ID → update in place |
|
||||
|
||||
**Gate:** Update a single row in a 15K-row dataset. Query reflects the change immediately. No full Parquet rewrite. Memory cache serves hot data in <10ms.
|
||||
|
||||
**Risk: This is the Delta Lake problem.** Full ACID transactions over Parquet files is what Databricks spent years building. We're NOT building Delta Lake — we're building a pragmatic version:
|
||||
- Append-only delta files (easy)
|
||||
- Merge-on-read (moderate)
|
||||
- Compaction (moderate)
|
||||
- Full ACID isolation (NOT attempting — single-writer model instead)
|
||||
|
||||
### Phase 9+: Future (not designed yet)
|
||||
|
||||
- Database connector ingest (PostgreSQL, MySQL, MSSQL → Parquet)
|
||||
- PDF/document ingest (OCR → text → chunks → embed)
|
||||
- Scheduled ingest (cron-based file watching)
|
||||
- Multi-node query distribution
|
||||
- Row-level access control
|
||||
- Audit log (who queried what, when)
|
||||
|
||||
---
|
||||
|
||||
## Reference Dataset: Staffing Company
|
||||
|
||||
Validated with realistic staffing company data:
|
||||
|
||||
| Table | Rows | Description |
|
||||
|---|---|---|
|
||||
| candidates | 15,000 | Names, phones, emails, zip, skills, resume text, availability |
|
||||
| clients | 500 | Companies, contacts, verticals, bill rates |
|
||||
| job_orders | 3,000 | Positions with descriptions, requirements, rates |
|
||||
| placements | 8,000 | Candidate↔job matches with dates, rates, recruiters |
|
||||
| timesheets | 120,000 | Weekly hours, bill/pay totals, approvals |
|
||||
| call_log | 80,000 | Phone CDR — who called whom, duration, disposition |
|
||||
| email_log | 60,000 | Email tracking — subject, opened, direction |
|
||||
| **Total** | **286,500** | **7 tables, cross-referenced** |
|
||||
|
||||
Proven queries:
|
||||
- Candidate search by skills + location + availability: 80ms
|
||||
- Revenue by client with profit margins (JOIN 120K timesheets): 142ms
|
||||
- Cold lead detection (candidates called 5+ times, never placed): 94ms
|
||||
- Margin analysis by vertical (JOIN placements → job orders): 53ms
|
||||
- Natural language → AI-generated SQL → execution → results: ~3s (model inference)
|
||||
**Gate:** Services communicate via gRPC. Traces propagate. Auth enforced. System restartable from repo + config.
|
||||
|
||||
---
|
||||
|
||||
@ -212,22 +132,24 @@ Proven queries:
|
||||
|
||||
| Model | Use |
|
||||
|---|---|
|
||||
| `nomic-embed-text` | Embeddings (768d) — semantic search, RAG retrieval |
|
||||
| `qwen2.5` | SQL generation, structured output, summarization |
|
||||
| `mistral` | General generation, longer context |
|
||||
| `nomic-embed-text` | Embeddings (768d) |
|
||||
| `qwen2.5` | Code generation, structured output |
|
||||
| `mistral` | General generation |
|
||||
| `gemma2` | General generation |
|
||||
| `llama3.2` | General generation, lightweight |
|
||||
| `llama3.2` | General generation |
|
||||
|
||||
Model selection via environment variables. No hardcoded model names in Rust code.
|
||||
|
||||
---
|
||||
|
||||
## Non-Goals
|
||||
|
||||
- Multi-tenancy (single-owner system)
|
||||
- Cloud deployment (local-first, always)
|
||||
- Full ACID transactions (single-writer model is sufficient)
|
||||
- Real-time streaming / CDC (batch ingest is the model)
|
||||
- Replacing the CRM (this is the analytical layer BEHIND the CRM)
|
||||
- Custom file formats (Parquet is the format, period)
|
||||
- Multi-tenancy
|
||||
- Streaming ingestion / CDC
|
||||
- Custom file formats
|
||||
- Query caching / materialized views
|
||||
- Wrapping `object_store` with another abstraction
|
||||
- Cloud deployment (local-first)
|
||||
|
||||
---
|
||||
|
||||
@ -235,13 +157,11 @@ Proven queries:
|
||||
|
||||
| Risk | Severity | Mitigation |
|
||||
|---|---|---|
|
||||
| Vector search in Rust at scale | **High** | Start brute-force, evaluate `hora` crate, Qdrant as fallback |
|
||||
| Incremental updates on Parquet | **High** | Delta files + merge-on-read, NOT full Delta Lake |
|
||||
| Legacy data messiness | **High** | Conservative schema detection, default to string, user overrides |
|
||||
| Schema evolution across ingests | **Medium** | Schema fingerprinting + versioned manifests |
|
||||
| Memory pressure from hot cache | **Medium** | LRU eviction, configurable memory limit |
|
||||
| HNSW index persistence | **Medium** | Serialize alongside Parquet, rebuild on startup |
|
||||
| Python sidecar as bottleneck | **Low** | Can replace with direct Ollama HTTP from Rust later |
|
||||
| RustFS immaturity | High | Start with LocalFileSystem, test against MinIO, RustFS last. SeaweedFS fallback. |
|
||||
| DataFusion table registration overhead | Medium | Lazy registration + LRU cache of SessionContext instances. |
|
||||
| Catalog consistency without DB | Medium | Write-ahead: persist manifest before in-memory update. Rebuild from storage on restart. |
|
||||
| Dioxus WASM gaps | Medium | Phase 4 is last. Fallback to plain HTML if blocked. |
|
||||
| Schema evolution | Medium | Schema fingerprinting in Phase 1. Validate before query. |
|
||||
|
||||
---
|
||||
|
||||
@ -253,5 +173,3 @@ Proven queries:
|
||||
4. No silent architecture drift
|
||||
5. Always work in smallest valid step
|
||||
6. Always verify before moving on
|
||||
7. **New:** Flag when something is genuinely hard vs just engineering work
|
||||
8. **New:** If a phase reveals the approach is wrong, update the PRD before continuing
|
||||
|
||||
@ -1,254 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Generate realistic demo datasets that actually stress the stack."""
|
||||
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
import random
|
||||
import json
|
||||
import urllib.request
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
API = "http://localhost:3100"
|
||||
|
||||
def upload_and_register(name, table):
|
||||
"""Upload Parquet to storage and register in catalog."""
|
||||
path = f"/tmp/{name}.parquet"
|
||||
pq.write_table(table, path, compression="snappy")
|
||||
size = len(open(path, "rb").read())
|
||||
|
||||
with open(path, "rb") as f:
|
||||
data = f.read()
|
||||
key = f"datasets/{name}.parquet"
|
||||
req = urllib.request.Request(f"{API}/storage/objects/{key}", data=data, method="PUT")
|
||||
urllib.request.urlopen(req)
|
||||
|
||||
body = json.dumps({
|
||||
"name": name,
|
||||
"schema_fingerprint": "auto",
|
||||
"objects": [{"bucket": "data", "key": key, "size_bytes": len(data)}]
|
||||
}).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{API}/catalog/datasets", data=body, method="POST",
|
||||
headers={"Content-Type": "application/json"}
|
||||
)
|
||||
urllib.request.urlopen(req)
|
||||
print(f" {name}: {table.num_rows:,} rows, {len(data):,} bytes ({len(data)/1024/1024:.1f} MB)")
|
||||
|
||||
# ============================================================
|
||||
# Dataset 1: web_events — 500K rows of web analytics
|
||||
# Shows: DataFusion scanning half a million rows in milliseconds
|
||||
# ============================================================
|
||||
print("Generating web_events (500K rows)...")
|
||||
|
||||
N_EVENTS = 500_000
|
||||
countries = ["US", "UK", "DE", "FR", "JP", "BR", "IN", "AU", "CA", "KR", "MX", "NG", "ZA", "SE", "IT"]
|
||||
pages = ["/", "/pricing", "/docs", "/blog", "/about", "/login", "/signup", "/dashboard", "/settings", "/api",
|
||||
"/docs/getting-started", "/docs/api-reference", "/docs/tutorials", "/blog/rust-performance",
|
||||
"/blog/ai-lakehouse", "/blog/parquet-vs-csv", "/products", "/products/enterprise", "/contact", "/careers"]
|
||||
actions = ["pageview", "pageview", "pageview", "pageview", "click", "click", "scroll", "form_submit", "download", "signup"]
|
||||
browsers = ["Chrome", "Firefox", "Safari", "Edge", "Arc"]
|
||||
devices = ["desktop", "desktop", "desktop", "mobile", "mobile", "tablet"]
|
||||
|
||||
base_time = datetime(2026, 1, 1)
|
||||
random.seed(42)
|
||||
|
||||
timestamps = []
|
||||
user_ids = []
|
||||
page_list = []
|
||||
action_list = []
|
||||
duration_list = []
|
||||
country_list = []
|
||||
browser_list = []
|
||||
device_list = []
|
||||
session_ids = []
|
||||
|
||||
for i in range(N_EVENTS):
|
||||
ts = base_time + timedelta(seconds=random.randint(0, 86400 * 90)) # 90 days
|
||||
timestamps.append(ts.isoformat())
|
||||
user_ids.append(random.randint(1, 50000))
|
||||
page_list.append(random.choice(pages))
|
||||
action_list.append(random.choice(actions))
|
||||
duration_list.append(random.randint(100, 300000)) # ms
|
||||
country_list.append(random.choice(countries))
|
||||
browser_list.append(random.choice(browsers))
|
||||
device_list.append(random.choice(devices))
|
||||
session_ids.append(f"sess_{random.randint(1, 200000):06d}")
|
||||
|
||||
web_events = pa.table({
|
||||
"timestamp": timestamps,
|
||||
"user_id": user_ids,
|
||||
"session_id": session_ids,
|
||||
"page": page_list,
|
||||
"action": action_list,
|
||||
"duration_ms": duration_list,
|
||||
"country": country_list,
|
||||
"browser": browser_list,
|
||||
"device": device_list,
|
||||
})
|
||||
upload_and_register("web_events", web_events)
|
||||
|
||||
# ============================================================
|
||||
# Dataset 2: products — 5K products with real-ish descriptions
|
||||
# Shows: AI can read and understand product data, semantic search
|
||||
# ============================================================
|
||||
print("Generating products (5K rows)...")
|
||||
|
||||
categories = ["SaaS", "API", "Database", "Analytics", "Security", "DevOps", "AI/ML", "Storage", "Networking", "Monitoring"]
|
||||
adjectives = ["Enterprise", "Cloud-Native", "Open-Source", "Serverless", "Real-Time", "Distributed", "Scalable", "Lightweight", "High-Performance", "Self-Hosted"]
|
||||
nouns = ["Platform", "Engine", "Gateway", "Toolkit", "Framework", "Suite", "Service", "Connector", "Pipeline", "Hub"]
|
||||
features = [
|
||||
"with built-in authentication and RBAC",
|
||||
"featuring automatic horizontal scaling",
|
||||
"with zero-config deployment",
|
||||
"supporting 100+ integrations",
|
||||
"with sub-millisecond latency",
|
||||
"featuring end-to-end encryption",
|
||||
"with real-time dashboards",
|
||||
"supporting multi-region replication",
|
||||
"with built-in CI/CD pipelines",
|
||||
"featuring AI-powered anomaly detection",
|
||||
"with comprehensive audit logging",
|
||||
"supporting GraphQL and REST APIs",
|
||||
"with automated backup and recovery",
|
||||
"featuring smart caching layers",
|
||||
"with native Kubernetes support",
|
||||
]
|
||||
|
||||
product_ids = []
|
||||
product_names = []
|
||||
product_categories = []
|
||||
product_prices = []
|
||||
product_descriptions = []
|
||||
product_ratings = []
|
||||
product_reviews_count = []
|
||||
product_created = []
|
||||
|
||||
for i in range(5000):
|
||||
cat = random.choice(categories)
|
||||
adj = random.choice(adjectives)
|
||||
noun = random.choice(nouns)
|
||||
feat = random.choice(features)
|
||||
name = f"{adj} {cat} {noun}"
|
||||
desc = f"{name} — a {cat.lower()} solution {feat}. Built for teams that need reliable {cat.lower()} infrastructure without the complexity."
|
||||
|
||||
product_ids.append(i + 1)
|
||||
product_names.append(name)
|
||||
product_categories.append(cat)
|
||||
product_prices.append(round(random.uniform(9.99, 2999.99), 2))
|
||||
product_descriptions.append(desc)
|
||||
product_ratings.append(round(random.uniform(2.5, 5.0), 1))
|
||||
product_reviews_count.append(random.randint(0, 5000))
|
||||
product_created.append((base_time - timedelta(days=random.randint(0, 730))).strftime("%Y-%m-%d"))
|
||||
|
||||
products = pa.table({
|
||||
"product_id": product_ids,
|
||||
"name": product_names,
|
||||
"category": product_categories,
|
||||
"price": product_prices,
|
||||
"description": product_descriptions,
|
||||
"rating": product_ratings,
|
||||
"review_count": product_reviews_count,
|
||||
"created_date": product_created,
|
||||
})
|
||||
upload_and_register("products", products)
|
||||
|
||||
# ============================================================
|
||||
# Dataset 3: transactions — 200K purchase records
|
||||
# Shows: JOINs across datasets, aggregation at scale
|
||||
# ============================================================
|
||||
print("Generating transactions (200K rows)...")
|
||||
|
||||
N_TXN = 200_000
|
||||
txn_ids = []
|
||||
txn_user_ids = []
|
||||
txn_product_ids = []
|
||||
txn_quantities = []
|
||||
txn_amounts = []
|
||||
txn_timestamps = []
|
||||
txn_statuses = []
|
||||
txn_payment_methods = []
|
||||
|
||||
statuses = ["completed", "completed", "completed", "completed", "pending", "refunded", "failed"]
|
||||
payments = ["credit_card", "credit_card", "credit_card", "debit_card", "paypal", "crypto", "wire_transfer"]
|
||||
|
||||
for i in range(N_TXN):
|
||||
pid = random.randint(1, 5000)
|
||||
qty = random.randint(1, 10)
|
||||
price = product_prices[pid - 1]
|
||||
|
||||
txn_ids.append(f"TXN-{i+1:07d}")
|
||||
txn_user_ids.append(random.randint(1, 50000))
|
||||
txn_product_ids.append(pid)
|
||||
txn_quantities.append(qty)
|
||||
txn_amounts.append(round(price * qty, 2))
|
||||
txn_timestamps.append((base_time + timedelta(seconds=random.randint(0, 86400 * 90))).isoformat())
|
||||
txn_statuses.append(random.choice(statuses))
|
||||
txn_payment_methods.append(random.choice(payments))
|
||||
|
||||
transactions = pa.table({
|
||||
"txn_id": txn_ids,
|
||||
"user_id": txn_user_ids,
|
||||
"product_id": txn_product_ids,
|
||||
"quantity": txn_quantities,
|
||||
"amount": txn_amounts,
|
||||
"timestamp": txn_timestamps,
|
||||
"status": txn_statuses,
|
||||
"payment_method": txn_payment_methods,
|
||||
})
|
||||
upload_and_register("transactions", transactions)
|
||||
|
||||
# ============================================================
|
||||
# Dataset 4: server_metrics — 1M rows of infrastructure telemetry
|
||||
# Shows: time-series analytics, the kind of data you'd put in a lakehouse
|
||||
# ============================================================
|
||||
print("Generating server_metrics (1M rows)...")
|
||||
|
||||
N_METRICS = 1_000_000
|
||||
hosts = [f"prod-{i:03d}" for i in range(100)]
|
||||
metrics_names = ["cpu_usage", "memory_usage", "disk_io", "network_in", "network_out", "request_latency", "error_rate", "gc_pause"]
|
||||
regions = ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"]
|
||||
|
||||
m_timestamps = []
|
||||
m_hosts = []
|
||||
m_metrics = []
|
||||
m_values = []
|
||||
m_regions = []
|
||||
|
||||
for i in range(N_METRICS):
|
||||
metric = random.choice(metrics_names)
|
||||
if metric == "cpu_usage":
|
||||
val = round(random.gauss(45, 20), 2)
|
||||
elif metric == "memory_usage":
|
||||
val = round(random.gauss(60, 15), 2)
|
||||
elif metric in ("network_in", "network_out"):
|
||||
val = round(random.expovariate(1/1000), 2)
|
||||
elif metric == "request_latency":
|
||||
val = round(random.expovariate(1/50), 2)
|
||||
elif metric == "error_rate":
|
||||
val = round(random.expovariate(1/2), 4)
|
||||
else:
|
||||
val = round(random.uniform(0, 100), 2)
|
||||
|
||||
m_timestamps.append((base_time + timedelta(seconds=random.randint(0, 86400 * 90))).isoformat())
|
||||
m_hosts.append(random.choice(hosts))
|
||||
m_metrics.append(metric)
|
||||
m_values.append(max(0, val))
|
||||
m_regions.append(random.choice(regions))
|
||||
|
||||
server_metrics = pa.table({
|
||||
"timestamp": m_timestamps,
|
||||
"host": m_hosts,
|
||||
"metric": m_metrics,
|
||||
"value": m_values,
|
||||
"region": m_regions,
|
||||
})
|
||||
upload_and_register("server_metrics", server_metrics)
|
||||
|
||||
print(f"\nDone — 4 datasets, {N_EVENTS + 5000 + N_TXN + N_METRICS:,} total rows")
|
||||
print("\nDemo queries to try:")
|
||||
print(' "How many page views per country, sorted by volume?"')
|
||||
print(' "What are the top 10 products by total revenue?"')
|
||||
print(' "Show average CPU usage per host in us-east-1"')
|
||||
print(' "Which payment method has the highest failure rate?"')
|
||||
print(' "What are the busiest hours for web traffic?"')
|
||||
@ -1,459 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Realistic staffing company data generator.
|
||||
Multiple source systems, overlapping data, real cross-reference problems.
|
||||
|
||||
Data sources (like a real staffing company):
|
||||
- ATS (Applicant Tracking System) → candidates
|
||||
- CRM → client companies + contacts
|
||||
- Job board → job orders with descriptions
|
||||
- Placements → who got placed where
|
||||
- Timesheets → hours worked, bill/pay rates
|
||||
- Phone system CDR → call detail records
|
||||
- Email logs → communication tracking
|
||||
"""
|
||||
|
||||
import random, json, urllib.request, hashlib, string, time
|
||||
from datetime import datetime, timedelta
|
||||
import pyarrow as pa, pyarrow.parquet as pq
|
||||
|
||||
API = "http://localhost:3100"
|
||||
random.seed(2026)
|
||||
|
||||
def upload(name, table):
|
||||
path = f"/tmp/{name}.parquet"
|
||||
pq.write_table(table, path, compression="snappy")
|
||||
with open(path, "rb") as f:
|
||||
data = f.read()
|
||||
key = f"datasets/{name}.parquet"
|
||||
req = urllib.request.Request(f"{API}/storage/objects/{key}", data=data, method="PUT")
|
||||
urllib.request.urlopen(req)
|
||||
body = json.dumps({"name": name, "schema_fingerprint": "auto",
|
||||
"objects": [{"bucket": "data", "key": key, "size_bytes": len(data)}]}).encode()
|
||||
req = urllib.request.Request(f"{API}/catalog/datasets", data=body, method="POST",
|
||||
headers={"Content-Type": "application/json"})
|
||||
urllib.request.urlopen(req)
|
||||
print(f" {name}: {table.num_rows:,} rows ({len(data)/1024:.0f} KB)")
|
||||
|
||||
# ============================================================
|
||||
# Shared reference data
|
||||
# ============================================================
|
||||
|
||||
first_names = ["James","Mary","Robert","Patricia","John","Jennifer","Michael","Linda","David","Elizabeth",
|
||||
"William","Barbara","Richard","Susan","Joseph","Jessica","Thomas","Sarah","Christopher","Karen",
|
||||
"Charles","Lisa","Daniel","Nancy","Matthew","Betty","Anthony","Margaret","Mark","Sandra",
|
||||
"Donald","Ashley","Steven","Dorothy","Paul","Kimberly","Andrew","Emily","Joshua","Donna",
|
||||
"Kenneth","Michelle","Kevin","Carol","Brian","Amanda","George","Melissa","Timothy","Deborah",
|
||||
"Ronald","Stephanie","Edward","Rebecca","Jason","Sharon","Jeffrey","Laura","Ryan","Cynthia",
|
||||
"Jacob","Kathleen","Gary","Amy","Nicholas","Angela","Eric","Shirley","Jonathan","Anna",
|
||||
"Stephen","Brenda","Larry","Pamela","Justin","Emma","Scott","Nicole","Brandon","Helen",
|
||||
"Benjamin","Samantha","Samuel","Katherine","Raymond","Christine","Gregory","Debra","Frank","Rachel",
|
||||
"Alexander","Carolyn","Patrick","Janet","Jack","Catherine","Dennis","Maria","Jerry","Heather"]
|
||||
|
||||
last_names = ["Smith","Johnson","Williams","Brown","Jones","Garcia","Miller","Davis","Rodriguez","Martinez",
|
||||
"Hernandez","Lopez","Gonzalez","Wilson","Anderson","Thomas","Taylor","Moore","Jackson","Martin",
|
||||
"Lee","Perez","Thompson","White","Harris","Sanchez","Clark","Ramirez","Lewis","Robinson",
|
||||
"Walker","Young","Allen","King","Wright","Scott","Torres","Nguyen","Hill","Flores",
|
||||
"Green","Adams","Nelson","Baker","Hall","Rivera","Campbell","Mitchell","Carter","Roberts",
|
||||
"Gomez","Phillips","Evans","Turner","Diaz","Parker","Cruz","Edwards","Collins","Reyes",
|
||||
"Stewart","Morris","Morales","Murphy","Cook","Rogers","Gutierrez","Ortiz","Morgan","Cooper",
|
||||
"Peterson","Bailey","Reed","Kelly","Howard","Ramos","Kim","Cox","Ward","Richardson"]
|
||||
|
||||
cities_zips = [
|
||||
("Chicago","IL","60601"),("Chicago","IL","60602"),("Chicago","IL","60603"),("Chicago","IL","60610"),
|
||||
("Chicago","IL","60614"),("Chicago","IL","60616"),("Chicago","IL","60622"),("Chicago","IL","60647"),
|
||||
("New York","NY","10001"),("New York","NY","10002"),("New York","NY","10003"),("New York","NY","10010"),
|
||||
("New York","NY","10016"),("New York","NY","10019"),("New York","NY","10022"),("New York","NY","10036"),
|
||||
("Los Angeles","CA","90001"),("Los Angeles","CA","90012"),("Los Angeles","CA","90024"),("Los Angeles","CA","90036"),
|
||||
("Houston","TX","77001"),("Houston","TX","77002"),("Houston","TX","77003"),("Houston","TX","77019"),
|
||||
("Dallas","TX","75201"),("Dallas","TX","75202"),("Dallas","TX","75204"),("Dallas","TX","75219"),
|
||||
("Atlanta","GA","30301"),("Atlanta","GA","30303"),("Atlanta","GA","30305"),("Atlanta","GA","30309"),
|
||||
("Denver","CO","80201"),("Denver","CO","80202"),("Denver","CO","80204"),("Denver","CO","80206"),
|
||||
("Phoenix","AZ","85001"),("Phoenix","AZ","85003"),("Phoenix","AZ","85004"),("Phoenix","AZ","85006"),
|
||||
("Seattle","WA","98101"),("Seattle","WA","98102"),("Seattle","WA","98103"),("Seattle","WA","98104"),
|
||||
("Miami","FL","33101"),("Miami","FL","33125"),("Miami","FL","33130"),("Miami","FL","33132"),
|
||||
]
|
||||
|
||||
skills_pool = {
|
||||
"IT": ["Java","Python","C#",".NET","JavaScript","TypeScript","React","Angular","Node.js","SQL",
|
||||
"AWS","Azure","GCP","Docker","Kubernetes","Linux","Git","REST APIs","GraphQL","MongoDB",
|
||||
"PostgreSQL","MySQL","Redis","Terraform","Jenkins","CI/CD","Agile","Scrum","DevOps","Microservices",
|
||||
"Spring Boot","Django","Flask","Ruby on Rails","Go","Rust","Swift","Kotlin","PHP","Vue.js"],
|
||||
"Healthcare": ["RN","LPN","CNA","BLS","ACLS","PALS","EMR","Epic","Cerner","Meditech",
|
||||
"ICD-10","CPT","Medical Billing","Medical Coding","HIPAA","Phlebotomy","IV Therapy",
|
||||
"Telemetry","ICU","OR","ER","Med-Surg","Labor & Delivery","Pediatrics","Oncology"],
|
||||
"Industrial": ["Forklift","OSHA 10","OSHA 30","Welding","MIG","TIG","CNC","PLC","Blueprint Reading",
|
||||
"Quality Control","Six Sigma","Lean Manufacturing","AutoCAD","SolidWorks","GD&T",
|
||||
"Mechanical Assembly","Electrical","Hydraulics","Pneumatics","Warehouse"],
|
||||
"Accounting": ["QuickBooks","SAP","Oracle","Accounts Payable","Accounts Receivable","General Ledger",
|
||||
"Financial Reporting","Tax Preparation","CPA","Payroll","Budgeting","Forecasting",
|
||||
"Audit","Compliance","Excel Advanced","Power BI","Tableau","GAAP","SOX"],
|
||||
"Admin": ["Microsoft Office","Data Entry","Customer Service","Scheduling","Filing","Receptionist",
|
||||
"Executive Assistant","Travel Coordination","Calendar Management","SAP","Salesforce",
|
||||
"CRM","Multi-line Phone","Typing 60+ WPM","Notary","Bilingual Spanish"],
|
||||
}
|
||||
|
||||
verticals = list(skills_pool.keys())
|
||||
email_domains = ["gmail.com","yahoo.com","hotmail.com","outlook.com","aol.com","icloud.com","protonmail.com"]
|
||||
|
||||
def make_phone():
|
||||
return f"({random.randint(200,999)}) {random.randint(200,999)}-{random.randint(1000,9999)}"
|
||||
|
||||
def make_email(first, last):
|
||||
sep = random.choice([".", "_", ""])
|
||||
num = random.choice(["", str(random.randint(1,99))])
|
||||
return f"{first.lower()}{sep}{last.lower()}{num}@{random.choice(email_domains)}"
|
||||
|
||||
base_date = datetime(2026, 1, 1)
|
||||
|
||||
# ============================================================
|
||||
# 1. CANDIDATES — 15,000 from ATS
|
||||
# ============================================================
|
||||
print("Generating candidates (15K)...")
|
||||
|
||||
N_CAND = 15000
|
||||
c_ids, c_first, c_last, c_emails, c_phones, c_phones_alt = [], [], [], [], [], []
|
||||
c_city, c_state, c_zip = [], [], []
|
||||
c_vertical, c_skills, c_resume_summary = [], [], []
|
||||
c_status, c_source, c_pay_rate_min, c_created = [], [], [], []
|
||||
c_availability, c_years_exp = [], []
|
||||
|
||||
for i in range(N_CAND):
|
||||
fn = random.choice(first_names)
|
||||
ln = random.choice(last_names)
|
||||
city, state, zipcode = random.choice(cities_zips)
|
||||
vert = random.choice(verticals)
|
||||
n_skills = random.randint(3, 12)
|
||||
sk = random.sample(skills_pool[vert], min(n_skills, len(skills_pool[vert])))
|
||||
yrs = random.randint(0, 25)
|
||||
|
||||
resume = f"{fn} {ln} — {vert} professional with {yrs} years experience. "
|
||||
resume += f"Based in {city}, {state} {zipcode}. "
|
||||
resume += f"Key skills: {', '.join(sk)}. "
|
||||
resume += random.choice([
|
||||
f"Previously worked at {random.choice(['Acme Corp','TechFlow','GlobalStaff','MedPro','BuildRight'])} as a {random.choice(['Senior','Lead','Staff','Junior'])} {vert} specialist.",
|
||||
f"Seeking {random.choice(['contract','full-time','temp-to-hire'])} opportunities in the {city} metro area.",
|
||||
f"Available {random.choice(['immediately','in 2 weeks','after current contract ends'])}. Open to {random.choice(['remote','hybrid','on-site'])} work.",
|
||||
])
|
||||
|
||||
c_ids.append(f"CAND-{i+1:05d}")
|
||||
c_first.append(fn)
|
||||
c_last.append(ln)
|
||||
c_emails.append(make_email(fn, ln))
|
||||
c_phones.append(make_phone())
|
||||
c_phones_alt.append(make_phone() if random.random() < 0.3 else "")
|
||||
c_city.append(city)
|
||||
c_state.append(state)
|
||||
c_zip.append(zipcode)
|
||||
c_vertical.append(vert)
|
||||
c_skills.append("|".join(sk))
|
||||
c_resume_summary.append(resume)
|
||||
c_status.append(random.choice(["active","active","active","active","inactive","do_not_contact","placed"]))
|
||||
c_source.append(random.choice(["Indeed","LinkedIn","Referral","Walk-in","Monster","CareerBuilder","Website","Job Fair"]))
|
||||
c_pay_rate_min.append(round(random.uniform(12, 85), 2))
|
||||
c_created.append((base_date - timedelta(days=random.randint(0, 1095))).strftime("%Y-%m-%d"))
|
||||
c_availability.append(random.choice(["immediate","1_week","2_weeks","1_month","not_available"]))
|
||||
c_years_exp.append(yrs)
|
||||
|
||||
candidates = pa.table({
|
||||
"candidate_id": c_ids, "first_name": c_first, "last_name": c_last,
|
||||
"email": c_emails, "phone": c_phones, "phone_alt": c_phones_alt,
|
||||
"city": c_city, "state": c_state, "zip": c_zip,
|
||||
"vertical": c_vertical, "skills": c_skills, "resume_summary": c_resume_summary,
|
||||
"status": c_status, "source": c_source, "min_pay_rate": c_pay_rate_min,
|
||||
"created_date": c_created, "availability": c_availability, "years_experience": c_years_exp,
|
||||
})
|
||||
upload("candidates", candidates)
|
||||
|
||||
# ============================================================
|
||||
# 2. CLIENTS — 500 companies
|
||||
# ============================================================
|
||||
print("Generating clients (500)...")
|
||||
|
||||
company_prefixes = ["Apex","Summit","Core","First","National","Metro","Pacific","Atlantic","Central","Premier",
|
||||
"Global","United","Alliance","Pinnacle","Elite","Horizon","Pioneer","Titan","Quantum","Vertex"]
|
||||
company_suffixes = ["Industries","Solutions","Systems","Group","Corp","Technologies","Services","Partners",
|
||||
"Holdings","Enterprises","Manufacturing","Healthcare","Logistics","Financial","Engineering"]
|
||||
|
||||
cl_ids, cl_names, cl_verticals, cl_contacts, cl_contact_emails, cl_contact_phones = [], [], [], [], [], []
|
||||
cl_city, cl_state, cl_zip, cl_bill_rate_avg, cl_status, cl_since = [], [], [], [], [], []
|
||||
|
||||
for i in range(500):
|
||||
name = f"{random.choice(company_prefixes)} {random.choice(company_suffixes)}"
|
||||
city, state, zipcode = random.choice(cities_zips)
|
||||
vert = random.choice(verticals)
|
||||
contact_fn = random.choice(first_names)
|
||||
contact_ln = random.choice(last_names)
|
||||
|
||||
cl_ids.append(f"CLI-{i+1:04d}")
|
||||
cl_names.append(name)
|
||||
cl_verticals.append(vert)
|
||||
cl_contacts.append(f"{contact_fn} {contact_ln}")
|
||||
cl_contact_emails.append(f"{contact_fn.lower()}.{contact_ln.lower()}@{name.lower().replace(' ','')}.com")
|
||||
cl_contact_phones.append(make_phone())
|
||||
cl_city.append(city)
|
||||
cl_state.append(state)
|
||||
cl_zip.append(zipcode)
|
||||
cl_bill_rate_avg.append(round(random.uniform(25, 150), 2))
|
||||
cl_status.append(random.choice(["active","active","active","inactive","prospect"]))
|
||||
cl_since.append((base_date - timedelta(days=random.randint(30, 2000))).strftime("%Y-%m-%d"))
|
||||
|
||||
clients = pa.table({
|
||||
"client_id": cl_ids, "company_name": cl_names, "vertical": cl_verticals,
|
||||
"contact_name": cl_contacts, "contact_email": cl_contact_emails, "contact_phone": cl_contact_phones,
|
||||
"city": cl_city, "state": cl_state, "zip": cl_zip,
|
||||
"avg_bill_rate": cl_bill_rate_avg, "status": cl_status, "client_since": cl_since,
|
||||
})
|
||||
upload("clients", clients)
|
||||
|
||||
# ============================================================
|
||||
# 3. JOB ORDERS — 3,000 open/filled/closed
|
||||
# ============================================================
|
||||
print("Generating job_orders (3K)...")
|
||||
|
||||
titles = {
|
||||
"IT": ["Software Developer","Java Developer",".NET Developer","DevOps Engineer","Data Analyst",
|
||||
"QA Engineer","Systems Admin","Help Desk","Network Engineer","Cloud Architect",
|
||||
"Full Stack Developer","Python Developer","React Developer","DBA","Security Analyst"],
|
||||
"Healthcare": ["Registered Nurse","LPN","CNA","Medical Assistant","Phlebotomist",
|
||||
"Radiology Tech","Pharmacy Tech","Medical Coder","Billing Specialist","Case Manager"],
|
||||
"Industrial": ["Forklift Operator","Welder","CNC Machinist","Quality Inspector","Maintenance Tech",
|
||||
"Electrician","Warehouse Associate","Assembly Technician","Production Supervisor","Shipping Clerk"],
|
||||
"Accounting": ["Staff Accountant","AP Specialist","AR Specialist","Payroll Clerk","Tax Preparer",
|
||||
"Financial Analyst","Bookkeeper","Audit Associate","Controller","Cost Accountant"],
|
||||
"Admin": ["Administrative Assistant","Executive Assistant","Receptionist","Data Entry Clerk","Office Manager",
|
||||
"Customer Service Rep","HR Coordinator","Legal Secretary","Office Coordinator","Scheduler"],
|
||||
}
|
||||
|
||||
jo_ids, jo_client_ids, jo_titles, jo_verticals, jo_descriptions = [], [], [], [], []
|
||||
jo_city, jo_state, jo_zip = [], [], []
|
||||
jo_bill_rate, jo_pay_rate, jo_status, jo_openings, jo_created = [], [], [], [], []
|
||||
jo_work_type, jo_duration = [], []
|
||||
|
||||
for i in range(3000):
|
||||
vert = random.choice(verticals)
|
||||
title = random.choice(titles[vert])
|
||||
ci = random.randint(0, 499)
|
||||
city, state, zipcode = random.choice(cities_zips)
|
||||
bill = round(random.uniform(25, 150), 2)
|
||||
pay = round(bill * random.uniform(0.55, 0.75), 2)
|
||||
|
||||
req_skills = random.sample(skills_pool[vert], min(random.randint(3, 6), len(skills_pool[vert])))
|
||||
desc = f"{title} needed for {cl_names[ci]} in {city}, {state}. "
|
||||
desc += f"Requirements: {', '.join(req_skills)}. "
|
||||
desc += f"{random.randint(1,10)}+ years experience preferred. "
|
||||
desc += f"Bill rate: ${bill}/hr. "
|
||||
desc += random.choice([
|
||||
"Background check required.",
|
||||
"Drug screen required.",
|
||||
"Must have reliable transportation.",
|
||||
"Steel-toe boots required on site.",
|
||||
"Remote work available.",
|
||||
"Hybrid schedule: 3 days on-site.",
|
||||
])
|
||||
|
||||
jo_ids.append(f"JO-{i+1:05d}")
|
||||
jo_client_ids.append(cl_ids[ci])
|
||||
jo_titles.append(title)
|
||||
jo_verticals.append(vert)
|
||||
jo_descriptions.append(desc)
|
||||
jo_city.append(city)
|
||||
jo_state.append(state)
|
||||
jo_zip.append(zipcode)
|
||||
jo_bill_rate.append(bill)
|
||||
jo_pay_rate.append(pay)
|
||||
jo_status.append(random.choice(["open","open","open","filled","filled","closed","on_hold"]))
|
||||
jo_openings.append(random.randint(1, 5))
|
||||
jo_created.append((base_date - timedelta(days=random.randint(0, 365))).strftime("%Y-%m-%d"))
|
||||
jo_work_type.append(random.choice(["contract","temp_to_hire","direct_hire","contract"]))
|
||||
jo_duration.append(random.choice(["3 months","6 months","12 months","ongoing","project-based"]))
|
||||
|
||||
job_orders = pa.table({
|
||||
"job_order_id": jo_ids, "client_id": jo_client_ids, "title": jo_titles,
|
||||
"vertical": jo_verticals, "description": jo_descriptions,
|
||||
"city": jo_city, "state": jo_state, "zip": jo_zip,
|
||||
"bill_rate": jo_bill_rate, "pay_rate": jo_pay_rate, "status": jo_status,
|
||||
"openings": jo_openings, "created_date": jo_created,
|
||||
"work_type": jo_work_type, "duration": jo_duration,
|
||||
})
|
||||
upload("job_orders", job_orders)
|
||||
|
||||
# ============================================================
|
||||
# 4. PLACEMENTS — 8,000 candidate-job matches
|
||||
# ============================================================
|
||||
print("Generating placements (8K)...")
|
||||
|
||||
p_ids, p_cand_ids, p_job_ids, p_client_ids = [], [], [], []
|
||||
p_start, p_end, p_status, p_bill, p_pay, p_recruiter = [], [], [], [], [], []
|
||||
|
||||
recruiters = [f"{random.choice(first_names)} {random.choice(last_names)}" for _ in range(30)]
|
||||
|
||||
for i in range(8000):
|
||||
ci = random.randint(0, N_CAND - 1)
|
||||
ji = random.randint(0, 2999)
|
||||
start = base_date - timedelta(days=random.randint(0, 730))
|
||||
end = start + timedelta(days=random.randint(30, 365))
|
||||
|
||||
p_ids.append(f"PL-{i+1:05d}")
|
||||
p_cand_ids.append(c_ids[ci])
|
||||
p_job_ids.append(jo_ids[ji])
|
||||
p_client_ids.append(jo_client_ids[ji])
|
||||
p_start.append(start.strftime("%Y-%m-%d"))
|
||||
p_end.append(end.strftime("%Y-%m-%d") if random.random() < 0.7 else "")
|
||||
p_status.append(random.choice(["active","active","completed","completed","terminated","no_show"]))
|
||||
p_bill.append(jo_bill_rate[ji])
|
||||
p_pay.append(jo_pay_rate[ji])
|
||||
p_recruiter.append(random.choice(recruiters))
|
||||
|
||||
placements = pa.table({
|
||||
"placement_id": p_ids, "candidate_id": p_cand_ids, "job_order_id": p_job_ids,
|
||||
"client_id": p_client_ids, "start_date": p_start, "end_date": p_end,
|
||||
"status": p_status, "bill_rate": p_bill, "pay_rate": p_pay, "recruiter": p_recruiter,
|
||||
})
|
||||
upload("placements", placements)
|
||||
|
||||
# ============================================================
|
||||
# 5. TIMESHEETS — 120K weekly entries
|
||||
# ============================================================
|
||||
print("Generating timesheets (120K)...")
|
||||
|
||||
ts_ids, ts_placement_ids, ts_cand_ids, ts_client_ids = [], [], [], []
|
||||
ts_week_ending, ts_hours_reg, ts_hours_ot, ts_bill_total, ts_pay_total = [], [], [], [], []
|
||||
ts_approved, ts_approved_by = [], []
|
||||
|
||||
for i in range(120000):
|
||||
pi = random.randint(0, 7999)
|
||||
hrs_reg = round(random.choice([40, 40, 40, 32, 24, 20, 8]), 1)
|
||||
hrs_ot = round(random.choice([0, 0, 0, 0, 4, 8, 12, 16]), 1)
|
||||
bill = p_bill[pi]
|
||||
pay = p_pay[pi]
|
||||
|
||||
ts_ids.append(f"TS-{i+1:06d}")
|
||||
ts_placement_ids.append(p_ids[pi])
|
||||
ts_cand_ids.append(p_cand_ids[pi])
|
||||
ts_client_ids.append(p_client_ids[pi])
|
||||
ts_week_ending.append((base_date - timedelta(weeks=random.randint(0, 104))).strftime("%Y-%m-%d"))
|
||||
ts_hours_reg.append(hrs_reg)
|
||||
ts_hours_ot.append(hrs_ot)
|
||||
ts_bill_total.append(round(hrs_reg * bill + hrs_ot * bill * 1.5, 2))
|
||||
ts_pay_total.append(round(hrs_reg * pay + hrs_ot * pay * 1.5, 2))
|
||||
ts_approved.append(random.choice([True, True, True, True, False]))
|
||||
ts_approved_by.append(random.choice(cl_contacts) if ts_approved[-1] else "")
|
||||
|
||||
timesheets = pa.table({
|
||||
"timesheet_id": ts_ids, "placement_id": ts_placement_ids,
|
||||
"candidate_id": ts_cand_ids, "client_id": ts_client_ids,
|
||||
"week_ending": ts_week_ending, "hours_regular": ts_hours_reg, "hours_overtime": ts_hours_ot,
|
||||
"bill_total": ts_bill_total, "pay_total": ts_pay_total,
|
||||
"approved": ts_approved, "approved_by": ts_approved_by,
|
||||
})
|
||||
upload("timesheets", timesheets)
|
||||
|
||||
# ============================================================
|
||||
# 6. CALL LOG — 80K phone records (CDR)
|
||||
# ============================================================
|
||||
print("Generating call_log (80K)...")
|
||||
|
||||
call_ids, call_from, call_to, call_direction = [], [], [], []
|
||||
call_duration, call_timestamp, call_recruiter, call_cand_id, call_disposition = [], [], [], [], []
|
||||
|
||||
dispositions = ["connected","voicemail","no_answer","busy","wrong_number","callback_scheduled","declined"]
|
||||
|
||||
for i in range(80000):
|
||||
ci = random.randint(0, N_CAND - 1)
|
||||
rec = random.choice(recruiters)
|
||||
direction = random.choice(["outbound","outbound","outbound","inbound"])
|
||||
|
||||
call_ids.append(f"CALL-{i+1:06d}")
|
||||
if direction == "outbound":
|
||||
call_from.append(make_phone()) # recruiter's line
|
||||
call_to.append(c_phones[ci])
|
||||
else:
|
||||
call_from.append(c_phones[ci])
|
||||
call_to.append(make_phone())
|
||||
call_direction.append(direction)
|
||||
call_duration.append(random.randint(0, 1800))
|
||||
call_timestamp.append((base_date - timedelta(seconds=random.randint(0, 86400 * 365))).isoformat())
|
||||
call_recruiter.append(rec)
|
||||
call_cand_id.append(c_ids[ci])
|
||||
call_disposition.append(random.choice(dispositions))
|
||||
|
||||
call_log = pa.table({
|
||||
"call_id": call_ids, "from_number": call_from, "to_number": call_to,
|
||||
"direction": call_direction, "duration_seconds": call_duration,
|
||||
"timestamp": call_timestamp, "recruiter": call_recruiter,
|
||||
"candidate_id": call_cand_id, "disposition": call_disposition,
|
||||
})
|
||||
upload("call_log", call_log)
|
||||
|
||||
# ============================================================
|
||||
# 7. EMAIL LOG — 60K email records
|
||||
# ============================================================
|
||||
print("Generating email_log (60K)...")
|
||||
|
||||
em_ids, em_from, em_to, em_subject, em_timestamp = [], [], [], [], []
|
||||
em_recruiter, em_cand_id, em_direction, em_opened = [], [], [], []
|
||||
|
||||
subjects = [
|
||||
"New job opportunity — {title} in {city}",
|
||||
"Following up on your application",
|
||||
"Interview scheduled — {title}",
|
||||
"Timesheet reminder for week ending {date}",
|
||||
"Your background check is complete",
|
||||
"New assignment details — {client}",
|
||||
"Pay rate update for your current assignment",
|
||||
"Re: Availability for {title} position",
|
||||
"Welcome to {client} — your first day info",
|
||||
"Reference check request",
|
||||
]
|
||||
|
||||
for i in range(60000):
|
||||
ci = random.randint(0, N_CAND - 1)
|
||||
ji = random.randint(0, 2999)
|
||||
rec = random.choice(recruiters)
|
||||
direction = random.choice(["outbound","outbound","outbound","inbound"])
|
||||
subj = random.choice(subjects).format(
|
||||
title=jo_titles[ji], city=jo_city[ji], date="2026-01-05", client=cl_names[random.randint(0,499)]
|
||||
)
|
||||
|
||||
em_ids.append(f"EM-{i+1:06d}")
|
||||
if direction == "outbound":
|
||||
em_from.append(f"{rec.replace(' ','.').lower()}@acmestaffing.com")
|
||||
em_to.append(c_emails[ci])
|
||||
else:
|
||||
em_from.append(c_emails[ci])
|
||||
em_to.append(f"{rec.replace(' ','.').lower()}@acmestaffing.com")
|
||||
em_subject.append(subj)
|
||||
em_timestamp.append((base_date - timedelta(seconds=random.randint(0, 86400 * 365))).isoformat())
|
||||
em_recruiter.append(rec)
|
||||
em_cand_id.append(c_ids[ci])
|
||||
em_direction.append(direction)
|
||||
em_opened.append(random.random() < 0.6 if direction == "outbound" else True)
|
||||
|
||||
email_log = pa.table({
|
||||
"email_id": em_ids, "from_addr": em_from, "to_addr": em_to,
|
||||
"subject": em_subject, "timestamp": em_timestamp,
|
||||
"recruiter": em_recruiter, "candidate_id": em_cand_id,
|
||||
"direction": em_direction, "opened": em_opened,
|
||||
})
|
||||
upload("email_log", email_log)
|
||||
|
||||
# ============================================================
|
||||
total = sum([candidates.num_rows, clients.num_rows, job_orders.num_rows,
|
||||
placements.num_rows, timesheets.num_rows, call_log.num_rows, email_log.num_rows])
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Staffing company data loaded: {total:,} total rows across 7 tables")
|
||||
print(f"{'='*60}")
|
||||
print(f"""
|
||||
Cross-reference queries to try:
|
||||
"Find all Java developers in Chicago who are available immediately"
|
||||
"Which recruiter has the most placements this year?"
|
||||
"Show me the total revenue by client for Q1 2026"
|
||||
"Find candidates who were called more than 5 times but never placed"
|
||||
"What's the average bill rate for .NET developers in New York?"
|
||||
"Which clients have the highest overtime hours?"
|
||||
"Show candidates in zip 60601 with Healthcare skills"
|
||||
"Find the spread (bill - pay) by vertical"
|
||||
"Which candidates have worked for multiple different clients?"
|
||||
"Show email open rates by recruiter"
|
||||
""")
|
||||
Loading…
x
Reference in New Issue
Block a user