lakehouse/scripts/staffing_simulation.py
root 10383b40b7 Staffing day simulation — multi-agent stress test on 10K Ethereal workers
5 contracts, 16 positions, 10K worker pool. Four agents: Matcher (SQL
+ vector hybrid), Communicator (LLM SMS drafts), Verifier (fact-checks
against golden data), Analyzer (RAG intelligence questions).

Results:
  - SQL matching: 16/16 positions filled, ZERO hallucinations. Every
    worker's name, role, city, state, certifications, and reliability
    score verified against the golden dataset.
  - SMS generation: 16/16 messages drafted with correct worker names.
  - RAG intelligence: retrieval returns semantically similar but
    structurally wrong workers (wrong state, wrong archetype) because
    vector search can't do structured filtering. LLM correctly reports
    context limitations — doesn't hallucinate beyond retrieved chunks.

Key finding: SQL path is production-ready. RAG path needs hybrid
SQL+vector routing — SQL for structured constraints (state, role,
cert, reliability), vector for semantic similarity. That's the
architectural gap to close.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 22:31:54 -05:00

442 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Staffing agency day simulation — multi-agent stress test.

Simulates a real staffing day: contracts arrive, agents match workers,
draft communications, and a verifier checks every claim for hallucinations.

Agents (in the order they appear in this file):
1. MATCHER — finds qualified workers via SQL + vector hybrid
2. COMMUNICATOR — drafts outreach SMS to matched workers
3. VERIFIER — checks every claim against the golden data (zero tolerance)
4. ANALYZER — answers staffing questions via RAG, fact-checked against SQL

The "contract manager" role is played by the static CONTRACTS list of daily
job orders, and the "dispatcher" role by main(), which assigns verified
workers and tracks the day's outcome.

