#!/usr/bin/env python3 """LLM Team UI - Web interface to configure and run multi-model teams.""" import json import os import time import threading import secrets import hashlib import requests import random import psycopg2 import psycopg2.extras import bcrypt from concurrent.futures import ThreadPoolExecutor, as_completed from flask import Flask, render_template_string, request, jsonify, Response, redirect, url_for, session from functools import wraps app = Flask(__name__) app.secret_key = os.environ.get("FLASK_SECRET", secrets.token_hex(32)) # ─── AUTH + DEMO MODE ───────────────────────────────────────── _rate_limit = {} # ip -> (count, window_start) RATE_LIMIT_WINDOW = 60 RATE_LIMIT_MAX = 60 LOGIN_RATE_MAX = 5 # IPs that never get rate-limited (your LAN, localhost) ALLOWLIST_IPS = {"127.0.0.1", "::1", "192.168.1.1"} # Demo mode state — toggled by admin at runtime _demo_mode = {"active": False, "started_by": None} # Admin-only write routes — blocked in demo for non-admin users ADMIN_WRITE_ROUTES = { "/api/admin/config": ["POST"], "/api/admin/test-provider": ["POST"], "/api/auth/login": ["POST"], } def is_allowlisted(ip): return ip in ALLOWLIST_IPS or ip.startswith("192.168.1.") def rate_limited(ip, max_req=RATE_LIMIT_MAX): if is_allowlisted(ip): return False now = time.time() if ip not in _rate_limit or now - _rate_limit[ip][1] > RATE_LIMIT_WINDOW: _rate_limit[ip] = (1, now) return False count, start = _rate_limit[ip] if count >= max_req: return True _rate_limit[ip] = (count + 1, start) return False def is_admin(): return session.get("role") == "admin" def is_demo(): return _demo_mode["active"] def login_required(f): @wraps(f) def decorated(*args, **kwargs): # Demo mode: everyone gets in if is_demo() and not session.get("user_id"): session["demo_user"] = True if not session.get("user_id") and not is_demo(): if request.path.startswith("/api/"): return jsonify({"error": "unauthorized"}), 401 return redirect("/login") return f(*args, **kwargs) return decorated def 
admin_required(f): @wraps(f) def decorated(*args, **kwargs): # Demo mode: allow read access (GET), block writes unless admin if is_demo(): if request.method == "GET": return f(*args, **kwargs) if not is_admin(): return jsonify({"error": "demo mode: read-only", "demo": True}), 403 if not session.get("user_id"): if request.path.startswith("/api/"): return jsonify({"error": "unauthorized"}), 401 return redirect("/login") if session.get("role") != "admin": return "Forbidden", 403 return f(*args, **kwargs) return decorated @app.before_request def security_checks(): ip = request.remote_addr # Rate limit (allowlisted IPs skip) if rate_limited(ip): return jsonify({"error": "rate limited"}), 429 # Always allow these if request.path in ("/login", "/api/auth/login", "/api/auth/setup", "/api/demo/status"): return if request.path.startswith("/static"): return # In demo mode, block admin write routes for non-admins if is_demo() and not is_admin(): for route, methods in ADMIN_WRITE_ROUTES.items(): if request.path == route and request.method in methods: return jsonify({"error": "demo mode: admin settings are read-only", "demo": True}), 403 @app.after_request def security_headers(response): response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" if request.path.startswith("/api/"): response.headers["Cache-Control"] = "no-store" return response LOGIN_HTML = """ LLM Team - Login

LLM Team

Sign in to continue

""" @app.route("/login") def login_page(): if session.get("user_id"): return redirect("/") return LOGIN_HTML @app.route("/api/auth/setup") def auth_setup(): try: with get_db() as conn: with conn.cursor() as cur: cur.execute("SELECT COUNT(*) FROM users") count = cur.fetchone()[0] return jsonify({"needs_setup": count == 0}) except Exception: return jsonify({"needs_setup": True}) @app.route("/api/auth/login", methods=["POST"]) def auth_login(): ip = request.remote_addr if rate_limited(ip, LOGIN_RATE_MAX): return jsonify({"error": "Too many attempts. Wait a minute."}), 429 data = request.json or {} username = data.get("username", "").strip() password = data.get("password", "") is_setup = data.get("setup", False) if not username or not password: return jsonify({"error": "Username and password required"}), 400 if len(password) < 8: return jsonify({"error": "Password must be at least 8 characters"}), 400 try: with get_db() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: if is_setup: # First-time setup: create admin cur.execute("SELECT COUNT(*) as c FROM users") if cur.fetchone()["c"] > 0: return jsonify({"error": "Setup already completed"}), 400 pw_hash = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() cur.execute("INSERT INTO users (username, password_hash, role) VALUES (%s, %s, 'admin') RETURNING id", (username, pw_hash)) uid = cur.fetchone()["id"] conn.commit() session["user_id"] = uid session["username"] = username session["role"] = "admin" session.permanent = True return jsonify({"ok": True}) # Normal login cur.execute("SELECT * FROM users WHERE username = %s", (username,)) user = cur.fetchone() if not user or not bcrypt.checkpw(password.encode(), user["password_hash"].encode()): return jsonify({"error": "Invalid credentials"}), 401 session["user_id"] = user["id"] session["username"] = user["username"] session["role"] = user["role"] session.permanent = True return jsonify({"ok": True}) except Exception as e: return 
jsonify({"error": str(e)}), 500 @app.route("/api/auth/logout", methods=["POST"]) def auth_logout(): session.clear() return jsonify({"ok": True}) @app.route("/logout") def logout_page(): session.clear() return redirect("/login") @app.route("/api/demo/status") def demo_status(): return jsonify({"active": is_demo(), "started_by": _demo_mode.get("started_by")}) @app.route("/api/demo/toggle", methods=["POST"]) def demo_toggle(): if not is_admin(): return jsonify({"error": "admin only"}), 403 _demo_mode["active"] = not _demo_mode["active"] _demo_mode["started_by"] = session.get("username") if _demo_mode["active"] else None return jsonify({"active": _demo_mode["active"]}) @app.route("/api/demo/allowlist", methods=["GET"]) def demo_get_allowlist(): if not is_admin(): return jsonify({"error": "admin only"}), 403 return jsonify({"ips": sorted(ALLOWLIST_IPS)}) @app.route("/api/demo/allowlist", methods=["POST"]) def demo_set_allowlist(): if not is_admin(): return jsonify({"error": "admin only"}), 403 data = request.json or {} ip = data.get("ip", "").strip() action = data.get("action", "add") if not ip: return jsonify({"error": "ip required"}), 400 if action == "add": ALLOWLIST_IPS.add(ip) elif action == "remove" and ip in ALLOWLIST_IPS: ALLOWLIST_IPS.discard(ip) return jsonify({"ips": sorted(ALLOWLIST_IPS)}) @app.route("/logs") def logs_page(): if not is_admin(): return redirect("/login") try: with open("/var/www/html/report.html") as f: return f.read() except Exception: return "GoAccess report not found. 
Run: goaccess /var/log/nginx/access.log -o /var/www/html/report.html --log-format=COMBINED", 404 CONFIG_PATH = "/root/llm_team_config.json" DEFAULT_CONFIG = { "providers": { "ollama": {"enabled": True, "base_url": "http://localhost:11434", "timeout": 300}, "openrouter": {"enabled": False, "base_url": "https://openrouter.ai/api/v1", "api_key": "", "timeout": 120}, "openai": {"enabled": False, "base_url": "https://api.openai.com/v1", "api_key": "", "timeout": 120}, "anthropic": {"enabled": False, "base_url": "https://api.anthropic.com/v1", "api_key": "", "timeout": 120}, }, "disabled_models": [], "cloud_models": [], "timeouts": {"global": 300, "per_model": {}}, } def load_dotenv(): for p in ["/root/.env", "/home/profit/.env"]: if os.path.exists(p): with open(p) as f: for line in f: line = line.strip() if line and not line.startswith("#") and "=" in line: k, v = line.split("=", 1) os.environ.setdefault(k.strip(), v.strip()) load_dotenv() def load_config(): if os.path.exists(CONFIG_PATH): with open(CONFIG_PATH) as f: cfg = json.load(f) # merge any missing defaults for k, v in DEFAULT_CONFIG.items(): cfg.setdefault(k, v) for k, v in DEFAULT_CONFIG["providers"].items(): cfg["providers"].setdefault(k, v) return cfg return json.loads(json.dumps(DEFAULT_CONFIG)) def save_config(cfg): with open(CONFIG_PATH, "w") as f: json.dump(cfg, f, indent=2) def get_api_key(provider_name): cfg = load_config() prov = cfg["providers"].get(provider_name, {}) key = prov.get("api_key", "") if key: return key env_map = {"openrouter": "OPENROUTER_API_KEY", "openai": "OPENAI_API_KEY", "anthropic": "ANTHROPIC_API_KEY"} return os.environ.get(env_map.get(provider_name, ""), "") DB_DSN = "dbname=knowledge_base user=kbuser password=IPbLBA0EQI8u4TeM2YZrbm1OAy5nSwqC host=localhost" def get_db(): return psycopg2.connect(DB_DSN) def save_run(mode, prompt, config_data, responses): models = list({r.get("model", "") for r in responses if r.get("model")}) try: with get_db() as conn: with conn.cursor() as 
cur: cur.execute( "INSERT INTO team_runs (mode, prompt, config, responses, models_used) VALUES (%s, %s, %s, %s, %s)", (mode, prompt, json.dumps(config_data), json.dumps(responses), models) ) conn.commit() except Exception as e: print(f"[DB] save_run error: {e}") HTML = r""" LLM Team

LLM Team

0 models
Mode: Brainstorm

Mode

BrainstormAll + synthesize
PipelineChain sequence
DebateArgue + judge
ValidatorFact-check
Round RobinIterate improve
Red TeamAttack + defend
ConsensusConverge
Code ReviewWrite+review+test
ELI Ladder5 levels
TournamentCompete + vote
EvolutionGenetic algo
Blind AssemblySplit + merge
StaircaseAdd constraints
Drift DetectConfidence map
PerspectiveStakeholder 360
Hallucinate?Claim verify
Time LoopCatastrophe fix!
Autonomous Pipelines
ResearchAuto brief
Model EvalBenchmark
KnowledgeExtract facts
All models answer in parallel, then one synthesizes the best parts into a final answer.

Models

Prompt

Output

◆ ◆ ◆

Select a mode, pick your models, and enter a prompt to run the team.

Response

Re-pipe into mode

History

""" ADMIN_HTML = r""" LLM Team - Admin

LLM Team Admin

Providers
Models
OpenRouter
Timeouts
Security

Ollama (Local)

OpenRouter

OpenAI

Anthropic

Local Models (Ollama)

Loading...

Cloud Models

No cloud models configured.

Free Models on OpenRouter

Click "Fetch Models" to load the list.

Global Default

Per-Model Overrides

Loading models...

Demo Mode

When active, the public can view and use the Team UI, Lab, and all modes without logging in. Admin settings (API keys, config saves) are read-only for non-admins.

Status: Off

IP Allowlist

