From 1711d3333799c04bd3228036f33dcc0ca4842a12 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 25 Mar 2026 02:51:36 -0500 Subject: [PATCH] =?UTF-8?q?LLM=20Team=20UI=20v1.0=20=E2=80=94=20full-stack?= =?UTF-8?q?=20local=20AI=20orchestration=20platform?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Features: - 20 team modes (brainstorm, debate, consensus, red team, etc.) - 3 autonomous pipelines (research, model eval, knowledge extraction) - AutoResearch Lab with ratchet engine (Karpathy-inspired) - Multi-provider support (Ollama, OpenRouter, OpenAI, Anthropic) - Admin panel (providers, models, timeouts, OpenRouter browser) - History panel with copy/iterate/re-pipe workflow - Context budget system (smart truncation, safe_query, overflow recovery) - PostgreSQL persistence (team_runs, pipeline_runs, lab_experiments, lab_trials) - Pure Python + embedded HTML/CSS/JS, no external JS dependencies - Inline SVG score charts in Lab monitor - SSE streaming for real-time output - Systemd service with auto-restart Stack: Flask + Ollama + PostgreSQL + Bun-compatible Hardware: RTX A4000 (16GB) + 128GB RAM Co-Authored-By: Claude Opus 4.6 (1M context) --- .gitignore | 4 + llm-team-ui.service | 14 + llm_team_config.json | 33 + llm_team_ui.py | 3472 ++++++++++++++++++++++++++++++++++++++++++ schema.sql | 54 + 5 files changed, 3577 insertions(+) create mode 100644 .gitignore create mode 100644 llm-team-ui.service create mode 100644 llm_team_config.json create mode 100644 llm_team_ui.py create mode 100644 schema.sql diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8b6087b --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +__pycache__/ +*.pyc +.env +*.log diff --git a/llm-team-ui.service b/llm-team-ui.service new file mode 100644 index 0000000..71971c9 --- /dev/null +++ b/llm-team-ui.service @@ -0,0 +1,14 @@ +[Unit] +Description=LLM Team UI - Multi-model team web interface +After=network.target ollama.service + +[Service] +Type=simple +User=root +WorkingDirectory=/root +ExecStart=/usr/bin/python3 /root/llm_team_ui.py +Restart=on-failure +RestartSec=5 + +[Install] +WantedBy=multi-user.target diff --git a/llm_team_config.json b/llm_team_config.json new file mode 100644 index 0000000..75eb7f7 --- /dev/null +++ b/llm_team_config.json @@ -0,0 +1,33 @@ +{ + "providers": { + "ollama": { + "enabled": true, + "base_url": "http://localhost:11434", + "timeout": 300 + }, + "openrouter": { + "enabled": false, + "base_url": "https://openrouter.ai/api/v1", + "api_key": "", + "timeout": 120 + }, + "openai": { + "enabled": false, + "base_url": "https://api.openai.com/v1", + "api_key": "", + "timeout": 120 + }, + "anthropic": { + "enabled": false, + "base_url": "https://api.anthropic.com/v1", + "api_key": "", + "timeout": 120 + } + }, + "disabled_models": [], + "cloud_models": [], + "timeouts": { + "global": 300, + "per_model": {} + } +} \ No newline at end of file diff --git a/llm_team_ui.py b/llm_team_ui.py new file mode 100644 index 0000000..9c69331 --- /dev/null +++ b/llm_team_ui.py @@ -0,0 +1,3472 @@ +#!/usr/bin/env python3 +"""LLM Team UI - Web interface to configure and run multi-model teams.""" + +import json +import os +import time +import threading +import requests +import random +import psycopg2 +import psycopg2.extras +from concurrent.futures import ThreadPoolExecutor, as_completed +from flask import Flask, render_template_string, request, jsonify, Response + +app = Flask(__name__) + +CONFIG_PATH = "/root/llm_team_config.json" +DEFAULT_CONFIG = { + 
"providers": { + "ollama": {"enabled": True, "base_url": "http://localhost:11434", "timeout": 300}, + "openrouter": {"enabled": False, "base_url": "https://openrouter.ai/api/v1", "api_key": "", "timeout": 120}, + "openai": {"enabled": False, "base_url": "https://api.openai.com/v1", "api_key": "", "timeout": 120}, + "anthropic": {"enabled": False, "base_url": "https://api.anthropic.com/v1", "api_key": "", "timeout": 120}, + }, + "disabled_models": [], + "cloud_models": [], + "timeouts": {"global": 300, "per_model": {}}, +} + +def load_dotenv(): + for p in ["/root/.env", "/home/profit/.env"]: + if os.path.exists(p): + with open(p) as f: + for line in f: + line = line.strip() + if line and not line.startswith("#") and "=" in line: + k, v = line.split("=", 1) + os.environ.setdefault(k.strip(), v.strip()) + +load_dotenv() + +def load_config(): + if os.path.exists(CONFIG_PATH): + with open(CONFIG_PATH) as f: + cfg = json.load(f) + # merge any missing defaults + for k, v in DEFAULT_CONFIG.items(): + cfg.setdefault(k, v) + for k, v in DEFAULT_CONFIG["providers"].items(): + cfg["providers"].setdefault(k, v) + return cfg + return json.loads(json.dumps(DEFAULT_CONFIG)) + +def save_config(cfg): + with open(CONFIG_PATH, "w") as f: + json.dump(cfg, f, indent=2) + +def get_api_key(provider_name): + cfg = load_config() + prov = cfg["providers"].get(provider_name, {}) + key = prov.get("api_key", "") + if key: + return key + env_map = {"openrouter": "OPENROUTER_API_KEY", "openai": "OPENAI_API_KEY", "anthropic": "ANTHROPIC_API_KEY"} + return os.environ.get(env_map.get(provider_name, ""), "") + +DB_DSN = "dbname=knowledge_base user=kbuser password=IPbLBA0EQI8u4TeM2YZrbm1OAy5nSwqC host=localhost" + +def get_db(): + return psycopg2.connect(DB_DSN) + +def save_run(mode, prompt, config_data, responses): + models = list({r.get("model", "") for r in responses if r.get("model")}) + try: + with get_db() as conn: + with conn.cursor() as cur: + cur.execute( + "INSERT INTO team_runs (mode, prompt, config, responses, models_used) VALUES (%s, %s, %s, %s, %s)", + (mode, prompt, json.dumps(config_data), json.dumps(responses), models) + ) + conn.commit() + except Exception as e: + print(f"[DB] save_run error: {e}") + +HTML = r""" + + + + + +LLM Team + + + +
+<!-- [Main UI template: markup lost in extraction; recoverable text content summarized below]
+  Header: "LLM Team" with live model count ("0 models" until loaded) and links to Lab and Admin.
+  Mode panel: Brainstorm (All + synthesize), Pipeline (Chain sequence), Debate (Argue + judge),
+  Validator (Fact-check), Round Robin (Iterate improve), Red Team (Attack + defend), Consensus (Converge),
+  Code Review (Write+review+test), ELI Ladder (5 levels), Tournament (Compete + vote), Evolution (Genetic algo),
+  Blind Assembly (Split + merge), Staircase (Add constraints), Drift Detect (Confidence map),
+  Perspective (Stakeholder 360), Hallucinate? (Claim verify), Time Loop (Catastrophe fix!).
+  Autonomous Pipelines: Research (Auto brief), Model Eval (Benchmark), Knowledge (Extract facts).
+  Mode hint: "All models answer in parallel, then one synthesizes the best parts into a final answer."
+  Panels: Models picker, Prompt box, Output area ("Select a mode, pick your models, and enter a prompt
+  to run the team."), Response panel with copy / iterate / "Re-pipe into mode" controls, and History list. -->
+ + +""" + +ADMIN_HTML = r""" + + + + + +LLM Team - Admin + + + +
+<!-- [Admin template: markup lost in extraction; recoverable text content summarized below]
+  Header: "LLM Team Admin" with links back to Team UI and Lab.
+  Tabs: Providers, Models, OpenRouter, Timeouts.
+  Providers tab: sections for Ollama (Local), OpenRouter, OpenAI, and Anthropic.
+  Models tab: "Local Models (Ollama)" list ("Loading...") and "Cloud Models" list
+  ("No cloud models configured.").
+  OpenRouter tab: "Free Models on OpenRouter" browser ("Click 'Fetch Models' to load the list.").
+  Timeouts tab: "Global Default" timeout and "Per-Model Overrides" ("Loading models..."). -->
+ + + + +""" + +LAB_HTML = r""" + + + + + +LLM Team - Lab + + + +
+<!-- [Lab template: markup lost in extraction; recoverable text content summarized below]
+  Header: "Lab AutoResearch". Tabs: Experiments, Mutable Config, Live Monitor, Results.
+  Experiments tab: "Create Experiment" form ("Loading...").
+  Mutable Config tab: "Select an experiment from the Experiments tab first."
+  Live Monitor tab: "No Experiment Selected"; counters Status: idle, Trials: 0, Best: 0.0/10,
+  Improvements: 0; "Score Progression" chart; "Trial Log" ("Start an experiment to see trials here.");
+  "Best Config" ("No best config yet.").
+  Results tab: "All Experiments" list ("Loading..."). -->
+
+ + + + +""" + + +# ─── HELPERS ─────────────────────────────────────────────────── + +def _get_timeout(model_id): + cfg = load_config() + t = cfg["timeouts"]["per_model"].get(model_id) + if t: + return t + if "::" in model_id: + prov = model_id.split("::")[0] + return cfg["providers"].get(prov, {}).get("timeout", cfg["timeouts"]["global"]) + return cfg["providers"].get("ollama", {}).get("timeout", cfg["timeouts"]["global"]) + + +def query_ollama(model, prompt, timeout): + cfg = load_config() + base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434") + resp = requests.post(f"{base}/api/generate", json={ + "model": model, "prompt": prompt, "stream": False, + }, timeout=timeout) + resp.raise_for_status() + return resp.json()["response"] + + +def query_openai_compatible(model, prompt, provider_name, timeout): + cfg = load_config() + prov = cfg["providers"].get(provider_name, {}) + base = prov.get("base_url", "https://openrouter.ai/api/v1") + api_key = get_api_key(provider_name) + headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} + if provider_name == "openrouter": + headers["HTTP-Referer"] = "http://localhost:5000" + headers["X-Title"] = "LLM Team UI" + resp = requests.post(f"{base}/chat/completions", headers=headers, json={ + "model": model, "messages": [{"role": "user", "content": prompt}], "stream": False, + }, timeout=timeout) + resp.raise_for_status() + return resp.json()["choices"][0]["message"]["content"] + + +def query_anthropic(model, prompt, timeout): + cfg = load_config() + prov = cfg["providers"].get("anthropic", {}) + base = prov.get("base_url", "https://api.anthropic.com/v1") + api_key = get_api_key("anthropic") + resp = requests.post(f"{base}/messages", headers={ + "x-api-key": api_key, "anthropic-version": "2023-06-01", "Content-Type": "application/json", + }, json={ + "model": model, "max_tokens": 4096, + "messages": [{"role": "user", "content": prompt}], + }, timeout=timeout) + resp.raise_for_status() + return resp.json()["content"][0]["text"] + + +def query_model(model_id, prompt): + timeout = _get_timeout(model_id) + if "::" in model_id: + provider_name, model_name = model_id.split("::", 1) + if provider_name == "anthropic": + return query_anthropic(model_name, prompt, timeout) + return query_openai_compatible(model_name, prompt, provider_name, timeout) + return query_ollama(model_id, prompt, timeout) + + +# ─── CONTEXT MANAGEMENT ─────────────────────────────────────── + +# Context window sizes (tokens) — conservative estimates for safe prompting +MODEL_CONTEXT = { + "llama3.2": 4096, "mistral": 8192, "gemma2": 8192, "qwen2.5": 8192, + "gpt-oss": 4096, "gpt-4o": 128000, "gpt-4o-mini": 128000, + "claude-3": 200000, "claude-sonnet": 200000, "claude-haiku": 200000, +} +DEFAULT_CONTEXT = 4096 # safe fallback for unknown models +MAX_RESPONSE_CHARS = 12000 # cap individual responses (~3K tokens) + + +def estimate_tokens(text): + """Rough token estimate: ~4 chars per token for English.""" + return len(text) // 4 + 1 + + +def get_context_limit(model_id): + """Get context window size for a model.""" + name = model_id.split("::")[-1].split(":")[0].lower() + for key, limit in MODEL_CONTEXT.items(): + if key in name: + return limit + # OpenRouter models generally have larger contexts + if "::" in model_id: + return 16000 + return DEFAULT_CONTEXT + + +def smart_truncate(text, max_tokens, preserve_end=200): + """Truncate text preserving start and end, with a clear marker.""" + if estimate_tokens(text) <= max_tokens: + return text + 
max_chars = max_tokens * 4 + end_chars = preserve_end * 4 + if max_chars <= end_chars * 2: + return text[:max_chars] + start = text[:max_chars - end_chars - 60] + end = text[-end_chars:] + return f"{start}\n\n[... truncated {estimate_tokens(text) - max_tokens} tokens ...]\n\n{end}" + + +def cap_response(text): + """Cap a single model response to prevent runaway output.""" + if len(text) <= MAX_RESPONSE_CHARS: + return text + return smart_truncate(text, MAX_RESPONSE_CHARS // 4) + + +def build_context(parts, model_id, reserve_for_response=1024): + """Build a prompt from parts, fitting within model's context window. + + parts: list of (label, text, priority) tuples + priority: 1=must keep, 2=important, 3=can truncate heavily + Returns: assembled prompt string that fits in context. + """ + limit = get_context_limit(model_id) + budget = limit - reserve_for_response + if budget <= 0: + budget = limit // 2 + + # First pass: measure everything + total = sum(estimate_tokens(t) for _, t, _ in parts) + if total <= budget: + return "\n\n".join(f"{label}\n{text}" if label else text for label, text, _ in parts) + + # Need to truncate — allocate budget by priority + p1 = [(l, t, p) for l, t, p in parts if p == 1] + p2 = [(l, t, p) for l, t, p in parts if p == 2] + p3 = [(l, t, p) for l, t, p in parts if p == 3] + + p1_tokens = sum(estimate_tokens(t) for _, t, _ in p1) + remaining = budget - p1_tokens + + if remaining <= 0: + # Even priority 1 doesn't fit — truncate p1 + per_part = budget // max(len(p1), 1) + result = [] + for label, text, _ in p1: + result.append(f"{label}\n{smart_truncate(text, per_part)}" if label else smart_truncate(text, per_part)) + return "\n\n".join(result) + + # Allocate remaining to p2, then p3 + result = [f"{l}\n{t}" if l else t for l, t, _ in p1] + + for group in [p2, p3]: + if not group or remaining <= 0: + continue + per_part = remaining // max(len(group), 1) + for label, text, _ in group: + truncated = smart_truncate(text, max(per_part, 100)) + result.append(f"{label}\n{truncated}" if label else truncated) + remaining -= estimate_tokens(truncated) + + return "\n\n".join(result) + + +def safe_query(model_id, prompt, fallback_summarize=True): + """Query with context safety — auto-truncates prompt if too large, retries on overflow errors.""" + limit = get_context_limit(model_id) + prompt_tokens = estimate_tokens(prompt) + + # Pre-flight check: truncate if obviously too large + if prompt_tokens > limit - 500: + prompt = smart_truncate(prompt, limit - 1000) + + try: + response = query_model(model_id, prompt) + return cap_response(response) + except Exception as e: + err = str(e).lower() + # Detect context overflow errors from various providers + if any(k in err for k in ["context length", "too many tokens", "maximum context", "token limit", + "content_too_large", "request too large", "413", "400"]): + if fallback_summarize: + # Aggressive truncation and retry + truncated = smart_truncate(prompt, limit // 2) + try: + response = query_model(model_id, truncated) + return cap_response(response) + except Exception: + pass + return f"[Context overflow: prompt was ~{prompt_tokens} tokens, model limit ~{limit}. 
Response truncated to fit.]" + raise + + +def parallel_safe_query(models, prompt): + """Like parallel_query but with context safety on each model.""" + results = {} + max_timeout = max((_get_timeout(m) for m in models), default=300) + 30 + with ThreadPoolExecutor(max_workers=max(len(models), 1)) as pool: + futures = {pool.submit(safe_query, m, prompt): m for m in models} + for future in as_completed(futures, timeout=max_timeout): + model = futures[future] + try: + results[model] = future.result(timeout=10) + except Exception as e: + results[model] = f"Error: {e}" + return results + + +def sse(data): + return f"data: {json.dumps(data)}\n\n" + + +def parallel_query(models, prompt): + """Query multiple models in parallel with context safety.""" + return parallel_safe_query(models, prompt) + + +# ─── ROUTES ──────────────────────────────────────────────────── + +@app.route("/") +def index(): + return render_template_string(HTML) + + +@app.route("/api/models") +def get_models(): + SKIP = {"nomic-embed-text", "mxbai-embed-large", "all-minilm", "snowflake-arctic-embed"} + cfg = load_config() + models = [] + # Local Ollama models + if cfg["providers"]["ollama"].get("enabled", True): + try: + base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434") + resp = requests.get(f"{base}/api/tags", timeout=10) + seen = set() + for m in resp.json().get("models", []): + full = m["name"] + short = full.split(":")[0] + size = m.get("size", 0) + if short in SKIP or size < 1_000_000 or short in seen: + continue + if full in cfg.get("disabled_models", []): + continue + seen.add(short) + models.append({"name": full, "size": f"{size/(1024**3):.1f} GB", + "provider": "ollama", "provider_label": "Local", + "display_name": short}) + except Exception: + pass + # Cloud models + for cm in cfg.get("cloud_models", []): + if not cm.get("enabled", True): + continue + prov = cm["id"].split("::")[0] if "::" in cm["id"] else "cloud" + if not cfg["providers"].get(prov, {}).get("enabled", False): + continue + models.append({"name": cm["id"], "size": cm.get("context", "cloud"), + "provider": prov, "provider_label": prov.title(), + "display_name": cm.get("display_name", cm["id"].split("::")[-1])}) + return jsonify({"models": models}) + + +# ─── ADMIN ROUTES ───────────────────────────────────────────── + +@app.route("/admin") +def admin_page(): + return render_template_string(ADMIN_HTML) + + +@app.route("/api/admin/config", methods=["GET"]) +def admin_get_config(): + cfg = load_config() + safe = json.loads(json.dumps(cfg)) + for name, p in safe["providers"].items(): + if p.get("api_key"): + p["api_key_set"] = True + p["api_key"] = "" + else: + p["api_key_set"] = bool(get_api_key(name)) + return jsonify(safe) + + +@app.route("/api/admin/config", methods=["POST"]) +def admin_save_config(): + data = request.json + cfg = load_config() + # update providers (preserve existing keys if not sent) + for name, prov in data.get("providers", {}).items(): + if name in cfg["providers"]: + new_key = prov.get("api_key", "") + if not new_key: + prov["api_key"] = cfg["providers"][name].get("api_key", "") + cfg["providers"][name].update(prov) + if "disabled_models" in data: + cfg["disabled_models"] = data["disabled_models"] + if "cloud_models" in data: + cfg["cloud_models"] = data["cloud_models"] + if "timeouts" in data: + cfg["timeouts"] = data["timeouts"] + save_config(cfg) + return jsonify({"ok": True}) + + +@app.route("/api/admin/test-provider", methods=["POST"]) +def admin_test_provider(): + data = request.json + name = 
data.get("provider", "") + cfg = load_config() + prov = cfg["providers"].get(name, {}) + try: + if name == "ollama": + r = requests.get(f"{prov.get('base_url', 'http://localhost:11434')}/api/tags", timeout=5) + count = len(r.json().get("models", [])) + return jsonify({"ok": True, "message": f"Connected. {count} models available."}) + elif name == "openrouter": + key = data.get("api_key") or get_api_key("openrouter") + r = requests.get(f"{prov.get('base_url', 'https://openrouter.ai/api/v1')}/models", + headers={"Authorization": f"Bearer {key}"}, timeout=10) + count = len(r.json().get("data", [])) + return jsonify({"ok": True, "message": f"Connected. {count} models available."}) + elif name == "openai": + key = data.get("api_key") or get_api_key("openai") + r = requests.get(f"{prov.get('base_url', 'https://api.openai.com/v1')}/models", + headers={"Authorization": f"Bearer {key}"}, timeout=10) + return jsonify({"ok": True, "message": f"Connected. {len(r.json().get('data', []))} models."}) + elif name == "anthropic": + key = data.get("api_key") or get_api_key("anthropic") + r = requests.post(f"{prov.get('base_url', 'https://api.anthropic.com/v1')}/messages", + headers={"x-api-key": key, "anthropic-version": "2023-06-01", "Content-Type": "application/json"}, + json={"model": "claude-haiku-4-5-20251001", "max_tokens": 1, "messages": [{"role": "user", "content": "hi"}]}, + timeout=10) + return jsonify({"ok": True, "message": "Connected to Anthropic."}) + return jsonify({"ok": False, "message": "Unknown provider"}) + except Exception as e: + return jsonify({"ok": False, "message": str(e)}) + + +_or_models_cache = {"data": None, "ts": 0} + +@app.route("/api/admin/openrouter/models") +def admin_openrouter_models(): + import time + now = time.time() + if _or_models_cache["data"] and now - _or_models_cache["ts"] < 300: + return jsonify({"models": _or_models_cache["data"]}) + key = get_api_key("openrouter") + headers = {"Authorization": f"Bearer {key}"} if key else {} + try: + r = requests.get("https://openrouter.ai/api/v1/models", headers=headers, timeout=15) + r.raise_for_status() + free = [] + for m in r.json().get("data", []): + pricing = m.get("pricing", {}) + if pricing.get("prompt") == "0" and pricing.get("completion") == "0": + free.append({"id": m["id"], "name": m.get("name", m["id"]), + "context_length": m.get("context_length", 0)}) + _or_models_cache["data"] = free + _or_models_cache["ts"] = now + return jsonify({"models": free}) + except Exception as e: + return jsonify({"models": [], "error": str(e)}) + + +@app.route("/api/admin/ollama-models") +def admin_ollama_models(): + cfg = load_config() + base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434") + SKIP = {"nomic-embed-text", "mxbai-embed-large", "all-minilm", "snowflake-arctic-embed"} + try: + resp = requests.get(f"{base}/api/tags", timeout=10) + models = [] + seen = set() + for m in resp.json().get("models", []): + full = m["name"] + short = full.split(":")[0] + size = m.get("size", 0) + if short in SKIP or size < 1_000_000 or short in seen: + continue + seen.add(short) + models.append({"name": full, "size": f"{size/(1024**3):.1f} GB", + "disabled": full in cfg.get("disabled_models", [])}) + return jsonify({"models": models}) + except Exception as e: + return jsonify({"models": [], "error": str(e)}) + + +# ─── HISTORY ROUTES ──────────────────────────────────────────── + +@app.route("/api/runs") +def get_runs(): + try: + with get_db() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as 
cur: + cur.execute("SELECT id, mode, prompt, models_used, created_at FROM team_runs ORDER BY created_at DESC LIMIT 50") + runs = cur.fetchall() + for r in runs: + r["created_at"] = r["created_at"].isoformat() + return jsonify({"runs": runs}) + except Exception as e: + return jsonify({"runs": [], "error": str(e)}) + + +@app.route("/api/runs/") +def get_run(run_id): + try: + with get_db() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute("SELECT * FROM team_runs WHERE id = %s", (run_id,)) + run = cur.fetchone() + if not run: + return jsonify({"error": "not found"}), 404 + run["created_at"] = run["created_at"].isoformat() + return jsonify(run) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/runs/", methods=["DELETE"]) +def delete_run(run_id): + try: + with get_db() as conn: + with conn.cursor() as cur: + cur.execute("DELETE FROM team_runs WHERE id = %s", (run_id,)) + conn.commit() + return jsonify({"ok": True}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/api/pipelines") +def get_pipelines(): + try: + with get_db() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute("SELECT id, pipeline, topic, status, models_used, duration_ms, created_at FROM pipeline_runs ORDER BY created_at DESC LIMIT 50") + runs = cur.fetchall() + for r in runs: + r["created_at"] = r["created_at"].isoformat() if r["created_at"] else None + return jsonify({"pipelines": runs}) + except Exception as e: + return jsonify({"pipelines": [], "error": str(e)}) + + +@app.route("/api/pipelines/") +def get_pipeline(pid): + try: + with get_db() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute("SELECT * FROM pipeline_runs WHERE id = %s", (pid,)) + run = cur.fetchone() + if not run: + return jsonify({"error": "not found"}), 404 + run["created_at"] = run["created_at"].isoformat() if run["created_at"] else None + run["completed_at"] = run["completed_at"].isoformat() if run["completed_at"] else None + return jsonify(run) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# ─── LAB: RATCHET ENGINE ────────────────────────────────────── + +_lab_threads = {} # experiment_id -> thread +_lab_streams = {} # experiment_id -> [queue, ...] 
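+# Ratchet engine overview (implemented in _ratchet_loop below): each trial asks the
+# meta-model to propose exactly ONE change to the mutable config, runs every eval case
+# with the proposed config, and keeps the proposal only if the average score beats the
+# current best; otherwise the config snaps back to best_config. Trials and the rolling
+# best are persisted to lab_trials / lab_experiments and streamed to watchers through
+# the per-experiment queues registered in _lab_streams.
+#
+# Illustrative client sketch (not used by this module; host/port depend on how the
+# Flask app is run, payload format follows _lab_emit and lab_stream below):
+#
+#   import requests, json
+#   url = f"http://localhost:5000/api/lab/experiments/{exp_id}/stream"
+#   with requests.get(url, stream=True) as r:
+#       for line in r.iter_lines():
+#           if line.startswith(b"data: "):
+#               event = json.loads(line[len(b"data: "):])
+#               print(event.get("type"), event)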
+ +def _lab_emit(exp_id, data): + for q in _lab_streams.get(exp_id, []): + q.append(data) + + +def _score_response(response, expected, metric, judge_model=None): + if metric == "accuracy": + if not expected: + return 5.0 + resp_lower = response.lower().strip() + exp_lower = expected.lower().strip() + if exp_lower in resp_lower: + return 10.0 + if any(w in resp_lower for w in exp_lower.split()): + return 5.0 + return 1.0 + elif metric == "speed": + return 10.0 # speed scored externally by duration + elif metric == "quality" and judge_model: + try: + judgment = query_model(judge_model, + f"Rate this response 1-10 for quality, relevance, and completeness.\n\n" + f"EXPECTED: {expected or 'No expected output specified'}\n\n" + f"RESPONSE: {response[:1500]}\n\n" + f"Return ONLY a number 1-10, nothing else.") + import re + m = re.search(r'\b(\d+)\b', judgment) + return min(float(m.group(1)), 10.0) if m else 5.0 + except Exception: + return 5.0 + return 5.0 + + +def _ratchet_loop(exp_id): + try: + with get_db() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute("SELECT * FROM lab_experiments WHERE id = %s", (exp_id,)) + exp = cur.fetchone() + if not exp: + return + + eval_cases = exp["eval_cases"] or [] + models_pool = exp["models_pool"] or [] + metric = exp["metric"] or "quality" + objective = exp["objective"] or "Improve response quality" + mutable = exp["mutable_config"] or { + "system_prompt": "You are a helpful assistant.", + "temperature": 0.7, + "model": models_pool[0] if models_pool else "llama3.2:latest", + } + best_config = exp["best_config"] or json.loads(json.dumps(mutable)) + best_score = exp["best_score"] or 0 + trial_num = exp["total_trials"] or 0 + + # Pick meta-model (largest in pool) + meta_model = models_pool[-1] if models_pool else "qwen2.5:latest" + judge_model = models_pool[0] if models_pool else "llama3.2:latest" + + while True: + # Check if still running + with get_db() as conn: + with conn.cursor() as cur: + cur.execute("SELECT status FROM lab_experiments WHERE id = %s", (exp_id,)) + row = cur.fetchone() + if not row or row[0] != "running": + break + + trial_num += 1 + trial_start = time.time() + _lab_emit(exp_id, {"type": "status", "trial": trial_num, "message": "Proposing change..."}) + + # Step 1: Meta-model proposes a change + history_hint = "" + if trial_num > 1: + with get_db() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute("SELECT config_diff, avg_score, improved FROM lab_trials WHERE experiment_id = %s ORDER BY id DESC LIMIT 5", (exp_id,)) + recent = cur.fetchall() + if recent: + history_hint = "\n\nRecent trials:\n" + "\n".join( + f" {'KEPT' if r['improved'] else 'DISCARDED'} (score {r['avg_score']:.1f}): {r['config_diff']}" for r in recent) + + propose_prompt = ( + f"You are optimizing an LLM pipeline. Objective: {objective}\n" + f"Metric: {metric} (higher is better, max 10)\n" + f"Current best score: {best_score:.1f}/10\n\n" + f"Current config:\n{json.dumps(mutable, indent=2)}\n\n" + f"Available models: {models_pool}\n" + f"Eval cases: {len(eval_cases)}\n" + f"{history_hint}\n\n" + f"Suggest exactly ONE change to improve the score. Return ONLY valid JSON with the FULL updated config. " + f"Keys: system_prompt (string), temperature (0.0-1.5), model (string from available models). " + f"Be creative but focused. Change only one thing at a time." 
+ ) + try: + proposal_raw = query_model(meta_model, propose_prompt) + import re + json_match = re.search(r'\{[\s\S]*\}', proposal_raw) + if json_match: + proposed = json.loads(json_match.group()) + # Validate keys + if "system_prompt" not in proposed: + proposed["system_prompt"] = mutable.get("system_prompt", "") + if "temperature" not in proposed: + proposed["temperature"] = mutable.get("temperature", 0.7) + if "model" not in proposed: + proposed["model"] = mutable.get("model", models_pool[0]) + else: + proposed = mutable + except Exception: + proposed = mutable + + # Describe the diff + diffs = [] + for k in set(list(mutable.keys()) + list(proposed.keys())): + old_v = mutable.get(k) + new_v = proposed.get(k) + if old_v != new_v: + if k == "system_prompt": + diffs.append(f"system_prompt changed ({len(str(old_v))} → {len(str(new_v))} chars)") + else: + diffs.append(f"{k}: {old_v} → {new_v}") + config_diff = "; ".join(diffs) if diffs else "no change" + _lab_emit(exp_id, {"type": "status", "trial": trial_num, "message": f"Testing: {config_diff[:80]}"}) + + # Step 2: Run eval cases with proposed config + trial_scores = [] + model_to_use = proposed.get("model", models_pool[0] if models_pool else "llama3.2:latest") + sys_prompt = proposed.get("system_prompt", "") + + for ci, case in enumerate(eval_cases): + inp = case.get("input", "") + expected = case.get("expected", "") + full_prompt = f"{sys_prompt}\n\n{inp}" if sys_prompt else inp + try: + resp = query_model(model_to_use, full_prompt) + score = _score_response(resp, expected, metric, judge_model if metric == "quality" else None) + trial_scores.append({"input": inp[:100], "score": score, "response": resp[:300]}) + except Exception as e: + trial_scores.append({"input": inp[:100], "score": 0, "error": str(e)}) + + avg_score = sum(s["score"] for s in trial_scores) / max(len(trial_scores), 1) + duration_ms = int((time.time() - trial_start) * 1000) + improved = avg_score > best_score + + # Step 3: Ratchet + if improved: + best_score = avg_score + best_config = json.loads(json.dumps(proposed)) + mutable = json.loads(json.dumps(proposed)) + else: + mutable = json.loads(json.dumps(best_config)) + + # Save trial + with get_db() as conn: + with conn.cursor() as cur: + cur.execute( + """INSERT INTO lab_trials (experiment_id, trial_num, config_diff, config_snapshot, scores, avg_score, improved, duration_ms) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""", + (exp_id, trial_num, config_diff, json.dumps(proposed), json.dumps(trial_scores), avg_score, improved, duration_ms) + ) + cur.execute( + """UPDATE lab_experiments SET total_trials = %s, best_score = %s, best_config = %s, mutable_config = %s, + improvements = improvements + %s WHERE id = %s""", + (trial_num, best_score, json.dumps(best_config), json.dumps(mutable), 1 if improved else 0, exp_id) + ) + conn.commit() + + _lab_emit(exp_id, { + "type": "trial", "trial": trial_num, "score": round(avg_score, 2), + "best": round(best_score, 2), "improved": improved, "diff": config_diff[:100], + "duration_ms": duration_ms + }) + + # Done + with get_db() as conn: + with conn.cursor() as cur: + cur.execute("UPDATE lab_experiments SET status = 'paused' WHERE id = %s AND status = 'running'", (exp_id,)) + conn.commit() + _lab_emit(exp_id, {"type": "done"}) + + except Exception as e: + _lab_emit(exp_id, {"type": "error", "message": str(e)}) + try: + with get_db() as conn: + with conn.cursor() as cur: + cur.execute("UPDATE lab_experiments SET status = 'error' WHERE id = %s", (exp_id,)) + conn.commit() + except Exception: 
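+            # Best-effort cleanup: if marking the experiment as errored also fails,
+            # swallow the exception so the worker thread exits quietly.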
+ pass + + +# ─── LAB API ROUTES ─────────────────────────────────────────── + +@app.route("/lab") +def lab_page(): + return render_template_string(LAB_HTML) + + +@app.route("/api/lab/experiments", methods=["GET"]) +def lab_list(): + with get_db() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute("SELECT id, name, status, metric, best_score, total_trials, improvements, models_pool, created_at FROM lab_experiments ORDER BY created_at DESC") + rows = cur.fetchall() + for r in rows: + r["created_at"] = r["created_at"].isoformat() + return jsonify({"experiments": rows}) + + +@app.route("/api/lab/experiments", methods=["POST"]) +def lab_create(): + d = request.json + with get_db() as conn: + with conn.cursor() as cur: + cur.execute( + """INSERT INTO lab_experiments (name, objective, metric, eval_cases, mutable_config, best_config, models_pool) + VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id""", + (d["name"], d.get("objective", ""), d.get("metric", "quality"), + json.dumps(d.get("eval_cases", [])), + json.dumps(d.get("mutable_config", {"system_prompt": "You are a helpful assistant.", "temperature": 0.7, "model": ""})), + json.dumps(d.get("mutable_config", {"system_prompt": "You are a helpful assistant.", "temperature": 0.7, "model": ""})), + d.get("models_pool", [])) + ) + eid = cur.fetchone()[0] + conn.commit() + return jsonify({"id": eid}) + + +@app.route("/api/lab/experiments/", methods=["GET"]) +def lab_get(eid): + with get_db() as conn: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute("SELECT * FROM lab_experiments WHERE id = %s", (eid,)) + exp = cur.fetchone() + if not exp: + return jsonify({"error": "not found"}), 404 + exp["created_at"] = exp["created_at"].isoformat() + cur.execute("SELECT * FROM lab_trials WHERE experiment_id = %s ORDER BY trial_num", (eid,)) + exp["trials"] = cur.fetchall() + for t in exp["trials"]: + t["created_at"] = t["created_at"].isoformat() + return jsonify(exp) + + +@app.route("/api/lab/experiments/", methods=["PUT"]) +def lab_update(eid): + d = request.json + sets, vals = [], [] + for k in ["name", "objective", "metric"]: + if k in d: + sets.append(f"{k} = %s") + vals.append(d[k]) + for k in ["eval_cases", "mutable_config"]: + if k in d: + sets.append(f"{k} = %s") + vals.append(json.dumps(d[k])) + if "models_pool" in d: + sets.append("models_pool = %s") + vals.append(d["models_pool"]) + if not sets: + return jsonify({"ok": True}) + vals.append(eid) + with get_db() as conn: + with conn.cursor() as cur: + cur.execute(f"UPDATE lab_experiments SET {', '.join(sets)} WHERE id = %s", vals) + conn.commit() + return jsonify({"ok": True}) + + +@app.route("/api/lab/experiments//start", methods=["POST"]) +def lab_start(eid): + with get_db() as conn: + with conn.cursor() as cur: + cur.execute("UPDATE lab_experiments SET status = 'running' WHERE id = %s", (eid,)) + conn.commit() + if eid in _lab_threads and _lab_threads[eid].is_alive(): + return jsonify({"ok": True, "message": "Already running"}) + t = threading.Thread(target=_ratchet_loop, args=(eid,), daemon=True) + _lab_threads[eid] = t + t.start() + return jsonify({"ok": True}) + + +@app.route("/api/lab/experiments//pause", methods=["POST"]) +def lab_pause(eid): + with get_db() as conn: + with conn.cursor() as cur: + cur.execute("UPDATE lab_experiments SET status = 'paused' WHERE id = %s", (eid,)) + conn.commit() + return jsonify({"ok": True}) + + +@app.route("/api/lab/experiments//reset", methods=["POST"]) +def lab_reset(eid): + 
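+    """Reset an experiment: delete its trials, zero the trial/improvement counters, and set best_config back to the current mutable_config."""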
with get_db() as conn: + with conn.cursor() as cur: + cur.execute("UPDATE lab_experiments SET status = 'idle', total_trials = 0, improvements = 0, best_score = 0, best_config = mutable_config WHERE id = %s", (eid,)) + cur.execute("DELETE FROM lab_trials WHERE experiment_id = %s", (eid,)) + conn.commit() + return jsonify({"ok": True}) + + +@app.route("/api/lab/experiments//delete", methods=["DELETE"]) +def lab_delete(eid): + with get_db() as conn: + with conn.cursor() as cur: + cur.execute("DELETE FROM lab_experiments WHERE id = %s", (eid,)) + conn.commit() + return jsonify({"ok": True}) + + +@app.route("/api/lab/experiments//stream") +def lab_stream(eid): + q = [] + _lab_streams.setdefault(eid, []).append(q) + def generate(): + try: + while True: + if q: + data = q.pop(0) + yield f"data: {json.dumps(data)}\n\n" + if data.get("type") == "done": + break + else: + time.sleep(0.5) + yield ": keepalive\n\n" + finally: + _lab_streams.get(eid, []).remove(q) if q in _lab_streams.get(eid, []) else None + return Response(generate(), mimetype="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) + + +# ─── TEAM ROUTES ────────────────────────────────────────────── + +@app.route("/api/run", methods=["POST"]) +def run_team(): + config = request.json + mode = config["mode"] + + RUNNERS = { + "brainstorm": run_brainstorm, "pipeline": run_pipeline, "debate": run_debate, + "validator": run_validator, "roundrobin": run_roundrobin, "redteam": run_redteam, + "consensus": run_consensus, "codereview": run_codereview, "ladder": run_ladder, + "tournament": run_tournament, "evolution": run_evolution, "blindassembly": run_blindassembly, + "staircase": run_staircase, "drift": run_drift, "mesh": run_mesh, + "hallucination": run_hallucination, "timeloop": run_timeloop, + "research": run_research, "eval": run_eval, "extract": run_extract, + } + + def generate(): + collected = [] + runner = RUNNERS.get(mode) + if runner: + for event_str in runner(config): + yield event_str + try: + data = json.loads(event_str.replace("data: ", "", 1).strip()) + if data.get("type") == "response": + collected.append({"model": data.get("model", ""), "text": data.get("text", ""), "role": data.get("role", "")}) + except Exception: + pass + else: + yield sse({"type": "response", "model": "system", "text": f"Unknown mode: {mode}", "role": "error"}) + yield sse({"type": "done"}) + if collected: + save_run(mode, config.get("prompt", ""), config, collected) + + return Response(generate(), mimetype="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no", "Connection": "keep-alive"}) + + +# ─── ORIGINAL 10 MODES ──────────────────────────────────────── + +def run_brainstorm(config): + models, prompt = config.get("models", []), config["prompt"] + synthesizer = config.get("synthesizer", models[0] if models else "qwen2.5") + yield sse({"type": "clear"}) + yield sse({"type": "status", "message": f"Querying {len(models)} models..."}) + responses = parallel_query(models, prompt) + for m, r in responses.items(): + yield sse({"type": "response", "model": m, "text": r, "role": "respondent"}) + if len(responses) > 1: + yield sse({"type": "status", "message": f"Synthesizing with {synthesizer}..."}) + parts = [("QUESTION:", prompt, 1), ("INSTRUCTION:", "Synthesize the best answer. 
Be concise.", 1)] + for m, r in responses.items(): + parts.append((f"[{m}]:", cap_response(r), 3)) + sp = build_context(parts, synthesizer) + try: + yield sse({"type": "response", "model": synthesizer, "text": safe_query(synthesizer, sp), "role": "synthesis"}) + except Exception as e: + yield sse({"type": "response", "model": synthesizer, "text": str(e), "role": "error"}) + + +def run_pipeline(config): + steps, current = config.get("steps", []), config["prompt"] + yield sse({"type": "clear"}) + for i, step in enumerate(steps): + model = step["model"] + yield sse({"type": "status", "message": f"Step {i+1}/{len(steps)}: {model}..."}) + try: + prompt = step["instruction"].replace("{input}", cap_response(current)) + current = safe_query(model, prompt) + yield sse({"type": "response", "model": model, "text": current, "role": f"step {i+1}"}) + except Exception as e: + yield sse({"type": "response", "model": model, "text": str(e), "role": "error"}); break + + +def run_debate(config): + prompt, d1, d2, judge = config["prompt"], config["debater1"], config["debater2"], config["judge"] + rounds = config.get("rounds", 2) + yield sse({"type": "clear"}) + history = [] + for m in [d1, d2]: + yield sse({"type": "status", "message": f"{m} opening..."}) + try: + r = safe_query(m, f"Give your position on: {prompt}") + history.append((m, r)); yield sse({"type": "response", "model": m, "text": r, "role": "opening"}) + except Exception as e: + yield sse({"type": "response", "model": m, "text": str(e), "role": "error"}) + for rd in range(rounds): + for i, m in enumerate([d1, d2]): + other = [d1, d2][1-i] + other_last = [h[1] for h in history if h[0] == other][-1] + yield sse({"type": "status", "message": f"Round {rd+1}: {m}..."}) + try: + rebuttal_prompt = build_context([ + ("Topic:", prompt, 1), + (f"Opponent ({other}) said:", cap_response(other_last), 2), + ("INSTRUCTION:", "Rebuttal or concede:", 1), + ], m) + r = safe_query(m, rebuttal_prompt) + history.append((m, r)); yield sse({"type": "response", "model": m, "text": r, "role": f"round {rd+1}"}) + except Exception as e: + yield sse({"type": "response", "model": m, "text": str(e), "role": "error"}) + yield sse({"type": "status", "message": f"{judge} judging..."}) + parts = [("Topic:", prompt, 1), ("INSTRUCTION:", "Judge: who won and why?", 1)] + for m, t in history: + parts.append((f"[{m}]:", cap_response(t), 3)) + jp = build_context(parts, judge) + try: + yield sse({"type": "response", "model": judge, "text": safe_query(judge, jp), "role": "judge"}) + except Exception as e: + yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"}) + + +def run_validator(config): + prompt, answerer, validators = config["prompt"], config["answerer"], config.get("validators", []) + yield sse({"type": "clear"}) + yield sse({"type": "status", "message": f"{answerer} answering..."}) + try: + answer = query_model(answerer, prompt) + yield sse({"type": "response", "model": answerer, "text": answer, "role": "answer"}) + except Exception as e: + yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return + yield sse({"type": "status", "message": f"Validating with {len(validators)} models..."}) + vp = f"QUESTION: {prompt}\n\nANSWER:\n{answer}\n\nFact-check. Score 1-10 for accuracy, completeness, clarity. Flag errors." 
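+    # Fan the same fact-check prompt out to every validator in parallel.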
+ results = parallel_query(validators, vp) + for m, r in results.items(): + yield sse({"type": "response", "model": m, "text": r, "role": "validator"}) + + +def run_roundrobin(config): + prompt, models, cycles = config["prompt"], config.get("models", []), config.get("cycles", 2) + yield sse({"type": "clear"}) + if not models: return + yield sse({"type": "status", "message": f"{models[0]} drafting..."}) + try: + current = query_model(models[0], f"Answer:\n\n{prompt}") + yield sse({"type": "response", "model": models[0], "text": current, "role": "draft"}) + except Exception as e: + yield sse({"type": "response", "model": models[0], "text": str(e), "role": "error"}); return + for cycle in range(cycles): + start = 1 if cycle == 0 else 0 + for i in range(start, len(models)): + m = models[i] + yield sse({"type": "status", "message": f"Cycle {cycle+1}: {m}..."}) + try: + current = query_model(m, f"Question: {prompt}\n\nCurrent answer:\n{current}\n\nImprove it. Return full improved answer.") + is_last = (cycle == cycles-1) and (i == len(models)-1) + yield sse({"type": "response", "model": m, "text": current, "role": "final" if is_last else f"cycle {cycle+1}"}) + except Exception as e: + yield sse({"type": "response", "model": m, "text": str(e), "role": "error"}) + + +def run_redteam(config): + prompt, author, attacker, patcher = config["prompt"], config["author"], config["attacker"], config["patcher"] + rounds = config.get("rounds", 2) + yield sse({"type": "clear"}) + yield sse({"type": "status", "message": f"{author} writing..."}) + try: + current = query_model(author, prompt) + yield sse({"type": "response", "model": author, "text": current, "role": "author"}) + except Exception as e: + yield sse({"type": "response", "model": author, "text": str(e), "role": "error"}); return + for r in range(rounds): + yield sse({"type": "status", "message": f"Round {r+1}: {attacker} attacking..."}) + try: + attack = query_model(attacker, f"Question: {prompt}\n\nAnswer:\n{current}\n\nRED TEAM: find every flaw, error, weakness, edge case. Be aggressive.") + yield sse({"type": "response", "model": attacker, "text": attack, "role": f"attack {r+1}"}) + except Exception as e: + yield sse({"type": "response", "model": attacker, "text": str(e), "role": "error"}); continue + yield sse({"type": "status", "message": f"Round {r+1}: {patcher} fixing..."}) + try: + current = query_model(patcher, f"Question: {prompt}\n\nAnswer:\n{current}\n\nFlaws found:\n{attack}\n\nFix ALL issues. Return complete improved answer.") + yield sse({"type": "response", "model": patcher, "text": current, "role": "patcher" if r == rounds-1 else f"patch {r+1}"}) + except Exception as e: + yield sse({"type": "response", "model": patcher, "text": str(e), "role": "error"}) + + +def run_consensus(config): + prompt, models, max_rounds = config["prompt"], config.get("models", []), config.get("max_rounds", 3) + yield sse({"type": "clear"}) + if not models: return + yield sse({"type": "status", "message": f"Round 1: {len(models)} models answering..."}) + responses = parallel_query(models, prompt) + for m, r in responses.items(): + yield sse({"type": "response", "model": m, "text": r, "role": "round 1"}) + for rd in range(2, max_rounds + 1): + yield sse({"type": "status", "message": f"Round {rd}: reviewing each other..."}) + new = {} + for m in models: + parts = [("Question:", prompt, 1), + ("Your answer:", cap_response(responses.get(m, "")), 2), + ("INSTRUCTION:", "Revise considering other perspectives. 
Adopt good points, defend if right.", 1)] + for o, r in responses.items(): + if o != m: + parts.append((f"[{o}]:", cap_response(r), 3)) + ctx = build_context(parts, m) + try: + new[m] = safe_query(m, ctx) + yield sse({"type": "response", "model": m, "text": new[m], "role": "consensus" if rd == max_rounds else f"round {rd}"}) + except Exception as e: + new[m] = responses.get(m, ""); yield sse({"type": "response", "model": m, "text": str(e), "role": "error"}) + responses = new + + +def run_codereview(config): + prompt, coder, reviewer, tester = config["prompt"], config["coder"], config["reviewer"], config["tester"] + yield sse({"type": "clear"}) + yield sse({"type": "status", "message": f"{coder} coding..."}) + try: + code = query_model(coder, f"Write code for this task. Only output code with brief comments.\n\n{prompt}") + yield sse({"type": "response", "model": coder, "text": code, "role": "coder"}) + except Exception as e: + yield sse({"type": "response", "model": coder, "text": str(e), "role": "error"}); return + yield sse({"type": "status", "message": f"{reviewer} reviewing..."}) + try: + review = query_model(reviewer, f"Task: {prompt}\n\nCode:\n{code}\n\nReview: bugs, security, performance, style, edge cases. Provide corrected code if needed.") + yield sse({"type": "response", "model": reviewer, "text": review, "role": "reviewer"}) + except Exception as e: + review = ""; yield sse({"type": "response", "model": reviewer, "text": str(e), "role": "error"}) + yield sse({"type": "status", "message": f"{tester} testing..."}) + try: + tests = query_model(tester, f"Task: {prompt}\n\nCode:\n{code}\n\nReview:\n{review}\n\nWrite comprehensive unit tests. Cover normal, edge, error cases.") + yield sse({"type": "response", "model": tester, "text": tests, "role": "tester"}) + except Exception as e: + yield sse({"type": "response", "model": tester, "text": str(e), "role": "error"}) + + +def run_ladder(config): + prompt, models = config["prompt"], config.get("models", []) + levels = [ + ("Child (5yo)", "Explain to a 5-year-old. Very simple words, short sentences, fun analogies."), + ("Teenager", "Explain to a 15-year-old. Everyday language, relatable examples, some technical terms."), + ("College Student", "College level. Proper terminology, theory, structured explanation."), + ("Professional", "Professional level. Technical language, real-world applications, trade-offs."), + ("PhD Expert", "PhD/expert level. Nuanced details, current research, math if relevant, edge cases."), + ] + yield sse({"type": "clear"}) + for i, (name, instr) in enumerate(levels): + m = models[i % len(models)] if models else "qwen2.5" + yield sse({"type": "status", "message": f"Level {i+1}/5: {name} ({m})..."}) + try: + yield sse({"type": "response", "model": m, "text": query_model(m, f"{instr}\n\nQuestion: {prompt}"), "role": name}) + except Exception as e: + yield sse({"type": "response", "model": m, "text": str(e), "role": "error"}) + + +def run_tournament(config): + prompt, models, judge = config["prompt"], config.get("models", []), config.get("judge", "qwen2.5") + yield sse({"type": "clear"}) + yield sse({"type": "status", "message": f"{len(models)} models competing..."}) + responses = parallel_query(models, prompt) + for m, r in responses.items(): + yield sse({"type": "response", "model": m, "text": r, "role": "competitor"}) + if len(responses) < 2: return + yield sse({"type": "status", "message": f"{judge} ranking..."}) + parts = [("Question:", prompt, 1), + ("INSTRUCTION:", "Rank all from best to worst. Score 1-10 each. 
Then refine the winner into the ultimate answer.", 1)] + for m, r in responses.items(): + parts.append((f"[{m}]:", cap_response(r), 3)) + jp = build_context(parts, judge) + try: + yield sse({"type": "response", "model": judge, "text": safe_query(judge, jp), "role": "verdict"}) + except Exception as e: + yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"}) + + +# ─── NEW 7 MODES ────────────────────────────────────────────── + +def run_evolution(config): + """Genetic algorithm: generate, score, breed, mutate across generations.""" + prompt, models = config["prompt"], config.get("models", []) + generations = config.get("generations", 3) + judge = config.get("judge", models[0] if models else "qwen2.5") + yield sse({"type": "clear"}) + + if not models: return + + # Gen 0: each model generates an answer + yield sse({"type": "status", "message": "Generation 0: spawning initial population..."}) + population = parallel_query(models, prompt) + for m, r in population.items(): + yield sse({"type": "response", "model": m, "text": r, "role": "gen 0"}) + + for gen in range(1, generations + 1): + # Fitness scoring + yield sse({"type": "status", "message": f"Generation {gen}: fitness evaluation..."}) + score_prompt = f"Question: {prompt}\n\nRate each answer 1-100. Return ONLY a JSON object like {{\"model_name\": score}}.\n\n" + for m, r in population.items(): + score_prompt += f"[{m}]: {r.strip()}\n\n" + try: + scores_raw = query_model(judge, score_prompt) + yield sse({"type": "response", "model": judge, "text": scores_raw, "role": f"fitness gen {gen}"}) + except Exception as e: + yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"}); continue + + # Breed: take top 2 answers, ask a model to combine them + pop_list = list(population.items()) + if len(pop_list) < 2: break + + parent1, parent2 = pop_list[0], pop_list[1] + yield sse({"type": "status", "message": f"Generation {gen}: breeding + mutating..."}) + + new_population = {} + for m in models: + breed_prompt = ( + f"Question: {prompt}\n\n" + f"Parent A ({parent1[0]}):\n{parent1[1].strip()}\n\n" + f"Parent B ({parent2[0]}):\n{parent2[1].strip()}\n\n" + f"You are {m}. Breed these two answers: take the best parts of each parent, " + f"combine them, then MUTATE by adding one novel insight or improvement. " + f"Return your evolved answer." + ) + try: + offspring = query_model(m, breed_prompt) + new_population[m] = offspring + is_last = gen == generations + yield sse({"type": "response", "model": m, "text": offspring, "role": "final" if is_last else f"gen {gen}"}) + except Exception as e: + new_population[m] = population.get(m, "") + yield sse({"type": "response", "model": m, "text": str(e), "role": "error"}) + + population = new_population + + +def run_blindassembly(config): + """Split question into parts, each model answers blind, assembler stitches.""" + prompt, models = config["prompt"], config.get("models", []) + assembler = config.get("assembler", models[0] if models else "qwen2.5") + yield sse({"type": "clear"}) + + if not models: return + n = len(models) + + # Step 1: Decompose the question + yield sse({"type": "status", "message": "Decomposing question into sub-tasks..."}) + decompose_prompt = ( + f"Split this question into exactly {n} independent sub-parts that together fully answer it. " + f"Return ONLY a numbered list, one sub-question per line. 
No other text.\n\n" + f"Question: {prompt}" + ) + try: + parts_raw = query_model(assembler, decompose_prompt) + yield sse({"type": "response", "model": assembler, "text": parts_raw, "role": "decomposer"}) + except Exception as e: + yield sse({"type": "response", "model": assembler, "text": str(e), "role": "error"}); return + + # Parse parts + parts = [line.strip() for line in parts_raw.strip().split("\n") if line.strip() and any(c.isalpha() for c in line)] + while len(parts) < n: + parts.append(f"Additional aspect of: {prompt}") + parts = parts[:n] + + # Step 2: Each model answers their part BLIND + yield sse({"type": "status", "message": f"Sending {n} sub-tasks to models (blind)..."}) + fragments = {} + with ThreadPoolExecutor(max_workers=n) as pool: + futures = {} + for i, m in enumerate(models): + blind_prompt = ( + f"Answer ONLY this specific sub-question. Do not address anything else.\n\n" + f"Sub-question: {parts[i]}" + ) + futures[pool.submit(query_model, m, blind_prompt)] = (m, parts[i]) + + for future in as_completed(futures): + m, part = futures[future] + try: + fragments[m] = {"part": part, "answer": future.result()} + yield sse({"type": "response", "model": m, "text": f"SUB-TASK: {part}\n\nANSWER:\n{fragments[m]['answer']}", "role": "blind worker"}) + except Exception as e: + yield sse({"type": "response", "model": m, "text": str(e), "role": "error"}) + + # Step 3: Assemble + yield sse({"type": "status", "message": f"{assembler} assembling blind fragments..."}) + assemble_prompt = f"Original question: {prompt}\n\nMultiple models each answered a sub-part WITHOUT seeing each other:\n\n" + for m, data in fragments.items(): + assemble_prompt += f"[{m}] (sub-task: {data['part']}):\n{data['answer'].strip()}\n\n" + assemble_prompt += "Stitch these fragments into ONE coherent, complete answer. Fill any gaps. Remove contradictions." + + try: + yield sse({"type": "response", "model": assembler, "text": query_model(assembler, assemble_prompt), "role": "assembler"}) + except Exception as e: + yield sse({"type": "response", "model": assembler, "text": str(e), "role": "error"}) + + +def run_staircase(config): + """Devil's Staircase: each round adds a new constraint.""" + prompt = config["prompt"] + answerer = config["answerer"] + challenger = config["challenger"] + steps = config.get("steps", 4) + yield sse({"type": "clear"}) + + # Initial answer + yield sse({"type": "status", "message": f"{answerer} answering..."}) + try: + current = query_model(answerer, prompt) + yield sse({"type": "response", "model": answerer, "text": current, "role": "initial answer"}) + except Exception as e: + yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return + + constraints = [] + for s in range(steps): + # Challenger adds a constraint + yield sse({"type": "status", "message": f"Step {s+1}: {challenger} adding constraint..."}) + constraint_prompt = ( + f"Original question: {prompt}\n\n" + f"Current answer:\n{current}\n\n" + f"Existing constraints: {constraints if constraints else 'None yet'}\n\n" + f"Add ONE new realistic constraint, complication, or edge case that the current answer doesn't handle. " + f"Make it specific and challenging but plausible. State ONLY the new constraint, nothing else." 
+ ) + try: + new_constraint = query_model(challenger, constraint_prompt) + constraints.append(new_constraint.strip()) + yield sse({"type": "response", "model": challenger, "text": new_constraint, "role": f"constraint {s+1}"}) + except Exception as e: + yield sse({"type": "response", "model": challenger, "text": str(e), "role": "error"}); continue + + # Answerer must adapt + yield sse({"type": "status", "message": f"Step {s+1}: {answerer} adapting..."}) + adapt_prompt = ( + f"Original question: {prompt}\n\n" + f"ALL constraints you must satisfy:\n" + + "\n".join(f" {i+1}. {c}" for i, c in enumerate(constraints)) + + f"\n\nYour previous answer:\n{current}\n\n" + f"Rewrite your answer to handle ALL constraints. Return the complete updated answer." + ) + try: + current = query_model(answerer, adapt_prompt) + is_last = s == steps - 1 + yield sse({"type": "response", "model": answerer, "text": current, "role": "final" if is_last else f"adapted {s+1}"}) + except Exception as e: + yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}) + + +def run_drift(config): + """Same prompt N times to same model, analyze variance.""" + prompt = config["prompt"] + target = config["target"] + samples = config.get("samples", 5) + analyzer = config["analyzer"] + yield sse({"type": "clear"}) + + yield sse({"type": "status", "message": f"Sampling {target} {samples} times..."}) + results = [] + for i in range(samples): + yield sse({"type": "status", "message": f"Sample {i+1}/{samples}..."}) + try: + r = query_model(target, prompt) + results.append(r) + yield sse({"type": "response", "model": target, "text": r, "role": f"sample {i+1}"}) + except Exception as e: + yield sse({"type": "response", "model": target, "text": str(e), "role": "error"}) + + if len(results) < 2: return + + # Analyze + yield sse({"type": "status", "message": f"{analyzer} analyzing drift..."}) + analysis_prompt = ( + f"Question asked: {prompt}\n\n" + f"The model '{target}' was asked this same question {len(results)} times. Here are all responses:\n\n" + ) + for i, r in enumerate(results): + analysis_prompt += f"--- Sample {i+1} ---\n{r.strip()}\n\n" + + analysis_prompt += ( + "DRIFT ANALYSIS:\n" + "1. What claims/facts are CONSISTENT across all samples? (HIGH CONFIDENCE)\n" + "2. What claims VARY between samples? (LOW CONFIDENCE - possible hallucination)\n" + "3. What is completely CONTRADICTED between samples? (UNRELIABLE)\n" + "4. Give an overall confidence score 1-10 for the model's answer to this question.\n" + "5. Provide the 'true' answer using only high-confidence claims." + ) + try: + yield sse({"type": "response", "model": analyzer, "text": query_model(analyzer, analysis_prompt), "role": "analyzer"}) + except Exception as e: + yield sse({"type": "response", "model": analyzer, "text": str(e), "role": "error"}) + + +def run_mesh(config): + """Each model answers as a different stakeholder.""" + prompt, models = config["prompt"], config.get("models", []) + synthesizer = config.get("synthesizer", models[0] if models else "qwen2.5") + yield sse({"type": "clear"}) + + perspectives = [ + ("CEO / Business Leader", "You are a CEO. Answer from a business strategy perspective: ROI, market impact, competitive advantage, risk."), + ("Software Engineer", "You are a senior engineer. Answer from a technical perspective: architecture, implementation, scalability, tech debt."), + ("End User / Customer", "You are an end user/customer. 
Answer from a usability perspective: experience, pain points, what you actually need."), + ("Regulator / Legal", "You are a regulator/legal advisor. Answer from a compliance perspective: laws, regulations, liability, ethics, privacy."), + ("Competitor", "You are a competitor analyzing this. What threats/opportunities does this create? What would you do differently?"), + ] + + if not models: return + + responses = {} + for i, (role_name, instruction) in enumerate(perspectives): + m = models[i % len(models)] + yield sse({"type": "status", "message": f"{role_name}: {m}..."}) + try: + r = query_model(m, f"{instruction}\n\nQuestion: {prompt}") + responses[role_name] = (m, r) + yield sse({"type": "response", "model": m, "text": r, "role": role_name}) + except Exception as e: + yield sse({"type": "response", "model": m, "text": str(e), "role": "error"}) + + # 360 synthesis + yield sse({"type": "status", "message": f"{synthesizer} weaving 360-degree view..."}) + syn = f"Question: {prompt}\n\nMultiple stakeholders gave their perspective:\n\n" + for role, (m, r) in responses.items(): + syn += f"[{role} ({m})]: {r.strip()}\n\n" + syn += "Synthesize a 360-degree view that balances all stakeholder perspectives. Highlight tensions and trade-offs." + try: + yield sse({"type": "response", "model": synthesizer, "text": query_model(synthesizer, syn), "role": "mesh-360"}) + except Exception as e: + yield sse({"type": "response", "model": synthesizer, "text": str(e), "role": "error"}) + + +def run_hallucination(config): + """One answers, hunters verify each claim independently.""" + prompt, answerer = config["prompt"], config["answerer"] + hunters = config.get("hunters", []) + yield sse({"type": "clear"}) + + # Get answer + yield sse({"type": "status", "message": f"{answerer} answering..."}) + try: + answer = query_model(answerer, prompt) + yield sse({"type": "response", "model": answerer, "text": answer, "role": "answer"}) + except Exception as e: + yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return + + # Extract claims + yield sse({"type": "status", "message": "Extracting factual claims..."}) + extract_prompt = ( + f"Extract every factual claim from this answer as a numbered list. Include specific facts, numbers, dates, " + f"names, and cause-effect relationships. One claim per line.\n\nAnswer:\n{answer}" + ) + try: + claims = query_model(answerer, extract_prompt) + yield sse({"type": "response", "model": answerer, "text": claims, "role": "claims extracted"}) + except Exception as e: + yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return + + # Each hunter verifies independently + yield sse({"type": "status", "message": f"{len(hunters)} hunters verifying claims..."}) + hunt_prompt = ( + f"Original question: {prompt}\n\n" + f"An AI generated this answer:\n{answer}\n\n" + f"Here are the extracted claims:\n{claims}\n\n" + f"For EACH claim, verdict:\n" + f" VERIFIED - you are confident this is correct\n" + f" SUSPICIOUS - might be wrong or misleading\n" + f" HALLUCINATED - this is likely made up or incorrect\n" + f" UNVERIFIABLE - cannot determine from your knowledge\n" + f"Explain your reasoning for suspicious/hallucinated claims." 
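+ # Every hunter gets the same answer plus the extracted claims and verifies them
+ # independently via parallel_query (no cross-talk between hunters), so agreement
+ # between hunters on SUSPICIOUS/HALLUCINATED verdicts is the useful signal.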
+ ) + results = parallel_query(hunters, hunt_prompt) + for m, r in results.items(): + yield sse({"type": "response", "model": m, "text": r, "role": "hunter"}) + + +def run_timeloop(config): + """CHAOS MODE: answer -> catastrophe -> fix -> new catastrophe -> repeat.""" + prompt = config["prompt"] + answerer = config["answerer"] + chaos = config["chaos"] + loops = config.get("loops", 4) + yield sse({"type": "clear"}) + + # Initial answer + yield sse({"type": "status", "message": f"{answerer} answering (unaware of impending doom)..."}) + try: + current = query_model(answerer, prompt) + yield sse({"type": "response", "model": answerer, "text": current, "role": "initial (doomed)"}) + except Exception as e: + yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return + + catastrophes = [] + for i in range(loops): + # Chaos agent creates a catastrophe + yield sse({"type": "status", "message": f"Loop {i+1}: CHAOS AGENT unleashed..."}) + chaos_prompt = ( + f"Original question: {prompt}\n\n" + f"Someone implemented this answer:\n{current}\n\n" + f"Previous catastrophes that were already fixed: {catastrophes if catastrophes else 'None yet'}\n\n" + f"You are a CHAOS AGENT. Describe a SPECIFIC, VIVID catastrophe that happened because of a flaw " + f"in this answer. Be creative and dramatic but grounded in a real flaw. " + f"Describe: 1) What went wrong 2) The cascading consequences 3) Who/what was affected. " + f"Make it different from previous catastrophes. Be theatrical!" + ) + try: + catastrophe = query_model(chaos, chaos_prompt) + catastrophes.append(catastrophe.strip()[:200]) + yield sse({"type": "response", "model": chaos, "text": catastrophe, "role": f"catastrophe {i+1}"}) + except Exception as e: + yield sse({"type": "response", "model": chaos, "text": str(e), "role": "error"}); continue + + # Answerer must fix + yield sse({"type": "status", "message": f"Loop {i+1}: {answerer} desperately fixing..."}) + fix_prompt = ( + f"Original question: {prompt}\n\n" + f"Your previous answer:\n{current}\n\n" + f"CATASTROPHE REPORT:\n{catastrophe}\n\n" + f"ALL previous catastrophes you must also prevent:\n" + + "\n".join(f" {j+1}. {c}" for j, c in enumerate(catastrophes)) + + f"\n\nRewrite your answer to prevent THIS catastrophe and ALL previous ones. " + f"Your answer must be BULLETPROOF. Return the complete fixed answer." + ) + try: + current = query_model(answerer, fix_prompt) + is_last = i == loops - 1 + yield sse({"type": "response", "model": answerer, "text": current, "role": "survivor" if is_last else f"fix {i+1}"}) + except Exception as e: + yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}) + + # Final verdict from chaos agent + yield sse({"type": "status", "message": f"{chaos} final inspection..."}) + final_prompt = ( + f"Original question: {prompt}\n\n" + f"After {loops} catastrophes, the final answer is:\n{current}\n\n" + f"All catastrophes it survived:\n" + + "\n".join(f" {j+1}. {c}" for j, c in enumerate(catastrophes)) + + f"\n\nAs the Chaos Agent, give your final verdict: Is this answer now truly bulletproof? " + f"Rate its resilience 1-10. Can you find ONE MORE flaw? If not, admit defeat." 
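+ # Note: each entry in `catastrophes` was truncated to 200 chars when appended
+ # above, so this final inspection sees abbreviated summaries rather than the
+ # full catastrophe reports.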
+ ) + try: + yield sse({"type": "response", "model": chaos, "text": query_model(chaos, final_prompt), "role": "final judgment"}) + except Exception as e: + yield sse({"type": "response", "model": chaos, "text": str(e), "role": "error"}) + + +# ─── AUTONOMOUS PIPELINES ───────────────────────────────────── + +def _save_pipeline(pipeline, topic, steps, result, models, start_ms): + import time + duration = int((time.time() * 1000) - start_ms) + try: + with get_db() as conn: + with conn.cursor() as cur: + cur.execute( + """INSERT INTO pipeline_runs (pipeline, topic, status, steps, result, models_used, duration_ms, completed_at) + VALUES (%s, %s, 'completed', %s, %s, %s, %s, NOW())""", + (pipeline, topic, json.dumps(steps), json.dumps(result), list(set(models)), duration) + ) + conn.commit() + except Exception as e: + print(f"[DB] pipeline save error: {e}") + + +def run_research(config): + """Autonomous research pipeline: scout → parallel research → fact-check → synthesize.""" + import time + start = time.time() * 1000 + prompt = config["prompt"] + scout = config.get("scout", "llama3.2:latest") + models = config.get("models", []) + checker = config.get("checker", models[0] if models else scout) + synth = config.get("synthesizer", models[0] if models else scout) + num_q = config.get("num_questions", 5) + yield sse({"type": "clear"}) + steps = [] + all_models = [scout, checker, synth] + models + + # Step 1: Scout generates research questions + yield sse({"type": "status", "message": f"Step 1/4: {scout} generating {num_q} research questions..."}) + try: + q_prompt = ( + f"You are a research scout. Given the topic below, generate exactly {num_q} specific, " + f"diverse research questions that would build a comprehensive understanding. " + f"Return ONLY a numbered list.\n\nTopic: {prompt}" + ) + questions_raw = query_model(scout, q_prompt) + yield sse({"type": "response", "model": scout, "text": questions_raw, "role": "scout"}) + steps.append({"step": "scout", "model": scout, "output": questions_raw}) + except Exception as e: + yield sse({"type": "response", "model": scout, "text": str(e), "role": "error"}) + return + + # Parse questions + questions = [l.strip() for l in questions_raw.strip().split("\n") if l.strip() and any(c.isalpha() for c in l)] + questions = questions[:num_q] + if not questions: + yield sse({"type": "response", "model": "system", "text": "Failed to parse research questions.", "role": "error"}) + return + + # Step 2: Parallel research — distribute questions across models + yield sse({"type": "status", "message": f"Step 2/4: {len(models)} models researching {len(questions)} questions..."}) + research_results = {} + with ThreadPoolExecutor(max_workers=max(len(models), 1)) as pool: + futures = {} + for i, q in enumerate(questions): + m = models[i % len(models)] if models else scout + rp = f"Research this question thoroughly. 
Provide specific facts, data, and examples.\n\nQuestion: {q}" + futures[pool.submit(query_model, m, rp)] = (m, q) + for future in as_completed(futures): + m, q = futures[future] + try: + answer = future.result() + research_results[q] = {"model": m, "answer": answer} + yield sse({"type": "response", "model": m, "text": f"Q: {q}\n\n{answer}", "role": "researcher"}) + except Exception as e: + yield sse({"type": "response", "model": m, "text": f"Q: {q}\n\nError: {e}", "role": "error"}) + research_results[q] = {"model": m, "answer": f"Error: {e}"} + steps.append({"step": "research", "results": {q: r["answer"][:500] for q, r in research_results.items()}}) + + # Step 3: Fact-check + yield sse({"type": "status", "message": f"Step 3/4: {checker} fact-checking all findings..."}) + check_prompt = f"Topic: {prompt}\n\nResearch findings to fact-check:\n\n" + for q, r in research_results.items(): + check_prompt += f"Q: {q}\nA: {r['answer'][:300]}\n\n" + check_prompt += ( + "For each finding, mark as:\n" + " VERIFIED — likely accurate\n" + " UNCERTAIN — may be wrong or outdated\n" + " FLAGGED — likely inaccurate\n" + "Be specific about what's wrong with flagged items." + ) + try: + check_result = query_model(checker, check_prompt) + yield sse({"type": "response", "model": checker, "text": check_result, "role": "fact-checker"}) + steps.append({"step": "fact-check", "model": checker, "output": check_result[:1000]}) + except Exception as e: + check_result = f"Error: {e}" + yield sse({"type": "response", "model": checker, "text": str(e), "role": "error"}) + + # Step 4: Synthesize into brief + yield sse({"type": "status", "message": f"Step 4/4: {synth} synthesizing research brief..."}) + synth_prompt = f"Topic: {prompt}\n\nResearch findings:\n\n" + for q, r in research_results.items(): + synth_prompt += f"Q: {q}\nA: {r['answer'][:400]}\n\n" + synth_prompt += f"\nFact-check notes:\n{check_result[:500]}\n\n" + synth_prompt += ( + "Synthesize ALL findings into a structured research brief with these sections:\n" + "1. EXECUTIVE SUMMARY (2-3 sentences)\n" + "2. KEY FINDINGS (bulleted list)\n" + "3. DETAILED ANALYSIS (organized by theme)\n" + "4. UNCERTAINTIES & GAPS (what needs more research)\n" + "5. RECOMMENDATIONS (actionable next steps)\n" + "Be comprehensive but concise." 
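+ # Context budget: each research answer is clipped to 400 chars and the
+ # fact-check notes to 500 chars above, so with the default 5 questions the
+ # synthesis prompt stays at roughly 2.5-3K chars of findings plus
+ # instructions, which should fit comfortably in small local models' context windows.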
+ ) + try: + brief = query_model(synth, synth_prompt) + yield sse({"type": "response", "model": synth, "text": brief, "role": "synthesis"}) + steps.append({"step": "synthesis", "model": synth, "output": brief[:2000]}) + except Exception as e: + brief = f"Error: {e}" + yield sse({"type": "response", "model": synth, "text": str(e), "role": "error"}) + + # Save pipeline run + _save_pipeline("research", prompt, steps, {"brief": brief, "questions": questions, "fact_check": check_result[:1000]}, all_models, start) + + +def run_eval(config): + """Model evaluation pipeline: same prompts → all models → judge scores → leaderboard.""" + import time + start = time.time() * 1000 + prompt = config["prompt"] + models = config.get("models", []) + judge = config.get("judge", models[0] if models else "qwen2.5:latest") + eval_type = config.get("eval_type", "general") + rounds = config.get("rounds", 3) + yield sse({"type": "clear"}) + steps = [] + all_models = models + [judge] + + # Generate eval prompts based on type + yield sse({"type": "status", "message": f"Generating {rounds} {eval_type} evaluation prompts..."}) + gen_prompt = ( + f"Generate exactly {rounds} evaluation prompts for testing LLM capability in: {eval_type}.\n" + f"Context/focus area: {prompt}\n\n" + f"Each prompt should test a different aspect. Return ONLY a numbered list of prompts, nothing else." + ) + try: + prompts_raw = query_model(judge, gen_prompt) + yield sse({"type": "response", "model": judge, "text": prompts_raw, "role": "prompt generator"}) + except Exception as e: + yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"}) + return + + eval_prompts = [l.strip() for l in prompts_raw.strip().split("\n") if l.strip() and any(c.isalpha() for c in l)] + eval_prompts = eval_prompts[:rounds] + if not eval_prompts: + yield sse({"type": "response", "model": "system", "text": "Failed to generate eval prompts.", "role": "error"}) + return + + # Run each prompt against all models + scores = {m: [] for m in models} + for ri, ep in enumerate(eval_prompts): + yield sse({"type": "status", "message": f"Round {ri+1}/{len(eval_prompts)}: Testing {len(models)} models..."}) + + # All models answer in parallel + responses = parallel_query(models, ep) + for m, r in responses.items(): + yield sse({"type": "response", "model": m, "text": f"[Round {ri+1}] {ep[:80]}...\n\n{r}", "role": f"round {ri+1}"}) + + # Judge scores all responses + yield sse({"type": "status", "message": f"Round {ri+1}: Judging..."}) + judge_prompt = ( + f"Evaluation prompt: {ep}\n\n" + f"Score each model's response 1-10 on: accuracy, completeness, clarity, reasoning.\n" + f"Return a JSON object: {{\"model_name\": {{\"score\": N, \"notes\": \"brief note\"}}}}.\n\n" + ) + for m, r in responses.items(): + judge_prompt += f"[{m}]:\n{r[:500]}\n\n" + try: + judgment = query_model(judge, judge_prompt) + yield sse({"type": "response", "model": judge, "text": judgment, "role": f"judge round {ri+1}"}) + # Try to parse scores + try: + import re + # Find numbers after model names + for m in models: + # Look for score patterns near model name + pattern = re.escape(m) + r'.*?["\s:]+(\d+)' + match = re.search(pattern, judgment, re.IGNORECASE | re.DOTALL) + if match: + scores[m].append(int(match.group(1))) + except Exception: + pass + except Exception as e: + yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"}) + + steps.append({"round": ri+1, "prompt": ep, "responses": {m: r[:300] for m, r in responses.items()}}) + + # Final leaderboard + yield 
sse({"type": "status", "message": "Generating leaderboard..."}) + leaderboard = [] + for m in models: + avg = sum(scores[m]) / len(scores[m]) if scores[m] else 0 + leaderboard.append({"model": m, "avg_score": round(avg, 1), "rounds": len(scores[m]), "scores": scores[m]}) + leaderboard.sort(key=lambda x: x["avg_score"], reverse=True) + + board_text = f"LEADERBOARD — {eval_type.upper()} ({len(eval_prompts)} rounds)\n{'='*50}\n\n" + for i, entry in enumerate(leaderboard): + medal = ["1st", "2nd", "3rd"][i] if i < 3 else f"{i+1}th" + bar = "#" * int(entry["avg_score"]) + board_text += f" {medal} {entry['model']:<30} {entry['avg_score']:>4}/10 {bar}\n" + if entry["scores"]: + board_text += f" Round scores: {entry['scores']}\n\n" + + yield sse({"type": "response", "model": judge, "text": board_text, "role": "final"}) + + _save_pipeline("eval", prompt, steps, {"leaderboard": leaderboard, "eval_type": eval_type}, all_models, start) + + +def run_extract(config): + """Knowledge extraction pipeline: chunk text → extract facts → verify → structured output.""" + import time + start = time.time() * 1000 + prompt = config["prompt"] + extractor = config.get("extractor", "qwen2.5:latest") + verifier = config.get("verifier", "gemma2:latest") + source = config.get("source", "prompt") + yield sse({"type": "clear"}) + steps = [] + all_models = [extractor, verifier] + + # Get source text + source_text = prompt + if source != "prompt": + file_map = { + "ontology": "/home/profit/ONTOLOGY.md", + "index": "/home/profit/INDEX.md", + "summaries": "/home/profit/SUMMARIES.md", + "guides": "/home/profit/GUIDES.md", + } + fpath = file_map.get(source) + if fpath and os.path.exists(fpath): + yield sse({"type": "status", "message": f"Reading {source}..."}) + with open(fpath) as f: + source_text = f.read()[:15000] # limit to ~15K chars + yield sse({"type": "response", "model": "system", "text": f"Loaded {source} ({len(source_text)} chars)", "role": "source"}) + else: + yield sse({"type": "response", "model": "system", "text": f"File not found: {source}", "role": "error"}) + return + + # Chunk if too long + chunks = [] + chunk_size = 4000 + for i in range(0, len(source_text), chunk_size): + chunks.append(source_text[i:i+chunk_size]) + + yield sse({"type": "status", "message": f"Processing {len(chunks)} chunk(s) with {extractor}..."}) + + all_facts = [] + all_entities = [] + all_relations = [] + + for ci, chunk in enumerate(chunks): + yield sse({"type": "status", "message": f"Extracting from chunk {ci+1}/{len(chunks)}..."}) + extract_prompt = ( + f"Extract structured knowledge from this text. Return a JSON object with:\n" + f" \"facts\": [\"fact 1\", \"fact 2\", ...],\n" + f" \"entities\": [{{\"name\": \"...\", \"type\": \"...\", \"description\": \"...\"}}, ...],\n" + f" \"relationships\": [{{\"from\": \"...\", \"to\": \"...\", \"type\": \"...\"}}, ...]\n\n" + f"Be thorough. 
Extract EVERY factual claim, named entity, and relationship.\n\n" + f"Text:\n{chunk}" + ) + try: + result = query_model(extractor, extract_prompt) + yield sse({"type": "response", "model": extractor, "text": result, "role": f"extraction {ci+1}"}) + # Try to parse JSON from response + try: + import re + json_match = re.search(r'\{[\s\S]*\}', result) + if json_match: + parsed = json.loads(json_match.group()) + all_facts.extend(parsed.get("facts", [])) + all_entities.extend(parsed.get("entities", [])) + all_relations.extend(parsed.get("relationships", [])) + except Exception: + all_facts.append(result[:500]) + except Exception as e: + yield sse({"type": "response", "model": extractor, "text": str(e), "role": "error"}) + + steps.append({"step": "extraction", "facts": len(all_facts), "entities": len(all_entities), "relations": len(all_relations)}) + + # Verify key facts + yield sse({"type": "status", "message": f"{verifier} verifying {len(all_facts)} facts..."}) + facts_sample = all_facts[:20] # verify up to 20 + verify_prompt = ( + f"Verify these extracted facts. For each, mark CORRECT, INCORRECT, or UNVERIFIABLE.\n" + f"If incorrect, provide the correction.\n\n" + ) + for i, f in enumerate(facts_sample): + fact_str = f if isinstance(f, str) else json.dumps(f) + verify_prompt += f"{i+1}. {fact_str}\n" + try: + verification = query_model(verifier, verify_prompt) + yield sse({"type": "response", "model": verifier, "text": verification, "role": "verifier"}) + steps.append({"step": "verification", "model": verifier, "output": verification[:1000]}) + except Exception as e: + verification = str(e) + yield sse({"type": "response", "model": verifier, "text": str(e), "role": "error"}) + + # Summary + summary = ( + f"KNOWLEDGE EXTRACTION SUMMARY\n{'='*40}\n\n" + f"Source: {source}\n" + f"Facts extracted: {len(all_facts)}\n" + f"Entities found: {len(all_entities)}\n" + f"Relationships mapped: {len(all_relations)}\n\n" + f"TOP ENTITIES:\n" + ) + for e in all_entities[:15]: + if isinstance(e, dict): + summary += f" [{e.get('type','?')}] {e.get('name','?')} — {e.get('description','')[:60]}\n" + summary += f"\nTOP RELATIONSHIPS:\n" + for r in all_relations[:15]: + if isinstance(r, dict): + summary += f" {r.get('from','?')} --[{r.get('type','?')}]--> {r.get('to','?')}\n" + + yield sse({"type": "response", "model": "system", "text": summary, "role": "final"}) + + result_data = { + "facts": all_facts[:100], + "entities": all_entities[:50], + "relationships": all_relations[:50], + "verification": verification[:1000], + "source": source, + } + _save_pipeline("extract", prompt or source, steps, result_data, all_models, start) + + +if __name__ == "__main__": + print("\n LLM Team UI running at http://localhost:5000\n") + app.run(host="0.0.0.0", port=5000, debug=False) diff --git a/schema.sql b/schema.sql new file mode 100644 index 0000000..af7690c --- /dev/null +++ b/schema.sql @@ -0,0 +1,54 @@ +-- LLM Team UI Database Schema +-- Run against PostgreSQL: psql -d knowledge_base -f schema.sql + +CREATE TABLE IF NOT EXISTS team_runs ( + id SERIAL PRIMARY KEY, + mode TEXT NOT NULL, + prompt TEXT NOT NULL, + config JSONB, + responses JSONB NOT NULL DEFAULT '[]', + models_used TEXT[], + created_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS pipeline_runs ( + id SERIAL PRIMARY KEY, + pipeline TEXT NOT NULL, + topic TEXT NOT NULL, + status TEXT DEFAULT 'running', + steps JSONB DEFAULT '[]', + result JSONB, + models_used TEXT[], + duration_ms INTEGER, + created_at TIMESTAMPTZ DEFAULT NOW(), + completed_at 
TIMESTAMPTZ +); + +CREATE TABLE IF NOT EXISTS lab_experiments ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + status TEXT DEFAULT 'idle', + objective TEXT, + metric TEXT DEFAULT 'quality', + eval_cases JSONB DEFAULT '[]', + mutable_config JSONB DEFAULT '{}', + best_config JSONB, + best_score FLOAT DEFAULT 0, + total_trials INTEGER DEFAULT 0, + improvements INTEGER DEFAULT 0, + models_pool TEXT[], + created_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS lab_trials ( + id SERIAL PRIMARY KEY, + experiment_id INTEGER REFERENCES lab_experiments(id) ON DELETE CASCADE, + trial_num INTEGER, + config_diff TEXT, + config_snapshot JSONB, + scores JSONB, + avg_score FLOAT, + improved BOOLEAN DEFAULT FALSE, + duration_ms INTEGER, + created_at TIMESTAMPTZ DEFAULT NOW() +);
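+
+-- Example queries for manual inspection (illustrative; not used by the application):
+--
+-- Most recent pipeline runs with duration:
+--   SELECT pipeline, topic, status, duration_ms, created_at
+--   FROM pipeline_runs ORDER BY created_at DESC LIMIT 10;
+--
+-- Ratchet progress per lab experiment:
+--   SELECT name, best_score, total_trials, improvements
+--   FROM lab_experiments ORDER BY best_score DESC;
+--
+-- Team runs grouped by mode:
+--   SELECT mode, COUNT(*) AS runs, MAX(created_at) AS last_run
+--   FROM team_runs GROUP BY mode ORDER BY runs DESC;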