PRD v2: production roadmap with ingest, vector search, hot cache phases

- Phase 6: Ingest pipeline (CSV/JSON → schema detect → Parquet → catalog)
- Phase 7: Vector index + RAG (embed → HNSW → semantic search → LLM answer)
- Phase 8: Hot cache + incremental updates (MemTable, delta files, merge-on-read)
- ADR-008 through ADR-011: embeddings as Parquet, delta files not Delta Lake,
  schema defaults to string, not a CRM replacement
- Staffing company reference dataset (286K rows, 7 tables)
- Honest risk assessment: vector search at scale and incremental updates are hard

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: root
Date: 2026-03-27 07:54:24 -05:00
commit 6740a017c7 (parent b37e171e10)
3 changed files with 640 additions and 79 deletions


@@ -34,3 +34,23 @@
**Date:** 2026-03-27
**Decision:** Gateway serves HTTP on :3100 (external) and gRPC on :3101 (internal). Both run in the same process.
**Rationale:** Single binary simplifies deployment. HTTP stays for browser/curl access. gRPC provides typed contracts for service-to-service calls. No premature microservice split.
## ADR-008: Embeddings stored as Parquet, not a proprietary vector DB
**Date:** 2026-03-27
**Decision:** Vector embeddings stored as Parquet files (doc_id, chunk_text, vector columns). Vector index (HNSW) serialized as a sidecar file.
**Rationale:** Keeps all data in one portable format. No vendor lock-in to Pinecone/Weaviate/Qdrant. Vectors are queryable via DataFusion like any other data. Trade-off: brute-force search is fine up to ~100K vectors; HNSW needed beyond that.
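A minimal sketch of this layout, assuming pyarrow and numpy. The `doc_id`, `chunk_text`, and `vector` columns come from the decision above; the file path and data are illustrative.
```
# Sketch: embeddings as a plain Parquet file, searched by brute-force cosine.
# Assumes pyarrow + numpy; columns follow ADR-008 (doc_id, chunk_text, vector).
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

dim = 768  # nomic-embed-text dimensionality
table = pa.table({
    "doc_id": ["CAND-00001", "CAND-00002"],
    "chunk_text": ["Java developer in Chicago", "RN with ICU experience"],
    "vector": pa.array([np.random.rand(dim).tolist() for _ in range(2)],
                       type=pa.list_(pa.float32(), dim)),
})
pq.write_table(table, "/tmp/embeddings.parquet", compression="snappy")

# Brute-force scan: correct at any scale, fast enough up to ~100K vectors.
loaded = pq.read_table("/tmp/embeddings.parquet")
vectors = np.array(loaded["vector"].to_pylist(), dtype=np.float32)
query = np.random.rand(dim).astype(np.float32)
scores = vectors @ query / (np.linalg.norm(vectors, axis=1) * np.linalg.norm(query))
top = np.argsort(-scores)[:5]  # row indices of the 5 nearest chunks
```
The HNSW sidecar only changes how `top` is found; the Parquet file stays the source of truth.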
## ADR-009: Incremental updates via delta files, not Delta Lake
**Date:** 2026-03-27
**Decision:** Updates append to delta Parquet files. Queries merge base + deltas at read time. Periodic compaction merges deltas into base. Single-writer model (no concurrent writers).
**Rationale:** Full ACID over Parquet (Delta Lake/Iceberg) is a multi-year project. Our use case is single-writer (one ingest pipeline) with read-heavy workloads. Merge-on-read with compaction is sufficient and dramatically simpler.
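A sketch of the read path, assuming pyarrow. The key column, file layout, and dict-based merge are illustrative; only the last-write-wins, single-writer semantics come from the decision above.
```
# Sketch: merge-on-read over a base Parquet file plus append-only delta files.
# Later deltas win over earlier ones and over the base (single-writer model).
import pyarrow as pa
import pyarrow.parquet as pq

def read_merged(base_path, delta_paths, key="candidate_id"):
    merged = {}
    for path in [base_path, *delta_paths]:  # deltas ordered oldest to newest
        for row in pq.read_table(path).to_pylist():
            merged[row[key]] = row  # last write wins per primary key
    return pa.Table.from_pylist(list(merged.values()))

def compact(base_path, delta_paths, key="candidate_id"):
    # Periodic compaction: fold all deltas back into a single base file.
    pq.write_table(read_merged(base_path, delta_paths, key), base_path)
```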
## ADR-010: Schema detection defaults to string
**Date:** 2026-03-27
**Decision:** Ingest pipeline infers column types from data. When ambiguous or mixed, defaults to String rather than failing.
**Rationale:** Legacy data is messy. A column with "123", "N/A", and "" is a string, not an integer. Downstream queries can CAST as needed. Better to ingest everything than reject on type errors.
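A sketch of that inference rule. The type ladder (int, then float, else string) is an assumption beyond the ADR text; the string fallback is the decision itself.
```
# Sketch: a column gets a concrete type only if every non-empty value parses.
# Anything ambiguous or mixed falls through to string, per ADR-010.
def infer_column_type(values):
    non_empty = [v for v in values if v not in ("", None)]
    if not non_empty:
        return "string"
    for name, parse in [("int", int), ("float", float)]:
        try:
            for v in non_empty:
                parse(v)
            return name
        except ValueError:
            continue
    return "string"

infer_column_type(["123", "456"])      # "int"
infer_column_type(["123", "N/A", ""])  # "string", the ADR's own example
```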
## ADR-011: This is not a CRM replacement
**Date:** 2026-03-27
**Decision:** The lakehouse is the analytical layer BEHIND operational systems. It ingests exports, not live data. CRM/ATS stays for daily operations.
**Rationale:** Operational systems need single-record CRUD, permissions, UI workflows. The lakehouse answers cross-cutting questions that no single operational system can. They complement, not compete.


@@ -1,6 +1,6 @@
# PRD: Lakehouse — Rust-First Object Storage System
**Status:** Active
**Status:** Active — Phase 0-5 complete, entering production path
**Created:** 2026-03-27
**Owner:** J
@@ -8,20 +8,23 @@
## Problem
Traditional data platforms couple storage, compute, and metadata into monolithic databases. This creates vendor lock-in, scaling bottlenecks, and opaque data access. AI workloads bolt onto these systems awkwardly, sharing resources with transactional queries.
Legacy data systems silo information across CRMs, databases, spreadsheets, and file shares. Querying across them requires manual ETL, pre-defined schemas, and expensive database licenses. When AI enters the picture, these systems can't handle the dual requirement of fast analytical queries AND semantic retrieval over unstructured text.
A staffing company (our reference case) has candidate records in an ATS, client data in a CRM, timesheets in billing software, call logs from a phone system, and email records from Exchange. Answering "find every Java developer in Chicago who was called 5+ times but never placed" requires querying across all of them — and no single system can do it.
We need a system where:
- Object storage is the source of truth (not a database)
- Metadata, access, and execution are controlled by Rust services
- Queries run directly over object storage via Arrow/Parquet
- AI inference is isolated and swappable
- The entire system is rebuildable from repository + docs alone
- Any data source (CSV, DB export, PDF, JSON) can be ingested without pre-defined schemas
- Structured data is queryable via SQL at scale (millions of rows, sub-second)
- Unstructured data is searchable via AI embeddings (semantic retrieval)
- An LLM can answer natural language questions against all of it
- Everything runs locally — no cloud APIs, total data privacy
- The system is rebuildable from repository + object storage alone
---
## Solution
A modular Rust service mesh over S3-compatible object storage.
A modular Rust service mesh over S3-compatible object storage, with a local AI layer for embeddings and generation.
### Locked Stack
@@ -30,14 +33,15 @@ A modular Rust service mesh over S3-compatible object storage.
| Frontend | Dioxus | Yes |
| API | Axum + Tokio | Yes |
| Object Storage Interface | Apache Arrow `object_store` | Yes |
| Storage Backend | RustFS (fallback: SeaweedFS) | Yes |
| Storage Backend | LocalFileSystem → RustFS → S3 | Yes |
| Query Engine | DataFusion | Yes |
| Data Format | Parquet + Arrow | Yes |
| RPC (internal) | tonic (gRPC) | Yes |
| AI Runtime | Ollama (local models) | Yes |
| AI Boundary | Python FastAPI sidecar → Ollama HTTP API | Yes |
| Vector Index | TBD — evaluate `hora`, `qdrant` crate, or HNSW from scratch | **Open** |
No new frameworks. No exceptions.
No new frameworks without a documented ADR.
---
@@ -47,84 +51,160 @@ No new frameworks. No exceptions.
| Service | Responsibility |
|---|---|
| **gateway** | HTTP ingress, routing, auth envelope, middleware |
| **catalogd** | Metadata control plane — dataset registry, schema versions, manifest index |
| **storaged** | Object I/O — read/write/list/delete via `object_store` crate |
| **queryd** | SQL execution — DataFusion over registered Parquet datasets |
| **gateway** | HTTP/gRPC ingress, routing, auth, CORS, body limits |
| **catalogd** | Metadata control plane — dataset registry, schema versions, manifests |
| **storaged** | Object I/O — read/write/list/delete via `object_store` |
| **queryd** | SQL execution — DataFusion over Parquet, MemTable hot cache |
| **ingestd** | *NEW* — Ingest pipeline: CSV/JSON/DB → normalize → Parquet → catalog |
| **vectord** | *NEW* — Embedding store + vector index: chunk → embed → index → search |
| **aibridge** | Rust↔Python boundary — HTTP client to FastAPI sidecar |
| **ui** | Dioxus frontend — dataset browser, query editor, results viewer |
| **shared** | Types, errors, Arrow helpers, protobuf definitions |
### AI Sidecar
Python FastAPI process that adapts Ollama's HTTP API into Arrow-compatible formats:
- `POST /embed` → `nomic-embed-text` via Ollama
- `POST /generate` → configurable model (qwen2.5, mistral, gemma2, llama3.2)
- `POST /rerank` → cross-encoder reranking via generate endpoint
No mocks. No stubs. Real models from day one. Ollama manages model lifecycle, GPU scheduling, caching. The sidecar is a stateless passthrough; a call sketch follows the service table below.
| **ui** | Dioxus frontend — Ask, Explore, SQL, System tabs |
| **shared** | Types, errors, Arrow helpers, config, protobuf definitions |
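A sketch of calling the sidecar endpoints listed above. Only the `/embed` path comes from this document; the address, payload, and response shape are assumptions.
```
# Sketch: embedding text through the FastAPI sidecar. The port and the
# {"texts": ...} / {"embeddings": ...} shapes are hypothetical.
import json, urllib.request

SIDECAR = "http://localhost:8000"  # hypothetical sidecar address

def embed(texts):
    body = json.dumps({"texts": texts}).encode()
    req = urllib.request.Request(f"{SIDECAR}/embed", data=body, method="POST",
                                 headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)["embeddings"]  # assumed: list of 768-d vectors
```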
### Data Flow
```
Client → gateway → catalogd (metadata lookup)
→ storaged (object read/write)
→ queryd (SQL execution over Parquet)
→ aibridge → sidecar → Ollama (inference)
Raw data → ingestd (normalize, chunk, detect schema)
├→ storaged (Parquet files to object storage)
├→ catalogd (register dataset + schema)
├→ vectord (embed text chunks, build index)
└→ queryd (auto-register as queryable table)
User question → gateway
├→ vectord (semantic search for relevant chunks) ← RAG path
├→ queryd (SQL over structured data) ← Analytics path
└→ aibridge → Ollama (generate answer from context)
```
### Query Paths
**Analytical (SQL):** "What's the average bill rate for .NET devs in Chicago?"
→ DataFusion scans columnar Parquet and returns in <200ms
**Semantic (RAG):** "Find candidates who could do data engineering work"
→ Embed question → vector search across resume embeddings → retrieve top chunks → LLM answers
**Hybrid:** "Which clients are we losing money on, and why?"
→ SQL for margin calculations + RAG over client notes/emails for context → LLM synthesizes
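For illustration, the first analytical question written out as SQL. The table and columns exist in the reference dataset below; the gateway `/query` endpoint and payload shape are hypothetical, not the documented API.
```
# Sketch: the analytical path as a raw SQL call through the gateway.
import json, urllib.request

sql = """
SELECT AVG(bill_rate) AS avg_bill
FROM job_orders
WHERE city = 'Chicago' AND title LIKE '%.NET%'
"""
body = json.dumps({"sql": sql}).encode()
req = urllib.request.Request("http://localhost:3100/query",  # hypothetical endpoint
                             data=body, method="POST",
                             headers={"Content-Type": "application/json"})
rows = json.load(urllib.request.urlopen(req))
```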
### Invariants
1. Object storage = source of truth for all data
2. catalogd = sole metadata authority (datasets, schemas, manifests)
3. No raw data stored in catalog — only pointers (bucket, key, schema fingerprint)
4. storaged never interprets data — dumb pipe with presigned URLs
5. queryd registers tables via catalog pointers, not by scanning storage
6. aibridge is stateless — Python sidecar is replaceable without touching Rust
7. All services are modular and independently replaceable
### Dependency Graph
```
shared ← storaged ← catalogd ← queryd
shared ← aibridge
gateway → {storaged, catalogd, queryd, aibridge}
ui → gateway (HTTP only, no crate dependency)
```
2. catalogd = sole metadata authority
3. No raw data in catalog — only pointers
4. vectord stores embeddings AS Parquet (portable, not a proprietary format)
5. ingestd is idempotent — re-ingesting the same file is a no-op
6. Hot cache is a performance layer, not a source of truth — eviction is safe
7. All services modular and independently replaceable
---
## Phases
### Phase 0: Bootstrap
Workspace compiles, gateway serves health check, structured logging works.
### Phase 0-5: Foundation ✅ COMPLETE
**Gate:** `cargo build` clean, `GET /health` returns 200, logs on stdout, docs committed.
- Rust workspace, Axum gateway, object storage, catalog, DataFusion query engine
- Python sidecar with real Ollama models (embed, generate, rerank)
- Dioxus UI with Ask (NL→SQL), Explore, SQL, System tabs
- gRPC, OpenTelemetry, auth middleware, TOML config
- Validated with 286K row staffing company dataset across 7 tables
- Cross-reference queries (JOINs across candidates, placements, timesheets, calls) in <150ms
### Phase 1: Storage + Catalog
Write Parquet to object storage, register in catalog, read back.
### Phase 6: Ingest Pipeline
**Gate:** Upload Parquet → register dataset → retrieve metadata → read back. All via gateway HTTP.
Build the data on-ramp. Accept messy real-world data, normalize it, make it queryable.
### Phase 2: Query Engine
SQL queries over registered Parquet datasets via DataFusion.
| Step | Deliverable | Gate |
|---|---|---|
| 6.1 | `ingestd` crate with CSV parser → Arrow RecordBatch → Parquet | CSV file → queryable dataset |
| 6.2 | JSON ingest (newline-delimited JSON, nested objects) | JSON file → flat Parquet |
| 6.3 | Schema detection — infer column types from data | No manual schema definition needed |
| 6.4 | Deduplication — detect and skip already-ingested files (content hash; sketched below) | Re-ingest same file = no-op |
| 6.5 | Text chunking — split large text fields for embedding | Long text → overlapping chunks |
| 6.6 | Auto-registration — ingest writes to storage AND registers in catalog | Single API call: file in → queryable |
| 6.7 | Gateway endpoint: `POST /ingest` with file upload | Upload CSV from browser → query in seconds |
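A sketch of the step 6.4 dedup check. In the real pipeline the seen-hash set would be persisted by catalogd; here it is an in-memory stand-in.
```
# Sketch: content-hash dedup makes re-ingesting the same file a no-op.
import hashlib

seen_hashes = set()  # persisted by the catalog in the real pipeline

def ingest_once(raw_bytes, ingest_fn):
    digest = hashlib.sha256(raw_bytes).hexdigest()
    if digest in seen_hashes:
        return "skipped"   # idempotent: same content, nothing to do
    seen_hashes.add(digest)
    ingest_fn(raw_bytes)
    return "ingested"
```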
**Gate:** `SELECT * FROM dataset LIMIT 10` returns correct results. Resolution goes through catalog.
**Gate:** Upload a raw CSV or JSON file → auto-detected schema → stored as Parquet → registered → immediately queryable via SQL. No manual steps.
### Phase 3: AI Integration
Python sidecar with real Ollama models. Embeddings, generation, reranking.
**Risk:** Schema detection on messy data (mixed types, nulls, inconsistent formatting). Mitigation: conservative type inference (default to string), let user override.
**Gate:** Rust sends text → Python → Ollama → real embeddings return as Arrow-compatible floats.
### Phase 7: Vector Index + RAG Pipeline
### Phase 4: Frontend
Dioxus UI: dataset browser, query editor, results table.
Make unstructured data searchable by meaning, not just keywords.
**Gate:** User can browse datasets and run queries from browser.
| Step | Deliverable | Gate |
|---|---|---|
| 7.1 | `vectord` crate with embedding storage as Parquet (doc_id, chunk_text, vector) | Embeddings stored as portable Parquet |
| 7.2 | Chunking strategy — configurable chunk size + overlap for text columns (sketched below) | Large text fields split into embeddable chunks |
| 7.3 | Brute-force vector search via DataFusion (cosine similarity SQL) | Semantic search works, correctness verified |
| 7.4 | HNSW index for fast approximate nearest neighbor | Search over 100K+ vectors in <50ms |
| 7.5 | RAG endpoint: `POST /rag` — question → embed → search → retrieve → generate | Natural language question → grounded answer |
| 7.6 | Auto-embed on ingest — text columns automatically embedded during ingest | No separate embedding step needed |
| 7.7 | Hybrid search — combine SQL filters with vector similarity | "Java devs in Chicago" (SQL) + "who could do data engineering" (semantic) |
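A sketch of step 7.2's chunking. The sizes are placeholders for the configurable values.
```
# Sketch: fixed-size character chunks with overlap so context is not
# cut cleanly at chunk borders. chunk_size and overlap would be config.
def chunk_text(text, chunk_size=800, overlap=200):
    step = chunk_size - overlap
    return [text[i:i + chunk_size]
            for i in range(0, max(len(text) - overlap, 1), step)]

chunks = chunk_text("long resume text " * 200)
# adjacent chunks share `overlap` characters of context
```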
### Phase 5: Hardening
gRPC internals, OpenTelemetry, auth, config-driven startup.
**Gate:** Ingest 15K candidate resumes → auto-embed → ask "find someone who could handle our Kubernetes migration" → system returns relevant candidates ranked by semantic match, with LLM explanation.
**Gate:** Services communicate via gRPC. Traces propagate. Auth enforced. System restartable from repo + config.
**Risk: HNSW in Rust at scale.** This is the hardest technical problem. Options:
- `hora` crate — Rust-native ANN, but less mature than FAISS
- Store HNSW index as a serialized file alongside Parquet data
- Fallback: brute-force scan is fine up to ~100K vectors; optimize later
- Nuclear option: use Qdrant as an external vector store (breaks "no new services" rule)
**Decision needed:** Evaluate `hora` vs external Qdrant vs brute-force at J's data scale.
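For illustration of the sidecar-index idea only: `hnswlib` is a Python stand-in here, not one of the evaluated options, and the Rust-side choice stays open.
```
# Sketch: build an HNSW index, save it as a file next to the Parquet data,
# reload it on startup (or rebuild from Parquet if the sidecar is missing).
import hnswlib
import numpy as np

dim, n = 768, 10_000
vectors = np.random.rand(n, dim).astype(np.float32)

index = hnswlib.Index(space="cosine", dim=dim)
index.init_index(max_elements=n, ef_construction=200, M=16)
index.add_items(vectors, np.arange(n))
index.save_index("/tmp/embeddings.hnsw")  # sidecar next to embeddings.parquet

restored = hnswlib.Index(space="cosine", dim=dim)
restored.load_index("/tmp/embeddings.hnsw", max_elements=n)
labels, distances = restored.knn_query(vectors[:1], k=5)
```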
### Phase 8: Hot Cache + Incremental Updates
Make frequently-accessed data fast, and handle real-time updates without full rewrite.
| Step | Deliverable | Gate |
|---|---|---|
| 8.1 | MemTable hot cache — pin active datasets in memory | Queries on hot data: <10ms |
| 8.2 | Cache policy — LRU eviction based on access patterns (sketched below) | Memory-bounded, auto-manages |
| 8.3 | Incremental writes — append new rows without rewriting entire Parquet file | Update one candidate's phone → no full table rewrite |
| 8.4 | Merge-on-read — query combines base Parquet + delta files | Correct results from base + updates |
| 8.5 | Compaction — periodic merge of delta files into base Parquet | Prevent delta file proliferation |
| 8.6 | Upsert semantics — insert or update by primary key | Same candidate ID → update in place |
**Gate:** Update a single row in a 15K-row dataset. Query reflects the change immediately. No full Parquet rewrite. Memory cache serves hot data in <10ms.
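A sketch of steps 8.1 and 8.2, assuming the cached values are pyarrow Tables. Eviction is safe per invariant 6: Parquet in object storage stays the truth.
```
# Sketch: byte-bounded LRU of hot Arrow tables. A miss falls back to
# reading Parquet through storaged; eviction never loses data.
from collections import OrderedDict

class HotCache:
    def __init__(self, max_bytes=512 * 1024 * 1024):
        self.max_bytes, self.used = max_bytes, 0
        self.tables = OrderedDict()  # dataset name -> pyarrow.Table

    def get(self, name):
        if name in self.tables:
            self.tables.move_to_end(name)  # mark most recently used
            return self.tables[name]
        return None  # miss: caller reads Parquet instead

    def put(self, name, table):
        if name in self.tables:
            self.used -= self.tables.pop(name).nbytes
        self.tables[name] = table
        self.used += table.nbytes
        while self.used > self.max_bytes and self.tables:
            _, evicted = self.tables.popitem(last=False)  # drop LRU entry
            self.used -= evicted.nbytes
```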
**Risk: This is the Delta Lake problem.** Full ACID transactions over Parquet files are what Databricks spent years building. We're NOT building Delta Lake — we're building a pragmatic version:
- Append-only delta files (easy)
- Merge-on-read (moderate)
- Compaction (moderate)
- Full ACID isolation (NOT attempting — single-writer model instead)
### Phase 9+: Future (not designed yet)
- Database connector ingest (PostgreSQL, MySQL, MSSQL → Parquet)
- PDF/document ingest (OCR → text → chunks → embed)
- Scheduled ingest (cron-based file watching)
- Multi-node query distribution
- Row-level access control
- Audit log (who queried what, when)
---
## Reference Dataset: Staffing Company
Validated with realistic staffing company data:
| Table | Rows | Description |
|---|---|---|
| candidates | 15,000 | Names, phones, emails, zip, skills, resume text, availability |
| clients | 500 | Companies, contacts, verticals, bill rates |
| job_orders | 3,000 | Positions with descriptions, requirements, rates |
| placements | 8,000 | Candidate↔job matches with dates, rates, recruiters |
| timesheets | 120,000 | Weekly hours, bill/pay totals, approvals |
| call_log | 80,000 | Phone CDR — who called whom, duration, disposition |
| email_log | 60,000 | Email tracking — subject, opened, direction |
| **Total** | **286,500** | **7 tables, cross-referenced** |
Proven queries:
- Candidate search by skills + location + availability: 80ms
- Revenue by client with profit margins (JOIN 120K timesheets): 142ms
- Cold lead detection (candidates called 5+ times, never placed): 94ms
- Margin analysis by vertical (JOIN placements → job orders): 53ms
- Natural language → AI-generated SQL → execution → results: ~3s (model inference)
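The cold-lead query above, written out over the reference tables. Column names match `scripts/staffing_demo.py`; how the SQL is submitted (gateway endpoint, response shape) is not shown here.
```
# DataFusion SQL for "called 5+ times, never placed".
sql = """
SELECT c.candidate_id, c.first_name, c.last_name, COUNT(cl.call_id) AS calls
FROM candidates c
JOIN call_log cl ON cl.candidate_id = c.candidate_id
LEFT JOIN placements p ON p.candidate_id = c.candidate_id
WHERE p.placement_id IS NULL
GROUP BY c.candidate_id, c.first_name, c.last_name
HAVING COUNT(cl.call_id) >= 5
"""
```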
---
@@ -132,24 +212,22 @@ gRPC internals, OpenTelemetry, auth, config-driven startup.
| Model | Use |
|---|---|
| `nomic-embed-text` | Embeddings (768d) |
| `qwen2.5` | Code generation, structured output |
| `mistral` | General generation |
| `nomic-embed-text` | Embeddings (768d) — semantic search, RAG retrieval |
| `qwen2.5` | SQL generation, structured output, summarization |
| `mistral` | General generation, longer context |
| `gemma2` | General generation |
| `llama3.2` | General generation |
Model selection via environment variables. No hardcoded model names in Rust code.
| `llama3.2` | General generation, lightweight |
---
## Non-Goals
- Multi-tenancy
- Streaming ingestion / CDC
- Custom file formats
- Query caching / materialized views
- Wrapping `object_store` with another abstraction
- Cloud deployment (local-first)
- Multi-tenancy (single-owner system)
- Cloud deployment (local-first, always)
- Full ACID transactions (single-writer model is sufficient)
- Real-time streaming / CDC (batch ingest is the model)
- Replacing the CRM (this is the analytical layer BEHIND the CRM)
- Custom file formats (Parquet is the format, period)
---
@@ -157,11 +235,13 @@ Model selection via environment variables. No hardcoded model names in Rust code
| Risk | Severity | Mitigation |
|---|---|---|
| RustFS immaturity | High | Start with LocalFileSystem, test against MinIO, RustFS last. SeaweedFS fallback. |
| DataFusion table registration overhead | Medium | Lazy registration + LRU cache of SessionContext instances. |
| Catalog consistency without DB | Medium | Write-ahead: persist manifest before in-memory update. Rebuild from storage on restart. |
| Dioxus WASM gaps | Medium | Phase 4 is last. Fallback to plain HTML if blocked. |
| Schema evolution | Medium | Schema fingerprinting in Phase 1. Validate before query. |
| Vector search in Rust at scale | **High** | Start brute-force, evaluate `hora` crate, Qdrant as fallback |
| Incremental updates on Parquet | **High** | Delta files + merge-on-read, NOT full Delta Lake |
| Legacy data messiness | **High** | Conservative schema detection, default to string, user overrides |
| Schema evolution across ingests | **Medium** | Schema fingerprinting + versioned manifests |
| Memory pressure from hot cache | **Medium** | LRU eviction, configurable memory limit |
| HNSW index persistence | **Medium** | Serialize alongside Parquet, rebuild on startup |
| Python sidecar as bottleneck | **Low** | Can replace with direct Ollama HTTP from Rust later |
---
@ -173,3 +253,5 @@ Model selection via environment variables. No hardcoded model names in Rust code
4. No silent architecture drift
5. Always work in smallest valid step
6. Always verify before moving on
7. **New:** Flag when something is genuinely hard vs just engineering work
8. **New:** If a phase reveals the approach is wrong, update the PRD before continuing

scripts/staffing_demo.py (new file, 459 lines)

@@ -0,0 +1,459 @@
#!/usr/bin/env python3
"""
Realistic staffing company data generator.
Multiple source systems, overlapping data, real cross-reference problems.
Data sources (like a real staffing company):
- ATS (Applicant Tracking System): candidates
- CRM: client companies + contacts
- Job board: job orders with descriptions
- Placements: who got placed where
- Timesheets: hours worked, bill/pay rates
- Phone system CDR: call detail records
- Email logs: communication tracking
"""
import random, json, urllib.request, hashlib, string, time
from datetime import datetime, timedelta
import pyarrow as pa, pyarrow.parquet as pq
API = "http://localhost:3100"
random.seed(2026)
def upload(name, table):
path = f"/tmp/{name}.parquet"
pq.write_table(table, path, compression="snappy")
with open(path, "rb") as f:
data = f.read()
key = f"datasets/{name}.parquet"
req = urllib.request.Request(f"{API}/storage/objects/{key}", data=data, method="PUT")
urllib.request.urlopen(req)
body = json.dumps({"name": name, "schema_fingerprint": "auto",
"objects": [{"bucket": "data", "key": key, "size_bytes": len(data)}]}).encode()
req = urllib.request.Request(f"{API}/catalog/datasets", data=body, method="POST",
headers={"Content-Type": "application/json"})
urllib.request.urlopen(req)
print(f" {name}: {table.num_rows:,} rows ({len(data)/1024:.0f} KB)")
# ============================================================
# Shared reference data
# ============================================================
first_names = ["James","Mary","Robert","Patricia","John","Jennifer","Michael","Linda","David","Elizabeth",
"William","Barbara","Richard","Susan","Joseph","Jessica","Thomas","Sarah","Christopher","Karen",
"Charles","Lisa","Daniel","Nancy","Matthew","Betty","Anthony","Margaret","Mark","Sandra",
"Donald","Ashley","Steven","Dorothy","Paul","Kimberly","Andrew","Emily","Joshua","Donna",
"Kenneth","Michelle","Kevin","Carol","Brian","Amanda","George","Melissa","Timothy","Deborah",
"Ronald","Stephanie","Edward","Rebecca","Jason","Sharon","Jeffrey","Laura","Ryan","Cynthia",
"Jacob","Kathleen","Gary","Amy","Nicholas","Angela","Eric","Shirley","Jonathan","Anna",
"Stephen","Brenda","Larry","Pamela","Justin","Emma","Scott","Nicole","Brandon","Helen",
"Benjamin","Samantha","Samuel","Katherine","Raymond","Christine","Gregory","Debra","Frank","Rachel",
"Alexander","Carolyn","Patrick","Janet","Jack","Catherine","Dennis","Maria","Jerry","Heather"]
last_names = ["Smith","Johnson","Williams","Brown","Jones","Garcia","Miller","Davis","Rodriguez","Martinez",
"Hernandez","Lopez","Gonzalez","Wilson","Anderson","Thomas","Taylor","Moore","Jackson","Martin",
"Lee","Perez","Thompson","White","Harris","Sanchez","Clark","Ramirez","Lewis","Robinson",
"Walker","Young","Allen","King","Wright","Scott","Torres","Nguyen","Hill","Flores",
"Green","Adams","Nelson","Baker","Hall","Rivera","Campbell","Mitchell","Carter","Roberts",
"Gomez","Phillips","Evans","Turner","Diaz","Parker","Cruz","Edwards","Collins","Reyes",
"Stewart","Morris","Morales","Murphy","Cook","Rogers","Gutierrez","Ortiz","Morgan","Cooper",
"Peterson","Bailey","Reed","Kelly","Howard","Ramos","Kim","Cox","Ward","Richardson"]
cities_zips = [
("Chicago","IL","60601"),("Chicago","IL","60602"),("Chicago","IL","60603"),("Chicago","IL","60610"),
("Chicago","IL","60614"),("Chicago","IL","60616"),("Chicago","IL","60622"),("Chicago","IL","60647"),
("New York","NY","10001"),("New York","NY","10002"),("New York","NY","10003"),("New York","NY","10010"),
("New York","NY","10016"),("New York","NY","10019"),("New York","NY","10022"),("New York","NY","10036"),
("Los Angeles","CA","90001"),("Los Angeles","CA","90012"),("Los Angeles","CA","90024"),("Los Angeles","CA","90036"),
("Houston","TX","77001"),("Houston","TX","77002"),("Houston","TX","77003"),("Houston","TX","77019"),
("Dallas","TX","75201"),("Dallas","TX","75202"),("Dallas","TX","75204"),("Dallas","TX","75219"),
("Atlanta","GA","30301"),("Atlanta","GA","30303"),("Atlanta","GA","30305"),("Atlanta","GA","30309"),
("Denver","CO","80201"),("Denver","CO","80202"),("Denver","CO","80204"),("Denver","CO","80206"),
("Phoenix","AZ","85001"),("Phoenix","AZ","85003"),("Phoenix","AZ","85004"),("Phoenix","AZ","85006"),
("Seattle","WA","98101"),("Seattle","WA","98102"),("Seattle","WA","98103"),("Seattle","WA","98104"),
("Miami","FL","33101"),("Miami","FL","33125"),("Miami","FL","33130"),("Miami","FL","33132"),
]
skills_pool = {
"IT": ["Java","Python","C#",".NET","JavaScript","TypeScript","React","Angular","Node.js","SQL",
"AWS","Azure","GCP","Docker","Kubernetes","Linux","Git","REST APIs","GraphQL","MongoDB",
"PostgreSQL","MySQL","Redis","Terraform","Jenkins","CI/CD","Agile","Scrum","DevOps","Microservices",
"Spring Boot","Django","Flask","Ruby on Rails","Go","Rust","Swift","Kotlin","PHP","Vue.js"],
"Healthcare": ["RN","LPN","CNA","BLS","ACLS","PALS","EMR","Epic","Cerner","Meditech",
"ICD-10","CPT","Medical Billing","Medical Coding","HIPAA","Phlebotomy","IV Therapy",
"Telemetry","ICU","OR","ER","Med-Surg","Labor & Delivery","Pediatrics","Oncology"],
"Industrial": ["Forklift","OSHA 10","OSHA 30","Welding","MIG","TIG","CNC","PLC","Blueprint Reading",
"Quality Control","Six Sigma","Lean Manufacturing","AutoCAD","SolidWorks","GD&T",
"Mechanical Assembly","Electrical","Hydraulics","Pneumatics","Warehouse"],
"Accounting": ["QuickBooks","SAP","Oracle","Accounts Payable","Accounts Receivable","General Ledger",
"Financial Reporting","Tax Preparation","CPA","Payroll","Budgeting","Forecasting",
"Audit","Compliance","Excel Advanced","Power BI","Tableau","GAAP","SOX"],
"Admin": ["Microsoft Office","Data Entry","Customer Service","Scheduling","Filing","Receptionist",
"Executive Assistant","Travel Coordination","Calendar Management","SAP","Salesforce",
"CRM","Multi-line Phone","Typing 60+ WPM","Notary","Bilingual Spanish"],
}
verticals = list(skills_pool.keys())
email_domains = ["gmail.com","yahoo.com","hotmail.com","outlook.com","aol.com","icloud.com","protonmail.com"]
def make_phone():
return f"({random.randint(200,999)}) {random.randint(200,999)}-{random.randint(1000,9999)}"
def make_email(first, last):
sep = random.choice([".", "_", ""])
num = random.choice(["", str(random.randint(1,99))])
return f"{first.lower()}{sep}{last.lower()}{num}@{random.choice(email_domains)}"
base_date = datetime(2026, 1, 1)
# ============================================================
# 1. CANDIDATES — 15,000 from ATS
# ============================================================
print("Generating candidates (15K)...")
N_CAND = 15000
c_ids, c_first, c_last, c_emails, c_phones, c_phones_alt = [], [], [], [], [], []
c_city, c_state, c_zip = [], [], []
c_vertical, c_skills, c_resume_summary = [], [], []
c_status, c_source, c_pay_rate_min, c_created = [], [], [], []
c_availability, c_years_exp = [], []
for i in range(N_CAND):
fn = random.choice(first_names)
ln = random.choice(last_names)
city, state, zipcode = random.choice(cities_zips)
vert = random.choice(verticals)
n_skills = random.randint(3, 12)
sk = random.sample(skills_pool[vert], min(n_skills, len(skills_pool[vert])))
yrs = random.randint(0, 25)
resume = f"{fn} {ln}{vert} professional with {yrs} years experience. "
resume += f"Based in {city}, {state} {zipcode}. "
resume += f"Key skills: {', '.join(sk)}. "
resume += random.choice([
f"Previously worked at {random.choice(['Acme Corp','TechFlow','GlobalStaff','MedPro','BuildRight'])} as a {random.choice(['Senior','Lead','Staff','Junior'])} {vert} specialist.",
f"Seeking {random.choice(['contract','full-time','temp-to-hire'])} opportunities in the {city} metro area.",
f"Available {random.choice(['immediately','in 2 weeks','after current contract ends'])}. Open to {random.choice(['remote','hybrid','on-site'])} work.",
])
c_ids.append(f"CAND-{i+1:05d}")
c_first.append(fn)
c_last.append(ln)
c_emails.append(make_email(fn, ln))
c_phones.append(make_phone())
c_phones_alt.append(make_phone() if random.random() < 0.3 else "")
c_city.append(city)
c_state.append(state)
c_zip.append(zipcode)
c_vertical.append(vert)
c_skills.append("|".join(sk))
c_resume_summary.append(resume)
c_status.append(random.choice(["active","active","active","active","inactive","do_not_contact","placed"]))
c_source.append(random.choice(["Indeed","LinkedIn","Referral","Walk-in","Monster","CareerBuilder","Website","Job Fair"]))
c_pay_rate_min.append(round(random.uniform(12, 85), 2))
c_created.append((base_date - timedelta(days=random.randint(0, 1095))).strftime("%Y-%m-%d"))
c_availability.append(random.choice(["immediate","1_week","2_weeks","1_month","not_available"]))
c_years_exp.append(yrs)
candidates = pa.table({
"candidate_id": c_ids, "first_name": c_first, "last_name": c_last,
"email": c_emails, "phone": c_phones, "phone_alt": c_phones_alt,
"city": c_city, "state": c_state, "zip": c_zip,
"vertical": c_vertical, "skills": c_skills, "resume_summary": c_resume_summary,
"status": c_status, "source": c_source, "min_pay_rate": c_pay_rate_min,
"created_date": c_created, "availability": c_availability, "years_experience": c_years_exp,
})
upload("candidates", candidates)
# ============================================================
# 2. CLIENTS — 500 companies
# ============================================================
print("Generating clients (500)...")
company_prefixes = ["Apex","Summit","Core","First","National","Metro","Pacific","Atlantic","Central","Premier",
"Global","United","Alliance","Pinnacle","Elite","Horizon","Pioneer","Titan","Quantum","Vertex"]
company_suffixes = ["Industries","Solutions","Systems","Group","Corp","Technologies","Services","Partners",
"Holdings","Enterprises","Manufacturing","Healthcare","Logistics","Financial","Engineering"]
cl_ids, cl_names, cl_verticals, cl_contacts, cl_contact_emails, cl_contact_phones = [], [], [], [], [], []
cl_city, cl_state, cl_zip, cl_bill_rate_avg, cl_status, cl_since = [], [], [], [], [], []
for i in range(500):
name = f"{random.choice(company_prefixes)} {random.choice(company_suffixes)}"
city, state, zipcode = random.choice(cities_zips)
vert = random.choice(verticals)
contact_fn = random.choice(first_names)
contact_ln = random.choice(last_names)
cl_ids.append(f"CLI-{i+1:04d}")
cl_names.append(name)
cl_verticals.append(vert)
cl_contacts.append(f"{contact_fn} {contact_ln}")
cl_contact_emails.append(f"{contact_fn.lower()}.{contact_ln.lower()}@{name.lower().replace(' ','')}.com")
cl_contact_phones.append(make_phone())
cl_city.append(city)
cl_state.append(state)
cl_zip.append(zipcode)
cl_bill_rate_avg.append(round(random.uniform(25, 150), 2))
cl_status.append(random.choice(["active","active","active","inactive","prospect"]))
cl_since.append((base_date - timedelta(days=random.randint(30, 2000))).strftime("%Y-%m-%d"))
clients = pa.table({
"client_id": cl_ids, "company_name": cl_names, "vertical": cl_verticals,
"contact_name": cl_contacts, "contact_email": cl_contact_emails, "contact_phone": cl_contact_phones,
"city": cl_city, "state": cl_state, "zip": cl_zip,
"avg_bill_rate": cl_bill_rate_avg, "status": cl_status, "client_since": cl_since,
})
upload("clients", clients)
# ============================================================
# 3. JOB ORDERS — 3,000 open/filled/closed
# ============================================================
print("Generating job_orders (3K)...")
titles = {
"IT": ["Software Developer","Java Developer",".NET Developer","DevOps Engineer","Data Analyst",
"QA Engineer","Systems Admin","Help Desk","Network Engineer","Cloud Architect",
"Full Stack Developer","Python Developer","React Developer","DBA","Security Analyst"],
"Healthcare": ["Registered Nurse","LPN","CNA","Medical Assistant","Phlebotomist",
"Radiology Tech","Pharmacy Tech","Medical Coder","Billing Specialist","Case Manager"],
"Industrial": ["Forklift Operator","Welder","CNC Machinist","Quality Inspector","Maintenance Tech",
"Electrician","Warehouse Associate","Assembly Technician","Production Supervisor","Shipping Clerk"],
"Accounting": ["Staff Accountant","AP Specialist","AR Specialist","Payroll Clerk","Tax Preparer",
"Financial Analyst","Bookkeeper","Audit Associate","Controller","Cost Accountant"],
"Admin": ["Administrative Assistant","Executive Assistant","Receptionist","Data Entry Clerk","Office Manager",
"Customer Service Rep","HR Coordinator","Legal Secretary","Office Coordinator","Scheduler"],
}
jo_ids, jo_client_ids, jo_titles, jo_verticals, jo_descriptions = [], [], [], [], []
jo_city, jo_state, jo_zip = [], [], []
jo_bill_rate, jo_pay_rate, jo_status, jo_openings, jo_created = [], [], [], [], []
jo_work_type, jo_duration = [], []
for i in range(3000):
vert = random.choice(verticals)
title = random.choice(titles[vert])
ci = random.randint(0, 499)
city, state, zipcode = random.choice(cities_zips)
bill = round(random.uniform(25, 150), 2)
pay = round(bill * random.uniform(0.55, 0.75), 2)
req_skills = random.sample(skills_pool[vert], min(random.randint(3, 6), len(skills_pool[vert])))
desc = f"{title} needed for {cl_names[ci]} in {city}, {state}. "
desc += f"Requirements: {', '.join(req_skills)}. "
desc += f"{random.randint(1,10)}+ years experience preferred. "
desc += f"Bill rate: ${bill}/hr. "
desc += random.choice([
"Background check required.",
"Drug screen required.",
"Must have reliable transportation.",
"Steel-toe boots required on site.",
"Remote work available.",
"Hybrid schedule: 3 days on-site.",
])
jo_ids.append(f"JO-{i+1:05d}")
jo_client_ids.append(cl_ids[ci])
jo_titles.append(title)
jo_verticals.append(vert)
jo_descriptions.append(desc)
jo_city.append(city)
jo_state.append(state)
jo_zip.append(zipcode)
jo_bill_rate.append(bill)
jo_pay_rate.append(pay)
jo_status.append(random.choice(["open","open","open","filled","filled","closed","on_hold"]))
jo_openings.append(random.randint(1, 5))
jo_created.append((base_date - timedelta(days=random.randint(0, 365))).strftime("%Y-%m-%d"))
jo_work_type.append(random.choice(["contract","temp_to_hire","direct_hire","contract"]))
jo_duration.append(random.choice(["3 months","6 months","12 months","ongoing","project-based"]))
job_orders = pa.table({
"job_order_id": jo_ids, "client_id": jo_client_ids, "title": jo_titles,
"vertical": jo_verticals, "description": jo_descriptions,
"city": jo_city, "state": jo_state, "zip": jo_zip,
"bill_rate": jo_bill_rate, "pay_rate": jo_pay_rate, "status": jo_status,
"openings": jo_openings, "created_date": jo_created,
"work_type": jo_work_type, "duration": jo_duration,
})
upload("job_orders", job_orders)
# ============================================================
# 4. PLACEMENTS — 8,000 candidate-job matches
# ============================================================
print("Generating placements (8K)...")
p_ids, p_cand_ids, p_job_ids, p_client_ids = [], [], [], []
p_start, p_end, p_status, p_bill, p_pay, p_recruiter = [], [], [], [], [], []
recruiters = [f"{random.choice(first_names)} {random.choice(last_names)}" for _ in range(30)]
for i in range(8000):
ci = random.randint(0, N_CAND - 1)
ji = random.randint(0, 2999)
start = base_date - timedelta(days=random.randint(0, 730))
end = start + timedelta(days=random.randint(30, 365))
p_ids.append(f"PL-{i+1:05d}")
p_cand_ids.append(c_ids[ci])
p_job_ids.append(jo_ids[ji])
p_client_ids.append(jo_client_ids[ji])
p_start.append(start.strftime("%Y-%m-%d"))
p_end.append(end.strftime("%Y-%m-%d") if random.random() < 0.7 else "")
p_status.append(random.choice(["active","active","completed","completed","terminated","no_show"]))
p_bill.append(jo_bill_rate[ji])
p_pay.append(jo_pay_rate[ji])
p_recruiter.append(random.choice(recruiters))
placements = pa.table({
"placement_id": p_ids, "candidate_id": p_cand_ids, "job_order_id": p_job_ids,
"client_id": p_client_ids, "start_date": p_start, "end_date": p_end,
"status": p_status, "bill_rate": p_bill, "pay_rate": p_pay, "recruiter": p_recruiter,
})
upload("placements", placements)
# ============================================================
# 5. TIMESHEETS — 120K weekly entries
# ============================================================
print("Generating timesheets (120K)...")
ts_ids, ts_placement_ids, ts_cand_ids, ts_client_ids = [], [], [], []
ts_week_ending, ts_hours_reg, ts_hours_ot, ts_bill_total, ts_pay_total = [], [], [], [], []
ts_approved, ts_approved_by = [], []
for i in range(120000):
pi = random.randint(0, 7999)
hrs_reg = round(random.choice([40, 40, 40, 32, 24, 20, 8]), 1)
hrs_ot = round(random.choice([0, 0, 0, 0, 4, 8, 12, 16]), 1)
bill = p_bill[pi]
pay = p_pay[pi]
ts_ids.append(f"TS-{i+1:06d}")
ts_placement_ids.append(p_ids[pi])
ts_cand_ids.append(p_cand_ids[pi])
ts_client_ids.append(p_client_ids[pi])
ts_week_ending.append((base_date - timedelta(weeks=random.randint(0, 104))).strftime("%Y-%m-%d"))
ts_hours_reg.append(hrs_reg)
ts_hours_ot.append(hrs_ot)
ts_bill_total.append(round(hrs_reg * bill + hrs_ot * bill * 1.5, 2))
ts_pay_total.append(round(hrs_reg * pay + hrs_ot * pay * 1.5, 2))
ts_approved.append(random.choice([True, True, True, True, False]))
ts_approved_by.append(random.choice(cl_contacts) if ts_approved[-1] else "")
timesheets = pa.table({
"timesheet_id": ts_ids, "placement_id": ts_placement_ids,
"candidate_id": ts_cand_ids, "client_id": ts_client_ids,
"week_ending": ts_week_ending, "hours_regular": ts_hours_reg, "hours_overtime": ts_hours_ot,
"bill_total": ts_bill_total, "pay_total": ts_pay_total,
"approved": ts_approved, "approved_by": ts_approved_by,
})
upload("timesheets", timesheets)
# ============================================================
# 6. CALL LOG — 80K phone records (CDR)
# ============================================================
print("Generating call_log (80K)...")
call_ids, call_from, call_to, call_direction = [], [], [], []
call_duration, call_timestamp, call_recruiter, call_cand_id, call_disposition = [], [], [], [], []
dispositions = ["connected","voicemail","no_answer","busy","wrong_number","callback_scheduled","declined"]
for i in range(80000):
ci = random.randint(0, N_CAND - 1)
rec = random.choice(recruiters)
direction = random.choice(["outbound","outbound","outbound","inbound"])
call_ids.append(f"CALL-{i+1:06d}")
if direction == "outbound":
call_from.append(make_phone()) # recruiter's line
call_to.append(c_phones[ci])
else:
call_from.append(c_phones[ci])
call_to.append(make_phone())
call_direction.append(direction)
call_duration.append(random.randint(0, 1800))
call_timestamp.append((base_date - timedelta(seconds=random.randint(0, 86400 * 365))).isoformat())
call_recruiter.append(rec)
call_cand_id.append(c_ids[ci])
call_disposition.append(random.choice(dispositions))
call_log = pa.table({
"call_id": call_ids, "from_number": call_from, "to_number": call_to,
"direction": call_direction, "duration_seconds": call_duration,
"timestamp": call_timestamp, "recruiter": call_recruiter,
"candidate_id": call_cand_id, "disposition": call_disposition,
})
upload("call_log", call_log)
# ============================================================
# 7. EMAIL LOG — 60K email records
# ============================================================
print("Generating email_log (60K)...")
em_ids, em_from, em_to, em_subject, em_timestamp = [], [], [], [], []
em_recruiter, em_cand_id, em_direction, em_opened = [], [], [], []
subjects = [
"New job opportunity — {title} in {city}",
"Following up on your application",
"Interview scheduled — {title}",
"Timesheet reminder for week ending {date}",
"Your background check is complete",
"New assignment details — {client}",
"Pay rate update for your current assignment",
"Re: Availability for {title} position",
"Welcome to {client} — your first day info",
"Reference check request",
]
for i in range(60000):
ci = random.randint(0, N_CAND - 1)
ji = random.randint(0, 2999)
rec = random.choice(recruiters)
direction = random.choice(["outbound","outbound","outbound","inbound"])
subj = random.choice(subjects).format(
title=jo_titles[ji], city=jo_city[ji], date="2026-01-05", client=cl_names[random.randint(0,499)]
)
em_ids.append(f"EM-{i+1:06d}")
if direction == "outbound":
em_from.append(f"{rec.replace(' ','.').lower()}@acmestaffing.com")
em_to.append(c_emails[ci])
else:
em_from.append(c_emails[ci])
em_to.append(f"{rec.replace(' ','.').lower()}@acmestaffing.com")
em_subject.append(subj)
em_timestamp.append((base_date - timedelta(seconds=random.randint(0, 86400 * 365))).isoformat())
em_recruiter.append(rec)
em_cand_id.append(c_ids[ci])
em_direction.append(direction)
em_opened.append(random.random() < 0.6 if direction == "outbound" else True)
email_log = pa.table({
"email_id": em_ids, "from_addr": em_from, "to_addr": em_to,
"subject": em_subject, "timestamp": em_timestamp,
"recruiter": em_recruiter, "candidate_id": em_cand_id,
"direction": em_direction, "opened": em_opened,
})
upload("email_log", email_log)
# ============================================================
total = sum([candidates.num_rows, clients.num_rows, job_orders.num_rows,
placements.num_rows, timesheets.num_rows, call_log.num_rows, email_log.num_rows])
print(f"\n{'='*60}")
print(f"Staffing company data loaded: {total:,} total rows across 7 tables")
print(f"{'='*60}")
print(f"""
Cross-reference queries to try:
"Find all Java developers in Chicago who are available immediately"
"Which recruiter has the most placements this year?"
"Show me the total revenue by client for Q1 2026"
"Find candidates who were called more than 5 times but never placed"
"What's the average bill rate for .NET developers in New York?"
"Which clients have the highest overtime hours?"
"Show candidates in zip 60601 with Healthcare skills"
"Find the spread (bill - pay) by vertical"
"Which candidates have worked for multiple different clients?"
"Show email open rates by recruiter"
""")