These IPs are never rate-limited. Your local network (192.168.1.*) is always allowed.

""" LAB_HTML = r""" LLM Team - Lab

Lab AutoResearch

Experiments
Mutable Config
Live Monitor
Results

Create Experiment

Loading...

Mutable Config

Select an experiment from the Experiments tab first.

No Experiment Selected

Status: idle
Trials: 0
Best: 0.0/10
Improvements: 0

Score Progression

Trial Log

Start an experiment to see trials here.

Best Config

No best config yet.

All Experiments

Loading...
""" # ─── HELPERS ─────────────────────────────────────────────────── def _get_timeout(model_id): cfg = load_config() t = cfg["timeouts"]["per_model"].get(model_id) if t: return t if "::" in model_id: prov = model_id.split("::")[0] return cfg["providers"].get(prov, {}).get("timeout", cfg["timeouts"]["global"]) return cfg["providers"].get("ollama", {}).get("timeout", cfg["timeouts"]["global"]) def query_ollama(model, prompt, timeout): cfg = load_config() base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434") resp = requests.post(f"{base}/api/generate", json={ "model": model, "prompt": prompt, "stream": False, }, timeout=timeout) resp.raise_for_status() return resp.json()["response"] def query_openai_compatible(model, prompt, provider_name, timeout): cfg = load_config() prov = cfg["providers"].get(provider_name, {}) base = prov.get("base_url", "https://openrouter.ai/api/v1") api_key = get_api_key(provider_name) headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} if provider_name == "openrouter": headers["HTTP-Referer"] = "http://localhost:5000" headers["X-Title"] = "LLM Team UI" resp = requests.post(f"{base}/chat/completions", headers=headers, json={ "model": model, "messages": [{"role": "user", "content": prompt}], "stream": False, }, timeout=timeout) resp.raise_for_status() return resp.json()["choices"][0]["message"]["content"] def query_anthropic(model, prompt, timeout): cfg = load_config() prov = cfg["providers"].get("anthropic", {}) base = prov.get("base_url", "https://api.anthropic.com/v1") api_key = get_api_key("anthropic") resp = requests.post(f"{base}/messages", headers={ "x-api-key": api_key, "anthropic-version": "2023-06-01", "Content-Type": "application/json", }, json={ "model": model, "max_tokens": 4096, "messages": [{"role": "user", "content": prompt}], }, timeout=timeout) resp.raise_for_status() return resp.json()["content"][0]["text"] def query_model(model_id, prompt): timeout = 
_get_timeout(model_id) if "::" in model_id: provider_name, model_name = model_id.split("::", 1) if provider_name == "anthropic": return query_anthropic(model_name, prompt, timeout) return query_openai_compatible(model_name, prompt, provider_name, timeout) return query_ollama(model_id, prompt, timeout) # ─── CONTEXT MANAGEMENT ─────────────────────────────────────── # Context window sizes (tokens) — conservative estimates for safe prompting MODEL_CONTEXT = { "llama3.2": 4096, "mistral": 8192, "gemma2": 8192, "qwen2.5": 8192, "gpt-oss": 4096, "gpt-4o": 128000, "gpt-4o-mini": 128000, "claude-3": 200000, "claude-sonnet": 200000, "claude-haiku": 200000, } DEFAULT_CONTEXT = 4096 # safe fallback for unknown models MAX_RESPONSE_CHARS = 12000 # cap individual responses (~3K tokens) def estimate_tokens(text): """Rough token estimate: ~4 chars per token for English.""" return len(text) // 4 + 1 def get_context_limit(model_id): """Get context window size for a model.""" name = model_id.split("::")[-1].split(":")[0].lower() for key, limit in MODEL_CONTEXT.items(): if key in name: return limit # OpenRouter models generally have larger contexts if "::" in model_id: return 16000 return DEFAULT_CONTEXT def smart_truncate(text, max_tokens, preserve_end=200): """Truncate text preserving start and end, with a clear marker.""" if estimate_tokens(text) <= max_tokens: return text max_chars = max_tokens * 4 end_chars = preserve_end * 4 if max_chars <= end_chars * 2: return text[:max_chars] start = text[:max_chars - end_chars - 60] end = text[-end_chars:] return f"{start}\n\n[... truncated {estimate_tokens(text) - max_tokens} tokens ...]\n\n{end}" def cap_response(text): """Cap a single model response to prevent runaway output.""" if len(text) <= MAX_RESPONSE_CHARS: return text return smart_truncate(text, MAX_RESPONSE_CHARS // 4) def build_context(parts, model_id, reserve_for_response=1024): """Build a prompt from parts, fitting within model's context window. 
parts: list of (label, text, priority) tuples priority: 1=must keep, 2=important, 3=can truncate heavily Returns: assembled prompt string that fits in context. """ limit = get_context_limit(model_id) budget = limit - reserve_for_response if budget <= 0: budget = limit // 2 # First pass: measure everything total = sum(estimate_tokens(t) for _, t, _ in parts) if total <= budget: return "\n\n".join(f"{label}\n{text}" if label else text for label, text, _ in parts) # Need to truncate — allocate budget by priority p1 = [(l, t, p) for l, t, p in parts if p == 1] p2 = [(l, t, p) for l, t, p in parts if p == 2] p3 = [(l, t, p) for l, t, p in parts if p == 3] p1_tokens = sum(estimate_tokens(t) for _, t, _ in p1) remaining = budget - p1_tokens if remaining <= 0: # Even priority 1 doesn't fit — truncate p1 per_part = budget // max(len(p1), 1) result = [] for label, text, _ in p1: result.append(f"{label}\n{smart_truncate(text, per_part)}" if label else smart_truncate(text, per_part)) return "\n\n".join(result) # Allocate remaining to p2, then p3 result = [f"{l}\n{t}" if l else t for l, t, _ in p1] for group in [p2, p3]: if not group or remaining <= 0: continue per_part = remaining // max(len(group), 1) for label, text, _ in group: truncated = smart_truncate(text, max(per_part, 100)) result.append(f"{label}\n{truncated}" if label else truncated) remaining -= estimate_tokens(truncated) return "\n\n".join(result) def safe_query(model_id, prompt, fallback_summarize=True): """Query with context safety — auto-truncates prompt if too large, retries on overflow errors.""" limit = get_context_limit(model_id) prompt_tokens = estimate_tokens(prompt) # Pre-flight check: truncate if obviously too large if prompt_tokens > limit - 500: prompt = smart_truncate(prompt, limit - 1000) try: response = query_model(model_id, prompt) return cap_response(response) except Exception as e: err = str(e).lower() # Detect context overflow errors from various providers if any(k in err for k in ["context 
length", "too many tokens", "maximum context", "token limit", "content_too_large", "request too large", "413", "400"]): if fallback_summarize: # Aggressive truncation and retry truncated = smart_truncate(prompt, limit // 2) try: response = query_model(model_id, truncated) return cap_response(response) except Exception: pass return f"[Context overflow: prompt was ~{prompt_tokens} tokens, model limit ~{limit}. Response truncated to fit.]" raise def parallel_safe_query(models, prompt): """Like parallel_query but with context safety on each model.""" results = {} max_timeout = max((_get_timeout(m) for m in models), default=300) + 30 with ThreadPoolExecutor(max_workers=max(len(models), 1)) as pool: futures = {pool.submit(safe_query, m, prompt): m for m in models} for future in as_completed(futures, timeout=max_timeout): model = futures[future] try: results[model] = future.result(timeout=10) except Exception as e: results[model] = f"Error: {e}" return results def sse(data): return f"data: {json.dumps(data)}\n\n" def parallel_query(models, prompt): """Query multiple models in parallel with context safety.""" return parallel_safe_query(models, prompt) # ─── ROUTES ──────────────────────────────────────────────────── @app.route("/") @login_required def index(): return render_template_string(HTML) @app.route("/api/models") @login_required def get_models(): SKIP = {"nomic-embed-text", "mxbai-embed-large", "all-minilm", "snowflake-arctic-embed"} cfg = load_config() models = [] # Local Ollama models if cfg["providers"]["ollama"].get("enabled", True): try: base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434") resp = requests.get(f"{base}/api/tags", timeout=10) seen = set() for m in resp.json().get("models", []): full = m["name"] short = full.split(":")[0] size = m.get("size", 0) if short in SKIP or size < 1_000_000 or short in seen: continue if full in cfg.get("disabled_models", []): continue seen.add(short) models.append({"name": full, "size": 
f"{size/(1024**3):.1f} GB", "provider": "ollama", "provider_label": "Local", "display_name": short}) except Exception: pass # Cloud models for cm in cfg.get("cloud_models", []): if not cm.get("enabled", True): continue prov = cm["id"].split("::")[0] if "::" in cm["id"] else "cloud" if not cfg["providers"].get(prov, {}).get("enabled", False): continue models.append({"name": cm["id"], "size": cm.get("context", "cloud"), "provider": prov, "provider_label": prov.title(), "display_name": cm.get("display_name", cm["id"].split("::")[-1])}) return jsonify({"models": models}) # ─── ADMIN ROUTES ───────────────────────────────────────────── @app.route("/admin") @admin_required def admin_page(): return render_template_string(ADMIN_HTML) @app.route("/api/admin/config", methods=["GET"]) @admin_required def admin_get_config(): cfg = load_config() safe = json.loads(json.dumps(cfg)) for name, p in safe["providers"].items(): if p.get("api_key"): p["api_key_set"] = True p["api_key"] = "" else: p["api_key_set"] = bool(get_api_key(name)) return jsonify(safe) @app.route("/api/admin/config", methods=["POST"]) @admin_required def admin_save_config(): data = request.json cfg = load_config() # update providers (preserve existing keys if not sent) for name, prov in data.get("providers", {}).items(): if name in cfg["providers"]: new_key = prov.get("api_key", "") if not new_key: prov["api_key"] = cfg["providers"][name].get("api_key", "") cfg["providers"][name].update(prov) if "disabled_models" in data: cfg["disabled_models"] = data["disabled_models"] if "cloud_models" in data: cfg["cloud_models"] = data["cloud_models"] if "timeouts" in data: cfg["timeouts"] = data["timeouts"] save_config(cfg) return jsonify({"ok": True}) @app.route("/api/admin/test-provider", methods=["POST"]) @admin_required def admin_test_provider(): data = request.json name = data.get("provider", "") cfg = load_config() prov = cfg["providers"].get(name, {}) try: if name == "ollama": r = requests.get(f"{prov.get('base_url', 
'http://localhost:11434')}/api/tags", timeout=5) count = len(r.json().get("models", [])) return jsonify({"ok": True, "message": f"Connected. {count} models available."}) elif name == "openrouter": key = data.get("api_key") or get_api_key("openrouter") r = requests.get(f"{prov.get('base_url', 'https://openrouter.ai/api/v1')}/models", headers={"Authorization": f"Bearer {key}"}, timeout=10) count = len(r.json().get("data", [])) return jsonify({"ok": True, "message": f"Connected. {count} models available."}) elif name == "openai": key = data.get("api_key") or get_api_key("openai") r = requests.get(f"{prov.get('base_url', 'https://api.openai.com/v1')}/models", headers={"Authorization": f"Bearer {key}"}, timeout=10) return jsonify({"ok": True, "message": f"Connected. {len(r.json().get('data', []))} models."}) elif name == "anthropic": key = data.get("api_key") or get_api_key("anthropic") r = requests.post(f"{prov.get('base_url', 'https://api.anthropic.com/v1')}/messages", headers={"x-api-key": key, "anthropic-version": "2023-06-01", "Content-Type": "application/json"}, json={"model": "claude-haiku-4-5-20251001", "max_tokens": 1, "messages": [{"role": "user", "content": "hi"}]}, timeout=10) return jsonify({"ok": True, "message": "Connected to Anthropic."}) return jsonify({"ok": False, "message": "Unknown provider"}) except Exception as e: return jsonify({"ok": False, "message": str(e)}) _or_models_cache = {"data": None, "ts": 0} @app.route("/api/admin/openrouter/models") @admin_required def admin_openrouter_models(): import time now = time.time() if _or_models_cache["data"] and now - _or_models_cache["ts"] < 300: return jsonify({"models": _or_models_cache["data"]}) key = get_api_key("openrouter") headers = {"Authorization": f"Bearer {key}"} if key else {} try: r = requests.get("https://openrouter.ai/api/v1/models", headers=headers, timeout=15) r.raise_for_status() free = [] for m in r.json().get("data", []): pricing = m.get("pricing", {}) if pricing.get("prompt") == "0" 
and pricing.get("completion") == "0": free.append({"id": m["id"], "name": m.get("name", m["id"]), "context_length": m.get("context_length", 0)}) _or_models_cache["data"] = free _or_models_cache["ts"] = now return jsonify({"models": free}) except Exception as e: return jsonify({"models": [], "error": str(e)}) @app.route("/api/admin/ollama-models") @admin_required def admin_ollama_models(): cfg = load_config() base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434") SKIP = {"nomic-embed-text", "mxbai-embed-large", "all-minilm", "snowflake-arctic-embed"} try: resp = requests.get(f"{base}/api/tags", timeout=10) models = [] seen = set() for m in resp.json().get("models", []): full = m["name"] short = full.split(":")[0] size = m.get("size", 0) if short in SKIP or size < 1_000_000 or short in seen: continue seen.add(short) models.append({"name": full, "size": f"{size/(1024**3):.1f} GB", "disabled": full in cfg.get("disabled_models", [])}) return jsonify({"models": models}) except Exception as e: return jsonify({"models": [], "error": str(e)}) # ─── HISTORY ROUTES ──────────────────────────────────────────── @app.route("/api/runs") @login_required def get_runs(): try: with get_db() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SELECT id, mode, prompt, models_used, created_at FROM team_runs ORDER BY created_at DESC LIMIT 50") runs = cur.fetchall() for r in runs: r["created_at"] = r["created_at"].isoformat() return jsonify({"runs": runs}) except Exception as e: return jsonify({"runs": [], "error": str(e)}) @app.route("/api/runs/") @login_required def get_run(run_id): try: with get_db() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SELECT * FROM team_runs WHERE id = %s", (run_id,)) run = cur.fetchone() if not run: return jsonify({"error": "not found"}), 404 run["created_at"] = run["created_at"].isoformat() return jsonify(run) except Exception as e: return 
jsonify({"error": str(e)}), 500 @app.route("/api/runs/", methods=["DELETE"]) @login_required def delete_run(run_id): try: with get_db() as conn: with conn.cursor() as cur: cur.execute("DELETE FROM team_runs WHERE id = %s", (run_id,)) conn.commit() return jsonify({"ok": True}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/pipelines") @login_required def get_pipelines(): try: with get_db() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SELECT id, pipeline, topic, status, models_used, duration_ms, created_at FROM pipeline_runs ORDER BY created_at DESC LIMIT 50") runs = cur.fetchall() for r in runs: r["created_at"] = r["created_at"].isoformat() if r["created_at"] else None return jsonify({"pipelines": runs}) except Exception as e: return jsonify({"pipelines": [], "error": str(e)}) @app.route("/api/pipelines/") @login_required def get_pipeline(pid): try: with get_db() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SELECT * FROM pipeline_runs WHERE id = %s", (pid,)) run = cur.fetchone() if not run: return jsonify({"error": "not found"}), 404 run["created_at"] = run["created_at"].isoformat() if run["created_at"] else None run["completed_at"] = run["completed_at"].isoformat() if run["completed_at"] else None return jsonify(run) except Exception as e: return jsonify({"error": str(e)}), 500 # ─── LAB: RATCHET ENGINE ────────────────────────────────────── _lab_threads = {} # experiment_id -> thread _lab_streams = {} # experiment_id -> [queue, ...] 
def _lab_emit(exp_id, data):
    """Fan an event dict out to every SSE listener queue of this experiment."""
    for q in _lab_streams.get(exp_id, []):
        q.append(data)


def _score_response(response, expected, metric, judge_model=None):
    """Score a model response 0-10 under the experiment's metric.

    accuracy: substring match against `expected` (10 exact-phrase, 5 partial
    word overlap, 1 no overlap; 5 when no expectation exists).
    speed: fixed 10 — duration is measured by the caller, not here.
    quality: ask `judge_model` for a 1-10 rating and parse the first integer;
    any failure falls back to the neutral 5.0.
    """
    if metric == "accuracy":
        if not expected:
            return 5.0
        resp_lower = response.lower().strip()
        exp_lower = expected.lower().strip()
        if exp_lower in resp_lower:
            return 10.0
        # NOTE(review): this partial credit matches ANY shared word, including
        # stopwords — generous by design, but worth confirming.
        if any(w in resp_lower for w in exp_lower.split()):
            return 5.0
        return 1.0
    elif metric == "speed":
        return 10.0  # speed scored externally by duration
    elif metric == "quality" and judge_model:
        try:
            judgment = query_model(judge_model, f"Rate this response 1-10 for quality, relevance, and completeness.\n\n"
                                                f"EXPECTED: {expected or 'No expected output specified'}\n\n"
                                                f"RESPONSE: {response[:1500]}\n\n"
                                                f"Return ONLY a number 1-10, nothing else.")
            import re
            m = re.search(r'\b(\d+)\b', judgment)
            # Clamp to 10 in case the judge answers e.g. "100".
            return min(float(m.group(1)), 10.0) if m else 5.0
        except Exception:
            return 5.0
    return 5.0


def _ratchet_loop(exp_id):
    """Background optimization loop for one lab experiment.

    Repeats until the experiment's DB status leaves 'running': a meta-model
    proposes ONE config change, the change is evaluated on all eval cases, and
    the config is kept only if the average score improves (the "ratchet") —
    otherwise it reverts to the best known config.  Every trial is persisted to
    lab_trials and streamed to listeners via _lab_emit.  Runs in a daemon
    thread started by lab_start.
    """
    try:
        # Load the experiment definition and resume counters from the DB.
        with get_db() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute("SELECT * FROM lab_experiments WHERE id = %s", (exp_id,))
                exp = cur.fetchone()
        if not exp:
            return
        eval_cases = exp["eval_cases"] or []
        models_pool = exp["models_pool"] or []
        metric = exp["metric"] or "quality"
        objective = exp["objective"] or "Improve response quality"
        mutable = exp["mutable_config"] or {
            "system_prompt": "You are a helpful assistant.",
            "temperature": 0.7,
            "model": models_pool[0] if models_pool else "llama3.2:latest",
        }
        # json round-trip = cheap deep copy, so best/mutable never alias.
        best_config = exp["best_config"] or json.loads(json.dumps(mutable))
        best_score = exp["best_score"] or 0
        trial_num = exp["total_trials"] or 0
        # Pick meta-model (largest in pool)
        meta_model = models_pool[-1] if models_pool else "qwen2.5:latest"
        judge_model = models_pool[0] if models_pool else "llama3.2:latest"
        while True:
            # Check if still running — pausing via the API flips this row.
            with get_db() as conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT status FROM lab_experiments WHERE id = %s", (exp_id,))
                    row = cur.fetchone()
            if not row or row[0] != "running":
                break
            trial_num += 1
            trial_start = time.time()
            _lab_emit(exp_id, {"type": "status", "trial": trial_num, "message": "Proposing change..."})
            # Step 1: Meta-model proposes a change
            history_hint = ""
            if trial_num > 1:
                # Show the meta-model its last 5 trials so it avoids repeats.
                with get_db() as conn:
                    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                        cur.execute("SELECT config_diff, avg_score, improved FROM lab_trials WHERE experiment_id = %s ORDER BY id DESC LIMIT 5", (exp_id,))
                        recent = cur.fetchall()
                if recent:
                    history_hint = "\n\nRecent trials:\n" + "\n".join(
                        f" {'KEPT' if r['improved'] else 'DISCARDED'} (score {r['avg_score']:.1f}): {r['config_diff']}"
                        for r in recent)
            propose_prompt = (
                f"You are optimizing an LLM pipeline. Objective: {objective}\n"
                f"Metric: {metric} (higher is better, max 10)\n"
                f"Current best score: {best_score:.1f}/10\n\n"
                f"Current config:\n{json.dumps(mutable, indent=2)}\n\n"
                f"Available models: {models_pool}\n"
                f"Eval cases: {len(eval_cases)}\n"
                f"{history_hint}\n\n"
                f"Suggest exactly ONE change to improve the score. Return ONLY valid JSON with the FULL updated config. "
                f"Keys: system_prompt (string), temperature (0.0-1.5), model (string from available models). "
                f"Be creative but focused. Change only one thing at a time."
            )
            try:
                proposal_raw = query_model(meta_model, propose_prompt)
                import re
                # Grab the outermost {...} blob; models often wrap JSON in prose.
                json_match = re.search(r'\{[\s\S]*\}', proposal_raw)
                if json_match:
                    proposed = json.loads(json_match.group())
                    # Validate keys — backfill anything the model dropped.
                    if "system_prompt" not in proposed:
                        proposed["system_prompt"] = mutable.get("system_prompt", "")
                    if "temperature" not in proposed:
                        proposed["temperature"] = mutable.get("temperature", 0.7)
                    if "model" not in proposed:
                        proposed["model"] = mutable.get("model", models_pool[0])
                else:
                    proposed = mutable
            except Exception:
                # Unparseable proposal: re-test the current config unchanged.
                proposed = mutable
            # Describe the diff (human-readable, stored with the trial).
            diffs = []
            for k in set(list(mutable.keys()) + list(proposed.keys())):
                old_v = mutable.get(k)
                new_v = proposed.get(k)
                if old_v != new_v:
                    if k == "system_prompt":
                        # Prompts are long; report only the size change.
                        diffs.append(f"system_prompt changed ({len(str(old_v))} → {len(str(new_v))} chars)")
                    else:
                        diffs.append(f"{k}: {old_v} → {new_v}")
            config_diff = "; ".join(diffs) if diffs else "no change"
            _lab_emit(exp_id, {"type": "status", "trial": trial_num, "message": f"Testing: {config_diff[:80]}"})
            # Step 2: Run eval cases with proposed config
            trial_scores = []
            model_to_use = proposed.get("model", models_pool[0] if models_pool else "llama3.2:latest")
            sys_prompt = proposed.get("system_prompt", "")
            for ci, case in enumerate(eval_cases):
                inp = case.get("input", "")
                expected = case.get("expected", "")
                full_prompt = f"{sys_prompt}\n\n{inp}" if sys_prompt else inp
                try:
                    resp = query_model(model_to_use, full_prompt)
                    score = _score_response(resp, expected, metric, judge_model if metric == "quality" else None)
                    trial_scores.append({"input": inp[:100], "score": score, "response": resp[:300]})
                except Exception as e:
                    # A failed query scores 0 — failures should hurt the config.
                    trial_scores.append({"input": inp[:100], "score": 0, "error": str(e)})
            avg_score = sum(s["score"] for s in trial_scores) / max(len(trial_scores), 1)
            duration_ms = int((time.time() - trial_start) * 1000)
            improved = avg_score > best_score
            # Step 3: Ratchet — only strictly better configs are kept.
            if improved:
                best_score = avg_score
                best_config = json.loads(json.dumps(proposed))
                mutable = json.loads(json.dumps(proposed))
            else:
                mutable = json.loads(json.dumps(best_config))
            # Save trial
            with get_db() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """INSERT INTO lab_trials (experiment_id, trial_num, config_diff, config_snapshot, scores, avg_score, improved, duration_ms)
                           VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                        (exp_id, trial_num, config_diff, json.dumps(proposed), json.dumps(trial_scores), avg_score, improved, duration_ms)
                    )
                    cur.execute(
                        """UPDATE lab_experiments SET total_trials = %s, best_score = %s, best_config = %s, mutable_config = %s,
                           improvements = improvements + %s WHERE id = %s""",
                        (trial_num, best_score, json.dumps(best_config), json.dumps(mutable), 1 if improved else 0, exp_id)
                    )
                conn.commit()
            _lab_emit(exp_id, {
                "type": "trial", "trial": trial_num, "score": round(avg_score, 2),
                "best": round(best_score, 2), "improved": improved,
                "diff": config_diff[:100], "duration_ms": duration_ms
            })
        # Done — auto-pause only if still marked running (don't clobber 'idle').
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute("UPDATE lab_experiments SET status = 'paused' WHERE id = %s AND status = 'running'", (exp_id,))
            conn.commit()
        _lab_emit(exp_id, {"type": "done"})
    except Exception as e:
        # Surface the failure to listeners, then best-effort mark the row.
        _lab_emit(exp_id, {"type": "error", "message": str(e)})
        try:
            with get_db() as conn:
                with conn.cursor() as cur:
                    cur.execute("UPDATE lab_experiments SET status = 'error' WHERE id = %s", (exp_id,))
                conn.commit()
        except Exception:
            pass
