#!/usr/bin/env python3 """Quality evaluation pipeline — tests whether the system gives CORRECT answers, not just structurally valid ones. Three tiers: 1. GOLDEN EVAL: Questions with SQL-verifiable ground truth. Ask the system via RAG + NL→SQL, compare answers to known-correct values. 2. RERANKER: Add a cross-encoder rerank step between retrieval and generation. Measure if it improves answer quality. 3. SELF-ASSESSMENT: After each answer, ask the model to rate its own confidence. Log for quality monitoring. This is the test that actually matters. """ import json, time, re, sys from urllib.request import Request, urlopen from urllib.error import HTTPError BASE = "http://localhost:3100" def post(path, body=None, timeout=120): data = json.dumps(body).encode() if body else None req = Request(f"{BASE}{path}", data=data, headers={"Content-Type": "application/json"}) try: resp = urlopen(req, timeout=timeout) raw = resp.read() return json.loads(raw) if raw.strip() else {} except HTTPError as e: return {"error": e.read().decode()[:300]} except Exception as e: return {"error": str(e)} # ═══════════════════════════════════════════════════════ # GOLDEN EVALUATION SET # Questions where we KNOW the right answer from SQL. # ═══════════════════════════════════════════════════════ GOLDEN = [ { "id": "G1", "question": "How many Java developers are in Chicago?", "sql_truth": "SELECT COUNT(*) FROM candidates WHERE skills LIKE '%Java%' AND city = 'Chicago'", "expected_number": 1287, "tolerance": 0, # exact match "type": "count", }, { "id": "G2", "question": "How many active candidates are in the system?", "sql_truth": "SELECT COUNT(*) FROM candidates WHERE status = 'active'", "expected_number": 60353, "tolerance": 0, "type": "count", }, { "id": "G3", "question": "What is the average bill rate for active placements?", "sql_truth": "SELECT AVG(bill_rate) FROM placements WHERE status = 'active'", "expected_number": 86.89, "tolerance": 1.0, # within $1 "type": "number", }, { "id": "G4", "question": "How many unique candidates have been placed?", "sql_truth": "SELECT COUNT(DISTINCT candidate_id) FROM placements", "expected_number": 39361, "tolerance": 0, "type": "count", }, { "id": "G5", "question": "Who is the top recruiter by number of placements?", "sql_truth": "SELECT recruiter, COUNT(*) cnt FROM placements GROUP BY recruiter ORDER BY cnt DESC LIMIT 1", "expected_text": "Betty King", "type": "name", }, { "id": "G6", "question": "Which city has the most candidates?", "sql_truth": "SELECT city, COUNT(*) cnt FROM candidates GROUP BY city ORDER BY cnt DESC LIMIT 1", "expected_text": "New York", "type": "name", }, { "id": "G7", "question": "What is the largest candidate vertical?", "sql_truth": "SELECT vertical, COUNT(*) cnt FROM candidates GROUP BY vertical ORDER BY cnt DESC LIMIT 1", "expected_text": "Industrial", "type": "name", }, { "id": "G8", "question": "How many total timesheets are in the system?", "sql_truth": "SELECT COUNT(*) FROM timesheets", "expected_number": 1000000, "tolerance": 0, "type": "count", }, { "id": "G9", "question": "What is the total revenue across all timesheets?", "sql_truth": "SELECT SUM(bill_total) FROM timesheets", "expected_number": None, # will be filled by SQL "tolerance_pct": 5, # within 5% "type": "revenue", }, { "id": "G10", "question": "How many candidates are in Dallas?", "sql_truth": "SELECT COUNT(*) FROM candidates WHERE city = 'Dallas'", "expected_number": 8555, "tolerance": 0, "type": "count", }, ] def extract_number(text): """Pull the first number-ish thing from LLM output.""" # Try to find numbers with commas, decimals patterns = [ r'\$?([\d,]+\.?\d*)\s*(million|mil|M)\b', r'\$?([\d,]+\.?\d*)\s*(billion|bil|B)\b', r'(?:approximately|about|around|roughly|nearly)?\s*\$?([\d,]+\.?\d*)', ] for pat in patterns: m = re.search(pat, text, re.IGNORECASE) if m: groups = m.groups() num_str = groups[-1] if len(groups) == 1 else groups[0] num_str = num_str.replace(',', '') try: val = float(num_str) # Check for million/billion suffix full = m.group(0).lower() if 'million' in full or 'mil' in full: val *= 1_000_000 elif 'billion' in full or 'bil' in full: val *= 1_000_000_000 return val except ValueError: pass return None def check_answer(golden, answer_text): """Compare LLM answer against ground truth. Returns (passed, detail).""" if golden["type"] == "count" or golden["type"] == "number" or golden["type"] == "revenue": expected = golden.get("expected_number") if expected is None: return None, "no expected value" extracted = extract_number(answer_text) if extracted is None: return False, f"couldn't extract number from: {answer_text[:100]}" tolerance = golden.get("tolerance", 0) tolerance_pct = golden.get("tolerance_pct", 0) if tolerance_pct: actual_tol = expected * tolerance_pct / 100 else: actual_tol = tolerance diff = abs(extracted - expected) passed = diff <= actual_tol return passed, f"expected={expected:,.0f} got={extracted:,.0f} diff={diff:,.0f} tol={actual_tol:,.0f}" elif golden["type"] == "name": expected = golden["expected_text"].lower() passed = expected in answer_text.lower() return passed, f"expected '{golden['expected_text']}' in answer" return None, "unknown type" # ═══════════════════════════════════════════════════════ # SELF-ASSESSMENT # ═══════════════════════════════════════════════════════ def self_assess(question, answer): """Ask the model to rate its own answer.""" r = post("/ai/generate", { "prompt": f"""Rate this answer on a scale of 1-5 for accuracy and helpfulness. Question: {question} Answer: {answer} Respond with ONLY a JSON object: {{"score": <1-5>, "reason": ""}}""", "model": "qwen2.5", "max_tokens": 100, "temperature": 0.1, }) if "error" in r: return None, "self-assessment failed" text = r.get("text", "") try: # Try to parse JSON from the response m = re.search(r'\{[^}]+\}', text) if m: obj = json.loads(m.group()) return obj.get("score"), obj.get("reason", "") except: pass return None, text[:100] # ═══════════════════════════════════════════════════════ # RERANKER # ═══════════════════════════════════════════════════════ def rerank_results(question, chunks): """Use the LLM as a cross-encoder reranker.""" if not chunks: return chunks # Build a ranking prompt chunk_list = "\n".join( f"[{i}] {c.get('chunk_text', c.get('text', ''))[:200]}" for i, c in enumerate(chunks[:10]) ) r = post("/ai/generate", { "prompt": f"""Given this question, rank these text chunks by relevance. Return ONLY a comma-separated list of indices, most relevant first. Question: {question} Chunks: {chunk_list} Ranking (indices only, e.g. "3,1,0,2"):""", "model": "qwen2.5", "max_tokens": 50, "temperature": 0.0, }) if "error" in r: return chunks # fallback to original order text = r.get("text", "") try: indices = [int(x.strip()) for x in text.strip().split(",") if x.strip().isdigit()] reranked = [chunks[i] for i in indices if i < len(chunks)] # Append any we missed seen = set(indices) for i, c in enumerate(chunks): if i not in seen: reranked.append(c) return reranked except: return chunks # ═══════════════════════════════════════════════════════ # MAIN # ═══════════════════════════════════════════════════════ def main(): print("=" * 65) print("QUALITY EVALUATION PIPELINE") print("Testing whether the system gives CORRECT answers") print("=" * 65) # Fill in SQL-derived expected values for g in GOLDEN: if g.get("expected_number") is None and g.get("sql_truth"): r = post("/query/sql", {"sql": g["sql_truth"]}) if "error" not in r and r.get("rows"): vals = list(r["rows"][0].values()) g["expected_number"] = vals[0] if vals else None # ── Tier 1: NL→SQL path (structured) ── print("\n┌─ TIER 1: NL→SQL (structured answers) ─────────────") sql_results = [] for g in GOLDEN: t0 = time.time() r = post("/ai/generate", { "prompt": f"""Convert this question to a SQL query for a staffing database. Tables: candidates (candidate_id, first_name, last_name, email, phone, city, state, zip, vertical, skills, resume_summary, status, source, min_pay_rate, years_experience), placements (placement_id, candidate_id, job_order_id, client_id, bill_rate, pay_rate, recruiter, status), timesheets (timesheet_id, placement_id, candidate_id, client_id, hours_regular, hours_overtime, bill_total, pay_total, week_ending, approved), call_log (call_id, from_number, to_number, candidate_id, duration_seconds, disposition, recruiter, timestamp), email_log (email_id, from_addr, to_addr, subject, timestamp, recruiter, candidate_id, opened), clients (client_id, company_name, contact_name, vertical, city), job_orders (job_order_id, client_id, job_title, city, state, bill_rate, pay_rate, status, description) Question: {g['question']} Return ONLY the SQL query, nothing else.""", "model": "qwen2.5", "max_tokens": 200, "temperature": 0.0, }) ms = (time.time() - t0) * 1000 if "error" in r: print(f"│ ✗ {g['id']}: generate failed") sql_results.append({"id": g["id"], "passed": False, "detail": "generate failed"}) continue generated_sql = r.get("text", "").strip() # Clean up: extract SQL from markdown code blocks if present if "```" in generated_sql: lines = generated_sql.split("```") for block in lines[1:]: clean = block.strip() if clean.upper().startswith("SQL"): clean = clean[3:].strip() if clean.upper().startswith("SELECT"): generated_sql = clean.split("```")[0].strip() break # Execute the generated SQL sql_r = post("/query/sql", {"sql": generated_sql}) if "error" in sql_r: print(f"│ ✗ {g['id']}: SQL execution failed: {str(sql_r['error'])[:60]}") sql_results.append({"id": g["id"], "passed": False, "detail": f"SQL error: {sql_r['error'][:60]}", "sql": generated_sql}) continue # Extract answer from SQL result rows = sql_r.get("rows", []) if not rows: answer_text = "no results" else: answer_text = json.dumps(rows[0]) passed, detail = check_answer(g, answer_text) icon = "✓" if passed else "✗" if passed is not None else "?" print(f"│ {icon} {g['id']}: {g['question'][:45]} → {detail}") sql_results.append({"id": g["id"], "passed": passed, "detail": detail, "sql": generated_sql, "ms": ms}) sql_pass = sum(1 for r in sql_results if r["passed"]) print(f"│ Score: {sql_pass}/{len(sql_results)}") print("└────────────────────────────────────────────────────") # ── Tier 2: RAG path (with reranker) ── print("\n┌─ TIER 2: RAG with reranker ────────────────────────") rag_results = [] # Use a subset that makes sense for RAG (not pure analytical) rag_questions = [ {"id": "R1", "question": "Who is Betty King and what is her placement record?", "must_contain": "betty king", "type": "name_check"}, {"id": "R2", "question": "What skills do candidates in Chicago typically have?", "must_contain": "java", "type": "relevance"}, {"id": "R3", "question": "Describe the candidate pool in New York", "must_contain": "new york", "type": "relevance"}, {"id": "R4", "question": "What industrial positions are available?", "must_contain": "industrial", "type": "relevance"}, {"id": "R5", "question": "Find IT candidates with cloud experience", "must_contain": "it", "type": "relevance"}, ] for rq in rag_questions: t0 = time.time() # Step 1: Vector search search_r = post("/vectors/hnsw/search", { "index_name": "resumes_100k_v2", "query": rq["question"], "top_k": 10, }) if "error" in search_r: print(f"│ ✗ {rq['id']}: search failed") rag_results.append({"id": rq["id"], "passed": False, "detail": "search failed"}) continue results_raw = search_r.get("results", []) # Step 2: Rerank reranked = rerank_results(rq["question"], results_raw) # Step 3: Generate answer from top-3 reranked context = "\n\n".join( r.get("chunk_text", r.get("text", ""))[:300] for r in reranked[:3] ) gen_r = post("/ai/generate", { "prompt": f"""Based on the following candidate records, answer the question. Be specific — cite names, numbers, and skills from the records. Records: {context} Question: {rq['question']} Answer:""", "model": "qwen2.5", "max_tokens": 300, }) ms = (time.time() - t0) * 1000 if "error" in gen_r: print(f"│ ✗ {rq['id']}: generate failed") rag_results.append({"id": rq["id"], "passed": False, "detail": "generate failed"}) continue answer = gen_r.get("text", "") # Step 4: Self-assessment score, reason = self_assess(rq["question"], answer) # Check passed = rq["must_contain"].lower() in answer.lower() detail = f"contains='{rq['must_contain']}'={'Y' if passed else 'N'} self_score={score} reranked={len(reranked)} " icon = "✓" if passed else "✗" print(f"│ {icon} {rq['id']}: {rq['question'][:45]}") print(f"│ answer: {answer[:120]}...") print(f"│ {detail} reason: {str(reason)[:60]}") rag_results.append({"id": rq["id"], "passed": passed, "detail": detail, "answer": answer[:200], "self_score": score, "ms": ms}) rag_pass = sum(1 for r in rag_results if r["passed"]) print(f"│ Score: {rag_pass}/{len(rag_results)}") print("└────────────────────────────────────────────────────") # ── Tier 3: Self-assessment calibration ── print("\n┌─ TIER 3: Self-assessment calibration ──────────────") scores = [r.get("self_score") for r in rag_results if r.get("self_score")] if scores: avg = sum(scores) / len(scores) print(f"│ Average self-score: {avg:.1f}/5 across {len(scores)} answers") correct_scores = [r.get("self_score", 0) for r in rag_results if r["passed"] and r.get("self_score")] wrong_scores = [r.get("self_score", 0) for r in rag_results if not r["passed"] and r.get("self_score")] if correct_scores: print(f"│ Correct answers avg score: {sum(correct_scores)/len(correct_scores):.1f}") if wrong_scores: print(f"│ Wrong answers avg score: {sum(wrong_scores)/len(wrong_scores):.1f}") calibrated = (correct_scores and wrong_scores and sum(correct_scores)/len(correct_scores) > sum(wrong_scores)/len(wrong_scores)) print(f"│ Calibrated (correct > wrong): {'YES' if calibrated else 'NO / insufficient data'}") else: print("│ No self-assessment scores collected") print("└────────────────────────────────────────────────────") # ── Final scorecard ── print(f"\n{'═'*65}") print(f" QUALITY SCORECARD") print(f"{'═'*65}") total_pass = sql_pass + rag_pass total = len(sql_results) + len(rag_results) print(f" NL→SQL accuracy: {sql_pass}/{len(sql_results)} ({100*sql_pass/max(len(sql_results),1):.0f}%)") print(f" RAG relevance: {rag_pass}/{len(rag_results)} ({100*rag_pass/max(len(rag_results),1):.0f}%)") print(f" Overall: {total_pass}/{total} ({100*total_pass/max(total,1):.0f}%)") if sql_pass < len(sql_results): print(f"\n NL→SQL failures:") for r in sql_results: if not r["passed"]: print(f" {r['id']}: {r['detail']}") if r.get('sql'): print(f" generated: {r['sql'][:80]}") if rag_pass < len(rag_results): print(f"\n RAG failures:") for r in rag_results: if not r["passed"]: print(f" {r['id']}: {r['detail']}") print(f"\n Recommendations:") if sql_pass / max(len(sql_results), 1) < 0.8: print(f" → NL→SQL needs work: provide few-shot examples in the prompt") if rag_pass / max(len(rag_results), 1) < 0.8: print(f" → RAG relevance low: consider domain-tuned embeddings or smaller chunks") if scores and sum(scores)/len(scores) < 3: print(f" → Self-assessment scores low: model not confident in its own answers") return 0 if total_pass / max(total, 1) >= 0.7 else 1 if __name__ == "__main__": sys.exit(main())