Compare commits

..

2 Commits

Author SHA1 Message Date
root
bb05c4412e Phase 6: Ingest pipeline — CSV, JSON, PDF, text file support
- ingestd crate: detect file type → parse → schema detection → Parquet → catalog
- CSV: auto-detect column types (int, float, bool, string), handles $, %, commas
  Strips dollar signs from amounts, flexible row parsing, sanitized column names
- JSON: array or newline-delimited, nested object flattening (a.b.c → a_b_c)
- PDF: text extraction via lopdf, one row per page (source_file, page_number, text_content)
- Text/SMS: line-based ingestion with line numbers
- Dedup: SHA-256 content hash, re-ingest same file = no-op
- Gateway: POST /ingest/file multipart upload, 256MB body limit
- Schema detection per ADR-010: ambiguous types default to String
- 12 unit tests passing (CSV parsing, JSON flattening, type inference, dedup)
- Tested: messy CSV with missing data, dollar amounts, N/A values → queryable

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 08:07:31 -05:00
root
6740a017c7 PRD v2: production roadmap with ingest, vector search, hot cache phases
- Phase 6: Ingest pipeline (CSV/JSON → schema detect → Parquet → catalog)
- Phase 7: Vector index + RAG (embed → HNSW → semantic search → LLM answer)
- Phase 8: Hot cache + incremental updates (MemTable, delta files, merge-on-read)
- ADR-008 through ADR-011: embeddings as Parquet, delta files not Delta Lake,
  schema defaults to string, not a CRM replacement
- Staffing company reference dataset (286K rows, 7 tables)
- Honest risk assessment: vector search at scale and incremental updates are hard

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 07:54:24 -05:00
28 changed files with 1901 additions and 157 deletions

180
Cargo.lock generated
View File

