#!/usr/bin/env python3
"""LLM Team UI - Web interface to configure and run multi-model teams."""
import json
import os
import time
import threading
import secrets
import hashlib
import requests
import random
import re
import psycopg2
import psycopg2.extras
import bcrypt
from concurrent.futures import ThreadPoolExecutor, as_completed
from flask import Flask, render_template_string, request, jsonify, Response, redirect, url_for, session
from functools import wraps
app = Flask(__name__)
# Falls back to a random per-process key, so sessions won't survive a restart unless FLASK_SECRET is set.
app.secret_key = os.environ.get("FLASK_SECRET", secrets.token_hex(32))
# ─── AUTH + DEMO MODE ─────────────────────────────────────────
_rate_limit = {} # ip -> (count, window_start)
RATE_LIMIT_WINDOW = 60
RATE_LIMIT_MAX = 60
LOGIN_RATE_MAX = 5
# IPs that never get rate-limited (your LAN, localhost)
ALLOWLIST_IPS = {"127.0.0.1", "::1", "192.168.1.1"}
# Demo mode state — toggled by admin at runtime
_demo_mode = {"active": False, "started_by": None}
# Admin-only write routes — blocked in demo for non-admin users
ADMIN_WRITE_ROUTES = {
"/api/admin/config": ["POST"],
"/api/admin/test-provider": ["POST"],
"/api/auth/login": ["POST"],
}
def is_allowlisted(ip):
return ip in ALLOWLIST_IPS or ip.startswith("192.168.1.")
def rate_limited(ip, max_req=RATE_LIMIT_MAX):
if is_allowlisted(ip):
return False
now = time.time()
if ip not in _rate_limit or now - _rate_limit[ip][1] > RATE_LIMIT_WINDOW:
_rate_limit[ip] = (1, now)
return False
count, start = _rate_limit[ip]
if count >= max_req:
return True
_rate_limit[ip] = (count + 1, start)
return False
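# Note: this is a fixed-window limiter kept in process memory. Illustratively, with
# RATE_LIMIT_WINDOW = 60 and RATE_LIMIT_MAX = 60, the 61st request inside one window
# is rejected; once 60 seconds pass since the window started, the counter resets.
# State is per-process, so it clears on restart and is not shared between workers.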
def is_admin():
return session.get("role") == "admin"
def is_demo():
return _demo_mode["active"]
def login_required(f):
@wraps(f)
def decorated(*args, **kwargs):
# Demo mode: everyone gets in
if is_demo() and not session.get("user_id"):
session["demo_user"] = True
if not session.get("user_id") and not is_demo():
if request.path.startswith("/api/"):
return jsonify({"error": "unauthorized"}), 401
return redirect("/login")
return f(*args, **kwargs)
return decorated
def admin_required(f):
@wraps(f)
def decorated(*args, **kwargs):
# Demo mode: allow read access (GET), block writes unless admin
if is_demo():
if request.method == "GET":
return f(*args, **kwargs)
if not is_admin():
return jsonify({"error": "demo mode: read-only", "demo": True}), 403
if not session.get("user_id"):
if request.path.startswith("/api/"):
return jsonify({"error": "unauthorized"}), 401
return redirect("/login")
if session.get("role") != "admin":
return "Forbidden", 403
return f(*args, **kwargs)
return decorated
@app.before_request
def security_checks():
ip = request.remote_addr
# Rate limit (allowlisted IPs skip)
if rate_limited(ip):
return jsonify({"error": "rate limited"}), 429
# Always allow these
if request.path in ("/login", "/api/auth/login", "/api/auth/setup", "/api/demo/status"):
return
if request.path.startswith("/static"):
return
# In demo mode, block admin write routes for non-admins
if is_demo() and not is_admin():
for route, methods in ADMIN_WRITE_ROUTES.items():
if request.path == route and request.method in methods:
return jsonify({"error": "demo mode: admin settings are read-only", "demo": True}), 403
@app.after_request
def security_headers(response):
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
if request.path.startswith("/api/"):
response.headers["Cache-Control"] = "no-store"
return response
LOGIN_HTML = """
LLM Team - Login
LLM Team
Sign in to continue
"""
@app.route("/login")
def login_page():
if session.get("user_id"):
return redirect("/")
return LOGIN_HTML
@app.route("/api/auth/setup")
def auth_setup():
try:
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM users")
count = cur.fetchone()[0]
return jsonify({"needs_setup": count == 0})
except Exception:
return jsonify({"needs_setup": True})
@app.route("/api/auth/login", methods=["POST"])
def auth_login():
ip = request.remote_addr
if rate_limited(ip, LOGIN_RATE_MAX):
return jsonify({"error": "Too many attempts. Wait a minute."}), 429
    data = request.get_json(silent=True) or {}
username = data.get("username", "").strip()
password = data.get("password", "")
is_setup = data.get("setup", False)
if not username or not password:
return jsonify({"error": "Username and password required"}), 400
if len(password) < 8:
return jsonify({"error": "Password must be at least 8 characters"}), 400
try:
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
if is_setup:
# First-time setup: create admin
cur.execute("SELECT COUNT(*) as c FROM users")
if cur.fetchone()["c"] > 0:
return jsonify({"error": "Setup already completed"}), 400
pw_hash = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
cur.execute("INSERT INTO users (username, password_hash, role) VALUES (%s, %s, 'admin') RETURNING id",
(username, pw_hash))
uid = cur.fetchone()["id"]
conn.commit()
session["user_id"] = uid
session["username"] = username
session["role"] = "admin"
session.permanent = True
return jsonify({"ok": True})
# Normal login
cur.execute("SELECT * FROM users WHERE username = %s", (username,))
user = cur.fetchone()
if not user or not bcrypt.checkpw(password.encode(), user["password_hash"].encode()):
return jsonify({"error": "Invalid credentials"}), 401
session["user_id"] = user["id"]
session["username"] = user["username"]
session["role"] = user["role"]
session.permanent = True
return jsonify({"ok": True})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/auth/logout", methods=["POST"])
def auth_logout():
session.clear()
return jsonify({"ok": True})
@app.route("/logout")
def logout_page():
session.clear()
return redirect("/login")
@app.route("/api/demo/status")
def demo_status():
return jsonify({"active": is_demo(), "started_by": _demo_mode.get("started_by")})
@app.route("/api/demo/toggle", methods=["POST"])
def demo_toggle():
if not is_admin():
return jsonify({"error": "admin only"}), 403
_demo_mode["active"] = not _demo_mode["active"]
_demo_mode["started_by"] = session.get("username") if _demo_mode["active"] else None
return jsonify({"active": _demo_mode["active"]})
@app.route("/api/demo/allowlist", methods=["GET"])
def demo_get_allowlist():
if not is_admin():
return jsonify({"error": "admin only"}), 403
return jsonify({"ips": sorted(ALLOWLIST_IPS)})
@app.route("/api/demo/allowlist", methods=["POST"])
def demo_set_allowlist():
if not is_admin():
return jsonify({"error": "admin only"}), 403
    data = request.get_json(silent=True) or {}
ip = data.get("ip", "").strip()
action = data.get("action", "add")
if not ip:
return jsonify({"error": "ip required"}), 400
if action == "add":
ALLOWLIST_IPS.add(ip)
elif action == "remove" and ip in ALLOWLIST_IPS:
ALLOWLIST_IPS.discard(ip)
return jsonify({"ips": sorted(ALLOWLIST_IPS)})
@app.route("/logs")
@admin_required
def logs_page():
try:
with open("/var/www/html/report.html") as f:
return f.read()
except Exception:
return "GoAccess report not found. Run: goaccess /var/log/nginx/access.log -o /var/www/html/report.html --log-format=COMBINED", 404
CONFIG_PATH = "/root/llm_team_config.json"
DEFAULT_CONFIG = {
"providers": {
"ollama": {"enabled": True, "base_url": "http://localhost:11434", "timeout": 300},
"openrouter": {"enabled": False, "base_url": "https://openrouter.ai/api/v1", "api_key": "", "timeout": 120},
"openai": {"enabled": False, "base_url": "https://api.openai.com/v1", "api_key": "", "timeout": 120},
"anthropic": {"enabled": False, "base_url": "https://api.anthropic.com/v1", "api_key": "", "timeout": 120},
},
"disabled_models": [],
"cloud_models": [],
"timeouts": {"global": 300, "per_model": {}},
}
def load_dotenv():
for p in ["/root/.env", "/home/profit/.env"]:
if os.path.exists(p):
with open(p) as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
os.environ.setdefault(k.strip(), v.strip())
load_dotenv()
def load_config():
if os.path.exists(CONFIG_PATH):
with open(CONFIG_PATH) as f:
cfg = json.load(f)
# merge any missing defaults
for k, v in DEFAULT_CONFIG.items():
cfg.setdefault(k, v)
for k, v in DEFAULT_CONFIG["providers"].items():
cfg["providers"].setdefault(k, v)
return cfg
return json.loads(json.dumps(DEFAULT_CONFIG))
def save_config(cfg):
with open(CONFIG_PATH, "w") as f:
json.dump(cfg, f, indent=2)
def get_api_key(provider_name):
cfg = load_config()
prov = cfg["providers"].get(provider_name, {})
key = prov.get("api_key", "")
if key:
return key
env_map = {"openrouter": "OPENROUTER_API_KEY", "openai": "OPENAI_API_KEY", "anthropic": "ANTHROPIC_API_KEY"}
return os.environ.get(env_map.get(provider_name, ""), "")
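# Key resolution order (as implemented above): an explicit api_key saved in
# llm_team_config.json wins; otherwise the provider's environment variable
# (OPENROUTER_API_KEY / OPENAI_API_KEY / ANTHROPIC_API_KEY), which load_dotenv()
# may have populated from a .env file.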
DB_DSN = "dbname=knowledge_base user=kbuser password=IPbLBA0EQI8u4TeM2YZrbm1OAy5nSwqC host=localhost"
def get_db():
return psycopg2.connect(DB_DSN)
def save_run(mode, prompt, config_data, responses):
models = list({r.get("model", "") for r in responses if r.get("model")})
try:
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO team_runs (mode, prompt, config, responses, models_used) VALUES (%s, %s, %s, %s, %s)",
(mode, prompt, json.dumps(config_data), json.dumps(responses), models)
)
conn.commit()
except Exception as e:
print(f"[DB] save_run error: {e}")
HTML = r"""
LLM Team
LLM Team
0 models
Mode
BrainstormAll + synthesize
PipelineChain sequence
DebateArgue + judge
ValidatorFact-check
Round RobinIterate improve
Red TeamAttack + defend
ConsensusConverge
Code ReviewWrite+review+test
ELI Ladder5 levels
TournamentCompete + vote
EvolutionGenetic algo
Blind AssemblySplit + merge
StaircaseAdd constraints
Drift DetectConfidence map
PerspectiveStakeholder 360
Hallucinate?Claim verify
Time LoopCatastrophe fix!
Autonomous Pipelines
ResearchAuto brief
Model EvalBenchmark
KnowledgeExtract facts
All models answer in parallel, then one synthesizes the best parts into a final answer.
Models
Pipeline Steps
Setup
Setup
Validators
Models
Setup
Models
Setup
Models (rotated across 5 levels)
Competitors
Gene Pool (models)
Workers (each gets a sub-task)
Setup
Setup
Models (rotated across perspectives)
Setup
Hunters
Setup
Research Pipeline
Model Evaluation
Knowledge Extraction
Prompt
Output
◆ ◆ ◆
Select a mode, pick your models, and enter a prompt to run the team.
Response
Re-pipe into mode
History
"""
ADMIN_HTML = r"""
LLM Team - Admin
LLM Team Admin
Providers
Models
OpenRouter
Timeouts
Security
Ollama (Local)
OpenRouter
OpenAI
Anthropic
Local Models (Ollama)
Loading...
Cloud Models
No cloud models configured.
Add Cloud Model
Free Models on OpenRouter
Click "Fetch Models" to load the list.
Global Default
Per-Model Overrides
Loading models...
Demo Mode
When active, the public can view and use the Team UI, Lab, and all modes without logging in. Admin settings (API keys, config saves) are read-only for non-admins.
Status: Off
IP Allowlist
These IPs are never rate-limited. Your local network (192.168.1.*) is always allowed.
"""
LAB_HTML = r"""
LLM Team - Lab
Lab AutoResearch
Experiments
Mutable Config
Live Monitor
Results
Create Experiment
Loading...
New Experiment
Model Pool
Eval Cases
Mutable Config
Select an experiment from the Experiments tab first.
0.7
No Experiment Selected
Status: idle
Trials: 0
Best: 0.0/10
Improvements: 0
Score Progression
Trial Log
Start an experiment to see trials here.
Best Config
No best config yet.
All Experiments
Loading...
Experiment
"""
# ─── HELPERS ───────────────────────────────────────────────────
def _get_timeout(model_id):
cfg = load_config()
t = cfg["timeouts"]["per_model"].get(model_id)
if t:
return t
if "::" in model_id:
prov = model_id.split("::")[0]
return cfg["providers"].get(prov, {}).get("timeout", cfg["timeouts"]["global"])
return cfg["providers"].get("ollama", {}).get("timeout", cfg["timeouts"]["global"])
def query_ollama(model, prompt, timeout):
cfg = load_config()
base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434")
resp = requests.post(f"{base}/api/generate", json={
"model": model, "prompt": prompt, "stream": False,
}, timeout=timeout)
resp.raise_for_status()
return resp.json()["response"]
def query_openai_compatible(model, prompt, provider_name, timeout):
cfg = load_config()
prov = cfg["providers"].get(provider_name, {})
base = prov.get("base_url", "https://openrouter.ai/api/v1")
api_key = get_api_key(provider_name)
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
if provider_name == "openrouter":
headers["HTTP-Referer"] = "http://localhost:5000"
headers["X-Title"] = "LLM Team UI"
resp = requests.post(f"{base}/chat/completions", headers=headers, json={
"model": model, "messages": [{"role": "user", "content": prompt}], "stream": False,
}, timeout=timeout)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"]
def query_anthropic(model, prompt, timeout):
cfg = load_config()
prov = cfg["providers"].get("anthropic", {})
base = prov.get("base_url", "https://api.anthropic.com/v1")
api_key = get_api_key("anthropic")
resp = requests.post(f"{base}/messages", headers={
"x-api-key": api_key, "anthropic-version": "2023-06-01", "Content-Type": "application/json",
}, json={
"model": model, "max_tokens": 4096,
"messages": [{"role": "user", "content": prompt}],
}, timeout=timeout)
resp.raise_for_status()
return resp.json()["content"][0]["text"]
def query_model(model_id, prompt):
timeout = _get_timeout(model_id)
if "::" in model_id:
provider_name, model_name = model_id.split("::", 1)
if provider_name == "anthropic":
return query_anthropic(model_name, prompt, timeout)
return query_openai_compatible(model_name, prompt, provider_name, timeout)
return query_ollama(model_id, prompt, timeout)
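# Routing is keyed on the model_id format (examples are illustrative, not a list
# of installed models):
#   "llama3.2:latest"                      -> local Ollama /api/generate
#   "openrouter::meta-llama/llama-3.1-8b"  -> OpenAI-compatible /chat/completions
#   "anthropic::claude-haiku-4-5"          -> Anthropic /messages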
# ─── CONTEXT MANAGEMENT ───────────────────────────────────────
# Context window sizes (tokens) — conservative estimates for safe prompting
MODEL_CONTEXT = {
"llama3.2": 4096, "mistral": 8192, "gemma2": 8192, "qwen2.5": 8192,
"gpt-oss": 4096, "gpt-4o": 128000, "gpt-4o-mini": 128000,
"claude-3": 200000, "claude-sonnet": 200000, "claude-haiku": 200000,
}
DEFAULT_CONTEXT = 4096 # safe fallback for unknown models
MAX_RESPONSE_CHARS = 12000 # cap individual responses (~3K tokens)
def estimate_tokens(text):
"""Rough token estimate: ~4 chars per token for English."""
return len(text) // 4 + 1
def get_context_limit(model_id):
"""Get context window size for a model."""
name = model_id.split("::")[-1].split(":")[0].lower()
for key, limit in MODEL_CONTEXT.items():
if key in name:
return limit
# OpenRouter models generally have larger contexts
if "::" in model_id:
return 16000
return DEFAULT_CONTEXT
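# Illustrative lookups: "gemma2:9b" matches the "gemma2" key (8192 tokens); an
# unrecognised provider-prefixed ID such as "openrouter::vendor/model" falls back
# to 16000; an unknown local model gets DEFAULT_CONTEXT (4096).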
def smart_truncate(text, max_tokens, preserve_end=200):
"""Truncate text preserving start and end, with a clear marker."""
if estimate_tokens(text) <= max_tokens:
return text
max_chars = max_tokens * 4
end_chars = preserve_end * 4
if max_chars <= end_chars * 2:
return text[:max_chars]
start = text[:max_chars - end_chars - 60]
end = text[-end_chars:]
return f"{start}\n\n[... truncated {estimate_tokens(text) - max_tokens} tokens ...]\n\n{end}"
def cap_response(text):
"""Cap a single model response to prevent runaway output."""
if len(text) <= MAX_RESPONSE_CHARS:
return text
return smart_truncate(text, MAX_RESPONSE_CHARS // 4)
def build_context(parts, model_id, reserve_for_response=1024):
"""Build a prompt from parts, fitting within model's context window.
parts: list of (label, text, priority) tuples
priority: 1=must keep, 2=important, 3=can truncate heavily
Returns: assembled prompt string that fits in context.
"""
limit = get_context_limit(model_id)
budget = limit - reserve_for_response
if budget <= 0:
budget = limit // 2
# First pass: measure everything
total = sum(estimate_tokens(t) for _, t, _ in parts)
if total <= budget:
return "\n\n".join(f"{label}\n{text}" if label else text for label, text, _ in parts)
# Need to truncate — allocate budget by priority
p1 = [(l, t, p) for l, t, p in parts if p == 1]
p2 = [(l, t, p) for l, t, p in parts if p == 2]
p3 = [(l, t, p) for l, t, p in parts if p == 3]
p1_tokens = sum(estimate_tokens(t) for _, t, _ in p1)
remaining = budget - p1_tokens
if remaining <= 0:
# Even priority 1 doesn't fit — truncate p1
per_part = budget // max(len(p1), 1)
result = []
for label, text, _ in p1:
result.append(f"{label}\n{smart_truncate(text, per_part)}" if label else smart_truncate(text, per_part))
return "\n\n".join(result)
# Allocate remaining to p2, then p3
result = [f"{l}\n{t}" if l else t for l, t, _ in p1]
for group in [p2, p3]:
if not group or remaining <= 0:
continue
per_part = remaining // max(len(group), 1)
for label, text, _ in group:
truncated = smart_truncate(text, max(per_part, 100))
result.append(f"{label}\n{truncated}" if label else truncated)
remaining -= estimate_tokens(truncated)
return "\n\n".join(result)
def safe_query(model_id, prompt, fallback_summarize=True):
"""Query with context safety — auto-truncates prompt if too large, retries on overflow errors."""
limit = get_context_limit(model_id)
prompt_tokens = estimate_tokens(prompt)
# Pre-flight check: truncate if obviously too large
if prompt_tokens > limit - 500:
prompt = smart_truncate(prompt, limit - 1000)
try:
response = query_model(model_id, prompt)
return cap_response(response)
except Exception as e:
err = str(e).lower()
# Detect context overflow errors from various providers
if any(k in err for k in ["context length", "too many tokens", "maximum context", "token limit",
"content_too_large", "request too large", "413", "400"]):
if fallback_summarize:
# Aggressive truncation and retry
truncated = smart_truncate(prompt, limit // 2)
try:
response = query_model(model_id, truncated)
return cap_response(response)
except Exception:
pass
return f"[Context overflow: prompt was ~{prompt_tokens} tokens, model limit ~{limit}. Response truncated to fit.]"
raise
def parallel_safe_query(models, prompt):
    """Like parallel_query but with context safety on each model."""
    results = {}
    max_timeout = max((_get_timeout(m) for m in models), default=300) + 30
    with ThreadPoolExecutor(max_workers=max(len(models), 1)) as pool:
        futures = {pool.submit(safe_query, m, prompt): m for m in models}
        try:
            for future in as_completed(futures, timeout=max_timeout):
                model = futures[future]
                try:
                    results[model] = future.result(timeout=10)
                except Exception as e:
                    results[model] = f"Error: {e}"
        except Exception:
            # as_completed raises once max_timeout expires; keep what finished
            # and mark the stragglers instead of losing every collected result.
            for name in futures.values():
                if name not in results:
                    results[name] = "Error: timed out waiting for model"
    return results
def sse(data):
return f"data: {json.dumps(data)}\n\n"
def parallel_query(models, prompt):
"""Query multiple models in parallel with context safety."""
return parallel_safe_query(models, prompt)
# ─── ROUTES ────────────────────────────────────────────────────
@app.route("/")
@login_required
def index():
return render_template_string(HTML)
@app.route("/api/models")
@login_required
def get_models():
SKIP = {"nomic-embed-text", "mxbai-embed-large", "all-minilm", "snowflake-arctic-embed"}
cfg = load_config()
models = []
# Local Ollama models
if cfg["providers"]["ollama"].get("enabled", True):
try:
base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434")
resp = requests.get(f"{base}/api/tags", timeout=10)
seen = set()
for m in resp.json().get("models", []):
full = m["name"]
short = full.split(":")[0]
size = m.get("size", 0)
if short in SKIP or size < 1_000_000 or short in seen:
continue
if full in cfg.get("disabled_models", []):
continue
seen.add(short)
models.append({"name": full, "size": f"{size/(1024**3):.1f} GB",
"provider": "ollama", "provider_label": "Local",
"display_name": short})
except Exception:
pass
# Cloud models
for cm in cfg.get("cloud_models", []):
if not cm.get("enabled", True):
continue
prov = cm["id"].split("::")[0] if "::" in cm["id"] else "cloud"
if not cfg["providers"].get(prov, {}).get("enabled", False):
continue
models.append({"name": cm["id"], "size": cm.get("context", "cloud"),
"provider": prov, "provider_label": prov.title(),
"display_name": cm.get("display_name", cm["id"].split("::")[-1])})
return jsonify({"models": models})
# ─── ADMIN ROUTES ─────────────────────────────────────────────
@app.route("/admin")
@admin_required
def admin_page():
return render_template_string(ADMIN_HTML)
@app.route("/api/admin/config", methods=["GET"])
@admin_required
def admin_get_config():
cfg = load_config()
safe = json.loads(json.dumps(cfg))
for name, p in safe["providers"].items():
if p.get("api_key"):
p["api_key_set"] = True
p["api_key"] = ""
else:
p["api_key_set"] = bool(get_api_key(name))
return jsonify(safe)
@app.route("/api/admin/config", methods=["POST"])
@admin_required
def admin_save_config():
data = request.json
cfg = load_config()
# update providers (preserve existing keys if not sent)
for name, prov in data.get("providers", {}).items():
if name in cfg["providers"]:
new_key = prov.get("api_key", "")
if not new_key:
prov["api_key"] = cfg["providers"][name].get("api_key", "")
cfg["providers"][name].update(prov)
if "disabled_models" in data:
cfg["disabled_models"] = data["disabled_models"]
if "cloud_models" in data:
cfg["cloud_models"] = data["cloud_models"]
if "timeouts" in data:
cfg["timeouts"] = data["timeouts"]
save_config(cfg)
return jsonify({"ok": True})
@app.route("/api/admin/test-provider", methods=["POST"])
@admin_required
def admin_test_provider():
data = request.json
name = data.get("provider", "")
cfg = load_config()
prov = cfg["providers"].get(name, {})
try:
if name == "ollama":
r = requests.get(f"{prov.get('base_url', 'http://localhost:11434')}/api/tags", timeout=5)
count = len(r.json().get("models", []))
return jsonify({"ok": True, "message": f"Connected. {count} models available."})
elif name == "openrouter":
key = data.get("api_key") or get_api_key("openrouter")
r = requests.get(f"{prov.get('base_url', 'https://openrouter.ai/api/v1')}/models",
headers={"Authorization": f"Bearer {key}"}, timeout=10)
count = len(r.json().get("data", []))
return jsonify({"ok": True, "message": f"Connected. {count} models available."})
elif name == "openai":
key = data.get("api_key") or get_api_key("openai")
r = requests.get(f"{prov.get('base_url', 'https://api.openai.com/v1')}/models",
headers={"Authorization": f"Bearer {key}"}, timeout=10)
return jsonify({"ok": True, "message": f"Connected. {len(r.json().get('data', []))} models."})
elif name == "anthropic":
key = data.get("api_key") or get_api_key("anthropic")
r = requests.post(f"{prov.get('base_url', 'https://api.anthropic.com/v1')}/messages",
headers={"x-api-key": key, "anthropic-version": "2023-06-01", "Content-Type": "application/json"},
json={"model": "claude-haiku-4-5-20251001", "max_tokens": 1, "messages": [{"role": "user", "content": "hi"}]},
timeout=10)
return jsonify({"ok": True, "message": "Connected to Anthropic."})
return jsonify({"ok": False, "message": "Unknown provider"})
except Exception as e:
return jsonify({"ok": False, "message": str(e)})
_or_models_cache = {"data": None, "ts": 0}
@app.route("/api/admin/openrouter/models")
@admin_required
def admin_openrouter_models():
    now = time.time()
if _or_models_cache["data"] and now - _or_models_cache["ts"] < 300:
return jsonify({"models": _or_models_cache["data"]})
key = get_api_key("openrouter")
headers = {"Authorization": f"Bearer {key}"} if key else {}
try:
r = requests.get("https://openrouter.ai/api/v1/models", headers=headers, timeout=15)
r.raise_for_status()
free = []
for m in r.json().get("data", []):
pricing = m.get("pricing", {})
if pricing.get("prompt") == "0" and pricing.get("completion") == "0":
free.append({"id": m["id"], "name": m.get("name", m["id"]),
"context_length": m.get("context_length", 0)})
_or_models_cache["data"] = free
_or_models_cache["ts"] = now
return jsonify({"models": free})
except Exception as e:
return jsonify({"models": [], "error": str(e)})
@app.route("/api/admin/ollama-models")
@admin_required
def admin_ollama_models():
cfg = load_config()
base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434")
SKIP = {"nomic-embed-text", "mxbai-embed-large", "all-minilm", "snowflake-arctic-embed"}
try:
resp = requests.get(f"{base}/api/tags", timeout=10)
models = []
seen = set()
for m in resp.json().get("models", []):
full = m["name"]
short = full.split(":")[0]
size = m.get("size", 0)
if short in SKIP or size < 1_000_000 or short in seen:
continue
seen.add(short)
models.append({"name": full, "size": f"{size/(1024**3):.1f} GB",
"disabled": full in cfg.get("disabled_models", [])})
return jsonify({"models": models})
except Exception as e:
return jsonify({"models": [], "error": str(e)})
# ─── HISTORY ROUTES ────────────────────────────────────────────
@app.route("/api/runs")
@login_required
def get_runs():
try:
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT id, mode, prompt, models_used, created_at FROM team_runs ORDER BY created_at DESC LIMIT 50")
runs = cur.fetchall()
for r in runs:
r["created_at"] = r["created_at"].isoformat()
return jsonify({"runs": runs})
except Exception as e:
return jsonify({"runs": [], "error": str(e)})
@app.route("/api/runs/")
@login_required
def get_run(run_id):
try:
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT * FROM team_runs WHERE id = %s", (run_id,))
run = cur.fetchone()
if not run:
return jsonify({"error": "not found"}), 404
run["created_at"] = run["created_at"].isoformat()
return jsonify(run)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/runs/", methods=["DELETE"])
@login_required
def delete_run(run_id):
try:
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM team_runs WHERE id = %s", (run_id,))
conn.commit()
return jsonify({"ok": True})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/pipelines")
@login_required
def get_pipelines():
try:
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT id, pipeline, topic, status, models_used, duration_ms, created_at FROM pipeline_runs ORDER BY created_at DESC LIMIT 50")
runs = cur.fetchall()
for r in runs:
r["created_at"] = r["created_at"].isoformat() if r["created_at"] else None
return jsonify({"pipelines": runs})
except Exception as e:
return jsonify({"pipelines": [], "error": str(e)})
@app.route("/api/pipelines/")
@login_required
def get_pipeline(pid):
try:
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT * FROM pipeline_runs WHERE id = %s", (pid,))
run = cur.fetchone()
if not run:
return jsonify({"error": "not found"}), 404
run["created_at"] = run["created_at"].isoformat() if run["created_at"] else None
run["completed_at"] = run["completed_at"].isoformat() if run["completed_at"] else None
return jsonify(run)
except Exception as e:
return jsonify({"error": str(e)}), 500
# ─── LAB: RATCHET ENGINE ──────────────────────────────────────
_lab_threads = {} # experiment_id -> thread
_lab_streams = {} # experiment_id -> [queue, ...]
def _lab_emit(exp_id, data):
for q in _lab_streams.get(exp_id, []):
q.append(data)
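# Fan-out model: every open SSE connection registers a plain list in
# _lab_streams[exp_id]; _lab_emit appends each event to every registered list and
# lab_stream() drains its own list in a polling loop. This works in a single
# process only; there is no cross-worker broadcast.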
def _score_response(response, expected, metric, judge_model=None):
if metric == "accuracy":
if not expected:
return 5.0
resp_lower = response.lower().strip()
exp_lower = expected.lower().strip()
if exp_lower in resp_lower:
return 10.0
if any(w in resp_lower for w in exp_lower.split()):
return 5.0
return 1.0
elif metric == "speed":
return 10.0 # speed scored externally by duration
elif metric == "quality" and judge_model:
try:
judgment = query_model(judge_model,
f"Rate this response 1-10 for quality, relevance, and completeness.\n\n"
f"EXPECTED: {expected or 'No expected output specified'}\n\n"
f"RESPONSE: {response[:1500]}\n\n"
f"Return ONLY a number 1-10, nothing else.")
import re
m = re.search(r'\b(\d+)\b', judgment)
return min(float(m.group(1)), 10.0) if m else 5.0
except Exception:
return 5.0
return 5.0
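# Scoring behaviour, as implemented above: "accuracy" does a case-insensitive
# substring check against the expected answer (10 on a full match, 5 on word
# overlap, 1 otherwise); "speed" always returns 10 and is meant to be weighted by
# trial duration elsewhere; "quality" asks the judge model for a 1-10 rating and
# falls back to 5.0 on any parsing or query error.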
def _ratchet_loop(exp_id):
try:
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT * FROM lab_experiments WHERE id = %s", (exp_id,))
exp = cur.fetchone()
if not exp:
return
eval_cases = exp["eval_cases"] or []
models_pool = exp["models_pool"] or []
metric = exp["metric"] or "quality"
objective = exp["objective"] or "Improve response quality"
mutable = exp["mutable_config"] or {
"system_prompt": "You are a helpful assistant.",
"temperature": 0.7,
"model": models_pool[0] if models_pool else "llama3.2:latest",
}
best_config = exp["best_config"] or json.loads(json.dumps(mutable))
best_score = exp["best_score"] or 0
trial_num = exp["total_trials"] or 0
# Pick meta-model (largest in pool)
meta_model = models_pool[-1] if models_pool else "qwen2.5:latest"
judge_model = models_pool[0] if models_pool else "llama3.2:latest"
while True:
# Check if still running
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("SELECT status FROM lab_experiments WHERE id = %s", (exp_id,))
row = cur.fetchone()
if not row or row[0] != "running":
break
trial_num += 1
trial_start = time.time()
_lab_emit(exp_id, {"type": "status", "trial": trial_num, "message": "Proposing change..."})
# Step 1: Meta-model proposes a change
history_hint = ""
if trial_num > 1:
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT config_diff, avg_score, improved FROM lab_trials WHERE experiment_id = %s ORDER BY id DESC LIMIT 5", (exp_id,))
recent = cur.fetchall()
if recent:
history_hint = "\n\nRecent trials:\n" + "\n".join(
f" {'KEPT' if r['improved'] else 'DISCARDED'} (score {r['avg_score']:.1f}): {r['config_diff']}" for r in recent)
propose_prompt = (
f"You are optimizing an LLM pipeline. Objective: {objective}\n"
f"Metric: {metric} (higher is better, max 10)\n"
f"Current best score: {best_score:.1f}/10\n\n"
f"Current config:\n{json.dumps(mutable, indent=2)}\n\n"
f"Available models: {models_pool}\n"
f"Eval cases: {len(eval_cases)}\n"
f"{history_hint}\n\n"
f"Suggest exactly ONE change to improve the score. Return ONLY valid JSON with the FULL updated config. "
f"Keys: system_prompt (string), temperature (0.0-1.5), model (string from available models). "
f"Be creative but focused. Change only one thing at a time."
)
try:
proposal_raw = query_model(meta_model, propose_prompt)
import re
json_match = re.search(r'\{[\s\S]*\}', proposal_raw)
if json_match:
proposed = json.loads(json_match.group())
# Validate keys
if "system_prompt" not in proposed:
proposed["system_prompt"] = mutable.get("system_prompt", "")
if "temperature" not in proposed:
proposed["temperature"] = mutable.get("temperature", 0.7)
if "model" not in proposed:
proposed["model"] = mutable.get("model", models_pool[0])
else:
proposed = mutable
except Exception:
proposed = mutable
# Describe the diff
diffs = []
for k in set(list(mutable.keys()) + list(proposed.keys())):
old_v = mutable.get(k)
new_v = proposed.get(k)
if old_v != new_v:
if k == "system_prompt":
diffs.append(f"system_prompt changed ({len(str(old_v))} → {len(str(new_v))} chars)")
else:
diffs.append(f"{k}: {old_v} → {new_v}")
config_diff = "; ".join(diffs) if diffs else "no change"
_lab_emit(exp_id, {"type": "status", "trial": trial_num, "message": f"Testing: {config_diff[:80]}"})
# Step 2: Run eval cases with proposed config
trial_scores = []
model_to_use = proposed.get("model", models_pool[0] if models_pool else "llama3.2:latest")
sys_prompt = proposed.get("system_prompt", "")
for ci, case in enumerate(eval_cases):
inp = case.get("input", "")
expected = case.get("expected", "")
full_prompt = f"{sys_prompt}\n\n{inp}" if sys_prompt else inp
try:
resp = query_model(model_to_use, full_prompt)
score = _score_response(resp, expected, metric, judge_model if metric == "quality" else None)
trial_scores.append({"input": inp[:100], "score": score, "response": resp[:300]})
except Exception as e:
trial_scores.append({"input": inp[:100], "score": 0, "error": str(e)})
avg_score = sum(s["score"] for s in trial_scores) / max(len(trial_scores), 1)
duration_ms = int((time.time() - trial_start) * 1000)
improved = avg_score > best_score
# Step 3: Ratchet
if improved:
best_score = avg_score
best_config = json.loads(json.dumps(proposed))
mutable = json.loads(json.dumps(proposed))
else:
mutable = json.loads(json.dumps(best_config))
# Save trial
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"""INSERT INTO lab_trials (experiment_id, trial_num, config_diff, config_snapshot, scores, avg_score, improved, duration_ms)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
(exp_id, trial_num, config_diff, json.dumps(proposed), json.dumps(trial_scores), avg_score, improved, duration_ms)
)
cur.execute(
"""UPDATE lab_experiments SET total_trials = %s, best_score = %s, best_config = %s, mutable_config = %s,
improvements = improvements + %s WHERE id = %s""",
(trial_num, best_score, json.dumps(best_config), json.dumps(mutable), 1 if improved else 0, exp_id)
)
conn.commit()
_lab_emit(exp_id, {
"type": "trial", "trial": trial_num, "score": round(avg_score, 2),
"best": round(best_score, 2), "improved": improved, "diff": config_diff[:100],
"duration_ms": duration_ms
})
# Done
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE lab_experiments SET status = 'paused' WHERE id = %s AND status = 'running'", (exp_id,))
conn.commit()
_lab_emit(exp_id, {"type": "done"})
except Exception as e:
_lab_emit(exp_id, {"type": "error", "message": str(e)})
try:
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE lab_experiments SET status = 'error' WHERE id = %s", (exp_id,))
conn.commit()
except Exception:
pass
# ─── LAB API ROUTES ───────────────────────────────────────────
@app.route("/lab")
@admin_required
def lab_page():
return render_template_string(LAB_HTML)
@app.route("/api/lab/experiments", methods=["GET"])
@admin_required
def lab_list():
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT id, name, status, metric, best_score, total_trials, improvements, models_pool, created_at FROM lab_experiments ORDER BY created_at DESC")
rows = cur.fetchall()
for r in rows:
r["created_at"] = r["created_at"].isoformat()
return jsonify({"experiments": rows})
@app.route("/api/lab/experiments", methods=["POST"])
@admin_required
def lab_create():
d = request.json
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"""INSERT INTO lab_experiments (name, objective, metric, eval_cases, mutable_config, best_config, models_pool)
VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id""",
(d["name"], d.get("objective", ""), d.get("metric", "quality"),
json.dumps(d.get("eval_cases", [])),
json.dumps(d.get("mutable_config", {"system_prompt": "You are a helpful assistant.", "temperature": 0.7, "model": ""})),
json.dumps(d.get("mutable_config", {"system_prompt": "You are a helpful assistant.", "temperature": 0.7, "model": ""})),
d.get("models_pool", []))
)
eid = cur.fetchone()[0]
conn.commit()
return jsonify({"id": eid})
@app.route("/api/lab/experiments/", methods=["GET"])
@admin_required
def lab_get(eid):
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT * FROM lab_experiments WHERE id = %s", (eid,))
exp = cur.fetchone()
if not exp:
return jsonify({"error": "not found"}), 404
exp["created_at"] = exp["created_at"].isoformat()
cur.execute("SELECT * FROM lab_trials WHERE experiment_id = %s ORDER BY trial_num", (eid,))
exp["trials"] = cur.fetchall()
for t in exp["trials"]:
t["created_at"] = t["created_at"].isoformat()
return jsonify(exp)
@app.route("/api/lab/experiments/", methods=["PUT"])
@admin_required
def lab_update(eid):
d = request.json
sets, vals = [], []
for k in ["name", "objective", "metric"]:
if k in d:
sets.append(f"{k} = %s")
vals.append(d[k])
for k in ["eval_cases", "mutable_config"]:
if k in d:
sets.append(f"{k} = %s")
vals.append(json.dumps(d[k]))
if "models_pool" in d:
sets.append("models_pool = %s")
vals.append(d["models_pool"])
if not sets:
return jsonify({"ok": True})
vals.append(eid)
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(f"UPDATE lab_experiments SET {', '.join(sets)} WHERE id = %s", vals)
conn.commit()
return jsonify({"ok": True})
@app.route("/api/lab/experiments//start", methods=["POST"])
@admin_required
def lab_start(eid):
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE lab_experiments SET status = 'running' WHERE id = %s", (eid,))
conn.commit()
if eid in _lab_threads and _lab_threads[eid].is_alive():
return jsonify({"ok": True, "message": "Already running"})
t = threading.Thread(target=_ratchet_loop, args=(eid,), daemon=True)
_lab_threads[eid] = t
t.start()
return jsonify({"ok": True})
@app.route("/api/lab/experiments//pause", methods=["POST"])
@admin_required
def lab_pause(eid):
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE lab_experiments SET status = 'paused' WHERE id = %s", (eid,))
conn.commit()
return jsonify({"ok": True})
@app.route("/api/lab/experiments//reset", methods=["POST"])
@admin_required
def lab_reset(eid):
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE lab_experiments SET status = 'idle', total_trials = 0, improvements = 0, best_score = 0, best_config = mutable_config WHERE id = %s", (eid,))
cur.execute("DELETE FROM lab_trials WHERE experiment_id = %s", (eid,))
conn.commit()
return jsonify({"ok": True})
@app.route("/api/lab/experiments//delete", methods=["DELETE"])
@admin_required
def lab_delete(eid):
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM lab_experiments WHERE id = %s", (eid,))
conn.commit()
return jsonify({"ok": True})
@app.route("/api/lab/experiments//stream")
@admin_required
def lab_stream(eid):
q = []
_lab_streams.setdefault(eid, []).append(q)
def generate():
try:
while True:
if q:
data = q.pop(0)
yield f"data: {json.dumps(data)}\n\n"
if data.get("type") == "done":
break
else:
time.sleep(0.5)
yield ": keepalive\n\n"
        finally:
            # Unregister this connection's queue so _lab_emit stops writing to it.
            streams = _lab_streams.get(eid, [])
            if q in streams:
                streams.remove(q)
return Response(generate(), mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
# ─── TEAM ROUTES ──────────────────────────────────────────────
@app.route("/api/run", methods=["POST"])
@login_required
def run_team():
config = request.json
mode = config["mode"]
RUNNERS = {
"brainstorm": run_brainstorm, "pipeline": run_pipeline, "debate": run_debate,
"validator": run_validator, "roundrobin": run_roundrobin, "redteam": run_redteam,
"consensus": run_consensus, "codereview": run_codereview, "ladder": run_ladder,
"tournament": run_tournament, "evolution": run_evolution, "blindassembly": run_blindassembly,
"staircase": run_staircase, "drift": run_drift, "mesh": run_mesh,
"hallucination": run_hallucination, "timeloop": run_timeloop,
"research": run_research, "eval": run_eval, "extract": run_extract,
}
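    # Each runner is a generator that yields pre-formatted SSE strings (via sse());
    # "response" events are re-parsed below so the whole run can be saved to history.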
def generate():
collected = []
runner = RUNNERS.get(mode)
if runner:
for event_str in runner(config):
yield event_str
try:
data = json.loads(event_str.replace("data: ", "", 1).strip())
if data.get("type") == "response":
collected.append({"model": data.get("model", ""), "text": data.get("text", ""), "role": data.get("role", "")})
except Exception:
pass
else:
yield sse({"type": "response", "model": "system", "text": f"Unknown mode: {mode}", "role": "error"})
yield sse({"type": "done"})
if collected:
save_run(mode, config.get("prompt", ""), config, collected)
return Response(generate(), mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no", "Connection": "keep-alive"})
# ─── ORIGINAL 10 MODES ────────────────────────────────────────
def run_brainstorm(config):
    """All models answer in parallel, then a synthesizer merges the best parts into one answer."""
models, prompt = config.get("models", []), config["prompt"]
synthesizer = config.get("synthesizer", models[0] if models else "qwen2.5")
yield sse({"type": "clear"})
yield sse({"type": "status", "message": f"Querying {len(models)} models..."})
responses = parallel_query(models, prompt)
for m, r in responses.items():
yield sse({"type": "response", "model": m, "text": r, "role": "respondent"})
if len(responses) > 1:
yield sse({"type": "status", "message": f"Synthesizing with {synthesizer}..."})
parts = [("QUESTION:", prompt, 1), ("INSTRUCTION:", "Synthesize the best answer. Be concise.", 1)]
for m, r in responses.items():
parts.append((f"[{m}]:", cap_response(r), 3))
sp = build_context(parts, synthesizer)
try:
yield sse({"type": "response", "model": synthesizer, "text": safe_query(synthesizer, sp), "role": "synthesis"})
except Exception as e:
yield sse({"type": "response", "model": synthesizer, "text": str(e), "role": "error"})
def run_pipeline(config):
    """Chain models: each step's output replaces the {input} placeholder in the next step's instruction."""
steps, current = config.get("steps", []), config["prompt"]
yield sse({"type": "clear"})
for i, step in enumerate(steps):
model = step["model"]
yield sse({"type": "status", "message": f"Step {i+1}/{len(steps)}: {model}..."})
try:
prompt = step["instruction"].replace("{input}", cap_response(current))
current = safe_query(model, prompt)
yield sse({"type": "response", "model": model, "text": current, "role": f"step {i+1}"})
except Exception as e:
yield sse({"type": "response", "model": model, "text": str(e), "role": "error"}); break
def run_debate(config):
    """Two models argue over several rounds, then a judge decides who won."""
prompt, d1, d2, judge = config["prompt"], config["debater1"], config["debater2"], config["judge"]
rounds = config.get("rounds", 2)
yield sse({"type": "clear"})
history = []
for m in [d1, d2]:
yield sse({"type": "status", "message": f"{m} opening..."})
try:
r = safe_query(m, f"Give your position on: {prompt}")
history.append((m, r)); yield sse({"type": "response", "model": m, "text": r, "role": "opening"})
except Exception as e:
yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
for rd in range(rounds):
for i, m in enumerate([d1, d2]):
            other = [d1, d2][1 - i]
            other_turns = [h[1] for h in history if h[0] == other]
            # Guard against an opponent whose opening statement errored out.
            other_last = other_turns[-1] if other_turns else "(no prior statement)"
yield sse({"type": "status", "message": f"Round {rd+1}: {m}..."})
try:
rebuttal_prompt = build_context([
("Topic:", prompt, 1),
(f"Opponent ({other}) said:", cap_response(other_last), 2),
("INSTRUCTION:", "Rebuttal or concede:", 1),
], m)
r = safe_query(m, rebuttal_prompt)
history.append((m, r)); yield sse({"type": "response", "model": m, "text": r, "role": f"round {rd+1}"})
except Exception as e:
yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
yield sse({"type": "status", "message": f"{judge} judging..."})
parts = [("Topic:", prompt, 1), ("INSTRUCTION:", "Judge: who won and why?", 1)]
for m, t in history:
parts.append((f"[{m}]:", cap_response(t), 3))
jp = build_context(parts, judge)
try:
yield sse({"type": "response", "model": judge, "text": safe_query(judge, jp), "role": "judge"})
except Exception as e:
yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"})
def run_validator(config):
    """One model answers, then a panel of validators fact-checks and scores the answer."""
prompt, answerer, validators = config["prompt"], config["answerer"], config.get("validators", [])
yield sse({"type": "clear"})
yield sse({"type": "status", "message": f"{answerer} answering..."})
try:
answer = query_model(answerer, prompt)
yield sse({"type": "response", "model": answerer, "text": answer, "role": "answer"})
except Exception as e:
yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return
yield sse({"type": "status", "message": f"Validating with {len(validators)} models..."})
vp = f"QUESTION: {prompt}\n\nANSWER:\n{answer}\n\nFact-check. Score 1-10 for accuracy, completeness, clarity. Flag errors."
results = parallel_query(validators, vp)
for m, r in results.items():
yield sse({"type": "response", "model": m, "text": r, "role": "validator"})
def run_roundrobin(config):
    """Models take turns improving a single answer across multiple cycles."""
prompt, models, cycles = config["prompt"], config.get("models", []), config.get("cycles", 2)
yield sse({"type": "clear"})
if not models: return
yield sse({"type": "status", "message": f"{models[0]} drafting..."})
try:
current = query_model(models[0], f"Answer:\n\n{prompt}")
yield sse({"type": "response", "model": models[0], "text": current, "role": "draft"})
except Exception as e:
yield sse({"type": "response", "model": models[0], "text": str(e), "role": "error"}); return
for cycle in range(cycles):
start = 1 if cycle == 0 else 0
for i in range(start, len(models)):
m = models[i]
yield sse({"type": "status", "message": f"Cycle {cycle+1}: {m}..."})
try:
current = query_model(m, f"Question: {prompt}\n\nCurrent answer:\n{current}\n\nImprove it. Return full improved answer.")
is_last = (cycle == cycles-1) and (i == len(models)-1)
yield sse({"type": "response", "model": m, "text": current, "role": "final" if is_last else f"cycle {cycle+1}"})
except Exception as e:
yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
def run_redteam(config):
    """Author drafts an answer, attacker hunts for flaws, patcher fixes them, repeated for N rounds."""
prompt, author, attacker, patcher = config["prompt"], config["author"], config["attacker"], config["patcher"]
rounds = config.get("rounds", 2)
yield sse({"type": "clear"})
yield sse({"type": "status", "message": f"{author} writing..."})
try:
current = query_model(author, prompt)
yield sse({"type": "response", "model": author, "text": current, "role": "author"})
except Exception as e:
yield sse({"type": "response", "model": author, "text": str(e), "role": "error"}); return
for r in range(rounds):
yield sse({"type": "status", "message": f"Round {r+1}: {attacker} attacking..."})
try:
attack = query_model(attacker, f"Question: {prompt}\n\nAnswer:\n{current}\n\nRED TEAM: find every flaw, error, weakness, edge case. Be aggressive.")
yield sse({"type": "response", "model": attacker, "text": attack, "role": f"attack {r+1}"})
except Exception as e:
yield sse({"type": "response", "model": attacker, "text": str(e), "role": "error"}); continue
yield sse({"type": "status", "message": f"Round {r+1}: {patcher} fixing..."})
try:
current = query_model(patcher, f"Question: {prompt}\n\nAnswer:\n{current}\n\nFlaws found:\n{attack}\n\nFix ALL issues. Return complete improved answer.")
yield sse({"type": "response", "model": patcher, "text": current, "role": "patcher" if r == rounds-1 else f"patch {r+1}"})
except Exception as e:
yield sse({"type": "response", "model": patcher, "text": str(e), "role": "error"})
def run_consensus(config):
    """All models answer, then revise after reading each other's answers until the final round."""
prompt, models, max_rounds = config["prompt"], config.get("models", []), config.get("max_rounds", 3)
yield sse({"type": "clear"})
if not models: return
yield sse({"type": "status", "message": f"Round 1: {len(models)} models answering..."})
responses = parallel_query(models, prompt)
for m, r in responses.items():
yield sse({"type": "response", "model": m, "text": r, "role": "round 1"})
for rd in range(2, max_rounds + 1):
yield sse({"type": "status", "message": f"Round {rd}: reviewing each other..."})
new = {}
for m in models:
parts = [("Question:", prompt, 1),
("Your answer:", cap_response(responses.get(m, "")), 2),
("INSTRUCTION:", "Revise considering other perspectives. Adopt good points, defend if right.", 1)]
for o, r in responses.items():
if o != m:
parts.append((f"[{o}]:", cap_response(r), 3))
ctx = build_context(parts, m)
try:
new[m] = safe_query(m, ctx)
yield sse({"type": "response", "model": m, "text": new[m], "role": "consensus" if rd == max_rounds else f"round {rd}"})
except Exception as e:
new[m] = responses.get(m, ""); yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
responses = new
def run_codereview(config):
    """Coder writes the code, reviewer critiques it, tester writes unit tests."""
prompt, coder, reviewer, tester = config["prompt"], config["coder"], config["reviewer"], config["tester"]
yield sse({"type": "clear"})
yield sse({"type": "status", "message": f"{coder} coding..."})
try:
code = query_model(coder, f"Write code for this task. Only output code with brief comments.\n\n{prompt}")
yield sse({"type": "response", "model": coder, "text": code, "role": "coder"})
except Exception as e:
yield sse({"type": "response", "model": coder, "text": str(e), "role": "error"}); return
yield sse({"type": "status", "message": f"{reviewer} reviewing..."})
try:
review = query_model(reviewer, f"Task: {prompt}\n\nCode:\n{code}\n\nReview: bugs, security, performance, style, edge cases. Provide corrected code if needed.")
yield sse({"type": "response", "model": reviewer, "text": review, "role": "reviewer"})
except Exception as e:
review = ""; yield sse({"type": "response", "model": reviewer, "text": str(e), "role": "error"})
yield sse({"type": "status", "message": f"{tester} testing..."})
try:
tests = query_model(tester, f"Task: {prompt}\n\nCode:\n{code}\n\nReview:\n{review}\n\nWrite comprehensive unit tests. Cover normal, edge, error cases.")
yield sse({"type": "response", "model": tester, "text": tests, "role": "tester"})
except Exception as e:
yield sse({"type": "response", "model": tester, "text": str(e), "role": "error"})
def run_ladder(config):
    """Explain the same question at five levels, from a 5-year-old up to a PhD expert."""
prompt, models = config["prompt"], config.get("models", [])
levels = [
("Child (5yo)", "Explain to a 5-year-old. Very simple words, short sentences, fun analogies."),
("Teenager", "Explain to a 15-year-old. Everyday language, relatable examples, some technical terms."),
("College Student", "College level. Proper terminology, theory, structured explanation."),
("Professional", "Professional level. Technical language, real-world applications, trade-offs."),
("PhD Expert", "PhD/expert level. Nuanced details, current research, math if relevant, edge cases."),
]
yield sse({"type": "clear"})
for i, (name, instr) in enumerate(levels):
m = models[i % len(models)] if models else "qwen2.5"
yield sse({"type": "status", "message": f"Level {i+1}/5: {name} ({m})..."})
try:
yield sse({"type": "response", "model": m, "text": query_model(m, f"{instr}\n\nQuestion: {prompt}"), "role": name})
except Exception as e:
yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
def run_tournament(config):
    """All models compete on the same prompt, then a judge ranks them and refines the winner."""
prompt, models, judge = config["prompt"], config.get("models", []), config.get("judge", "qwen2.5")
yield sse({"type": "clear"})
yield sse({"type": "status", "message": f"{len(models)} models competing..."})
responses = parallel_query(models, prompt)
for m, r in responses.items():
yield sse({"type": "response", "model": m, "text": r, "role": "competitor"})
if len(responses) < 2: return
yield sse({"type": "status", "message": f"{judge} ranking..."})
parts = [("Question:", prompt, 1),
("INSTRUCTION:", "Rank all from best to worst. Score 1-10 each. Then refine the winner into the ultimate answer.", 1)]
for m, r in responses.items():
parts.append((f"[{m}]:", cap_response(r), 3))
jp = build_context(parts, judge)
try:
yield sse({"type": "response", "model": judge, "text": safe_query(judge, jp), "role": "verdict"})
except Exception as e:
yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"})
# ─── NEW 7 MODES ──────────────────────────────────────────────
def run_evolution(config):
"""Genetic algorithm: generate, score, breed, mutate across generations."""
prompt, models = config["prompt"], config.get("models", [])
generations = config.get("generations", 3)
judge = config.get("judge", models[0] if models else "qwen2.5")
yield sse({"type": "clear"})
if not models: return
# Gen 0: each model generates an answer
yield sse({"type": "status", "message": "Generation 0: spawning initial population..."})
population = parallel_query(models, prompt)
for m, r in population.items():
yield sse({"type": "response", "model": m, "text": r, "role": "gen 0"})
for gen in range(1, generations + 1):
# Fitness scoring
yield sse({"type": "status", "message": f"Generation {gen}: fitness evaluation..."})
score_prompt = f"Question: {prompt}\n\nRate each answer 1-100. Return ONLY a JSON object like {{\"model_name\": score}}.\n\n"
for m, r in population.items():
score_prompt += f"[{m}]: {r.strip()}\n\n"
try:
scores_raw = query_model(judge, score_prompt)
yield sse({"type": "response", "model": judge, "text": scores_raw, "role": f"fitness gen {gen}"})
except Exception as e:
yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"}); continue
        # Breed: rank the population by the judge's scores (when parseable), take the top 2 as parents
        try:
            raw_scores = json.loads(re.search(r'\{[\s\S]*\}', scores_raw).group())
            fitness = {k: float(v) for k, v in raw_scores.items()}
        except Exception:
            fitness = {}
        pop_list = sorted(population.items(), key=lambda kv: fitness.get(kv[0], 0), reverse=True)
        if len(pop_list) < 2: break
        parent1, parent2 = pop_list[0], pop_list[1]
yield sse({"type": "status", "message": f"Generation {gen}: breeding + mutating..."})
new_population = {}
for m in models:
breed_prompt = (
f"Question: {prompt}\n\n"
f"Parent A ({parent1[0]}):\n{parent1[1].strip()}\n\n"
f"Parent B ({parent2[0]}):\n{parent2[1].strip()}\n\n"
f"You are {m}. Breed these two answers: take the best parts of each parent, "
f"combine them, then MUTATE by adding one novel insight or improvement. "
f"Return your evolved answer."
)
try:
offspring = query_model(m, breed_prompt)
new_population[m] = offspring
is_last = gen == generations
yield sse({"type": "response", "model": m, "text": offspring, "role": "final" if is_last else f"gen {gen}"})
except Exception as e:
new_population[m] = population.get(m, "")
yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
population = new_population
def run_blindassembly(config):
"""Split question into parts, each model answers blind, assembler stitches."""
prompt, models = config["prompt"], config.get("models", [])
assembler = config.get("assembler", models[0] if models else "qwen2.5")
yield sse({"type": "clear"})
if not models: return
n = len(models)
# Step 1: Decompose the question
yield sse({"type": "status", "message": "Decomposing question into sub-tasks..."})
decompose_prompt = (
f"Split this question into exactly {n} independent sub-parts that together fully answer it. "
f"Return ONLY a numbered list, one sub-question per line. No other text.\n\n"
f"Question: {prompt}"
)
try:
parts_raw = query_model(assembler, decompose_prompt)
yield sse({"type": "response", "model": assembler, "text": parts_raw, "role": "decomposer"})
except Exception as e:
yield sse({"type": "response", "model": assembler, "text": str(e), "role": "error"}); return
# Parse parts
parts = [line.strip() for line in parts_raw.strip().split("\n") if line.strip() and any(c.isalpha() for c in line)]
while len(parts) < n:
parts.append(f"Additional aspect of: {prompt}")
parts = parts[:n]
# Step 2: Each model answers their part BLIND
yield sse({"type": "status", "message": f"Sending {n} sub-tasks to models (blind)..."})
fragments = {}
with ThreadPoolExecutor(max_workers=n) as pool:
futures = {}
for i, m in enumerate(models):
blind_prompt = (
f"Answer ONLY this specific sub-question. Do not address anything else.\n\n"
f"Sub-question: {parts[i]}"
)
futures[pool.submit(query_model, m, blind_prompt)] = (m, parts[i])
for future in as_completed(futures):
m, part = futures[future]
try:
fragments[m] = {"part": part, "answer": future.result()}
yield sse({"type": "response", "model": m, "text": f"SUB-TASK: {part}\n\nANSWER:\n{fragments[m]['answer']}", "role": "blind worker"})
except Exception as e:
yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
# Step 3: Assemble
yield sse({"type": "status", "message": f"{assembler} assembling blind fragments..."})
assemble_prompt = f"Original question: {prompt}\n\nMultiple models each answered a sub-part WITHOUT seeing each other:\n\n"
for m, data in fragments.items():
assemble_prompt += f"[{m}] (sub-task: {data['part']}):\n{data['answer'].strip()}\n\n"
assemble_prompt += "Stitch these fragments into ONE coherent, complete answer. Fill any gaps. Remove contradictions."
try:
yield sse({"type": "response", "model": assembler, "text": query_model(assembler, assemble_prompt), "role": "assembler"})
except Exception as e:
yield sse({"type": "response", "model": assembler, "text": str(e), "role": "error"})
def run_staircase(config):
"""Devil's Staircase: each round adds a new constraint."""
prompt = config["prompt"]
answerer = config["answerer"]
challenger = config["challenger"]
steps = config.get("steps", 4)
yield sse({"type": "clear"})
# Initial answer
yield sse({"type": "status", "message": f"{answerer} answering..."})
try:
current = query_model(answerer, prompt)
yield sse({"type": "response", "model": answerer, "text": current, "role": "initial answer"})
except Exception as e:
yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return
constraints = []
for s in range(steps):
# Challenger adds a constraint
yield sse({"type": "status", "message": f"Step {s+1}: {challenger} adding constraint..."})
constraint_prompt = (
f"Original question: {prompt}\n\n"
f"Current answer:\n{current}\n\n"
f"Existing constraints: {constraints if constraints else 'None yet'}\n\n"
f"Add ONE new realistic constraint, complication, or edge case that the current answer doesn't handle. "
f"Make it specific and challenging but plausible. State ONLY the new constraint, nothing else."
)
try:
new_constraint = query_model(challenger, constraint_prompt)
constraints.append(new_constraint.strip())
yield sse({"type": "response", "model": challenger, "text": new_constraint, "role": f"constraint {s+1}"})
except Exception as e:
yield sse({"type": "response", "model": challenger, "text": str(e), "role": "error"}); continue
# Answerer must adapt
yield sse({"type": "status", "message": f"Step {s+1}: {answerer} adapting..."})
adapt_prompt = (
f"Original question: {prompt}\n\n"
f"ALL constraints you must satisfy:\n" +
"\n".join(f" {i+1}. {c}" for i, c in enumerate(constraints)) +
f"\n\nYour previous answer:\n{current}\n\n"
f"Rewrite your answer to handle ALL constraints. Return the complete updated answer."
)
try:
current = query_model(answerer, adapt_prompt)
is_last = s == steps - 1
yield sse({"type": "response", "model": answerer, "text": current, "role": "final" if is_last else f"adapted {s+1}"})
except Exception as e:
yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"})
def run_drift(config):
"""Same prompt N times to same model, analyze variance."""
prompt = config["prompt"]
target = config["target"]
samples = config.get("samples", 5)
analyzer = config["analyzer"]
yield sse({"type": "clear"})
yield sse({"type": "status", "message": f"Sampling {target} {samples} times..."})
results = []
for i in range(samples):
yield sse({"type": "status", "message": f"Sample {i+1}/{samples}..."})
try:
r = query_model(target, prompt)
results.append(r)
yield sse({"type": "response", "model": target, "text": r, "role": f"sample {i+1}"})
except Exception as e:
yield sse({"type": "response", "model": target, "text": str(e), "role": "error"})
if len(results) < 2: return
# Analyze
yield sse({"type": "status", "message": f"{analyzer} analyzing drift..."})
analysis_prompt = (
f"Question asked: {prompt}\n\n"
f"The model '{target}' was asked this same question {len(results)} times. Here are all responses:\n\n"
)
for i, r in enumerate(results):
analysis_prompt += f"--- Sample {i+1} ---\n{r.strip()}\n\n"
analysis_prompt += (
"DRIFT ANALYSIS:\n"
"1. What claims/facts are CONSISTENT across all samples? (HIGH CONFIDENCE)\n"
"2. What claims VARY between samples? (LOW CONFIDENCE - possible hallucination)\n"
"3. What is completely CONTRADICTED between samples? (UNRELIABLE)\n"
"4. Give an overall confidence score 1-10 for the model's answer to this question.\n"
"5. Provide the 'true' answer using only high-confidence claims."
)
try:
yield sse({"type": "response", "model": analyzer, "text": query_model(analyzer, analysis_prompt), "role": "analyzer"})
except Exception as e:
yield sse({"type": "response", "model": analyzer, "text": str(e), "role": "error"})
def run_mesh(config):
"""Each model answers as a different stakeholder."""
prompt, models = config["prompt"], config.get("models", [])
synthesizer = config.get("synthesizer", models[0] if models else "qwen2.5")
yield sse({"type": "clear"})
perspectives = [
("CEO / Business Leader", "You are a CEO. Answer from a business strategy perspective: ROI, market impact, competitive advantage, risk."),
("Software Engineer", "You are a senior engineer. Answer from a technical perspective: architecture, implementation, scalability, tech debt."),
("End User / Customer", "You are an end user/customer. Answer from a usability perspective: experience, pain points, what you actually need."),
("Regulator / Legal", "You are a regulator/legal advisor. Answer from a compliance perspective: laws, regulations, liability, ethics, privacy."),
("Competitor", "You are a competitor analyzing this. What threats/opportunities does this create? What would you do differently?"),
]
if not models: return
responses = {}
for i, (role_name, instruction) in enumerate(perspectives):
m = models[i % len(models)]
yield sse({"type": "status", "message": f"{role_name}: {m}..."})
try:
r = query_model(m, f"{instruction}\n\nQuestion: {prompt}")
responses[role_name] = (m, r)
yield sse({"type": "response", "model": m, "text": r, "role": role_name})
except Exception as e:
yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
# 360 synthesis
yield sse({"type": "status", "message": f"{synthesizer} weaving 360-degree view..."})
syn = f"Question: {prompt}\n\nMultiple stakeholders gave their perspective:\n\n"
for role, (m, r) in responses.items():
syn += f"[{role} ({m})]: {r.strip()}\n\n"
syn += "Synthesize a 360-degree view that balances all stakeholder perspectives. Highlight tensions and trade-offs."
try:
yield sse({"type": "response", "model": synthesizer, "text": query_model(synthesizer, syn), "role": "mesh-360"})
except Exception as e:
yield sse({"type": "response", "model": synthesizer, "text": str(e), "role": "error"})
def run_hallucination(config):
"""One answers, hunters verify each claim independently."""
prompt, answerer = config["prompt"], config["answerer"]
hunters = config.get("hunters", [])
yield sse({"type": "clear"})
# Get answer
yield sse({"type": "status", "message": f"{answerer} answering..."})
try:
answer = query_model(answerer, prompt)
yield sse({"type": "response", "model": answerer, "text": answer, "role": "answer"})
except Exception as e:
yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return
# Extract claims
yield sse({"type": "status", "message": "Extracting factual claims..."})
extract_prompt = (
f"Extract every factual claim from this answer as a numbered list. Include specific facts, numbers, dates, "
f"names, and cause-effect relationships. One claim per line.\n\nAnswer:\n{answer}"
)
try:
claims = query_model(answerer, extract_prompt)
yield sse({"type": "response", "model": answerer, "text": claims, "role": "claims extracted"})
except Exception as e:
yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return
# Each hunter verifies independently
yield sse({"type": "status", "message": f"{len(hunters)} hunters verifying claims..."})
hunt_prompt = (
f"Original question: {prompt}\n\n"
f"An AI generated this answer:\n{answer}\n\n"
f"Here are the extracted claims:\n{claims}\n\n"
f"For EACH claim, verdict:\n"
f" VERIFIED - you are confident this is correct\n"
f" SUSPICIOUS - might be wrong or misleading\n"
f" HALLUCINATED - this is likely made up or incorrect\n"
f" UNVERIFIABLE - cannot determine from your knowledge\n"
f"Explain your reasoning for suspicious/hallucinated claims."
)
results = parallel_query(hunters, hunt_prompt)
for m, r in results.items():
yield sse({"type": "response", "model": m, "text": r, "role": "hunter"})
def run_timeloop(config):
"""CHAOS MODE: answer -> catastrophe -> fix -> new catastrophe -> repeat."""
prompt = config["prompt"]
answerer = config["answerer"]
chaos = config["chaos"]
loops = config.get("loops", 4)
yield sse({"type": "clear"})
# Initial answer
yield sse({"type": "status", "message": f"{answerer} answering (unaware of impending doom)..."})
try:
current = query_model(answerer, prompt)
yield sse({"type": "response", "model": answerer, "text": current, "role": "initial (doomed)"})
except Exception as e:
yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return
catastrophes = []
for i in range(loops):
# Chaos agent creates a catastrophe
yield sse({"type": "status", "message": f"Loop {i+1}: CHAOS AGENT unleashed..."})
chaos_prompt = (
f"Original question: {prompt}\n\n"
f"Someone implemented this answer:\n{current}\n\n"
f"Previous catastrophes that were already fixed: {catastrophes if catastrophes else 'None yet'}\n\n"
f"You are a CHAOS AGENT. Describe a SPECIFIC, VIVID catastrophe that happened because of a flaw "
f"in this answer. Be creative and dramatic but grounded in a real flaw. "
f"Describe: 1) What went wrong 2) The cascading consequences 3) Who/what was affected. "
f"Make it different from previous catastrophes. Be theatrical!"
)
try:
catastrophe = query_model(chaos, chaos_prompt)
catastrophes.append(catastrophe.strip()[:200])
yield sse({"type": "response", "model": chaos, "text": catastrophe, "role": f"catastrophe {i+1}"})
except Exception as e:
yield sse({"type": "response", "model": chaos, "text": str(e), "role": "error"}); continue
# Answerer must fix
yield sse({"type": "status", "message": f"Loop {i+1}: {answerer} desperately fixing..."})
fix_prompt = (
f"Original question: {prompt}\n\n"
f"Your previous answer:\n{current}\n\n"
f"CATASTROPHE REPORT:\n{catastrophe}\n\n"
f"ALL previous catastrophes you must also prevent:\n" +
"\n".join(f" {j+1}. {c}" for j, c in enumerate(catastrophes)) +
f"\n\nRewrite your answer to prevent THIS catastrophe and ALL previous ones. "
f"Your answer must be BULLETPROOF. Return the complete fixed answer."
)
try:
current = query_model(answerer, fix_prompt)
is_last = i == loops - 1
yield sse({"type": "response", "model": answerer, "text": current, "role": "survivor" if is_last else f"fix {i+1}"})
except Exception as e:
yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"})
# Final verdict from chaos agent
yield sse({"type": "status", "message": f"{chaos} final inspection..."})
final_prompt = (
f"Original question: {prompt}\n\n"
f"After {loops} catastrophes, the final answer is:\n{current}\n\n"
f"All catastrophes it survived:\n" +
"\n".join(f" {j+1}. {c}" for j, c in enumerate(catastrophes)) +
f"\n\nAs the Chaos Agent, give your final verdict: Is this answer now truly bulletproof? "
f"Rate its resilience 1-10. Can you find ONE MORE flaw? If not, admit defeat."
)
try:
yield sse({"type": "response", "model": chaos, "text": query_model(chaos, final_prompt), "role": "final judgment"})
except Exception as e:
yield sse({"type": "response", "model": chaos, "text": str(e), "role": "error"})
# ─── AUTONOMOUS PIPELINES ─────────────────────────────────────
def _save_pipeline(pipeline, topic, steps, result, models, start_ms):
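"""Persist a completed pipeline run. Assumes a pipeline_runs table whose columns match the INSERT below (pipeline, topic, status, steps, result, models_used, duration_ms, completed_at)."""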
duration = int((time.time() * 1000) - start_ms)
try:
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"""INSERT INTO pipeline_runs (pipeline, topic, status, steps, result, models_used, duration_ms, completed_at)
VALUES (%s, %s, 'completed', %s, %s, %s, %s, NOW())""",
(pipeline, topic, json.dumps(steps), json.dumps(result), list(set(models)), duration)
)
conn.commit()
except Exception as e:
print(f"[DB] pipeline save error: {e}")
def run_research(config):
"""Autonomous research pipeline: scout → parallel research → fact-check → synthesize."""
start = time.time() * 1000
prompt = config["prompt"]
scout = config.get("scout", "llama3.2:latest")
models = config.get("models", [])
checker = config.get("checker", models[0] if models else scout)
synth = config.get("synthesizer", models[0] if models else scout)
num_q = config.get("num_questions", 5)
yield sse({"type": "clear"})
steps = []
all_models = [scout, checker, synth] + models
# Step 1: Scout generates research questions
yield sse({"type": "status", "message": f"Step 1/4: {scout} generating {num_q} research questions..."})
try:
q_prompt = (
f"You are a research scout. Given the topic below, generate exactly {num_q} specific, "
f"diverse research questions that would build a comprehensive understanding. "
f"Return ONLY a numbered list.\n\nTopic: {prompt}"
)
questions_raw = query_model(scout, q_prompt)
yield sse({"type": "response", "model": scout, "text": questions_raw, "role": "scout"})
steps.append({"step": "scout", "model": scout, "output": questions_raw})
except Exception as e:
yield sse({"type": "response", "model": scout, "text": str(e), "role": "error"})
return
# Parse questions
questions = [l.strip() for l in questions_raw.strip().split("\n") if l.strip() and any(c.isalpha() for c in l)]
questions = questions[:num_q]
if not questions:
yield sse({"type": "response", "model": "system", "text": "Failed to parse research questions.", "role": "error"})
return
# Step 2: Parallel research — distribute questions across models
yield sse({"type": "status", "message": f"Step 2/4: {len(models)} models researching {len(questions)} questions..."})
research_results = {}
with ThreadPoolExecutor(max_workers=max(len(models), 1)) as pool:
futures = {}
for i, q in enumerate(questions):
m = models[i % len(models)] if models else scout
rp = f"Research this question thoroughly. Provide specific facts, data, and examples.\n\nQuestion: {q}"
futures[pool.submit(query_model, m, rp)] = (m, q)
for future in as_completed(futures):
m, q = futures[future]
try:
answer = future.result()
research_results[q] = {"model": m, "answer": answer}
yield sse({"type": "response", "model": m, "text": f"Q: {q}\n\n{answer}", "role": "researcher"})
except Exception as e:
yield sse({"type": "response", "model": m, "text": f"Q: {q}\n\nError: {e}", "role": "error"})
research_results[q] = {"model": m, "answer": f"Error: {e}"}
steps.append({"step": "research", "results": {q: r["answer"][:500] for q, r in research_results.items()}})
# Step 3: Fact-check
yield sse({"type": "status", "message": f"Step 3/4: {checker} fact-checking all findings..."})
check_prompt = f"Topic: {prompt}\n\nResearch findings to fact-check:\n\n"
for q, r in research_results.items():
check_prompt += f"Q: {q}\nA: {r['answer'][:300]}\n\n"
check_prompt += (
"For each finding, mark as:\n"
" VERIFIED — likely accurate\n"
" UNCERTAIN — may be wrong or outdated\n"
" FLAGGED — likely inaccurate\n"
"Be specific about what's wrong with flagged items."
)
try:
check_result = query_model(checker, check_prompt)
yield sse({"type": "response", "model": checker, "text": check_result, "role": "fact-checker"})
steps.append({"step": "fact-check", "model": checker, "output": check_result[:1000]})
except Exception as e:
check_result = f"Error: {e}"
yield sse({"type": "response", "model": checker, "text": str(e), "role": "error"})
# Step 4: Synthesize into brief
yield sse({"type": "status", "message": f"Step 4/4: {synth} synthesizing research brief..."})
synth_prompt = f"Topic: {prompt}\n\nResearch findings:\n\n"
for q, r in research_results.items():
synth_prompt += f"Q: {q}\nA: {r['answer'][:400]}\n\n"
synth_prompt += f"\nFact-check notes:\n{check_result[:500]}\n\n"
synth_prompt += (
"Synthesize ALL findings into a structured research brief with these sections:\n"
"1. EXECUTIVE SUMMARY (2-3 sentences)\n"
"2. KEY FINDINGS (bulleted list)\n"
"3. DETAILED ANALYSIS (organized by theme)\n"
"4. UNCERTAINTIES & GAPS (what needs more research)\n"
"5. RECOMMENDATIONS (actionable next steps)\n"
"Be comprehensive but concise."
)
try:
brief = query_model(synth, synth_prompt)
yield sse({"type": "response", "model": synth, "text": brief, "role": "synthesis"})
steps.append({"step": "synthesis", "model": synth, "output": brief[:2000]})
except Exception as e:
brief = f"Error: {e}"
yield sse({"type": "response", "model": synth, "text": str(e), "role": "error"})
# Save pipeline run
_save_pipeline("research", prompt, steps, {"brief": brief, "questions": questions, "fact_check": check_result[:1000]}, all_models, start)
def run_eval(config):
"""Model evaluation pipeline: same prompts → all models → judge scores → leaderboard."""
start = time.time() * 1000
prompt = config["prompt"]
models = config.get("models", [])
judge = config.get("judge", models[0] if models else "qwen2.5:latest")
eval_type = config.get("eval_type", "general")
rounds = config.get("rounds", 3)
yield sse({"type": "clear"})
steps = []
all_models = models + [judge]
# Generate eval prompts based on type
yield sse({"type": "status", "message": f"Generating {rounds} {eval_type} evaluation prompts..."})
gen_prompt = (
f"Generate exactly {rounds} evaluation prompts for testing LLM capability in: {eval_type}.\n"
f"Context/focus area: {prompt}\n\n"
f"Each prompt should test a different aspect. Return ONLY a numbered list of prompts, nothing else."
)
try:
prompts_raw = query_model(judge, gen_prompt)
yield sse({"type": "response", "model": judge, "text": prompts_raw, "role": "prompt generator"})
except Exception as e:
yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"})
return
eval_prompts = [l.strip() for l in prompts_raw.strip().split("\n") if l.strip() and any(c.isalpha() for c in l)]
eval_prompts = eval_prompts[:rounds]
if not eval_prompts:
yield sse({"type": "response", "model": "system", "text": "Failed to generate eval prompts.", "role": "error"})
return
# Run each prompt against all models
scores = {m: [] for m in models}
for ri, ep in enumerate(eval_prompts):
yield sse({"type": "status", "message": f"Round {ri+1}/{len(eval_prompts)}: Testing {len(models)} models..."})
# All models answer in parallel
responses = parallel_query(models, ep)
for m, r in responses.items():
yield sse({"type": "response", "model": m, "text": f"[Round {ri+1}] {ep[:80]}...\n\n{r}", "role": f"round {ri+1}"})
# Judge scores all responses
yield sse({"type": "status", "message": f"Round {ri+1}: Judging..."})
judge_prompt = (
f"Evaluation prompt: {ep}\n\n"
f"Score each model's response 1-10 on: accuracy, completeness, clarity, reasoning.\n"
f"Return a JSON object: {{\"model_name\": {{\"score\": N, \"notes\": \"brief note\"}}}}.\n\n"
)
for m, r in responses.items():
judge_prompt += f"[{m}]:\n{r[:500]}\n\n"
try:
judgment = query_model(judge, judge_prompt)
yield sse({"type": "response", "model": judge, "text": judgment, "role": f"judge round {ri+1}"})
# Try to parse scores
try:
import re
# Find numbers after model names
for m in models:
# Look for score patterns near model name
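# e.g. a judgment containing '"qwen2.5:latest": {"score": 8, ...}' captures 8, the first digit run after the model name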
pattern = re.escape(m) + r'.*?["\s:]+(\d+)'
match = re.search(pattern, judgment, re.IGNORECASE | re.DOTALL)
if match:
scores[m].append(int(match.group(1)))
except Exception:
pass
except Exception as e:
yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"})
steps.append({"round": ri+1, "prompt": ep, "responses": {m: r[:300] for m, r in responses.items()}})
# Final leaderboard
yield sse({"type": "status", "message": "Generating leaderboard..."})
leaderboard = []
for m in models:
avg = sum(scores[m]) / len(scores[m]) if scores[m] else 0
leaderboard.append({"model": m, "avg_score": round(avg, 1), "rounds": len(scores[m]), "scores": scores[m]})
leaderboard.sort(key=lambda x: x["avg_score"], reverse=True)
board_text = f"LEADERBOARD — {eval_type.upper()} ({len(eval_prompts)} rounds)\n{'='*50}\n\n"
for i, entry in enumerate(leaderboard):
medal = ["1st", "2nd", "3rd"][i] if i < 3 else f"{i+1}th"
bar = "#" * int(entry["avg_score"])
board_text += f" {medal} {entry['model']:<30} {entry['avg_score']:>4}/10 {bar}\n"
if entry["scores"]:
board_text += f" Round scores: {entry['scores']}\n\n"
yield sse({"type": "response", "model": judge, "text": board_text, "role": "final"})
_save_pipeline("eval", prompt, steps, {"leaderboard": leaderboard, "eval_type": eval_type}, all_models, start)
def run_extract(config):
"""Knowledge extraction pipeline: chunk text → extract facts → verify → structured output."""
start = time.time() * 1000
prompt = config["prompt"]
extractor = config.get("extractor", "qwen2.5:latest")
verifier = config.get("verifier", "gemma2:latest")
source = config.get("source", "prompt")
yield sse({"type": "clear"})
steps = []
all_models = [extractor, verifier]
# Get source text
source_text = prompt
if source != "prompt":
file_map = {
"ontology": "/home/profit/ONTOLOGY.md",
"index": "/home/profit/INDEX.md",
"summaries": "/home/profit/SUMMARIES.md",
"guides": "/home/profit/GUIDES.md",
}
fpath = file_map.get(source)
if fpath and os.path.exists(fpath):
yield sse({"type": "status", "message": f"Reading {source}..."})
with open(fpath) as f:
source_text = f.read()[:15000] # limit to ~15K chars
yield sse({"type": "response", "model": "system", "text": f"Loaded {source} ({len(source_text)} chars)", "role": "source"})
else:
yield sse({"type": "response", "model": "system", "text": f"File not found: {source}", "role": "error"})
return
# Chunk if too long
chunks = []
chunk_size = 4000
for i in range(0, len(source_text), chunk_size):
chunks.append(source_text[i:i+chunk_size])
yield sse({"type": "status", "message": f"Processing {len(chunks)} chunk(s) with {extractor}..."})
all_facts = []
all_entities = []
all_relations = []
for ci, chunk in enumerate(chunks):
yield sse({"type": "status", "message": f"Extracting from chunk {ci+1}/{len(chunks)}..."})
extract_prompt = (
f"Extract structured knowledge from this text. Return a JSON object with:\n"
f" \"facts\": [\"fact 1\", \"fact 2\", ...],\n"
f" \"entities\": [{{\"name\": \"...\", \"type\": \"...\", \"description\": \"...\"}}, ...],\n"
f" \"relationships\": [{{\"from\": \"...\", \"to\": \"...\", \"type\": \"...\"}}, ...]\n\n"
f"Be thorough. Extract EVERY factual claim, named entity, and relationship.\n\n"
f"Text:\n{chunk}"
)
try:
result = query_model(extractor, extract_prompt)
yield sse({"type": "response", "model": extractor, "text": result, "role": f"extraction {ci+1}"})
# Try to parse JSON from response
try:
import re
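# Grab the widest {...} span (first '{' to last '}') and try to parse it as JSON; on failure, fall back to the raw text below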
json_match = re.search(r'\{[\s\S]*\}', result)
if json_match:
parsed = json.loads(json_match.group())
all_facts.extend(parsed.get("facts", []))
all_entities.extend(parsed.get("entities", []))
all_relations.extend(parsed.get("relationships", []))
except Exception:
all_facts.append(result[:500])
except Exception as e:
yield sse({"type": "response", "model": extractor, "text": str(e), "role": "error"})
steps.append({"step": "extraction", "facts": len(all_facts), "entities": len(all_entities), "relations": len(all_relations)})
# Verify key facts
yield sse({"type": "status", "message": f"{verifier} verifying {len(all_facts)} facts..."})
facts_sample = all_facts[:20] # verify up to 20
verify_prompt = (
f"Verify these extracted facts. For each, mark CORRECT, INCORRECT, or UNVERIFIABLE.\n"
f"If incorrect, provide the correction.\n\n"
)
for i, f in enumerate(facts_sample):
fact_str = f if isinstance(f, str) else json.dumps(f)
verify_prompt += f"{i+1}. {fact_str}\n"
try:
verification = query_model(verifier, verify_prompt)
yield sse({"type": "response", "model": verifier, "text": verification, "role": "verifier"})
steps.append({"step": "verification", "model": verifier, "output": verification[:1000]})
except Exception as e:
verification = str(e)
yield sse({"type": "response", "model": verifier, "text": str(e), "role": "error"})
# Summary
summary = (
f"KNOWLEDGE EXTRACTION SUMMARY\n{'='*40}\n\n"
f"Source: {source}\n"
f"Facts extracted: {len(all_facts)}\n"
f"Entities found: {len(all_entities)}\n"
f"Relationships mapped: {len(all_relations)}\n\n"
f"TOP ENTITIES:\n"
)
for e in all_entities[:15]:
if isinstance(e, dict):
summary += f" [{e.get('type','?')}] {e.get('name','?')} — {e.get('description','')[:60]}\n"
summary += f"\nTOP RELATIONSHIPS:\n"
for r in all_relations[:15]:
if isinstance(r, dict):
summary += f" {r.get('from','?')} --[{r.get('type','?')}]--> {r.get('to','?')}\n"
yield sse({"type": "response", "model": "system", "text": summary, "role": "final"})
result_data = {
"facts": all_facts[:100],
"entities": all_entities[:50],
"relationships": all_relations[:50],
"verification": verification[:1000],
"source": source,
}
_save_pipeline("extract", prompt or source, steps, result_data, all_models, start)
if __name__ == "__main__":
print("\n LLM Team UI running at http://localhost:5000\n")
app.run(host="0.0.0.0", port=5000, debug=False)