The golden rule: the synthetic data IS ground truth. Every name, skill,
certification, city, and score the agents cite MUST exist in the actual
dataset. The verifier queries SQL to confirm. Any mismatch = hallucination.
"""
import json, time, sys, random
from datetime import datetime
from urllib.request import Request, urlopen
from urllib.error import HTTPError
# Base URL of the service that post() sends every request to.
BASE = "http://localhost:3100"
# Fixed seed for the `random` module. NOTE(review): no call into `random`
# is visible in this file beyond this line — confirm whether it is still needed.
random.seed(42)
def post(path, body=None, timeout=120):
    """Send a JSON request to the service at BASE and return the decoded reply.

    Args:
        path: URL path appended to BASE (for example "/query/sql").
        body: JSON-serialisable payload. ``None`` sends the request with no
            body (urllib issues that as a GET); any other value is POSTed.
        timeout: Socket timeout in seconds.

    Returns:
        The decoded JSON response, ``{}`` when the response body is blank,
        or ``{"error": message}`` when the request, the HTTP status or the
        JSON decoding fails. Callers test for the "error" key instead of
        catching exceptions.
    """
    # Compare against None so an empty payload ({} or []) is still POSTed
    # instead of silently degrading to a body-less GET.
    data = json.dumps(body).encode() if body is not None else None
    req = Request(f"{BASE}{path}", data=data, headers={"Content-Type": "application/json"})
    try:
        # The context manager closes the connection even if read() fails.
        with urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
        return json.loads(raw) if raw.strip() else {}
    except HTTPError as e:
        # errors="replace" keeps a non-UTF-8 error page from raising inside
        # the handler; only the first 300 characters are kept.
        return {"error": e.read().decode(errors="replace")[:300]}
    except Exception as e:
        # Deliberate catch-all: connection, timeout and JSON errors are all
        # reported through the same {"error": ...} shape.
        return {"error": str(e)}
def sql(query):
    """Run a SQL statement through the service's /query/sql endpoint.

    Returns whatever post() returns for the request: the decoded response,
    or a dict carrying an "error" key when the call failed.
    """
    payload = {"sql": query}
    return post("/query/sql", payload)
# ══════════════════════════════════════════════════════
# DAILY CONTRACTS — realistic job orders for the day
# ══════════════════════════════════════════════════════
# Static job orders for the simulated day (5 contracts, 16 positions in total).
# Keys read elsewhere in this file:
#   id              - contract identifier shown in the output and scorecard
#   client          - client name, printed and included in the SMS prompt
#   role, state     - exact-match SQL filters in match_workers()
#   city            - OPTIONAL; adds an exact-match city filter when present
#   required_certs  - certifications a worker must all hold (compared
#                     case-insensitively); an empty list means no cert filter
#   min_reliability - lower bound applied to the worker's reliability column
#   headcount       - number of positions to fill
#   urgency         - passed into the SMS prompt
#   notes           - free text appended to the vector-search query
CONTRACTS = [
{
"id": "JO-2026-001",
"client": "Midwest Logistics Inc",
"role": "Forklift Operator",
"state": "IL",
"city": "Chicago",
"required_certs": ["OSHA-10"],
"min_reliability": 0.8,
"headcount": 3,
"urgency": "high",
"notes": "Warehouse expansion, need certified forklift ops immediately",
},
{
# No "city": matched state-wide.
"id": "JO-2026-002",
"client": "Precision Manufacturing",
"role": "Machine Operator",
"state": "IN",
"min_reliability": 0.7,
"required_certs": [],
"headcount": 5,
"urgency": "medium",
"notes": "2nd shift, CNC experience preferred",
},
{
# No "city": matched state-wide.
"id": "JO-2026-003",
"client": "CleanSpace Facilities",
"role": "Sanitation Worker",
"state": "OH",
"required_certs": ["Hazmat"],
"min_reliability": 0.6,
"headcount": 2,
"urgency": "low",
"notes": "Chemical plant, hazmat cert mandatory",
},
{
"id": "JO-2026-004",
"client": "Amazon DSP Partner",
"role": "Loader",
"state": "IL",
"city": "Springfield",
"required_certs": [],
"min_reliability": 0.75,
"headcount": 4,
"urgency": "high",
"notes": "Peak season, need physically fit workers",
},
{
# No "city": matched state-wide.
"id": "JO-2026-005",
"client": "AutoParts Direct",
"role": "Quality Tech",
"state": "MO",
"required_certs": ["OSHA-30"],
"min_reliability": 0.85,
"headcount": 2,
"urgency": "medium",
"notes": "Inspection station, attention to detail critical",
},
]
# ══════════════════════════════════════════════════════
# AGENT 1: MATCHER — SQL + vector hybrid
# ══════════════════════════════════════════════════════
def match_workers(contract):
    """Find candidate workers for a contract via SQL (structured) + vector (semantic).

    Args:
        contract: Job-order dict. Reads "role", "state", "min_reliability"
            and "headcount"; "city", "required_certs" and "notes" are
            optional.

    Returns:
        A ``(sql_matches, vec_matches)`` tuple of two lists.
        ``sql_matches`` holds at most ``2 * headcount`` worker rows ordered
        by reliability then availability, restricted to workers holding
        every required certification. ``vec_matches`` holds the raw hits of
        the vector search, or ``[]`` when that search fails.
        When the SQL query itself fails, the error is printed to stderr and
        ``([], [])`` is returned, so both elements are always lists.
    """
    def quote(value):
        # Double embedded single quotes so a value such as "O'Fallon" stays
        # a valid SQL string literal.
        return "'" + str(value).replace("'", "''") + "'"

    def held_certs(worker):
        # assumes certifications is a comma-separated string — TODO confirm;
        # a list/tuple is accepted too, and a null column counts as no certs.
        raw = worker.get("certifications") or ""
        items = raw if isinstance(raw, (list, tuple)) else str(raw).split(",")
        return {str(item).strip().lower() for item in items}

    # SQL path: exact role, state, reliability (and city when given).
    conditions = [
        f"role = {quote(contract['role'])}",
        f"state = {quote(contract['state'])}",
        f"reliability >= {contract['min_reliability']}",
    ]
    if contract.get("city"):
        conditions.append(f"city = {quote(contract['city'])}")

    required = {c.strip().lower() for c in contract.get("required_certs") or []}
    required.discard("")

    # Certifications are filtered client-side below. Capping the rows first
    # could drop every certified worker ranked below the cap, so the LIMIT
    # is only applied when there is no certification filter to run.
    limit_clause = "" if required else "LIMIT 20"
    where_clause = " AND ".join(conditions)
    sql_query = f"""
        SELECT worker_id, name, role, city, state, skills, certifications,
               ROUND(reliability,2) rel, ROUND(availability,2) avail,
               archetype
        FROM ethereal_workers
        WHERE {where_clause}
        ORDER BY reliability DESC, availability DESC
        {limit_clause}
    """
    sql_result = sql(sql_query)
    if "error" in sql_result:
        # Keep the return shape stable: the caller takes len() of both items.
        print(f"SQL error: {str(sql_result['error'])[:80]}", file=sys.stderr)
        return [], []
    sql_matches = sql_result.get("rows") or []

    if required:
        sql_matches = [w for w in sql_matches if required <= held_certs(w)]

    # Vector path: semantic search for nuanced matching.
    vector_query = f"{contract['role']} in {contract['state']} {contract.get('notes', '')}"
    vec_result = post("/vectors/hnsw/search", {
        "index_name": "ethereal_workers_v1",
        "query": vector_query,
        "top_k": 10,
    })
    vec_matches = [] if "error" in vec_result else (vec_result.get("results") or [])

    # Twice the headcount leaves spare candidates beyond the positions needed.
    return sql_matches[:contract["headcount"] * 2], vec_matches
# ══════════════════════════════════════════════════════
# AGENT 2: COMMUNICATOR — drafts outreach
# ══════════════════════════════════════════════════════
def draft_communication(contract, worker):
    """Request an outreach SMS draft for one matched worker from the LLM endpoint.

    Args:
        contract: Job-order dict; reads "role", "client", "urgency", "state"
            and, when present, "city".
        worker: Worker row; reads "name", "role", "city" and "state".

    Returns:
        ``(text, None)`` with the stripped draft on success, or
        ``(None, error)`` when the request reports an error.
    """
    # The prompt is assembled line by line and joined with newlines.
    prompt_lines = [
        "Draft a short professional SMS (under 160 chars) to a staffing worker about a job opportunity.",
        f"Worker: {worker['name']}, {worker['role']} in {worker['city']}, {worker['state']}",
        f"Job: {contract['role']} for {contract['client']} in {contract.get('city', contract['state'])}",
        f"Urgency: {contract['urgency']}",
        "Include their name. Be direct. SMS only — no subject line, no greeting.",
    ]
    request = {
        "prompt": "\n".join(prompt_lines),
        "model": "qwen2.5",
        "max_tokens": 80,
        "temperature": 0.3,
    }
    reply = post("/ai/generate", request)
    if "error" not in reply:
        return reply.get("text", "").strip(), None
    return None, reply["error"]
# ══════════════════════════════════════════════════════
# AGENT 3: VERIFIER — catches hallucinations
# ══════════════════════════════════════════════════════
def _diff_claims(actual, claims):
    """Compare claimed field values with one golden row; return mismatch messages."""
    numeric_fields = ("reliability", "responsiveness", "availability", "compliance")
    discrepancies = []
    for field, claimed in claims.items():
        actual_val = actual.get(field)
        if actual_val is None:
            # Nothing to compare against (column absent or null in the row).
            continue
        if field in numeric_fields:
            # Numeric: equal within a 0.05 tolerance, which absorbs rounding.
            try:
                delta = abs(float(actual_val) - float(claimed))
            except (ValueError, TypeError):
                # A value that cannot be read as a number cannot be confirmed,
                # so report it instead of silently accepting the claim.
                discrepancies.append(
                    f"{field}: claimed={claimed!r} actual={actual_val!r} (not comparable as numbers)"
                )
            else:
                if delta > 0.05:
                    discrepancies.append(f"{field}: claimed={claimed} actual={actual_val}")
        elif field == "certifications":
            # Every claimed cert must appear in the actual comma-separated list.
            actual_certs = {c.strip().lower() for c in str(actual_val).split(",")}
            claimed_certs = {c.strip().lower() for c in str(claimed).split(",")}
            missing = claimed_certs - actual_certs - {""}
            if missing:
                discrepancies.append(f"certifications: claimed {missing} not in actual {actual_certs}")
        elif str(actual_val).lower().strip() != str(claimed).lower().strip():
            # Everything else: case-insensitive, whitespace-trimmed equality.
            discrepancies.append(f"{field}: claimed='{claimed}' actual='{actual_val}'")
    return discrepancies


def verify_worker(worker_id, claims):
    """Check every claim about a worker against the golden data.

    Args:
        worker_id: Identifier interpolated into the ``worker_id = ...`` filter.
        claims: Dict of ``{field: claimed_value}`` to verify. An empty dict
            checks only that the worker exists.

    Returns:
        ``(verified_ok, discrepancies)``: ``verified_ok`` is True only when
        the worker row was found and no claim disagreed with it;
        ``discrepancies`` lists one message per problem.
    """
    result = sql(f"SELECT * FROM ethereal_workers WHERE worker_id = {worker_id}")
    if "error" in result:
        # A failed lookup is not evidence that the worker is missing; say so.
        return False, [f"worker_id {worker_id} lookup failed: {str(result['error'])[:80]}"]
    rows = result.get("rows")
    if not rows:
        return False, [f"worker_id {worker_id} not found in golden data"]
    discrepancies = _diff_claims(rows[0], claims)
    return not discrepancies, discrepancies
# ══════════════════════════════════════════════════════
# AGENT 4: LLM ANALYZER — answers staffing questions
# ══════════════════════════════════════════════════════
def ask_staffing_question(question, verify=True):
    """Ask the RAG endpoint a question and check its sources against golden data.

    Args:
        question: Natural-language question sent to /vectors/rag.
        verify: When true, every source whose "doc_id" looks like
            ``W-<digits>`` is looked up to confirm the worker exists.

    Returns:
        ``(answer, sources, hallucinations)``. ``hallucinations`` is always a
        list of messages. When the request itself fails, the result is
        ``(None, [], [message])`` so a caller that counts entries sees
        exactly one issue.
    """
    r = post("/vectors/rag", {
        "index_name": "ethereal_workers_v1",
        "question": question,
        "top_k": 5,
    }, timeout=180)
    if "error" in r:
        # Wrap the message in a list: returning the bare string would make a
        # caller's len() count characters rather than issues.
        return None, [], [f"RAG request failed: {r['error']}"]

    answer = r.get("answer") or ""
    sources = r.get("sources") or []

    hallucinations = []
    if verify:
        for s in sources:
            wid = str(s.get("doc_id") or "").replace("W-", "")
            # isdecimal() accepts exactly the strings int() can parse here.
            if not wid.isdecimal():
                continue
            # Existence check only: empty claims, so nothing is compared. A
            # placeholder such as {"name": ""} would be compared against the
            # real name and flag every existing worker as a mismatch.
            ok, issues = verify_worker(int(wid), {})
            if not ok:
                hallucinations.extend(issues)
    return answer, sources, hallucinations
# ══════════════════════════════════════════════════════
# MAIN SIMULATION
# ══════════════════════════════════════════════════════
def _process_contracts(stats):
    """Morning phase: match, verify and message workers for every contract.

    Updates the counters in ``stats`` in place and returns a list of
    ``{"contract", "filled", "needed"}`` dicts, one per contract.
    """
    print("\n╔══ MORNING: CONTRACT PROCESSING ══════════════════════")
    all_assignments = []
    for contract in CONTRACTS:
        headcount = contract["headcount"]
        print(f"\n║ Contract {contract['id']}: {contract['role']} × {contract['headcount']}")
        print(f"║ Client: {contract['client']} | {contract.get('city', contract['state'])}, {contract['state']}")
        print(f"║ Certs: {contract.get('required_certs', [])} | Min reliability: {contract['min_reliability']}")
        t0 = time.time()
        sql_matches, vec_matches = match_workers(contract)
        ms = (time.time() - t0) * 1000
        if isinstance(vec_matches, str):
            # An error message in place of the hit list: report it instead
            # of printing its character count as "Vector hits".
            print(f"║ ✗ Matching failed: {vec_matches}")
            vec_matches = []
        print(f"║ SQL matches: {len(sql_matches)} | Vector hits: {len(vec_matches)} ({ms:.0f}ms)")

        # Verify SQL matches in ranked order until the headcount is filled.
        # The matcher may return more candidates than positions; the extras
        # act as backups when an earlier candidate fails verification.
        verified = []
        for w in sql_matches:
            if len(verified) >= headcount:
                break
            claims = {
                "name": w["name"],
                "role": w["role"],
                "city": w["city"],
                "state": w["state"],
                "reliability": w["rel"],
            }
            if contract.get("required_certs"):
                claims["certifications"] = w.get("certifications", "")
            ok, issues = verify_worker(w["worker_id"], claims)
            stats["workers_verified"] += 1
            if ok:
                verified.append(w)
                icon = "✓"
            else:
                stats["hallucinations_caught"] += len(issues)
                icon = "✗ HALLUCINATION"
                print(f"{icon}: {issues}")
            print(f"{icon} W-{w['worker_id']}: {w['name']} ({w['role']}) rel={w['rel']} avail={w['avail']}")
        stats["workers_matched"] += len(verified)
        stats["contracts_processed"] += 1

        # Draft comms for verified matches.
        for w in verified[:headcount]:
            msg, err = draft_communication(contract, w)
            if msg:
                stats["messages_drafted"] += 1
                # The message must mention the worker's first name.
                name_parts = str(w["name"]).split()
                first_name = name_parts[0].lower() if name_parts else ""
                if first_name and first_name in msg.lower():
                    print(f"║ 📱 → {w['name']}: {msg[:120]}")
                else:
                    stats["hallucinations_caught"] += 1
                    print(f"║ ⚠ SMS doesn't mention worker name: {msg[:80]}")
            elif err:
                print(f"║ ✗ SMS draft failed: {str(err)[:60]}")
            else:
                # No text and no error: surface it rather than drop it silently.
                print("║ ✗ SMS draft failed: empty response")

        all_assignments.append({
            "contract": contract["id"],
            "filled": len(verified),
            "needed": headcount,
        })
    print("╚══════════════════════════════════════════════════════")
    return all_assignments


def _fact_check_answer(answer, verification, stats):
    """Compare one RAG answer with SQL ground truth for its check type."""
    check = verification.get("check")
    if check == "state":
        truth = sql(f"SELECT name, reliability FROM ethereal_workers WHERE state = '{verification['expected']}' AND role LIKE '%Forklift%' ORDER BY reliability DESC LIMIT 5")
        if "error" not in truth:
            names = [r["name"] for r in truth.get("rows") or []]
            found_in_answer = sum(1 for n in names if n.lower() in answer.lower())
            stats["questions_verified"] += 1
            if found_in_answer == 0:
                stats["verification_failures"] += 1
                print(f"║ ⚠ VERIFY: top workers {names[:3]} NOT mentioned in answer")
            else:
                print(f"║ ✓ VERIFY: {found_in_answer}/{len(names)} top workers mentioned")
    elif check == "archetype":
        truth = sql(f"SELECT COUNT(*) cnt FROM ethereal_workers WHERE archetype = '{verification['expected']}'")
        rows = [] if "error" in truth else (truth.get("rows") or [])
        if rows:
            actual_count = rows[0]["cnt"]
            stats["questions_verified"] += 1
            if str(actual_count) in answer:
                print(f"║ ✓ VERIFY: correct count ({actual_count}) in answer")
            else:
                print(f"║ ⚠ VERIFY: actual count is {actual_count}, not found in answer")
                stats["verification_failures"] += 1
    elif check == "skill":
        truth = sql("SELECT COUNT(*) cnt FROM ethereal_workers WHERE skills LIKE '%CNC%' AND role LIKE '%Machine%'")
        rows = [] if "error" in truth else (truth.get("rows") or [])
        if rows:
            stats["questions_verified"] += 1
            print(f"║ ✓ VERIFY: {rows[0]['cnt']} machine operators with CNC in system")
    else:
        # No ground-truth query exists for this check type; say so rather
        # than leaving the question silently unchecked.
        print(f"║ ⚠ VERIFY: no ground-truth check implemented for '{check}'")


def _run_intelligence(stats):
    """Afternoon phase: ask the staffing questions and fact-check the answers."""
    print("\n╔══ AFTERNOON: STAFFING INTELLIGENCE ══════════════════")
    questions = [
        ("Who are the most reliable forklift operators in Illinois?",
         {"check": "state", "expected": "IL"}),
        ("Which workers have hazmat certification in Ohio?",
         {"check": "state_and_cert", "expected_state": "OH", "expected_cert": "hazmat"}),
        ("Find machine operators with CNC experience",
         {"check": "skill", "expected": "cnc"}),
        ("Who are the 'erratic' archetype workers and should we flag them?",
         {"check": "archetype", "expected": "erratic"}),
        ("Which leaders in Indiana have the highest availability?",
         {"check": "archetype_state", "expected_arch": "leader", "expected_state": "IN"}),
    ]
    for question, verification in questions:
        print(f"\n║ Q: {question}")
        t0 = time.time()
        answer, _sources, hallucinations = ask_staffing_question(question)
        ms = (time.time() - t0) * 1000
        stats["questions_answered"] += 1
        if isinstance(hallucinations, str):
            # A bare error message counts as one issue, not len(message).
            hallucinations = [hallucinations]
        if answer:
            print(f"║ A ({ms:.0f}ms, {len(answer)} chars): {answer[:200]}...")
            # Verify against SQL ground truth.
            _fact_check_answer(answer, verification, stats)
        if hallucinations:
            stats["hallucinations_caught"] += len(hallucinations)
            print(f"║ ✗ HALLUCINATIONS: {hallucinations}")
    print("╚══════════════════════════════════════════════════════")


def _print_scorecard(stats, all_assignments):
    """Print the end-of-day scorecard and return the accuracy percentage."""
    print("\n" + "=" * 70)
    print("END OF DAY SCORECARD")
    print("=" * 70)
    total_filled = sum(a["filled"] for a in all_assignments)
    total_needed = sum(a["needed"] for a in all_assignments)
    fill_rate = total_filled / max(total_needed, 1) * 100
    print(f"\n Contracts processed: {stats['contracts_processed']}/{len(CONTRACTS)}")
    print(f" Positions filled: {total_filled}/{total_needed} ({fill_rate:.0f}%)")
    print(f" Workers verified: {stats['workers_verified']}")
    print(f" Messages drafted: {stats['messages_drafted']}")
    print(f" Questions answered: {stats['questions_answered']}")
    print(f" Questions fact-checked: {stats['questions_verified']}")
    print("\n ┌─ TRUST METRICS ─────────────────────────")
    print(f" │ Hallucinations caught: {stats['hallucinations_caught']}")
    print(f" │ Verification failures: {stats['verification_failures']}")
    # The hallucination count is per issue and can exceed the number of
    # workers verified, so clamp at zero to avoid a negative percentage.
    accuracy = max(
        0.0,
        (stats['workers_verified'] - stats['hallucinations_caught']) / max(stats['workers_verified'], 1) * 100,
    )
    print(f" │ Data accuracy: {accuracy:.1f}%")
    print(" └──────────────────────────────────────────")
    print("\n Contract breakdown:")
    for a in all_assignments:
        # Fully filled / partially filled / unfilled.
        icon = "✓" if a["filled"] >= a["needed"] else "⚠" if a["filled"] > 0 else "✗"
        print(f" {icon} {a['contract']}: {a['filled']}/{a['needed']} filled")
    if stats["hallucinations_caught"] == 0 and stats["verification_failures"] == 0:
        print("\n ★ ZERO HALLUCINATIONS — all agent outputs verified against golden data")
    else:
        print(f"\n{stats['hallucinations_caught']} hallucination(s) + {stats['verification_failures']} verification gap(s)")
        print(" → these are the gaps to close before production")
    return accuracy


def main():
    """Run the full simulated day and return a process exit status.

    Runs the morning contract phase, the afternoon question phase and the
    scorecard, in that order.

    Returns:
        0 when the scorecard's data accuracy is at least 95%, otherwise 1.
    """
    print("=" * 70)
    print("STAFFING AGENCY DAY SIMULATION")
    print(f"Date: {datetime.now().strftime('%Y-%m-%d')}")
    print(f"Contracts: {len(CONTRACTS)} | Workers: 10,000 | Golden data: ethereal_workers")
    print("=" * 70)
    stats = {
        "contracts_processed": 0,
        "workers_matched": 0,
        "workers_verified": 0,
        "hallucinations_caught": 0,
        "messages_drafted": 0,
        "questions_answered": 0,
        "questions_verified": 0,
        "verification_failures": 0,
    }
    all_assignments = _process_contracts(stats)
    _run_intelligence(stats)
    accuracy = _print_scorecard(stats, all_assignments)
    return 0 if accuracy >= 95 else 1
if __name__ == "__main__":
    # Script entry point: hand main()'s return value to the shell as the
    # process exit status.
    exit_status = main()
    sys.exit(exit_status)