# ─── LAB API ROUTES ──────────────────────────────────────────────

@app.route("/lab")
@admin_required
def lab_page():
    """Serve the lab single-page UI."""
    return render_template_string(LAB_HTML)


@app.route("/api/lab/experiments", methods=["GET"])
@admin_required
def lab_list():
    """List all experiments, newest first (summary columns only)."""
    with get_db() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT id, name, status, metric, best_score, total_trials, improvements, models_pool, created_at FROM lab_experiments ORDER BY created_at DESC")
            rows = cur.fetchall()
    for r in rows:
        r["created_at"] = r["created_at"].isoformat()
    return jsonify({"experiments": rows})


@app.route("/api/lab/experiments", methods=["POST"])
@admin_required
def lab_create():
    """Create a new experiment.  best_config starts equal to mutable_config."""
    d = request.json
    # Serialize once — best_config intentionally begins identical to mutable_config.
    cfg_json = json.dumps(d.get("mutable_config", {"system_prompt": "You are a helpful assistant.", "temperature": 0.7, "model": ""}))
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """INSERT INTO lab_experiments (name, objective, metric, eval_cases, mutable_config, best_config, models_pool)
                   VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id""",
                (d["name"], d.get("objective", ""), d.get("metric", "quality"),
                 json.dumps(d.get("eval_cases", [])), cfg_json, cfg_json,
                 d.get("models_pool", []))
            )
            eid = cur.fetchone()[0]
        conn.commit()
    return jsonify({"id": eid})


