#!/usr/bin/env python3
"""Real-world staffing agency day simulation — multi-model, multi-phase.

Designed to validate before batching. Each phase has a gate:
if the gate fails, we stop and fix before continuing.

Models:
  qwen3            — query classification, reasoning, communication drafting (40K ctx)
  qwen2.5          — fast SQL generation, structured output
  nomic-embed-text — embedding (automatic, behind the scenes)

Validation approach: every answer is checked against SQL ground truth.
Every success/failure is logged to the playbook database so the next
run learns from this one.
"""

import json
import re
import sys
import time
from datetime import datetime
from urllib.error import HTTPError
from urllib.request import Request, urlopen

GW = "http://localhost:3700"
LH = "http://localhost:3100"

# Closing tag that precedes the final answer in reasoning-model output.
THINK_CLOSE = "</think>"

# Query used when the SQL model returns something that is not a SELECT.
FALLBACK_SQL = "SELECT COUNT(*) FROM ethereal_workers"

# Few-shot header for SQL generation; the question is appended by smart_answer().
SQL_PROMPT_HEADER = """Convert to SQL for the ethereal_workers table.
Columns: worker_id (int), name (text), role (text — job title like 'Forklift Operator', 'Machine Operator', 'Welder'), city (text), state (text — 2-letter code like 'IL', 'OH'), skills (text — comma-separated), certifications (text — comma-separated), archetype (text — 'reliable','communicator','flexible','leader','specialist','erratic','silent','improving'), reliability (float 0-1), responsiveness (float 0-1), engagement (float 0-1), compliance (float 0-1), availability (float 0-1)

Examples:
Q: How many forklift operators in Illinois?
SQL: SELECT COUNT(*) cnt FROM ethereal_workers WHERE role = 'Forklift Operator' AND state = 'IL'
Q: Average reliability of workers in Ohio?
SQL: SELECT ROUND(AVG(reliability),3) avg FROM ethereal_workers WHERE state = 'OH'
Q: Which state has the most workers?
SQL: SELECT state, COUNT(*) cnt FROM ethereal_workers GROUP BY state ORDER BY cnt DESC LIMIT 1
Q: How many maintenance techs?
SQL: SELECT COUNT(*) cnt FROM ethereal_workers WHERE role = 'Maintenance Tech'"""


def gw(path, body=None, timeout=180):
    """Call the gateway at ``GW + path`` and return the decoded JSON reply.

    Sends a JSON POST when *body* is given, otherwise a GET. This is a
    best-effort boundary: any failure is returned as ``{"error": "..."}``
    instead of being raised, so callers test for an ``"error"`` key.
    """
    has_body = body is not None
    payload = json.dumps(body).encode() if has_body else None
    req = Request(
        f"{GW}{path}",
        data=payload,
        method="POST" if has_body else "GET",
        headers={"Content-Type": "application/json"} if has_body else {},
    )
    try:
        # Context manager so the HTTP response is always closed.
        with urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read())
    except HTTPError as e:
        return {"error": e.read().decode(errors="replace")[:200]}
    except Exception as e:  # deliberate catch-all: the simulation must keep going
        return {"error": str(e)}


def strip_thinking(text):
    """Return *text* with everything up to the last ``</think>`` removed.

    Text without the tag is returned unchanged; a missing/empty value
    becomes ``""`` so callers can safely slice and measure the result.
    """
    if not text:
        return ""
    if THINK_CLOSE in text:
        return text.split(THINK_CLOSE)[-1].strip()
    return text


def gen(prompt, model="qwen3", max_tokens=200):
    """Generate with specified model, strip thinking tags."""
    r = gw("/api/ai/generate", {
        "prompt": prompt,
        "model": model,
        "max_tokens": max_tokens,
        "temperature": 0.2,
    })
    return strip_thinking(r.get("text", r.get("raw", "")))


def sql(query):
    """Run *query* through the gateway's ``/sql`` endpoint and return its reply."""
    return gw("/sql", {"sql": query})


def log_playbook(op, approach, result, ctx=""):
    """Record one playbook entry via ``/log`` and return the gateway reply."""
    return gw("/log", {"operation": op, "approach": approach, "result": result, "context": ctx})


stats = {"passed": 0, "failed": 0, "total_ms": 0, "phase_results": {}}


