- ingestd crate: detect file type → parse → schema detection → Parquet → catalog
- CSV: auto-detects column types (int, float, bool, string); handles $, %, and commas; strips dollar signs from amounts; flexible row parsing; sanitized column names
- JSON: array or newline-delimited; nested object flattening (a.b.c → a_b_c), see the sketch below
- PDF: text extraction via lopdf, one row per page (source_file, page_number, text)
- Text/SMS: line-based ingestion with line numbers
- Dedup: SHA-256 content hash; re-ingesting the same file is a no-op
- Gateway: POST /ingest/file multipart upload, 256 MB body limit
- Schema detection per ADR-010: ambiguous types default to String
- 12 unit tests passing (CSV parsing, JSON flattening, type inference, dedup)
- Tested: messy CSV with missing data, dollar amounts, and N/A values → queryable

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
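A rough sketch of the nested-object flattening described in the JSON bullet above (a.b.c → a_b_c). This is an illustrative Python version only, not the ingestd Rust code, and the function name is invented for the example:

    def flatten(obj, prefix="", sep="_"):
        """Flatten nested dicts: {"a": {"b": {"c": 1}}} -> {"a_b_c": 1}."""
        flat = {}
        for key, value in obj.items():
            name = f"{prefix}{sep}{key}" if prefix else key
            if isinstance(value, dict):
                flat.update(flatten(value, name, sep))
            else:
                flat[name] = value
        return flat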
#!/usr/bin/env python3
"""Generate realistic demo datasets that actually stress the stack."""

import pyarrow as pa
import pyarrow.parquet as pq
import random
import json
import urllib.request
from datetime import datetime, timedelta

API = "http://localhost:3100"


def upload_and_register(name, table):
    """Upload Parquet to storage and register in catalog."""
    path = f"/tmp/{name}.parquet"
    pq.write_table(table, path, compression="snappy")

    with open(path, "rb") as f:
        data = f.read()
    key = f"datasets/{name}.parquet"
    req = urllib.request.Request(f"{API}/storage/objects/{key}", data=data, method="PUT")
    urllib.request.urlopen(req)

    body = json.dumps({
        "name": name,
        "schema_fingerprint": "auto",
        "objects": [{"bucket": "data", "key": key, "size_bytes": len(data)}]
    }).encode()
    req = urllib.request.Request(
        f"{API}/catalog/datasets", data=body, method="POST",
        headers={"Content-Type": "application/json"}
    )
    urllib.request.urlopen(req)
    print(f" {name}: {table.num_rows:,} rows, {len(data):,} bytes ({len(data)/1024/1024:.1f} MB)")

# ============================================================
# Dataset 1: web_events — 500K rows of web analytics
# Shows: DataFusion scanning half a million rows in milliseconds
# ============================================================
print("Generating web_events (500K rows)...")

N_EVENTS = 500_000
countries = ["US", "UK", "DE", "FR", "JP", "BR", "IN", "AU", "CA", "KR", "MX", "NG", "ZA", "SE", "IT"]
pages = ["/", "/pricing", "/docs", "/blog", "/about", "/login", "/signup", "/dashboard", "/settings", "/api",
         "/docs/getting-started", "/docs/api-reference", "/docs/tutorials", "/blog/rust-performance",
         "/blog/ai-lakehouse", "/blog/parquet-vs-csv", "/products", "/products/enterprise", "/contact", "/careers"]
actions = ["pageview", "pageview", "pageview", "pageview", "click", "click", "scroll", "form_submit", "download", "signup"]
browsers = ["Chrome", "Firefox", "Safari", "Edge", "Arc"]
devices = ["desktop", "desktop", "desktop", "mobile", "mobile", "tablet"]

base_time = datetime(2026, 1, 1)
random.seed(42)

timestamps = []
user_ids = []
page_list = []
action_list = []
duration_list = []
country_list = []
browser_list = []
device_list = []
session_ids = []

for i in range(N_EVENTS):
    ts = base_time + timedelta(seconds=random.randint(0, 86400 * 90))  # 90 days
    timestamps.append(ts.isoformat())
    user_ids.append(random.randint(1, 50000))
    page_list.append(random.choice(pages))
    action_list.append(random.choice(actions))
    duration_list.append(random.randint(100, 300000))  # ms
    country_list.append(random.choice(countries))
    browser_list.append(random.choice(browsers))
    device_list.append(random.choice(devices))
    session_ids.append(f"sess_{random.randint(1, 200000):06d}")

web_events = pa.table({
    "timestamp": timestamps,
    "user_id": user_ids,
    "session_id": session_ids,
    "page": page_list,
    "action": action_list,
    "duration_ms": duration_list,
    "country": country_list,
    "browser": browser_list,
    "device": device_list,
})
upload_and_register("web_events", web_events)
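# Illustrative only: the per-country traffic question listed at the end of this
# script roughly maps to SQL like the following (assuming the query layer exposes
# the dataset under the name registered above):
#   SELECT country, COUNT(*) AS pageviews
#   FROM web_events
#   WHERE action = 'pageview'
#   GROUP BY country
#   ORDER BY pageviews DESC;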

# ============================================================
# Dataset 2: products — 5K products with real-ish descriptions
# Shows: AI can read and understand product data, semantic search
# ============================================================
print("Generating products (5K rows)...")

categories = ["SaaS", "API", "Database", "Analytics", "Security", "DevOps", "AI/ML", "Storage", "Networking", "Monitoring"]
adjectives = ["Enterprise", "Cloud-Native", "Open-Source", "Serverless", "Real-Time", "Distributed", "Scalable", "Lightweight", "High-Performance", "Self-Hosted"]
nouns = ["Platform", "Engine", "Gateway", "Toolkit", "Framework", "Suite", "Service", "Connector", "Pipeline", "Hub"]
features = [
    "with built-in authentication and RBAC",
    "featuring automatic horizontal scaling",
    "with zero-config deployment",
    "supporting 100+ integrations",
    "with sub-millisecond latency",
    "featuring end-to-end encryption",
    "with real-time dashboards",
    "supporting multi-region replication",
    "with built-in CI/CD pipelines",
    "featuring AI-powered anomaly detection",
    "with comprehensive audit logging",
    "supporting GraphQL and REST APIs",
    "with automated backup and recovery",
    "featuring smart caching layers",
    "with native Kubernetes support",
]

product_ids = []
product_names = []
product_categories = []
product_prices = []
product_descriptions = []
product_ratings = []
product_reviews_count = []
product_created = []

for i in range(5000):
    cat = random.choice(categories)
    adj = random.choice(adjectives)
    noun = random.choice(nouns)
    feat = random.choice(features)
    name = f"{adj} {cat} {noun}"
    desc = f"{name} — a {cat.lower()} solution {feat}. Built for teams that need reliable {cat.lower()} infrastructure without the complexity."

    product_ids.append(i + 1)
    product_names.append(name)
    product_categories.append(cat)
    product_prices.append(round(random.uniform(9.99, 2999.99), 2))
    product_descriptions.append(desc)
    product_ratings.append(round(random.uniform(2.5, 5.0), 1))
    product_reviews_count.append(random.randint(0, 5000))
    product_created.append((base_time - timedelta(days=random.randint(0, 730))).strftime("%Y-%m-%d"))

products = pa.table({
    "product_id": product_ids,
    "name": product_names,
    "category": product_categories,
    "price": product_prices,
    "description": product_descriptions,
    "rating": product_ratings,
    "review_count": product_reviews_count,
    "created_date": product_created,
})
upload_and_register("products", products)
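# Illustrative only: a simple aggregate this table supports (assuming the
# registered dataset name is used as the table name):
#   SELECT category, AVG(price) AS avg_price, AVG(rating) AS avg_rating
#   FROM products
#   GROUP BY category
#   ORDER BY avg_price DESC;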

# ============================================================
# Dataset 3: transactions — 200K purchase records
# Shows: JOINs across datasets, aggregation at scale
# ============================================================
print("Generating transactions (200K rows)...")

N_TXN = 200_000
txn_ids = []
txn_user_ids = []
txn_product_ids = []
txn_quantities = []
txn_amounts = []
txn_timestamps = []
txn_statuses = []
txn_payment_methods = []

statuses = ["completed", "completed", "completed", "completed", "pending", "refunded", "failed"]
payments = ["credit_card", "credit_card", "credit_card", "debit_card", "paypal", "crypto", "wire_transfer"]

for i in range(N_TXN):
    pid = random.randint(1, 5000)
    qty = random.randint(1, 10)
    price = product_prices[pid - 1]  # reuse the product's generated price so amounts join cleanly with products

    txn_ids.append(f"TXN-{i+1:07d}")
    txn_user_ids.append(random.randint(1, 50000))
    txn_product_ids.append(pid)
    txn_quantities.append(qty)
    txn_amounts.append(round(price * qty, 2))
    txn_timestamps.append((base_time + timedelta(seconds=random.randint(0, 86400 * 90))).isoformat())
    txn_statuses.append(random.choice(statuses))
    txn_payment_methods.append(random.choice(payments))

transactions = pa.table({
    "txn_id": txn_ids,
    "user_id": txn_user_ids,
    "product_id": txn_product_ids,
    "quantity": txn_quantities,
    "amount": txn_amounts,
    "timestamp": txn_timestamps,
    "status": txn_statuses,
    "payment_method": txn_payment_methods,
})
upload_and_register("transactions", transactions)
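# Illustrative only: the JOIN this dataset is meant to exercise, linking
# transactions.product_id to products.product_id (table names as registered above):
#   SELECT p.name, SUM(t.amount) AS revenue
#   FROM transactions t
#   JOIN products p ON p.product_id = t.product_id
#   WHERE t.status = 'completed'
#   GROUP BY p.name
#   ORDER BY revenue DESC
#   LIMIT 10;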

# ============================================================
# Dataset 4: server_metrics — 1M rows of infrastructure telemetry
# Shows: time-series analytics, the kind of data you'd put in a lakehouse
# ============================================================
print("Generating server_metrics (1M rows)...")

N_METRICS = 1_000_000
hosts = [f"prod-{i:03d}" for i in range(100)]
metrics_names = ["cpu_usage", "memory_usage", "disk_io", "network_in", "network_out", "request_latency", "error_rate", "gc_pause"]
regions = ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"]

m_timestamps = []
m_hosts = []
m_metrics = []
m_values = []
m_regions = []

for i in range(N_METRICS):
    metric = random.choice(metrics_names)
    # draw each metric's value from a rough but plausible distribution
    if metric == "cpu_usage":
        val = round(random.gauss(45, 20), 2)
    elif metric == "memory_usage":
        val = round(random.gauss(60, 15), 2)
    elif metric in ("network_in", "network_out"):
        val = round(random.expovariate(1/1000), 2)
    elif metric == "request_latency":
        val = round(random.expovariate(1/50), 2)
    elif metric == "error_rate":
        val = round(random.expovariate(1/2), 4)
    else:
        val = round(random.uniform(0, 100), 2)

    m_timestamps.append((base_time + timedelta(seconds=random.randint(0, 86400 * 90))).isoformat())
    m_hosts.append(random.choice(hosts))
    m_metrics.append(metric)
    m_values.append(max(0, val))
    m_regions.append(random.choice(regions))

server_metrics = pa.table({
    "timestamp": m_timestamps,
    "host": m_hosts,
    "metric": m_metrics,
    "value": m_values,
    "region": m_regions,
})
upload_and_register("server_metrics", server_metrics)
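# Illustrative only: the time-series style question this table supports
# (table name as registered above):
#   SELECT host, AVG(value) AS avg_cpu
#   FROM server_metrics
#   WHERE metric = 'cpu_usage' AND region = 'us-east-1'
#   GROUP BY host
#   ORDER BY avg_cpu DESC;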

print(f"\nDone — 4 datasets, {N_EVENTS + 5000 + N_TXN + N_METRICS:,} total rows")

print("\nDemo queries to try:")
print(' "How many page views per country, sorted by volume?"')
print(' "What are the top 10 products by total revenue?"')
print(' "Show average CPU usage per host in us-east-1"')
print(' "Which payment method has the highest failure rate?"')
print(' "What are the busiest hours for web traffic?"')