# FIX: these routes lacked their URL placeholder (e.g. "/api/lab/experiments//start"),
# so Flask could never supply the views' eid argument.  Restored <int:eid>.
@app.route("/api/lab/experiments/<int:eid>", methods=["GET"])
@admin_required
def lab_get(eid):
    """Fetch one experiment plus all its trials, or 404."""
    with get_db() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT * FROM lab_experiments WHERE id = %s", (eid,))
            exp = cur.fetchone()
            if not exp:
                return jsonify({"error": "not found"}), 404
            exp["created_at"] = exp["created_at"].isoformat()
            cur.execute("SELECT * FROM lab_trials WHERE experiment_id = %s ORDER BY trial_num", (eid,))
            exp["trials"] = cur.fetchall()
    for t in exp["trials"]:
        t["created_at"] = t["created_at"].isoformat()
    return jsonify(exp)


@app.route("/api/lab/experiments/<int:eid>", methods=["PUT"])
@admin_required
def lab_update(eid):
    """Partially update experiment fields; JSON columns are re-serialized."""
    d = request.json
    sets, vals = [], []
    for k in ["name", "objective", "metric"]:
        if k in d:
            sets.append(f"{k} = %s")
            vals.append(d[k])
    for k in ["eval_cases", "mutable_config"]:
        if k in d:
            sets.append(f"{k} = %s")
            vals.append(json.dumps(d[k]))
    if "models_pool" in d:
        sets.append("models_pool = %s")
        vals.append(d["models_pool"])
    if not sets:
        return jsonify({"ok": True})
    vals.append(eid)
    with get_db() as conn:
        with conn.cursor() as cur:
            # Column names come only from the fixed whitelists above, never
            # from user input; values go through placeholders.
            cur.execute(f"UPDATE lab_experiments SET {', '.join(sets)} WHERE id = %s", vals)
        conn.commit()
    return jsonify({"ok": True})


@app.route("/api/lab/experiments/<int:eid>/start", methods=["POST"])
@admin_required
def lab_start(eid):
    """Mark the experiment running and launch its ratchet thread (idempotent)."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute("UPDATE lab_experiments SET status = 'running' WHERE id = %s", (eid,))
        conn.commit()
    if eid in _lab_threads and _lab_threads[eid].is_alive():
        return jsonify({"ok": True, "message": "Already running"})
    t = threading.Thread(target=_ratchet_loop, args=(eid,), daemon=True)
    _lab_threads[eid] = t
    t.start()
    return jsonify({"ok": True})


@app.route("/api/lab/experiments/<int:eid>/pause", methods=["POST"])
@admin_required
def lab_pause(eid):
    """Signal the ratchet loop to stop; it polls status between trials."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute("UPDATE lab_experiments SET status = 'paused' WHERE id = %s", (eid,))
        conn.commit()
    return jsonify({"ok": True})


@app.route("/api/lab/experiments/<int:eid>/reset", methods=["POST"])
@admin_required
def lab_reset(eid):
    """Wipe trial history and counters; best_config reverts to mutable_config."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute("UPDATE lab_experiments SET status = 'idle', total_trials = 0, improvements = 0, best_score = 0, best_config = mutable_config WHERE id = %s", (eid,))
            cur.execute("DELETE FROM lab_trials WHERE experiment_id = %s", (eid,))
        conn.commit()
    return jsonify({"ok": True})


@app.route("/api/lab/experiments/<int:eid>/delete", methods=["DELETE"])
@admin_required
def lab_delete(eid):
    """Delete an experiment (trials assumed cascaded by FK — TODO confirm schema)."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute("DELETE FROM lab_experiments WHERE id = %s", (eid,))
        conn.commit()
    return jsonify({"ok": True})