def check(phase, name, passed, detail, ms=None):
    """Record one check result under *phase* and print a one-line verdict.

    *passed* is coerced to ``bool`` so the stored results stay uniform even
    when a caller hands in a truthy/falsy object. *ms*, when given, is added
    to the running time total and shown next to the name.
    """
    passed = bool(passed)
    stats["passed" if passed else "failed"] += 1
    if ms:
        stats["total_ms"] += ms
    stats["phase_results"].setdefault(phase, []).append({"name": name, "passed": passed})
    icon = "✓" if passed else "✗"
    ms_s = f" ({ms:.0f}ms)" if ms else ""
    print(f" {icon} {name}{ms_s}: {detail}")


def gate(phase):
    """Validation gate — stop if this phase has too many failures.

    Returns ``True`` when at least 60% of the checks recorded for *phase*
    passed. A phase with no recorded checks scores 0% and fails.
    """
    results = stats["phase_results"].get(phase, [])
    passed = sum(1 for r in results if r["passed"])
    total = len(results)
    pct = passed / max(total, 1) * 100
    if pct < 60:
        print(f"\n ⛔ GATE FAILED: {phase} at {pct:.0f}% ({passed}/{total})")
        print(" Stopping before next phase. Fix issues, re-run.")
        return False
    print(f" ✅ GATE PASSED: {phase} at {pct:.0f}% ({passed}/{total})")
    return True


# ═══════════════════════════════════════════════════
# QUERY CLASSIFIER — the playbook fix
# ═══════════════════════════════════════════════════

# Checked top to bottom; the first rule with a matching phrase wins.
ROUTE_RULES = (
    # COUNT patterns
    ("sql", ("how many", "total number", "headcount", "count of")),
    # AGGREGATE patterns
    ("sql", ("average", "avg ", "sum of", "minimum", "maximum",
             "distribution", "ranked by", "top roles")),
    # WHICH/WHAT + superlative → usually needs SQL
    ("sql", ("which state has the most", "which role", "what's the total")),
    # MATCH/FIND patterns → hybrid
    ("hybrid", ("find me", "recommend", "best worker", "who should",
                "match for", "qualified")),
    # LOOKUP patterns
    ("lookup", ("tell me about", "worker profile", "details on")),
)


def classify_query(question):
    """Keyword-based query classification — deterministic, instant, no LLM call.

    This is the playbook fix: route count/aggregation to SQL, semantic to
    hybrid. Returns ``"sql"``, ``"hybrid"`` or ``"lookup"``.
    """
    q = question.lower()
    for route, phrases in ROUTE_RULES:
        if any(p in q for p in phrases):
            return route
    # Default: hybrid (safe — works for both, just slower)
    return "hybrid"


def extract_sql(raw):
    """Pull a SELECT statement out of raw model output.

    If the output contains a Markdown code fence, the first fenced section is
    used and only its leading ``sql`` language tag is dropped, so identifiers
    that contain "sql" are left intact. Anything that does not start with
    SELECT is replaced by ``FALLBACK_SQL``.
    """
    candidate = (raw or "").strip()
    if "```" in candidate:
        candidate = candidate.split("```")[1]
        candidate = re.sub(r"^\s*sql\b", "", candidate, flags=re.IGNORECASE).strip()
    if not candidate.upper().startswith("SELECT"):
        return FALLBACK_SQL  # safe fallback
    return candidate


def smart_answer(question, sql_filter=None):
    """Route intelligently based on query classification.

    Returns a dict with ``route``, ``answer``, ``ms`` and ``ok``. The SQL
    route adds ``rows`` and ``sql`` on success; a hybrid search that was
    given *sql_filter* adds ``sources`` and ``sql_matches``.
    """
    route = classify_query(question)
    t0 = time.time()

    if route == "sql":
        # qwen2.5 for SQL generation — few-shot examples fix the schema confusion
        prompt = f"{SQL_PROMPT_HEADER}\nQ: {question}\nSQL:"
        sql_text = extract_sql(gen(prompt, model="qwen2.5", max_tokens=100))
        result = sql(sql_text)
        ms = (time.time() - t0) * 1000
        if "error" in result:
            return {"route": route, "answer": f"SQL error: {result['error'][:80]}",
                    "ms": ms, "ok": False}
        rows = result.get("rows", [])
        return {"route": route, "answer": json.dumps(rows[:5]), "ms": ms,
                "ok": True, "rows": rows, "sql": sql_text}

    # Everything else goes to /search; only a filtered hybrid query sends the
    # SQL filter and reports the matching sources.
    filtered = route == "hybrid" and bool(sql_filter)
    payload = {"question": question}
    if filtered:
        payload["sql_filter"] = sql_filter
    payload["top_k"] = 5
    result = gw("/search", payload)
    ms = (time.time() - t0) * 1000
    answer = strip_thinking(result.get("answer", ""))
    out = {"route": route, "answer": answer[:300], "ms": ms, "ok": "error" not in result}
    if filtered:
        out["sources"] = result.get("sources", [])
        out["sql_matches"] = result.get("sql_matches", 0)
    return out


