#!/usr/bin/env python3
"""LLM Team UI - Web interface to configure and run multi-model teams."""
import json
import os
import time
import threading
import requests
import random
import re
import psycopg2
import psycopg2.extras
from concurrent.futures import ThreadPoolExecutor, as_completed
from flask import Flask, render_template_string, request, jsonify, Response
app = Flask(__name__)
CONFIG_PATH = "/root/llm_team_config.json"
DEFAULT_CONFIG = {
"providers": {
"ollama": {"enabled": True, "base_url": "http://localhost:11434", "timeout": 300},
"openrouter": {"enabled": False, "base_url": "https://openrouter.ai/api/v1", "api_key": "", "timeout": 120},
"openai": {"enabled": False, "base_url": "https://api.openai.com/v1", "api_key": "", "timeout": 120},
"anthropic": {"enabled": False, "base_url": "https://api.anthropic.com/v1", "api_key": "", "timeout": 120},
},
"disabled_models": [],
"cloud_models": [],
"timeouts": {"global": 300, "per_model": {}},
}
def load_dotenv():
for p in ["/root/.env", "/home/profit/.env"]:
if os.path.exists(p):
with open(p) as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
os.environ.setdefault(k.strip(), v.strip())
load_dotenv()
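# Minimal sketch of what the parser accepts: plain KEY=VALUE lines, with blanks
# and "#" comments skipped and values taken verbatim (no quote stripping).
# Hypothetical example line: OPENROUTER_API_KEY=sk-or-xxxx
# setdefault means variables already exported in the environment always win.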
def load_config():
if os.path.exists(CONFIG_PATH):
with open(CONFIG_PATH) as f:
cfg = json.load(f)
# merge any missing defaults
for k, v in DEFAULT_CONFIG.items():
cfg.setdefault(k, v)
for k, v in DEFAULT_CONFIG["providers"].items():
cfg["providers"].setdefault(k, v)
return cfg
return json.loads(json.dumps(DEFAULT_CONFIG))
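# Note: this merge back-fills missing top-level keys and missing provider
# entries, but not keys inside an existing provider dict (a provider saved
# without e.g. "timeout" keeps that gap until the whole entry is rewritten).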
def save_config(cfg):
with open(CONFIG_PATH, "w") as f:
json.dump(cfg, f, indent=2)
def get_api_key(provider_name):
cfg = load_config()
prov = cfg["providers"].get(provider_name, {})
key = prov.get("api_key", "")
if key:
return key
env_map = {"openrouter": "OPENROUTER_API_KEY", "openai": "OPENAI_API_KEY", "anthropic": "ANTHROPIC_API_KEY"}
return os.environ.get(env_map.get(provider_name, ""), "")
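# Lookup order, sketched: a key stored in the config file wins; otherwise the
# mapped environment variable (possibly loaded by load_dotenv above) is used:
#   get_api_key("openrouter")  # cfg api_key, else OPENROUTER_API_KEY, else ""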
DB_DSN = "dbname=knowledge_base user=kbuser password=IPbLBA0EQI8u4TeM2YZrbm1OAy5nSwqC host=localhost"
def get_db():
return psycopg2.connect(DB_DSN)
def save_run(mode, prompt, config_data, responses):
models = list({r.get("model", "") for r in responses if r.get("model")})
try:
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO team_runs (mode, prompt, config, responses, models_used) VALUES (%s, %s, %s, %s, %s)",
(mode, prompt, json.dumps(config_data), json.dumps(responses), models)
)
conn.commit()
except Exception as e:
print(f"[DB] save_run error: {e}")
HTML = r"""
LLM Team
Output
◆ ◆ ◆
Select a mode, pick your models, and enter a prompt to run the team.
Re-pipe into mode
"""
ADMIN_HTML = r"""
LLM Team - Admin
Providers
Models
OpenRouter
Timeouts
Cloud Models
No cloud models configured.
"""
LAB_HTML = r"""
LLM Team - Lab
Experiments
Mutable Config
Live Monitor
Results
Mutable Config
Select an experiment from the Experiments tab first.
0.7
No Experiment Selected
Status: idle
Trials: 0
Best: 0.0/10
Improvements: 0
Trial Log
Start an experiment to see trials here.
Best Config
No best config yet.
Experiment
"""
# ─── HELPERS ───────────────────────────────────────────────────
def _get_timeout(model_id):
cfg = load_config()
t = cfg["timeouts"]["per_model"].get(model_id)
if t:
return t
if "::" in model_id:
prov = model_id.split("::")[0]
return cfg["providers"].get(prov, {}).get("timeout", cfg["timeouts"]["global"])
return cfg["providers"].get("ollama", {}).get("timeout", cfg["timeouts"]["global"])
def query_ollama(model, prompt, timeout):
cfg = load_config()
base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434")
resp = requests.post(f"{base}/api/generate", json={
"model": model, "prompt": prompt, "stream": False,
}, timeout=timeout)
resp.raise_for_status()
return resp.json()["response"]
def query_openai_compatible(model, prompt, provider_name, timeout):
cfg = load_config()
prov = cfg["providers"].get(provider_name, {})
base = prov.get("base_url", "https://openrouter.ai/api/v1")
api_key = get_api_key(provider_name)
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
if provider_name == "openrouter":
headers["HTTP-Referer"] = "http://localhost:5000"
headers["X-Title"] = "LLM Team UI"
resp = requests.post(f"{base}/chat/completions", headers=headers, json={
"model": model, "messages": [{"role": "user", "content": prompt}], "stream": False,
}, timeout=timeout)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"]
def query_anthropic(model, prompt, timeout):
cfg = load_config()
prov = cfg["providers"].get("anthropic", {})
base = prov.get("base_url", "https://api.anthropic.com/v1")
api_key = get_api_key("anthropic")
resp = requests.post(f"{base}/messages", headers={
"x-api-key": api_key, "anthropic-version": "2023-06-01", "Content-Type": "application/json",
}, json={
"model": model, "max_tokens": 4096,
"messages": [{"role": "user", "content": prompt}],
}, timeout=timeout)
resp.raise_for_status()
return resp.json()["content"][0]["text"]
def query_model(model_id, prompt):
timeout = _get_timeout(model_id)
if "::" in model_id:
provider_name, model_name = model_id.split("::", 1)
if provider_name == "anthropic":
return query_anthropic(model_name, prompt, timeout)
return query_openai_compatible(model_name, prompt, provider_name, timeout)
return query_ollama(model_id, prompt, timeout)
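# Routing sketch (hypothetical ids): "provider::model" selects a cloud backend,
# a bare name goes to local Ollama:
#   query_model("llama3.2:latest", p)           # -> query_ollama
#   query_model("anthropic::claude-model", p)   # -> query_anthropic
#   query_model("openrouter::vendor/model", p)  # -> query_openai_compatible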
# ─── CONTEXT MANAGEMENT ───────────────────────────────────────
# Context window sizes (tokens) — conservative estimates for safe prompting
MODEL_CONTEXT = {
"llama3.2": 4096, "mistral": 8192, "gemma2": 8192, "qwen2.5": 8192,
"gpt-oss": 4096, "gpt-4o": 128000, "gpt-4o-mini": 128000,
"claude-3": 200000, "claude-sonnet": 200000, "claude-haiku": 200000,
}
DEFAULT_CONTEXT = 4096 # safe fallback for unknown models
MAX_RESPONSE_CHARS = 12000 # cap individual responses (~3K tokens)
def estimate_tokens(text):
"""Rough token estimate: ~4 chars per token for English."""
return len(text) // 4 + 1
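# e.g. estimate_tokens("hello world") == 3 (11 chars // 4, plus 1). A cheap
# heuristic, not a real tokenizer, so callers keep generous safety margins.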
def get_context_limit(model_id):
"""Get context window size for a model."""
name = model_id.split("::")[-1].split(":")[0].lower()
for key, limit in MODEL_CONTEXT.items():
if key in name:
return limit
# OpenRouter models generally have larger contexts
if "::" in model_id:
return 16000
return DEFAULT_CONTEXT
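# e.g. get_context_limit("mistral:7b") -> 8192 via substring match, while an
# unmatched cloud id such as "openrouter::vendor/model" (hypothetical) -> 16000
# and an unmatched local model -> DEFAULT_CONTEXT.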
def smart_truncate(text, max_tokens, preserve_end=200):
"""Truncate text preserving start and end, with a clear marker."""
if estimate_tokens(text) <= max_tokens:
return text
max_chars = max_tokens * 4
end_chars = preserve_end * 4
if max_chars <= end_chars * 2:
return text[:max_chars]
start = text[:max_chars - end_chars - 60]
end = text[-end_chars:]
return f"{start}\n\n[... truncated {estimate_tokens(text) - max_tokens} tokens ...]\n\n{end}"
def cap_response(text):
"""Cap a single model response to prevent runaway output."""
if len(text) <= MAX_RESPONSE_CHARS:
return text
return smart_truncate(text, MAX_RESPONSE_CHARS // 4)
def build_context(parts, model_id, reserve_for_response=1024):
"""Build a prompt from parts, fitting within model's context window.
parts: list of (label, text, priority) tuples
priority: 1=must keep, 2=important, 3=can truncate heavily
Returns: assembled prompt string that fits in context.
"""
limit = get_context_limit(model_id)
budget = limit - reserve_for_response
if budget <= 0:
budget = limit // 2
# First pass: measure everything
total = sum(estimate_tokens(t) for _, t, _ in parts)
if total <= budget:
return "\n\n".join(f"{label}\n{text}" if label else text for label, text, _ in parts)
# Need to truncate — allocate budget by priority
p1 = [(l, t, p) for l, t, p in parts if p == 1]
p2 = [(l, t, p) for l, t, p in parts if p == 2]
p3 = [(l, t, p) for l, t, p in parts if p == 3]
p1_tokens = sum(estimate_tokens(t) for _, t, _ in p1)
remaining = budget - p1_tokens
if remaining <= 0:
# Even priority 1 doesn't fit — truncate p1
per_part = budget // max(len(p1), 1)
result = []
for label, text, _ in p1:
result.append(f"{label}\n{smart_truncate(text, per_part)}" if label else smart_truncate(text, per_part))
return "\n\n".join(result)
# Allocate remaining to p2, then p3
result = [f"{l}\n{t}" if l else t for l, t, _ in p1]
for group in [p2, p3]:
if not group or remaining <= 0:
continue
per_part = remaining // max(len(group), 1)
for label, text, _ in group:
truncated = smart_truncate(text, max(per_part, 100))
result.append(f"{label}\n{truncated}" if label else truncated)
remaining -= estimate_tokens(truncated)
return "\n\n".join(result)
def safe_query(model_id, prompt, fallback_summarize=True):
"""Query with context safety — auto-truncates prompt if too large, retries on overflow errors."""
limit = get_context_limit(model_id)
prompt_tokens = estimate_tokens(prompt)
# Pre-flight check: truncate if obviously too large
if prompt_tokens > limit - 500:
prompt = smart_truncate(prompt, limit - 1000)
try:
response = query_model(model_id, prompt)
return cap_response(response)
except Exception as e:
err = str(e).lower()
# Detect context overflow errors from various providers
if any(k in err for k in ["context length", "too many tokens", "maximum context", "token limit",
"content_too_large", "request too large", "413", "400"]):
if fallback_summarize:
# Aggressive truncation and retry
truncated = smart_truncate(prompt, limit // 2)
try:
response = query_model(model_id, truncated)
return cap_response(response)
except Exception:
pass
return f"[Context overflow: prompt was ~{prompt_tokens} tokens, model limit ~{limit}. Response truncated to fit.]"
raise
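# Failure path, sketched: an oversized prompt is pre-truncated to limit - 1000
# tokens; if the provider still reports overflow, one retry at half the window
# runs before the bracketed placeholder string is returned instead of raising.
# Non-overflow errors propagate unchanged.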
def parallel_safe_query(models, prompt):
"""Like parallel_query but with context safety on each model."""
results = {}
max_timeout = max((_get_timeout(m) for m in models), default=300) + 30
with ThreadPoolExecutor(max_workers=max(len(models), 1)) as pool:
futures = {pool.submit(safe_query, m, prompt): m for m in models}
for future in as_completed(futures, timeout=max_timeout):
model = futures[future]
try:
results[model] = future.result(timeout=10)
except Exception as e:
results[model] = f"Error: {e}"
return results
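# Returns {model_id: text or "Error: ..."}. Caveat: as_completed's outer timeout
# (slowest configured model + 30s) raises concurrent.futures.TimeoutError out of
# this function if any future is still pending when it expires.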
def sse(data):
return f"data: {json.dumps(data)}\n\n"
def parallel_query(models, prompt):
"""Query multiple models in parallel with context safety."""
return parallel_safe_query(models, prompt)
# ─── ROUTES ────────────────────────────────────────────────────
@app.route("/")
def index():
return render_template_string(HTML)
@app.route("/api/models")
def get_models():
SKIP = {"nomic-embed-text", "mxbai-embed-large", "all-minilm", "snowflake-arctic-embed"}
cfg = load_config()
models = []
# Local Ollama models
if cfg["providers"]["ollama"].get("enabled", True):
try:
base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434")
resp = requests.get(f"{base}/api/tags", timeout=10)
seen = set()
for m in resp.json().get("models", []):
full = m["name"]
short = full.split(":")[0]
size = m.get("size", 0)
if short in SKIP or size < 1_000_000 or short in seen:
continue
if full in cfg.get("disabled_models", []):
continue
seen.add(short)
models.append({"name": full, "size": f"{size/(1024**3):.1f} GB",
"provider": "ollama", "provider_label": "Local",
"display_name": short})
except Exception:
pass
# Cloud models
for cm in cfg.get("cloud_models", []):
if not cm.get("enabled", True):
continue
prov = cm["id"].split("::")[0] if "::" in cm["id"] else "cloud"
if not cfg["providers"].get(prov, {}).get("enabled", False):
continue
models.append({"name": cm["id"], "size": cm.get("context", "cloud"),
"provider": prov, "provider_label": prov.title(),
"display_name": cm.get("display_name", cm["id"].split("::")[-1])})
return jsonify({"models": models})
# ─── ADMIN ROUTES ─────────────────────────────────────────────
@app.route("/admin")
def admin_page():
return render_template_string(ADMIN_HTML)
@app.route("/api/admin/config", methods=["GET"])
def admin_get_config():
cfg = load_config()
safe = json.loads(json.dumps(cfg))
for name, p in safe["providers"].items():
if p.get("api_key"):
p["api_key_set"] = True
p["api_key"] = ""
else:
p["api_key_set"] = bool(get_api_key(name))
return jsonify(safe)
@app.route("/api/admin/config", methods=["POST"])
def admin_save_config():
data = request.json
cfg = load_config()
# update providers (preserve existing keys if not sent)
for name, prov in data.get("providers", {}).items():
if name in cfg["providers"]:
new_key = prov.get("api_key", "")
if not new_key:
prov["api_key"] = cfg["providers"][name].get("api_key", "")
cfg["providers"][name].update(prov)
if "disabled_models" in data:
cfg["disabled_models"] = data["disabled_models"]
if "cloud_models" in data:
cfg["cloud_models"] = data["cloud_models"]
if "timeouts" in data:
cfg["timeouts"] = data["timeouts"]
save_config(cfg)
return jsonify({"ok": True})
@app.route("/api/admin/test-provider", methods=["POST"])
def admin_test_provider():
data = request.json
name = data.get("provider", "")
cfg = load_config()
prov = cfg["providers"].get(name, {})
try:
if name == "ollama":
r = requests.get(f"{prov.get('base_url', 'http://localhost:11434')}/api/tags", timeout=5)
count = len(r.json().get("models", []))
return jsonify({"ok": True, "message": f"Connected. {count} models available."})
elif name == "openrouter":
key = data.get("api_key") or get_api_key("openrouter")
r = requests.get(f"{prov.get('base_url', 'https://openrouter.ai/api/v1')}/models",
headers={"Authorization": f"Bearer {key}"}, timeout=10)
count = len(r.json().get("data", []))
return jsonify({"ok": True, "message": f"Connected. {count} models available."})
elif name == "openai":
key = data.get("api_key") or get_api_key("openai")
r = requests.get(f"{prov.get('base_url', 'https://api.openai.com/v1')}/models",
headers={"Authorization": f"Bearer {key}"}, timeout=10)
return jsonify({"ok": True, "message": f"Connected. {len(r.json().get('data', []))} models."})
elif name == "anthropic":
key = data.get("api_key") or get_api_key("anthropic")
r = requests.post(f"{prov.get('base_url', 'https://api.anthropic.com/v1')}/messages",
headers={"x-api-key": key, "anthropic-version": "2023-06-01", "Content-Type": "application/json"},
json={"model": "claude-haiku-4-5-20251001", "max_tokens": 1, "messages": [{"role": "user", "content": "hi"}]},
timeout=10)
return jsonify({"ok": True, "message": "Connected to Anthropic."})
return jsonify({"ok": False, "message": "Unknown provider"})
except Exception as e:
return jsonify({"ok": False, "message": str(e)})
_or_models_cache = {"data": None, "ts": 0}
@app.route("/api/admin/openrouter/models")
def admin_openrouter_models():
now = time.time()
if _or_models_cache["data"] and now - _or_models_cache["ts"] < 300:
return jsonify({"models": _or_models_cache["data"]})
key = get_api_key("openrouter")
headers = {"Authorization": f"Bearer {key}"} if key else {}
try:
r = requests.get("https://openrouter.ai/api/v1/models", headers=headers, timeout=15)
r.raise_for_status()
free = []
for m in r.json().get("data", []):
pricing = m.get("pricing", {})
if pricing.get("prompt") == "0" and pricing.get("completion") == "0":
free.append({"id": m["id"], "name": m.get("name", m["id"]),
"context_length": m.get("context_length", 0)})
_or_models_cache["data"] = free
_or_models_cache["ts"] = now
return jsonify({"models": free})
except Exception as e:
return jsonify({"models": [], "error": str(e)})
@app.route("/api/admin/ollama-models")
def admin_ollama_models():
cfg = load_config()
base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434")
SKIP = {"nomic-embed-text", "mxbai-embed-large", "all-minilm", "snowflake-arctic-embed"}
try:
resp = requests.get(f"{base}/api/tags", timeout=10)
models = []
seen = set()
for m in resp.json().get("models", []):
full = m["name"]
short = full.split(":")[0]
size = m.get("size", 0)
if short in SKIP or size < 1_000_000 or short in seen:
continue
seen.add(short)
models.append({"name": full, "size": f"{size/(1024**3):.1f} GB",
"disabled": full in cfg.get("disabled_models", [])})
return jsonify({"models": models})
except Exception as e:
return jsonify({"models": [], "error": str(e)})
# ─── HISTORY ROUTES ────────────────────────────────────────────
@app.route("/api/runs")
def get_runs():
try:
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT id, mode, prompt, models_used, created_at FROM team_runs ORDER BY created_at DESC LIMIT 50")
runs = cur.fetchall()
for r in runs:
r["created_at"] = r["created_at"].isoformat()
return jsonify({"runs": runs})
except Exception as e:
return jsonify({"runs": [], "error": str(e)})
@app.route("/api/runs/")
def get_run(run_id):
try:
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT * FROM team_runs WHERE id = %s", (run_id,))
run = cur.fetchone()
if not run:
return jsonify({"error": "not found"}), 404
run["created_at"] = run["created_at"].isoformat()
return jsonify(run)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/runs/", methods=["DELETE"])
def delete_run(run_id):
try:
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM team_runs WHERE id = %s", (run_id,))
conn.commit()
return jsonify({"ok": True})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/pipelines")
def get_pipelines():
try:
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT id, pipeline, topic, status, models_used, duration_ms, created_at FROM pipeline_runs ORDER BY created_at DESC LIMIT 50")
runs = cur.fetchall()
for r in runs:
r["created_at"] = r["created_at"].isoformat() if r["created_at"] else None
return jsonify({"pipelines": runs})
except Exception as e:
return jsonify({"pipelines": [], "error": str(e)})
@app.route("/api/pipelines/")
def get_pipeline(pid):
try:
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT * FROM pipeline_runs WHERE id = %s", (pid,))
run = cur.fetchone()
if not run:
return jsonify({"error": "not found"}), 404
run["created_at"] = run["created_at"].isoformat() if run["created_at"] else None
run["completed_at"] = run["completed_at"].isoformat() if run["completed_at"] else None
return jsonify(run)
except Exception as e:
return jsonify({"error": str(e)}), 500
# ─── LAB: RATCHET ENGINE ──────────────────────────────────────
_lab_threads = {} # experiment_id -> thread
_lab_streams = {} # experiment_id -> [queue, ...]
def _lab_emit(exp_id, data):
for q in _lab_streams.get(exp_id, []):
q.append(data)
def _score_response(response, expected, metric, judge_model=None):
if metric == "accuracy":
if not expected:
return 5.0
resp_lower = response.lower().strip()
exp_lower = expected.lower().strip()
if exp_lower in resp_lower:
return 10.0
if any(w in resp_lower for w in exp_lower.split()):
return 5.0
return 1.0
elif metric == "speed":
return 10.0 # speed scored externally by duration
elif metric == "quality" and judge_model:
try:
judgment = query_model(judge_model,
f"Rate this response 1-10 for quality, relevance, and completeness.\n\n"
f"EXPECTED: {expected or 'No expected output specified'}\n\n"
f"RESPONSE: {response[:1500]}\n\n"
f"Return ONLY a number 1-10, nothing else.")
m = re.search(r'\b(\d+)\b', judgment)
return min(float(m.group(1)), 10.0) if m else 5.0
except Exception:
return 5.0
return 5.0
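# Scoring sketch: "accuracy" is substring/word overlap against `expected`,
# "speed" is a constant (duration is tracked separately), and "quality" asks a
# judge model for a 1-10 number; parse failures and errors collapse to 5.0.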
def _ratchet_loop(exp_id):
try:
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT * FROM lab_experiments WHERE id = %s", (exp_id,))
exp = cur.fetchone()
if not exp:
return
eval_cases = exp["eval_cases"] or []
models_pool = exp["models_pool"] or []
metric = exp["metric"] or "quality"
objective = exp["objective"] or "Improve response quality"
mutable = exp["mutable_config"] or {
"system_prompt": "You are a helpful assistant.",
"temperature": 0.7,
"model": models_pool[0] if models_pool else "llama3.2:latest",
}
best_config = exp["best_config"] or json.loads(json.dumps(mutable))
best_score = exp["best_score"] or 0
trial_num = exp["total_trials"] or 0
        # Meta-model proposes changes (last in pool, assumed largest); judge scores (first in pool)
meta_model = models_pool[-1] if models_pool else "qwen2.5:latest"
judge_model = models_pool[0] if models_pool else "llama3.2:latest"
while True:
# Check if still running
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("SELECT status FROM lab_experiments WHERE id = %s", (exp_id,))
row = cur.fetchone()
if not row or row[0] != "running":
break
trial_num += 1
trial_start = time.time()
_lab_emit(exp_id, {"type": "status", "trial": trial_num, "message": "Proposing change..."})
# Step 1: Meta-model proposes a change
history_hint = ""
if trial_num > 1:
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT config_diff, avg_score, improved FROM lab_trials WHERE experiment_id = %s ORDER BY id DESC LIMIT 5", (exp_id,))
recent = cur.fetchall()
if recent:
history_hint = "\n\nRecent trials:\n" + "\n".join(
f" {'KEPT' if r['improved'] else 'DISCARDED'} (score {r['avg_score']:.1f}): {r['config_diff']}" for r in recent)
propose_prompt = (
f"You are optimizing an LLM pipeline. Objective: {objective}\n"
f"Metric: {metric} (higher is better, max 10)\n"
f"Current best score: {best_score:.1f}/10\n\n"
f"Current config:\n{json.dumps(mutable, indent=2)}\n\n"
f"Available models: {models_pool}\n"
f"Eval cases: {len(eval_cases)}\n"
f"{history_hint}\n\n"
f"Suggest exactly ONE change to improve the score. Return ONLY valid JSON with the FULL updated config. "
f"Keys: system_prompt (string), temperature (0.0-1.5), model (string from available models). "
f"Be creative but focused. Change only one thing at a time."
)
try:
proposal_raw = query_model(meta_model, propose_prompt)
json_match = re.search(r'\{[\s\S]*\}', proposal_raw)
if json_match:
proposed = json.loads(json_match.group())
# Validate keys
if "system_prompt" not in proposed:
proposed["system_prompt"] = mutable.get("system_prompt", "")
if "temperature" not in proposed:
proposed["temperature"] = mutable.get("temperature", 0.7)
if "model" not in proposed:
proposed["model"] = mutable.get("model", models_pool[0])
else:
proposed = mutable
except Exception:
proposed = mutable
# Describe the diff
diffs = []
for k in set(list(mutable.keys()) + list(proposed.keys())):
old_v = mutable.get(k)
new_v = proposed.get(k)
if old_v != new_v:
if k == "system_prompt":
diffs.append(f"system_prompt changed ({len(str(old_v))} → {len(str(new_v))} chars)")
else:
diffs.append(f"{k}: {old_v} → {new_v}")
config_diff = "; ".join(diffs) if diffs else "no change"
_lab_emit(exp_id, {"type": "status", "trial": trial_num, "message": f"Testing: {config_diff[:80]}"})
# Step 2: Run eval cases with proposed config
trial_scores = []
model_to_use = proposed.get("model", models_pool[0] if models_pool else "llama3.2:latest")
sys_prompt = proposed.get("system_prompt", "")
for ci, case in enumerate(eval_cases):
inp = case.get("input", "")
expected = case.get("expected", "")
full_prompt = f"{sys_prompt}\n\n{inp}" if sys_prompt else inp
try:
resp = query_model(model_to_use, full_prompt)
score = _score_response(resp, expected, metric, judge_model if metric == "quality" else None)
trial_scores.append({"input": inp[:100], "score": score, "response": resp[:300]})
except Exception as e:
trial_scores.append({"input": inp[:100], "score": 0, "error": str(e)})
avg_score = sum(s["score"] for s in trial_scores) / max(len(trial_scores), 1)
duration_ms = int((time.time() - trial_start) * 1000)
improved = avg_score > best_score
# Step 3: Ratchet
if improved:
best_score = avg_score
best_config = json.loads(json.dumps(proposed))
mutable = json.loads(json.dumps(proposed))
else:
mutable = json.loads(json.dumps(best_config))
# Save trial
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"""INSERT INTO lab_trials (experiment_id, trial_num, config_diff, config_snapshot, scores, avg_score, improved, duration_ms)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
(exp_id, trial_num, config_diff, json.dumps(proposed), json.dumps(trial_scores), avg_score, improved, duration_ms)
)
cur.execute(
"""UPDATE lab_experiments SET total_trials = %s, best_score = %s, best_config = %s, mutable_config = %s,
improvements = improvements + %s WHERE id = %s""",
(trial_num, best_score, json.dumps(best_config), json.dumps(mutable), 1 if improved else 0, exp_id)
)
conn.commit()
_lab_emit(exp_id, {
"type": "trial", "trial": trial_num, "score": round(avg_score, 2),
"best": round(best_score, 2), "improved": improved, "diff": config_diff[:100],
"duration_ms": duration_ms
})
# Done
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE lab_experiments SET status = 'paused' WHERE id = %s AND status = 'running'", (exp_id,))
conn.commit()
_lab_emit(exp_id, {"type": "done"})
except Exception as e:
_lab_emit(exp_id, {"type": "error", "message": str(e)})
try:
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE lab_experiments SET status = 'error' WHERE id = %s", (exp_id,))
conn.commit()
except Exception:
pass
# ─── LAB API ROUTES ───────────────────────────────────────────
@app.route("/lab")
def lab_page():
return render_template_string(LAB_HTML)
@app.route("/api/lab/experiments", methods=["GET"])
def lab_list():
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT id, name, status, metric, best_score, total_trials, improvements, models_pool, created_at FROM lab_experiments ORDER BY created_at DESC")
rows = cur.fetchall()
for r in rows:
r["created_at"] = r["created_at"].isoformat()
return jsonify({"experiments": rows})
@app.route("/api/lab/experiments", methods=["POST"])
def lab_create():
d = request.json
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"""INSERT INTO lab_experiments (name, objective, metric, eval_cases, mutable_config, best_config, models_pool)
VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id""",
(d["name"], d.get("objective", ""), d.get("metric", "quality"),
json.dumps(d.get("eval_cases", [])),
json.dumps(d.get("mutable_config", {"system_prompt": "You are a helpful assistant.", "temperature": 0.7, "model": ""})),
json.dumps(d.get("mutable_config", {"system_prompt": "You are a helpful assistant.", "temperature": 0.7, "model": ""})),
d.get("models_pool", []))
)
eid = cur.fetchone()[0]
conn.commit()
return jsonify({"id": eid})
@app.route("/api/lab/experiments/", methods=["GET"])
def lab_get(eid):
with get_db() as conn:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("SELECT * FROM lab_experiments WHERE id = %s", (eid,))
exp = cur.fetchone()
if not exp:
return jsonify({"error": "not found"}), 404
exp["created_at"] = exp["created_at"].isoformat()
cur.execute("SELECT * FROM lab_trials WHERE experiment_id = %s ORDER BY trial_num", (eid,))
exp["trials"] = cur.fetchall()
for t in exp["trials"]:
t["created_at"] = t["created_at"].isoformat()
return jsonify(exp)
@app.route("/api/lab/experiments/", methods=["PUT"])
def lab_update(eid):
d = request.json
sets, vals = [], []
for k in ["name", "objective", "metric"]:
if k in d:
sets.append(f"{k} = %s")
vals.append(d[k])
for k in ["eval_cases", "mutable_config"]:
if k in d:
sets.append(f"{k} = %s")
vals.append(json.dumps(d[k]))
if "models_pool" in d:
sets.append("models_pool = %s")
vals.append(d["models_pool"])
if not sets:
return jsonify({"ok": True})
vals.append(eid)
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(f"UPDATE lab_experiments SET {', '.join(sets)} WHERE id = %s", vals)
conn.commit()
return jsonify({"ok": True})
@app.route("/api/lab/experiments//start", methods=["POST"])
def lab_start(eid):
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE lab_experiments SET status = 'running' WHERE id = %s", (eid,))
conn.commit()
if eid in _lab_threads and _lab_threads[eid].is_alive():
return jsonify({"ok": True, "message": "Already running"})
t = threading.Thread(target=_ratchet_loop, args=(eid,), daemon=True)
_lab_threads[eid] = t
t.start()
return jsonify({"ok": True})
@app.route("/api/lab/experiments//pause", methods=["POST"])
def lab_pause(eid):
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE lab_experiments SET status = 'paused' WHERE id = %s", (eid,))
conn.commit()
return jsonify({"ok": True})
@app.route("/api/lab/experiments//reset", methods=["POST"])
def lab_reset(eid):
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE lab_experiments SET status = 'idle', total_trials = 0, improvements = 0, best_score = 0, best_config = mutable_config WHERE id = %s", (eid,))
cur.execute("DELETE FROM lab_trials WHERE experiment_id = %s", (eid,))
conn.commit()
return jsonify({"ok": True})
@app.route("/api/lab/experiments//delete", methods=["DELETE"])
def lab_delete(eid):
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("DELETE FROM lab_experiments WHERE id = %s", (eid,))
conn.commit()
return jsonify({"ok": True})
@app.route("/api/lab/experiments//stream")
def lab_stream(eid):
q = []
_lab_streams.setdefault(eid, []).append(q)
def generate():
try:
while True:
if q:
data = q.pop(0)
yield f"data: {json.dumps(data)}\n\n"
if data.get("type") == "done":
break
else:
time.sleep(0.5)
yield ": keepalive\n\n"
finally:
            if q in _lab_streams.get(eid, []):
                _lab_streams[eid].remove(q)
return Response(generate(), mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
# ─── TEAM ROUTES ──────────────────────────────────────────────
@app.route("/api/run", methods=["POST"])
def run_team():
config = request.json
mode = config["mode"]
RUNNERS = {
"brainstorm": run_brainstorm, "pipeline": run_pipeline, "debate": run_debate,
"validator": run_validator, "roundrobin": run_roundrobin, "redteam": run_redteam,
"consensus": run_consensus, "codereview": run_codereview, "ladder": run_ladder,
"tournament": run_tournament, "evolution": run_evolution, "blindassembly": run_blindassembly,
"staircase": run_staircase, "drift": run_drift, "mesh": run_mesh,
"hallucination": run_hallucination, "timeloop": run_timeloop,
"research": run_research, "eval": run_eval, "extract": run_extract,
}
def generate():
collected = []
runner = RUNNERS.get(mode)
if runner:
for event_str in runner(config):
yield event_str
try:
data = json.loads(event_str.replace("data: ", "", 1).strip())
if data.get("type") == "response":
collected.append({"model": data.get("model", ""), "text": data.get("text", ""), "role": data.get("role", "")})
except Exception:
pass
else:
yield sse({"type": "response", "model": "system", "text": f"Unknown mode: {mode}", "role": "error"})
yield sse({"type": "done"})
if collected:
save_run(mode, config.get("prompt", ""), config, collected)
return Response(generate(), mimetype="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no", "Connection": "keep-alive"})
# ─── ORIGINAL 10 MODES ────────────────────────────────────────
def run_brainstorm(config):
models, prompt = config.get("models", []), config["prompt"]
synthesizer = config.get("synthesizer", models[0] if models else "qwen2.5")
yield sse({"type": "clear"})
yield sse({"type": "status", "message": f"Querying {len(models)} models..."})
responses = parallel_query(models, prompt)
for m, r in responses.items():
yield sse({"type": "response", "model": m, "text": r, "role": "respondent"})
if len(responses) > 1:
yield sse({"type": "status", "message": f"Synthesizing with {synthesizer}..."})
parts = [("QUESTION:", prompt, 1), ("INSTRUCTION:", "Synthesize the best answer. Be concise.", 1)]
for m, r in responses.items():
parts.append((f"[{m}]:", cap_response(r), 3))
sp = build_context(parts, synthesizer)
try:
yield sse({"type": "response", "model": synthesizer, "text": safe_query(synthesizer, sp), "role": "synthesis"})
except Exception as e:
yield sse({"type": "response", "model": synthesizer, "text": str(e), "role": "error"})
def run_pipeline(config):
steps, current = config.get("steps", []), config["prompt"]
yield sse({"type": "clear"})
for i, step in enumerate(steps):
model = step["model"]
yield sse({"type": "status", "message": f"Step {i+1}/{len(steps)}: {model}..."})
try:
prompt = step["instruction"].replace("{input}", cap_response(current))
current = safe_query(model, prompt)
yield sse({"type": "response", "model": model, "text": current, "role": f"step {i+1}"})
except Exception as e:
yield sse({"type": "response", "model": model, "text": str(e), "role": "error"}); break
def run_debate(config):
prompt, d1, d2, judge = config["prompt"], config["debater1"], config["debater2"], config["judge"]
rounds = config.get("rounds", 2)
yield sse({"type": "clear"})
history = []
for m in [d1, d2]:
yield sse({"type": "status", "message": f"{m} opening..."})
try:
r = safe_query(m, f"Give your position on: {prompt}")
history.append((m, r)); yield sse({"type": "response", "model": m, "text": r, "role": "opening"})
except Exception as e:
yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
for rd in range(rounds):
for i, m in enumerate([d1, d2]):
other = [d1, d2][1-i]
            other_turns = [h[1] for h in history if h[0] == other]
            other_last = other_turns[-1] if other_turns else "(no prior statement)"
yield sse({"type": "status", "message": f"Round {rd+1}: {m}..."})
try:
rebuttal_prompt = build_context([
("Topic:", prompt, 1),
(f"Opponent ({other}) said:", cap_response(other_last), 2),
("INSTRUCTION:", "Rebuttal or concede:", 1),
], m)
r = safe_query(m, rebuttal_prompt)
history.append((m, r)); yield sse({"type": "response", "model": m, "text": r, "role": f"round {rd+1}"})
except Exception as e:
yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
yield sse({"type": "status", "message": f"{judge} judging..."})
parts = [("Topic:", prompt, 1), ("INSTRUCTION:", "Judge: who won and why?", 1)]
for m, t in history:
parts.append((f"[{m}]:", cap_response(t), 3))
jp = build_context(parts, judge)
try:
yield sse({"type": "response", "model": judge, "text": safe_query(judge, jp), "role": "judge"})
except Exception as e:
yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"})
def run_validator(config):
prompt, answerer, validators = config["prompt"], config["answerer"], config.get("validators", [])
yield sse({"type": "clear"})
yield sse({"type": "status", "message": f"{answerer} answering..."})
try:
answer = query_model(answerer, prompt)
yield sse({"type": "response", "model": answerer, "text": answer, "role": "answer"})
except Exception as e:
yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return
yield sse({"type": "status", "message": f"Validating with {len(validators)} models..."})
vp = f"QUESTION: {prompt}\n\nANSWER:\n{answer}\n\nFact-check. Score 1-10 for accuracy, completeness, clarity. Flag errors."
results = parallel_query(validators, vp)
for m, r in results.items():
yield sse({"type": "response", "model": m, "text": r, "role": "validator"})
def run_roundrobin(config):
prompt, models, cycles = config["prompt"], config.get("models", []), config.get("cycles", 2)
yield sse({"type": "clear"})
if not models: return
yield sse({"type": "status", "message": f"{models[0]} drafting..."})
try:
current = query_model(models[0], f"Answer:\n\n{prompt}")
yield sse({"type": "response", "model": models[0], "text": current, "role": "draft"})
except Exception as e:
yield sse({"type": "response", "model": models[0], "text": str(e), "role": "error"}); return
for cycle in range(cycles):
start = 1 if cycle == 0 else 0
for i in range(start, len(models)):
m = models[i]
yield sse({"type": "status", "message": f"Cycle {cycle+1}: {m}..."})
try:
current = query_model(m, f"Question: {prompt}\n\nCurrent answer:\n{current}\n\nImprove it. Return full improved answer.")
is_last = (cycle == cycles-1) and (i == len(models)-1)
yield sse({"type": "response", "model": m, "text": current, "role": "final" if is_last else f"cycle {cycle+1}"})
except Exception as e:
yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
def run_redteam(config):
prompt, author, attacker, patcher = config["prompt"], config["author"], config["attacker"], config["patcher"]
rounds = config.get("rounds", 2)
yield sse({"type": "clear"})
yield sse({"type": "status", "message": f"{author} writing..."})
try:
current = query_model(author, prompt)
yield sse({"type": "response", "model": author, "text": current, "role": "author"})
except Exception as e:
yield sse({"type": "response", "model": author, "text": str(e), "role": "error"}); return
for r in range(rounds):
yield sse({"type": "status", "message": f"Round {r+1}: {attacker} attacking..."})
try:
attack = query_model(attacker, f"Question: {prompt}\n\nAnswer:\n{current}\n\nRED TEAM: find every flaw, error, weakness, edge case. Be aggressive.")
yield sse({"type": "response", "model": attacker, "text": attack, "role": f"attack {r+1}"})
except Exception as e:
yield sse({"type": "response", "model": attacker, "text": str(e), "role": "error"}); continue
yield sse({"type": "status", "message": f"Round {r+1}: {patcher} fixing..."})
try:
current = query_model(patcher, f"Question: {prompt}\n\nAnswer:\n{current}\n\nFlaws found:\n{attack}\n\nFix ALL issues. Return complete improved answer.")
yield sse({"type": "response", "model": patcher, "text": current, "role": "patcher" if r == rounds-1 else f"patch {r+1}"})
except Exception as e:
yield sse({"type": "response", "model": patcher, "text": str(e), "role": "error"})
def run_consensus(config):
prompt, models, max_rounds = config["prompt"], config.get("models", []), config.get("max_rounds", 3)
yield sse({"type": "clear"})
if not models: return
yield sse({"type": "status", "message": f"Round 1: {len(models)} models answering..."})
responses = parallel_query(models, prompt)
for m, r in responses.items():
yield sse({"type": "response", "model": m, "text": r, "role": "round 1"})
for rd in range(2, max_rounds + 1):
yield sse({"type": "status", "message": f"Round {rd}: reviewing each other..."})
new = {}
for m in models:
parts = [("Question:", prompt, 1),
("Your answer:", cap_response(responses.get(m, "")), 2),
("INSTRUCTION:", "Revise considering other perspectives. Adopt good points, defend if right.", 1)]
for o, r in responses.items():
if o != m:
parts.append((f"[{o}]:", cap_response(r), 3))
ctx = build_context(parts, m)
try:
new[m] = safe_query(m, ctx)
yield sse({"type": "response", "model": m, "text": new[m], "role": "consensus" if rd == max_rounds else f"round {rd}"})
except Exception as e:
new[m] = responses.get(m, ""); yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
responses = new
def run_codereview(config):
prompt, coder, reviewer, tester = config["prompt"], config["coder"], config["reviewer"], config["tester"]
yield sse({"type": "clear"})
yield sse({"type": "status", "message": f"{coder} coding..."})
try:
code = query_model(coder, f"Write code for this task. Only output code with brief comments.\n\n{prompt}")
yield sse({"type": "response", "model": coder, "text": code, "role": "coder"})
except Exception as e:
yield sse({"type": "response", "model": coder, "text": str(e), "role": "error"}); return
yield sse({"type": "status", "message": f"{reviewer} reviewing..."})
try:
review = query_model(reviewer, f"Task: {prompt}\n\nCode:\n{code}\n\nReview: bugs, security, performance, style, edge cases. Provide corrected code if needed.")
yield sse({"type": "response", "model": reviewer, "text": review, "role": "reviewer"})
except Exception as e:
review = ""; yield sse({"type": "response", "model": reviewer, "text": str(e), "role": "error"})
yield sse({"type": "status", "message": f"{tester} testing..."})
try:
tests = query_model(tester, f"Task: {prompt}\n\nCode:\n{code}\n\nReview:\n{review}\n\nWrite comprehensive unit tests. Cover normal, edge, error cases.")
yield sse({"type": "response", "model": tester, "text": tests, "role": "tester"})
except Exception as e:
yield sse({"type": "response", "model": tester, "text": str(e), "role": "error"})
def run_ladder(config):
prompt, models = config["prompt"], config.get("models", [])
levels = [
("Child (5yo)", "Explain to a 5-year-old. Very simple words, short sentences, fun analogies."),
("Teenager", "Explain to a 15-year-old. Everyday language, relatable examples, some technical terms."),
("College Student", "College level. Proper terminology, theory, structured explanation."),
("Professional", "Professional level. Technical language, real-world applications, trade-offs."),
("PhD Expert", "PhD/expert level. Nuanced details, current research, math if relevant, edge cases."),
]
yield sse({"type": "clear"})
for i, (name, instr) in enumerate(levels):
m = models[i % len(models)] if models else "qwen2.5"
yield sse({"type": "status", "message": f"Level {i+1}/5: {name} ({m})..."})
try:
yield sse({"type": "response", "model": m, "text": query_model(m, f"{instr}\n\nQuestion: {prompt}"), "role": name})
except Exception as e:
yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
def run_tournament(config):
prompt, models, judge = config["prompt"], config.get("models", []), config.get("judge", "qwen2.5")
yield sse({"type": "clear"})
yield sse({"type": "status", "message": f"{len(models)} models competing..."})
responses = parallel_query(models, prompt)
for m, r in responses.items():
yield sse({"type": "response", "model": m, "text": r, "role": "competitor"})
if len(responses) < 2: return
yield sse({"type": "status", "message": f"{judge} ranking..."})
parts = [("Question:", prompt, 1),
("INSTRUCTION:", "Rank all from best to worst. Score 1-10 each. Then refine the winner into the ultimate answer.", 1)]
for m, r in responses.items():
parts.append((f"[{m}]:", cap_response(r), 3))
jp = build_context(parts, judge)
try:
yield sse({"type": "response", "model": judge, "text": safe_query(judge, jp), "role": "verdict"})
except Exception as e:
yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"})
# ─── NEW 7 MODES ──────────────────────────────────────────────
def run_evolution(config):
"""Genetic algorithm: generate, score, breed, mutate across generations."""
prompt, models = config["prompt"], config.get("models", [])
generations = config.get("generations", 3)
judge = config.get("judge", models[0] if models else "qwen2.5")
yield sse({"type": "clear"})
if not models: return
# Gen 0: each model generates an answer
yield sse({"type": "status", "message": "Generation 0: spawning initial population..."})
population = parallel_query(models, prompt)
for m, r in population.items():
yield sse({"type": "response", "model": m, "text": r, "role": "gen 0"})
for gen in range(1, generations + 1):
# Fitness scoring
yield sse({"type": "status", "message": f"Generation {gen}: fitness evaluation..."})
score_prompt = f"Question: {prompt}\n\nRate each answer 1-100. Return ONLY a JSON object like {{\"model_name\": score}}.\n\n"
for m, r in population.items():
score_prompt += f"[{m}]: {r.strip()}\n\n"
try:
scores_raw = query_model(judge, score_prompt)
yield sse({"type": "response", "model": judge, "text": scores_raw, "role": f"fitness gen {gen}"})
except Exception as e:
yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"}); continue
        # Breed: take the first two answers as parents (judge scores are shown above but not parsed for selection)
pop_list = list(population.items())
if len(pop_list) < 2: break
parent1, parent2 = pop_list[0], pop_list[1]
yield sse({"type": "status", "message": f"Generation {gen}: breeding + mutating..."})
new_population = {}
for m in models:
breed_prompt = (
f"Question: {prompt}\n\n"
f"Parent A ({parent1[0]}):\n{parent1[1].strip()}\n\n"
f"Parent B ({parent2[0]}):\n{parent2[1].strip()}\n\n"
f"You are {m}. Breed these two answers: take the best parts of each parent, "
f"combine them, then MUTATE by adding one novel insight or improvement. "
f"Return your evolved answer."
)
try:
offspring = query_model(m, breed_prompt)
new_population[m] = offspring
is_last = gen == generations
yield sse({"type": "response", "model": m, "text": offspring, "role": "final" if is_last else f"gen {gen}"})
except Exception as e:
new_population[m] = population.get(m, "")
yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
population = new_population
def run_blindassembly(config):
"""Split question into parts, each model answers blind, assembler stitches."""
prompt, models = config["prompt"], config.get("models", [])
assembler = config.get("assembler", models[0] if models else "qwen2.5")
yield sse({"type": "clear"})
if not models: return
n = len(models)
# Step 1: Decompose the question
yield sse({"type": "status", "message": "Decomposing question into sub-tasks..."})
decompose_prompt = (
f"Split this question into exactly {n} independent sub-parts that together fully answer it. "
f"Return ONLY a numbered list, one sub-question per line. No other text.\n\n"
f"Question: {prompt}"
)
try:
parts_raw = query_model(assembler, decompose_prompt)
yield sse({"type": "response", "model": assembler, "text": parts_raw, "role": "decomposer"})
except Exception as e:
yield sse({"type": "response", "model": assembler, "text": str(e), "role": "error"}); return
# Parse parts
parts = [line.strip() for line in parts_raw.strip().split("\n") if line.strip() and any(c.isalpha() for c in line)]
while len(parts) < n:
parts.append(f"Additional aspect of: {prompt}")
parts = parts[:n]
# Step 2: Each model answers their part BLIND
yield sse({"type": "status", "message": f"Sending {n} sub-tasks to models (blind)..."})
fragments = {}
with ThreadPoolExecutor(max_workers=n) as pool:
futures = {}
for i, m in enumerate(models):
blind_prompt = (
f"Answer ONLY this specific sub-question. Do not address anything else.\n\n"
f"Sub-question: {parts[i]}"
)
futures[pool.submit(query_model, m, blind_prompt)] = (m, parts[i])
for future in as_completed(futures):
m, part = futures[future]
try:
fragments[m] = {"part": part, "answer": future.result()}
yield sse({"type": "response", "model": m, "text": f"SUB-TASK: {part}\n\nANSWER:\n{fragments[m]['answer']}", "role": "blind worker"})
except Exception as e:
yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
# Step 3: Assemble
yield sse({"type": "status", "message": f"{assembler} assembling blind fragments..."})
assemble_prompt = f"Original question: {prompt}\n\nMultiple models each answered a sub-part WITHOUT seeing each other:\n\n"
for m, data in fragments.items():
assemble_prompt += f"[{m}] (sub-task: {data['part']}):\n{data['answer'].strip()}\n\n"
assemble_prompt += "Stitch these fragments into ONE coherent, complete answer. Fill any gaps. Remove contradictions."
try:
yield sse({"type": "response", "model": assembler, "text": query_model(assembler, assemble_prompt), "role": "assembler"})
except Exception as e:
yield sse({"type": "response", "model": assembler, "text": str(e), "role": "error"})
def run_staircase(config):
"""Devil's Staircase: each round adds a new constraint."""
prompt = config["prompt"]
answerer = config["answerer"]
challenger = config["challenger"]
steps = config.get("steps", 4)
yield sse({"type": "clear"})
# Initial answer
yield sse({"type": "status", "message": f"{answerer} answering..."})
try:
current = query_model(answerer, prompt)
yield sse({"type": "response", "model": answerer, "text": current, "role": "initial answer"})
except Exception as e:
yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return
constraints = []
for s in range(steps):
# Challenger adds a constraint
yield sse({"type": "status", "message": f"Step {s+1}: {challenger} adding constraint..."})
constraint_prompt = (
f"Original question: {prompt}\n\n"
f"Current answer:\n{current}\n\n"
f"Existing constraints: {constraints if constraints else 'None yet'}\n\n"
f"Add ONE new realistic constraint, complication, or edge case that the current answer doesn't handle. "
f"Make it specific and challenging but plausible. State ONLY the new constraint, nothing else."
)
try:
new_constraint = query_model(challenger, constraint_prompt)
constraints.append(new_constraint.strip())
yield sse({"type": "response", "model": challenger, "text": new_constraint, "role": f"constraint {s+1}"})
except Exception as e:
yield sse({"type": "response", "model": challenger, "text": str(e), "role": "error"}); continue
# Answerer must adapt
yield sse({"type": "status", "message": f"Step {s+1}: {answerer} adapting..."})
adapt_prompt = (
f"Original question: {prompt}\n\n"
f"ALL constraints you must satisfy:\n" +
"\n".join(f" {i+1}. {c}" for i, c in enumerate(constraints)) +
f"\n\nYour previous answer:\n{current}\n\n"
f"Rewrite your answer to handle ALL constraints. Return the complete updated answer."
)
try:
current = query_model(answerer, adapt_prompt)
is_last = s == steps - 1
yield sse({"type": "response", "model": answerer, "text": current, "role": "final" if is_last else f"adapted {s+1}"})
except Exception as e:
yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"})
def run_drift(config):
"""Same prompt N times to same model, analyze variance."""
prompt = config["prompt"]
target = config["target"]
samples = config.get("samples", 5)
analyzer = config["analyzer"]
yield sse({"type": "clear"})
yield sse({"type": "status", "message": f"Sampling {target} {samples} times..."})
results = []
for i in range(samples):
yield sse({"type": "status", "message": f"Sample {i+1}/{samples}..."})
try:
r = query_model(target, prompt)
results.append(r)
yield sse({"type": "response", "model": target, "text": r, "role": f"sample {i+1}"})
except Exception as e:
yield sse({"type": "response", "model": target, "text": str(e), "role": "error"})
if len(results) < 2: return
# Analyze
yield sse({"type": "status", "message": f"{analyzer} analyzing drift..."})
analysis_prompt = (
f"Question asked: {prompt}\n\n"
f"The model '{target}' was asked this same question {len(results)} times. Here are all responses:\n\n"
)
for i, r in enumerate(results):
analysis_prompt += f"--- Sample {i+1} ---\n{r.strip()}\n\n"
analysis_prompt += (
"DRIFT ANALYSIS:\n"
"1. What claims/facts are CONSISTENT across all samples? (HIGH CONFIDENCE)\n"
"2. What claims VARY between samples? (LOW CONFIDENCE - possible hallucination)\n"
"3. What is completely CONTRADICTED between samples? (UNRELIABLE)\n"
"4. Give an overall confidence score 1-10 for the model's answer to this question.\n"
"5. Provide the 'true' answer using only high-confidence claims."
)
try:
yield sse({"type": "response", "model": analyzer, "text": query_model(analyzer, analysis_prompt), "role": "analyzer"})
except Exception as e:
yield sse({"type": "response", "model": analyzer, "text": str(e), "role": "error"})
def run_mesh(config):
"""Each model answers as a different stakeholder."""
prompt, models = config["prompt"], config.get("models", [])
synthesizer = config.get("synthesizer", models[0] if models else "qwen2.5")
yield sse({"type": "clear"})
perspectives = [
("CEO / Business Leader", "You are a CEO. Answer from a business strategy perspective: ROI, market impact, competitive advantage, risk."),
("Software Engineer", "You are a senior engineer. Answer from a technical perspective: architecture, implementation, scalability, tech debt."),
("End User / Customer", "You are an end user/customer. Answer from a usability perspective: experience, pain points, what you actually need."),
("Regulator / Legal", "You are a regulator/legal advisor. Answer from a compliance perspective: laws, regulations, liability, ethics, privacy."),
("Competitor", "You are a competitor analyzing this. What threats/opportunities does this create? What would you do differently?"),
]
if not models: return
responses = {}
for i, (role_name, instruction) in enumerate(perspectives):
m = models[i % len(models)]
yield sse({"type": "status", "message": f"{role_name}: {m}..."})
try:
r = query_model(m, f"{instruction}\n\nQuestion: {prompt}")
responses[role_name] = (m, r)
yield sse({"type": "response", "model": m, "text": r, "role": role_name})
except Exception as e:
yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
# 360 synthesis
yield sse({"type": "status", "message": f"{synthesizer} weaving 360-degree view..."})
syn = f"Question: {prompt}\n\nMultiple stakeholders gave their perspective:\n\n"
for role, (m, r) in responses.items():
syn += f"[{role} ({m})]: {r.strip()}\n\n"
syn += "Synthesize a 360-degree view that balances all stakeholder perspectives. Highlight tensions and trade-offs."
try:
yield sse({"type": "response", "model": synthesizer, "text": query_model(synthesizer, syn), "role": "mesh-360"})
except Exception as e:
yield sse({"type": "response", "model": synthesizer, "text": str(e), "role": "error"})
def run_hallucination(config):
"""One answers, hunters verify each claim independently."""
prompt, answerer = config["prompt"], config["answerer"]
hunters = config.get("hunters", [])
yield sse({"type": "clear"})
# Get answer
yield sse({"type": "status", "message": f"{answerer} answering..."})
try:
answer = query_model(answerer, prompt)
yield sse({"type": "response", "model": answerer, "text": answer, "role": "answer"})
except Exception as e:
yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return
# Extract claims
yield sse({"type": "status", "message": "Extracting factual claims..."})
extract_prompt = (
f"Extract every factual claim from this answer as a numbered list. Include specific facts, numbers, dates, "
f"names, and cause-effect relationships. One claim per line.\n\nAnswer:\n{answer}"
)
try:
claims = query_model(answerer, extract_prompt)
yield sse({"type": "response", "model": answerer, "text": claims, "role": "claims extracted"})
except Exception as e:
yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return
# Each hunter verifies independently
yield sse({"type": "status", "message": f"{len(hunters)} hunters verifying claims..."})
hunt_prompt = (
f"Original question: {prompt}\n\n"
f"An AI generated this answer:\n{answer}\n\n"
f"Here are the extracted claims:\n{claims}\n\n"
f"For EACH claim, verdict:\n"
f" VERIFIED - you are confident this is correct\n"
f" SUSPICIOUS - might be wrong or misleading\n"
f" HALLUCINATED - this is likely made up or incorrect\n"
f" UNVERIFIABLE - cannot determine from your knowledge\n"
f"Explain your reasoning for suspicious/hallucinated claims."
)
results = parallel_query(hunters, hunt_prompt)
for m, r in results.items():
yield sse({"type": "response", "model": m, "text": r, "role": "hunter"})
def run_timeloop(config):
"""CHAOS MODE: answer -> catastrophe -> fix -> new catastrophe -> repeat."""
prompt = config["prompt"]
answerer = config["answerer"]
chaos = config["chaos"]
loops = config.get("loops", 4)
yield sse({"type": "clear"})
# Initial answer
yield sse({"type": "status", "message": f"{answerer} answering (unaware of impending doom)..."})
try:
current = query_model(answerer, prompt)
yield sse({"type": "response", "model": answerer, "text": current, "role": "initial (doomed)"})
except Exception as e:
yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return
catastrophes = []
for i in range(loops):
# Chaos agent creates a catastrophe
yield sse({"type": "status", "message": f"Loop {i+1}: CHAOS AGENT unleashed..."})
chaos_prompt = (
f"Original question: {prompt}\n\n"
f"Someone implemented this answer:\n{current}\n\n"
f"Previous catastrophes that were already fixed: {catastrophes if catastrophes else 'None yet'}\n\n"
f"You are a CHAOS AGENT. Describe a SPECIFIC, VIVID catastrophe that happened because of a flaw "
f"in this answer. Be creative and dramatic but grounded in a real flaw. "
f"Describe: 1) What went wrong 2) The cascading consequences 3) Who/what was affected. "
f"Make it different from previous catastrophes. Be theatrical!"
)
try:
catastrophe = query_model(chaos, chaos_prompt)
catastrophes.append(catastrophe.strip()[:200])
yield sse({"type": "response", "model": chaos, "text": catastrophe, "role": f"catastrophe {i+1}"})
except Exception as e:
yield sse({"type": "response", "model": chaos, "text": str(e), "role": "error"}); continue
# Answerer must fix
yield sse({"type": "status", "message": f"Loop {i+1}: {answerer} desperately fixing..."})
fix_prompt = (
f"Original question: {prompt}\n\n"
f"Your previous answer:\n{current}\n\n"
f"CATASTROPHE REPORT:\n{catastrophe}\n\n"
f"ALL previous catastrophes you must also prevent:\n" +
"\n".join(f" {j+1}. {c}" for j, c in enumerate(catastrophes)) +
f"\n\nRewrite your answer to prevent THIS catastrophe and ALL previous ones. "
f"Your answer must be BULLETPROOF. Return the complete fixed answer."
)
try:
current = query_model(answerer, fix_prompt)
is_last = i == loops - 1
yield sse({"type": "response", "model": answerer, "text": current, "role": "survivor" if is_last else f"fix {i+1}"})
except Exception as e:
yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"})
# Final verdict from chaos agent
yield sse({"type": "status", "message": f"{chaos} final inspection..."})
final_prompt = (
f"Original question: {prompt}\n\n"
f"After {loops} catastrophes, the final answer is:\n{current}\n\n"
f"All catastrophes it survived:\n" +
"\n".join(f" {j+1}. {c}" for j, c in enumerate(catastrophes)) +
f"\n\nAs the Chaos Agent, give your final verdict: Is this answer now truly bulletproof? "
f"Rate its resilience 1-10. Can you find ONE MORE flaw? If not, admit defeat."
)
try:
yield sse({"type": "response", "model": chaos, "text": query_model(chaos, final_prompt), "role": "final judgment"})
except Exception as e:
yield sse({"type": "response", "model": chaos, "text": str(e), "role": "error"})
# ─── AUTONOMOUS PIPELINES ─────────────────────────────────────
def _save_pipeline(pipeline, topic, steps, result, models, start_ms):
    duration = int((time.time() * 1000) - start_ms)
try:
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"""INSERT INTO pipeline_runs (pipeline, topic, status, steps, result, models_used, duration_ms, completed_at)
VALUES (%s, %s, 'completed', %s, %s, %s, %s, NOW())""",
(pipeline, topic, json.dumps(steps), json.dumps(result), list(set(models)), duration)
)
conn.commit()
except Exception as e:
print(f"[DB] pipeline save error: {e}")
def run_research(config):
"""Autonomous research pipeline: scout → parallel research → fact-check → synthesize."""
    start = time.time() * 1000
prompt = config["prompt"]
scout = config.get("scout", "llama3.2:latest")
models = config.get("models", [])
checker = config.get("checker", models[0] if models else scout)
synth = config.get("synthesizer", models[0] if models else scout)
num_q = config.get("num_questions", 5)
yield sse({"type": "clear"})
steps = []
all_models = [scout, checker, synth] + models
# Step 1: Scout generates research questions
yield sse({"type": "status", "message": f"Step 1/4: {scout} generating {num_q} research questions..."})
try:
q_prompt = (
f"You are a research scout. Given the topic below, generate exactly {num_q} specific, "
f"diverse research questions that would build a comprehensive understanding. "
f"Return ONLY a numbered list.\n\nTopic: {prompt}"
)
questions_raw = query_model(scout, q_prompt)
yield sse({"type": "response", "model": scout, "text": questions_raw, "role": "scout"})
steps.append({"step": "scout", "model": scout, "output": questions_raw})
except Exception as e:
yield sse({"type": "response", "model": scout, "text": str(e), "role": "error"})
return
# Parse questions
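    # Heuristic parse: the scout was told to return a numbered list, so keep
    # every non-blank line that contains at least one letter, capped at num_q.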
    questions = [line.strip() for line in questions_raw.strip().split("\n") if line.strip() and any(c.isalpha() for c in line)]
    questions = questions[:num_q]
if not questions:
yield sse({"type": "response", "model": "system", "text": "Failed to parse research questions.", "role": "error"})
return
# Step 2: Parallel research — distribute questions across models
yield sse({"type": "status", "message": f"Step 2/4: {len(models)} models researching {len(questions)} questions..."})
research_results = {}
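    # as_completed() lets each answer stream to the browser the moment its
    # worker finishes, instead of blocking on the slowest model.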
with ThreadPoolExecutor(max_workers=max(len(models), 1)) as pool:
futures = {}
for i, q in enumerate(questions):
m = models[i % len(models)] if models else scout
rp = f"Research this question thoroughly. Provide specific facts, data, and examples.\n\nQuestion: {q}"
futures[pool.submit(query_model, m, rp)] = (m, q)
for future in as_completed(futures):
m, q = futures[future]
try:
answer = future.result()
research_results[q] = {"model": m, "answer": answer}
yield sse({"type": "response", "model": m, "text": f"Q: {q}\n\n{answer}", "role": "researcher"})
except Exception as e:
yield sse({"type": "response", "model": m, "text": f"Q: {q}\n\nError: {e}", "role": "error"})
research_results[q] = {"model": m, "answer": f"Error: {e}"}
steps.append({"step": "research", "results": {q: r["answer"][:500] for q, r in research_results.items()}})
# Step 3: Fact-check
yield sse({"type": "status", "message": f"Step 3/4: {checker} fact-checking all findings..."})
check_prompt = f"Topic: {prompt}\n\nResearch findings to fact-check:\n\n"
for q, r in research_results.items():
check_prompt += f"Q: {q}\nA: {r['answer'][:300]}\n\n"
check_prompt += (
"For each finding, mark as:\n"
" VERIFIED — likely accurate\n"
" UNCERTAIN — may be wrong or outdated\n"
" FLAGGED — likely inaccurate\n"
"Be specific about what's wrong with flagged items."
)
try:
check_result = query_model(checker, check_prompt)
yield sse({"type": "response", "model": checker, "text": check_result, "role": "fact-checker"})
steps.append({"step": "fact-check", "model": checker, "output": check_result[:1000]})
except Exception as e:
check_result = f"Error: {e}"
yield sse({"type": "response", "model": checker, "text": str(e), "role": "error"})
# Step 4: Synthesize into brief
yield sse({"type": "status", "message": f"Step 4/4: {synth} synthesizing research brief..."})
synth_prompt = f"Topic: {prompt}\n\nResearch findings:\n\n"
for q, r in research_results.items():
synth_prompt += f"Q: {q}\nA: {r['answer'][:400]}\n\n"
synth_prompt += f"\nFact-check notes:\n{check_result[:500]}\n\n"
synth_prompt += (
"Synthesize ALL findings into a structured research brief with these sections:\n"
"1. EXECUTIVE SUMMARY (2-3 sentences)\n"
"2. KEY FINDINGS (bulleted list)\n"
"3. DETAILED ANALYSIS (organized by theme)\n"
"4. UNCERTAINTIES & GAPS (what needs more research)\n"
"5. RECOMMENDATIONS (actionable next steps)\n"
"Be comprehensive but concise."
)
try:
brief = query_model(synth, synth_prompt)
yield sse({"type": "response", "model": synth, "text": brief, "role": "synthesis"})
steps.append({"step": "synthesis", "model": synth, "output": brief[:2000]})
except Exception as e:
brief = f"Error: {e}"
yield sse({"type": "response", "model": synth, "text": str(e), "role": "error"})
# Save pipeline run
_save_pipeline("research", prompt, steps, {"brief": brief, "questions": questions, "fact_check": check_result[:1000]}, all_models, start)
def run_eval(config):
"""Model evaluation pipeline: same prompts → all models → judge scores → leaderboard."""
    start = time.time() * 1000
prompt = config["prompt"]
models = config.get("models", [])
judge = config.get("judge", models[0] if models else "qwen2.5:latest")
eval_type = config.get("eval_type", "general")
rounds = config.get("rounds", 3)
yield sse({"type": "clear"})
steps = []
all_models = models + [judge]
# Generate eval prompts based on type
yield sse({"type": "status", "message": f"Generating {rounds} {eval_type} evaluation prompts..."})
gen_prompt = (
f"Generate exactly {rounds} evaluation prompts for testing LLM capability in: {eval_type}.\n"
f"Context/focus area: {prompt}\n\n"
f"Each prompt should test a different aspect. Return ONLY a numbered list of prompts, nothing else."
)
try:
prompts_raw = query_model(judge, gen_prompt)
yield sse({"type": "response", "model": judge, "text": prompts_raw, "role": "prompt generator"})
except Exception as e:
yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"})
return
    eval_prompts = [line.strip() for line in prompts_raw.strip().split("\n") if line.strip() and any(c.isalpha() for c in line)]
    eval_prompts = eval_prompts[:rounds]
if not eval_prompts:
yield sse({"type": "response", "model": "system", "text": "Failed to generate eval prompts.", "role": "error"})
return
# Run each prompt against all models
scores = {m: [] for m in models}
for ri, ep in enumerate(eval_prompts):
yield sse({"type": "status", "message": f"Round {ri+1}/{len(eval_prompts)}: Testing {len(models)} models..."})
# All models answer in parallel
responses = parallel_query(models, ep)
for m, r in responses.items():
yield sse({"type": "response", "model": m, "text": f"[Round {ri+1}] {ep[:80]}...\n\n{r}", "role": f"round {ri+1}"})
# Judge scores all responses
yield sse({"type": "status", "message": f"Round {ri+1}: Judging..."})
        judge_prompt = (
            f"Evaluation prompt: {ep}\n\n"
            f"Give each model's response ONE overall 1-10 score, weighing accuracy, completeness, clarity, and reasoning.\n"
            f"Return a JSON object: {{\"model_name\": {{\"score\": N, \"notes\": \"brief note\"}}}}.\n\n"
        )
for m, r in responses.items():
judge_prompt += f"[{m}]:\n{r[:500]}\n\n"
try:
judgment = query_model(judge, judge_prompt)
yield sse({"type": "response", "model": judge, "text": judgment, "role": f"judge round {ri+1}"})
            # Best-effort score parsing: the judge was asked for JSON, but small
            # models often answer in prose, so fall back to the first integer
            # that follows each model's name.
            import re
            for m in models:
                try:
                    pattern = re.escape(m) + r'.*?["\s:]+(\d+)'
                    match = re.search(pattern, judgment, re.IGNORECASE | re.DOTALL)
                    if match:
                        scores[m].append(int(match.group(1)))
                except Exception:
                    # one unparseable entry shouldn't sink the other models
                    pass
except Exception as e:
yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"})
steps.append({"round": ri+1, "prompt": ep, "responses": {m: r[:300] for m, r in responses.items()}})
# Final leaderboard
yield sse({"type": "status", "message": "Generating leaderboard..."})
leaderboard = []
for m in models:
avg = sum(scores[m]) / len(scores[m]) if scores[m] else 0
leaderboard.append({"model": m, "avg_score": round(avg, 1), "rounds": len(scores[m]), "scores": scores[m]})
leaderboard.sort(key=lambda x: x["avg_score"], reverse=True)
board_text = f"LEADERBOARD — {eval_type.upper()} ({len(eval_prompts)} rounds)\n{'='*50}\n\n"
for i, entry in enumerate(leaderboard):
medal = ["1st", "2nd", "3rd"][i] if i < 3 else f"{i+1}th"
bar = "#" * int(entry["avg_score"])
board_text += f" {medal} {entry['model']:<30} {entry['avg_score']:>4}/10 {bar}\n"
if entry["scores"]:
board_text += f" Round scores: {entry['scores']}\n\n"
yield sse({"type": "response", "model": judge, "text": board_text, "role": "final"})
_save_pipeline("eval", prompt, steps, {"leaderboard": leaderboard, "eval_type": eval_type}, all_models, start)
def run_extract(config):
"""Knowledge extraction pipeline: chunk text → extract facts → verify → structured output."""
    start = time.time() * 1000
prompt = config["prompt"]
extractor = config.get("extractor", "qwen2.5:latest")
verifier = config.get("verifier", "gemma2:latest")
source = config.get("source", "prompt")
yield sse({"type": "clear"})
steps = []
all_models = [extractor, verifier]
# Get source text
source_text = prompt
if source != "prompt":
file_map = {
"ontology": "/home/profit/ONTOLOGY.md",
"index": "/home/profit/INDEX.md",
"summaries": "/home/profit/SUMMARIES.md",
"guides": "/home/profit/GUIDES.md",
}
fpath = file_map.get(source)
if fpath and os.path.exists(fpath):
yield sse({"type": "status", "message": f"Reading {source}..."})
with open(fpath) as f:
source_text = f.read()[:15000] # limit to ~15K chars
yield sse({"type": "response", "model": "system", "text": f"Loaded {source} ({len(source_text)} chars)", "role": "source"})
else:
yield sse({"type": "response", "model": "system", "text": f"File not found: {source}", "role": "error"})
return
# Chunk if too long
chunks = []
chunk_size = 4000
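    # 4000 chars is roughly 1K tokens — small enough to fit comfortably in a
    # local model's context window alongside the instruction preamble.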
for i in range(0, len(source_text), chunk_size):
chunks.append(source_text[i:i+chunk_size])
yield sse({"type": "status", "message": f"Processing {len(chunks)} chunk(s) with {extractor}..."})
all_facts = []
all_entities = []
all_relations = []
for ci, chunk in enumerate(chunks):
yield sse({"type": "status", "message": f"Extracting from chunk {ci+1}/{len(chunks)}..."})
extract_prompt = (
f"Extract structured knowledge from this text. Return a JSON object with:\n"
f" \"facts\": [\"fact 1\", \"fact 2\", ...],\n"
f" \"entities\": [{{\"name\": \"...\", \"type\": \"...\", \"description\": \"...\"}}, ...],\n"
f" \"relationships\": [{{\"from\": \"...\", \"to\": \"...\", \"type\": \"...\"}}, ...]\n\n"
f"Be thorough. Extract EVERY factual claim, named entity, and relationship.\n\n"
f"Text:\n{chunk}"
)
try:
result = query_model(extractor, extract_prompt)
yield sse({"type": "response", "model": extractor, "text": result, "role": f"extraction {ci+1}"})
# Try to parse JSON from response
try:
import re
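                # Greedy match from the first '{' to the last '}' tolerates
                # models that wrap their JSON in prose or code fences.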
json_match = re.search(r'\{[\s\S]*\}', result)
if json_match:
parsed = json.loads(json_match.group())
all_facts.extend(parsed.get("facts", []))
all_entities.extend(parsed.get("entities", []))
all_relations.extend(parsed.get("relationships", []))
except Exception:
all_facts.append(result[:500])
except Exception as e:
yield sse({"type": "response", "model": extractor, "text": str(e), "role": "error"})
steps.append({"step": "extraction", "facts": len(all_facts), "entities": len(all_entities), "relations": len(all_relations)})
# Verify key facts
yield sse({"type": "status", "message": f"{verifier} verifying {len(all_facts)} facts..."})
facts_sample = all_facts[:20] # verify up to 20
    verify_prompt = (
        "Verify these extracted facts. For each, mark CORRECT, INCORRECT, or UNVERIFIABLE.\n"
        "If incorrect, provide the correction.\n\n"
    )
    for i, fact in enumerate(facts_sample):
        fact_str = fact if isinstance(fact, str) else json.dumps(fact)
        verify_prompt += f"{i+1}. {fact_str}\n"
try:
verification = query_model(verifier, verify_prompt)
yield sse({"type": "response", "model": verifier, "text": verification, "role": "verifier"})
steps.append({"step": "verification", "model": verifier, "output": verification[:1000]})
except Exception as e:
verification = str(e)
yield sse({"type": "response", "model": verifier, "text": str(e), "role": "error"})
# Summary
summary = (
f"KNOWLEDGE EXTRACTION SUMMARY\n{'='*40}\n\n"
f"Source: {source}\n"
f"Facts extracted: {len(all_facts)}\n"
f"Entities found: {len(all_entities)}\n"
f"Relationships mapped: {len(all_relations)}\n\n"
f"TOP ENTITIES:\n"
)
for e in all_entities[:15]:
if isinstance(e, dict):
summary += f" [{e.get('type','?')}] {e.get('name','?')} — {e.get('description','')[:60]}\n"
summary += f"\nTOP RELATIONSHIPS:\n"
for r in all_relations[:15]:
if isinstance(r, dict):
summary += f" {r.get('from','?')} --[{r.get('type','?')}]--> {r.get('to','?')}\n"
yield sse({"type": "response", "model": "system", "text": summary, "role": "final"})
result_data = {
"facts": all_facts[:100],
"entities": all_entities[:50],
"relationships": all_relations[:50],
"verification": verification[:1000],
"source": source,
}
_save_pipeline("extract", prompt or source, steps, result_data, all_models, start)
if __name__ == "__main__":
print("\n LLM Team UI running at http://localhost:5000\n")
app.run(host="0.0.0.0", port=5000, debug=False)