diff --git a/docs/DECISIONS.md b/docs/DECISIONS.md index c2f1b93..a4f1d88 100644 --- a/docs/DECISIONS.md +++ b/docs/DECISIONS.md @@ -34,3 +34,23 @@ **Date:** 2026-03-27 **Decision:** Gateway serves HTTP on :3100 (external) and gRPC on :3101 (internal). Both run in the same process. **Rationale:** Single binary simplifies deployment. HTTP stays for browser/curl access. gRPC provides typed contracts for service-to-service calls. No premature microservice split. + +## ADR-008: Embeddings stored as Parquet, not a proprietary vector DB +**Date:** 2026-03-27 +**Decision:** Vector embeddings stored as Parquet files (doc_id, chunk_text, vector columns). Vector index (HNSW) serialized as a sidecar file. +**Rationale:** Keeps all data in one portable format. No vendor lock-in to Pinecone/Weaviate/Qdrant. Vectors are queryable via DataFusion like any other data. Trade-off: brute-force search is fine up to ~100K vectors; HNSW needed beyond that. + +## ADR-009: Incremental updates via delta files, not Delta Lake +**Date:** 2026-03-27 +**Decision:** Updates append to delta Parquet files. Queries merge base + deltas at read time. Periodic compaction merges deltas into base. Single-writer model (no concurrent writers). +**Rationale:** Full ACID over Parquet (Delta Lake/Iceberg) is a multi-year project. Our use case is single-writer (one ingest pipeline) with read-heavy workloads. Merge-on-read with compaction is sufficient and dramatically simpler. + +## ADR-010: Schema detection defaults to string +**Date:** 2026-03-27 +**Decision:** Ingest pipeline infers column types from data. When ambiguous or mixed, defaults to String rather than failing. +**Rationale:** Legacy data is messy. A column with "123", "N/A", and "" is a string, not an integer. Downstream queries can CAST as needed. Better to ingest everything than reject on type errors. + +## ADR-011: This is not a CRM replacement +**Date:** 2026-03-27 +**Decision:** The lakehouse is the analytical layer BEHIND operational systems. It ingests exports, not live data. CRM/ATS stays for daily operations. +**Rationale:** Operational systems need single-record CRUD, permissions, UI workflows. The lakehouse answers cross-cutting questions that no single operational system can. They complement, not compete. diff --git a/docs/PRD.md b/docs/PRD.md index ab56156..9b4a7aa 100644 --- a/docs/PRD.md +++ b/docs/PRD.md @@ -1,6 +1,6 @@ # PRD: Lakehouse — Rust-First Object Storage System -**Status:** Active +**Status:** Active — Phase 0-5 complete, entering production path **Created:** 2026-03-27 **Owner:** J @@ -8,20 +8,23 @@ ## Problem -Traditional data platforms couple storage, compute, and metadata into monolithic databases. This creates vendor lock-in, scaling bottlenecks, and opaque data access. AI workloads bolt onto these systems awkwardly, sharing resources with transactional queries. +Legacy data systems silo information across CRMs, databases, spreadsheets, and file shares. Querying across them requires manual ETL, pre-defined schemas, and expensive database licenses. When AI enters the picture, these systems can't handle the dual requirement of fast analytical queries AND semantic retrieval over unstructured text. + +A staffing company (our reference case) has candidate records in an ATS, client data in a CRM, timesheets in billing software, call logs from a phone system, and email records from Exchange. 
Answering "find every Java developer in Chicago who was called 5+ times but never placed" requires querying across all of them — and no single system can do it. We need a system where: -- Object storage is the source of truth (not a database) -- Metadata, access, and execution are controlled by Rust services -- Queries run directly over object storage via Arrow/Parquet -- AI inference is isolated and swappable -- The entire system is rebuildable from repository + docs alone +- Any data source (CSV, DB export, PDF, JSON) can be ingested without pre-defined schemas +- Structured data is queryable via SQL at scale (millions of rows, sub-second) +- Unstructured data is searchable via AI embeddings (semantic retrieval) +- An LLM can answer natural language questions against all of it +- Everything runs locally — no cloud APIs, total data privacy +- The system is rebuildable from repository + object storage alone --- ## Solution -A modular Rust service mesh over S3-compatible object storage. +A modular Rust service mesh over S3-compatible object storage, with a local AI layer for embeddings and generation. ### Locked Stack @@ -30,14 +33,15 @@ A modular Rust service mesh over S3-compatible object storage. | Frontend | Dioxus | Yes | | API | Axum + Tokio | Yes | | Object Storage Interface | Apache Arrow `object_store` | Yes | -| Storage Backend | RustFS (fallback: SeaweedFS) | Yes | +| Storage Backend | LocalFileSystem → RustFS → S3 | Yes | | Query Engine | DataFusion | Yes | | Data Format | Parquet + Arrow | Yes | | RPC (internal) | tonic (gRPC) | Yes | | AI Runtime | Ollama (local models) | Yes | | AI Boundary | Python FastAPI sidecar → Ollama HTTP API | Yes | +| Vector Index | TBD — evaluate `hora`, `qdrant` crate, or HNSW from scratch | **Open** | -No new frameworks. No exceptions. +No new frameworks without documented ADR. --- @@ -47,84 +51,160 @@ No new frameworks. No exceptions. | Service | Responsibility | |---|---| -| **gateway** | HTTP ingress, routing, auth envelope, middleware | -| **catalogd** | Metadata control plane — dataset registry, schema versions, manifest index | -| **storaged** | Object I/O — read/write/list/delete via `object_store` crate | -| **queryd** | SQL execution — DataFusion over registered Parquet datasets | +| **gateway** | HTTP/gRPC ingress, routing, auth, CORS, body limits | +| **catalogd** | Metadata control plane — dataset registry, schema versions, manifests | +| **storaged** | Object I/O — read/write/list/delete via `object_store` | +| **queryd** | SQL execution — DataFusion over Parquet, MemTable hot cache | +| **ingestd** | *NEW* — Ingest pipeline: CSV/JSON/DB → normalize → Parquet → catalog | +| **vectord** | *NEW* — Embedding store + vector index: chunk → embed → index → search | | **aibridge** | Rust↔Python boundary — HTTP client to FastAPI sidecar | -| **ui** | Dioxus frontend — dataset browser, query editor, results viewer | -| **shared** | Types, errors, Arrow helpers, protobuf definitions | - -### AI Sidecar - -Python FastAPI process that adapts Ollama's HTTP API into Arrow-compatible formats: -- `POST /embed` → `nomic-embed-text` via Ollama -- `POST /generate` → configurable model (qwen2.5, mistral, gemma2, llama3.2) -- `POST /rerank` → cross-encoder reranking via generate endpoint - -No mocks. No stubs. Real models from day one. Ollama manages model lifecycle, GPU scheduling, caching. Sidecar is stateless passthrough. 
+| **ui** | Dioxus frontend — Ask, Explore, SQL, System tabs | +| **shared** | Types, errors, Arrow helpers, config, protobuf definitions | ### Data Flow ``` -Client → gateway → catalogd (metadata lookup) - → storaged (object read/write) - → queryd (SQL execution over Parquet) - → aibridge → sidecar → Ollama (inference) +Raw data → ingestd (normalize, chunk, detect schema) + ├→ storaged (Parquet files to object storage) + ├→ catalogd (register dataset + schema) + ├→ vectord (embed text chunks, build index) + └→ queryd (auto-register as queryable table) + +User question → gateway + ├→ vectord (semantic search for relevant chunks) ← RAG path + ├→ queryd (SQL over structured data) ← Analytics path + └→ aibridge → Ollama (generate answer from context) ``` +### Query Paths + +**Analytical (SQL):** "What's the average bill rate for .NET devs in Chicago?" +→ DataFusion scans Parquet columnar, returns in <200ms + +**Semantic (RAG):** "Find candidates who could do data engineering work" +→ Embed question → vector search across resume embeddings → retrieve top chunks → LLM answers + +**Hybrid:** "Which clients are we losing money on, and why?" +→ SQL for margin calculations + RAG over client notes/emails for context → LLM synthesizes + ### Invariants 1. Object storage = source of truth for all data -2. catalogd = sole metadata authority (datasets, schemas, manifests) -3. No raw data stored in catalog — only pointers (bucket, key, schema fingerprint) -4. storaged never interprets data — dumb pipe with presigned URLs -5. queryd registers tables via catalog pointers, not by scanning storage -6. aibridge is stateless — Python sidecar is replaceable without touching Rust -7. All services are modular and independently replaceable - -### Dependency Graph - -``` -shared ← storaged ← catalogd ← queryd -shared ← aibridge -gateway → {storaged, catalogd, queryd, aibridge} -ui → gateway (HTTP only, no crate dependency) -``` +2. catalogd = sole metadata authority +3. No raw data in catalog — only pointers +4. vectord stores embeddings AS Parquet (portable, not a proprietary format) +5. ingestd is idempotent — re-ingesting the same file is a no-op +6. Hot cache is a performance layer, not a source of truth — eviction is safe +7. All services modular and independently replaceable --- ## Phases -### Phase 0: Bootstrap -Workspace compiles, gateway serves health check, structured logging works. +### Phase 0-5: Foundation ✅ COMPLETE -**Gate:** `cargo build` clean, `GET /health` returns 200, logs on stdout, docs committed. +- Rust workspace, Axum gateway, object storage, catalog, DataFusion query engine +- Python sidecar with real Ollama models (embed, generate, rerank) +- Dioxus UI with Ask (NL→SQL), Explore, SQL, System tabs +- gRPC, OpenTelemetry, auth middleware, TOML config +- Validated with 286K row staffing company dataset across 7 tables +- Cross-reference queries (JOINs across candidates, placements, timesheets, calls) in <150ms -### Phase 1: Storage + Catalog -Write Parquet to object storage, register in catalog, read back. +### Phase 6: Ingest Pipeline -**Gate:** Upload Parquet → register dataset → retrieve metadata → read back. All via gateway HTTP. +Build the data on-ramp. Accept messy real-world data, normalize it, make it queryable. -### Phase 2: Query Engine -SQL queries over registered Parquet datasets via DataFusion. 
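+
+As a concrete illustration of the conservative inference rule (ADR-010; step 6.3 in the table below), here is a minimal Python sketch. It is illustrative only: the real `ingestd` implementation targets Arrow types, and the function name is hypothetical.
+
+```python
+def infer_column_type(values: list[str]) -> str:
+    """Return 'int64', 'float64', or 'utf8'. Ambiguous or mixed data falls back to 'utf8'."""
+    non_empty = [v for v in values if v.strip() != ""]  # treat "" as null for inference
+    if not non_empty:
+        return "utf8"  # an all-empty column stays a string
+    try:
+        for v in non_empty:
+            int(v)
+        return "int64"
+    except ValueError:
+        pass
+    try:
+        for v in non_empty:
+            float(v)
+        return "float64"
+    except ValueError:
+        return "utf8"  # "123", "N/A", "" is a string column, never a rejected file
+
+# infer_column_type(["123", "N/A", ""]) -> "utf8"
+# infer_column_type(["1", "2", "3"])    -> "int64"
+```
+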
+| Step | Deliverable | Gate | +|---|---|---| +| 6.1 | `ingestd` crate with CSV parser → Arrow RecordBatch → Parquet | CSV file → queryable dataset | +| 6.2 | JSON ingest (newline-delimited JSON, nested objects) | JSON file → flat Parquet | +| 6.3 | Schema detection — infer column types from data | No manual schema definition needed | +| 6.4 | Deduplication — detect and skip already-ingested files (content hash) | Re-ingest same file = no-op | +| 6.5 | Text chunking — split large text fields for embedding | Long text → overlapping chunks | +| 6.6 | Auto-registration — ingest writes to storage AND registers in catalog | Single API call: file in → queryable | +| 6.7 | Gateway endpoint: `POST /ingest` with file upload | Upload CSV from browser → query in seconds | -**Gate:** `SELECT * FROM dataset LIMIT 10` returns correct results. Resolution goes through catalog. +**Gate:** Upload a raw CSV or JSON file → auto-detected schema → stored as Parquet → registered → immediately queryable via SQL. No manual steps. -### Phase 3: AI Integration -Python sidecar with real Ollama models. Embeddings, generation, reranking. +**Risk:** Schema detection on messy data (mixed types, nulls, inconsistent formatting). Mitigation: conservative type inference (default to string), let user override. -**Gate:** Rust sends text → Python → Ollama → real embeddings return as Arrow-compatible floats. +### Phase 7: Vector Index + RAG Pipeline -### Phase 4: Frontend -Dioxus UI: dataset browser, query editor, results table. +Make unstructured data searchable by meaning, not just keywords. -**Gate:** User can browse datasets and run queries from browser. +| Step | Deliverable | Gate | +|---|---|---| +| 7.1 | `vectord` crate with embedding storage as Parquet (doc_id, chunk_text, vector) | Embeddings stored as portable Parquet | +| 7.2 | Chunking strategy — configurable chunk size + overlap for text columns | Large text fields split into embeddable chunks | +| 7.3 | Brute-force vector search via DataFusion (cosine similarity SQL) | Semantic search works, correctness verified | +| 7.4 | HNSW index for fast approximate nearest neighbor | Search over 100K+ vectors in <50ms | +| 7.5 | RAG endpoint: `POST /rag` — question → embed → search → retrieve → generate | Natural language question → grounded answer | +| 7.6 | Auto-embed on ingest — text columns automatically embedded during ingest | No separate embedding step needed | +| 7.7 | Hybrid search — combine SQL filters with vector similarity | "Java devs in Chicago" (SQL) + "who could do data engineering" (semantic) | -### Phase 5: Hardening -gRPC internals, OpenTelemetry, auth, config-driven startup. +**Gate:** Ingest 15K candidate resumes → auto-embed → ask "find someone who could handle our Kubernetes migration" → system returns relevant candidates ranked by semantic match, with LLM explanation. -**Gate:** Services communicate via gRPC. Traces propagate. Auth enforced. System restartable from repo + config. +**Risk: HNSW in Rust at scale.** This is the hardest technical problem. Options: +- `hora` crate — Rust-native ANN, but less mature than FAISS +- Store HNSW index as a serialized file alongside Parquet data +- Fallback: brute-force scan is fine up to ~100K vectors; optimize later +- Nuclear option: use Qdrant as an external vector store (breaks "no new services" rule) + +**Decision needed:** Evaluate `hora` vs external Qdrant vs brute-force at J's data scale. 
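+
+To make the 7.3 correctness target concrete, here is a minimal brute-force search sketch in Python over the ADR-008 Parquet layout (`doc_id`, `chunk_text`, `vector`). The sidecar address and the `/embed` request/response JSON shapes are assumptions; the production path is `vectord` + DataFusion in Rust, and this sketch serves only as a reference for verifying results.
+
+```python
+import json, urllib.request
+import numpy as np
+import pyarrow.parquet as pq
+
+SIDECAR = "http://localhost:8000"  # assumed sidecar address
+
+def embed(text: str) -> np.ndarray:
+    # The sidecar exposes POST /embed; the JSON field names here are assumptions.
+    req = urllib.request.Request(f"{SIDECAR}/embed",
+                                 data=json.dumps({"text": text}).encode(),
+                                 headers={"Content-Type": "application/json"}, method="POST")
+    body = json.loads(urllib.request.urlopen(req).read())
+    return np.asarray(body["embedding"], dtype=np.float32)
+
+def search(question: str, embeddings_parquet: str, top_k: int = 5):
+    tbl = pq.read_table(embeddings_parquet, columns=["doc_id", "chunk_text", "vector"])
+    vectors = np.stack(tbl.column("vector").to_pylist()).astype(np.float32)
+    vectors /= np.linalg.norm(vectors, axis=1, keepdims=True)  # cosine = dot of unit vectors
+    q = embed(question)
+    q /= np.linalg.norm(q)
+    scores = vectors @ q
+    top = np.argsort(scores)[::-1][:top_k]
+    return [(tbl.column("doc_id")[i].as_py(), float(scores[i]),
+             tbl.column("chunk_text")[i].as_py()) for i in top]
+```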
+ +### Phase 8: Hot Cache + Incremental Updates + +Make frequently-accessed data fast, and handle real-time updates without full rewrite. + +| Step | Deliverable | Gate | +|---|---|---| +| 8.1 | MemTable hot cache — pin active datasets in memory | Queries on hot data: <10ms | +| 8.2 | Cache policy — LRU eviction based on access patterns | Memory-bounded, auto-manages | +| 8.3 | Incremental writes — append new rows without rewriting entire Parquet file | Update one candidate's phone → no full table rewrite | +| 8.4 | Merge-on-read — query combines base Parquet + delta files | Correct results from base + updates | +| 8.5 | Compaction — periodic merge of delta files into base Parquet | Prevent delta file proliferation | +| 8.6 | Upsert semantics — insert or update by primary key | Same candidate ID → update in place | + +**Gate:** Update a single row in a 15K-row dataset. Query reflects the change immediately. No full Parquet rewrite. Memory cache serves hot data in <10ms. + +**Risk: This is the Delta Lake problem.** Full ACID transactions over Parquet files is what Databricks spent years building. We're NOT building Delta Lake — we're building a pragmatic version: +- Append-only delta files (easy) +- Merge-on-read (moderate) +- Compaction (moderate) +- Full ACID isolation (NOT attempting — single-writer model instead) + +### Phase 9+: Future (not designed yet) + +- Database connector ingest (PostgreSQL, MySQL, MSSQL → Parquet) +- PDF/document ingest (OCR → text → chunks → embed) +- Scheduled ingest (cron-based file watching) +- Multi-node query distribution +- Row-level access control +- Audit log (who queried what, when) + +--- + +## Reference Dataset: Staffing Company + +Validated with realistic staffing company data: + +| Table | Rows | Description | +|---|---|---| +| candidates | 15,000 | Names, phones, emails, zip, skills, resume text, availability | +| clients | 500 | Companies, contacts, verticals, bill rates | +| job_orders | 3,000 | Positions with descriptions, requirements, rates | +| placements | 8,000 | Candidate↔job matches with dates, rates, recruiters | +| timesheets | 120,000 | Weekly hours, bill/pay totals, approvals | +| call_log | 80,000 | Phone CDR — who called whom, duration, disposition | +| email_log | 60,000 | Email tracking — subject, opened, direction | +| **Total** | **286,500** | **7 tables, cross-referenced** | + +Proven queries: +- Candidate search by skills + location + availability: 80ms +- Revenue by client with profit margins (JOIN 120K timesheets): 142ms +- Cold lead detection (candidates called 5+ times, never placed): 94ms +- Margin analysis by vertical (JOIN placements → job orders): 53ms +- Natural language → AI-generated SQL → execution → results: ~3s (model inference) --- @@ -132,24 +212,22 @@ gRPC internals, OpenTelemetry, auth, config-driven startup. | Model | Use | |---|---| -| `nomic-embed-text` | Embeddings (768d) | -| `qwen2.5` | Code generation, structured output | -| `mistral` | General generation | +| `nomic-embed-text` | Embeddings (768d) — semantic search, RAG retrieval | +| `qwen2.5` | SQL generation, structured output, summarization | +| `mistral` | General generation, longer context | | `gemma2` | General generation | -| `llama3.2` | General generation | - -Model selection via environment variables. No hardcoded model names in Rust code. 
+| `llama3.2` | General generation, lightweight | --- ## Non-Goals -- Multi-tenancy -- Streaming ingestion / CDC -- Custom file formats -- Query caching / materialized views -- Wrapping `object_store` with another abstraction -- Cloud deployment (local-first) +- Multi-tenancy (single-owner system) +- Cloud deployment (local-first, always) +- Full ACID transactions (single-writer model is sufficient) +- Real-time streaming / CDC (batch ingest is the model) +- Replacing the CRM (this is the analytical layer BEHIND the CRM) +- Custom file formats (Parquet is the format, period) --- @@ -157,11 +235,13 @@ Model selection via environment variables. No hardcoded model names in Rust code | Risk | Severity | Mitigation | |---|---|---| -| RustFS immaturity | High | Start with LocalFileSystem, test against MinIO, RustFS last. SeaweedFS fallback. | -| DataFusion table registration overhead | Medium | Lazy registration + LRU cache of SessionContext instances. | -| Catalog consistency without DB | Medium | Write-ahead: persist manifest before in-memory update. Rebuild from storage on restart. | -| Dioxus WASM gaps | Medium | Phase 4 is last. Fallback to plain HTML if blocked. | -| Schema evolution | Medium | Schema fingerprinting in Phase 1. Validate before query. | +| Vector search in Rust at scale | **High** | Start brute-force, evaluate `hora` crate, Qdrant as fallback | +| Incremental updates on Parquet | **High** | Delta files + merge-on-read, NOT full Delta Lake | +| Legacy data messiness | **High** | Conservative schema detection, default to string, user overrides | +| Schema evolution across ingests | **Medium** | Schema fingerprinting + versioned manifests | +| Memory pressure from hot cache | **Medium** | LRU eviction, configurable memory limit | +| HNSW index persistence | **Medium** | Serialize alongside Parquet, rebuild on startup | +| Python sidecar as bottleneck | **Low** | Can replace with direct Ollama HTTP from Rust later | --- @@ -173,3 +253,5 @@ Model selection via environment variables. No hardcoded model names in Rust code 4. No silent architecture drift 5. Always work in smallest valid step 6. Always verify before moving on +7. **New:** Flag when something is genuinely hard vs just engineering work +8. **New:** If a phase reveals the approach is wrong, update the PRD before continuing diff --git a/scripts/staffing_demo.py b/scripts/staffing_demo.py new file mode 100644 index 0000000..3b2a55a --- /dev/null +++ b/scripts/staffing_demo.py @@ -0,0 +1,459 @@ +#!/usr/bin/env python3 +""" +Realistic staffing company data generator. +Multiple source systems, overlapping data, real cross-reference problems. 
+ +Data sources (like a real staffing company): + - ATS (Applicant Tracking System) → candidates + - CRM → client companies + contacts + - Job board → job orders with descriptions + - Placements → who got placed where + - Timesheets → hours worked, bill/pay rates + - Phone system CDR → call detail records + - Email logs → communication tracking +""" + +import random, json, urllib.request, hashlib, string, time +from datetime import datetime, timedelta +import pyarrow as pa, pyarrow.parquet as pq + +API = "http://localhost:3100" +random.seed(2026) + +def upload(name, table): + path = f"/tmp/{name}.parquet" + pq.write_table(table, path, compression="snappy") + with open(path, "rb") as f: + data = f.read() + key = f"datasets/{name}.parquet" + req = urllib.request.Request(f"{API}/storage/objects/{key}", data=data, method="PUT") + urllib.request.urlopen(req) + body = json.dumps({"name": name, "schema_fingerprint": "auto", + "objects": [{"bucket": "data", "key": key, "size_bytes": len(data)}]}).encode() + req = urllib.request.Request(f"{API}/catalog/datasets", data=body, method="POST", + headers={"Content-Type": "application/json"}) + urllib.request.urlopen(req) + print(f" {name}: {table.num_rows:,} rows ({len(data)/1024:.0f} KB)") + +# ============================================================ +# Shared reference data +# ============================================================ + +first_names = ["James","Mary","Robert","Patricia","John","Jennifer","Michael","Linda","David","Elizabeth", + "William","Barbara","Richard","Susan","Joseph","Jessica","Thomas","Sarah","Christopher","Karen", + "Charles","Lisa","Daniel","Nancy","Matthew","Betty","Anthony","Margaret","Mark","Sandra", + "Donald","Ashley","Steven","Dorothy","Paul","Kimberly","Andrew","Emily","Joshua","Donna", + "Kenneth","Michelle","Kevin","Carol","Brian","Amanda","George","Melissa","Timothy","Deborah", + "Ronald","Stephanie","Edward","Rebecca","Jason","Sharon","Jeffrey","Laura","Ryan","Cynthia", + "Jacob","Kathleen","Gary","Amy","Nicholas","Angela","Eric","Shirley","Jonathan","Anna", + "Stephen","Brenda","Larry","Pamela","Justin","Emma","Scott","Nicole","Brandon","Helen", + "Benjamin","Samantha","Samuel","Katherine","Raymond","Christine","Gregory","Debra","Frank","Rachel", + "Alexander","Carolyn","Patrick","Janet","Jack","Catherine","Dennis","Maria","Jerry","Heather"] + +last_names = ["Smith","Johnson","Williams","Brown","Jones","Garcia","Miller","Davis","Rodriguez","Martinez", + "Hernandez","Lopez","Gonzalez","Wilson","Anderson","Thomas","Taylor","Moore","Jackson","Martin", + "Lee","Perez","Thompson","White","Harris","Sanchez","Clark","Ramirez","Lewis","Robinson", + "Walker","Young","Allen","King","Wright","Scott","Torres","Nguyen","Hill","Flores", + "Green","Adams","Nelson","Baker","Hall","Rivera","Campbell","Mitchell","Carter","Roberts", + "Gomez","Phillips","Evans","Turner","Diaz","Parker","Cruz","Edwards","Collins","Reyes", + "Stewart","Morris","Morales","Murphy","Cook","Rogers","Gutierrez","Ortiz","Morgan","Cooper", + "Peterson","Bailey","Reed","Kelly","Howard","Ramos","Kim","Cox","Ward","Richardson"] + +cities_zips = [ + ("Chicago","IL","60601"),("Chicago","IL","60602"),("Chicago","IL","60603"),("Chicago","IL","60610"), + ("Chicago","IL","60614"),("Chicago","IL","60616"),("Chicago","IL","60622"),("Chicago","IL","60647"), + ("New York","NY","10001"),("New York","NY","10002"),("New York","NY","10003"),("New York","NY","10010"), + ("New York","NY","10016"),("New York","NY","10019"),("New York","NY","10022"),("New 
York","NY","10036"), + ("Los Angeles","CA","90001"),("Los Angeles","CA","90012"),("Los Angeles","CA","90024"),("Los Angeles","CA","90036"), + ("Houston","TX","77001"),("Houston","TX","77002"),("Houston","TX","77003"),("Houston","TX","77019"), + ("Dallas","TX","75201"),("Dallas","TX","75202"),("Dallas","TX","75204"),("Dallas","TX","75219"), + ("Atlanta","GA","30301"),("Atlanta","GA","30303"),("Atlanta","GA","30305"),("Atlanta","GA","30309"), + ("Denver","CO","80201"),("Denver","CO","80202"),("Denver","CO","80204"),("Denver","CO","80206"), + ("Phoenix","AZ","85001"),("Phoenix","AZ","85003"),("Phoenix","AZ","85004"),("Phoenix","AZ","85006"), + ("Seattle","WA","98101"),("Seattle","WA","98102"),("Seattle","WA","98103"),("Seattle","WA","98104"), + ("Miami","FL","33101"),("Miami","FL","33125"),("Miami","FL","33130"),("Miami","FL","33132"), +] + +skills_pool = { + "IT": ["Java","Python","C#",".NET","JavaScript","TypeScript","React","Angular","Node.js","SQL", + "AWS","Azure","GCP","Docker","Kubernetes","Linux","Git","REST APIs","GraphQL","MongoDB", + "PostgreSQL","MySQL","Redis","Terraform","Jenkins","CI/CD","Agile","Scrum","DevOps","Microservices", + "Spring Boot","Django","Flask","Ruby on Rails","Go","Rust","Swift","Kotlin","PHP","Vue.js"], + "Healthcare": ["RN","LPN","CNA","BLS","ACLS","PALS","EMR","Epic","Cerner","Meditech", + "ICD-10","CPT","Medical Billing","Medical Coding","HIPAA","Phlebotomy","IV Therapy", + "Telemetry","ICU","OR","ER","Med-Surg","Labor & Delivery","Pediatrics","Oncology"], + "Industrial": ["Forklift","OSHA 10","OSHA 30","Welding","MIG","TIG","CNC","PLC","Blueprint Reading", + "Quality Control","Six Sigma","Lean Manufacturing","AutoCAD","SolidWorks","GD&T", + "Mechanical Assembly","Electrical","Hydraulics","Pneumatics","Warehouse"], + "Accounting": ["QuickBooks","SAP","Oracle","Accounts Payable","Accounts Receivable","General Ledger", + "Financial Reporting","Tax Preparation","CPA","Payroll","Budgeting","Forecasting", + "Audit","Compliance","Excel Advanced","Power BI","Tableau","GAAP","SOX"], + "Admin": ["Microsoft Office","Data Entry","Customer Service","Scheduling","Filing","Receptionist", + "Executive Assistant","Travel Coordination","Calendar Management","SAP","Salesforce", + "CRM","Multi-line Phone","Typing 60+ WPM","Notary","Bilingual Spanish"], +} + +verticals = list(skills_pool.keys()) +email_domains = ["gmail.com","yahoo.com","hotmail.com","outlook.com","aol.com","icloud.com","protonmail.com"] + +def make_phone(): + return f"({random.randint(200,999)}) {random.randint(200,999)}-{random.randint(1000,9999)}" + +def make_email(first, last): + sep = random.choice([".", "_", ""]) + num = random.choice(["", str(random.randint(1,99))]) + return f"{first.lower()}{sep}{last.lower()}{num}@{random.choice(email_domains)}" + +base_date = datetime(2026, 1, 1) + +# ============================================================ +# 1. 
CANDIDATES — 15,000 from ATS +# ============================================================ +print("Generating candidates (15K)...") + +N_CAND = 15000 +c_ids, c_first, c_last, c_emails, c_phones, c_phones_alt = [], [], [], [], [], [] +c_city, c_state, c_zip = [], [], [] +c_vertical, c_skills, c_resume_summary = [], [], [] +c_status, c_source, c_pay_rate_min, c_created = [], [], [], [] +c_availability, c_years_exp = [], [] + +for i in range(N_CAND): + fn = random.choice(first_names) + ln = random.choice(last_names) + city, state, zipcode = random.choice(cities_zips) + vert = random.choice(verticals) + n_skills = random.randint(3, 12) + sk = random.sample(skills_pool[vert], min(n_skills, len(skills_pool[vert]))) + yrs = random.randint(0, 25) + + resume = f"{fn} {ln} — {vert} professional with {yrs} years experience. " + resume += f"Based in {city}, {state} {zipcode}. " + resume += f"Key skills: {', '.join(sk)}. " + resume += random.choice([ + f"Previously worked at {random.choice(['Acme Corp','TechFlow','GlobalStaff','MedPro','BuildRight'])} as a {random.choice(['Senior','Lead','Staff','Junior'])} {vert} specialist.", + f"Seeking {random.choice(['contract','full-time','temp-to-hire'])} opportunities in the {city} metro area.", + f"Available {random.choice(['immediately','in 2 weeks','after current contract ends'])}. Open to {random.choice(['remote','hybrid','on-site'])} work.", + ]) + + c_ids.append(f"CAND-{i+1:05d}") + c_first.append(fn) + c_last.append(ln) + c_emails.append(make_email(fn, ln)) + c_phones.append(make_phone()) + c_phones_alt.append(make_phone() if random.random() < 0.3 else "") + c_city.append(city) + c_state.append(state) + c_zip.append(zipcode) + c_vertical.append(vert) + c_skills.append("|".join(sk)) + c_resume_summary.append(resume) + c_status.append(random.choice(["active","active","active","active","inactive","do_not_contact","placed"])) + c_source.append(random.choice(["Indeed","LinkedIn","Referral","Walk-in","Monster","CareerBuilder","Website","Job Fair"])) + c_pay_rate_min.append(round(random.uniform(12, 85), 2)) + c_created.append((base_date - timedelta(days=random.randint(0, 1095))).strftime("%Y-%m-%d")) + c_availability.append(random.choice(["immediate","1_week","2_weeks","1_month","not_available"])) + c_years_exp.append(yrs) + +candidates = pa.table({ + "candidate_id": c_ids, "first_name": c_first, "last_name": c_last, + "email": c_emails, "phone": c_phones, "phone_alt": c_phones_alt, + "city": c_city, "state": c_state, "zip": c_zip, + "vertical": c_vertical, "skills": c_skills, "resume_summary": c_resume_summary, + "status": c_status, "source": c_source, "min_pay_rate": c_pay_rate_min, + "created_date": c_created, "availability": c_availability, "years_experience": c_years_exp, +}) +upload("candidates", candidates) + +# ============================================================ +# 2. 
CLIENTS — 500 companies +# ============================================================ +print("Generating clients (500)...") + +company_prefixes = ["Apex","Summit","Core","First","National","Metro","Pacific","Atlantic","Central","Premier", + "Global","United","Alliance","Pinnacle","Elite","Horizon","Pioneer","Titan","Quantum","Vertex"] +company_suffixes = ["Industries","Solutions","Systems","Group","Corp","Technologies","Services","Partners", + "Holdings","Enterprises","Manufacturing","Healthcare","Logistics","Financial","Engineering"] + +cl_ids, cl_names, cl_verticals, cl_contacts, cl_contact_emails, cl_contact_phones = [], [], [], [], [], [] +cl_city, cl_state, cl_zip, cl_bill_rate_avg, cl_status, cl_since = [], [], [], [], [], [] + +for i in range(500): + name = f"{random.choice(company_prefixes)} {random.choice(company_suffixes)}" + city, state, zipcode = random.choice(cities_zips) + vert = random.choice(verticals) + contact_fn = random.choice(first_names) + contact_ln = random.choice(last_names) + + cl_ids.append(f"CLI-{i+1:04d}") + cl_names.append(name) + cl_verticals.append(vert) + cl_contacts.append(f"{contact_fn} {contact_ln}") + cl_contact_emails.append(f"{contact_fn.lower()}.{contact_ln.lower()}@{name.lower().replace(' ','')}.com") + cl_contact_phones.append(make_phone()) + cl_city.append(city) + cl_state.append(state) + cl_zip.append(zipcode) + cl_bill_rate_avg.append(round(random.uniform(25, 150), 2)) + cl_status.append(random.choice(["active","active","active","inactive","prospect"])) + cl_since.append((base_date - timedelta(days=random.randint(30, 2000))).strftime("%Y-%m-%d")) + +clients = pa.table({ + "client_id": cl_ids, "company_name": cl_names, "vertical": cl_verticals, + "contact_name": cl_contacts, "contact_email": cl_contact_emails, "contact_phone": cl_contact_phones, + "city": cl_city, "state": cl_state, "zip": cl_zip, + "avg_bill_rate": cl_bill_rate_avg, "status": cl_status, "client_since": cl_since, +}) +upload("clients", clients) + +# ============================================================ +# 3. 
JOB ORDERS — 3,000 open/filled/closed +# ============================================================ +print("Generating job_orders (3K)...") + +titles = { + "IT": ["Software Developer","Java Developer",".NET Developer","DevOps Engineer","Data Analyst", + "QA Engineer","Systems Admin","Help Desk","Network Engineer","Cloud Architect", + "Full Stack Developer","Python Developer","React Developer","DBA","Security Analyst"], + "Healthcare": ["Registered Nurse","LPN","CNA","Medical Assistant","Phlebotomist", + "Radiology Tech","Pharmacy Tech","Medical Coder","Billing Specialist","Case Manager"], + "Industrial": ["Forklift Operator","Welder","CNC Machinist","Quality Inspector","Maintenance Tech", + "Electrician","Warehouse Associate","Assembly Technician","Production Supervisor","Shipping Clerk"], + "Accounting": ["Staff Accountant","AP Specialist","AR Specialist","Payroll Clerk","Tax Preparer", + "Financial Analyst","Bookkeeper","Audit Associate","Controller","Cost Accountant"], + "Admin": ["Administrative Assistant","Executive Assistant","Receptionist","Data Entry Clerk","Office Manager", + "Customer Service Rep","HR Coordinator","Legal Secretary","Office Coordinator","Scheduler"], +} + +jo_ids, jo_client_ids, jo_titles, jo_verticals, jo_descriptions = [], [], [], [], [] +jo_city, jo_state, jo_zip = [], [], [] +jo_bill_rate, jo_pay_rate, jo_status, jo_openings, jo_created = [], [], [], [], [] +jo_work_type, jo_duration = [], [] + +for i in range(3000): + vert = random.choice(verticals) + title = random.choice(titles[vert]) + ci = random.randint(0, 499) + city, state, zipcode = random.choice(cities_zips) + bill = round(random.uniform(25, 150), 2) + pay = round(bill * random.uniform(0.55, 0.75), 2) + + req_skills = random.sample(skills_pool[vert], min(random.randint(3, 6), len(skills_pool[vert]))) + desc = f"{title} needed for {cl_names[ci]} in {city}, {state}. " + desc += f"Requirements: {', '.join(req_skills)}. " + desc += f"{random.randint(1,10)}+ years experience preferred. " + desc += f"Bill rate: ${bill}/hr. " + desc += random.choice([ + "Background check required.", + "Drug screen required.", + "Must have reliable transportation.", + "Steel-toe boots required on site.", + "Remote work available.", + "Hybrid schedule: 3 days on-site.", + ]) + + jo_ids.append(f"JO-{i+1:05d}") + jo_client_ids.append(cl_ids[ci]) + jo_titles.append(title) + jo_verticals.append(vert) + jo_descriptions.append(desc) + jo_city.append(city) + jo_state.append(state) + jo_zip.append(zipcode) + jo_bill_rate.append(bill) + jo_pay_rate.append(pay) + jo_status.append(random.choice(["open","open","open","filled","filled","closed","on_hold"])) + jo_openings.append(random.randint(1, 5)) + jo_created.append((base_date - timedelta(days=random.randint(0, 365))).strftime("%Y-%m-%d")) + jo_work_type.append(random.choice(["contract","temp_to_hire","direct_hire","contract"])) + jo_duration.append(random.choice(["3 months","6 months","12 months","ongoing","project-based"])) + +job_orders = pa.table({ + "job_order_id": jo_ids, "client_id": jo_client_ids, "title": jo_titles, + "vertical": jo_verticals, "description": jo_descriptions, + "city": jo_city, "state": jo_state, "zip": jo_zip, + "bill_rate": jo_bill_rate, "pay_rate": jo_pay_rate, "status": jo_status, + "openings": jo_openings, "created_date": jo_created, + "work_type": jo_work_type, "duration": jo_duration, +}) +upload("job_orders", job_orders) + +# ============================================================ +# 4. 
PLACEMENTS — 8,000 candidate-job matches +# ============================================================ +print("Generating placements (8K)...") + +p_ids, p_cand_ids, p_job_ids, p_client_ids = [], [], [], [] +p_start, p_end, p_status, p_bill, p_pay, p_recruiter = [], [], [], [], [], [] + +recruiters = [f"{random.choice(first_names)} {random.choice(last_names)}" for _ in range(30)] + +for i in range(8000): + ci = random.randint(0, N_CAND - 1) + ji = random.randint(0, 2999) + start = base_date - timedelta(days=random.randint(0, 730)) + end = start + timedelta(days=random.randint(30, 365)) + + p_ids.append(f"PL-{i+1:05d}") + p_cand_ids.append(c_ids[ci]) + p_job_ids.append(jo_ids[ji]) + p_client_ids.append(jo_client_ids[ji]) + p_start.append(start.strftime("%Y-%m-%d")) + p_end.append(end.strftime("%Y-%m-%d") if random.random() < 0.7 else "") + p_status.append(random.choice(["active","active","completed","completed","terminated","no_show"])) + p_bill.append(jo_bill_rate[ji]) + p_pay.append(jo_pay_rate[ji]) + p_recruiter.append(random.choice(recruiters)) + +placements = pa.table({ + "placement_id": p_ids, "candidate_id": p_cand_ids, "job_order_id": p_job_ids, + "client_id": p_client_ids, "start_date": p_start, "end_date": p_end, + "status": p_status, "bill_rate": p_bill, "pay_rate": p_pay, "recruiter": p_recruiter, +}) +upload("placements", placements) + +# ============================================================ +# 5. TIMESHEETS — 120K weekly entries +# ============================================================ +print("Generating timesheets (120K)...") + +ts_ids, ts_placement_ids, ts_cand_ids, ts_client_ids = [], [], [], [] +ts_week_ending, ts_hours_reg, ts_hours_ot, ts_bill_total, ts_pay_total = [], [], [], [], [] +ts_approved, ts_approved_by = [], [] + +for i in range(120000): + pi = random.randint(0, 7999) + hrs_reg = round(random.choice([40, 40, 40, 32, 24, 20, 8]), 1) + hrs_ot = round(random.choice([0, 0, 0, 0, 4, 8, 12, 16]), 1) + bill = p_bill[pi] + pay = p_pay[pi] + + ts_ids.append(f"TS-{i+1:06d}") + ts_placement_ids.append(p_ids[pi]) + ts_cand_ids.append(p_cand_ids[pi]) + ts_client_ids.append(p_client_ids[pi]) + ts_week_ending.append((base_date - timedelta(weeks=random.randint(0, 104))).strftime("%Y-%m-%d")) + ts_hours_reg.append(hrs_reg) + ts_hours_ot.append(hrs_ot) + ts_bill_total.append(round(hrs_reg * bill + hrs_ot * bill * 1.5, 2)) + ts_pay_total.append(round(hrs_reg * pay + hrs_ot * pay * 1.5, 2)) + ts_approved.append(random.choice([True, True, True, True, False])) + ts_approved_by.append(random.choice(cl_contacts) if ts_approved[-1] else "") + +timesheets = pa.table({ + "timesheet_id": ts_ids, "placement_id": ts_placement_ids, + "candidate_id": ts_cand_ids, "client_id": ts_client_ids, + "week_ending": ts_week_ending, "hours_regular": ts_hours_reg, "hours_overtime": ts_hours_ot, + "bill_total": ts_bill_total, "pay_total": ts_pay_total, + "approved": ts_approved, "approved_by": ts_approved_by, +}) +upload("timesheets", timesheets) + +# ============================================================ +# 6. 
CALL LOG — 80K phone records (CDR) +# ============================================================ +print("Generating call_log (80K)...") + +call_ids, call_from, call_to, call_direction = [], [], [], [] +call_duration, call_timestamp, call_recruiter, call_cand_id, call_disposition = [], [], [], [], [] + +dispositions = ["connected","voicemail","no_answer","busy","wrong_number","callback_scheduled","declined"] + +for i in range(80000): + ci = random.randint(0, N_CAND - 1) + rec = random.choice(recruiters) + direction = random.choice(["outbound","outbound","outbound","inbound"]) + + call_ids.append(f"CALL-{i+1:06d}") + if direction == "outbound": + call_from.append(make_phone()) # recruiter's line + call_to.append(c_phones[ci]) + else: + call_from.append(c_phones[ci]) + call_to.append(make_phone()) + call_direction.append(direction) + call_duration.append(random.randint(0, 1800)) + call_timestamp.append((base_date - timedelta(seconds=random.randint(0, 86400 * 365))).isoformat()) + call_recruiter.append(rec) + call_cand_id.append(c_ids[ci]) + call_disposition.append(random.choice(dispositions)) + +call_log = pa.table({ + "call_id": call_ids, "from_number": call_from, "to_number": call_to, + "direction": call_direction, "duration_seconds": call_duration, + "timestamp": call_timestamp, "recruiter": call_recruiter, + "candidate_id": call_cand_id, "disposition": call_disposition, +}) +upload("call_log", call_log) + +# ============================================================ +# 7. EMAIL LOG — 60K email records +# ============================================================ +print("Generating email_log (60K)...") + +em_ids, em_from, em_to, em_subject, em_timestamp = [], [], [], [], [] +em_recruiter, em_cand_id, em_direction, em_opened = [], [], [], [] + +subjects = [ + "New job opportunity — {title} in {city}", + "Following up on your application", + "Interview scheduled — {title}", + "Timesheet reminder for week ending {date}", + "Your background check is complete", + "New assignment details — {client}", + "Pay rate update for your current assignment", + "Re: Availability for {title} position", + "Welcome to {client} — your first day info", + "Reference check request", +] + +for i in range(60000): + ci = random.randint(0, N_CAND - 1) + ji = random.randint(0, 2999) + rec = random.choice(recruiters) + direction = random.choice(["outbound","outbound","outbound","inbound"]) + subj = random.choice(subjects).format( + title=jo_titles[ji], city=jo_city[ji], date="2026-01-05", client=cl_names[random.randint(0,499)] + ) + + em_ids.append(f"EM-{i+1:06d}") + if direction == "outbound": + em_from.append(f"{rec.replace(' ','.').lower()}@acmestaffing.com") + em_to.append(c_emails[ci]) + else: + em_from.append(c_emails[ci]) + em_to.append(f"{rec.replace(' ','.').lower()}@acmestaffing.com") + em_subject.append(subj) + em_timestamp.append((base_date - timedelta(seconds=random.randint(0, 86400 * 365))).isoformat()) + em_recruiter.append(rec) + em_cand_id.append(c_ids[ci]) + em_direction.append(direction) + em_opened.append(random.random() < 0.6 if direction == "outbound" else True) + +email_log = pa.table({ + "email_id": em_ids, "from_addr": em_from, "to_addr": em_to, + "subject": em_subject, "timestamp": em_timestamp, + "recruiter": em_recruiter, "candidate_id": em_cand_id, + "direction": em_direction, "opened": em_opened, +}) +upload("email_log", email_log) + +# ============================================================ +total = sum([candidates.num_rows, clients.num_rows, job_orders.num_rows, + 
placements.num_rows, timesheets.num_rows, call_log.num_rows, email_log.num_rows]) +print(f"\n{'='*60}") +print(f"Staffing company data loaded: {total:,} total rows across 7 tables") +print(f"{'='*60}") +print(f""" +Cross-reference queries to try: + "Find all Java developers in Chicago who are available immediately" + "Which recruiter has the most placements this year?" + "Show me the total revenue by client for Q1 2026" + "Find candidates who were called more than 5 times but never placed" + "What's the average bill rate for .NET developers in New York?" + "Which clients have the highest overtime hours?" + "Show candidates in zip 60601 with Healthcare skills" + "Find the spread (bill - pay) by vertical" + "Which candidates have worked for multiple different clients?" + "Show email open rates by recruiter" +""")
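+
+# Illustrative only: one way the "called 5+ times but never placed" question
+# could be expressed as SQL over the tables generated above (how the SQL is
+# executed, e.g. queryd via the gateway, is outside this script):
+#
+#   SELECT c.candidate_id, c.first_name, c.last_name, COUNT(cl.call_id) AS calls
+#   FROM candidates c
+#   JOIN call_log cl ON cl.candidate_id = c.candidate_id
+#   LEFT JOIN placements p ON p.candidate_id = c.candidate_id
+#   WHERE p.placement_id IS NULL
+#   GROUP BY c.candidate_id, c.first_name, c.last_name
+#   HAVING COUNT(cl.call_id) >= 5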