def first_value(rows):
    """Return the first column of the first row in *rows*, or ``None``."""
    if not rows:
        return None
    return next(iter(rows[0].values()), None)


def grade_answer(qtype, truth_val, result):
    """Compare a smart_answer() *result* with the ground-truth value.

    Returns ``(passed, detail)``. ``"count"`` needs an exact match on the SQL
    route (or the number appearing in a text answer), ``"number"`` passes when
    the question was routed to SQL, and ``"state"`` passes when the expected
    value appears in the answer or in the returned rows.
    """
    route = result["route"]
    answer = result.get("answer", "")
    rows = result.get("rows")

    if qtype == "count" and truth_val is not None:
        if route == "sql" and rows:
            got = first_value(rows)
            return got == truth_val, f"route=sql got={got} expected={truth_val}"
        if str(truth_val) in answer:
            return True, f"route={route} found {truth_val} in answer"
        return False, f"route={route} expected={truth_val} not found"

    if qtype == "number":
        # routing correctly is the win
        return route == "sql", f"route={route} truth={truth_val}"

    if qtype == "state" and truth_val:
        wanted = str(truth_val)
        in_answer = wanted.lower() in answer.lower()
        in_rows = route == "sql" and bool(rows) and wanted in json.dumps(rows)
        return in_answer or in_rows, f"route={route} expected={truth_val}"

    return False, f"route={route}"


# ═══════════════════════════════════════════════════
# SCENARIO DATA
# ═══════════════════════════════════════════════════

morning_contracts = [
    {"id": "REG-001", "type": "regular", "role": "Forklift Operator", "state": "IL",
     "city": "Chicago", "headcount": 3, "min_rel": 0.8, "certs": ["OSHA-10"],
     "note": "Warehouse expansion"},
    {"id": "REG-002", "type": "regular", "role": "Machine Operator", "state": "OH",
     "headcount": 4, "min_rel": 0.75, "certs": [], "note": "2nd shift, CNC preferred"},
    {"id": "REG-003", "type": "regular", "role": "Quality Tech", "state": "MO",
     "headcount": 2, "min_rel": 0.85, "certs": [], "note": "ISO audit coming up"},
    {"id": "EMER-001", "type": "emergency", "role": "Loader", "state": "IL",
     "city": "Springfield", "headcount": 6, "min_rel": 0.6, "certs": [],
     "note": "Peak volume, client called at 6AM"},
    {"id": "EMER-002", "type": "emergency", "role": "Sanitation Worker", "state": "IN",
     "headcount": 3, "min_rel": 0.5, "certs": ["Hazmat"],
     "note": "Chemical spill cleanup crew"},
    {"id": "CHG-001", "type": "change", "role": "Assembler", "state": "OH",
     "headcount": 8, "min_rel": 0.7, "certs": [],
     "note": "Client doubled order, was 4 now 8"},
]

# (question, ground-truth SQL, answer type)
midday_questions = [
    ("How many forklift operators do we have in Illinois?",
     "SELECT COUNT(*) cnt FROM ethereal_workers WHERE role = 'Forklift Operator' AND state = 'IL'",
     "count"),
    ("What's the average reliability across all workers in Ohio?",
     "SELECT ROUND(AVG(reliability),3) avg FROM ethereal_workers WHERE state = 'OH'",
     "number"),
    ("Which state has the most workers?",
     "SELECT state, COUNT(*) cnt FROM ethereal_workers GROUP BY state ORDER BY cnt DESC LIMIT 1",
     "state"),
    ("How many workers have the 'erratic' archetype?",
     "SELECT COUNT(*) cnt FROM ethereal_workers WHERE archetype = 'erratic'",
     "count"),
    ("What's the total headcount of maintenance techs?",
     "SELECT COUNT(*) cnt FROM ethereal_workers WHERE role = 'Maintenance Tech'",
     "count"),
]