@app.route("/api/lab/experiments/<int:eid>/stream")
@admin_required
def lab_stream(eid):
    """SSE stream of live trial events for one experiment."""
    q = []
    _lab_streams.setdefault(eid, []).append(q)

    def generate():
        try:
            while True:
                if q:
                    data = q.pop(0)
                    yield f"data: {json.dumps(data)}\n\n"
                    if data.get("type") == "done":
                        break
                else:
                    time.sleep(0.5)
                    yield ": keepalive\n\n"
        finally:
            # FIX: was `lst.remove(q) if q in lst else None` — a double lookup
            # that can race with _lab_emit/another disconnect between the check
            # and the remove.  EAFP removal is atomic per call.
            try:
                _lab_streams.get(eid, []).remove(q)
            except ValueError:
                pass
    return Response(generate(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
# ─── TEAM ROUTES ─────────────────────────────────────────────────

@app.route("/api/run", methods=["POST"])
@login_required
def run_team():
    """Dispatch the requested team mode and stream its SSE events.

    Every "response" event is mirrored into `collected` so the finished run
    can be persisted via save_run() after the generator is exhausted.
    """
    config = request.json
    mode = config["mode"]
    RUNNERS = {
        "brainstorm": run_brainstorm, "pipeline": run_pipeline, "debate": run_debate,
        "validator": run_validator, "roundrobin": run_roundrobin, "redteam": run_redteam,
        "consensus": run_consensus, "codereview": run_codereview, "ladder": run_ladder,
        "tournament": run_tournament, "evolution": run_evolution, "blindassembly": run_blindassembly,
        "staircase": run_staircase, "drift": run_drift, "mesh": run_mesh,
        "hallucination": run_hallucination, "timeloop": run_timeloop, "research": run_research,
        "eval": run_eval, "extract": run_extract,
    }

    def generate():
        collected = []
        runner = RUNNERS.get(mode)
        if runner:
            for event_str in runner(config):
                yield event_str
                # Best-effort parse of the SSE frame for the history log;
                # non-JSON frames (keepalives etc.) are simply skipped.
                try:
                    data = json.loads(event_str.replace("data: ", "", 1).strip())
                    if data.get("type") == "response":
                        collected.append({"model": data.get("model", ""), "text": data.get("text", ""), "role": data.get("role", "")})
                except Exception:
                    pass
        else:
            yield sse({"type": "response", "model": "system", "text": f"Unknown mode: {mode}", "role": "error"})
        yield sse({"type": "done"})
        if collected:
            save_run(mode, config.get("prompt", ""), config, collected)
    return Response(generate(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no", "Connection": "keep-alive"})


# ─── ORIGINAL 10 MODES ───────────────────────────────────────────

def run_brainstorm(config):
    """All models answer in parallel, then a synthesizer merges the answers."""
    models, prompt = config.get("models", []), config["prompt"]
    synthesizer = config.get("synthesizer", models[0] if models else "qwen2.5")
    yield sse({"type": "clear"})
    yield sse({"type": "status", "message": f"Querying {len(models)} models..."})
    responses = parallel_query(models, prompt)
    for m, r in responses.items():
        yield sse({"type": "response", "model": m, "text": r, "role": "respondent"})
    if len(responses) > 1:
        yield sse({"type": "status", "message": f"Synthesizing with {synthesizer}..."})
        parts = [("QUESTION:", prompt, 1), ("INSTRUCTION:", "Synthesize the best answer. Be concise.", 1)]
        for m, r in responses.items():
            parts.append((f"[{m}]:", cap_response(r), 3))
        sp = build_context(parts, synthesizer)
        try:
            yield sse({"type": "response", "model": synthesizer, "text": safe_query(synthesizer, sp), "role": "synthesis"})
        except Exception as e:
            yield sse({"type": "response", "model": synthesizer, "text": str(e), "role": "error"})


def run_pipeline(config):
    """Chain models sequentially; each step's instruction receives the previous
    output via the {input} placeholder.  A failed step aborts the pipeline."""
    steps, current = config.get("steps", []), config["prompt"]
    yield sse({"type": "clear"})
    for i, step in enumerate(steps):
        model = step["model"]
        yield sse({"type": "status", "message": f"Step {i+1}/{len(steps)}: {model}..."})
        try:
            prompt = step["instruction"].replace("{input}", cap_response(current))
            current = safe_query(model, prompt)
            yield sse({"type": "response", "model": model, "text": current, "role": f"step {i+1}"})
        except Exception as e:
            yield sse({"type": "response", "model": model, "text": str(e), "role": "error"})
            break


def run_debate(config):
    """Two models debate for N rounds, then a judge picks the winner."""
    prompt, d1, d2, judge = config["prompt"], config["debater1"], config["debater2"], config["judge"]
    rounds = config.get("rounds", 2)
    yield sse({"type": "clear"})
    history = []
    for m in [d1, d2]:
        yield sse({"type": "status", "message": f"{m} opening..."})
        try:
            r = safe_query(m, f"Give your position on: {prompt}")
            history.append((m, r))
            yield sse({"type": "response", "model": m, "text": r, "role": "opening"})
        except Exception as e:
            yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
    for rd in range(rounds):
        for i, m in enumerate([d1, d2]):
            other = [d1, d2][1 - i]
            # FIX: `[...][-1]` raised IndexError when the opponent's opening
            # query failed and left no history entry — guard with a fallback.
            opponent_turns = [h[1] for h in history if h[0] == other]
            other_last = opponent_turns[-1] if opponent_turns else "(opponent has made no statement yet)"
            yield sse({"type": "status", "message": f"Round {rd+1}: {m}..."})
            try:
                rebuttal_prompt = build_context([
                    ("Topic:", prompt, 1),
                    (f"Opponent ({other}) said:", cap_response(other_last), 2),
                    ("INSTRUCTION:", "Rebuttal or concede:", 1),
                ], m)
                r = safe_query(m, rebuttal_prompt)
                history.append((m, r))
                yield sse({"type": "response", "model": m, "text": r, "role": f"round {rd+1}"})
            except Exception as e:
                yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
    yield sse({"type": "status", "message": f"{judge} judging..."})
    parts = [("Topic:", prompt, 1), ("INSTRUCTION:", "Judge: who won and why?", 1)]
    for m, t in history:
        parts.append((f"[{m}]:", cap_response(t), 3))
    jp = build_context(parts, judge)
    try:
        yield sse({"type": "response", "model": judge, "text": safe_query(judge, jp), "role": "judge"})
    except Exception as e:
        yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"})
def run_validator(config):
    """One model answers; a panel of validators fact-checks the answer.

    Emits the answer (role "answer"), then one "validator" event per panel
    member.  Aborts after an "error" event if the answerer itself fails.
    """
    prompt = config["prompt"]
    answerer = config["answerer"]
    panel = config.get("validators", [])
    yield sse({"type": "clear"})
    yield sse({"type": "status", "message": f"{answerer} answering..."})
    try:
        answer = query_model(answerer, prompt)
    except Exception as exc:
        yield sse({"type": "response", "model": answerer, "text": str(exc), "role": "error"})
        return
    yield sse({"type": "response", "model": answerer, "text": answer, "role": "answer"})
    yield sse({"type": "status", "message": f"Validating with {len(panel)} models..."})
    check_prompt = (
        f"QUESTION: {prompt}\n\nANSWER:\n{answer}\n\n"
        "Fact-check. Score 1-10 for accuracy, completeness, clarity. Flag errors."
    )
    for model_name, verdict in parallel_query(panel, check_prompt).items():
        yield sse({"type": "response", "model": model_name, "text": verdict, "role": "validator"})


def run_roundrobin(config):
    """Pass one evolving answer around every model for N improvement cycles."""
    prompt = config["prompt"]
    models = config.get("models", [])
    cycles = config.get("cycles", 2)
    yield sse({"type": "clear"})
    if not models:
        return
    drafter = models[0]
    yield sse({"type": "status", "message": f"{drafter} drafting..."})
    try:
        current = query_model(drafter, f"Answer:\n\n{prompt}")
    except Exception as exc:
        yield sse({"type": "response", "model": drafter, "text": str(exc), "role": "error"})
        return
    yield sse({"type": "response", "model": drafter, "text": current, "role": "draft"})
    last_cycle = cycles - 1
    last_idx = len(models) - 1
    for cycle in range(cycles):
        for idx, model_name in enumerate(models):
            if cycle == 0 and idx == 0:
                continue  # the drafter already went at the top of cycle one
            yield sse({"type": "status", "message": f"Cycle {cycle+1}: {model_name}..."})
            try:
                current = query_model(model_name, f"Question: {prompt}\n\nCurrent answer:\n{current}\n\nImprove it. Return full improved answer.")
            except Exception as exc:
                yield sse({"type": "response", "model": model_name, "text": str(exc), "role": "error"})
                continue
            final_turn = cycle == last_cycle and idx == last_idx
            yield sse({"type": "response", "model": model_name, "text": current, "role": "final" if final_turn else f"cycle {cycle+1}"})


def run_redteam(config):
    """Author drafts, attacker hunts flaws, patcher repairs — N rounds."""
    prompt = config["prompt"]
    author, attacker, patcher = config["author"], config["attacker"], config["patcher"]
    rounds = config.get("rounds", 2)
    yield sse({"type": "clear"})
    yield sse({"type": "status", "message": f"{author} writing..."})
    try:
        current = query_model(author, prompt)
    except Exception as exc:
        yield sse({"type": "response", "model": author, "text": str(exc), "role": "error"})
        return
    yield sse({"type": "response", "model": author, "text": current, "role": "author"})
    for rnd in range(rounds):
        yield sse({"type": "status", "message": f"Round {rnd+1}: {attacker} attacking..."})
        try:
            attack = query_model(attacker, f"Question: {prompt}\n\nAnswer:\n{current}\n\nRED TEAM: find every flaw, error, weakness, edge case. Be aggressive.")
        except Exception as exc:
            # No attack → nothing to patch this round.
            yield sse({"type": "response", "model": attacker, "text": str(exc), "role": "error"})
            continue
        yield sse({"type": "response", "model": attacker, "text": attack, "role": f"attack {rnd+1}"})
        yield sse({"type": "status", "message": f"Round {rnd+1}: {patcher} fixing..."})
        try:
            current = query_model(patcher, f"Question: {prompt}\n\nAnswer:\n{current}\n\nFlaws found:\n{attack}\n\nFix ALL issues. Return complete improved answer.")
        except Exception as exc:
            yield sse({"type": "response", "model": patcher, "text": str(exc), "role": "error"})
            continue
        yield sse({"type": "response", "model": patcher, "text": current, "role": "patcher" if rnd == rounds - 1 else f"patch {rnd+1}"})
def run_consensus(config):
    """All models answer, then iteratively revise after reading each other,
    converging toward consensus over max_rounds rounds."""
    prompt, models, max_rounds = config["prompt"], config.get("models", []), config.get("max_rounds", 3)
    yield sse({"type": "clear"})
    if not models:
        return
    yield sse({"type": "status", "message": f"Round 1: {len(models)} models answering..."})
    responses = parallel_query(models, prompt)
    for m, r in responses.items():
        yield sse({"type": "response", "model": m, "text": r, "role": "round 1"})
    for rd in range(2, max_rounds + 1):
        yield sse({"type": "status", "message": f"Round {rd}: reviewing each other..."})
        new = {}
        for m in models:
            # Each model sees its own previous answer plus everyone else's.
            parts = [("Question:", prompt, 1),
                     ("Your answer:", cap_response(responses.get(m, "")), 2),
                     ("INSTRUCTION:", "Revise considering other perspectives. Adopt good points, defend if right.", 1)]
            for o, r in responses.items():
                if o != m:
                    parts.append((f"[{o}]:", cap_response(r), 3))
            ctx = build_context(parts, m)
            try:
                new[m] = safe_query(m, ctx)
                yield sse({"type": "response", "model": m, "text": new[m], "role": "consensus" if rd == max_rounds else f"round {rd}"})
            except Exception as e:
                # Keep the model's previous answer so later rounds still have input.
                new[m] = responses.get(m, "")
                yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
        responses = new


def run_codereview(config):
    """Coder writes, reviewer critiques, tester writes unit tests — three roles
    in sequence; only a coder failure aborts the chain."""
    prompt, coder, reviewer, tester = config["prompt"], config["coder"], config["reviewer"], config["tester"]
    yield sse({"type": "clear"})
    yield sse({"type": "status", "message": f"{coder} coding..."})
    try:
        code = query_model(coder, f"Write code for this task. Only output code with brief comments.\n\n{prompt}")
        yield sse({"type": "response", "model": coder, "text": code, "role": "coder"})
    except Exception as e:
        yield sse({"type": "response", "model": coder, "text": str(e), "role": "error"})
        return
    yield sse({"type": "status", "message": f"{reviewer} reviewing..."})
    try:
        review = query_model(reviewer, f"Task: {prompt}\n\nCode:\n{code}\n\nReview: bugs, security, performance, style, edge cases. Provide corrected code if needed.")
        yield sse({"type": "response", "model": reviewer, "text": review, "role": "reviewer"})
    except Exception as e:
        # Tester still runs; it just sees an empty review.
        review = ""
        yield sse({"type": "response", "model": reviewer, "text": str(e), "role": "error"})
    yield sse({"type": "status", "message": f"{tester} testing..."})
    try:
        tests = query_model(tester, f"Task: {prompt}\n\nCode:\n{code}\n\nReview:\n{review}\n\nWrite comprehensive unit tests. Cover normal, edge, error cases.")
        yield sse({"type": "response", "model": tester, "text": tests, "role": "tester"})
    except Exception as e:
        yield sse({"type": "response", "model": tester, "text": str(e), "role": "error"})


def run_ladder(config):
    """Explain the same topic at five audience levels, round-robining the
    selected models across the levels."""
    prompt, models = config["prompt"], config.get("models", [])
    levels = [
        ("Child (5yo)", "Explain to a 5-year-old. Very simple words, short sentences, fun analogies."),
        ("Teenager", "Explain to a 15-year-old. Everyday language, relatable examples, some technical terms."),
        ("College Student", "College level. Proper terminology, theory, structured explanation."),
        ("Professional", "Professional level. Technical language, real-world applications, trade-offs."),
        ("PhD Expert", "PhD/expert level. Nuanced details, current research, math if relevant, edge cases."),
    ]
    yield sse({"type": "clear"})
    for i, (name, instr) in enumerate(levels):
        # Cycle through the pool; fall back to a default model if none chosen.
        m = models[i % len(models)] if models else "qwen2.5"
        yield sse({"type": "status", "message": f"Level {i+1}/5: {name} ({m})..."})
        try:
            yield sse({"type": "response", "model": m, "text": query_model(m, f"{instr}\n\nQuestion: {prompt}"), "role": name})
        except Exception as e:
            yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
def run_tournament(config):
    """All models compete on the same prompt; a judge ranks the entries and
    refines the winner into a final answer."""
    prompt, models, judge = config["prompt"], config.get("models", []), config.get("judge", "qwen2.5")
    yield sse({"type": "clear"})
    yield sse({"type": "status", "message": f"{len(models)} models competing..."})
    responses = parallel_query(models, prompt)
    for m, r in responses.items():
        yield sse({"type": "response", "model": m, "text": r, "role": "competitor"})
    if len(responses) < 2:
        # Nothing to rank with fewer than two entrants.
        return
    yield sse({"type": "status", "message": f"{judge} ranking..."})
    parts = [("Question:", prompt, 1), ("INSTRUCTION:", "Rank all from best to worst. Score 1-10 each. Then refine the winner into the ultimate answer.", 1)]
    for m, r in responses.items():
        parts.append((f"[{m}]:", cap_response(r), 3))
    jp = build_context(parts, judge)
    try:
        yield sse({"type": "response", "model": judge, "text": safe_query(judge, jp), "role": "verdict"})
    except Exception as e:
        yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"})


# ─── NEW 7 MODES ─────────────────────────────────────────────────

def run_evolution(config):
    """Genetic algorithm: generate, score, breed, mutate across generations."""
    prompt, models = config["prompt"], config.get("models", [])
    generations = config.get("generations", 3)
    judge = config.get("judge", models[0] if models else "qwen2.5")
    yield sse({"type": "clear"})
    if not models:
        return
    # Gen 0: each model generates an answer
    yield sse({"type": "status", "message": "Generation 0: spawning initial population..."})
    population = parallel_query(models, prompt)
    for m, r in population.items():
        yield sse({"type": "response", "model": m, "text": r, "role": "gen 0"})
    for gen in range(1, generations + 1):
        # Fitness scoring
        yield sse({"type": "status", "message": f"Generation {gen}: fitness evaluation..."})
        score_prompt = f"Question: {prompt}\n\nRate each answer 1-100. Return ONLY a JSON object like {{\"model_name\": score}}.\n\n"
        for m, r in population.items():
            score_prompt += f"[{m}]: {r.strip()}\n\n"
        try:
            scores_raw = query_model(judge, score_prompt)
            yield sse({"type": "response", "model": judge, "text": scores_raw, "role": f"fitness gen {gen}"})
        except Exception as e:
            yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"})
            continue
        # Breed: take top 2 answers, ask a model to combine them.
        pop_list = list(population.items())
        # FIX: parents were always the first two dict entries (insertion order),
        # so the judge's fitness scores were computed and then ignored.  Parse
        # the judge JSON and sort by score so the best two answers actually
        # breed; on unparseable output fall back to the old insertion order.
        try:
            import re
            score_match = re.search(r'\{[\s\S]*\}', scores_raw)
            if score_match:
                fitness = json.loads(score_match.group())

                def _fit(name):
                    # Tolerate missing or non-numeric scores from the judge.
                    try:
                        return float(fitness.get(name, 0))
                    except (TypeError, ValueError):
                        return 0.0
                pop_list.sort(key=lambda kv: _fit(kv[0]), reverse=True)
        except Exception:
            pass
        if len(pop_list) < 2:
            break
        parent1, parent2 = pop_list[0], pop_list[1]
        yield sse({"type": "status", "message": f"Generation {gen}: breeding + mutating..."})
        new_population = {}
        for m in models:
            breed_prompt = (
                f"Question: {prompt}\n\n"
                f"Parent A ({parent1[0]}):\n{parent1[1].strip()}\n\n"
                f"Parent B ({parent2[0]}):\n{parent2[1].strip()}\n\n"
                f"You are {m}. Breed these two answers: take the best parts of each parent, "
                f"combine them, then MUTATE by adding one novel insight or improvement. "
                f"Return your evolved answer."
            )
            try:
                offspring = query_model(m, breed_prompt)
                new_population[m] = offspring
                is_last = gen == generations
                yield sse({"type": "response", "model": m, "text": offspring, "role": "final" if is_last else f"gen {gen}"})
            except Exception as e:
                # Failed breeders carry their previous answer forward.
                new_population[m] = population.get(m, "")
                yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
        population = new_population
def run_blindassembly(config):
    """Decompose the question, farm sub-questions out blind, stitch the answers.

    Each worker model sees only its own sub-question; the assembler model both
    splits the original question and merges the fragments at the end.
    """
    prompt = config["prompt"]
    models = config.get("models", [])
    assembler = config.get("assembler", models[0] if models else "qwen2.5")
    yield sse({"type": "clear"})
    if not models:
        return
    n = len(models)
    # Step 1: the assembler splits the question into n independent sub-parts.
    yield sse({"type": "status", "message": "Decomposing question into sub-tasks..."})
    decompose_prompt = (
        f"Split this question into exactly {n} independent sub-parts that together fully answer it. "
        f"Return ONLY a numbered list, one sub-question per line. No other text.\n\n"
        f"Question: {prompt}"
    )
    try:
        parts_raw = query_model(assembler, decompose_prompt)
    except Exception as exc:
        yield sse({"type": "response", "model": assembler, "text": str(exc), "role": "error"})
        return
    yield sse({"type": "response", "model": assembler, "text": parts_raw, "role": "decomposer"})
    # Keep only lines containing letters, then pad/truncate to exactly n parts.
    parts = [ln.strip() for ln in parts_raw.strip().split("\n") if ln.strip() and any(ch.isalpha() for ch in ln)]
    while len(parts) < n:
        parts.append(f"Additional aspect of: {prompt}")
    del parts[n:]
    # Step 2: fan the sub-questions out in parallel; no worker sees another's task.
    yield sse({"type": "status", "message": f"Sending {n} sub-tasks to models (blind)..."})
    fragments = {}
    with ThreadPoolExecutor(max_workers=n) as pool:
        pending = {
            pool.submit(
                query_model, model_name,
                f"Answer ONLY this specific sub-question. Do not address anything else.\n\nSub-question: {sub_q}",
            ): (model_name, sub_q)
            for model_name, sub_q in zip(models, parts)
        }
        for fut in as_completed(pending):
            model_name, sub_q = pending[fut]
            try:
                fragments[model_name] = {"part": sub_q, "answer": fut.result()}
            except Exception as exc:
                yield sse({"type": "response", "model": model_name, "text": str(exc), "role": "error"})
                continue
            yield sse({"type": "response", "model": model_name,
                       "text": f"SUB-TASK: {sub_q}\n\nANSWER:\n{fragments[model_name]['answer']}",
                       "role": "blind worker"})
    # Step 3: the assembler merges the fragments (in completion order).
    yield sse({"type": "status", "message": f"{assembler} assembling blind fragments..."})
    assemble_prompt = f"Original question: {prompt}\n\nMultiple models each answered a sub-part WITHOUT seeing each other:\n\n"
    for model_name, data in fragments.items():
        assemble_prompt += f"[{model_name}] (sub-task: {data['part']}):\n{data['answer'].strip()}\n\n"
    assemble_prompt += "Stitch these fragments into ONE coherent, complete answer. Fill any gaps. Remove contradictions."
    try:
        yield sse({"type": "response", "model": assembler, "text": query_model(assembler, assemble_prompt), "role": "assembler"})
    except Exception as exc:
        yield sse({"type": "response", "model": assembler, "text": str(exc), "role": "error"})
def run_staircase(config):
    """Devil's Staircase: each round adds a new constraint.

    The challenger invents one new constraint per step; the answerer must
    rewrite its answer to satisfy ALL constraints accumulated so far.
    """
    prompt = config["prompt"]
    answerer = config["answerer"]
    challenger = config["challenger"]
    steps = config.get("steps", 4)
    yield sse({"type": "clear"})
    # Initial answer
    yield sse({"type": "status", "message": f"{answerer} answering..."})
    try:
        current = query_model(answerer, prompt)
        yield sse({"type": "response", "model": answerer, "text": current, "role": "initial answer"})
    except Exception as e:
        yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"})
        return
    constraints = []
    for s in range(steps):
        # Challenger adds a constraint
        yield sse({"type": "status", "message": f"Step {s+1}: {challenger} adding constraint..."})
        constraint_prompt = (
            f"Original question: {prompt}\n\n"
            f"Current answer:\n{current}\n\n"
            f"Existing constraints: {constraints if constraints else 'None yet'}\n\n"
            f"Add ONE new realistic constraint, complication, or edge case that the current answer doesn't handle. "
            f"Make it specific and challenging but plausible. State ONLY the new constraint, nothing else."
        )
        try:
            new_constraint = query_model(challenger, constraint_prompt)
            constraints.append(new_constraint.strip())
            yield sse({"type": "response", "model": challenger, "text": new_constraint, "role": f"constraint {s+1}"})
        except Exception as e:
            # No new constraint this step — skip the adaptation as well.
            yield sse({"type": "response", "model": challenger, "text": str(e), "role": "error"})
            continue
        # Answerer must adapt
        yield sse({"type": "status", "message": f"Step {s+1}: {answerer} adapting..."})
        adapt_prompt = (
            f"Original question: {prompt}\n\n"
            f"ALL constraints you must satisfy:\n" +
            "\n".join(f" {i+1}. {c}" for i, c in enumerate(constraints)) +
            f"\n\nYour previous answer:\n{current}\n\n"
            f"Rewrite your answer to handle ALL constraints. Return the complete updated answer."
        )
        try:
            current = query_model(answerer, adapt_prompt)
            is_last = s == steps - 1
            yield sse({"type": "response", "model": answerer, "text": current, "role": "final" if is_last else f"adapted {s+1}"})
        except Exception as e:
            yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"})


def run_drift(config):
    """Same prompt N times to same model, analyze variance.

    Claims consistent across samples are treated as high-confidence; claims
    that vary flag possible hallucination.  The analyzer model produces the
    final drift report.
    """
    prompt = config["prompt"]
    target = config["target"]
    samples = config.get("samples", 5)
    analyzer = config["analyzer"]
    yield sse({"type": "clear"})
    yield sse({"type": "status", "message": f"Sampling {target} {samples} times..."})
    results = []
    for i in range(samples):
        yield sse({"type": "status", "message": f"Sample {i+1}/{samples}..."})
        try:
            r = query_model(target, prompt)
            results.append(r)
            yield sse({"type": "response", "model": target, "text": r, "role": f"sample {i+1}"})
        except Exception as e:
            yield sse({"type": "response", "model": target, "text": str(e), "role": "error"})
    if len(results) < 2:
        # Variance analysis needs at least two successful samples.
        return
    # Analyze
    yield sse({"type": "status", "message": f"{analyzer} analyzing drift..."})
    analysis_prompt = (
        f"Question asked: {prompt}\n\n"
        f"The model '{target}' was asked this same question {len(results)} times. Here are all responses:\n\n"
    )
    for i, r in enumerate(results):
        analysis_prompt += f"--- Sample {i+1} ---\n{r.strip()}\n\n"
    analysis_prompt += (
        "DRIFT ANALYSIS:\n"
        "1. What claims/facts are CONSISTENT across all samples? (HIGH CONFIDENCE)\n"
        "2. What claims VARY between samples? (LOW CONFIDENCE - possible hallucination)\n"
        "3. What is completely CONTRADICTED between samples? (UNRELIABLE)\n"
        "4. Give an overall confidence score 1-10 for the model's answer to this question.\n"
        "5. Provide the 'true' answer using only high-confidence claims."
    )
    try:
        yield sse({"type": "response", "model": analyzer, "text": query_model(analyzer, analysis_prompt), "role": "analyzer"})
    except Exception as e:
        yield sse({"type": "response", "model": analyzer, "text": str(e), "role": "error"})


def run_mesh(config):
    """Each model answers as a different stakeholder."""
    prompt, models = config["prompt"], config.get("models", [])
    synthesizer = config.get("synthesizer", models[0] if models else "qwen2.5")
    yield sse({"type": "clear"})
    perspectives = [
        ("CEO / Business Leader", "You are a CEO. Answer from a business strategy perspective: ROI, market impact, competitive advantage, risk."),
        ("Software Engineer", "You are a senior engineer. Answer from a technical perspective: architecture, implementation, scalability, tech debt."),
        ("End User / Customer", "You are an end user/customer. Answer from a usability perspective: experience, pain points, what you actually need."),
        ("Regulator / Legal", "You are a regulator/legal advisor. Answer from a compliance perspective: laws, regulations, liability, ethics, privacy."),
        ("Competitor", "You are a competitor analyzing this. What threats/opportunities does this create? What would you do differently?"),
    ]
    if not models:
        return
    responses = {}
    for i, (role_name, instruction) in enumerate(perspectives):
        # Cycle the model pool across the five stakeholder roles.
        m = models[i % len(models)]
        yield sse({"type": "status", "message": f"{role_name}: {m}..."})
        try:
            r = query_model(m, f"{instruction}\n\nQuestion: {prompt}")
            responses[role_name] = (m, r)
            yield sse({"type": "response", "model": m, "text": r, "role": role_name})
        except Exception as e:
            yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
    # 360 synthesis
    yield sse({"type": "status", "message": f"{synthesizer} weaving 360-degree view..."})
    syn = f"Question: {prompt}\n\nMultiple stakeholders gave their perspective:\n\n"
    for role, (m, r) in responses.items():
        syn += f"[{role} ({m})]: {r.strip()}\n\n"
    syn += "Synthesize a 360-degree view that balances all stakeholder perspectives. 
Highlight tensions and trade-offs." try: yield sse({"type": "response", "model": synthesizer, "text": query_model(synthesizer, syn), "role": "mesh-360"}) except Exception as e: yield sse({"type": "response", "model": synthesizer, "text": str(e), "role": "error"}) def run_hallucination(config): """One answers, hunters verify each claim independently.""" prompt, answerer = config["prompt"], config["answerer"] hunters = config.get("hunters", []) yield sse({"type": "clear"}) # Get answer yield sse({"type": "status", "message": f"{answerer} answering..."}) try: answer = query_model(answerer, prompt) yield sse({"type": "response", "model": answerer, "text": answer, "role": "answer"}) except Exception as e: yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return # Extract claims yield sse({"type": "status", "message": "Extracting factual claims..."}) extract_prompt = ( f"Extract every factual claim from this answer as a numbered list. Include specific facts, numbers, dates, " f"names, and cause-effect relationships. One claim per line.\n\nAnswer:\n{answer}" ) try: claims = query_model(answerer, extract_prompt) yield sse({"type": "response", "model": answerer, "text": claims, "role": "claims extracted"}) except Exception as e: yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return # Each hunter verifies independently yield sse({"type": "status", "message": f"{len(hunters)} hunters verifying claims..."}) hunt_prompt = ( f"Original question: {prompt}\n\n" f"An AI generated this answer:\n{answer}\n\n" f"Here are the extracted claims:\n{claims}\n\n" f"For EACH claim, verdict:\n" f" VERIFIED - you are confident this is correct\n" f" SUSPICIOUS - might be wrong or misleading\n" f" HALLUCINATED - this is likely made up or incorrect\n" f" UNVERIFIABLE - cannot determine from your knowledge\n" f"Explain your reasoning for suspicious/hallucinated claims." 
) results = parallel_query(hunters, hunt_prompt) for m, r in results.items(): yield sse({"type": "response", "model": m, "text": r, "role": "hunter"}) def run_timeloop(config): """CHAOS MODE: answer -> catastrophe -> fix -> new catastrophe -> repeat.""" prompt = config["prompt"] answerer = config["answerer"] chaos = config["chaos"] loops = config.get("loops", 4) yield sse({"type": "clear"}) # Initial answer yield sse({"type": "status", "message": f"{answerer} answering (unaware of impending doom)..."}) try: current = query_model(answerer, prompt) yield sse({"type": "response", "model": answerer, "text": current, "role": "initial (doomed)"}) except Exception as e: yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return catastrophes = [] for i in range(loops): # Chaos agent creates a catastrophe yield sse({"type": "status", "message": f"Loop {i+1}: CHAOS AGENT unleashed..."}) chaos_prompt = ( f"Original question: {prompt}\n\n" f"Someone implemented this answer:\n{current}\n\n" f"Previous catastrophes that were already fixed: {catastrophes if catastrophes else 'None yet'}\n\n" f"You are a CHAOS AGENT. Describe a SPECIFIC, VIVID catastrophe that happened because of a flaw " f"in this answer. Be creative and dramatic but grounded in a real flaw. " f"Describe: 1) What went wrong 2) The cascading consequences 3) Who/what was affected. " f"Make it different from previous catastrophes. Be theatrical!" 
) try: catastrophe = query_model(chaos, chaos_prompt) catastrophes.append(catastrophe.strip()[:200]) yield sse({"type": "response", "model": chaos, "text": catastrophe, "role": f"catastrophe {i+1}"}) except Exception as e: yield sse({"type": "response", "model": chaos, "text": str(e), "role": "error"}); continue # Answerer must fix yield sse({"type": "status", "message": f"Loop {i+1}: {answerer} desperately fixing..."}) fix_prompt = ( f"Original question: {prompt}\n\n" f"Your previous answer:\n{current}\n\n" f"CATASTROPHE REPORT:\n{catastrophe}\n\n" f"ALL previous catastrophes you must also prevent:\n" + "\n".join(f" {j+1}. {c}" for j, c in enumerate(catastrophes)) + f"\n\nRewrite your answer to prevent THIS catastrophe and ALL previous ones. " f"Your answer must be BULLETPROOF. Return the complete fixed answer." ) try: current = query_model(answerer, fix_prompt) is_last = i == loops - 1 yield sse({"type": "response", "model": answerer, "text": current, "role": "survivor" if is_last else f"fix {i+1}"}) except Exception as e: yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}) # Final verdict from chaos agent yield sse({"type": "status", "message": f"{chaos} final inspection..."}) final_prompt = ( f"Original question: {prompt}\n\n" f"After {loops} catastrophes, the final answer is:\n{current}\n\n" f"All catastrophes it survived:\n" + "\n".join(f" {j+1}. {c}" for j, c in enumerate(catastrophes)) + f"\n\nAs the Chaos Agent, give your final verdict: Is this answer now truly bulletproof? " f"Rate its resilience 1-10. Can you find ONE MORE flaw? If not, admit defeat." 
) try: yield sse({"type": "response", "model": chaos, "text": query_model(chaos, final_prompt), "role": "final judgment"}) except Exception as e: yield sse({"type": "response", "model": chaos, "text": str(e), "role": "error"}) # ─── AUTONOMOUS PIPELINES ───────────────────────────────────── def _save_pipeline(pipeline, topic, steps, result, models, start_ms): import time duration = int((time.time() * 1000) - start_ms) try: with get_db() as conn: with conn.cursor() as cur: cur.execute( """INSERT INTO pipeline_runs (pipeline, topic, status, steps, result, models_used, duration_ms, completed_at) VALUES (%s, %s, 'completed', %s, %s, %s, %s, NOW())""", (pipeline, topic, json.dumps(steps), json.dumps(result), list(set(models)), duration) ) conn.commit() except Exception as e: print(f"[DB] pipeline save error: {e}") def run_research(config): """Autonomous research pipeline: scout → parallel research → fact-check → synthesize.""" import time start = time.time() * 1000 prompt = config["prompt"] scout = config.get("scout", "llama3.2:latest") models = config.get("models", []) checker = config.get("checker", models[0] if models else scout) synth = config.get("synthesizer", models[0] if models else scout) num_q = config.get("num_questions", 5) yield sse({"type": "clear"}) steps = [] all_models = [scout, checker, synth] + models # Step 1: Scout generates research questions yield sse({"type": "status", "message": f"Step 1/4: {scout} generating {num_q} research questions..."}) try: q_prompt = ( f"You are a research scout. Given the topic below, generate exactly {num_q} specific, " f"diverse research questions that would build a comprehensive understanding. 
" f"Return ONLY a numbered list.\n\nTopic: {prompt}" ) questions_raw = query_model(scout, q_prompt) yield sse({"type": "response", "model": scout, "text": questions_raw, "role": "scout"}) steps.append({"step": "scout", "model": scout, "output": questions_raw}) except Exception as e: yield sse({"type": "response", "model": scout, "text": str(e), "role": "error"}) return # Parse questions questions = [l.strip() for l in questions_raw.strip().split("\n") if l.strip() and any(c.isalpha() for c in l)] questions = questions[:num_q] if not questions: yield sse({"type": "response", "model": "system", "text": "Failed to parse research questions.", "role": "error"}) return # Step 2: Parallel research — distribute questions across models yield sse({"type": "status", "message": f"Step 2/4: {len(models)} models researching {len(questions)} questions..."}) research_results = {} with ThreadPoolExecutor(max_workers=max(len(models), 1)) as pool: futures = {} for i, q in enumerate(questions): m = models[i % len(models)] if models else scout rp = f"Research this question thoroughly. 
Provide specific facts, data, and examples.\n\nQuestion: {q}" futures[pool.submit(query_model, m, rp)] = (m, q) for future in as_completed(futures): m, q = futures[future] try: answer = future.result() research_results[q] = {"model": m, "answer": answer} yield sse({"type": "response", "model": m, "text": f"Q: {q}\n\n{answer}", "role": "researcher"}) except Exception as e: yield sse({"type": "response", "model": m, "text": f"Q: {q}\n\nError: {e}", "role": "error"}) research_results[q] = {"model": m, "answer": f"Error: {e}"} steps.append({"step": "research", "results": {q: r["answer"][:500] for q, r in research_results.items()}}) # Step 3: Fact-check yield sse({"type": "status", "message": f"Step 3/4: {checker} fact-checking all findings..."}) check_prompt = f"Topic: {prompt}\n\nResearch findings to fact-check:\n\n" for q, r in research_results.items(): check_prompt += f"Q: {q}\nA: {r['answer'][:300]}\n\n" check_prompt += ( "For each finding, mark as:\n" " VERIFIED — likely accurate\n" " UNCERTAIN — may be wrong or outdated\n" " FLAGGED — likely inaccurate\n" "Be specific about what's wrong with flagged items." ) try: check_result = query_model(checker, check_prompt) yield sse({"type": "response", "model": checker, "text": check_result, "role": "fact-checker"}) steps.append({"step": "fact-check", "model": checker, "output": check_result[:1000]}) except Exception as e: check_result = f"Error: {e}" yield sse({"type": "response", "model": checker, "text": str(e), "role": "error"}) # Step 4: Synthesize into brief yield sse({"type": "status", "message": f"Step 4/4: {synth} synthesizing research brief..."}) synth_prompt = f"Topic: {prompt}\n\nResearch findings:\n\n" for q, r in research_results.items(): synth_prompt += f"Q: {q}\nA: {r['answer'][:400]}\n\n" synth_prompt += f"\nFact-check notes:\n{check_result[:500]}\n\n" synth_prompt += ( "Synthesize ALL findings into a structured research brief with these sections:\n" "1. EXECUTIVE SUMMARY (2-3 sentences)\n" "2. 
KEY FINDINGS (bulleted list)\n" "3. DETAILED ANALYSIS (organized by theme)\n" "4. UNCERTAINTIES & GAPS (what needs more research)\n" "5. RECOMMENDATIONS (actionable next steps)\n" "Be comprehensive but concise." ) try: brief = query_model(synth, synth_prompt) yield sse({"type": "response", "model": synth, "text": brief, "role": "synthesis"}) steps.append({"step": "synthesis", "model": synth, "output": brief[:2000]}) except Exception as e: brief = f"Error: {e}" yield sse({"type": "response", "model": synth, "text": str(e), "role": "error"}) # Save pipeline run _save_pipeline("research", prompt, steps, {"brief": brief, "questions": questions, "fact_check": check_result[:1000]}, all_models, start) def run_eval(config): """Model evaluation pipeline: same prompts → all models → judge scores → leaderboard.""" import time start = time.time() * 1000 prompt = config["prompt"] models = config.get("models", []) judge = config.get("judge", models[0] if models else "qwen2.5:latest") eval_type = config.get("eval_type", "general") rounds = config.get("rounds", 3) yield sse({"type": "clear"}) steps = [] all_models = models + [judge] # Generate eval prompts based on type yield sse({"type": "status", "message": f"Generating {rounds} {eval_type} evaluation prompts..."}) gen_prompt = ( f"Generate exactly {rounds} evaluation prompts for testing LLM capability in: {eval_type}.\n" f"Context/focus area: {prompt}\n\n" f"Each prompt should test a different aspect. Return ONLY a numbered list of prompts, nothing else." 
) try: prompts_raw = query_model(judge, gen_prompt) yield sse({"type": "response", "model": judge, "text": prompts_raw, "role": "prompt generator"}) except Exception as e: yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"}) return eval_prompts = [l.strip() for l in prompts_raw.strip().split("\n") if l.strip() and any(c.isalpha() for c in l)] eval_prompts = eval_prompts[:rounds] if not eval_prompts: yield sse({"type": "response", "model": "system", "text": "Failed to generate eval prompts.", "role": "error"}) return # Run each prompt against all models scores = {m: [] for m in models} for ri, ep in enumerate(eval_prompts): yield sse({"type": "status", "message": f"Round {ri+1}/{len(eval_prompts)}: Testing {len(models)} models..."}) # All models answer in parallel responses = parallel_query(models, ep) for m, r in responses.items(): yield sse({"type": "response", "model": m, "text": f"[Round {ri+1}] {ep[:80]}...\n\n{r}", "role": f"round {ri+1}"}) # Judge scores all responses yield sse({"type": "status", "message": f"Round {ri+1}: Judging..."}) judge_prompt = ( f"Evaluation prompt: {ep}\n\n" f"Score each model's response 1-10 on: accuracy, completeness, clarity, reasoning.\n" f"Return a JSON object: {{\"model_name\": {{\"score\": N, \"notes\": \"brief note\"}}}}.\n\n" ) for m, r in responses.items(): judge_prompt += f"[{m}]:\n{r[:500]}\n\n" try: judgment = query_model(judge, judge_prompt) yield sse({"type": "response", "model": judge, "text": judgment, "role": f"judge round {ri+1}"}) # Try to parse scores try: import re # Find numbers after model names for m in models: # Look for score patterns near model name pattern = re.escape(m) + r'.*?["\s:]+(\d+)' match = re.search(pattern, judgment, re.IGNORECASE | re.DOTALL) if match: scores[m].append(int(match.group(1))) except Exception: pass except Exception as e: yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"}) steps.append({"round": ri+1, "prompt": ep, 
"responses": {m: r[:300] for m, r in responses.items()}}) # Final leaderboard yield sse({"type": "status", "message": "Generating leaderboard..."}) leaderboard = [] for m in models: avg = sum(scores[m]) / len(scores[m]) if scores[m] else 0 leaderboard.append({"model": m, "avg_score": round(avg, 1), "rounds": len(scores[m]), "scores": scores[m]}) leaderboard.sort(key=lambda x: x["avg_score"], reverse=True) board_text = f"LEADERBOARD — {eval_type.upper()} ({len(eval_prompts)} rounds)\n{'='*50}\n\n" for i, entry in enumerate(leaderboard): medal = ["1st", "2nd", "3rd"][i] if i < 3 else f"{i+1}th" bar = "#" * int(entry["avg_score"]) board_text += f" {medal} {entry['model']:<30} {entry['avg_score']:>4}/10 {bar}\n" if entry["scores"]: board_text += f" Round scores: {entry['scores']}\n\n" yield sse({"type": "response", "model": judge, "text": board_text, "role": "final"}) _save_pipeline("eval", prompt, steps, {"leaderboard": leaderboard, "eval_type": eval_type}, all_models, start) def run_extract(config): """Knowledge extraction pipeline: chunk text → extract facts → verify → structured output.""" import time start = time.time() * 1000 prompt = config["prompt"] extractor = config.get("extractor", "qwen2.5:latest") verifier = config.get("verifier", "gemma2:latest") source = config.get("source", "prompt") yield sse({"type": "clear"}) steps = [] all_models = [extractor, verifier] # Get source text source_text = prompt if source != "prompt": file_map = { "ontology": "/home/profit/ONTOLOGY.md", "index": "/home/profit/INDEX.md", "summaries": "/home/profit/SUMMARIES.md", "guides": "/home/profit/GUIDES.md", } fpath = file_map.get(source) if fpath and os.path.exists(fpath): yield sse({"type": "status", "message": f"Reading {source}..."}) with open(fpath) as f: source_text = f.read()[:15000] # limit to ~15K chars yield sse({"type": "response", "model": "system", "text": f"Loaded {source} ({len(source_text)} chars)", "role": "source"}) else: yield sse({"type": "response", "model": 
"system", "text": f"File not found: {source}", "role": "error"}) return # Chunk if too long chunks = [] chunk_size = 4000 for i in range(0, len(source_text), chunk_size): chunks.append(source_text[i:i+chunk_size]) yield sse({"type": "status", "message": f"Processing {len(chunks)} chunk(s) with {extractor}..."}) all_facts = [] all_entities = [] all_relations = [] for ci, chunk in enumerate(chunks): yield sse({"type": "status", "message": f"Extracting from chunk {ci+1}/{len(chunks)}..."}) extract_prompt = ( f"Extract structured knowledge from this text. Return a JSON object with:\n" f" \"facts\": [\"fact 1\", \"fact 2\", ...],\n" f" \"entities\": [{{\"name\": \"...\", \"type\": \"...\", \"description\": \"...\"}}, ...],\n" f" \"relationships\": [{{\"from\": \"...\", \"to\": \"...\", \"type\": \"...\"}}, ...]\n\n" f"Be thorough. Extract EVERY factual claim, named entity, and relationship.\n\n" f"Text:\n{chunk}" ) try: result = query_model(extractor, extract_prompt) yield sse({"type": "response", "model": extractor, "text": result, "role": f"extraction {ci+1}"}) # Try to parse JSON from response try: import re json_match = re.search(r'\{[\s\S]*\}', result) if json_match: parsed = json.loads(json_match.group()) all_facts.extend(parsed.get("facts", [])) all_entities.extend(parsed.get("entities", [])) all_relations.extend(parsed.get("relationships", [])) except Exception: all_facts.append(result[:500]) except Exception as e: yield sse({"type": "response", "model": extractor, "text": str(e), "role": "error"}) steps.append({"step": "extraction", "facts": len(all_facts), "entities": len(all_entities), "relations": len(all_relations)}) # Verify key facts yield sse({"type": "status", "message": f"{verifier} verifying {len(all_facts)} facts..."}) facts_sample = all_facts[:20] # verify up to 20 verify_prompt = ( f"Verify these extracted facts. 
For each, mark CORRECT, INCORRECT, or UNVERIFIABLE.\n" f"If incorrect, provide the correction.\n\n" ) for i, f in enumerate(facts_sample): fact_str = f if isinstance(f, str) else json.dumps(f) verify_prompt += f"{i+1}. {fact_str}\n" try: verification = query_model(verifier, verify_prompt) yield sse({"type": "response", "model": verifier, "text": verification, "role": "verifier"}) steps.append({"step": "verification", "model": verifier, "output": verification[:1000]}) except Exception as e: verification = str(e) yield sse({"type": "response", "model": verifier, "text": str(e), "role": "error"}) # Summary summary = ( f"KNOWLEDGE EXTRACTION SUMMARY\n{'='*40}\n\n" f"Source: {source}\n" f"Facts extracted: {len(all_facts)}\n" f"Entities found: {len(all_entities)}\n" f"Relationships mapped: {len(all_relations)}\n\n" f"TOP ENTITIES:\n" ) for e in all_entities[:15]: if isinstance(e, dict): summary += f" [{e.get('type','?')}] {e.get('name','?')} — {e.get('description','')[:60]}\n" summary += f"\nTOP RELATIONSHIPS:\n" for r in all_relations[:15]: if isinstance(r, dict): summary += f" {r.get('from','?')} --[{r.get('type','?')}]--> {r.get('to','?')}\n" yield sse({"type": "response", "model": "system", "text": summary, "role": "final"}) result_data = { "facts": all_facts[:100], "entities": all_entities[:50], "relationships": all_relations[:50], "verification": verification[:1000], "source": source, } _save_pipeline("extract", prompt or source, steps, result_data, all_models, start) if __name__ == "__main__": print("\n LLM Team UI running at http://localhost:5000\n") app.run(host="0.0.0.0", port=5000, debug=False)