@ -8,6 +8,17 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "aes"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
dependencies = [
"cfg-if",
"cipher",
"cpufeatures",
]
[[package]]
name = "ahash"
version = "0.8.12"
@ -508,6 +519,15 @@ dependencies = [
"generic-array",
]
[[package]]
name = "block-padding"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93"
dependencies = [
"generic-array",
]
[[package]]
name = "brotli"
version = "8.0.2"
@ -535,6 +555,12 @@ version = "3.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb"
[[package]]
name = "bytecount"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e"
[[package]]
name = "byteorder"
version = "1.5.0"
@ -588,6 +614,15 @@ dependencies = [
"uuid",
]
[[package]]
name = "cbc"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6"
dependencies = [
"cipher",
]
[[package]]
name = "cc"
version = "1.2.58"
@ -690,6 +725,16 @@ dependencies = [
"half",
]
[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
dependencies = [
"crypto-common",
"inout",
]
[[package]]
name = "combine"
version = "4.6.7"
@ -894,6 +939,25 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
@ -2311,6 +2375,7 @@ dependencies = [
"aibridge",
"axum",
"catalogd",
"ingestd",
"object_store",
"opentelemetry",
"opentelemetry-stdout",
@ -2820,6 +2885,38 @@ dependencies = [
"cfb",
]
[[package]]
name = "ingestd"
version = "0.1.0"
dependencies = [
"arrow",
"axum",
"bytes",
"catalogd",
"chrono",
"csv",
"lopdf",
"object_store",
"parquet",
"serde",
"serde_json",
"sha2",
"shared",
"storaged",
"tokio",
"tracing",
]
[[package]]
name = "inout"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01"
dependencies = [
"block-padding",
"generic-array",
]
[[package]]
name = "integer-encoding"
version = "3.0.4"
@ -3075,6 +3172,30 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3bd0dd2cd90571056fdb71f6275fada10131182f84899f4b2a916e565d81d86"
[[package]]
name = "lopdf"
version = "0.35.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c7c1d3350d071cb86987a6bcb205c7019a0eb70dcad92b454fec722cca8d68b"
dependencies = [
"aes",
"cbc",
"chrono",
"encoding_rs",
"flate2",
"indexmap",
"itoa",
"log",
"md-5",
"nom",
"nom_locate",
"rangemap",
"rayon",
"thiserror 2.0.18",
"time",
"weezl",
]
[[package]]
name = "lru-slab"
version = "0.1.2"
@ -3217,6 +3338,12 @@ dependencies = [
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.8.9"
@ -3291,6 +3418,27 @@ dependencies = [
"jni-sys 0.3.1",
]
[[package]]
name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
name = "nom_locate"
version = "4.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e3c83c053b0713da60c5b8de47fe8e494fe3ece5267b2f23090a07a053ba8f3"
dependencies = [
"bytecount",
"memchr",
"nom",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
@ -3971,12 +4119,38 @@ dependencies = [
"getrandom 0.3.4",
]
[[package]]
name = "rangemap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "973443cf09a9c8656b574a866ab68dfa19f0867d0340648c7d2f6a71b8a8ea68"
[[package]]
name = "raw-window-handle"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539"
[[package]]
name = "rayon"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
]
[[package]]
name = "recursive"
version = "0.1.1"
@ -5439,6 +5613,12 @@ dependencies = [
"rustls-pki-types",
]
[[package]]
name = "weezl"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a28ac98ddc8b9274cb41bb4d9d4d5c425b6020c50c46f25559911905610b4a88"
[[package]]
name = "winapi-util"
version = "0.1.11"

View File

@ -7,6 +7,7 @@ members = [
"crates/catalogd",
"crates/queryd",
"crates/aibridge",
"crates/ingestd",
"crates/gateway",
"crates/ui",
]
@ -38,3 +39,6 @@ opentelemetry_sdk = { version = "0.28", features = ["rt-tokio"] }
opentelemetry-stdout = { version = "0.28", features = ["trace"] }
tracing-opentelemetry = "0.29"
toml = "0.8"
csv = "1"
lopdf = "0.35"
encoding_rs = "0.8"

View File

@ -9,6 +9,7 @@ storaged = { path = "../storaged" }
catalogd = { path = "../catalogd" }
queryd = { path = "../queryd" }
aibridge = { path = "../aibridge" }
ingestd = { path = "../ingestd" }
tokio = { workspace = true }
axum = { workspace = true }
serde = { workspace = true }

View File

@ -1,7 +1,7 @@
mod auth;
mod observability;
use axum::{Router, routing::get};
use axum::{Router, extract::DefaultBodyLimit, routing::get};
use proto::lakehouse::catalog_service_server::CatalogServiceServer;
use shared::config::Config;
use tower_http::cors::{Any, CorsLayer};
@ -39,10 +39,14 @@ async fn main() {
// HTTP router
let mut app = Router::new()
.route("/health", get(health))
.nest("/storage", storaged::service::router(store))
.nest("/storage", storaged::service::router(store.clone()))
.nest("/catalog", catalogd::service::router(registry.clone()))
.nest("/query", queryd::service::router(engine))
.nest("/ai", aibridge::service::router(ai_client));
.nest("/ai", aibridge::service::router(ai_client))
.nest("/ingest", ingestd::service::router(ingestd::service::IngestState {
store: store.clone(),
registry: registry.clone(),
}));
// Auth middleware (if enabled)
if config.auth.enabled {
@ -58,6 +62,7 @@ async fn main() {
}
app = app
.layer(DefaultBodyLimit::max(256 * 1024 * 1024)) // 256MB
.layer(CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)

22
crates/ingestd/Cargo.toml Normal file
View File

@ -0,0 +1,22 @@
[package]
name = "ingestd"
version = "0.1.0"
edition = "2024"
[dependencies]
shared = { path = "../shared" }
storaged = { path = "../storaged" }
catalogd = { path = "../catalogd" }
tokio = { workspace = true }
axum = { workspace = true, features = ["multipart"] }
lopdf = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
arrow = { workspace = true }
parquet = { workspace = true }
bytes = { workspace = true }
sha2 = { workspace = true }
csv = { workspace = true }
chrono = { workspace = true }
object_store = { workspace = true }

View File

@ -0,0 +1,203 @@
use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray, BooleanArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
/// Inferred column type from sampling data.
///
/// Produced by `infer_column_type` for each CSV column and translated into an
/// Arrow `DataType` when `parse_csv` builds the schema.
#[derive(Debug, Clone, PartialEq)]
enum InferredType {
/// Whole numbers; stored as `DataType::Int64`.
Integer,
/// Decimal numbers (including `$`/`%`-decorated amounts); stored as `DataType::Float64`.
Float,
/// Textual true/false style flags; stored as `DataType::Boolean`.
Boolean,
/// Fallback for anything else; stored as `DataType::Utf8`.
String,
}
/// Parse CSV (or tab-separated) bytes into Arrow RecordBatches with automatic
/// schema detection.
/// Per ADR-010: ambiguous types default to String.
///
/// * The delimiter is sniffed from the header line: a tab is used when the
///   header contains more tabs than commas, otherwise a comma.
/// * Header names are sanitized; a header that is empty (or becomes empty
///   after sanitizing) is named `column_{index}`, and repeated names get a
///   numeric suffix (`name_2`, `name_3`, ...) so every column name is unique.
/// * Rows shorter than the header are padded with empty values; extra fields
///   beyond the header width are ignored.
/// * All columns are nullable. Values that cannot be converted to the inferred
///   column type become nulls.
///
/// Returns the schema together with a single `RecordBatch` holding every row.
///
/// # Errors
/// Returns a message when the header or a row cannot be read, when there are
/// no columns, when there are no data rows, or when the batch cannot be built.
pub fn parse_csv(content: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
    // `.tsv` files are routed to this parser too, so pick the delimiter from
    // the header line instead of assuming a comma.
    let header_line = content.split(|&b| b == b'\n').next().unwrap_or_default();
    let tab_count = header_line.iter().filter(|&&b| b == b'\t').count();
    let comma_count = header_line.iter().filter(|&&b| b == b',').count();
    let delimiter = if tab_count > comma_count { b'\t' } else { b',' };

    let mut reader = csv::ReaderBuilder::new()
        .delimiter(delimiter)
        .flexible(true) // allow varying column counts
        .trim(csv::Trim::All)
        .from_reader(content);

    // Column names must be non-empty and unique, otherwise the resulting
    // schema is ambiguous for downstream SQL.
    let mut used_names = std::collections::HashSet::new();
    let headers: Vec<String> = reader.headers()
        .map_err(|e| format!("CSV header error: {e}"))?
        .iter()
        .enumerate()
        .map(|(i, h)| {
            let sanitized = sanitize_column_name(h.trim());
            // Sanitizing can strip everything (e.g. "$$$"), so check afterwards.
            let base = if sanitized.is_empty() { format!("column_{i}") } else { sanitized };
            let mut name = base.clone();
            let mut suffix = 2;
            while !used_names.insert(name.clone()) {
                name = format!("{base}_{suffix}");
                suffix += 1;
            }
            name
        })
        .collect();

    let n_cols = headers.len();
    if n_cols == 0 {
        return Err("CSV has no columns".into());
    }

    // Read all rows into string columns
    let mut columns: Vec<Vec<String>> = vec![vec![]; n_cols];
    let mut row_count = 0;
    for result in reader.records() {
        let record = result.map_err(|e| format!("CSV row error: {e}"))?;
        for (i, field) in record.iter().enumerate() {
            if i < n_cols {
                columns[i].push(field.trim().to_string());
            }
        }
        // Pad short rows with empty strings
        for col in columns.iter_mut().skip(record.len().min(n_cols)) {
            col.push(String::new());
        }
        row_count += 1;
    }
    if row_count == 0 {
        return Err("CSV has no data rows".into());
    }
    tracing::info!("parsed CSV: {row_count} rows × {n_cols} columns");

    // Infer types by sampling (look at all values)
    let types: Vec<InferredType> = columns.iter().map(|col| infer_column_type(col)).collect();

    // Build Arrow schema
    let fields: Vec<Field> = headers.iter().zip(types.iter()).map(|(name, typ)| {
        let dt = match typ {
            InferredType::Integer => DataType::Int64,
            InferredType::Float => DataType::Float64,
            InferredType::Boolean => DataType::Boolean,
            InferredType::String => DataType::Utf8,
        };
        Field::new(name, dt, true) // all nullable
    }).collect();
    let schema = Arc::new(Schema::new(fields));

    // Build arrays; the cleaning applied here mirrors what type inference accepted.
    let arrays: Vec<ArrayRef> = columns.iter().zip(types.iter()).map(|(col, typ)| {
        match typ {
            InferredType::Integer => {
                let vals: Vec<Option<i64>> = col.iter().map(|v| {
                    if v.is_empty() { None } else { v.replace(',', "").parse().ok() }
                }).collect();
                Arc::new(Int64Array::from(vals)) as ArrayRef
            }
            InferredType::Float => {
                let vals: Vec<Option<f64>> = col.iter().map(|v| {
                    if v.is_empty() { None }
                    else { v.replace(',', "").replace('$', "").replace('%', "").parse().ok() }
                }).collect();
                Arc::new(Float64Array::from(vals)) as ArrayRef
            }
            InferredType::Boolean => {
                let vals: Vec<Option<bool>> = col.iter().map(|v| {
                    match v.to_lowercase().as_str() {
                        "true" | "yes" | "1" | "y" | "t" => Some(true),
                        "false" | "no" | "0" | "n" | "f" => Some(false),
                        _ => None,
                    }
                }).collect();
                Arc::new(BooleanArray::from(vals)) as ArrayRef
            }
            InferredType::String => {
                Arc::new(StringArray::from(col.clone())) as ArrayRef
            }
        }
    }).collect();

    let batch = RecordBatch::try_new(schema.clone(), arrays)
        .map_err(|e| format!("RecordBatch error: {e}"))?;
    Ok((schema, vec![batch]))
}
/// Pick the narrowest type that describes a column of raw CSV strings.
///
/// Empty strings and the placeholders `NULL`, `null`, `N/A`, `n/a` are ignored.
/// The remaining values are tested in order:
/// 1. Boolean - at least two values, all boolean-like, and at least one of
///    them textual (so a pure `1`/`0` column is left to the integer check).
/// 2. Integer - more than 95% parse as `i64` once commas are removed.
/// 3. Float - more than 95% parse as `f64` once `,`, `$` and `%` are removed.
///
/// Anything else, including a column with no usable values, is `String`.
fn infer_column_type(values: &[String]) -> InferredType {
    /// Textual boolean spellings (case-insensitive).
    fn is_text_bool(s: &str) -> bool {
        matches!(
            s.to_lowercase().as_str(),
            "true" | "false" | "yes" | "no" | "y" | "n" | "t" | "f"
        )
    }

    /// Any accepted boolean spelling, including the numeric `1` / `0`.
    fn is_any_bool(s: &str) -> bool {
        is_text_bool(s) || s == "1" || s == "0"
    }

    // Null-like placeholders carry no type information.
    let candidates: Vec<&str> = values
        .iter()
        .map(String::as_str)
        .filter(|s| !matches!(*s, "" | "NULL" | "null" | "N/A" | "n/a"))
        .collect();
    if candidates.is_empty() {
        return InferredType::String;
    }
    let total = candidates.len() as f64;

    // Check boolean
    let every_value_is_bool = candidates.iter().all(|s| is_any_bool(*s));
    if every_value_is_bool
        && candidates.len() >= 2
        && candidates.iter().any(|s| is_text_bool(*s))
    {
        return InferredType::Boolean;
    }

    // Check integer (allow commas as thousands separator)
    let int_hits = candidates
        .iter()
        .filter(|s| s.replace(',', "").parse::<i64>().is_ok())
        .count();
    if int_hits as f64 / total > 0.95 {
        return InferredType::Integer;
    }

    // Check float (allow $, %, commas)
    let float_hits = candidates
        .iter()
        .filter(|s| {
            s.replace(',', "")
                .replace('$', "")
                .replace('%', "")
                .parse::<f64>()
                .is_ok()
        })
        .count();
    if float_hits as f64 / total > 0.95 {
        return InferredType::Float;
    }

    InferredType::String
}
/// Turn a raw header into a SQL-friendly column name.
///
/// Alphanumeric characters and `_` are kept (ASCII letters lower-cased), every
/// other character becomes `_`, and leading/trailing underscores are removed.
/// The result may be empty when the input has no usable characters.
fn sanitize_column_name(name: &str) -> String {
    let mut mapped = String::with_capacity(name.len());
    for ch in name.chars() {
        if ch.is_alphanumeric() || ch == '_' {
            mapped.push(ch.to_ascii_lowercase());
        } else {
            mapped.push('_');
        }
    }
    mapped.trim_matches('_').to_string()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse `input` and return the Arrow type of the column at `idx`.
    fn column_type(input: &[u8], idx: usize) -> DataType {
        let (schema, _) = parse_csv(input).unwrap();
        schema.field(idx).data_type().clone()
    }

    #[test]
    fn parse_simple_csv() {
        let input = b"Name,Age,Salary\nAlice,30,50000\nBob,25,45000\n";
        let (schema, batches) = parse_csv(input).unwrap();

        assert_eq!(schema.fields().len(), 3);
        assert_eq!(batches[0].num_rows(), 2);
        // Both numeric columns are whole numbers.
        for idx in [1, 2] {
            assert_eq!(schema.field(idx).data_type(), &DataType::Int64);
        }
    }

    #[test]
    fn parse_csv_with_mixed_types() {
        // "N/A" is a null placeholder and must not stop `value` from being text.
        let input = b"id,value\n1,hello\n2,world\n3,N/A\n";

        assert_eq!(column_type(input, 0), DataType::Int64);
        assert_eq!(column_type(input, 1), DataType::Utf8);
    }

    #[test]
    fn parse_csv_with_dollar_amounts() {
        let input = b"item,price\nWidget,$29.99\nGadget,$149.50\n";

        assert_eq!(column_type(input, 1), DataType::Float64);
    }

    #[test]
    fn sanitize_names() {
        let cases = [("First Name", "first_name"), ("Bill Rate ($)", "bill_rate")];
        for (raw, expected) in cases {
            assert_eq!(sanitize_column_name(raw), expected);
        }
    }
}

View File

@ -0,0 +1,103 @@
use sha2::{Digest, Sha256};
/// Detected file type from filename and content inspection.
#[derive(Debug, Clone, PartialEq)]
pub enum FileType {
    Csv,
    Json,
    NdJson, // newline-delimited JSON
    Pdf,
    Text, // plain text, SMS logs, etc.
    Unknown,
}

/// Detect file type from filename extension and content sniffing.
///
/// The (case-insensitive) extension is checked first:
/// `.csv`/`.tsv` -> `Csv`; `.json` -> `NdJson` when the first 4096 bytes hold
/// more than two newlines and the first line starts with `{`, else `Json`;
/// `.ndjson`/`.jsonl` -> `NdJson`; `.pdf` -> `Pdf`; `.txt`/`.log`/`.sms` -> `Text`.
///
/// Without a recognised extension the content decides: a `%PDF` prefix is a
/// PDF, a leading `[` or `{` is JSON, more than three commas plus more than one
/// newline in the first 4096 bytes is CSV, and anything whose first 4096 bytes
/// are valid UTF-8 is text. Everything else is `Unknown`.
pub fn detect_file_type(filename: &str, content: &[u8]) -> FileType {
    // Extension-based first
    let lower = filename.to_lowercase();
    if lower.ends_with(".csv") || lower.ends_with(".tsv") {
        return FileType::Csv;
    }
    if lower.ends_with(".json") {
        // Check if it's newline-delimited JSON
        if content.iter().take(4096).filter(|&&b| b == b'\n').count() > 2 {
            let first_line = content.split(|&b| b == b'\n').next().unwrap_or(b"");
            if first_line.starts_with(b"{") {
                return FileType::NdJson;
            }
        }
        return FileType::Json;
    }
    if lower.ends_with(".ndjson") || lower.ends_with(".jsonl") {
        return FileType::NdJson;
    }
    if lower.ends_with(".pdf") {
        return FileType::Pdf;
    }
    if lower.ends_with(".txt") || lower.ends_with(".log") || lower.ends_with(".sms") {
        return FileType::Text;
    }

    // Content sniffing fallback
    if content.starts_with(b"%PDF") {
        return FileType::Pdf;
    }
    if content.starts_with(b"[") || content.starts_with(b"{") {
        return FileType::Json;
    }

    // Check if it looks like CSV (has commas and newlines in first chunk)
    let sample = &content[..content.len().min(4096)];
    let comma_count = sample.iter().filter(|&&b| b == b',').count();
    let newline_count = sample.iter().filter(|&&b| b == b'\n').count();
    if comma_count > 3 && newline_count > 1 {
        return FileType::Csv;
    }

    // If it's valid UTF-8, treat as text.
    let sample_was_truncated = sample.len() < content.len();
    let looks_like_text = match std::str::from_utf8(sample) {
        Ok(_) => true,
        // `error_len() == None` means the input ended in the middle of a
        // multi-byte character. When that is only because the 4096-byte window
        // cut the character in half, the sample is still valid text.
        Err(e) => sample_was_truncated && e.error_len().is_none(),
    };
    if looks_like_text {
        return FileType::Text;
    }

    FileType::Unknown
}
/// Hash `content` with SHA-256 and return the digest as lowercase hex.
/// The value is used to recognise files that were already ingested.
pub fn content_hash(content: &[u8]) -> String {
    let mut state = Sha256::new();
    state.update(content);
    let digest = state.finalize();
    format!("{digest:x}")
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn detect_csv_by_extension() {
        let detected = detect_file_type("data.csv", b"a,b,c\n1,2,3");
        assert_eq!(detected, FileType::Csv);
    }

    #[test]
    fn detect_json_by_extension() {
        let detected = detect_file_type("data.json", b"[{\"a\":1}]");
        assert_eq!(detected, FileType::Json);
    }

    #[test]
    fn detect_pdf_by_magic() {
        // No extension: the `%PDF` magic bytes decide.
        let detected = detect_file_type("unknown", b"%PDF-1.4 blah");
        assert_eq!(detected, FileType::Pdf);
    }

    #[test]
    fn detect_csv_by_content() {
        // Unrecognised extension: commas plus newlines decide.
        let body = b"name,age,city\nAlice,30,NYC\nBob,25,LA\n";
        let detected = detect_file_type("unknown.dat", body);
        assert_eq!(detected, FileType::Csv);
    }

    #[test]
    fn content_hash_deterministic() {
        let first = content_hash(b"hello world");
        let second = content_hash(b"hello world");
        assert_eq!(first, second);
    }
}

View File

@ -0,0 +1,167 @@
use arrow::array::{ArrayRef, Float64Array, Int64Array, StringArray, BooleanArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::collections::BTreeMap;
use std::sync::Arc;
/// Parse JSON into Arrow RecordBatches.
/// Nested objects are flattened: {"a": {"b": 1}} → column "a_b".
///
/// Three input shapes are accepted:
/// * a JSON array - each object element becomes a row; non-object elements
///   are skipped and reported with a warning;
/// * a single JSON object (possibly pretty-printed over several lines) - one row;
/// * newline-delimited JSON - one object per non-blank line.
///
/// Columns are the union of all flattened keys, sorted by name; rows that lack
/// a key get a null. Returns the schema and a single `RecordBatch`.
///
/// # Errors
/// Returns a message for invalid UTF-8, malformed JSON, an NDJSON line that is
/// not an object, input without any records, or a batch construction failure.
pub fn parse_json(content: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
    let text = std::str::from_utf8(content).map_err(|e| format!("invalid UTF-8: {e}"))?;
    let text = text.trim();

    // Parse into Vec<Map>
    let rows: Vec<serde_json::Map<String, serde_json::Value>> = if text.starts_with('[') {
        // JSON array
        let arr: Vec<serde_json::Value> = serde_json::from_str(text)
            .map_err(|e| format!("JSON parse error: {e}"))?;
        let element_count = arr.len();
        let objects: Vec<_> = arr.into_iter()
            .filter_map(|v| match v {
                serde_json::Value::Object(map) => Some(map),
                _ => None,
            })
            .collect();
        if objects.len() < element_count {
            // Best-effort: keep the usable rows but make the loss visible.
            tracing::warn!(
                "skipped {} non-object element(s) in JSON array",
                element_count - objects.len()
            );
        }
        objects
    } else if let Ok(serde_json::Value::Object(single)) =
        serde_json::from_str::<serde_json::Value>(text)
    {
        // The whole document is one object (e.g. pretty-printed); splitting it
        // by line would fail on the first line.
        vec![single]
    } else {
        // Newline-delimited JSON
        text.lines()
            .filter(|l| !l.trim().is_empty())
            .map(|l| {
                let v: serde_json::Value = serde_json::from_str(l)
                    .map_err(|e| format!("NDJSON line parse error: {e}"))?;
                v.as_object().cloned().ok_or_else(|| "NDJSON line is not an object".into())
            })
            .collect::<Result<Vec<_>, String>>()?
    };
    if rows.is_empty() {
        return Err("JSON has no records".into());
    }

    // Flatten and collect all columns
    let mut columns: BTreeMap<String, Vec<serde_json::Value>> = BTreeMap::new();
    let n_rows = rows.len();
    for (row_idx, row) in rows.iter().enumerate() {
        for (key, val) in flatten_object(row, "") {
            let col = columns.entry(key).or_default();
            // Back-fill nulls for earlier rows that did not have this key.
            // A column receives at most one value per row, so its length never
            // exceeds `row_idx` here and `resize` can only grow it.
            col.resize(row_idx, serde_json::Value::Null);
            col.push(val);
        }
    }
    // Pad columns missing from the trailing rows
    for col in columns.values_mut() {
        col.resize(n_rows, serde_json::Value::Null);
    }
    tracing::info!("parsed JSON: {n_rows} rows × {} columns", columns.len());

    // Infer types and build schema
    let mut fields = Vec::new();
    let mut arrays: Vec<ArrayRef> = Vec::new();
    for (name, values) in &columns {
        let (dt, array) = json_column_to_arrow(values);
        fields.push(Field::new(name, dt, true));
        arrays.push(array);
    }
    let schema = Arc::new(Schema::new(fields));
    let batch = RecordBatch::try_new(schema.clone(), arrays)
        .map_err(|e| format!("RecordBatch error: {e}"))?;
    Ok((schema, vec![batch]))
}
/// Collapse a nested JSON object into a single level, joining the key path
/// with `_`: {"a": {"b": 1}} → {"a_b": 1}.
///
/// `prefix` is prepended (with a `_`) to every produced key unless it is empty.
/// Non-object values, including arrays, are copied as they are. When two paths
/// produce the same flattened key, the one visited later wins.
fn flatten_object(obj: &serde_json::Map<String, serde_json::Value>, prefix: &str) -> BTreeMap<String, serde_json::Value> {
    /// Depth-first walk that writes every leaf into `out`.
    fn walk(
        node: &serde_json::Map<String, serde_json::Value>,
        path: &str,
        out: &mut BTreeMap<String, serde_json::Value>,
    ) {
        for (name, value) in node {
            let joined = if path.is_empty() {
                name.clone()
            } else {
                format!("{path}_{name}")
            };
            if let serde_json::Value::Object(child) = value {
                walk(child, &joined, out);
            } else {
                out.insert(joined, value.clone());
            }
        }
    }

    let mut flat = BTreeMap::new();
    walk(obj, prefix, &mut flat);
    flat
}
/// Convert a column of JSON values to an Arrow array.
///
/// The type is chosen from the non-null values:
/// * all booleans -> `Boolean`;
/// * all integers that fit in `i64` -> `Int64`;
/// * all numbers -> `Float64` (this includes integers above `i64::MAX`, which
///   would otherwise be silently turned into nulls);
/// * anything else, or a column with only nulls -> `Utf8`, where strings are
///   used as they are and other values are rendered as JSON text.
///
/// JSON nulls stay null in every case. Returns the Arrow data type together
/// with the array, which has the same length as `values`.
fn json_column_to_arrow(values: &[serde_json::Value]) -> (DataType, ArrayRef) {
    // Check if all non-null values are the same type
    let non_null: Vec<&serde_json::Value> = values.iter()
        .filter(|v| !v.is_null())
        .collect();
    if non_null.is_empty() {
        return (DataType::Utf8, Arc::new(StringArray::from(vec![None::<&str>; values.len()])));
    }

    let all_bool = non_null.iter().all(|v| v.is_boolean());
    // `is_i64` is true only for integers representable as i64, so every value
    // in an Int64 column converts without loss.
    let all_i64 = non_null.iter().all(|v| v.is_i64());
    let all_f64 = non_null.iter().all(|v| v.is_number());

    if all_bool {
        let arr: Vec<Option<bool>> = values.iter().map(|v| v.as_bool()).collect();
        (DataType::Boolean, Arc::new(BooleanArray::from(arr)))
    } else if all_i64 {
        let arr: Vec<Option<i64>> = values.iter().map(|v| v.as_i64()).collect();
        (DataType::Int64, Arc::new(Int64Array::from(arr)))
    } else if all_f64 {
        let arr: Vec<Option<f64>> = values.iter().map(|v| v.as_f64()).collect();
        (DataType::Float64, Arc::new(Float64Array::from(arr)))
    } else {
        // Default to string
        let arr: Vec<Option<String>> = values.iter().map(|v| {
            match v {
                serde_json::Value::Null => None,
                serde_json::Value::String(s) => Some(s.clone()),
                other => Some(other.to_string()),
            }
        }).collect();
        (DataType::Utf8, Arc::new(StringArray::from(arr)))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_json_array() {
        let input = br#"[{"name":"Alice","age":30},{"name":"Bob","age":25}]"#;
        let (schema, batches) = parse_json(input).unwrap();

        assert_eq!(batches[0].num_rows(), 2);
        let age = schema.field_with_name("age").unwrap();
        assert!(age.data_type() == &DataType::Int64);
    }

    #[test]
    fn parse_ndjson() {
        // One object per line, trailing newline included.
        let input = b"{\"x\":1}\n{\"x\":2}\n{\"x\":3}\n";
        let (_, batches) = parse_json(input).unwrap();

        assert_eq!(batches[0].num_rows(), 3);
    }

    #[test]
    fn flatten_nested() {
        let input = br#"[{"user":{"name":"Alice","address":{"city":"NYC"}},"score":9.5}]"#;
        let (schema, _) = parse_json(input).unwrap();

        let names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        for expected in ["user_name", "user_address_city"] {
            assert!(names.contains(&expected));
        }
    }
}

View File

@ -0,0 +1,6 @@
pub mod detect;
pub mod csv_ingest;
pub mod json_ingest;
pub mod pdf_ingest;
pub mod pipeline;
pub mod service;

View File

@ -0,0 +1,52 @@
use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
/// Extract text from a PDF file.
/// Returns one row per page that has text: (source_file, page_number, text_content).
/// This handles text-based PDFs. Scanned/image PDFs need OCR (not implemented yet).
///
/// `source_filename` is copied into the `source_file` column of every row.
/// Pages whose text is empty after trimming are skipped. A page whose text
/// cannot be extracted is skipped as well, with a warning in the log.
///
/// # Errors
/// Returns a message when the document cannot be loaded, when no page yields
/// any text, or when the record batch cannot be built.
pub fn parse_pdf(content: &[u8], source_filename: &str) -> Result<(Arc<Schema>, Vec<RecordBatch>), String> {
    let doc = lopdf::Document::load_mem(content)
        .map_err(|e| format!("PDF load error: {e}"))?;
    let pages = doc.get_pages();

    let mut page_numbers: Vec<i32> = Vec::new();
    let mut page_texts: Vec<String> = Vec::new();
    let mut sources: Vec<String> = Vec::new();
    for &page_num in pages.keys() {
        let text = match doc.extract_text(&[page_num]) {
            Ok(text) => text,
            Err(e) => {
                // Stay best-effort (one bad page must not fail the whole file),
                // but do not hide the failure.
                tracing::warn!(
                    "failed to extract text from page {} of PDF '{}': {}",
                    page_num, source_filename, e
                );
                continue;
            }
        };
        let text = text.trim().to_string();
        if !text.is_empty() {
            page_numbers.push(page_num as i32);
            page_texts.push(text);
            sources.push(source_filename.to_string());
        }
    }
    if page_numbers.is_empty() {
        // PDF has no extractable text — likely scanned/image
        return Err("PDF contains no extractable text (may be scanned/image — OCR not yet supported)".into());
    }
    tracing::info!("extracted {} pages with text from PDF '{}'", page_numbers.len(), source_filename);

    let schema = Arc::new(Schema::new(vec![
        Field::new("source_file", DataType::Utf8, false),
        Field::new("page_number", DataType::Int32, false),
        Field::new("text_content", DataType::Utf8, false),
    ]));
    let arrays: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(sources)),
        Arc::new(Int32Array::from(page_numbers)),
        Arc::new(StringArray::from(page_texts)),
    ];
    let batch = RecordBatch::try_new(schema.clone(), arrays)
        .map_err(|e| format!("RecordBatch error: {e}"))?;
    Ok((schema, vec![batch]))
}

View File

@ -0,0 +1,162 @@
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use object_store::ObjectStore;
use std::sync::Arc;
use catalogd::registry::Registry;
use shared::arrow_helpers::{fingerprint_schema, record_batch_to_parquet};
use shared::types::{ObjectRef, SchemaFingerprint};
use storaged::ops;
use crate::detect::{FileType, content_hash, detect_file_type};
use crate::csv_ingest;
use crate::json_ingest;
use crate::pdf_ingest;
/// Result of an ingest operation.
///
/// Returned by `ingest_file` both when a file was stored and when it was
/// recognised as a duplicate; derives `serde::Serialize` so it can be returned
/// to API clients.
#[derive(Debug, Clone, serde::Serialize)]
pub struct IngestResult {
/// Name of the dataset this ingest refers to.
pub dataset_name: String,
/// `Debug` rendering of the detected `FileType` (e.g. "Csv").
pub file_type: String,
/// Total number of rows across all parsed record batches.
pub rows: usize,
/// Number of fields in the parsed schema.
pub columns: usize,
/// Object-store key the Parquet data was written to; empty when deduplicated.
pub storage_key: String,
/// Hash of the uploaded bytes as returned by `content_hash`.
pub content_hash: String,
/// Fingerprint of the Arrow schema from `fingerprint_schema`; empty when deduplicated.
pub schema_fingerprint: String,
/// True when the same content was already registered and nothing was stored.
pub deduplicated: bool,
}
/// Full ingest pipeline: detect → parse → dedup → store → register.
///
/// * `filename` - uploaded file name; drives type detection and, when
///   `dataset_name` is `None`, the dataset name (directory and final extension
///   are stripped).
/// * `content` - raw file bytes.
/// * `dataset_name` - optional explicit dataset name.
/// * `store` / `registry` - object store and catalog the result is written to.
///
/// The dataset name is always sanitized, and the same sanitized name is
/// reported whether the file is stored or recognised as a duplicate. A file
/// whose content hash is already registered is not stored again; the result
/// then has `deduplicated = true` and empty `storage_key` / `schema_fingerprint`.
///
/// # Errors
/// Returns a message when the file type is unknown, parsing fails, the file
/// has no rows, or conversion, storage, or catalog registration fails.
pub async fn ingest_file(
    filename: &str,
    content: &[u8],
    dataset_name: Option<&str>,
    store: &Arc<dyn ObjectStore>,
    registry: &Registry,
) -> Result<IngestResult, String> {
    // 1. Detect file type
    let file_type = detect_file_type(filename, content);
    tracing::info!("ingesting '{}' as {:?} ({} bytes)", filename, file_type, content.len());

    // 2. Parse into Arrow
    let (schema, batches) = match file_type {
        FileType::Csv => csv_ingest::parse_csv(content)?,
        FileType::Json | FileType::NdJson => json_ingest::parse_json(content)?,
        FileType::Pdf => pdf_ingest::parse_pdf(content, filename)?,
        FileType::Text => parse_text_file(content, filename)?,
        FileType::Unknown => return Err(format!("unknown file type for '{filename}'")),
    };
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    let total_cols = schema.fields().len();
    if total_rows == 0 {
        return Err("file contains no data".into());
    }

    // Resolve the dataset name once so that the stored and the deduplicated
    // result report the same, sanitized name.
    let safe_name = sanitize_dataset_name(
        dataset_name.unwrap_or_else(|| derive_dataset_name(filename)),
    );

    // 3. Content hash for dedup
    let hash = content_hash(content);

    // 4. Check if already ingested (same content hash)
    let existing = registry.list().await;
    if existing.iter().any(|d| d.schema_fingerprint.0 == hash) {
        tracing::info!("file already ingested (hash: {}), skipping", &hash[..12]);
        return Ok(IngestResult {
            dataset_name: safe_name,
            file_type: format!("{:?}", file_type),
            rows: total_rows,
            columns: total_cols,
            storage_key: String::new(),
            content_hash: hash,
            schema_fingerprint: String::new(),
            deduplicated: true,
        });
    }

    // 5. Convert to Parquet
    // NOTE(review): each batch is converted separately and the byte buffers are
    // concatenated. Every parser in this crate currently returns exactly one
    // batch; if `record_batch_to_parquet` emits a complete Parquet file per
    // call, more than one batch would not produce a valid file — confirm
    // before adding a multi-batch parser.
    let mut all_parquet = Vec::new();
    for batch in &batches {
        let pq = record_batch_to_parquet(batch)?;
        all_parquet.extend_from_slice(&pq);
    }
    let parquet_bytes = Bytes::from(all_parquet);
    let parquet_size = parquet_bytes.len() as u64;

    // 6. Store in object storage
    let storage_key = format!("datasets/{}.parquet", safe_name);
    ops::put(store, &storage_key, parquet_bytes).await?;
    tracing::info!("stored {} as {} ({} bytes)", filename, storage_key, parquet_size);

    // 7. Register in catalog
    let schema_fp = fingerprint_schema(&schema);
    let now = chrono::Utc::now();
    let obj_ref = ObjectRef {
        bucket: "data".to_string(),
        key: storage_key.clone(),
        size_bytes: parquet_size,
        created_at: now,
    };
    // Use content hash as fingerprint for dedup tracking
    registry.register(
        safe_name.clone(),
        SchemaFingerprint(hash.clone()),
        vec![obj_ref],
    ).await?;

    Ok(IngestResult {
        dataset_name: safe_name,
        file_type: format!("{:?}", file_type),
        rows: total_rows,
        columns: total_cols,
        storage_key,
        content_hash: hash,
        schema_fingerprint: schema_fp.0,
        deduplicated: false,
    })
}

/// Default dataset name for an upload: the file name without its directory
/// part and without its final extension ("dir/report.2024.csv" → "report.2024").
/// A name that has nothing before its only dot (".env") is returned whole.
fn derive_dataset_name(filename: &str) -> &str {
    let base = filename.rsplit(['/', '\\']).next().unwrap_or(filename);
    match base.rsplit_once('.') {
        Some((stem, _extension)) if !stem.is_empty() => stem,
        _ => base,
    }
}
/// Parse plain text / SMS logs into a simple table.
///
/// Produces one row per non-blank line with the columns
/// (`source_file`, `line_number`, `text`). `line_number` is the 1-based
/// position of the line in the original file, so it still points at the right
/// place when blank lines were skipped. Invalid UTF-8 is replaced lossily.
///
/// # Errors
/// Returns a message when the file has no non-blank lines, when a line number
/// does not fit the `Int32` column, or when the record batch cannot be built.
fn parse_text_file(content: &[u8], filename: &str) -> Result<(SchemaRef, Vec<RecordBatch>), String> {
    let text = String::from_utf8_lossy(content);

    let mut line_nums: Vec<i32> = Vec::new();
    let mut lines: Vec<&str> = Vec::new();
    // Number the lines before dropping blank ones so the numbers match the file.
    for (idx, line) in text.lines().enumerate() {
        if line.trim().is_empty() {
            continue;
        }
        let number = i32::try_from(idx + 1)
            .map_err(|_| format!("text file has too many lines ({})", idx + 1))?;
        line_nums.push(number);
        lines.push(line);
    }
    if lines.is_empty() {
        return Err("text file is empty".into());
    }

    let schema = Arc::new(arrow::datatypes::Schema::new(vec![
        arrow::datatypes::Field::new("source_file", arrow::datatypes::DataType::Utf8, false),
        arrow::datatypes::Field::new("line_number", arrow::datatypes::DataType::Int32, false),
        arrow::datatypes::Field::new("text", arrow::datatypes::DataType::Utf8, false),
    ]));
    let sources: Vec<&str> = vec![filename; lines.len()];
    let arrays: Vec<arrow::array::ArrayRef> = vec![
        Arc::new(arrow::array::StringArray::from(sources)),
        Arc::new(arrow::array::Int32Array::from(line_nums)),
        Arc::new(arrow::array::StringArray::from(lines)),
    ];
    let batch = RecordBatch::try_new(schema.clone(), arrays)
        .map_err(|e| format!("RecordBatch error: {e}"))?;
    Ok((schema, vec![batch]))
}
/// Turn an arbitrary name into a dataset identifier.
///
/// Alphanumeric characters and `_` are kept (ASCII letters lower-cased), every
/// other character becomes `_`, and leading/trailing underscores are removed.
/// When nothing is left the name falls back to `unnamed_dataset`.
fn sanitize_dataset_name(name: &str) -> String {
    let mut mapped = String::with_capacity(name.len());
    for ch in name.chars() {
        if ch.is_alphanumeric() || ch == '_' {
            mapped.push(ch.to_ascii_lowercase());
        } else {
            mapped.push('_');
        }
    }
    match mapped.trim_matches('_') {
        "" => "unnamed_dataset".to_string(),
        kept => kept.to_string(),
    }
}

View File

@ -0,0 +1,69 @@
use axum::{
Json, Router,
extract::{Multipart, Query, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
};
use object_store::ObjectStore;
use serde::Deserialize;
use std::sync::Arc;
use catalogd::registry::Registry;
use crate::pipeline;
/// Shared state for the ingest routes; handed to the router via `with_state`.
#[derive(Clone)]
pub struct IngestState {
/// Object store passed to the ingest pipeline.
pub store: Arc<dyn ObjectStore>,
/// Catalog registry passed to the ingest pipeline.
pub registry: Registry,
}
/// Assemble the ingest routes and attach the shared state:
/// `GET /health` answers with a fixed string, `POST /file` accepts an upload.
pub fn router(state: IngestState) -> Router {
    let routes = Router::new()
        .route("/health", get(health))
        .route("/file", post(ingest_file));
    routes.with_state(state)
}
/// Handler for the `/health` route: always answers with a fixed string.
async fn health() -> &'static str {
"ingestd ok"
}
/// Query-string parameters accepted by the file upload handler.
#[derive(Deserialize)]
struct IngestQuery {
/// Override dataset name (otherwise derived from filename)
name: Option<String>,
}
/// Upload a file for ingestion. Accepts multipart/form-data with a "file" field.
///
/// The first part that is named "file" or carries a filename is ingested;
/// other form fields that precede it are skipped instead of being mistaken for
/// the upload. The optional `name` query parameter overrides the dataset name.
///
/// Responses:
/// * `201 Created` with the ingest result when the file was stored;
/// * `200 OK` with the ingest result when the content was already ingested;
/// * `400 Bad Request` when the body has no file part or cannot be read;
/// * `500 Internal Server Error` with the pipeline's message otherwise.
async fn ingest_file(
    State(state): State<IngestState>,
    Query(query): Query<IngestQuery>,
    mut multipart: Multipart,
) -> impl IntoResponse {
    // Find the file part and read it fully before leaving the loop, so nothing
    // borrowed from `multipart` outlives an iteration.
    let (filename, content) = loop {
        let field = match multipart.next_field().await {
            Ok(Some(f)) => f,
            Ok(None) => return Err((StatusCode::BAD_REQUEST, "no file uploaded".to_string())),
            Err(e) => return Err((StatusCode::BAD_REQUEST, format!("multipart error: {e}"))),
        };
        // Plain form fields (no filename, not called "file") are not uploads.
        if field.name() != Some("file") && field.file_name().is_none() {
            continue;
        }
        let filename = field.file_name().unwrap_or("unknown").to_string();
        let content = field.bytes().await
            .map_err(|e| (StatusCode::BAD_REQUEST, format!("read error: {e}")))?;
        break (filename, content);
    };
    tracing::info!("received file '{}' ({} bytes) for ingest", filename, content.len());

    let dataset_name = query.name.as_deref();
    match pipeline::ingest_file(&filename, &content, dataset_name, &state.store, &state.registry).await {
        Ok(result) => {
            // A duplicate creates nothing, so it is reported as 200 rather than 201.
            if result.deduplicated {
                Ok((StatusCode::OK, Json(result)))
            } else {
                Ok((StatusCode::CREATED, Json(result)))
            }
        }
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e)),
    }
}

View File

@ -0,0 +1,15 @@
{
"id": "1ca61945-d151-490b-81fd-2ca0397b68fa",
"name": "sms_messages",
"schema_fingerprint": "e1d079cbb2b7eedae5019767a886bd9a3396e291aa03630b9db69e9864948c09",
"objects": [
{
"bucket": "data",
"key": "datasets/sms_messages.parquet",
"size_bytes": 2018,
"created_at": "2026-03-27T13:07:14.253881797Z"
}
],
"created_at": "2026-03-27T13:07:14.253886027Z",
"updated_at": "2026-03-27T13:07:14.253886027Z"
}

View File

@ -1,15 +0,0 @@
{
"id": "1f81445a-404f-48ea-bd72-00f6721ee18b",
"name": "employees",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/employees.parquet",
"size_bytes": 1424,
"created_at": "2026-03-27T11:55:55.013996293Z"
}
],
"created_at": "2026-03-27T11:55:55.014010258Z",
"updated_at": "2026-03-27T11:55:55.014010258Z"
}

View File

@ -0,0 +1,15 @@
{
"id": "478072c3-0c95-46a2-9193-f4b3ac4085ab",
"name": "test_ingest",
"schema_fingerprint": "4bdc4e5baeddc1187aecd4bfb788654f26145c2ba346b4bec6ca8ab950e1c133",
"objects": [
{
"bucket": "data",
"key": "datasets/test_ingest.parquet",
"size_bytes": 3129,
"created_at": "2026-03-27T13:06:57.437484309Z"
}
],
"created_at": "2026-03-27T13:06:57.437488259Z",
"updated_at": "2026-03-27T13:06:57.437488259Z"
}

View File

@ -1,15 +0,0 @@
{
"id": "50d402a0-6fb4-4849-ac75-3541b8530aba",
"name": "u5",
"schema_fingerprint": "from-ui",
"objects": [
{
"bucket": "data",
"key": "datasets/events.parquet",
"size_bytes": 0,
"created_at": "2026-03-27T12:16:57.520054490Z"
}
],
"created_at": "2026-03-27T12:16:57.520066991Z",
"updated_at": "2026-03-27T12:16:57.520066991Z"
}

View File

@ -1,15 +0,0 @@
{
"id": "6c8dabf2-a6b3-4e23-b7d7-dc3b37a5e1a8",
"name": "events",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/events.parquet",
"size_bytes": 1127,
"created_at": "2026-03-27T11:55:55.017003417Z"
}
],
"created_at": "2026-03-27T11:55:55.017011224Z",
"updated_at": "2026-03-27T11:55:55.017011224Z"
}