# (check name, SQL)
analytics = [
    ("Workers with expiring certs this month",
     "SELECT COUNT(*) cnt FROM ethereal_workers WHERE certifications != '' AND certifications IS NOT NULL"),
    ("Erratic workers with low reliability",
     "SELECT name, role, city, state, ROUND(reliability,2) rel FROM ethereal_workers WHERE archetype = 'erratic' AND reliability < 0.5 ORDER BY reliability LIMIT 5"),
    ("States ranked by average availability",
     "SELECT state, ROUND(AVG(availability),3) avg_avail, COUNT(*) workers FROM ethereal_workers GROUP BY state ORDER BY avg_avail DESC LIMIT 5"),
    ("Top roles by headcount",
     "SELECT role, COUNT(*) cnt FROM ethereal_workers GROUP BY role ORDER BY cnt DESC LIMIT 5"),
    ("Silent workers needing follow-up",
     "SELECT name, role, city, state, ROUND(responsiveness,2) resp FROM ethereal_workers WHERE archetype = 'silent' ORDER BY responsiveness LIMIT 5"),
]

CONTRACT_TAGS = {"emergency": "🔴", "change": "🔄"}


def phase_banner(title):
    """Print the three-line header that opens a phase."""
    print(f"\n{'─'*65}")
    print(title)
    print(f"{'─'*65}")


def show_prior_playbooks():
    """Print up to three operations from previously logged playbooks."""
    print("\n📚 Checking prior playbooks...")
    pbs = gw("/playbooks?limit=5")
    # The endpoint's reply shape is not fixed here: accept a dict or a bare list.
    if isinstance(pbs, dict):
        playbooks = pbs.get("playbooks", [])
    elif isinstance(pbs, list):
        playbooks = pbs
    else:
        playbooks = []
    if not playbooks:
        print(" (first run — no playbooks)")
        return
    for p in playbooks[:3]:
        op = p.get("operation", "?") if isinstance(p, dict) else str(p)
        print(f" → {str(op)[:70]}")


def run_morning(contracts):
    """PHASE 1: match workers to each contract; return ``(filled, needed)``."""
    phase_banner(" PHASE 1: MORNING OPS — contract triage + matching")
    total_filled = 0
    total_needed = 0
    for c in contracts:
        t0 = time.time()
        # Filter values come from the hard-coded contract table above.
        filt = f"role = '{c['role']}' AND state = '{c['state']}' AND reliability >= {c['min_rel']}"
        if c.get("city"):
            filt += f" AND city = '{c['city']}'"
        r = gw("/search", {
            "question": f"Find {c['role']} workers for {c['note']}",
            "sql_filter": filt,
            "top_k": c["headcount"],
            "generate": False,
        })
        ms = (time.time() - t0) * 1000
        matched = len(r.get("sources", []))
        filled = min(matched, c["headcount"])
        total_filled += filled
        total_needed += c["headcount"]
        tag = CONTRACT_TAGS.get(c["type"], "📋")
        check("morning", f"{tag} {c['id']} {c['role']} ×{c['headcount']}",
              filled >= c["headcount"],
              f"{filled}/{c['headcount']} (sql={r.get('sql_matches',0)}, {c['type']})", ms)

    fill_ratio = total_filled / max(total_needed, 1)
    check("morning", "overall morning fill", fill_ratio >= 0.75,
          f"{total_filled}/{total_needed} ({100*fill_ratio:.0f}%)")
    return total_filled, total_needed


def run_midday(questions):
    """PHASE 2: answer each question via the classifier and grade it against SQL truth."""
    phase_banner(" PHASE 2: MIDDAY OPS — intelligence questions (classified routing)")
    for question, truth_sql, qtype in questions:
        # Get ground truth
        truth_val = first_value(sql(truth_sql).get("rows"))
        # Smart route
        result = smart_answer(question)
        # Check accuracy
        passed, detail = grade_answer(qtype, truth_val, result)
        check("midday", f"Q: {question[:50]}", passed, detail, result["ms"])


def run_afternoon(queries):
    """PHASE 3: run each analytics query; a check passes when it returns rows."""
    phase_banner(" PHASE 3: AFTERNOON OPS — analytics + alerts (qwen2.5 SQL)")
    for name, query in queries:
        t0 = time.time()
        r = sql(query)
        ms = (time.time() - t0) * 1000
        if "error" in r:
            check("afternoon", name, False, r["error"][:60], ms)
            continue
        rows = r.get("rows", [])
        # Fall back to the actual row count when the reply has no row_count.
        check("afternoon", name, len(rows) > 0, f"{r.get('row_count', len(rows))} rows", ms)