View File

@ -1,15 +0,0 @@
{
"id": "f5281277-afe9-456b-b5fa-d667c2b0f41f",
"name": "products",
"schema_fingerprint": "auto",
"objects": [
{
"bucket": "data",
"key": "datasets/products.parquet",
"size_bytes": 1341,
"created_at": "2026-03-27T11:55:55.019096056Z"
}
],
"created_at": "2026-03-27T11:55:55.019098965Z",
"updated_at": "2026-03-27T11:55:55.019098965Z"
}

View File

@ -1,15 +0,0 @@
{
"id": "fc046131-8f44-428a-af90-ebf6347d4cc6",
"name": "u",
"schema_fingerprint": "from-ui",
"objects": [
{
"bucket": "data",
"key": "datasets/employees.parquet",
"size_bytes": 0,
"created_at": "2026-03-27T12:16:51.462058823Z"
}
],
"created_at": "2026-03-27T12:16:51.462071311Z",
"updated_at": "2026-03-27T12:16:51.462071311Z"
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -34,3 +34,23 @@
**Date:** 2026-03-27
**Decision:** Gateway serves HTTP on :3100 (external) and gRPC on :3101 (internal). Both run in the same process.
**Rationale:** Single binary simplifies deployment. HTTP stays for browser/curl access. gRPC provides typed contracts for service-to-service calls. No premature microservice split.
## ADR-008: Embeddings stored as Parquet, not a proprietary vector DB
**Date:** 2026-03-27
**Decision:** Vector embeddings stored as Parquet files (doc_id, chunk_text, vector columns). Vector index (HNSW) serialized as a sidecar file.
**Rationale:** Keeps all data in one portable format. No vendor lock-in to Pinecone/Weaviate/Qdrant. Vectors are queryable via DataFusion like any other data. Trade-off: brute-force search is fine up to ~100K vectors; HNSW needed beyond that.
## ADR-009: Incremental updates via delta files, not Delta Lake
**Date:** 2026-03-27
**Decision:** Updates append to delta Parquet files. Queries merge base + deltas at read time. Periodic compaction merges deltas into base. Single-writer model (no concurrent writers).
**Rationale:** Full ACID over Parquet (Delta Lake/Iceberg) is a multi-year project. Our use case is single-writer (one ingest pipeline) with read-heavy workloads. Merge-on-read with compaction is sufficient and dramatically simpler.
## ADR-010: Schema detection defaults to string
**Date:** 2026-03-27
**Decision:** Ingest pipeline infers column types from data. When ambiguous or mixed, defaults to String rather than failing.
**Rationale:** Legacy data is messy. A column with "123", "N/A", and "" is a string, not an integer. Downstream queries can CAST as needed. Better to ingest everything than reject on type errors.
## ADR-011: This is not a CRM replacement
**Date:** 2026-03-27
**Decision:** The lakehouse is the analytical layer BEHIND operational systems. It ingests exports, not live data. CRM/ATS stays for daily operations.
**Rationale:** Operational systems need single-record CRUD, permissions, UI workflows. The lakehouse answers cross-cutting questions that no single operational system can. They complement each other rather than compete.

View File

@ -1,6 +1,6 @@
# PRD: Lakehouse — Rust-First Object Storage System
**Status:** Active
**Status:** Active — Phase 0-5 complete, entering production path
**Created:** 2026-03-27
**Owner:** J
@ -8,20 +8,23 @@
## Problem
Traditional data platforms couple storage, compute, and metadata into monolithic databases. This creates vendor lock-in, scaling bottlenecks, and opaque data access. AI workloads bolt onto these systems awkwardly, sharing resources with transactional queries.
Legacy data systems silo information across CRMs, databases, spreadsheets, and file shares. Querying across them requires manual ETL, pre-defined schemas, and expensive database licenses. When AI enters the picture, these systems can't handle the dual requirement of fast analytical queries AND semantic retrieval over unstructured text.
A staffing company (our reference case) has candidate records in an ATS, client data in a CRM, timesheets in billing software, call logs from a phone system, and email records from Exchange. Answering "find every Java developer in Chicago who was called 5+ times but never placed" requires querying across all of them — and no single system can do it.
We need a system where:
- Object storage is the source of truth (not a database)
- Metadata, access, and execution are controlled by Rust services
- Queries run directly over object storage via Arrow/Parquet
- AI inference is isolated and swappable
- The entire system is rebuildable from repository + docs alone
- Any data source (CSV, DB export, PDF, JSON) can be ingested without pre-defined schemas
- Structured data is queryable via SQL at scale (millions of rows, sub-second)
- Unstructured data is searchable via AI embeddings (semantic retrieval)
- An LLM can answer natural language questions against all of it
- Everything runs locally — no cloud APIs, total data privacy
- The system is rebuildable from repository + object storage alone
---
## Solution
A modular Rust service mesh over S3-compatible object storage.
A modular Rust service mesh over S3-compatible object storage, with a local AI layer for embeddings and generation.
### Locked Stack
@ -30,14 +33,15 @@ A modular Rust service mesh over S3-compatible object storage.
| Frontend | Dioxus | Yes |
| API | Axum + Tokio | Yes |
| Object Storage Interface | Apache Arrow `object_store` | Yes |
| Storage Backend | RustFS (fallback: SeaweedFS) | Yes |
| Storage Backend | LocalFileSystem → RustFS → S3 | Yes |
| Query Engine | DataFusion | Yes |
| Data Format | Parquet + Arrow | Yes |
| RPC (internal) | tonic (gRPC) | Yes |
| AI Runtime | Ollama (local models) | Yes |
| AI Boundary | Python FastAPI sidecar → Ollama HTTP API | Yes |
| Vector Index | TBD — evaluate `hora`, `qdrant` crate, or HNSW from scratch | **Open** |
No new frameworks. No exceptions.
No new frameworks without a documented ADR.
---
@ -47,84 +51,160 @@ No new frameworks. No exceptions.
| Service | Responsibility |
|---|---|
| **gateway** | HTTP ingress, routing, auth envelope, middleware |
| **catalogd** | Metadata control plane — dataset registry, schema versions, manifest index |
| **storaged** | Object I/O — read/write/list/delete via `object_store` crate |
| **queryd** | SQL execution — DataFusion over registered Parquet datasets |
| **gateway** | HTTP/gRPC ingress, routing, auth, CORS, body limits |
| **catalogd** | Metadata control plane — dataset registry, schema versions, manifests |
| **storaged** | Object I/O — read/write/list/delete via `object_store` |
| **queryd** | SQL execution — DataFusion over Parquet, MemTable hot cache |
| **ingestd** | *NEW* — Ingest pipeline: CSV/JSON/DB → normalize → Parquet → catalog |
| **vectord** | *NEW* — Embedding store + vector index: chunk → embed → index → search |
| **aibridge** | Rust↔Python boundary — HTTP client to FastAPI sidecar |
| **ui** | Dioxus frontend — dataset browser, query editor, results viewer |
| **shared** | Types, errors, Arrow helpers, protobuf definitions |
### AI Sidecar
Python FastAPI process that adapts Ollama's HTTP API into Arrow-compatible formats:
- `POST /embed` → `nomic-embed-text` via Ollama
- `POST /generate` → configurable model (qwen2.5, mistral, gemma2, llama3.2)
- `POST /rerank` → cross-encoder reranking via generate endpoint
No mocks. No stubs. Real models from day one. Ollama manages model lifecycle, GPU scheduling, caching. Sidecar is stateless passthrough.
| **ui** | Dioxus frontend — Ask, Explore, SQL, System tabs |
| **shared** | Types, errors, Arrow helpers, config, protobuf definitions |
### Data Flow
```
Client → gateway → catalogd (metadata lookup)
→ storaged (object read/write)
→ queryd (SQL execution over Parquet)
→ aibridge → sidecar → Ollama (inference)
Raw data → ingestd (normalize, chunk, detect schema)
├→ storaged (Parquet files to object storage)
├→ catalogd (register dataset + schema)
├→ vectord (embed text chunks, build index)
└→ queryd (auto-register as queryable table)
User question → gateway
├→ vectord (semantic search for relevant chunks) ← RAG path
├→ queryd (SQL over structured data) ← Analytics path
└→ aibridge → Ollama (generate answer from context)
```
### Query Paths
**Analytical (SQL):** "What's the average bill rate for .NET devs in Chicago?"
→ DataFusion scans Parquet columnar, returns in <200ms
**Semantic (RAG):** "Find candidates who could do data engineering work"
→ Embed question → vector search across resume embeddings → retrieve top chunks → LLM answers
**Hybrid:** "Which clients are we losing money on, and why?"
→ SQL for margin calculations + RAG over client notes/emails for context → LLM synthesizes
### Invariants
1. Object storage = source of truth for all data
2. catalogd = sole metadata authority (datasets, schemas, manifests)
3. No raw data stored in catalog — only pointers (bucket, key, schema fingerprint)
4. storaged never interprets data — dumb pipe with presigned URLs
5. queryd registers tables via catalog pointers, not by scanning storage
6. aibridge is stateless — Python sidecar is replaceable without touching Rust
7. All services are modular and independently replaceable
### Dependency Graph
```
shared ← storaged ← catalogd ← queryd
shared ← aibridge
gateway → {storaged, catalogd, queryd, aibridge}
ui → gateway (HTTP only, no crate dependency)
```
2. catalogd = sole metadata authority
3. No raw data in catalog — only pointers
4. vectord stores embeddings AS Parquet (portable, not a proprietary format)
5. ingestd is idempotent — re-ingesting the same file is a no-op
6. Hot cache is a performance layer, not a source of truth — eviction is safe
7. All services modular and independently replaceable
---
## Phases
### Phase 0: Bootstrap
Workspace compiles, gateway serves health check, structured logging works.
### Phase 0-5: Foundation ✅ COMPLETE
**Gate:** `cargo build` clean, `GET /health` returns 200, logs on stdout, docs committed.
- Rust workspace, Axum gateway, object storage, catalog, DataFusion query engine
- Python sidecar with real Ollama models (embed, generate, rerank)
- Dioxus UI with Ask (NL→SQL), Explore, SQL, System tabs
- gRPC, OpenTelemetry, auth middleware, TOML config
- Validated with 286K row staffing company dataset across 7 tables
- Cross-reference queries (JOINs across candidates, placements, timesheets, calls) in <150ms
### Phase 1: Storage + Catalog
Write Parquet to object storage, register in catalog, read back.
### Phase 6: Ingest Pipeline
**Gate:** Upload Parquet → register dataset → retrieve metadata → read back. All via gateway HTTP.
Build the data on-ramp. Accept messy real-world data, normalize it, make it queryable.
### Phase 2: Query Engine
SQL queries over registered Parquet datasets via DataFusion.
| Step | Deliverable | Gate |
|---|---|---|
| 6.1 | `ingestd` crate with CSV parser → Arrow RecordBatch → Parquet | CSV file → queryable dataset |
| 6.2 | JSON ingest (newline-delimited JSON, nested objects) | JSON file → flat Parquet |
| 6.3 | Schema detection — infer column types from data | No manual schema definition needed |
| 6.4 | Deduplication — detect and skip already-ingested files (content hash) | Re-ingest same file = no-op |
| 6.5 | Text chunking — split large text fields for embedding | Long text → overlapping chunks |
| 6.6 | Auto-registration — ingest writes to storage AND registers in catalog | Single API call: file in → queryable |
| 6.7 | Gateway endpoint: `POST /ingest` with file upload | Upload CSV from browser → query in seconds |
**Gate:** `SELECT * FROM dataset LIMIT 10` returns correct results. Resolution goes through catalog.
**Gate:** Upload a raw CSV or JSON file → auto-detected schema → stored as Parquet → registered → immediately queryable via SQL. No manual steps.
### Phase 3: AI Integration
Python sidecar with real Ollama models. Embeddings, generation, reranking.
**Risk:** Schema detection on messy data (mixed types, nulls, inconsistent formatting). Mitigation: conservative type inference (default to string), let user override.
**Gate:** Rust sends text → Python → Ollama → real embeddings return as Arrow-compatible floats.
### Phase 7: Vector Index + RAG Pipeline
### Phase 4: Frontend
Dioxus UI: dataset browser, query editor, results table.
Make unstructured data searchable by meaning, not just keywords.
**Gate:** User can browse datasets and run queries from browser.
| Step | Deliverable | Gate |
|---|---|---|
| 7.1 | `vectord` crate with embedding storage as Parquet (doc_id, chunk_text, vector) | Embeddings stored as portable Parquet |
| 7.2 | Chunking strategy — configurable chunk size + overlap for text columns | Large text fields split into embeddable chunks |
| 7.3 | Brute-force vector search via DataFusion (cosine similarity SQL) | Semantic search works, correctness verified |
| 7.4 | HNSW index for fast approximate nearest neighbor | Search over 100K+ vectors in <50ms |
| 7.5 | RAG endpoint: `POST /rag` — question → embed → search → retrieve → generate | Natural language question → grounded answer |
| 7.6 | Auto-embed on ingest — text columns automatically embedded during ingest | No separate embedding step needed |
| 7.7 | Hybrid search — combine SQL filters with vector similarity | "Java devs in Chicago" (SQL) + "who could do data engineering" (semantic) |
### Phase 5: Hardening
gRPC internals, OpenTelemetry, auth, config-driven startup.
**Gate:** Ingest 15K candidate resumes → auto-embed → ask "find someone who could handle our Kubernetes migration" → system returns relevant candidates ranked by semantic match, with LLM explanation.
**Gate:** Services communicate via gRPC. Traces propagate. Auth enforced. System restartable from repo + config.
**Risk: HNSW in Rust at scale.** This is the hardest technical problem. Options:
- `hora` crate — Rust-native ANN, but less mature than FAISS
- Store HNSW index as a serialized file alongside Parquet data
- Fallback: brute-force scan is fine up to ~100K vectors; optimize later
- Nuclear option: use Qdrant as an external vector store (breaks "no new services" rule)
**Decision needed:** Evaluate `hora` vs external Qdrant vs brute-force at J's data scale.
### Phase 8: Hot Cache + Incremental Updates
Make frequently-accessed data fast, and handle real-time updates without full rewrite.
| Step | Deliverable | Gate |
|---|---|---|
| 8.1 | MemTable hot cache — pin active datasets in memory | Queries on hot data: <10ms |
| 8.2 | Cache policy — LRU eviction based on access patterns | Memory-bounded, auto-manages |
| 8.3 | Incremental writes — append new rows without rewriting entire Parquet file | Update one candidate's phone → no full table rewrite |
| 8.4 | Merge-on-read — query combines base Parquet + delta files | Correct results from base + updates |
| 8.5 | Compaction — periodic merge of delta files into base Parquet | Prevent delta file proliferation |
| 8.6 | Upsert semantics — insert or update by primary key | Same candidate ID → update in place |
**Gate:** Update a single row in a 15K-row dataset. Query reflects the change immediately. No full Parquet rewrite. Memory cache serves hot data in <10ms.
**Risk: This is the Delta Lake problem.** Full ACID transactions over Parquet files are what Databricks spent years building. We're NOT building Delta Lake — we're building a pragmatic version:
- Append-only delta files (easy)
- Merge-on-read (moderate)
- Compaction (moderate)
- Full ACID isolation (NOT attempting — single-writer model instead)
### Phase 9+: Future (not designed yet)
- Database connector ingest (PostgreSQL, MySQL, MSSQL → Parquet)
- PDF/document ingest (OCR → text → chunks → embed)
- Scheduled ingest (cron-based file watching)
- Multi-node query distribution
- Row-level access control
- Audit log (who queried what, when)
---
## Reference Dataset: Staffing Company
Validated with realistic staffing company data:
| Table | Rows | Description |
|---|---|---|
| candidates | 15,000 | Names, phones, emails, zip, skills, resume text, availability |
| clients | 500 | Companies, contacts, verticals, bill rates |
| job_orders | 3,000 | Positions with descriptions, requirements, rates |
| placements | 8,000 | Candidate↔job matches with dates, rates, recruiters |
| timesheets | 120,000 | Weekly hours, bill/pay totals, approvals |
| call_log | 80,000 | Phone CDR — who called whom, duration, disposition |
| email_log | 60,000 | Email tracking — subject, opened, direction |
| **Total** | **286,500** | **7 tables, cross-referenced** |
Proven queries:
- Candidate search by skills + location + availability: 80ms
- Revenue by client with profit margins (JOIN 120K timesheets): 142ms
- Cold lead detection (candidates called 5+ times, never placed): 94ms
- Margin analysis by vertical (JOIN placements → job orders): 53ms
- Natural language → AI-generated SQL → execution → results: ~3s (model inference)
---
@ -132,24 +212,22 @@ gRPC internals, OpenTelemetry, auth, config-driven startup.
| Model | Use |
|---|---|
| `nomic-embed-text` | Embeddings (768d) |
| `qwen2.5` | Code generation, structured output |
| `mistral` | General generation |
| `nomic-embed-text` | Embeddings (768d) — semantic search, RAG retrieval |
| `qwen2.5` | SQL generation, structured output, summarization |
| `mistral` | General generation, longer context |
| `gemma2` | General generation |
| `llama3.2` | General generation |
Model selection via environment variables. No hardcoded model names in Rust code.
| `llama3.2` | General generation, lightweight |
---
## Non-Goals
- Multi-tenancy
- Streaming ingestion / CDC
- Custom file formats
- Query caching / materialized views
- Wrapping `object_store` with another abstraction
- Cloud deployment (local-first)
- Multi-tenancy (single-owner system)
- Cloud deployment (local-first, always)
- Full ACID transactions (single-writer model is sufficient)
- Real-time streaming / CDC (batch ingest is the model)
- Replacing the CRM (this is the analytical layer BEHIND the CRM)
- Custom file formats (Parquet is the format, period)
---
@ -157,11 +235,13 @@ Model selection via environment variables. No hardcoded model names in Rust code
| Risk | Severity | Mitigation |
|---|---|---|
| RustFS immaturity | High | Start with LocalFileSystem, test against MinIO, RustFS last. SeaweedFS fallback. |
| DataFusion table registration overhead | Medium | Lazy registration + LRU cache of SessionContext instances. |
| Catalog consistency without DB | Medium | Write-ahead: persist manifest before in-memory update. Rebuild from storage on restart. |
| Dioxus WASM gaps | Medium | Phase 4 is last. Fallback to plain HTML if blocked. |
| Schema evolution | Medium | Schema fingerprinting in Phase 1. Validate before query. |
| Vector search in Rust at scale | **High** | Start brute-force, evaluate `hora` crate, Qdrant as fallback |
| Incremental updates on Parquet | **High** | Delta files + merge-on-read, NOT full Delta Lake |
| Legacy data messiness | **High** | Conservative schema detection, default to string, user overrides |
| Schema evolution across ingests | **Medium** | Schema fingerprinting + versioned manifests |
| Memory pressure from hot cache | **Medium** | LRU eviction, configurable memory limit |
| HNSW index persistence | **Medium** | Serialize alongside Parquet, rebuild on startup |
| Python sidecar as bottleneck | **Low** | Can replace with direct Ollama HTTP from Rust later |
---
@ -173,3 +253,5 @@ Model selection via environment variables. No hardcoded model names in Rust code
4. No silent architecture drift
5. Always work in smallest valid step
6. Always verify before moving on
7. **New:** Flag when something is genuinely hard vs just engineering work
8. **New:** If a phase reveals the approach is wrong, update the PRD before continuing

254
scripts/generate_demo.py Normal file
View File

@ -0,0 +1,254 @@
#!/usr/bin/env python3
"""Generate realistic demo datasets that actually stress the stack."""
import pyarrow as pa
import pyarrow.parquet as pq
import random
import json
import urllib.request
import time
from datetime import datetime, timedelta
# Base URL used for both the storage upload and the catalog registration requests.
API = "http://localhost:3100"
def upload_and_register(name, table):
    """Upload *table* as Parquet to storage and register it in the catalog.

    The table is written to ``/tmp/{name}.parquet`` (snappy-compressed), sent
    with ``PUT {API}/storage/objects/datasets/{name}.parquet`` and then
    registered with ``POST {API}/catalog/datasets`` using the fingerprint
    ``"auto"``. A one-line summary is printed afterwards.

    Args:
        name: Dataset name; also used for the temp file and the object key.
        table: Table to serialise; must be accepted by ``pq.write_table``.

    Raises:
        urllib.error.URLError: If either HTTP request fails (``HTTPError``
            for 4xx/5xx responses).
    """
    path = f"/tmp/{name}.parquet"
    pq.write_table(table, path, compression="snappy")

    # Read the file exactly once; the context manager closes the handle.
    with open(path, "rb") as f:
        data = f.read()

    key = f"datasets/{name}.parquet"
    req = urllib.request.Request(f"{API}/storage/objects/{key}", data=data, method="PUT")
    # Close each response explicitly instead of leaving the socket to the GC.
    with urllib.request.urlopen(req):
        pass

    body = json.dumps({
        "name": name,
        "schema_fingerprint": "auto",
        "objects": [{"bucket": "data", "key": key, "size_bytes": len(data)}]
    }).encode()
    req = urllib.request.Request(
        f"{API}/catalog/datasets", data=body, method="POST",
        headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(req):
        pass

    print(f" {name}: {table.num_rows:,} rows, {len(data):,} bytes ({len(data)/1024/1024:.1f} MB)")
# ============================================================
# Dataset 1: web_events — 500K rows of web analytics
# Shows: DataFusion scanning half a million rows in milliseconds
# ============================================================
print("Generating web_events (500K rows)...")
N_EVENTS = 500_000
countries = ["US", "UK", "DE", "FR", "JP", "BR", "IN", "AU", "CA", "KR", "MX", "NG", "ZA", "SE", "IT"]
pages = ["/", "/pricing", "/docs", "/blog", "/about", "/login", "/signup", "/dashboard", "/settings", "/api",
         "/docs/getting-started", "/docs/api-reference", "/docs/tutorials", "/blog/rust-performance",
         "/blog/ai-lakehouse", "/blog/parquet-vs-csv", "/products", "/products/enterprise", "/contact", "/careers"]
# Repeated entries make random.choice pick the common values more often.
actions = ["pageview", "pageview", "pageview", "pageview", "click", "click", "scroll", "form_submit", "download", "signup"]
browsers = ["Chrome", "Firefox", "Safari", "Edge", "Arc"]
devices = ["desktop", "desktop", "desktop", "mobile", "mobile", "tablet"]
base_time = datetime(2026, 1, 1)
random.seed(42)

# One tuple per event. Tuple elements are evaluated left to right, so the
# random draws happen in a fixed order per row and the seeded output stays
# reproducible.
event_rows = []
for _ in range(N_EVENTS):
    event_rows.append((
        (base_time + timedelta(seconds=random.randint(0, 86400 * 90))).isoformat(),  # 90 days
        random.randint(1, 50000),
        random.choice(pages),
        random.choice(actions),
        random.randint(100, 300000),  # ms
        random.choice(countries),
        random.choice(browsers),
        random.choice(devices),
        f"sess_{random.randint(1, 200000):06d}",
    ))

# Transpose the rows into one list per column.
(timestamps, user_ids, page_list, action_list, duration_list,
 country_list, browser_list, device_list, session_ids) = map(list, zip(*event_rows))

web_events = pa.table({
    "timestamp": timestamps,
    "user_id": user_ids,
    "session_id": session_ids,
    "page": page_list,
    "action": action_list,
    "duration_ms": duration_list,
    "country": country_list,
    "browser": browser_list,
    "device": device_list,
})
upload_and_register("web_events", web_events)
# ============================================================
# Dataset 2: products — 5K products with real-ish descriptions
# Shows: AI can read and understand product data, semantic search
# ============================================================
print("Generating products (5K rows)...")
categories = ["SaaS", "API", "Database", "Analytics", "Security", "DevOps", "AI/ML", "Storage", "Networking", "Monitoring"]
adjectives = ["Enterprise", "Cloud-Native", "Open-Source", "Serverless", "Real-Time", "Distributed", "Scalable", "Lightweight", "High-Performance", "Self-Hosted"]
nouns = ["Platform", "Engine", "Gateway", "Toolkit", "Framework", "Suite", "Service", "Connector", "Pipeline", "Hub"]
features = [
    "with built-in authentication and RBAC",
    "featuring automatic horizontal scaling",
    "with zero-config deployment",
    "supporting 100+ integrations",
    "with sub-millisecond latency",
    "featuring end-to-end encryption",
    "with real-time dashboards",
    "supporting multi-region replication",
    "with built-in CI/CD pipelines",
    "featuring AI-powered anomaly detection",
    "with comprehensive audit logging",
    "supporting GraphQL and REST APIs",
    "with automated backup and recovery",
    "featuring smart caching layers",
    "with native Kubernetes support",
]

# One tuple per product; the draw order (category, adjective, noun, feature,
# price, rating, review count, creation date) is fixed so the seeded output
# is reproducible.
product_rows = []
for product_id in range(1, 5001):
    cat = random.choice(categories)
    adj = random.choice(adjectives)
    noun = random.choice(nouns)
    feat = random.choice(features)
    name = f"{adj} {cat} {noun}"
    desc = f"{name} — a {cat.lower()} solution {feat}. Built for teams that need reliable {cat.lower()} infrastructure without the complexity."
    product_rows.append((
        product_id,
        name,
        cat,
        round(random.uniform(9.99, 2999.99), 2),
        desc,
        round(random.uniform(2.5, 5.0), 1),
        random.randint(0, 5000),
        (base_time - timedelta(days=random.randint(0, 730))).strftime("%Y-%m-%d"),
    ))

# Transpose into per-column lists; product_prices must stay an indexable list
# because the transactions section looks prices up by product id.
(product_ids, product_names, product_categories, product_prices,
 product_descriptions, product_ratings, product_reviews_count,
 product_created) = map(list, zip(*product_rows))

products = pa.table({
    "product_id": product_ids,
    "name": product_names,
    "category": product_categories,
    "price": product_prices,
    "description": product_descriptions,
    "rating": product_ratings,
    "review_count": product_reviews_count,
    "created_date": product_created,
})
upload_and_register("products", products)
# ============================================================
# Dataset 3: transactions — 200K purchase records
# Shows: JOINs across datasets, aggregation at scale
# ============================================================
print("Generating transactions (200K rows)...")
N_TXN = 200_000
# Repeated entries make random.choice pick the common values more often.
statuses = ["completed", "completed", "completed", "completed", "pending", "refunded", "failed"]
payments = ["credit_card", "credit_card", "credit_card", "debit_card", "paypal", "crypto", "wire_transfer"]

# One tuple per transaction; product id and quantity are drawn first, then the
# remaining draws happen left to right inside the tuple.
txn_rows = []
for i in range(N_TXN):
    pid = random.randint(1, 5000)
    qty = random.randint(1, 10)
    price = product_prices[pid - 1]  # product ids are 1-based
    txn_rows.append((
        f"TXN-{i+1:07d}",
        random.randint(1, 50000),
        pid,
        qty,
        round(price * qty, 2),
        (base_time + timedelta(seconds=random.randint(0, 86400 * 90))).isoformat(),
        random.choice(statuses),
        random.choice(payments),
    ))

# Transpose the rows into one list per column.
(txn_ids, txn_user_ids, txn_product_ids, txn_quantities, txn_amounts,
 txn_timestamps, txn_statuses, txn_payment_methods) = map(list, zip(*txn_rows))

transactions = pa.table({
    "txn_id": txn_ids,
    "user_id": txn_user_ids,
    "product_id": txn_product_ids,
    "quantity": txn_quantities,
    "amount": txn_amounts,
    "timestamp": txn_timestamps,
    "status": txn_statuses,
    "payment_method": txn_payment_methods,
})
upload_and_register("transactions", transactions)
# ============================================================
# Dataset 4: server_metrics — 1M rows of infrastructure telemetry
# Shows: time-series analytics, the kind of data you'd put in a lakehouse
# ============================================================
print("Generating server_metrics (1M rows)...")
N_METRICS = 1_000_000
hosts = [f"prod-{i:03d}" for i in range(100)]
metrics_names = ["cpu_usage", "memory_usage", "disk_io", "network_in", "network_out", "request_latency", "error_rate", "gc_pause"]
regions = ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"]

# Per-metric value generators. Metrics without an entry (disk_io, gc_pause)
# fall back to a uniform draw in the loop below.
metric_samplers = {
    "cpu_usage": lambda: round(random.gauss(45, 20), 2),
    "memory_usage": lambda: round(random.gauss(60, 15), 2),
    "network_in": lambda: round(random.expovariate(1/1000), 2),
    "network_out": lambda: round(random.expovariate(1/1000), 2),
    "request_latency": lambda: round(random.expovariate(1/50), 2),
    "error_rate": lambda: round(random.expovariate(1/2), 4),
}

# One tuple per sample; metric and value are drawn first, then timestamp,
# host and region in that order.
metric_rows = []
for _ in range(N_METRICS):
    metric = random.choice(metrics_names)
    sampler = metric_samplers.get(metric)
    val = sampler() if sampler is not None else round(random.uniform(0, 100), 2)
    metric_rows.append((
        (base_time + timedelta(seconds=random.randint(0, 86400 * 90))).isoformat(),
        random.choice(hosts),
        metric,
        max(0, val),  # gaussian draws can go negative; clamp at zero
        random.choice(regions),
    ))

# Transpose the rows into one list per column.
m_timestamps, m_hosts, m_metrics, m_values, m_regions = map(list, zip(*metric_rows))

server_metrics = pa.table({
    "timestamp": m_timestamps,
    "host": m_hosts,
    "metric": m_metrics,
    "value": m_values,
    "region": m_regions,
})
upload_and_register("server_metrics", server_metrics)
# Final summary: total row count across the four datasets (5000 is the
# products row count), followed by example questions to try.
print(f"\nDone — 4 datasets, {N_EVENTS + 5000 + N_TXN + N_METRICS:,} total rows")
print("\nDemo queries to try:")
print(' "How many page views per country, sorted by volume?"')
print(' "What are the top 10 products by total revenue?"')
print(' "Show average CPU usage per host in us-east-1"')
print(' "Which payment method has the highest failure rate?"')
print(' "What are the busiest hours for web traffic?"')

459
scripts/staffing_demo.py Normal file
View File

@ -0,0 +1,459 @@
#!/usr/bin/env python3
"""
Realistic staffing company data generator.
Multiple source systems, overlapping data, real cross-reference problems.
Data sources (like a real staffing company):
- ATS (Applicant Tracking System) candidates
- CRM client companies + contacts
- Job board job orders with descriptions
- Placements who got placed where
- Timesheets hours worked, bill/pay rates
- Phone system CDR call detail records
- Email logs communication tracking
"""
import random, json, urllib.request, hashlib, string, time
from datetime import datetime, timedelta
import pyarrow as pa, pyarrow.parquet as pq
# Base URL used for the storage upload and catalog registration requests.
API = "http://localhost:3100"
# Fixed seed so the random module produces the same sequence on every run.
random.seed(2026)
def upload(name, table):
    """Write ``table`` to Parquet, store it via the API, and register it in the catalog.

    Steps:
      1. Serialize ``table`` to ``/tmp/<name>.parquet`` (snappy-compressed).
      2. PUT the file bytes to ``{API}/storage/objects/datasets/<name>.parquet``.
      3. POST a catalog entry to ``{API}/catalog/datasets`` that points at the
         stored object.

    Args:
        name: Dataset name; used for the temp file, the object key, and the
            catalog entry.
        table: A pyarrow Table to upload.

    Raises:
        urllib.error.URLError: If the API is unreachable. ``HTTPError`` (a
            subclass) is raised when either request gets an error status.
    """
    path = f"/tmp/{name}.parquet"
    pq.write_table(table, path, compression="snappy")
    with open(path, "rb") as f:
        data = f.read()

    key = f"datasets/{name}.parquet"
    put_req = urllib.request.Request(f"{API}/storage/objects/{key}", data=data, method="PUT")
    # urlopen returns a response object that owns an open connection; close it
    # deterministically instead of leaving it for the garbage collector.
    with urllib.request.urlopen(put_req):
        pass

    body = json.dumps({
        "name": name,
        "schema_fingerprint": "auto",
        "objects": [{"bucket": "data", "key": key, "size_bytes": len(data)}],
    }).encode()
    post_req = urllib.request.Request(
        f"{API}/catalog/datasets",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(post_req):
        pass

    print(f" {name}: {table.num_rows:,} rows ({len(data)/1024:.0f} KB)")
# ============================================================
# Shared reference data
# ============================================================
# NOTE: the generators below pick from these collections with random.choice /
# random.sample, which select by position. Reordering, adding, or removing
# entries therefore changes the seeded output.

# Given names drawn for candidates, client contacts, and recruiters.
first_names = ["James","Mary","Robert","Patricia","John","Jennifer","Michael","Linda","David","Elizabeth",
"William","Barbara","Richard","Susan","Joseph","Jessica","Thomas","Sarah","Christopher","Karen",
"Charles","Lisa","Daniel","Nancy","Matthew","Betty","Anthony","Margaret","Mark","Sandra",
"Donald","Ashley","Steven","Dorothy","Paul","Kimberly","Andrew","Emily","Joshua","Donna",
"Kenneth","Michelle","Kevin","Carol","Brian","Amanda","George","Melissa","Timothy","Deborah",
"Ronald","Stephanie","Edward","Rebecca","Jason","Sharon","Jeffrey","Laura","Ryan","Cynthia",
"Jacob","Kathleen","Gary","Amy","Nicholas","Angela","Eric","Shirley","Jonathan","Anna",
"Stephen","Brenda","Larry","Pamela","Justin","Emma","Scott","Nicole","Brandon","Helen",
"Benjamin","Samantha","Samuel","Katherine","Raymond","Christine","Gregory","Debra","Frank","Rachel",
"Alexander","Carolyn","Patrick","Janet","Jack","Catherine","Dennis","Maria","Jerry","Heather"]
# Family names, paired with first_names wherever a person is generated.
last_names = ["Smith","Johnson","Williams","Brown","Jones","Garcia","Miller","Davis","Rodriguez","Martinez",
"Hernandez","Lopez","Gonzalez","Wilson","Anderson","Thomas","Taylor","Moore","Jackson","Martin",
"Lee","Perez","Thompson","White","Harris","Sanchez","Clark","Ramirez","Lewis","Robinson",
"Walker","Young","Allen","King","Wright","Scott","Torres","Nguyen","Hill","Flores",
"Green","Adams","Nelson","Baker","Hall","Rivera","Campbell","Mitchell","Carter","Roberts",
"Gomez","Phillips","Evans","Turner","Diaz","Parker","Cruz","Edwards","Collins","Reyes",
"Stewart","Morris","Morales","Murphy","Cook","Rogers","Gutierrez","Ortiz","Morgan","Cooper",
"Peterson","Bailey","Reed","Kelly","Howard","Ramos","Kim","Cox","Ward","Richardson"]
# (city, state, zip) triples. A location is always drawn as one whole tuple, so
# the three fields of a generated record stay mutually consistent.
cities_zips = [
("Chicago","IL","60601"),("Chicago","IL","60602"),("Chicago","IL","60603"),("Chicago","IL","60610"),
("Chicago","IL","60614"),("Chicago","IL","60616"),("Chicago","IL","60622"),("Chicago","IL","60647"),
("New York","NY","10001"),("New York","NY","10002"),("New York","NY","10003"),("New York","NY","10010"),
("New York","NY","10016"),("New York","NY","10019"),("New York","NY","10022"),("New York","NY","10036"),
("Los Angeles","CA","90001"),("Los Angeles","CA","90012"),("Los Angeles","CA","90024"),("Los Angeles","CA","90036"),
("Houston","TX","77001"),("Houston","TX","77002"),("Houston","TX","77003"),("Houston","TX","77019"),
("Dallas","TX","75201"),("Dallas","TX","75202"),("Dallas","TX","75204"),("Dallas","TX","75219"),
("Atlanta","GA","30301"),("Atlanta","GA","30303"),("Atlanta","GA","30305"),("Atlanta","GA","30309"),
("Denver","CO","80201"),("Denver","CO","80202"),("Denver","CO","80204"),("Denver","CO","80206"),
("Phoenix","AZ","85001"),("Phoenix","AZ","85003"),("Phoenix","AZ","85004"),("Phoenix","AZ","85006"),
("Seattle","WA","98101"),("Seattle","WA","98102"),("Seattle","WA","98103"),("Seattle","WA","98104"),
("Miami","FL","33101"),("Miami","FL","33125"),("Miami","FL","33130"),("Miami","FL","33132"),
]
# Skill names grouped by staffing vertical. The dict keys define the set of
# verticals. Some skills (e.g. "SAP") appear under more than one vertical.
skills_pool = {
"IT": ["Java","Python","C#",".NET","JavaScript","TypeScript","React","Angular","Node.js","SQL",
"AWS","Azure","GCP","Docker","Kubernetes","Linux","Git","REST APIs","GraphQL","MongoDB",
"PostgreSQL","MySQL","Redis","Terraform","Jenkins","CI/CD","Agile","Scrum","DevOps","Microservices",
"Spring Boot","Django","Flask","Ruby on Rails","Go","Rust","Swift","Kotlin","PHP","Vue.js"],
"Healthcare": ["RN","LPN","CNA","BLS","ACLS","PALS","EMR","Epic","Cerner","Meditech",
"ICD-10","CPT","Medical Billing","Medical Coding","HIPAA","Phlebotomy","IV Therapy",
"Telemetry","ICU","OR","ER","Med-Surg","Labor & Delivery","Pediatrics","Oncology"],
"Industrial": ["Forklift","OSHA 10","OSHA 30","Welding","MIG","TIG","CNC","PLC","Blueprint Reading",
"Quality Control","Six Sigma","Lean Manufacturing","AutoCAD","SolidWorks","GD&T",
"Mechanical Assembly","Electrical","Hydraulics","Pneumatics","Warehouse"],
"Accounting": ["QuickBooks","SAP","Oracle","Accounts Payable","Accounts Receivable","General Ledger",
"Financial Reporting","Tax Preparation","CPA","Payroll","Budgeting","Forecasting",
"Audit","Compliance","Excel Advanced","Power BI","Tableau","GAAP","SOX"],
"Admin": ["Microsoft Office","Data Entry","Customer Service","Scheduling","Filing","Receptionist",
"Executive Assistant","Travel Coordination","Calendar Management","SAP","Salesforce",
"CRM","Multi-line Phone","Typing 60+ WPM","Notary","Bilingual Spanish"],
}
# Vertical names in dict insertion order; used to pick a vertical at random.
verticals = list(skills_pool.keys())
# Mail providers used by make_email() for candidate addresses.
email_domains = ["gmail.com","yahoo.com","hotmail.com","outlook.com","aol.com","icloud.com","protonmail.com"]
def make_phone():
    """Return a random phone number formatted as ``(AAA) EEE-NNNN``.

    The area code and exchange are each drawn from 200-999 and the line
    number from 1000-9999, in that order.
    """
    area_code = random.randint(200, 999)
    exchange = random.randint(200, 999)
    line_number = random.randint(1000, 9999)
    return "({}) {}-{}".format(area_code, exchange, line_number)
def make_email(first, last):
    """Build a random personal e-mail address from a first and last name.

    The local part is ``first`` and ``last`` lower-cased, joined by ".", "_",
    or nothing, optionally followed by a number from 1-99. The domain is
    picked from ``email_domains``.
    """
    joiner = random.choice([".", "_", ""])
    # The number is drawn even when the empty suffix ends up being chosen, so
    # the RNG stream advances by the same amount on every call.
    digits = str(random.randint(1, 99))
    suffix = random.choice(["", digits])
    domain = random.choice(email_domains)
    local_part = joiner.join((first.lower(), last.lower())) + suffix
    return local_part + "@" + domain
# Anchor date for the generated data: created dates, placement starts, timesheet
# weeks, and call/e-mail timestamps are all computed as offsets back from it.
base_date = datetime(2026, 1, 1)
# ============================================================
# 1. CANDIDATES — 15,000 from ATS
# ============================================================
print("Generating candidates (15K)...")
N_CAND = 15000

# One list per output column, filled in lockstep. Later sections index into
# c_ids, c_emails, and c_phones to reference candidates.
c_ids, c_first, c_last, c_emails, c_phones, c_phones_alt = [], [], [], [], [], []
c_city, c_state, c_zip = [], [], []
c_vertical, c_skills, c_resume_summary = [], [], []
c_status, c_source, c_pay_rate_min, c_created = [], [], [], []
c_availability, c_years_exp = [], []

# NOTE: the order of random.* calls inside the loop determines the seeded
# output; keep it stable when editing.
for i in range(N_CAND):
    fn = random.choice(first_names)
    ln = random.choice(last_names)
    city, state, zipcode = random.choice(cities_zips)
    vert = random.choice(verticals)
    n_skills = random.randint(3, 12)
    sk = random.sample(skills_pool[vert], min(n_skills, len(skills_pool[vert])))
    yrs = random.randint(0, 25)

    # Free-text resume blurb. The name and the vertical are separated by a
    # dash; previously they were fused (e.g. "SmithIT professional").
    resume = f"{fn} {ln} — {vert} professional with {yrs} years experience. "
    resume += f"Based in {city}, {state} {zipcode}. "
    resume += f"Key skills: {', '.join(sk)}. "
    # All three closing sentences are rendered (consuming their random draws)
    # before one of them is picked.
    resume += random.choice([
        f"Previously worked at {random.choice(['Acme Corp','TechFlow','GlobalStaff','MedPro','BuildRight'])} as a {random.choice(['Senior','Lead','Staff','Junior'])} {vert} specialist.",
        f"Seeking {random.choice(['contract','full-time','temp-to-hire'])} opportunities in the {city} metro area.",
        f"Available {random.choice(['immediately','in 2 weeks','after current contract ends'])}. Open to {random.choice(['remote','hybrid','on-site'])} work.",
    ])

    c_ids.append(f"CAND-{i+1:05d}")
    c_first.append(fn)
    c_last.append(ln)
    c_emails.append(make_email(fn, ln))
    c_phones.append(make_phone())
    # Roughly 30% of candidates get a second phone number; the rest get "".
    c_phones_alt.append(make_phone() if random.random() < 0.3 else "")
    c_city.append(city)
    c_state.append(state)
    c_zip.append(zipcode)
    c_vertical.append(vert)
    # Skills are stored as a single pipe-delimited string.
    c_skills.append("|".join(sk))
    c_resume_summary.append(resume)
    # "active" is listed four times so it is the most likely status.
    c_status.append(random.choice(["active","active","active","active","inactive","do_not_contact","placed"]))
    c_source.append(random.choice(["Indeed","LinkedIn","Referral","Walk-in","Monster","CareerBuilder","Website","Job Fair"]))
    c_pay_rate_min.append(round(random.uniform(12, 85), 2))
    # Created some time in the 1095 days up to base_date.
    c_created.append((base_date - timedelta(days=random.randint(0, 1095))).strftime("%Y-%m-%d"))
    c_availability.append(random.choice(["immediate","1_week","2_weeks","1_month","not_available"]))
    c_years_exp.append(yrs)

candidates = pa.table({
    "candidate_id": c_ids, "first_name": c_first, "last_name": c_last,
    "email": c_emails, "phone": c_phones, "phone_alt": c_phones_alt,
    "city": c_city, "state": c_state, "zip": c_zip,
    "vertical": c_vertical, "skills": c_skills, "resume_summary": c_resume_summary,
    "status": c_status, "source": c_source, "min_pay_rate": c_pay_rate_min,
    "created_date": c_created, "availability": c_availability, "years_experience": c_years_exp,
})
upload("candidates", candidates)
# ============================================================
# 2. CLIENTS — 500 companies
# ============================================================
print("Generating clients (500)...")
company_prefixes = [
    "Apex", "Summit", "Core", "First", "National", "Metro", "Pacific", "Atlantic", "Central", "Premier",
    "Global", "United", "Alliance", "Pinnacle", "Elite", "Horizon", "Pioneer", "Titan", "Quantum", "Vertex",
]
company_suffixes = [
    "Industries", "Solutions", "Systems", "Group", "Corp", "Technologies", "Services", "Partners",
    "Holdings", "Enterprises", "Manufacturing", "Healthcare", "Logistics", "Financial", "Engineering",
]


def _build_client(seq):
    """Return one client record as a tuple in ``clients`` column order.

    Random draws happen in a fixed order (company name, location, vertical,
    contact name, phone, bill rate, status, client-since date) so the seeded
    output stays reproducible.
    """
    company = " ".join((random.choice(company_prefixes), random.choice(company_suffixes)))
    town, st, postal = random.choice(cities_zips)
    sector = random.choice(verticals)
    given = random.choice(first_names)
    family = random.choice(last_names)
    # The contact's mailbox lives on a domain derived from the company name.
    mailbox = "{}.{}@{}.com".format(given.lower(), family.lower(), company.lower().replace(" ", ""))
    return (
        "CLI-{:04d}".format(seq + 1),
        company,
        sector,
        given + " " + family,
        mailbox,
        make_phone(),
        town,
        st,
        postal,
        round(random.uniform(25, 150), 2),
        random.choice(["active", "active", "active", "inactive", "prospect"]),
        (base_date - timedelta(days=random.randint(30, 2000))).strftime("%Y-%m-%d"),
    )


# Generate every row, then transpose the rows into one list per column. Later
# sections index into cl_ids, cl_names, and cl_contacts.
(cl_ids, cl_names, cl_verticals, cl_contacts, cl_contact_emails, cl_contact_phones,
 cl_city, cl_state, cl_zip, cl_bill_rate_avg, cl_status, cl_since) = (
    list(column) for column in zip(*(_build_client(n) for n in range(500)))
)

clients = pa.table({
    "client_id": cl_ids,
    "company_name": cl_names,
    "vertical": cl_verticals,
    "contact_name": cl_contacts,
    "contact_email": cl_contact_emails,
    "contact_phone": cl_contact_phones,
    "city": cl_city,
    "state": cl_state,
    "zip": cl_zip,
    "avg_bill_rate": cl_bill_rate_avg,
    "status": cl_status,
    "client_since": cl_since,
})
upload("clients", clients)
# ============================================================
# 3. JOB ORDERS — 3,000 open/filled/closed
# ============================================================
print("Generating job_orders (3K)...")
# Job titles available per vertical (same keys as skills_pool).
titles = {
    "IT": [
        "Software Developer", "Java Developer", ".NET Developer", "DevOps Engineer", "Data Analyst",
        "QA Engineer", "Systems Admin", "Help Desk", "Network Engineer", "Cloud Architect",
        "Full Stack Developer", "Python Developer", "React Developer", "DBA", "Security Analyst",
    ],
    "Healthcare": [
        "Registered Nurse", "LPN", "CNA", "Medical Assistant", "Phlebotomist",
        "Radiology Tech", "Pharmacy Tech", "Medical Coder", "Billing Specialist", "Case Manager",
    ],
    "Industrial": [
        "Forklift Operator", "Welder", "CNC Machinist", "Quality Inspector", "Maintenance Tech",
        "Electrician", "Warehouse Associate", "Assembly Technician", "Production Supervisor", "Shipping Clerk",
    ],
    "Accounting": [
        "Staff Accountant", "AP Specialist", "AR Specialist", "Payroll Clerk", "Tax Preparer",
        "Financial Analyst", "Bookkeeper", "Audit Associate", "Controller", "Cost Accountant",
    ],
    "Admin": [
        "Administrative Assistant", "Executive Assistant", "Receptionist", "Data Entry Clerk", "Office Manager",
        "Customer Service Rep", "HR Coordinator", "Legal Secretary", "Office Coordinator", "Scheduler",
    ],
}


def _build_job_order(seq):
    """Return one job-order record as a tuple in ``job_orders`` column order.

    The job is attached to a randomly chosen client (index 0-499 into the
    client lists). The pay rate is 55-75% of the bill rate.
    """
    sector = random.choice(verticals)
    role = random.choice(titles[sector])
    client_idx = random.randint(0, 499)
    town, st, postal = random.choice(cities_zips)
    bill_rate = round(random.uniform(25, 150), 2)
    pay_rate = round(bill_rate * random.uniform(0.55, 0.75), 2)
    pool = skills_pool[sector]
    wanted = random.sample(pool, min(random.randint(3, 6), len(pool)))
    # Pieces are evaluated top to bottom, which keeps the random draws in the
    # same order as building the text piece by piece.
    summary = "".join([
        f"{role} needed for {cl_names[client_idx]} in {town}, {st}. ",
        f"Requirements: {', '.join(wanted)}. ",
        f"{random.randint(1,10)}+ years experience preferred. ",
        f"Bill rate: ${bill_rate}/hr. ",
        random.choice([
            "Background check required.",
            "Drug screen required.",
            "Must have reliable transportation.",
            "Steel-toe boots required on site.",
            "Remote work available.",
            "Hybrid schedule: 3 days on-site.",
        ]),
    ])
    return (
        "JO-{:05d}".format(seq + 1),
        cl_ids[client_idx],
        role,
        sector,
        summary,
        town,
        st,
        postal,
        bill_rate,
        pay_rate,
        random.choice(["open", "open", "open", "filled", "filled", "closed", "on_hold"]),
        random.randint(1, 5),
        (base_date - timedelta(days=random.randint(0, 365))).strftime("%Y-%m-%d"),
        random.choice(["contract", "temp_to_hire", "direct_hire", "contract"]),
        random.choice(["3 months", "6 months", "12 months", "ongoing", "project-based"]),
    )


# Transpose generated rows into per-column lists. Later sections index into
# jo_ids, jo_client_ids, jo_titles, jo_city, jo_bill_rate, and jo_pay_rate.
(jo_ids, jo_client_ids, jo_titles, jo_verticals, jo_descriptions,
 jo_city, jo_state, jo_zip,
 jo_bill_rate, jo_pay_rate, jo_status, jo_openings, jo_created,
 jo_work_type, jo_duration) = (
    list(column) for column in zip(*(_build_job_order(n) for n in range(3000)))
)

job_orders = pa.table({
    "job_order_id": jo_ids,
    "client_id": jo_client_ids,
    "title": jo_titles,
    "vertical": jo_verticals,
    "description": jo_descriptions,
    "city": jo_city,
    "state": jo_state,
    "zip": jo_zip,
    "bill_rate": jo_bill_rate,
    "pay_rate": jo_pay_rate,
    "status": jo_status,
    "openings": jo_openings,
    "created_date": jo_created,
    "work_type": jo_work_type,
    "duration": jo_duration,
})
upload("job_orders", job_orders)
# ============================================================
# 4. PLACEMENTS — 8,000 candidate-job matches
# ============================================================
print("Generating placements (8K)...")
# A fixed roster of 30 recruiter names, reused by the call and e-mail logs.
recruiters = [
    " ".join((random.choice(first_names), random.choice(last_names)))
    for _ in range(30)
]


def _build_placement(seq):
    """Return one placement record as a tuple in ``placements`` column order.

    A random candidate is paired with a random job order. The client and the
    bill/pay rates are copied from that job order.
    """
    cand_idx = random.randint(0, N_CAND - 1)
    job_idx = random.randint(0, 2999)
    starts = base_date - timedelta(days=random.randint(0, 730))
    ends = starts + timedelta(days=random.randint(30, 365))
    return (
        "PL-{:05d}".format(seq + 1),
        c_ids[cand_idx],
        jo_ids[job_idx],
        jo_client_ids[job_idx],
        starts.strftime("%Y-%m-%d"),
        # About 70% of placements carry an end date; the rest store "".
        ends.strftime("%Y-%m-%d") if random.random() < 0.7 else "",
        random.choice(["active", "active", "completed", "completed", "terminated", "no_show"]),
        jo_bill_rate[job_idx],
        jo_pay_rate[job_idx],
        random.choice(recruiters),
    )


# Transpose generated rows into per-column lists. The timesheet section indexes
# into p_ids, p_cand_ids, p_client_ids, p_bill, and p_pay.
(p_ids, p_cand_ids, p_job_ids, p_client_ids,
 p_start, p_end, p_status, p_bill, p_pay, p_recruiter) = (
    list(column) for column in zip(*(_build_placement(n) for n in range(8000)))
)

placements = pa.table({
    "placement_id": p_ids,
    "candidate_id": p_cand_ids,
    "job_order_id": p_job_ids,
    "client_id": p_client_ids,
    "start_date": p_start,
    "end_date": p_end,
    "status": p_status,
    "bill_rate": p_bill,
    "pay_rate": p_pay,
    "recruiter": p_recruiter,
})
upload("placements", placements)
# ============================================================
# 5. TIMESHEETS — 120K weekly entries
# ============================================================
print("Generating timesheets (120K)...")


def _build_timesheet(seq):
    """Return one timesheet record as a tuple in ``timesheets`` column order.

    Each entry belongs to a random placement and inherits its candidate,
    client, and bill/pay rates. Overtime hours are charged at 1.5x the rate.
    """
    placement_idx = random.randint(0, 7999)
    regular = round(random.choice([40, 40, 40, 32, 24, 20, 8]), 1)
    overtime = round(random.choice([0, 0, 0, 0, 4, 8, 12, 16]), 1)
    bill_rate = p_bill[placement_idx]
    pay_rate = p_pay[placement_idx]
    week = (base_date - timedelta(weeks=random.randint(0, 104))).strftime("%Y-%m-%d")
    billed = round(regular * bill_rate + overtime * bill_rate * 1.5, 2)
    paid = round(regular * pay_rate + overtime * pay_rate * 1.5, 2)
    signed_off = random.choice([True, True, True, True, False])
    # An approver name is drawn only for approved timesheets; others store "".
    approver = random.choice(cl_contacts) if signed_off else ""
    return (
        "TS-{:06d}".format(seq + 1),
        p_ids[placement_idx],
        p_cand_ids[placement_idx],
        p_client_ids[placement_idx],
        week,
        regular,
        overtime,
        billed,
        paid,
        signed_off,
        approver,
    )


# Transpose generated rows into per-column lists.
(ts_ids, ts_placement_ids, ts_cand_ids, ts_client_ids,
 ts_week_ending, ts_hours_reg, ts_hours_ot, ts_bill_total, ts_pay_total,
 ts_approved, ts_approved_by) = (
    list(column) for column in zip(*(_build_timesheet(n) for n in range(120000)))
)

timesheets = pa.table({
    "timesheet_id": ts_ids,
    "placement_id": ts_placement_ids,
    "candidate_id": ts_cand_ids,
    "client_id": ts_client_ids,
    "week_ending": ts_week_ending,
    "hours_regular": ts_hours_reg,
    "hours_overtime": ts_hours_ot,
    "bill_total": ts_bill_total,
    "pay_total": ts_pay_total,
    "approved": ts_approved,
    "approved_by": ts_approved_by,
})
upload("timesheets", timesheets)
# ============================================================
# 6. CALL LOG — 80K phone records (CDR)
# ============================================================
print("Generating call_log (80K)...")
dispositions = ["connected", "voicemail", "no_answer", "busy", "wrong_number", "callback_scheduled", "declined"]


def _build_call(seq):
    """Return one call record as a tuple in ``call_log`` column order.

    The candidate's primary phone is one endpoint of the call; the other
    endpoint (the recruiter's line) is a freshly generated number.
    """
    cand_idx = random.randint(0, N_CAND - 1)
    rec = random.choice(recruiters)
    flow = random.choice(["outbound", "outbound", "outbound", "inbound"])
    recruiter_line = make_phone()
    candidate_line = c_phones[cand_idx]
    if flow == "outbound":
        caller, callee = recruiter_line, candidate_line
    else:
        caller, callee = candidate_line, recruiter_line
    seconds = random.randint(0, 1800)
    placed_at = (base_date - timedelta(seconds=random.randint(0, 86400 * 365))).isoformat()
    outcome = random.choice(dispositions)
    return (
        "CALL-{:06d}".format(seq + 1),
        caller,
        callee,
        flow,
        seconds,
        placed_at,
        rec,
        c_ids[cand_idx],
        outcome,
    )


# Transpose generated rows into per-column lists.
(call_ids, call_from, call_to, call_direction,
 call_duration, call_timestamp, call_recruiter, call_cand_id, call_disposition) = (
    list(column) for column in zip(*(_build_call(n) for n in range(80000)))
)

call_log = pa.table({
    "call_id": call_ids,
    "from_number": call_from,
    "to_number": call_to,
    "direction": call_direction,
    "duration_seconds": call_duration,
    "timestamp": call_timestamp,
    "recruiter": call_recruiter,
    "candidate_id": call_cand_id,
    "disposition": call_disposition,
})
upload("call_log", call_log)
# ============================================================
# 7. EMAIL LOG — 60K email records
# ============================================================
print("Generating email_log (60K)...")
# Subject templates; placeholders are filled by str.format with values from a
# random job order and a random client.
subjects = [
    "New job opportunity — {title} in {city}",
    "Following up on your application",
    "Interview scheduled — {title}",
    "Timesheet reminder for week ending {date}",
    "Your background check is complete",
    "New assignment details — {client}",
    "Pay rate update for your current assignment",
    "Re: Availability for {title} position",
    "Welcome to {client} — your first day info",
    "Reference check request",
]


def _build_email(seq):
    """Return one e-mail record as a tuple in ``email_log`` column order.

    Outbound mail goes from the recruiter's company address to the
    candidate; inbound mail is the reverse and is always marked opened.
    """
    cand_idx = random.randint(0, N_CAND - 1)
    job_idx = random.randint(0, 2999)
    rec = random.choice(recruiters)
    flow = random.choice(["outbound", "outbound", "outbound", "inbound"])
    # The template is chosen first, then the client index is drawn for it.
    template = random.choice(subjects)
    subject = template.format(
        title=jo_titles[job_idx],
        city=jo_city[job_idx],
        date="2026-01-05",
        client=cl_names[random.randint(0, 499)],
    )
    recruiter_addr = rec.replace(" ", ".").lower() + "@acmestaffing.com"
    candidate_addr = c_emails[cand_idx]
    if flow == "outbound":
        sender, recipient = recruiter_addr, candidate_addr
    else:
        sender, recipient = candidate_addr, recruiter_addr
    sent_at = (base_date - timedelta(seconds=random.randint(0, 86400 * 365))).isoformat()
    # Only outbound mail consumes a random draw for the open flag (about 60%).
    if flow == "outbound":
        was_opened = random.random() < 0.6
    else:
        was_opened = True
    return (
        "EM-{:06d}".format(seq + 1),
        sender,
        recipient,
        subject,
        sent_at,
        rec,
        c_ids[cand_idx],
        flow,
        was_opened,
    )


# Transpose generated rows into per-column lists.
(em_ids, em_from, em_to, em_subject, em_timestamp,
 em_recruiter, em_cand_id, em_direction, em_opened) = (
    list(column) for column in zip(*(_build_email(n) for n in range(60000)))
)

email_log = pa.table({
    "email_id": em_ids,
    "from_addr": em_from,
    "to_addr": em_to,
    "subject": em_subject,
    "timestamp": em_timestamp,
    "recruiter": em_recruiter,
    "candidate_id": em_cand_id,
    "direction": em_direction,
    "opened": em_opened,
})
upload("email_log", email_log)
# ============================================================
# Final summary: add up the row counts of the seven uploaded tables.
total = sum([candidates.num_rows, clients.num_rows, job_orders.num_rows,
placements.num_rows, timesheets.num_rows, call_log.num_rows, email_log.num_rows])
print(f"\n{'='*60}")
print(f"Staffing company data loaded: {total:,} total rows across 7 tables")
print(f"{'='*60}")
# Sample natural-language questions, printed for whoever runs the demo.
print(f"""
Cross-reference queries to try:
"Find all Java developers in Chicago who are available immediately"
"Which recruiter has the most placements this year?"
"Show me the total revenue by client for Q1 2026"
"Find candidates who were called more than 5 times but never placed"
"What's the average bill rate for .NET developers in New York?"
"Which clients have the highest overtime hours?"
"Show candidates in zip 60601 with Healthcare skills"
"Find the spread (bill - pay) by vertical"
"Which candidates have worked for multiple different clients?"
"Show email open rates by recruiter"
""")