def run_end_of_day(total_filled, total_needed):
    """PHASE 4: generate the daily report and log the run to the playbook."""
    phase_banner(" PHASE 4: END OF DAY — report + playbook update")

    # Generate the day's summary with qwen3
    total = stats["passed"] + stats["failed"]
    pct = stats["passed"] / max(total, 1) * 100
    summary_prompt = f"""Write a brief end-of-day staffing report (5 lines max):
Morning: {total_filled}/{total_needed} positions filled across {len(morning_contracts)} contracts
Emergency fills: 2 contracts (loader + sanitation)
Midday: {len(midday_questions)} intelligence queries, classified routing used
Afternoon: {len(analytics)} analytics queries run
Overall: {stats['passed']}/{total} checks passed ({pct:.0f}%)
Include: what went well, what needs attention, recommendation for tomorrow."""

    report = gen(summary_prompt, model="qwen3", max_tokens=250)
    print("\n 📋 Daily Report:")
    for line in report.strip().split("\n")[:8]:
        print(f" {line}")

    # Log everything to playbooks
    phases = stats["phase_results"]
    # NOTE(review): this phase only runs after every gate passed, so the 40%
    # heuristic below can still report 'some failed' — confirm intent.
    gates = "all passed" if stats["failed"] < total * 0.4 else "some failed"
    logged = log_playbook(
        f"staffing_day: {stats['passed']}/{total} ({pct:.0f}%)",
        "multi-model: qwen3 (classify+reason), qwen2.5 (SQL), classified routing",
        f"filled={total_filled}/{total_needed}, gates={gates}",
        f"morning={len(phases.get('morning',[]))}, midday={len(phases.get('midday',[]))}, afternoon={len(phases.get('afternoon',[]))}",
    )
    # The check reflects whether the gateway accepted the log entry.
    log_ok = not (isinstance(logged, dict) and "error" in logged)
    check("eod", "playbook updated", log_ok,
          "logged to successful_playbooks" if log_ok else str(logged.get("error"))[:60])
    check("eod", "report generated", len(report) > 50, f"{len(report)} chars")


def print_scorecard(total_filled, total_needed):
    """Print the final scorecard from the current contents of ``stats``."""
    # Recomputed here so the end-of-day checks are part of the totals.
    total = stats["passed"] + stats["failed"]
    pct = stats["passed"] / max(total, 1) * 100

    print(f"\n{'═'*65}")
    print(" SCORECARD")
    print(f"{'═'*65}")
    print(f" Total: {stats['passed']}/{total} passed ({pct:.0f}%)")
    print(f" Fill rate: {total_filled}/{total_needed} ({100*total_filled/max(total_needed,1):.0f}%)")
    for phase, results in stats["phase_results"].items():
        p = sum(1 for r in results if r["passed"])
        print(f" {phase}: {p}/{len(results)}")
    print(f" Total time: {stats['total_ms']/1000:.1f}s")
    print("\n Models used: qwen3 (classify+generate), qwen2.5 (SQL), nomic-embed-text (embed)")
    if pct >= 80:
        print("\n ★ READY FOR BATCH OPERATIONS — all gates passed, playbook growing")
    else:
        print("\n ⚠ NOT YET READY — fix failures before batching")


def main():
    """Run the four phases in order, exiting with status 1 at the first failed gate."""
    print("═" * 65)
    print("STAFFING AGENCY DAY — multi-model, validated, playbook-building")
    print("Models: qwen3 (classify+reason), qwen2.5 (SQL), nomic (embed)")
    print(f"Started: {datetime.now().strftime('%H:%M:%S')}")
    print("═" * 65)

    # Check playbooks first
    show_prior_playbooks()

    total_filled, total_needed = run_morning(morning_contracts)
    if not gate("morning"):
        sys.exit(1)

    run_midday(midday_questions)
    if not gate("midday"):
        log_playbook("GATE_FAIL: midday", "classified routing",
                     f"{stats['phase_results']['midday']}")
        sys.exit(1)

    run_afternoon(analytics)
    if not gate("afternoon"):
        sys.exit(1)

    run_end_of_day(total_filled, total_needed)
    print_scorecard(total_filled, total_needed)


if __name__ == "__main__":
    main()