#!/usr/bin/env python3 """LLM Team UI - Web interface to configure and run multi-model teams.""" import json import os import time import threading import secrets import hashlib import logging import re import requests import random import psycopg2 import psycopg2.extras import bcrypt from concurrent.futures import ThreadPoolExecutor, as_completed from flask import Flask, render_template_string, request, jsonify, Response, redirect, url_for, session from functools import wraps app = Flask(__name__) app.secret_key = os.environ.get("FLASK_SECRET", secrets.token_hex(32)) # ─── SECURITY LOGGING ───────────────────────────────────────── # Dedicated security log for fail2ban and audit trail _sec_handler = logging.FileHandler("/var/log/llm-team-security.log") _sec_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s")) sec_log = logging.getLogger("security") sec_log.addHandler(_sec_handler) sec_log.setLevel(logging.WARNING) # ─── EMAIL ALERTS ────────────────────────────────────────────── SMTP_HOST = os.environ.get("SMTP_HOST", "127.0.0.1") SMTP_PORT = int(os.environ.get("SMTP_PORT", "1025")) ALERT_FROM = os.environ.get("ALERT_FROM", "security@island37.com") ALERT_TO = os.environ.get("ALERT_TO", "admin@island37.com") def send_security_alert(subject, body): """Send security alert email (non-blocking).""" def _send(): try: import smtplib from email.message import EmailMessage msg = EmailMessage() msg["Subject"] = f"[LLM Team Security] {subject}" msg["From"] = ALERT_FROM msg["To"] = ALERT_TO msg.set_content(body) with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=5) as s: s.send_message(msg) except Exception as e: sec_log.error("EMAIL_FAILED subject=%s error=%s", subject, str(e)) threading.Thread(target=_send, daemon=True).start() # Known exploit paths that scanners probe EXPLOIT_PATTERNS = re.compile( r"(\.env|wp-admin|wp-login|phpmyadmin|\.git|/admin\.php|/config\.|" r"\.asp|\.aspx|/cgi-bin|/shell|/eval|/exec|/passwd|/etc/shadow|" r"\.\./|%2e%2e| (count, 
window_start) RATE_LIMIT_WINDOW = 60 RATE_LIMIT_MAX = 60 LOGIN_RATE_MAX = 5 # IPs that never get rate-limited (your LAN, localhost) ALLOWLIST_IPS = {"127.0.0.1", "::1", "192.168.1.1"} # Demo mode state — toggled by admin at runtime _demo_mode = {"active": False, "started_by": None} # Admin-only write routes — blocked in demo for non-admin users ADMIN_WRITE_ROUTES = { "/api/admin/config": ["POST"], "/api/admin/test-provider": ["POST"], "/api/auth/login": ["POST"], } def is_allowlisted(ip): return ip in ALLOWLIST_IPS or ip.startswith("192.168.1.") def rate_limited(ip, max_req=RATE_LIMIT_MAX): if is_allowlisted(ip): return False now = time.time() if ip not in _rate_limit or now - _rate_limit[ip][1] > RATE_LIMIT_WINDOW: _rate_limit[ip] = (1, now) return False count, start = _rate_limit[ip] if count >= max_req: return True _rate_limit[ip] = (count + 1, start) return False def is_admin(): return session.get("role") == "admin" def is_demo(): return _demo_mode["active"] def login_required(f): @wraps(f) def decorated(*args, **kwargs): # Demo mode: everyone gets in if is_demo() and not session.get("user_id"): session["demo_user"] = True if not session.get("user_id") and not is_demo(): if request.path.startswith("/api/"): return jsonify({"error": "unauthorized"}), 401 return redirect("/login") return f(*args, **kwargs) return decorated def admin_required(f): @wraps(f) def decorated(*args, **kwargs): # Demo mode: allow read access (GET), block writes unless admin if is_demo(): if request.method == "GET": return f(*args, **kwargs) if not is_admin(): return jsonify({"error": "demo mode: read-only", "demo": True}), 403 if not session.get("user_id"): if request.path.startswith("/api/"): return jsonify({"error": "unauthorized"}), 401 return redirect("/login") if session.get("role") != "admin": return "Forbidden", 403 return f(*args, **kwargs) return decorated @app.before_request def security_checks(): ip = request.headers.get("X-Real-IP", request.remote_addr) path = request.path ua 
= request.headers.get("User-Agent", "") # Exploit scanner detection — log, alert, and block if EXPLOIT_PATTERNS.search(path) or EXPLOIT_PATTERNS.search(request.query_string.decode("utf-8", errors="ignore")): sec_log.warning("EXPLOIT_SCAN ip=%s path=%s ua=%s", ip, path, ua) send_security_alert( f"Exploit Scan from {ip}", f"IP: {ip}\nPath: {path}\nUser-Agent: {ua}\nTime: {time.strftime('%Y-%m-%d %H:%M:%S')}" ) return "Not Found", 404 # Rate limit (allowlisted IPs skip) if rate_limited(ip): sec_log.warning("RATE_LIMITED ip=%s path=%s", ip, path) return jsonify({"error": "rate limited"}), 429 # Always allow these if path in ("/login", "/api/auth/login", "/api/auth/setup", "/api/demo/status"): return if path.startswith("/static"): return # In demo mode, block admin write routes for non-admins if is_demo() and not is_admin(): for route, methods in ADMIN_WRITE_ROUTES.items(): if path == route and request.method in methods: return jsonify({"error": "demo mode: admin settings are read-only", "demo": True}), 403 @app.after_request def security_headers(response): response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" response.headers["Content-Security-Policy"] = "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; connect-src 'self'" response.headers["Permissions-Policy"] = "camera=(), microphone=(), geolocation=()" if request.path.startswith("/api/"): response.headers["Cache-Control"] = "no-store" return response HONEYPOT_404_HTML = """ 404 Not Found

404

Page not found

The page you're looking for doesn't exist or has been moved.

Go to login
""" @app.errorhandler(404) def page_not_found(e): ip = request.headers.get("X-Real-IP", request.remote_addr) ua = request.headers.get("User-Agent", "") path = request.path referer = request.headers.get("Referer", "") method = request.method accept_lang = request.headers.get("Accept-Language", "") accept_enc = request.headers.get("Accept-Encoding", "") # Build fingerprint hash from request characteristics fp_raw = f"{ua}|{accept_lang}|{accept_enc}|{request.headers.get('Connection','')}|{request.headers.get('DNT','')}" fp_hash = hashlib.sha256(fp_raw.encode()).hexdigest()[:16] # Classify threat level threat = "low" if EXPLOIT_PATTERNS.search(path): threat = "high" elif any(s in path.lower() for s in ("/admin", "/config", "/api/", "/debug", "/console", "/server-status")): threat = "medium" elif not ua or "bot" in ua.lower() or "scanner" in ua.lower() or "nikto" in ua.lower() or "sqlmap" in ua.lower(): threat = "high" sec_log.warning( "404_HIT ip=%s fp=%s threat=%s method=%s path=%s referer=%s ua=%s", ip, fp_hash, threat, method, path, referer, ua ) html = HONEYPOT_404_HTML.replace("{{FINGERPRINT}}", fp_hash) return html, 404 @app.route("/api/fp", methods=["POST"]) def fingerprint_collect(): """Silent endpoint that collects browser fingerprint data from 404 pages.""" ip = request.headers.get("X-Real-IP", request.remote_addr) data = request.json or {} ua = request.headers.get("User-Agent", "") # Build server-side fingerprint fp_raw = f"{ua}|{request.headers.get('Accept-Language','')}|{request.headers.get('Accept-Encoding','')}|{request.headers.get('Connection','')}|{request.headers.get('DNT','')}" fp_hash = hashlib.sha256(fp_raw.encode()).hexdigest()[:16] sec_log.warning( "FINGERPRINT ip=%s fp=%s tz=%s lang=%s platform=%s cores=%s mem=%s touch=%s screen=%s dpr=%s plugins=%s webgl=%s", ip, fp_hash, data.get("tz", ""), data.get("lang", ""), data.get("plat", ""), data.get("cores", ""), data.get("mem", ""), data.get("touch", ""), data.get("screen", ""), data.get("dpr", ""), 
data.get("plugins", ""), data.get("webgl", "") ) return "", 204 LOGIN_HTML = """ LLM Team - Login

LLM Team

Sign in to continue

SYS.AUTH // v3.2
""" @app.route("/login") def login_page(): if session.get("user_id"): return redirect("/") return LOGIN_HTML @app.route("/api/auth/setup") def auth_setup(): try: with get_db() as conn: with conn.cursor() as cur: cur.execute("SELECT COUNT(*) FROM users") count = cur.fetchone()[0] return jsonify({"needs_setup": count == 0}) except Exception: return jsonify({"needs_setup": True}) @app.route("/api/auth/login", methods=["POST"]) def auth_login(): ip = request.remote_addr if rate_limited(ip, LOGIN_RATE_MAX): return jsonify({"error": "Too many attempts. Wait a minute."}), 429 data = request.json or {} username = data.get("username", "").strip() password = data.get("password", "") is_setup = data.get("setup", False) if not username or not password: return jsonify({"error": "Username and password required"}), 400 if len(password) < 8: return jsonify({"error": "Password must be at least 8 characters"}), 400 try: with get_db() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: if is_setup: # First-time setup: create admin cur.execute("SELECT COUNT(*) as c FROM users") if cur.fetchone()["c"] > 0: return jsonify({"error": "Setup already completed"}), 400 pw_hash = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() cur.execute("INSERT INTO users (username, password_hash, role) VALUES (%s, %s, 'admin') RETURNING id", (username, pw_hash)) uid = cur.fetchone()["id"] conn.commit() session["user_id"] = uid session["username"] = username session["role"] = "admin" session.permanent = True return jsonify({"ok": True}) # Normal login cur.execute("SELECT * FROM users WHERE username = %s", (username,)) user = cur.fetchone() if not user or not bcrypt.checkpw(password.encode(), user["password_hash"].encode()): sec_log.warning("LOGIN_FAILED ip=%s user=%s", ip, username) send_security_alert( f"Failed Login from {ip}", f"IP: {ip}\nUsername attempted: {username}\nTime: {time.strftime('%Y-%m-%d %H:%M:%S')}" ) return jsonify({"error": "Invalid credentials"}), 
401 session["user_id"] = user["id"] session["username"] = user["username"] session["role"] = user["role"] session.permanent = True return jsonify({"ok": True}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/auth/logout", methods=["POST"]) def auth_logout(): session.clear() return jsonify({"ok": True}) @app.route("/logout") def logout_page(): session.clear() return redirect("/login") @app.route("/api/demo/status") def demo_status(): return jsonify({"active": is_demo(), "started_by": _demo_mode.get("started_by")}) @app.route("/api/demo/toggle", methods=["POST"]) def demo_toggle(): if not session.get("user_id") or not is_admin(): return jsonify({"error": "admin only"}), 403 _demo_mode["active"] = not _demo_mode["active"] _demo_mode["started_by"] = session.get("username") if _demo_mode["active"] else None return jsonify({"active": _demo_mode["active"]}) @app.route("/api/demo/allowlist", methods=["GET"]) def demo_get_allowlist(): if not is_admin(): return jsonify({"error": "admin only"}), 403 return jsonify({"ips": sorted(ALLOWLIST_IPS)}) @app.route("/api/demo/allowlist", methods=["POST"]) def demo_set_allowlist(): if not is_admin(): return jsonify({"error": "admin only"}), 403 data = request.json or {} ip = data.get("ip", "").strip() action = data.get("action", "add") if not ip: return jsonify({"error": "ip required"}), 400 if action == "add": ALLOWLIST_IPS.add(ip) elif action == "remove" and ip in ALLOWLIST_IPS: ALLOWLIST_IPS.discard(ip) return jsonify({"ips": sorted(ALLOWLIST_IPS)}) @app.route("/logs") @admin_required def logs_page(): return LOGS_HTML @app.route("/api/admin/logs") @admin_required def admin_logs(): source = request.args.get("source", "app") limit = min(int(request.args.get("limit", 100)), 500) lines = [] try: if source == "nginx_access": with open("/var/log/nginx/access.log") as f: lines = f.readlines()[-limit:] elif source == "nginx_error": with open("/var/log/nginx/error.log") as f: lines = f.readlines()[-limit:] elif 
source == "security": with open("/var/log/llm-team-security.log") as f: lines = f.readlines()[-limit:] elif source == "runs": return jsonify({"lines": [], "runs": list(reversed(_run_log[-limit:]))}) else: # App log — get from journalctl import subprocess result = subprocess.run( ["journalctl", "-u", "llm-team-ui", "--no-pager", "-n", str(limit), "--output=short-iso"], capture_output=True, text=True, timeout=5 ) lines = result.stdout.strip().split("\n") if result.stdout else [] except Exception as e: lines = [f"Error reading log: {e}"] return jsonify({"lines": [l.rstrip() for l in lines]}) LOGS_HTML = r""" LLM Team — Logs

Logs // System View

Monitor Admin ← Team
App Log
Run History
Nginx Errors
Nginx Access
Security Raw
Threat Intel
Wall of Shame
Loading...
""" CONFIG_PATH = "/root/llm_team_config.json" DEFAULT_CONFIG = { "providers": { "ollama": {"enabled": True, "base_url": "http://localhost:11434", "timeout": 300}, "openrouter": {"enabled": False, "base_url": "https://openrouter.ai/api/v1", "api_key": "", "timeout": 120}, "openai": {"enabled": False, "base_url": "https://api.openai.com/v1", "api_key": "", "timeout": 120}, "anthropic": {"enabled": False, "base_url": "https://api.anthropic.com/v1", "api_key": "", "timeout": 120}, }, "disabled_models": [], "cloud_models": [], "timeouts": {"global": 300, "per_model": {}}, } def load_dotenv(): for p in ["/root/.env", "/home/profit/.env"]: if os.path.exists(p): with open(p) as f: for line in f: line = line.strip() if line and not line.startswith("#") and "=" in line: k, v = line.split("=", 1) os.environ.setdefault(k.strip(), v.strip()) load_dotenv() def load_config(): if os.path.exists(CONFIG_PATH): with open(CONFIG_PATH) as f: cfg = json.load(f) # merge any missing defaults for k, v in DEFAULT_CONFIG.items(): cfg.setdefault(k, v) for k, v in DEFAULT_CONFIG["providers"].items(): cfg["providers"].setdefault(k, v) return cfg return json.loads(json.dumps(DEFAULT_CONFIG)) def save_config(cfg): with open(CONFIG_PATH, "w") as f: json.dump(cfg, f, indent=2) def get_api_key(provider_name): cfg = load_config() prov = cfg["providers"].get(provider_name, {}) key = prov.get("api_key", "") if key: return key env_map = {"openrouter": "OPENROUTER_API_KEY", "openai": "OPENAI_API_KEY", "anthropic": "ANTHROPIC_API_KEY"} return os.environ.get(env_map.get(provider_name, ""), "") DB_DSN = "dbname=knowledge_base user=kbuser password=IPbLBA0EQI8u4TeM2YZrbm1OAy5nSwqC host=localhost" def get_db(): return psycopg2.connect(DB_DSN) def save_run(mode, prompt, config_data, responses): models = list({r.get("model", "") for r in responses if r.get("model")}) try: with get_db() as conn: with conn.cursor() as cur: cur.execute( "INSERT INTO team_runs (mode, prompt, config, responses, models_used) VALUES 
(%s, %s, %s, %s, %s)", (mode, prompt, json.dumps(config_data), json.dumps(responses), models) ) conn.commit() except Exception as e: print(f"[DB] save_run error: {e}") HTML = r""" LLM Team

LLM Team

0 models
Mode: Brainstorm

Mode

BrainstormAll + synthesize
PipelineChain sequence
DebateArgue + judge
ValidatorFact-check
Round RobinIterate improve
Red TeamAttack + defend
ConsensusConverge
Code ReviewWrite+review+test
ELI Ladder5 levels
TournamentCompete + vote
EvolutionGenetic algo
Blind AssemblySplit + merge
StaircaseAdd constraints
Drift DetectConfidence map
PerspectiveStakeholder 360
Hallucinate?Claim verify
Time LoopCatastrophe fix!
Autonomous Pipelines
ResearchAuto brief
Model EvalBenchmark
KnowledgeExtract facts
All models answer in parallel, then one synthesizes the best parts into a final answer.

Models

Prompt

Output

◆ ◆ ◆

Select a mode, pick your models, and enter a prompt to run the team.

Response

Re-pipe into mode

History

""" ADMIN_HTML = r""" LLM Team - Admin

LLM Team Admin

Providers
Models
OpenRouter
Timeouts
Security

Ollama (Local)

OpenRouter

OpenAI

Anthropic

Local Models (Ollama)

Loading...

Cloud Models

No cloud models configured.

Free Models on OpenRouter

Click "Fetch Models" to load the list.

Global Default

Per-Model Overrides

Loading models...

Demo Mode

When active, the public can view and use the Team UI, Lab, and all modes without logging in. Admin settings (API keys, config saves) are read-only for non-admins.

Status: Off

IP Allowlist

These IPs are never rate-limited. Your local network (192.168.1.*) is always allowed.

""" LAB_HTML = r""" LLM Team - Lab

Lab AutoResearch

Experiments
Mutable Config
Live Monitor
Results

Your Experiments

Loading...

Self-Analysis AI reports from your own system data

Experiment Templates click to auto-fill the create form

Mutable Config

Select an experiment from the Experiments tab first.

No Experiment Selected

Status: idle
Trials: 0
Best: 0.0/10
Improvements: 0

Score Progression

Trial Log

Start an experiment to see trials here.

Best Config

No best config yet.

All Experiments

Loading...
""" # ─── HELPERS ─────────────────────────────────────────────────── def _get_timeout(model_id): cfg = load_config() t = cfg["timeouts"]["per_model"].get(model_id) if t: return t if "::" in model_id: prov = model_id.split("::")[0] return cfg["providers"].get(prov, {}).get("timeout", cfg["timeouts"]["global"]) return cfg["providers"].get("ollama", {}).get("timeout", cfg["timeouts"]["global"]) def query_ollama(model, prompt, timeout): cfg = load_config() base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434") # Set num_ctx based on prompt size — Ollama defaults to 2048 which is too small prompt_tokens = estimate_tokens(prompt) ctx_limit = get_context_limit(model) num_ctx = min(max(prompt_tokens + 1024, 2048), ctx_limit) # Truncate prompt if it exceeds the model's context window if prompt_tokens > ctx_limit - 512: prompt = smart_truncate(prompt, ctx_limit - 512) resp = requests.post(f"{base}/api/generate", json={ "model": model, "prompt": prompt, "stream": False, "options": {"num_ctx": num_ctx} }, timeout=timeout) resp.raise_for_status() return resp.json()["response"] def query_openai_compatible(model, prompt, provider_name, timeout): cfg = load_config() prov = cfg["providers"].get(provider_name, {}) base = prov.get("base_url", "https://openrouter.ai/api/v1") api_key = get_api_key(provider_name) headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} if provider_name == "openrouter": headers["HTTP-Referer"] = "http://localhost:5000" headers["X-Title"] = "LLM Team UI" resp = requests.post(f"{base}/chat/completions", headers=headers, json={ "model": model, "messages": [{"role": "user", "content": prompt}], "stream": False, }, timeout=timeout) resp.raise_for_status() return resp.json()["choices"][0]["message"]["content"] def query_anthropic(model, prompt, timeout): cfg = load_config() prov = cfg["providers"].get("anthropic", {}) base = prov.get("base_url", "https://api.anthropic.com/v1") api_key = get_api_key("anthropic") 
resp = requests.post(f"{base}/messages", headers={ "x-api-key": api_key, "anthropic-version": "2023-06-01", "Content-Type": "application/json", }, json={ "model": model, "max_tokens": 4096, "messages": [{"role": "user", "content": prompt}], }, timeout=timeout) resp.raise_for_status() return resp.json()["content"][0]["text"] def query_model(model_id, prompt): timeout = _get_timeout(model_id) if "::" in model_id: provider_name, model_name = model_id.split("::", 1) if provider_name == "anthropic": return query_anthropic(model_name, prompt, timeout) return query_openai_compatible(model_name, prompt, provider_name, timeout) return query_ollama(model_id, prompt, timeout) # ─── CONTEXT MANAGEMENT ─────────────────────────────────────── # Context window sizes (tokens) — conservative estimates for safe prompting MODEL_CONTEXT = { "llama3.2": 4096, "llama3.1": 8192, "llama3": 8192, "mistral": 8192, "gemma2": 8192, "gemma3": 32768, "qwen2.5": 8192, "qwen3": 32768, "gpt-oss": 4096, "gpt-4o": 128000, "gpt-4o-mini": 128000, "claude-3": 200000, "claude-sonnet": 200000, "claude-haiku": 200000, } DEFAULT_CONTEXT = 4096 # safe fallback for unknown models MAX_RESPONSE_CHARS = 12000 # cap individual responses (~3K tokens) def estimate_tokens(text): """Rough token estimate: ~4 chars per token for English.""" return len(text) // 4 + 1 def get_context_limit(model_id): """Get context window size for a model.""" name = model_id.split("::")[-1].split(":")[0].lower() for key, limit in MODEL_CONTEXT.items(): if key in name: return limit # OpenRouter models generally have larger contexts if "::" in model_id: return 16000 return DEFAULT_CONTEXT def smart_truncate(text, max_tokens, preserve_end=200): """Truncate text preserving start and end, with a clear marker.""" if estimate_tokens(text) <= max_tokens: return text max_chars = max_tokens * 4 end_chars = preserve_end * 4 if max_chars <= end_chars * 2: return text[:max_chars] start = text[:max_chars - end_chars - 60] end = text[-end_chars:] 
return f"{start}\n\n[... truncated {estimate_tokens(text) - max_tokens} tokens ...]\n\n{end}" def cap_response(text): """Cap a single model response to prevent runaway output.""" if len(text) <= MAX_RESPONSE_CHARS: return text return smart_truncate(text, MAX_RESPONSE_CHARS // 4) def build_context(parts, model_id, reserve_for_response=1024): """Build a prompt from parts, fitting within model's context window. parts: list of (label, text, priority) tuples priority: 1=must keep, 2=important, 3=can truncate heavily Returns: assembled prompt string that fits in context. """ limit = get_context_limit(model_id) budget = limit - reserve_for_response if budget <= 0: budget = limit // 2 # First pass: measure everything total = sum(estimate_tokens(t) for _, t, _ in parts) if total <= budget: return "\n\n".join(f"{label}\n{text}" if label else text for label, text, _ in parts) # Need to truncate — allocate budget by priority p1 = [(l, t, p) for l, t, p in parts if p == 1] p2 = [(l, t, p) for l, t, p in parts if p == 2] p3 = [(l, t, p) for l, t, p in parts if p == 3] p1_tokens = sum(estimate_tokens(t) for _, t, _ in p1) remaining = budget - p1_tokens if remaining <= 0: # Even priority 1 doesn't fit — truncate p1 per_part = budget // max(len(p1), 1) result = [] for label, text, _ in p1: result.append(f"{label}\n{smart_truncate(text, per_part)}" if label else smart_truncate(text, per_part)) return "\n\n".join(result) # Allocate remaining to p2, then p3 result = [f"{l}\n{t}" if l else t for l, t, _ in p1] for group in [p2, p3]: if not group or remaining <= 0: continue per_part = remaining // max(len(group), 1) for label, text, _ in group: truncated = smart_truncate(text, max(per_part, 100)) result.append(f"{label}\n{truncated}" if label else truncated) remaining -= estimate_tokens(truncated) return "\n\n".join(result) def safe_query(model_id, prompt, fallback_summarize=True): """Query with context safety — auto-truncates prompt if too large, retries on overflow errors.""" limit = 
get_context_limit(model_id) prompt_tokens = estimate_tokens(prompt) # Pre-flight check: truncate if obviously too large if prompt_tokens > limit - 500: prompt = smart_truncate(prompt, limit - 1000) try: response = query_model(model_id, prompt) return cap_response(response) except Exception as e: err = str(e).lower() # Detect context overflow errors from various providers if any(k in err for k in ["context length", "too many tokens", "maximum context", "token limit", "content_too_large", "request too large", "413", "400"]): if fallback_summarize: # Aggressive truncation and retry truncated = smart_truncate(prompt, limit // 2) try: response = query_model(model_id, truncated) return cap_response(response) except Exception: pass return f"[Context overflow: prompt was ~{prompt_tokens} tokens, model limit ~{limit}. Response truncated to fit.]" raise def parallel_safe_query(models, prompt): """Like parallel_query but with context safety on each model.""" results = {} max_timeout = max((_get_timeout(m) for m in models), default=300) + 30 with ThreadPoolExecutor(max_workers=max(len(models), 1)) as pool: futures = {pool.submit(safe_query, m, prompt): m for m in models} for future in as_completed(futures, timeout=max_timeout): model = futures[future] try: results[model] = future.result(timeout=10) except Exception as e: results[model] = f"Error: {e}" return results def sse(data): return f"data: {json.dumps(data)}\n\n" def parallel_query(models, prompt): """Query multiple models in parallel with context safety.""" return parallel_safe_query(models, prompt) # ─── ROUTES ──────────────────────────────────────────────────── @app.route("/") @login_required def index(): return render_template_string(HTML) @app.route("/api/models") @login_required def get_models(): SKIP = {"nomic-embed-text", "mxbai-embed-large", "all-minilm", "snowflake-arctic-embed"} cfg = load_config() models = [] # Local Ollama models if cfg["providers"]["ollama"].get("enabled", True): try: base = 
cfg["providers"]["ollama"].get("base_url", "http://localhost:11434") resp = requests.get(f"{base}/api/tags", timeout=10) seen = set() for m in resp.json().get("models", []): full = m["name"] short = full.split(":")[0] size = m.get("size", 0) if short in SKIP or size < 1_000_000 or short in seen: continue if full in cfg.get("disabled_models", []): continue seen.add(short) models.append({"name": full, "size": f"{size/(1024**3):.1f} GB", "provider": "ollama", "provider_label": "Local", "display_name": short}) except Exception: pass # Cloud models for cm in cfg.get("cloud_models", []): if not cm.get("enabled", True): continue prov = cm["id"].split("::")[0] if "::" in cm["id"] else "cloud" if not cfg["providers"].get(prov, {}).get("enabled", False): continue models.append({"name": cm["id"], "size": cm.get("context", "cloud"), "provider": prov, "provider_label": prov.title(), "display_name": cm.get("display_name", cm["id"].split("::")[-1])}) return jsonify({"models": models}) # ─── ADMIN ROUTES ───────────────────────────────────────────── @app.route("/admin") @admin_required def admin_page(): return render_template_string(ADMIN_HTML) @app.route("/api/admin/config", methods=["GET"]) @admin_required def admin_get_config(): cfg = load_config() safe = json.loads(json.dumps(cfg)) for name, p in safe["providers"].items(): if p.get("api_key"): p["api_key_set"] = True p["api_key"] = "" else: p["api_key_set"] = bool(get_api_key(name)) return jsonify(safe) @app.route("/api/admin/config", methods=["POST"]) @admin_required def admin_save_config(): data = request.json cfg = load_config() # update providers (preserve existing keys if not sent) for name, prov in data.get("providers", {}).items(): if name in cfg["providers"]: new_key = prov.get("api_key", "") if not new_key: prov["api_key"] = cfg["providers"][name].get("api_key", "") cfg["providers"][name].update(prov) if "disabled_models" in data: cfg["disabled_models"] = data["disabled_models"] if "cloud_models" in data: 
cfg["cloud_models"] = data["cloud_models"] if "timeouts" in data: cfg["timeouts"] = data["timeouts"] save_config(cfg) return jsonify({"ok": True}) @app.route("/api/admin/test-provider", methods=["POST"]) @admin_required def admin_test_provider(): data = request.json name = data.get("provider", "") cfg = load_config() prov = cfg["providers"].get(name, {}) try: if name == "ollama": r = requests.get(f"{prov.get('base_url', 'http://localhost:11434')}/api/tags", timeout=5) count = len(r.json().get("models", [])) return jsonify({"ok": True, "message": f"Connected. {count} models available."}) elif name == "openrouter": key = data.get("api_key") or get_api_key("openrouter") r = requests.get(f"{prov.get('base_url', 'https://openrouter.ai/api/v1')}/models", headers={"Authorization": f"Bearer {key}"}, timeout=10) count = len(r.json().get("data", [])) return jsonify({"ok": True, "message": f"Connected. {count} models available."}) elif name == "openai": key = data.get("api_key") or get_api_key("openai") r = requests.get(f"{prov.get('base_url', 'https://api.openai.com/v1')}/models", headers={"Authorization": f"Bearer {key}"}, timeout=10) return jsonify({"ok": True, "message": f"Connected. 
{len(r.json().get('data', []))} models."}) elif name == "anthropic": key = data.get("api_key") or get_api_key("anthropic") r = requests.post(f"{prov.get('base_url', 'https://api.anthropic.com/v1')}/messages", headers={"x-api-key": key, "anthropic-version": "2023-06-01", "Content-Type": "application/json"}, json={"model": "claude-haiku-4-5-20251001", "max_tokens": 1, "messages": [{"role": "user", "content": "hi"}]}, timeout=10) return jsonify({"ok": True, "message": "Connected to Anthropic."}) return jsonify({"ok": False, "message": "Unknown provider"}) except Exception as e: return jsonify({"ok": False, "message": str(e)}) _or_models_cache = {"data": None, "ts": 0} @app.route("/api/admin/openrouter/models") @admin_required def admin_openrouter_models(): import time now = time.time() if _or_models_cache["data"] and now - _or_models_cache["ts"] < 300: return jsonify({"models": _or_models_cache["data"]}) key = get_api_key("openrouter") headers = {"Authorization": f"Bearer {key}"} if key else {} try: r = requests.get("https://openrouter.ai/api/v1/models", headers=headers, timeout=15) r.raise_for_status() free = [] for m in r.json().get("data", []): pricing = m.get("pricing", {}) if pricing.get("prompt") == "0" and pricing.get("completion") == "0": free.append({"id": m["id"], "name": m.get("name", m["id"]), "context_length": m.get("context_length", 0)}) _or_models_cache["data"] = free _or_models_cache["ts"] = now return jsonify({"models": free}) except Exception as e: return jsonify({"models": [], "error": str(e)}) @app.route("/api/admin/ollama-models") @admin_required def admin_ollama_models(): cfg = load_config() base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434") SKIP = {"nomic-embed-text", "mxbai-embed-large", "all-minilm", "snowflake-arctic-embed"} try: resp = requests.get(f"{base}/api/tags", timeout=10) models = [] seen = set() for m in resp.json().get("models", []): full = m["name"] short = full.split(":")[0] size = m.get("size", 0) if 
short in SKIP or size < 1_000_000 or short in seen: continue seen.add(short) models.append({"name": full, "size": f"{size/(1024**3):.1f} GB", "disabled": full in cfg.get("disabled_models", [])}) return jsonify({"models": models}) except Exception as e: return jsonify({"models": [], "error": str(e)}) # ─── SECURITY DASHBOARD ─────────────────────────────────────── @app.route("/api/admin/security") @admin_required def admin_security_data(): """Aggregate security log into IP-level threat intelligence with full fingerprints.""" import subprocess, collections ips = collections.defaultdict(lambda: { "hits": 0, "exploit_scans": 0, "login_fails": 0, "rate_limits": 0, "first_seen": "", "last_seen": "", "paths": set(), "threat": "low", "uas": set(), "methods": collections.Counter(), "log_lines": [], "event_types": collections.Counter(), "ai_verdicts": [] }) try: with open("/var/log/llm-team-security.log") as f: for line in f: line = line.strip() if not line: continue parts = line.split(" ", 2) if len(parts) < 3: continue ts = parts[0] + " " + parts[1].split(",")[0] rest = parts[2] ip_match = None for token in rest.split(): if token.startswith("ip="): ip_match = token[3:] break if not ip_match: # Check AI_BAN lines if "AI_BAN" in rest or "AI_VERDICT" in rest: for token in rest.split(): if token.startswith("ip="): ip_match = token[3:] break if not ip_match: continue entry = ips[ip_match] entry["hits"] += 1 if not entry["first_seen"]: entry["first_seen"] = ts entry["last_seen"] = ts # Categorize event if "EXPLOIT_SCAN" in rest: entry["exploit_scans"] += 1 entry["event_types"]["exploit_scan"] += 1 elif "LOGIN_FAILED" in rest: entry["login_fails"] += 1 entry["event_types"]["login_fail"] += 1 elif "RATE_LIMITED" in rest: entry["rate_limits"] += 1 entry["event_types"]["rate_limit"] += 1 elif "AI_BAN" in rest: entry["event_types"]["ai_ban"] += 1 elif "MANUAL_BAN" in rest: entry["event_types"]["manual_ban"] += 1 elif "404_HIT" in rest: entry["event_types"]["404"] += 1 # Extract 
fields for token in rest.split(): if token.startswith("path="): entry["paths"].add(token[5:]) elif token.startswith("method="): entry["methods"][token[7:]] += 1 if "ua=" in rest: ua = rest.split("ua=", 1)[1][:80] entry["uas"].add(ua) # Keep last 15 raw log lines per IP entry["log_lines"].append(line) if len(entry["log_lines"]) > 15: entry["log_lines"].pop(0) except Exception: pass # Attach AI sentinel verdicts for v in _sentinel_results: ip = v.get("ip", "") if ip in ips: ips[ip]["ai_verdicts"].append(v) # Calculate threat level + fingerprint for ip, d in ips.items(): if d["exploit_scans"] >= 3: d["threat"] = "critical" elif d["exploit_scans"] >= 1: d["threat"] = "high" elif d["login_fails"] >= 3: d["threat"] = "high" elif d["hits"] >= 10: d["threat"] = "medium" # Fingerprint: multiple UAs = rotating scanner if len(d["uas"]) >= 3: d["threat"] = max(d["threat"], "high", key=["low","medium","high","critical"].index) d["paths"] = sorted(d["paths"])[:15] d["uas"] = sorted(d["uas"])[:5] d["methods"] = dict(d["methods"]) d["event_types"] = dict(d["event_types"]) # Get fail2ban status banned = set() ban_jails = {} for jail in ["llm-team-exploit", "llm-team-login", "nginx-botsearch", "nginx-bad-request", "nginx-forbidden"]: try: result = subprocess.run(["fail2ban-client", "status", jail], capture_output=True, text=True, timeout=5) for line in result.stdout.split("\n"): if "Banned IP list" in line: for ip in line.split(":", 1)[1].strip().split(): ip = ip.strip() if ip: banned.add(ip) ban_jails.setdefault(ip, []).append(jail) except Exception: pass # Build sorted result sort_by = request.args.get("sort", "hits") result = [] for ip, d in ips.items(): if ip.startswith("192.168."): continue result.append({ "ip": ip, "hits": d["hits"], "exploit_scans": d["exploit_scans"], "login_fails": d["login_fails"], "rate_limits": d["rate_limits"], "first_seen": d["first_seen"], "last_seen": d["last_seen"], "paths": d["paths"], "uas": d["uas"], "methods": d["methods"], "event_types": 
d["event_types"], "threat": d["threat"], "banned": ip in banned, "ban_jails": ban_jails.get(ip, []), "ua_count": len(d["uas"]), "log_lines": d["log_lines"], "ai_verdicts": d["ai_verdicts"] }) # Sort threat_order = {"critical": 4, "high": 3, "medium": 2, "low": 1} if sort_by == "threat": result.sort(key=lambda x: (threat_order.get(x["threat"], 0), x["hits"]), reverse=True) elif sort_by == "recent": result.sort(key=lambda x: x["last_seen"], reverse=True) elif sort_by == "banned": result.sort(key=lambda x: (x["banned"], x["hits"]), reverse=True) else: result.sort(key=lambda x: x["hits"], reverse=True) return jsonify({"ips": result[:100], "total_banned": len(banned), "banned_list": sorted(banned)}) @app.route("/api/admin/security/ban", methods=["POST"]) @admin_required def admin_ban_ip(): """Manually ban/unban an IP via fail2ban.""" import subprocess data = request.json or {} ip = data.get("ip", "").strip() action = data.get("action", "ban") if not ip: return jsonify({"error": "IP required"}), 400 if ip.startswith("192.168."): return jsonify({"error": "Cannot ban LAN addresses"}), 400 try: if action == "ban": subprocess.run(["fail2ban-client", "set", "llm-team-exploit", "banip", ip], capture_output=True, text=True, timeout=5) sec_log.warning("MANUAL_BAN ip=%s by=%s", ip, session.get("username", "admin")) return jsonify({"ok": True, "message": f"Banned {ip}"}) elif action == "unban": for jail in ["llm-team-exploit", "llm-team-login", "nginx-botsearch", "nginx-bad-request", "nginx-forbidden"]: subprocess.run(["fail2ban-client", "set", jail, "unbanip", ip], capture_output=True, text=True, timeout=5) sec_log.warning("MANUAL_UNBAN ip=%s by=%s", ip, session.get("username", "admin")) return jsonify({"ok": True, "message": f"Unbanned {ip}"}) except Exception as e: return jsonify({"error": str(e)}), 500 return jsonify({"error": "Invalid action"}), 400 @app.route("/api/admin/security/enrich", methods=["POST"]) @admin_required def admin_enrich_ip(): """Enrich an IP with 
geolocation, ISP, proxy detection, and AI analysis.""" data = request.json or {} ip = data.get("ip", "").strip() if not ip: return jsonify({"error": "IP required"}), 400 result = {"ip": ip, "geo": None, "ai_analysis": None, "error": None} # Step 1: Geolocation + ISP via ip-api.com try: geo_resp = requests.get( f"http://ip-api.com/json/{ip}?fields=status,country,countryCode,regionName,city,isp,org,as,mobile,proxy,hosting,lat,lon,timezone", timeout=5 ) geo = geo_resp.json() if geo.get("status") == "success": result["geo"] = geo else: result["geo"] = {"error": "lookup failed"} except Exception as e: result["geo"] = {"error": str(e)} # Step 2: Gather all log data for this IP ip_logs = [] try: with open("/var/log/llm-team-security.log") as f: for line in f: if f"ip={ip}" in line: ip_logs.append(line.strip()) except Exception: pass # Step 3: Web-Check deep scan (ports, DNS, blocklists, traceroute) WEB_CHECK_BASE = "http://localhost:3000/api" webcheck = {} for endpoint in ["ports", "dns", "block-lists", "trace-route", "headers", "status"]: try: wc_resp = requests.get(f"{WEB_CHECK_BASE}/{endpoint}?url={ip}", timeout=20) if wc_resp.status_code == 200: data = wc_resp.json() if not isinstance(data, dict) or not data.get("error"): webcheck[endpoint.replace("-", "_")] = data except Exception: pass result["webcheck"] = webcheck # Step 4: AI threat analysis with full context (including web-check data) try: geo_ctx = "" if result["geo"] and not result["geo"].get("error"): g = result["geo"] geo_ctx = f"Geolocation: {g.get('city','?')}, {g.get('regionName','?')}, {g.get('country','?')}\n" geo_ctx += f"ISP: {g.get('isp','?')} | Org: {g.get('org','?')} | AS: {g.get('as','?')}\n" geo_ctx += f"Proxy: {g.get('proxy',False)} | Hosting: {g.get('hosting',False)} | Mobile: {g.get('mobile',False)}\n" # Add web-check data if available wc_ctx = "" if webcheck.get("ports"): open_ports = webcheck["ports"].get("openPorts", []) if open_ports: wc_ctx += f"Open ports: {', '.join(str(p) for p in 
open_ports)}\n" if webcheck.get("block_lists"): blocked = [b["server"] for b in webcheck["block_lists"].get("blocklists", []) if b.get("isBlocked")] if blocked: wc_ctx += f"Blocked on {len(blocked)} DNS blocklists: {', '.join(blocked[:5])}\n" if webcheck.get("trace_route") and webcheck["trace_route"].get("result"): hops = [list(h.keys())[0] for h in webcheck["trace_route"]["result"] if isinstance(h, dict)] if hops: wc_ctx += f"Traceroute ({len(hops)} hops): {' → '.join(hops[:8])}\n" log_ctx = "\n".join(ip_logs[-20:]) if ip_logs else "No log entries found." prompt = ( f"You are an aggressive cybersecurity analyst protecting a production web application. " f"Provide a detailed threat assessment for IP {ip}. " f"This is a PRIVATE application — there is NO legitimate reason for unknown IPs to scan it.\n\n" f"{geo_ctx}{wc_ctx}\n" f"Activity log ({len(ip_logs)} total entries, showing last 20):\n{log_ctx}\n\n" "THREAT LEVEL RULES (follow strictly):\n" "- critical: ANY exploit scan (.env, .git, wp-admin, etc.) 
OR blocked on multiple DNS blocklists OR multiple user agents\n" "- high: probing non-existent paths repeatedly OR hosting/proxy IP OR port scan shows only SSH\n" "- medium: a few 404s on common paths from non-proxy IP\n" "- low: single benign request (robots.txt, favicon)\n" "- An IP blocked on 10+ DNS blocklists is ALWAYS critical regardless of log activity\n" "- An IP with only port 22 open and no web service is suspicious infrastructure\n\n" "Provide your analysis as JSON:\n" '{"threat_level": "none|low|medium|high|critical",\n' ' "classification": "scanner|bruteforce|bot|researcher|targeted_attack|compromised_host|legitimate",\n' ' "confidence": 0.0-1.0,\n' ' "summary": "2-3 sentence threat assessment",\n' ' "indicators": ["list of specific indicators found"],\n' ' "recommendation": "specific recommended action — ban permanently, ban 24h, monitor, or ignore",\n' ' "likely_automated": true/false,\n' ' "pattern": "description of attack pattern if any"}\n' ) cfg = load_config() base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434") ai_resp = requests.post(f"{base}/api/generate", json={ "model": SENTINEL_MODEL, "prompt": prompt, "stream": False, "options": {"num_ctx": 4096, "temperature": 0.1} }, timeout=60) ai_resp.raise_for_status() ai_text = ai_resp.json()["response"] # Parse JSON from AI response text = ai_text.strip() if "```" in text: text = text.split("```")[1] if text.startswith("json"): text = text[4:] start_idx = text.find("{") end_idx = text.rfind("}") + 1 if start_idx >= 0 and end_idx > start_idx: result["ai_analysis"] = json.loads(text[start_idx:end_idx]) else: result["ai_analysis"] = {"raw": ai_text[:500]} except Exception as e: result["ai_analysis"] = {"error": str(e)} result["log_count"] = len(ip_logs) # Step 5: Save to Wall of Shame database try: geo = result.get("geo") or {} ai = result.get("ai_analysis") or {} wc = result.get("webcheck") or {} open_ports = json.dumps(wc.get("ports", {}).get("openPorts", [])) bl = 
wc.get("block_lists", {}).get("blocklists", []) blocked = [b["server"] for b in bl if b.get("isBlocked")] tr_hops = [] if wc.get("trace_route") and wc["trace_route"].get("result"): for h in wc["trace_route"]["result"]: if isinstance(h, dict): hop_ip = list(h.keys())[0] tr_hops.append({"ip": hop_ip, "latency": h[hop_ip][0] if h[hop_ip] else None}) with get_db() as conn: with conn.cursor() as cur: cur.execute(""" INSERT INTO threat_intel (ip, threat_level, classification, confidence, summary, indicators, recommendation, pattern, attack_type, likely_automated, country, country_code, city, isp, org, asn, is_proxy, is_hosting, open_ports, blocklist_count, blocklist_total, blocklists_blocked, reverse_dns, traceroute, log_count, banned, raw_data, enriched_at, updated_at) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,NOW(),NOW()) ON CONFLICT (ip) DO UPDATE SET threat_level=EXCLUDED.threat_level, classification=EXCLUDED.classification, confidence=EXCLUDED.confidence, summary=EXCLUDED.summary, indicators=EXCLUDED.indicators, recommendation=EXCLUDED.recommendation, pattern=EXCLUDED.pattern, attack_type=EXCLUDED.attack_type, likely_automated=EXCLUDED.likely_automated, country=EXCLUDED.country, country_code=EXCLUDED.country_code, city=EXCLUDED.city, isp=EXCLUDED.isp, org=EXCLUDED.org, asn=EXCLUDED.asn, is_proxy=EXCLUDED.is_proxy, is_hosting=EXCLUDED.is_hosting, open_ports=EXCLUDED.open_ports, blocklist_count=EXCLUDED.blocklist_count, blocklist_total=EXCLUDED.blocklist_total, blocklists_blocked=EXCLUDED.blocklists_blocked, reverse_dns=EXCLUDED.reverse_dns, traceroute=EXCLUDED.traceroute, log_count=EXCLUDED.log_count, banned=EXCLUDED.banned, raw_data=EXCLUDED.raw_data, updated_at=NOW() """, ( ip, ai.get("threat_level", "unknown"), ai.get("classification"), ai.get("confidence", 0), ai.get("summary"), json.dumps(ai.get("indicators", [])), ai.get("recommendation"), ai.get("pattern"), ai.get("attack_type"), ai.get("likely_automated", False), 
geo.get("country"), geo.get("countryCode"), geo.get("city"), geo.get("isp"), geo.get("org"), geo.get("as"), geo.get("proxy", False), geo.get("hosting", False), open_ports, len(blocked), len(bl), json.dumps(blocked), "", json.dumps(tr_hops), len(ip_logs), ip in _get_banned_ips(), json.dumps(result) )) conn.commit() result["saved"] = True except Exception as e: result["saved"] = False result["save_error"] = str(e) return jsonify(result) def _get_banned_ips(): """Quick check of all banned IPs.""" import subprocess banned = set() for jail in ["llm-team-exploit", "llm-team-login"]: try: r = subprocess.run(["fail2ban-client", "status", jail], capture_output=True, text=True, timeout=5) for line in r.stdout.split("\n"): if "Banned IP list" in line: for ip in line.split(":", 1)[1].strip().split(): banned.add(ip.strip()) except Exception: pass return banned @app.route("/api/admin/wall-of-shame") @admin_required def admin_wall_of_shame(): """Return all enriched threat intel from the database.""" sort = request.args.get("sort", "enriched_at") order = request.args.get("order", "desc") threat_filter = request.args.get("threat", "") allowed_sorts = {"enriched_at", "threat_level", "confidence", "blocklist_count", "log_count", "ip"} if sort not in allowed_sorts: sort = "enriched_at" order_sql = "DESC" if order == "desc" else "ASC" try: with get_db() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: if threat_filter: cur.execute(f"SELECT * FROM threat_intel WHERE threat_level = %s ORDER BY {sort} {order_sql} LIMIT 200", (threat_filter,)) else: cur.execute(f"SELECT * FROM threat_intel ORDER BY {sort} {order_sql} LIMIT 200") rows = cur.fetchall() for r in rows: r["enriched_at"] = r["enriched_at"].isoformat() if r["enriched_at"] else None r["updated_at"] = r["updated_at"].isoformat() if r["updated_at"] else None return jsonify({"entries": rows, "total": len(rows)}) except Exception as e: return jsonify({"entries": [], "error": str(e)}) 
@app.route("/api/admin/security/mass-ban", methods=["POST"]) @admin_required def admin_mass_ban(): """Ban or unban multiple IPs at once.""" import subprocess data = request.json or {} ip_list = data.get("ips", []) action = data.get("action", "ban") if not ip_list: return jsonify({"error": "No IPs provided"}), 400 results = {"success": 0, "failed": 0, "skipped": 0} for ip in ip_list: ip = ip.strip() if not ip or ip.startswith("192.168."): results["skipped"] += 1 continue try: if action == "ban": subprocess.run(["fail2ban-client", "set", "llm-team-exploit", "banip", ip], capture_output=True, text=True, timeout=5) sec_log.warning("MASS_BAN ip=%s by=%s", ip, session.get("username", "admin")) elif action == "unban": for jail in ["llm-team-exploit", "llm-team-login", "nginx-botsearch", "nginx-bad-request", "nginx-forbidden"]: subprocess.run(["fail2ban-client", "set", jail, "unbanip", ip], capture_output=True, text=True, timeout=5) sec_log.warning("MASS_UNBAN ip=%s by=%s", ip, session.get("username", "admin")) results["success"] += 1 except Exception: results["failed"] += 1 return jsonify({"ok": True, "results": results}) # ─── ADMIN MONITOR ───────────────────────────────────────────── @app.route("/admin/monitor") @admin_required def monitor_page(): return MONITOR_HTML @app.route("/api/admin/monitor") @admin_required def monitor_data(): active = [] for rid, r in _active_runs.items(): active.append({ "run_id": rid, "mode": r["mode"], "user": r["user"], "prompt": r["prompt"], "elapsed": round(time.time() - r["started"], 1), "step": r["step"], "total_steps": r["total_steps"], "substep": r["substep"], "events": r["events"], "errors": len(r["errors"]), "responses_size": r["responses_size"], "error_details": r["errors"][-3:] # last 3 errors }) recent = list(reversed(_run_log[-20:])) return jsonify({"active": active, "recent": recent, "timestamp": time.time()}) MONITOR_HTML = r""" LLM Team — Monitor

Monitor // Process View

0
Active Runs
0
Completed
0
Errors
Avg Duration
Active Runs
No active runs
Recent Runs
No recent runs
History (from DB)
Loading...
""" # ─── HISTORY ROUTES ──────────────────────────────────────────── @app.route("/api/runs") @login_required def get_runs(): show = request.args.get("show", "active") # active, archived, all try: with get_db() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: if show == "archived": cur.execute("SELECT id, mode, prompt, models_used, created_at, archived FROM team_runs WHERE archived = true ORDER BY created_at DESC LIMIT 200") elif show == "all": cur.execute("SELECT id, mode, prompt, models_used, created_at, archived FROM team_runs ORDER BY created_at DESC LIMIT 200") else: cur.execute("SELECT id, mode, prompt, models_used, created_at, archived FROM team_runs WHERE archived = false ORDER BY created_at DESC LIMIT 50") runs = cur.fetchall() for r in runs: r["created_at"] = r["created_at"].isoformat() return jsonify({"runs": runs}) except Exception as e: return jsonify({"runs": [], "error": str(e)}) @app.route("/api/runs/") @login_required def get_run(run_id): try: with get_db() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SELECT * FROM team_runs WHERE id = %s", (run_id,)) run = cur.fetchone() if not run: return jsonify({"error": "not found"}), 404 run["created_at"] = run["created_at"].isoformat() return jsonify(run) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/runs/", methods=["DELETE"]) @login_required def delete_run(run_id): try: with get_db() as conn: with conn.cursor() as cur: cur.execute("DELETE FROM team_runs WHERE id = %s", (run_id,)) conn.commit() return jsonify({"ok": True}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/runs//archive", methods=["POST"]) @login_required def archive_run(run_id): try: with get_db() as conn: with conn.cursor() as cur: cur.execute("UPDATE team_runs SET archived = true WHERE id = %s", (run_id,)) conn.commit() return jsonify({"ok": True}) except Exception as e: return jsonify({"error": 
str(e)}), 500 @app.route("/api/runs//restore", methods=["POST"]) @login_required def restore_run(run_id): try: with get_db() as conn: with conn.cursor() as cur: cur.execute("UPDATE team_runs SET archived = false WHERE id = %s", (run_id,)) conn.commit() return jsonify({"ok": True}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/runs/bulk-archive", methods=["POST"]) @admin_required def bulk_archive_runs(): data = request.json or {} action = data.get("action", "archive") # archive or restore ids = data.get("ids", []) before = data.get("before") # archive all before this date try: with get_db() as conn: with conn.cursor() as cur: archived_val = action == "archive" if ids: cur.execute("UPDATE team_runs SET archived = %s WHERE id = ANY(%s)", (archived_val, ids)) count = cur.rowcount elif before: cur.execute("UPDATE team_runs SET archived = %s WHERE created_at < %s AND archived = %s", (archived_val, before, not archived_val)) count = cur.rowcount else: # Archive all cur.execute("UPDATE team_runs SET archived = true WHERE archived = false") count = cur.rowcount conn.commit() return jsonify({"ok": True, "count": count}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/runs//tags", methods=["POST"]) @login_required def update_run_tags(run_id): data = request.json or {} tags = data.get("tags", []) notes = data.get("notes") try: with get_db() as conn: with conn.cursor() as cur: if notes is not None: cur.execute("UPDATE team_runs SET tags = %s, notes = %s WHERE id = %s", (tags, notes, run_id)) else: cur.execute("UPDATE team_runs SET tags = %s WHERE id = %s", (tags, run_id)) conn.commit() return jsonify({"ok": True}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/runs/tags") @login_required def get_all_tags(): """Get all unique tags in use.""" try: with get_db() as conn: with conn.cursor() as cur: cur.execute("SELECT DISTINCT unnest(tags) as tag FROM team_runs ORDER BY tag") tags = 
[r[0] for r in cur.fetchall()] return jsonify({"tags": tags}) except Exception as e: return jsonify({"tags": [], "error": str(e)}) @app.route("/api/runs/vectors") @login_required def get_run_vectors(): """Return runs as structured text documents for AI/embedding consumption.""" limit = min(int(request.args.get("limit", 50)), 500) mode_filter = request.args.get("mode", "") tag_filter = request.args.get("tag", "") try: with get_db() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: conditions = ["archived = false"] params = [] if mode_filter: conditions.append("mode = %s") params.append(mode_filter) if tag_filter: conditions.append("%s = ANY(tags)") params.append(tag_filter) where = " AND ".join(conditions) params.append(limit) cur.execute(f"SELECT * FROM team_runs WHERE {where} ORDER BY created_at DESC LIMIT %s", params) runs = cur.fetchall() vectors = [] for run in runs: responses = run.get("responses") or [] # Build structured document doc_parts = [ f"MODE: {run['mode']}", f"PROMPT: {run['prompt']}", f"MODELS: {', '.join(run.get('models_used') or [])}", f"DATE: {run['created_at'].isoformat()}", f"TAGS: {', '.join(run.get('tags') or [])}", ] if run.get("notes"): doc_parts.append(f"NOTES: {run['notes']}") for i, resp in enumerate(responses): doc_parts.append(f"\n--- RESPONSE {i+1} [{resp.get('role','?')}] by {resp.get('model','?')} ---") doc_parts.append(resp.get("text", "")[:2000]) document = "\n".join(doc_parts) vectors.append({ "id": run["id"], "mode": run["mode"], "prompt": run["prompt"], "tags": run.get("tags") or [], "document": document, "char_count": len(document), "token_estimate": len(document) // 4 }) return jsonify({"vectors": vectors, "total": len(vectors)}) except Exception as e: return jsonify({"vectors": [], "error": str(e)}) @app.route("/history") @login_required def history_page(): return HISTORY_HTML HISTORY_HTML = r""" LLM Team — History

History // Run Archive

IDModePromptModelsTagsDate
Loading...
""" @app.route("/api/self-analyze", methods=["POST"]) @admin_required def self_analyze(): """Run AI analysis on the system's own data. Generates reports from logs, runs, and security data.""" data = request.json or {} report_type = data.get("type", "threat_intel") model = data.get("model", "qwen2.5:latest") # Gather data based on report type context = "" if report_type == "threat_intel": try: with open("/var/log/llm-team-security.log") as f: lines = [l.strip() for l in f.readlines() if "192.168" not in l] context = f"SECURITY LOG ({len(lines)} external entries, last 80):\n" + "\n".join(lines[-80:]) except Exception: context = "No security log data available." prompt = ( "You are a threat intelligence analyst. Analyze these server logs from a PRIVATE web application and produce a concise STRATEGIC THREAT REPORT.\n\n" f"{context}\n\n" "Sections: 1) EXECUTIVE SUMMARY 2) ATTACK TAXONOMY with counts 3) ATTACKER PROFILING 4) PREDICTIVE ANALYSIS 5) TOP 5 ACTIONABLE RECOMMENDATIONS" ) elif report_type == "model_performance": try: with get_db() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SELECT mode, models_used, jsonb_array_length(responses) as resp_count, LENGTH(responses::text) as resp_size, LENGTH(prompt) as prompt_len FROM team_runs WHERE archived=false ORDER BY created_at DESC LIMIT 50") runs = cur.fetchall() context = json.dumps([dict(r) for r in runs], default=str)[:6000] except Exception: context = "No run data available." prompt = ( f"Analyze this dataset of AI orchestration runs.\n\nDATA:\n{context}\n\n" "Produce a MODEL PERFORMANCE REPORT: 1) USAGE PATTERNS 2) MODEL WORKLOAD 3) RESPONSE EFFICIENCY 4) OPTIMIZATION OPPORTUNITIES 5) RECOMMENDED EXPERIMENTS" ) elif report_type == "access_patterns": try: with open("/var/log/nginx/access.log") as f: lines = f.readlines()[-200:] context = "NGINX ACCESS LOG (last 200 entries):\n" + "".join(lines) except Exception: context = "No access log data." 
prompt = ( f"Analyze these web server access logs for a private AI orchestration platform.\n\n{context}\n\n" "Produce a USAGE ANALYTICS REPORT: 1) TRAFFIC PATTERNS (peak times, frequency) 2) FEATURE USAGE (which pages/APIs are used most) " "3) USER JOURNEY (typical workflow sequence) 4) PERFORMANCE INSIGHTS 5) UX RECOMMENDATIONS" ) elif report_type == "security_posture": # Combine multiple data sources sec_lines = "" try: with open("/var/log/llm-team-security.log") as f: sec_lines = "\n".join([l.strip() for l in f.readlines() if "192.168" not in l][-40:]) except Exception: pass sentinel_lines = "" try: with open("/var/log/llm-team-sentinel.log") as f: sentinel_lines = "\n".join(f.readlines()[-20:]) except Exception: pass threat_count = 0 try: with get_db() as conn: with conn.cursor() as cur: cur.execute("SELECT COUNT(*) FROM threat_intel") threat_count = cur.fetchone()[0] except Exception: pass import subprocess banned = "" try: r = subprocess.run(["fail2ban-client", "status", "llm-team-exploit"], capture_output=True, text=True, timeout=5) banned = r.stdout except Exception: pass context = f"SECURITY LOG (external, last 40):\n{sec_lines}\n\nSENTINEL LOG:\n{sentinel_lines}\n\nTHREAT INTEL DB: {threat_count} profiled IPs\n\nFAIL2BAN STATUS:\n{banned}" prompt = ( f"You are auditing the security posture of a private web application.\n\n{context}\n\n" "Produce a SECURITY POSTURE ASSESSMENT: 1) OVERALL RISK RATING (1-10) 2) DEFENSE EFFECTIVENESS (what's working) " "3) GAPS AND WEAKNESSES 4) INCIDENT TIMELINE (recent events) 5) PRIORITY HARDENING STEPS" ) else: return jsonify({"error": f"Unknown report type: {report_type}"}), 400 # Run analysis try: cfg = load_config() base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434") resp = requests.post(f"{base}/api/generate", json={ "model": model, "prompt": prompt, "stream": False, "options": {"num_ctx": 8192, "temperature": 0.2} }, timeout=120) resp.raise_for_status() report = resp.json()["response"] except 
Exception as e: return jsonify({"error": str(e)}), 500 return jsonify({"report": report, "type": report_type, "model": model, "data_size": len(context)}) @app.route("/api/pipelines") @login_required def get_pipelines(): try: with get_db() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SELECT id, pipeline, topic, status, models_used, duration_ms, created_at FROM pipeline_runs ORDER BY created_at DESC LIMIT 50") runs = cur.fetchall() for r in runs: r["created_at"] = r["created_at"].isoformat() if r["created_at"] else None return jsonify({"pipelines": runs}) except Exception as e: return jsonify({"pipelines": [], "error": str(e)}) @app.route("/api/pipelines/") @login_required def get_pipeline(pid): try: with get_db() as conn: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute("SELECT * FROM pipeline_runs WHERE id = %s", (pid,)) run = cur.fetchone() if not run: return jsonify({"error": "not found"}), 404 run["created_at"] = run["created_at"].isoformat() if run["created_at"] else None run["completed_at"] = run["completed_at"].isoformat() if run["completed_at"] else None return jsonify(run) except Exception as e: return jsonify({"error": str(e)}), 500 # ─── LAB: RATCHET ENGINE ────────────────────────────────────── _lab_threads = {} # experiment_id -> thread _lab_streams = {} # experiment_id -> [queue, ...] 
def _lab_emit(exp_id, data):
    """Broadcast an event dict to every SSE subscriber queue of this experiment."""
    for q in _lab_streams.get(exp_id, []):
        q.append(data)


def _score_response(response, expected, metric, judge_model=None):
    """Score a model response on a 0-10 scale under the chosen metric.

    accuracy: exact-substring match of `expected` = 10, any shared word = 5,
    otherwise 1. speed: always 10 (duration is scored externally).
    quality: ask `judge_model` for a 1-10 rating. Defaults to 5.0 on any
    failure or unknown metric.
    """
    if metric == "accuracy":
        if not expected:
            return 5.0
        resp_lower = response.lower().strip()
        exp_lower = expected.lower().strip()
        if exp_lower in resp_lower:
            return 10.0
        # NOTE(review): any shared word (including stopwords) earns 5.0 —
        # this inflates accuracy for verbose answers; confirm intended.
        if any(w in resp_lower for w in exp_lower.split()):
            return 5.0
        return 1.0
    elif metric == "speed":
        return 10.0  # speed scored externally by duration
    elif metric == "quality" and judge_model:
        try:
            judgment = query_model(judge_model, f"Rate this response 1-10 for quality, relevance, and completeness.\n\n"
                                   f"EXPECTED: {expected or 'No expected output specified'}\n\n"
                                   f"RESPONSE: {response[:1500]}\n\n"
                                   f"Return ONLY a number 1-10, nothing else.")
            import re
            # First integer found in the judge's reply; capped at 10.
            m = re.search(r'\b(\d+)\b', judgment)
            return min(float(m.group(1)), 10.0) if m else 5.0
        except Exception:
            return 5.0
    return 5.0


def _ratchet_loop(exp_id):
    """Background optimization loop for one lab experiment.

    Each trial: a meta-model proposes ONE config change, the proposal is
    evaluated on all eval cases, and the change is kept only if the average
    score beats the best so far ("ratchet"). Runs until the experiment's
    status is no longer 'running'.
    """
    try:
        with get_db() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute("SELECT * FROM lab_experiments WHERE id = %s", (exp_id,))
                exp = cur.fetchone()
        if not exp:
            return
        eval_cases = exp["eval_cases"] or []
        models_pool = exp["models_pool"] or []
        metric = exp["metric"] or "quality"
        objective = exp["objective"] or "Improve response quality"
        mutable = exp["mutable_config"] or {
            "system_prompt": "You are a helpful assistant.",
            "temperature": 0.7,
            "model": models_pool[0] if models_pool else "llama3.2:latest",
        }
        # json round-trip = cheap deep copy so best/mutable never alias.
        best_config = exp["best_config"] or json.loads(json.dumps(mutable))
        best_score = exp["best_score"] or 0
        trial_num = exp["total_trials"] or 0
        # Pick meta-model (largest in pool); smallest model acts as judge.
        meta_model = models_pool[-1] if models_pool else "qwen2.5:latest"
        judge_model = models_pool[0] if models_pool else "llama3.2:latest"
        while True:
            # Check if still running (pause/stop is signalled via the DB row).
            with get_db() as conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT status FROM lab_experiments WHERE id = %s", (exp_id,))
                    row = cur.fetchone()
            if not row or row[0] != "running":
                break
            trial_num += 1
            trial_start = time.time()
            _lab_emit(exp_id, {"type": "status", "trial": trial_num, "message": "Proposing change..."})
            # Step 1: Meta-model proposes a change (with recent-trial context).
            history_hint = ""
            if trial_num > 1:
                with get_db() as conn:
                    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                        cur.execute("SELECT config_diff, avg_score, improved FROM lab_trials WHERE experiment_id = %s ORDER BY id DESC LIMIT 5", (exp_id,))
                        recent = cur.fetchall()
                if recent:
                    history_hint = "\n\nRecent trials:\n" + "\n".join(
                        f" {'KEPT' if r['improved'] else 'DISCARDED'} (score {r['avg_score']:.1f}): {r['config_diff']}"
                        for r in recent)
            propose_prompt = (
                f"You are optimizing an LLM pipeline. Objective: {objective}\n"
                f"Metric: {metric} (higher is better, max 10)\n"
                f"Current best score: {best_score:.1f}/10\n\n"
                f"Current config:\n{json.dumps(mutable, indent=2)}\n\n"
                f"Available models: {models_pool}\n"
                f"Eval cases: {len(eval_cases)}\n"
                f"{history_hint}\n\n"
                f"Suggest exactly ONE change to improve the score. Return ONLY valid JSON with the FULL updated config. "
                f"Keys: system_prompt (string), temperature (0.0-1.5), model (string from available models). "
                f"Be creative but focused. Change only one thing at a time."
            )
            try:
                proposal_raw = query_model(meta_model, propose_prompt)
                import re
                # Grab the outermost JSON object from the (possibly chatty) reply.
                json_match = re.search(r'\{[\s\S]*\}', proposal_raw)
                if json_match:
                    proposed = json.loads(json_match.group())
                    # Validate keys — backfill anything the model dropped.
                    if "system_prompt" not in proposed:
                        proposed["system_prompt"] = mutable.get("system_prompt", "")
                    if "temperature" not in proposed:
                        proposed["temperature"] = mutable.get("temperature", 0.7)
                    if "model" not in proposed:
                        proposed["model"] = mutable.get("model", models_pool[0])
                else:
                    proposed = mutable
            except Exception:
                proposed = mutable  # bad proposal -> re-test current config
            # Describe the diff for the trial log / live stream.
            diffs = []
            for k in set(list(mutable.keys()) + list(proposed.keys())):
                old_v = mutable.get(k)
                new_v = proposed.get(k)
                if old_v != new_v:
                    if k == "system_prompt":
                        diffs.append(f"system_prompt changed ({len(str(old_v))} → {len(str(new_v))} chars)")
                    else:
                        diffs.append(f"{k}: {old_v} → {new_v}")
            config_diff = "; ".join(diffs) if diffs else "no change"
            _lab_emit(exp_id, {"type": "status", "trial": trial_num, "message": f"Testing: {config_diff[:80]}"})
            # Step 2: Run eval cases with proposed config.
            trial_scores = []
            model_to_use = proposed.get("model", models_pool[0] if models_pool else "llama3.2:latest")
            sys_prompt = proposed.get("system_prompt", "")
            for ci, case in enumerate(eval_cases):
                inp = case.get("input", "")
                expected = case.get("expected", "")
                full_prompt = f"{sys_prompt}\n\n{inp}" if sys_prompt else inp
                try:
                    resp = query_model(model_to_use, full_prompt)
                    score = _score_response(resp, expected, metric, judge_model if metric == "quality" else None)
                    trial_scores.append({"input": inp[:100], "score": score, "response": resp[:300]})
                except Exception as e:
                    trial_scores.append({"input": inp[:100], "score": 0, "error": str(e)})
            avg_score = sum(s["score"] for s in trial_scores) / max(len(trial_scores), 1)
            duration_ms = int((time.time() - trial_start) * 1000)
            improved = avg_score > best_score
            # Step 3: Ratchet — keep the proposal only if it strictly improved.
            if improved:
                best_score = avg_score
                best_config = json.loads(json.dumps(proposed))
                mutable = json.loads(json.dumps(proposed))
            else:
                mutable = json.loads(json.dumps(best_config))
            # Save trial + updated experiment state in one transaction.
            with get_db() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """INSERT INTO lab_trials (experiment_id, trial_num, config_diff, config_snapshot, scores, avg_score, improved, duration_ms) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                        (exp_id, trial_num, config_diff, json.dumps(proposed), json.dumps(trial_scores), avg_score, improved, duration_ms)
                    )
                    cur.execute(
                        """UPDATE lab_experiments SET total_trials = %s, best_score = %s, best_config = %s, mutable_config = %s, improvements = improvements + %s WHERE id = %s""",
                        (trial_num, best_score, json.dumps(best_config), json.dumps(mutable), 1 if improved else 0, exp_id)
                    )
                conn.commit()
            _lab_emit(exp_id, {
                "type": "trial", "trial": trial_num, "score": round(avg_score, 2),
                "best": round(best_score, 2), "improved": improved,
                "diff": config_diff[:100], "duration_ms": duration_ms
            })
        # Done — flip a still-'running' experiment to 'paused' and notify stream.
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute("UPDATE lab_experiments SET status = 'paused' WHERE id = %s AND status = 'running'", (exp_id,))
            conn.commit()
        _lab_emit(exp_id, {"type": "done"})
    except Exception as e:
        _lab_emit(exp_id, {"type": "error", "message": str(e)})
        try:
            with get_db() as conn:
                with conn.cursor() as cur:
                    cur.execute("UPDATE lab_experiments SET status = 'error' WHERE id = %s", (exp_id,))
                conn.commit()
        except Exception:
            pass  # best effort — the stream already carries the error event
# ─── LAB API ROUTES ───────────────────────────────────────────
# NOTE: the <int:eid> URL converters below were restored — the routes were
# parameterless while every view required `eid`, and several shared the
# identical path string, which Flask rejects / cannot dispatch.
@app.route("/lab")
@admin_required
def lab_page():
    """Serve the Lab (ratchet-engine) single-page UI."""
    return render_template_string(LAB_HTML)


@app.route("/api/lab/experiments", methods=["GET"])
@admin_required
def lab_list():
    """List all experiments with summary stats, newest first."""
    with get_db() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(
                "SELECT id, name, status, metric, best_score, total_trials, improvements, models_pool, created_at "
                "FROM lab_experiments ORDER BY created_at DESC"
            )
            rows = cur.fetchall()
    for r in rows:
        r["created_at"] = r["created_at"].isoformat()
    return jsonify({"experiments": rows})


@app.route("/api/lab/experiments", methods=["POST"])
@admin_required
def lab_create():
    """Create a new experiment; best_config starts equal to mutable_config."""
    d = request.json
    # Single source of truth for the default config (was duplicated inline).
    cfg = d.get("mutable_config", {"system_prompt": "You are a helpful assistant.", "temperature": 0.7, "model": ""})
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """INSERT INTO lab_experiments (name, objective, metric, eval_cases, mutable_config, best_config, models_pool) VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id""",
                (d["name"], d.get("objective", ""), d.get("metric", "quality"),
                 json.dumps(d.get("eval_cases", [])),
                 json.dumps(cfg), json.dumps(cfg),
                 d.get("models_pool", []))
            )
            eid = cur.fetchone()[0]
        conn.commit()
    return jsonify({"id": eid})


@app.route("/api/lab/experiments/<int:eid>", methods=["GET"])
@admin_required
def lab_get(eid):
    """Return one experiment with its full trial history."""
    with get_db() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT * FROM lab_experiments WHERE id = %s", (eid,))
            exp = cur.fetchone()
            if not exp:
                return jsonify({"error": "not found"}), 404
            exp["created_at"] = exp["created_at"].isoformat()
            cur.execute("SELECT * FROM lab_trials WHERE experiment_id = %s ORDER BY trial_num", (eid,))
            exp["trials"] = cur.fetchall()
    for t in exp["trials"]:
        t["created_at"] = t["created_at"].isoformat()
    return jsonify(exp)


@app.route("/api/lab/experiments/<int:eid>", methods=["PUT"])
@admin_required
def lab_update(eid):
    """Partial update of an experiment.

    The f-string SET clause is safe: column names come only from the fixed
    whitelists below, and all values are parameterized.
    """
    d = request.json
    sets, vals = [], []
    for k in ["name", "objective", "metric"]:
        if k in d:
            sets.append(f"{k} = %s")
            vals.append(d[k])
    for k in ["eval_cases", "mutable_config"]:
        if k in d:
            sets.append(f"{k} = %s")
            vals.append(json.dumps(d[k]))
    if "models_pool" in d:
        sets.append("models_pool = %s")
        vals.append(d["models_pool"])
    if not sets:
        return jsonify({"ok": True})
    vals.append(eid)
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute(f"UPDATE lab_experiments SET {', '.join(sets)} WHERE id = %s", vals)
        conn.commit()
    return jsonify({"ok": True})


@app.route("/api/lab/experiments/<int:eid>/start", methods=["POST"])
@admin_required
def lab_start(eid):
    """Mark the experiment running and launch (or reuse) its worker thread."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute("UPDATE lab_experiments SET status = 'running' WHERE id = %s", (eid,))
        conn.commit()
    if eid in _lab_threads and _lab_threads[eid].is_alive():
        return jsonify({"ok": True, "message": "Already running"})
    t = threading.Thread(target=_ratchet_loop, args=(eid,), daemon=True)
    _lab_threads[eid] = t
    t.start()
    return jsonify({"ok": True})


@app.route("/api/lab/experiments/<int:eid>/pause", methods=["POST"])
@admin_required
def lab_pause(eid):
    """Signal the worker loop to stop after its current trial."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute("UPDATE lab_experiments SET status = 'paused' WHERE id = %s", (eid,))
        conn.commit()
    return jsonify({"ok": True})


@app.route("/api/lab/experiments/<int:eid>/reset", methods=["POST"])
@admin_required
def lab_reset(eid):
    """Wipe trial history and reset scores; best_config reverts to mutable_config."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute("UPDATE lab_experiments SET status = 'idle', total_trials = 0, improvements = 0, best_score = 0, best_config = mutable_config WHERE id = %s", (eid,))
            cur.execute("DELETE FROM lab_trials WHERE experiment_id = %s", (eid,))
        conn.commit()
    return jsonify({"ok": True})


@app.route("/api/lab/experiments/<int:eid>/delete", methods=["DELETE"])
@admin_required
def lab_delete(eid):
    """Delete the experiment row (trial rows presumably cascade — verify FK)."""
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute("DELETE FROM lab_experiments WHERE id = %s", (eid,))
        conn.commit()
    return jsonify({"ok": True})


@app.route("/api/lab/experiments/<int:eid>/stream")
@admin_required
def lab_stream(eid):
    """SSE stream of live trial events, polled from a per-subscriber queue."""
    q = []
    _lab_streams.setdefault(eid, []).append(q)

    def generate():
        try:
            while True:
                if q:
                    data = q.pop(0)
                    yield f"data: {json.dumps(data)}\n\n"
                    if data.get("type") == "done":
                        break
                else:
                    time.sleep(0.5)
                    yield ": keepalive\n\n"
        finally:
            # Unsubscribe; guarded so a concurrent removal cannot raise.
            subs = _lab_streams.get(eid, [])
            if q in subs:
                subs.remove(q)

    return Response(generate(), mimetype="text/event-stream",
                    headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
"no"}) # ─── ACTIVE RUN TRACKING ────────────────────────────────────── import uuid as _uuid _active_runs = {} # run_id -> {mode, user, prompt, started, step, total_steps, substep, events, errors} _run_log = [] # recent completed runs with timing/error info (last 100) def _log_run(run_info): """Archive a completed run to the log.""" _run_log.append(run_info) if len(_run_log) > 100: _run_log.pop(0) # ─── TEAM ROUTES ────────────────────────────────────────────── @app.route("/api/run", methods=["POST"]) @login_required def run_team(): ip = request.remote_addr if rate_limited(ip): return jsonify({"error": "Rate limit exceeded. Wait a minute."}), 429 config = request.json if not config: return jsonify({"error": "Request body required"}), 400 mode = config.get("mode", "") if not mode: return jsonify({"error": "Mode is required"}), 400 prompt = config.get("prompt", "").strip() if not prompt: return jsonify({"error": "Prompt cannot be empty"}), 400 RUNNERS = { "brainstorm": run_brainstorm, "pipeline": run_pipeline, "debate": run_debate, "validator": run_validator, "roundrobin": run_roundrobin, "redteam": run_redteam, "consensus": run_consensus, "codereview": run_codereview, "ladder": run_ladder, "tournament": run_tournament, "evolution": run_evolution, "blindassembly": run_blindassembly, "staircase": run_staircase, "drift": run_drift, "mesh": run_mesh, "hallucination": run_hallucination, "timeloop": run_timeloop, "research": run_research, "eval": run_eval, "extract": run_extract, } run_id = str(_uuid.uuid4())[:8] username = session.get("username", "unknown") _active_runs[run_id] = { "mode": mode, "user": username, "prompt": prompt[:100], "started": time.time(), "step": 0, "total_steps": 0, "substep": "", "events": 0, "errors": [], "responses_size": 0 } def generate(): import queue collected = [] run = _active_runs[run_id] event_queue = queue.Queue() stop_heartbeat = threading.Event() # Heartbeat thread: sends keepalive every 10s to prevent connection timeout def 
heartbeat(): while not stop_heartbeat.is_set(): stop_heartbeat.wait(10) if not stop_heartbeat.is_set(): event_queue.put(": keepalive\n\n") hb_thread = threading.Thread(target=heartbeat, daemon=True) hb_thread.start() # Runner thread: executes the mode runner and pushes events to queue def run_pipeline(): try: runner = RUNNERS.get(mode) if runner: for event_str in runner(config): event_queue.put(event_str) else: event_queue.put(sse({"type": "response", "model": "system", "text": f"Unknown mode: {mode}", "role": "error"})) event_queue.put(sse({"type": "done"})) except Exception as e: run["errors"].append({"model": "system", "error": str(e)[:500], "time": time.time()}) event_queue.put(sse({"type": "response", "model": "system", "text": f"Pipeline error: {e}", "role": "error"})) event_queue.put(sse({"type": "done"})) finally: event_queue.put(None) # sentinel pipeline_thread = threading.Thread(target=run_pipeline, daemon=True) pipeline_thread.start() try: while True: try: event_str = event_queue.get(timeout=30) except queue.Empty: # Safety keepalive if heartbeat thread died yield ": keepalive\n\n" continue if event_str is None: break yield event_str # Track events if event_str.startswith("data: "): run["events"] += 1 try: data = json.loads(event_str[6:].strip()) evt_type = data.get("type") if evt_type == "response": text = data.get("text", "") run["responses_size"] += len(text) collected.append({"model": data.get("model", ""), "text": text, "role": data.get("role", "")}) if data.get("role") == "error": run["errors"].append({"model": data.get("model"), "error": text[:200], "time": time.time()}) elif evt_type == "progress": run["step"] = data.get("step", run["step"]) run["total_steps"] = data.get("total_steps", run["total_steps"]) run["substep"] = data.get("substep", "") elif evt_type == "status": run["substep"] = data.get("message", "") except Exception: pass finally: stop_heartbeat.set() run["finished"] = time.time() run["duration"] = round(run["finished"] - 
run["started"], 1) run["response_count"] = len(collected) _log_run(dict(run, run_id=run_id)) _active_runs.pop(run_id, None) if collected: save_run(mode, config.get("prompt", ""), config, collected) return Response(generate(), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no", "Connection": "keep-alive"}) # ─── ORIGINAL 10 MODES ──────────────────────────────────────── def run_brainstorm(config): models, prompt = config.get("models", []), config["prompt"] synthesizer = config.get("synthesizer", models[0] if models else "qwen2.5") total = 2 if len(models) > 1 else 1 yield sse({"type": "clear"}) yield sse({"type": "progress", "step": 1, "total_steps": total, "substep": f"Querying {len(models)} models in parallel...", "percent": 10}) yield sse({"type": "status", "message": f"Querying {len(models)} models..."}) # Stream responses as they arrive instead of waiting for all responses = {} completed = 0 max_timeout = max((_get_timeout(m) for m in models), default=300) + 30 with ThreadPoolExecutor(max_workers=max(len(models), 1)) as pool: futures = {pool.submit(safe_query, m, prompt): m for m in models} for future in as_completed(futures, timeout=max_timeout): m = futures[future] try: r = future.result(timeout=10) except Exception as e: r = f"Error: {e}" responses[m] = r completed += 1 pct = 10 + int((completed / len(models)) * 50) yield sse({"type": "progress", "step": 1, "total_steps": total, "substep": f"{completed}/{len(models)} models responded", "percent": pct}) role = "error" if r.startswith("Error:") else "respondent" yield sse({"type": "response", "model": m, "text": r, "role": role}) if len(responses) > 1: yield sse({"type": "progress", "step": 2, "total_steps": total, "substep": f"Synthesizing with {synthesizer}...", "percent": 70}) yield sse({"type": "status", "message": f"Synthesizing with {synthesizer}..."}) parts = [("QUESTION:", prompt, 1), ("INSTRUCTION:", "Synthesize the best answer. 
Be concise.", 1)] for m, r in responses.items(): parts.append((f"[{m}]:", cap_response(r), 3)) sp = build_context(parts, synthesizer) try: yield sse({"type": "response", "model": synthesizer, "text": safe_query(synthesizer, sp), "role": "synthesis"}) except Exception as e: yield sse({"type": "response", "model": synthesizer, "text": str(e), "role": "error"}) yield sse({"type": "progress", "step": total, "total_steps": total, "substep": "Complete", "percent": 100}) def run_pipeline(config): steps, current = config.get("steps", []), config["prompt"] yield sse({"type": "clear"}) for i, step in enumerate(steps): model = step["model"] yield sse({"type": "status", "message": f"Step {i+1}/{len(steps)}: {model}..."}) try: prompt = step["instruction"].replace("{input}", cap_response(current)) current = safe_query(model, prompt) yield sse({"type": "response", "model": model, "text": current, "role": f"step {i+1}"}) except Exception as e: yield sse({"type": "response", "model": model, "text": str(e), "role": "error"}); break def run_debate(config): prompt = config.get("prompt", "") d1, d2, judge = config.get("debater1"), config.get("debater2"), config.get("judge") if not all([d1, d2, judge]): yield sse({"type": "response", "model": "system", "text": "Debate mode requires 'debater1', 'debater2', and 'judge' model parameters.", "role": "error"}) yield sse({"type": "done"}) return rounds = config.get("rounds", 2) yield sse({"type": "clear"}) history = [] for m in [d1, d2]: yield sse({"type": "status", "message": f"{m} opening..."}) try: r = safe_query(m, f"Give your position on: {prompt}") history.append((m, r)); yield sse({"type": "response", "model": m, "text": r, "role": "opening"}) except Exception as e: yield sse({"type": "response", "model": m, "text": str(e), "role": "error"}) for rd in range(rounds): for i, m in enumerate([d1, d2]): other = [d1, d2][1-i] other_last = [h[1] for h in history if h[0] == other][-1] yield sse({"type": "status", "message": f"Round {rd+1}: 
{m}..."}) try: rebuttal_prompt = build_context([ ("Topic:", prompt, 1), (f"Opponent ({other}) said:", cap_response(other_last), 2), ("INSTRUCTION:", "Rebuttal or concede:", 1), ], m) r = safe_query(m, rebuttal_prompt) history.append((m, r)); yield sse({"type": "response", "model": m, "text": r, "role": f"round {rd+1}"}) except Exception as e: yield sse({"type": "response", "model": m, "text": str(e), "role": "error"}) yield sse({"type": "status", "message": f"{judge} judging..."}) parts = [("Topic:", prompt, 1), ("INSTRUCTION:", "Judge: who won and why?", 1)] for m, t in history: parts.append((f"[{m}]:", cap_response(t), 3)) jp = build_context(parts, judge) try: yield sse({"type": "response", "model": judge, "text": safe_query(judge, jp), "role": "judge"}) except Exception as e: yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"}) def run_validator(config): prompt, answerer, validators = config["prompt"], config["answerer"], config.get("validators", []) yield sse({"type": "clear"}) yield sse({"type": "status", "message": f"{answerer} answering..."}) try: answer = query_model(answerer, prompt) yield sse({"type": "response", "model": answerer, "text": answer, "role": "answer"}) except Exception as e: yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return yield sse({"type": "status", "message": f"Validating with {len(validators)} models..."}) vp = f"QUESTION: {prompt}\n\nANSWER:\n{answer}\n\nFact-check. Score 1-10 for accuracy, completeness, clarity. Flag errors." 
results = parallel_query(validators, vp) for m, r in results.items(): yield sse({"type": "response", "model": m, "text": r, "role": "validator"}) def run_roundrobin(config): prompt, models, cycles = config["prompt"], config.get("models", []), config.get("cycles", 2) yield sse({"type": "clear"}) if not models: return yield sse({"type": "status", "message": f"{models[0]} drafting..."}) try: current = query_model(models[0], f"Answer:\n\n{prompt}") yield sse({"type": "response", "model": models[0], "text": current, "role": "draft"}) except Exception as e: yield sse({"type": "response", "model": models[0], "text": str(e), "role": "error"}); return for cycle in range(cycles): start = 1 if cycle == 0 else 0 for i in range(start, len(models)): m = models[i] yield sse({"type": "status", "message": f"Cycle {cycle+1}: {m}..."}) try: current = query_model(m, f"Question: {prompt}\n\nCurrent answer:\n{current}\n\nImprove it. Return full improved answer.") is_last = (cycle == cycles-1) and (i == len(models)-1) yield sse({"type": "response", "model": m, "text": current, "role": "final" if is_last else f"cycle {cycle+1}"}) except Exception as e: yield sse({"type": "response", "model": m, "text": str(e), "role": "error"}) def run_redteam(config): prompt = config.get("prompt", "") author, attacker, patcher = config.get("author"), config.get("attacker"), config.get("patcher") if not all([author, attacker, patcher]): yield sse({"type": "response", "model": "system", "text": "Red team mode requires 'author', 'attacker', and 'patcher' model parameters.", "role": "error"}) yield sse({"type": "done"}) return rounds = config.get("rounds", 2) yield sse({"type": "clear"}) yield sse({"type": "status", "message": f"{author} writing..."}) try: current = query_model(author, prompt) yield sse({"type": "response", "model": author, "text": current, "role": "author"}) except Exception as e: yield sse({"type": "response", "model": author, "text": str(e), "role": "error"}); return for r in 
def run_consensus(config):
    """All models answer, then iteratively revise after reading each other."""
    prompt = config["prompt"]
    models = config.get("models", [])
    max_rounds = config.get("max_rounds", 3)
    yield sse({"type": "clear"})
    if not models:
        return
    yield sse({"type": "status", "message": f"Round 1: {len(models)} models answering..."})
    responses = parallel_query(models, prompt)
    for m, r in responses.items():
        yield sse({"type": "response", "model": m, "text": r, "role": "round 1"})
    for rd in range(2, max_rounds + 1):
        yield sse({"type": "status", "message": f"Round {rd}: reviewing each other..."})
        revised = {}
        for m in models:
            parts = [
                ("Question:", prompt, 1),
                ("Your answer:", cap_response(responses.get(m, "")), 2),
                ("INSTRUCTION:", "Revise considering other perspectives. Adopt good points, defend if right.", 1),
            ]
            parts.extend((f"[{o}]:", cap_response(r), 3) for o, r in responses.items() if o != m)
            ctx = build_context(parts, m)
            try:
                revised[m] = safe_query(m, ctx)
                yield sse({"type": "response", "model": m, "text": revised[m], "role": "consensus" if rd == max_rounds else f"round {rd}"})
            except Exception as e:
                # Keep the model's previous answer so later rounds still see it.
                revised[m] = responses.get(m, "")
                yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
        responses = revised


def run_codereview(config):
    """Coder writes, reviewer critiques, tester writes unit tests."""
    prompt = config.get("prompt", "")
    coder = config.get("coder")
    reviewer = config.get("reviewer")
    tester = config.get("tester")
    if not (coder and reviewer and tester):
        yield sse({"type": "response", "model": "system", "text": "Code review mode requires 'coder', 'reviewer', and 'tester' model parameters.", "role": "error"})
        yield sse({"type": "done"})
        return
    yield sse({"type": "clear"})
    yield sse({"type": "status", "message": f"{coder} coding..."})
    try:
        code = query_model(coder, f"Write code for this task. Only output code with brief comments.\n\n{prompt}")
    except Exception as e:
        yield sse({"type": "response", "model": coder, "text": str(e), "role": "error"})
        return
    yield sse({"type": "response", "model": coder, "text": code, "role": "coder"})
    yield sse({"type": "status", "message": f"{reviewer} reviewing..."})
    review = ""
    try:
        review = query_model(reviewer, f"Task: {prompt}\n\nCode:\n{code}\n\nReview: bugs, security, performance, style, edge cases. Provide corrected code if needed.")
        yield sse({"type": "response", "model": reviewer, "text": review, "role": "reviewer"})
    except Exception as e:
        review = ""  # the tester still runs with an empty review section
        yield sse({"type": "response", "model": reviewer, "text": str(e), "role": "error"})
    yield sse({"type": "status", "message": f"{tester} testing..."})
    try:
        tests = query_model(tester, f"Task: {prompt}\n\nCode:\n{code}\n\nReview:\n{review}\n\nWrite comprehensive unit tests. Cover normal, edge, error cases.")
        yield sse({"type": "response", "model": tester, "text": tests, "role": "tester"})
    except Exception as e:
        yield sse({"type": "response", "model": tester, "text": str(e), "role": "error"})
def run_ladder(config):
    """Explain the same topic at five escalating levels of sophistication."""
    prompt = config["prompt"]
    models = config.get("models", [])
    levels = [
        ("Child (5yo)", "Explain to a 5-year-old. Very simple words, short sentences, fun analogies."),
        ("Teenager", "Explain to a 15-year-old. Everyday language, relatable examples, some technical terms."),
        ("College Student", "College level. Proper terminology, theory, structured explanation."),
        ("Professional", "Professional level. Technical language, real-world applications, trade-offs."),
        ("PhD Expert", "PhD/expert level. Nuanced details, current research, math if relevant, edge cases."),
    ]
    yield sse({"type": "clear"})
    for i, (level_name, instr) in enumerate(levels):
        # Models rotate round-robin across the five levels.
        m = models[i % len(models)] if models else "qwen2.5"
        yield sse({"type": "status", "message": f"Level {i+1}/5: {level_name} ({m})..."})
        try:
            text = query_model(m, f"{instr}\n\nQuestion: {prompt}")
            yield sse({"type": "response", "model": m, "text": text, "role": level_name})
        except Exception as e:
            yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})


def run_tournament(config):
    """All models compete on one prompt; a judge ranks them and refines the winner."""
    prompt = config["prompt"]
    models = config.get("models", [])
    judge = config.get("judge", "qwen2.5")
    yield sse({"type": "clear"})
    yield sse({"type": "status", "message": f"{len(models)} models competing..."})
    entries = parallel_query(models, prompt)
    for m, r in entries.items():
        yield sse({"type": "response", "model": m, "text": r, "role": "competitor"})
    if len(entries) < 2:
        return  # nothing to rank with fewer than two entries
    yield sse({"type": "status", "message": f"{judge} ranking..."})
    parts = [("Question:", prompt, 1), ("INSTRUCTION:", "Rank all from best to worst. Score 1-10 each. Then refine the winner into the ultimate answer.", 1)]
    parts.extend((f"[{m}]:", cap_response(r), 3) for m, r in entries.items())
    jp = build_context(parts, judge)
    try:
        yield sse({"type": "response", "model": judge, "text": safe_query(judge, jp), "role": "verdict"})
    except Exception as e:
        yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"})


# ─── NEW 7 MODES ──────────────────────────────────────────────
def run_evolution(config):
    """Genetic algorithm: generate, score, breed, mutate across generations."""
    prompt = config["prompt"]
    models = config.get("models", [])
    generations = config.get("generations", 3)
    judge = config.get("judge", models[0] if models else "qwen2.5")
    yield sse({"type": "clear"})
    if not models:
        return
    # Generation 0: every model produces an independent answer.
    yield sse({"type": "status", "message": "Generation 0: spawning initial population..."})
    population = parallel_query(models, prompt)
    for m, r in population.items():
        yield sse({"type": "response", "model": m, "text": r, "role": "gen 0"})
    for gen in range(1, generations + 1):
        # Fitness: the judge scores every member of the current population.
        yield sse({"type": "status", "message": f"Generation {gen}: fitness evaluation..."})
        score_prompt = f"Question: {prompt}\n\nRate each answer 1-100. Return ONLY a JSON object like {{\"model_name\": score}}.\n\n"
        for m, r in population.items():
            score_prompt += f"[{m}]: {r.strip()}\n\n"
        try:
            scores_raw = query_model(judge, score_prompt)
            yield sse({"type": "response", "model": judge, "text": scores_raw, "role": f"fitness gen {gen}"})
        except Exception as e:
            yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"})
            continue
        pop_list = list(population.items())
        if len(pop_list) < 2:
            break  # breeding needs two parents
        parent1, parent2 = pop_list[0], pop_list[1]
        yield sse({"type": "status", "message": f"Generation {gen}: breeding + mutating..."})
        next_gen = {}
        for m in models:
            breed_prompt = (
                f"Question: {prompt}\n\n"
                f"Parent A ({parent1[0]}):\n{parent1[1].strip()}\n\n"
                f"Parent B ({parent2[0]}):\n{parent2[1].strip()}\n\n"
                f"You are {m}. Breed these two answers: take the best parts of each parent, "
                f"combine them, then MUTATE by adding one novel insight or improvement. "
                f"Return your evolved answer."
            )
            try:
                offspring = query_model(m, breed_prompt)
                next_gen[m] = offspring
                yield sse({"type": "response", "model": m, "text": offspring, "role": "final" if gen == generations else f"gen {gen}"})
            except Exception as e:
                # Carry the previous generation's answer forward unchanged.
                next_gen[m] = population.get(m, "")
                yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
        population = next_gen
Return ONLY a JSON object like {{\"model_name\": score}}.\n\n" for m, r in population.items(): score_prompt += f"[{m}]: {r.strip()}\n\n" try: scores_raw = query_model(judge, score_prompt) yield sse({"type": "response", "model": judge, "text": scores_raw, "role": f"fitness gen {gen}"}) except Exception as e: yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"}); continue # Breed: take top 2 answers, ask a model to combine them pop_list = list(population.items()) if len(pop_list) < 2: break parent1, parent2 = pop_list[0], pop_list[1] yield sse({"type": "status", "message": f"Generation {gen}: breeding + mutating..."}) new_population = {} for m in models: breed_prompt = ( f"Question: {prompt}\n\n" f"Parent A ({parent1[0]}):\n{parent1[1].strip()}\n\n" f"Parent B ({parent2[0]}):\n{parent2[1].strip()}\n\n" f"You are {m}. Breed these two answers: take the best parts of each parent, " f"combine them, then MUTATE by adding one novel insight or improvement. " f"Return your evolved answer." ) try: offspring = query_model(m, breed_prompt) new_population[m] = offspring is_last = gen == generations yield sse({"type": "response", "model": m, "text": offspring, "role": "final" if is_last else f"gen {gen}"}) except Exception as e: new_population[m] = population.get(m, "") yield sse({"type": "response", "model": m, "text": str(e), "role": "error"}) population = new_population def run_blindassembly(config): """Split question into parts, each model answers blind, assembler stitches.""" prompt, models = config["prompt"], config.get("models", []) assembler = config.get("assembler", models[0] if models else "qwen2.5") yield sse({"type": "clear"}) if not models: return n = len(models) # Step 1: Decompose the question yield sse({"type": "status", "message": "Decomposing question into sub-tasks..."}) decompose_prompt = ( f"Split this question into exactly {n} independent sub-parts that together fully answer it. 
" f"Return ONLY a numbered list, one sub-question per line. No other text.\n\n" f"Question: {prompt}" ) try: parts_raw = query_model(assembler, decompose_prompt) yield sse({"type": "response", "model": assembler, "text": parts_raw, "role": "decomposer"}) except Exception as e: yield sse({"type": "response", "model": assembler, "text": str(e), "role": "error"}); return # Parse parts parts = [line.strip() for line in parts_raw.strip().split("\n") if line.strip() and any(c.isalpha() for c in line)] while len(parts) < n: parts.append(f"Additional aspect of: {prompt}") parts = parts[:n] # Step 2: Each model answers their part BLIND yield sse({"type": "status", "message": f"Sending {n} sub-tasks to models (blind)..."}) fragments = {} with ThreadPoolExecutor(max_workers=n) as pool: futures = {} for i, m in enumerate(models): blind_prompt = ( f"Answer ONLY this specific sub-question. Do not address anything else.\n\n" f"Sub-question: {parts[i]}" ) futures[pool.submit(query_model, m, blind_prompt)] = (m, parts[i]) for future in as_completed(futures): m, part = futures[future] try: fragments[m] = {"part": part, "answer": future.result()} yield sse({"type": "response", "model": m, "text": f"SUB-TASK: {part}\n\nANSWER:\n{fragments[m]['answer']}", "role": "blind worker"}) except Exception as e: yield sse({"type": "response", "model": m, "text": str(e), "role": "error"}) # Step 3: Assemble yield sse({"type": "status", "message": f"{assembler} assembling blind fragments..."}) assemble_prompt = f"Original question: {prompt}\n\nMultiple models each answered a sub-part WITHOUT seeing each other:\n\n" for m, data in fragments.items(): assemble_prompt += f"[{m}] (sub-task: {data['part']}):\n{data['answer'].strip()}\n\n" assemble_prompt += "Stitch these fragments into ONE coherent, complete answer. Fill any gaps. Remove contradictions." 
def run_staircase(config):
    """Devil's Staircase: each round adds a new constraint.

    The challenger model repeatedly invents one new constraint the current
    answer fails to handle; the answerer must rewrite its answer to satisfy
    every constraint collected so far.  Yields SSE event dicts.
    """
    question = config["prompt"]
    answerer = config["answerer"]
    challenger = config["challenger"]
    total_steps = config.get("steps", 4)
    yield sse({"type": "clear"})

    # Opening answer before any constraints exist.
    yield sse({"type": "status", "message": f"{answerer} answering..."})
    try:
        working = query_model(answerer, question)
    except Exception as exc:
        yield sse({"type": "response", "model": answerer, "text": str(exc), "role": "error"})
        return
    yield sse({"type": "response", "model": answerer, "text": working, "role": "initial answer"})

    gathered = []
    for step_idx in range(total_steps):
        # --- challenger invents the next constraint ------------------------
        yield sse({"type": "status", "message": f"Step {step_idx+1}: {challenger} adding constraint..."})
        ask_constraint = (
            f"Original question: {question}\n\n"
            f"Current answer:\n{working}\n\n"
            f"Existing constraints: {gathered if gathered else 'None yet'}\n\n"
            f"Add ONE new realistic constraint, complication, or edge case that the current answer doesn't handle. "
            f"Make it specific and challenging but plausible. State ONLY the new constraint, nothing else."
        )
        try:
            fresh = query_model(challenger, ask_constraint)
        except Exception as exc:
            yield sse({"type": "response", "model": challenger, "text": str(exc), "role": "error"})
            continue  # skip the adapt phase when no constraint was produced
        gathered.append(fresh.strip())
        yield sse({"type": "response", "model": challenger, "text": fresh, "role": f"constraint {step_idx+1}"})

        # --- answerer rewrites under the full constraint set ---------------
        yield sse({"type": "status", "message": f"Step {step_idx+1}: {answerer} adapting..."})
        numbered = "\n".join(f" {n+1}. {item}" for n, item in enumerate(gathered))
        ask_adapt = (
            f"Original question: {question}\n\n"
            f"ALL constraints you must satisfy:\n" + numbered +
            f"\n\nYour previous answer:\n{working}\n\n"
            f"Rewrite your answer to handle ALL constraints. Return the complete updated answer."
        )
        try:
            working = query_model(answerer, ask_adapt)
            final_round = step_idx == total_steps - 1
            yield sse({"type": "response", "model": answerer, "text": working, "role": "final" if final_round else f"adapted {step_idx+1}"})
        except Exception as exc:
            yield sse({"type": "response", "model": answerer, "text": str(exc), "role": "error"})
def run_drift(config):
    """Same prompt N times to same model, analyze variance.

    Collects several samples of the target model's answer to one prompt,
    then has an analyzer model compare them to separate consistent claims
    from variable (possibly hallucinated) ones.  Yields SSE event dicts.
    """
    question = config["prompt"]
    target = config["target"]
    sample_count = config.get("samples", 5)
    analyzer = config["analyzer"]
    yield sse({"type": "clear"})
    yield sse({"type": "status", "message": f"Sampling {target} {sample_count} times..."})

    outputs = []
    for idx in range(sample_count):
        yield sse({"type": "status", "message": f"Sample {idx+1}/{sample_count}..."})
        try:
            reply = query_model(target, question)
        except Exception as exc:
            yield sse({"type": "response", "model": target, "text": str(exc), "role": "error"})
            continue
        outputs.append(reply)
        yield sse({"type": "response", "model": target, "text": reply, "role": f"sample {idx+1}"})

    # Variance analysis needs at least two samples to compare.
    if len(outputs) < 2:
        return

    yield sse({"type": "status", "message": f"{analyzer} analyzing drift..."})
    report = (
        f"Question asked: {question}\n\n"
        f"The model '{target}' was asked this same question {len(outputs)} times. Here are all responses:\n\n"
    )
    for idx, reply in enumerate(outputs):
        report += f"--- Sample {idx+1} ---\n{reply.strip()}\n\n"
    report += (
        "DRIFT ANALYSIS:\n"
        "1. What claims/facts are CONSISTENT across all samples? (HIGH CONFIDENCE)\n"
        "2. What claims VARY between samples? (LOW CONFIDENCE - possible hallucination)\n"
        "3. What is completely CONTRADICTED between samples? (UNRELIABLE)\n"
        "4. Give an overall confidence score 1-10 for the model's answer to this question.\n"
        "5. Provide the 'true' answer using only high-confidence claims."
    )
    try:
        verdict = query_model(analyzer, report)
        yield sse({"type": "response", "model": analyzer, "text": verdict, "role": "analyzer"})
    except Exception as exc:
        yield sse({"type": "response", "model": analyzer, "text": str(exc), "role": "error"})
def run_mesh(config):
    """Each model answers as a different stakeholder.

    Five fixed stakeholder personas (CEO, engineer, end user, regulator,
    competitor) are assigned round-robin across the team models; a
    synthesizer model then merges the perspectives into a 360-degree view.
    Yields SSE event dicts for the UI stream.
    """
    prompt, models = config["prompt"], config.get("models", [])
    synthesizer = config.get("synthesizer", models[0] if models else "qwen2.5")
    yield sse({"type": "clear"})
    # (persona label, system-style instruction) pairs — order matters for
    # the round-robin model assignment below.
    perspectives = [
        ("CEO / Business Leader", "You are a CEO. Answer from a business strategy perspective: ROI, market impact, competitive advantage, risk."),
        ("Software Engineer", "You are a senior engineer. Answer from a technical perspective: architecture, implementation, scalability, tech debt."),
        ("End User / Customer", "You are an end user/customer. Answer from a usability perspective: experience, pain points, what you actually need."),
        ("Regulator / Legal", "You are a regulator/legal advisor. Answer from a compliance perspective: laws, regulations, liability, ethics, privacy."),
        ("Competitor", "You are a competitor analyzing this. What threats/opportunities does this create? What would you do differently?"),
    ]
    if not models:
        return
    responses = {}
    for i, (role_name, instruction) in enumerate(perspectives):
        # Round-robin: persona i goes to model i mod len(models).
        m = models[i % len(models)]
        yield sse({"type": "status", "message": f"{role_name}: {m}..."})
        try:
            r = query_model(m, f"{instruction}\n\nQuestion: {prompt}")
            responses[role_name] = (m, r)
            yield sse({"type": "response", "model": m, "text": r, "role": role_name})
        except Exception as e:
            yield sse({"type": "response", "model": m, "text": str(e), "role": "error"})
    # 360 synthesis: merge all successful stakeholder answers
    yield sse({"type": "status", "message": f"{synthesizer} weaving 360-degree view..."})
    syn = f"Question: {prompt}\n\nMultiple stakeholders gave their perspective:\n\n"
    for role, (m, r) in responses.items():
        syn += f"[{role} ({m})]: {r.strip()}\n\n"
    syn += "Synthesize a 360-degree view that balances all stakeholder perspectives. Highlight tensions and trade-offs."
    try:
        yield sse({"type": "response", "model": synthesizer, "text": query_model(synthesizer, syn), "role": "mesh-360"})
    except Exception as e:
        yield sse({"type": "response", "model": synthesizer, "text": str(e), "role": "error"})
def run_hallucination(config):
    """One answers, hunters verify each claim independently.

    The answerer model produces an answer, then extracts its own factual
    claims as a list; each hunter model independently classifies every
    claim (VERIFIED / SUSPICIOUS / HALLUCINATED / UNVERIFIABLE).
    Yields SSE event dicts for the UI stream.
    """
    prompt, answerer = config["prompt"], config["answerer"]
    hunters = config.get("hunters", [])
    yield sse({"type": "clear"})
    # Get answer — abort the whole mode if this fails
    yield sse({"type": "status", "message": f"{answerer} answering..."})
    try:
        answer = query_model(answerer, prompt)
        yield sse({"type": "response", "model": answerer, "text": answer, "role": "answer"})
    except Exception as e:
        yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return
    # Extract claims — the answerer itself enumerates its factual claims
    yield sse({"type": "status", "message": "Extracting factual claims..."})
    extract_prompt = (
        f"Extract every factual claim from this answer as a numbered list. Include specific facts, numbers, dates, "
        f"names, and cause-effect relationships. One claim per line.\n\nAnswer:\n{answer}"
    )
    try:
        claims = query_model(answerer, extract_prompt)
        yield sse({"type": "response", "model": answerer, "text": claims, "role": "claims extracted"})
    except Exception as e:
        yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return
    # Each hunter verifies independently (parallel fan-out via parallel_query)
    yield sse({"type": "status", "message": f"{len(hunters)} hunters verifying claims..."})
    hunt_prompt = (
        f"Original question: {prompt}\n\n"
        f"An AI generated this answer:\n{answer}\n\n"
        f"Here are the extracted claims:\n{claims}\n\n"
        f"For EACH claim, verdict:\n"
        f" VERIFIED - you are confident this is correct\n"
        f" SUSPICIOUS - might be wrong or misleading\n"
        f" HALLUCINATED - this is likely made up or incorrect\n"
        f" UNVERIFIABLE - cannot determine from your knowledge\n"
        f"Explain your reasoning for suspicious/hallucinated claims."
    )
    results = parallel_query(hunters, hunt_prompt)
    for m, r in results.items():
        yield sse({"type": "response", "model": m, "text": r, "role": "hunter"})
def run_timeloop(config):
    """CHAOS MODE: answer -> catastrophe -> fix -> new catastrophe -> repeat.

    A chaos-agent model invents a vivid failure scenario for the current
    answer each loop; the answerer must rewrite to survive every
    catastrophe so far.  Ends with the chaos agent's final resilience
    verdict.  Yields SSE event dicts for the UI stream.
    """
    prompt = config["prompt"]
    answerer = config["answerer"]
    chaos = config["chaos"]
    loops = config.get("loops", 4)
    yield sse({"type": "clear"})
    # Initial answer — abort if the answerer fails outright
    yield sse({"type": "status", "message": f"{answerer} answering (unaware of impending doom)..."})
    try:
        current = query_model(answerer, prompt)
        yield sse({"type": "response", "model": answerer, "text": current, "role": "initial (doomed)"})
    except Exception as e:
        yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"}); return
    catastrophes = []
    for i in range(loops):
        # Chaos agent creates a catastrophe for the current answer
        yield sse({"type": "status", "message": f"Loop {i+1}: CHAOS AGENT unleashed..."})
        chaos_prompt = (
            f"Original question: {prompt}\n\n"
            f"Someone implemented this answer:\n{current}\n\n"
            f"Previous catastrophes that were already fixed: {catastrophes if catastrophes else 'None yet'}\n\n"
            f"You are a CHAOS AGENT. Describe a SPECIFIC, VIVID catastrophe that happened because of a flaw "
            f"in this answer. Be creative and dramatic but grounded in a real flaw. "
            f"Describe: 1) What went wrong 2) The cascading consequences 3) Who/what was affected. "
            f"Make it different from previous catastrophes. Be theatrical!"
        )
        try:
            catastrophe = query_model(chaos, chaos_prompt)
            # Only the first 200 chars are kept in history to bound prompt growth.
            catastrophes.append(catastrophe.strip()[:200])
            yield sse({"type": "response", "model": chaos, "text": catastrophe, "role": f"catastrophe {i+1}"})
        except Exception as e:
            # No catastrophe produced — skip the fix phase for this loop
            yield sse({"type": "response", "model": chaos, "text": str(e), "role": "error"}); continue
        # Answerer must fix the new catastrophe plus all previous ones
        yield sse({"type": "status", "message": f"Loop {i+1}: {answerer} desperately fixing..."})
        fix_prompt = (
            f"Original question: {prompt}\n\n"
            f"Your previous answer:\n{current}\n\n"
            f"CATASTROPHE REPORT:\n{catastrophe}\n\n"
            f"ALL previous catastrophes you must also prevent:\n" + "\n".join(f" {j+1}. {c}" for j, c in enumerate(catastrophes)) +
            f"\n\nRewrite your answer to prevent THIS catastrophe and ALL previous ones. "
            f"Your answer must be BULLETPROOF. Return the complete fixed answer."
        )
        try:
            current = query_model(answerer, fix_prompt)
            is_last = i == loops - 1
            yield sse({"type": "response", "model": answerer, "text": current, "role": "survivor" if is_last else f"fix {i+1}"})
        except Exception as e:
            yield sse({"type": "response", "model": answerer, "text": str(e), "role": "error"})
    # Final verdict from chaos agent on the surviving answer
    yield sse({"type": "status", "message": f"{chaos} final inspection..."})
    final_prompt = (
        f"Original question: {prompt}\n\n"
        f"After {loops} catastrophes, the final answer is:\n{current}\n\n"
        f"All catastrophes it survived:\n" + "\n".join(f" {j+1}. {c}" for j, c in enumerate(catastrophes)) +
        f"\n\nAs the Chaos Agent, give your final verdict: Is this answer now truly bulletproof? "
        f"Rate its resilience 1-10. Can you find ONE MORE flaw? If not, admit defeat."
    )
    try:
        yield sse({"type": "response", "model": chaos, "text": query_model(chaos, final_prompt), "role": "final judgment"})
    except Exception as e:
        yield sse({"type": "response", "model": chaos, "text": str(e), "role": "error"})
# ─── AUTONOMOUS PIPELINES ─────────────────────────────────────


def _save_pipeline(pipeline, topic, steps, result, models, start_ms):
    """Persist a completed pipeline run to the pipeline_runs table.

    Best-effort: DB failures are logged to stdout and swallowed so a
    broken database never kills the SSE stream that called us.

    Args:
        pipeline: pipeline name, e.g. "research" / "eval" / "extract".
        topic: the user prompt / topic string.
        steps: list of per-step summaries (JSON-serialized).
        result: final result payload (JSON-serialized).
        models: all model names used (de-duplicated before insert).
        start_ms: pipeline start time in epoch milliseconds.
    """
    # NOTE: uses the module-level `time` import; the previous local
    # `import time` shadowed it redundantly.
    duration = int((time.time() * 1000) - start_ms)
    try:
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """INSERT INTO pipeline_runs (pipeline, topic, status, steps, result, models_used, duration_ms, completed_at) VALUES (%s, %s, 'completed', %s, %s, %s, %s, NOW())""",
                    (pipeline, topic, json.dumps(steps), json.dumps(result), list(set(models)), duration)
                )
            conn.commit()
    except Exception as e:
        # Deliberate best-effort: report and continue.
        print(f"[DB] pipeline save error: {e}")


def run_research(config):
    """Autonomous research pipeline: scout → parallel research → fact-check → synthesize.

    Steps:
      1. The scout model generates up to 15 research questions.
      2. Questions are fanned out round-robin across the team models.
      3. The checker model fact-checks all (truncated) findings.
      4. The synthesizer model produces a structured research brief.
    Yields SSE event dicts; persists the run via _save_pipeline at the end.
    """
    start = time.time() * 1000
    prompt = config["prompt"]
    scout = config.get("scout", "llama3.2:latest")
    models = config.get("models", [])
    checker = config.get("checker", models[0] if models else scout)
    synth = config.get("synthesizer", models[0] if models else scout)
    num_q = min(config.get("num_questions", 5), 15)  # hard cap at 15
    yield sse({"type": "clear"})
    total_steps = 4
    steps = []
    all_models = [scout, checker, synth] + models
    # Step 1: Scout generates research questions
    yield sse({"type": "progress", "step": 1, "total_steps": total_steps, "substep": f"{scout} generating {num_q} research questions...", "percent": 5})
    yield sse({"type": "status", "message": f"Step 1/{total_steps}: {scout} generating {num_q} research questions..."})
    try:
        q_prompt = (
            f"You are a research scout. Given the topic below, generate exactly {num_q} specific, "
            f"diverse research questions that would build a comprehensive understanding. "
            f"Return ONLY a numbered list.\n\nTopic: {prompt}"
        )
        questions_raw = query_model(scout, q_prompt)
        yield sse({"type": "response", "model": scout, "text": questions_raw, "role": "scout"})
        steps.append({"step": "scout", "model": scout, "output": questions_raw})
    except Exception as e:
        yield sse({"type": "response", "model": scout, "text": str(e), "role": "error"})
        return
    # Parse questions: keep non-empty lines containing at least one letter
    questions = [l.strip() for l in questions_raw.strip().split("\n") if l.strip() and any(c.isalpha() for c in l)]
    questions = questions[:num_q]
    if not questions:
        yield sse({"type": "response", "model": "system", "text": "Failed to parse research questions.", "role": "error"})
        return
    yield sse({"type": "progress", "step": 1, "total_steps": total_steps, "substep": f"Parsed {len(questions)} questions", "percent": 15})
    # Step 2: Parallel research — distribute questions round-robin across models
    yield sse({"type": "progress", "step": 2, "total_steps": total_steps, "substep": f"0/{len(questions)} questions researched...", "percent": 18})
    yield sse({"type": "status", "message": f"Step 2/{total_steps}: {len(models)} models researching {len(questions)} questions..."})
    research_results = {}
    completed_q = 0
    failed_q = 0
    with ThreadPoolExecutor(max_workers=max(len(models), 1)) as pool:
        futures = {}
        for i, q in enumerate(questions):
            m = models[i % len(models)] if models else scout
            rp = f"Research this question thoroughly. Provide specific facts, data, and examples.\n\nQuestion: {q}"
            futures[pool.submit(query_model, m, rp)] = (m, q)
        for future in as_completed(futures):
            m, q = futures[future]
            try:
                answer = future.result()
                # Cap individual research answers to prevent context explosion
                if len(answer) > 8000:
                    answer = answer[:7500] + "\n\n[... response truncated for pipeline stability ...]"
                research_results[q] = {"model": m, "answer": answer}
                completed_q += 1
                pct = 18 + int((completed_q / len(questions)) * 42)
                yield sse({"type": "progress", "step": 2, "total_steps": total_steps, "substep": f"{completed_q}/{len(questions)} questions researched", "percent": pct})
                yield sse({"type": "response", "model": m, "text": f"Q: {q}\n\n{answer}", "role": "researcher"})
            except Exception as e:
                failed_q += 1
                completed_q += 1
                pct = 18 + int((completed_q / len(questions)) * 42)
                yield sse({"type": "progress", "step": 2, "total_steps": total_steps, "substep": f"{completed_q}/{len(questions)} ({failed_q} failed)", "percent": pct})
                yield sse({"type": "response", "model": m, "text": f"Q: {q}\n\nError: {e}", "role": "error"})
                # Keep a marker entry so later steps know this question failed
                research_results[q] = {"model": m, "answer": f"Error: {e}"}
    steps.append({"step": "research", "results": {q: r["answer"][:500] for q, r in research_results.items()}})
    # Step 3: Fact-check — cap context to prevent OOM
    yield sse({"type": "progress", "step": 3, "total_steps": total_steps, "substep": f"{checker} fact-checking...", "percent": 62})
    yield sse({"type": "status", "message": f"Step 3/{total_steps}: {checker} fact-checking all findings..."})
    check_prompt = f"Topic: {prompt}\n\nResearch findings to fact-check:\n\n"
    # Smart truncation: fit within context limits (smaller slice per answer
    # as the number of answers grows)
    per_answer_cap = min(300, 3000 // max(len(research_results), 1))
    for q, r in research_results.items():
        if r["answer"].startswith("Error:"):
            continue
        check_prompt += f"Q: {q}\nA: {r['answer'][:per_answer_cap]}\n\n"
    check_prompt += (
        "For each finding, mark as:\n"
        " VERIFIED — likely accurate\n"
        " UNCERTAIN — may be wrong or outdated\n"
        " FLAGGED — likely inaccurate\n"
        "Be specific about what's wrong with flagged items."
    )
    try:
        check_result = query_model(checker, check_prompt)
        yield sse({"type": "response", "model": checker, "text": check_result, "role": "fact-checker"})
        steps.append({"step": "fact-check", "model": checker, "output": check_result[:1000]})
    except Exception as e:
        check_result = f"Error: {e}"
        yield sse({"type": "response", "model": checker, "text": str(e), "role": "error"})
    # Step 4: Synthesize into brief — cap context
    yield sse({"type": "progress", "step": 4, "total_steps": total_steps, "substep": f"{synth} synthesizing brief...", "percent": 80})
    yield sse({"type": "status", "message": f"Step 4/{total_steps}: {synth} synthesizing research brief..."})
    synth_prompt = f"Topic: {prompt}\n\nResearch findings:\n\n"
    per_synth_cap = min(400, 4000 // max(len(research_results), 1))
    for q, r in research_results.items():
        if r["answer"].startswith("Error:"):
            synth_prompt += f"Q: {q}\nA: [research failed]\n\n"
        else:
            synth_prompt += f"Q: {q}\nA: {r['answer'][:per_synth_cap]}\n\n"
    synth_prompt += f"\nFact-check notes:\n{check_result[:500]}\n\n"
    synth_prompt += (
        "Synthesize ALL findings into a structured research brief with these sections:\n"
        "1. EXECUTIVE SUMMARY (2-3 sentences)\n"
        "2. KEY FINDINGS (bulleted list)\n"
        "3. DETAILED ANALYSIS (organized by theme)\n"
        "4. UNCERTAINTIES & GAPS (what needs more research)\n"
        "5. RECOMMENDATIONS (actionable next steps)\n"
        "Be comprehensive but concise."
    )
    try:
        brief = query_model(synth, synth_prompt)
        yield sse({"type": "response", "model": synth, "text": brief, "role": "synthesis"})
        steps.append({"step": "synthesis", "model": synth, "output": brief[:2000]})
    except Exception as e:
        brief = f"Error: {e}"
        yield sse({"type": "response", "model": synth, "text": str(e), "role": "error"})
    yield sse({"type": "progress", "step": 4, "total_steps": total_steps, "substep": "Research complete", "percent": 100})
    # Save pipeline run
    _save_pipeline("research", prompt, steps, {"brief": brief, "questions": questions, "fact_check": check_result[:1000]}, all_models, start)
) try: brief = query_model(synth, synth_prompt) yield sse({"type": "response", "model": synth, "text": brief, "role": "synthesis"}) steps.append({"step": "synthesis", "model": synth, "output": brief[:2000]}) except Exception as e: brief = f"Error: {e}" yield sse({"type": "response", "model": synth, "text": str(e), "role": "error"}) yield sse({"type": "progress", "step": 4, "total_steps": total_steps, "substep": "Research complete", "percent": 100}) # Save pipeline run _save_pipeline("research", prompt, steps, {"brief": brief, "questions": questions, "fact_check": check_result[:1000]}, all_models, start) def run_eval(config): """Model evaluation pipeline: same prompts → all models → judge scores → leaderboard.""" import time start = time.time() * 1000 prompt = config["prompt"] models = config.get("models", []) judge = config.get("judge", models[0] if models else "qwen2.5:latest") eval_type = config.get("eval_type", "general") rounds = config.get("rounds", 3) yield sse({"type": "clear"}) steps = [] all_models = models + [judge] # Generate eval prompts based on type yield sse({"type": "status", "message": f"Generating {rounds} {eval_type} evaluation prompts..."}) gen_prompt = ( f"Generate exactly {rounds} evaluation prompts for testing LLM capability in: {eval_type}.\n" f"Context/focus area: {prompt}\n\n" f"Each prompt should test a different aspect. Return ONLY a numbered list of prompts, nothing else." 
) try: prompts_raw = query_model(judge, gen_prompt) yield sse({"type": "response", "model": judge, "text": prompts_raw, "role": "prompt generator"}) except Exception as e: yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"}) return eval_prompts = [l.strip() for l in prompts_raw.strip().split("\n") if l.strip() and any(c.isalpha() for c in l)] eval_prompts = eval_prompts[:rounds] if not eval_prompts: yield sse({"type": "response", "model": "system", "text": "Failed to generate eval prompts.", "role": "error"}) return # Run each prompt against all models scores = {m: [] for m in models} for ri, ep in enumerate(eval_prompts): yield sse({"type": "status", "message": f"Round {ri+1}/{len(eval_prompts)}: Testing {len(models)} models..."}) # All models answer in parallel responses = parallel_query(models, ep) for m, r in responses.items(): yield sse({"type": "response", "model": m, "text": f"[Round {ri+1}] {ep[:80]}...\n\n{r}", "role": f"round {ri+1}"}) # Judge scores all responses yield sse({"type": "status", "message": f"Round {ri+1}: Judging..."}) judge_prompt = ( f"Evaluation prompt: {ep}\n\n" f"Score each model's response 1-10 on: accuracy, completeness, clarity, reasoning.\n" f"Return a JSON object: {{\"model_name\": {{\"score\": N, \"notes\": \"brief note\"}}}}.\n\n" ) for m, r in responses.items(): judge_prompt += f"[{m}]:\n{r[:500]}\n\n" try: judgment = query_model(judge, judge_prompt) yield sse({"type": "response", "model": judge, "text": judgment, "role": f"judge round {ri+1}"}) # Try to parse scores try: import re # Find numbers after model names for m in models: # Look for score patterns near model name pattern = re.escape(m) + r'.*?["\s:]+(\d+)' match = re.search(pattern, judgment, re.IGNORECASE | re.DOTALL) if match: scores[m].append(int(match.group(1))) except Exception: pass except Exception as e: yield sse({"type": "response", "model": judge, "text": str(e), "role": "error"}) steps.append({"round": ri+1, "prompt": ep, 
def run_extract(config):
    """Knowledge extraction pipeline: chunk text → extract facts → verify → structured output.

    Reads the source text (the prompt itself, or one of a few whitelisted
    local files), splits it into 4K-char chunks, has the extractor model
    pull facts/entities/relationships per chunk as JSON, has the verifier
    model spot-check up to 20 facts, then emits a summary and persists the
    run via _save_pipeline.  Yields SSE event dicts.
    """
    # Uses the module-level `time` / `re` imports (previous local
    # `import time` / nested `import re` were redundant shadows).
    start = time.time() * 1000
    prompt = config["prompt"]
    extractor = config.get("extractor", "qwen2.5:latest")
    verifier = config.get("verifier", "gemma2:latest")
    source = config.get("source", "prompt")
    yield sse({"type": "clear"})
    steps = []
    all_models = [extractor, verifier]
    # Get source text — either the prompt itself or a whitelisted file
    source_text = prompt
    if source != "prompt":
        file_map = {
            "ontology": "/home/profit/ONTOLOGY.md",
            "index": "/home/profit/INDEX.md",
            "summaries": "/home/profit/SUMMARIES.md",
            "guides": "/home/profit/GUIDES.md",
        }
        fpath = file_map.get(source)
        if fpath and os.path.exists(fpath):
            yield sse({"type": "status", "message": f"Reading {source}..."})
            with open(fpath) as f:
                source_text = f.read()[:15000]  # limit to ~15K chars
            yield sse({"type": "response", "model": "system", "text": f"Loaded {source} ({len(source_text)} chars)", "role": "source"})
        else:
            yield sse({"type": "response", "model": "system", "text": f"File not found: {source}", "role": "error"})
            return
    # Chunk if too long (fixed 4000-char windows)
    chunks = []
    chunk_size = 4000
    for i in range(0, len(source_text), chunk_size):
        chunks.append(source_text[i:i+chunk_size])
    yield sse({"type": "status", "message": f"Processing {len(chunks)} chunk(s) with {extractor}..."})
    all_facts = []
    all_entities = []
    all_relations = []
    for ci, chunk in enumerate(chunks):
        yield sse({"type": "status", "message": f"Extracting from chunk {ci+1}/{len(chunks)}..."})
        extract_prompt = (
            f"Extract structured knowledge from this text. Return a JSON object with:\n"
            f" \"facts\": [\"fact 1\", \"fact 2\", ...],\n"
            f" \"entities\": [{{\"name\": \"...\", \"type\": \"...\", \"description\": \"...\"}}, ...],\n"
            f" \"relationships\": [{{\"from\": \"...\", \"to\": \"...\", \"type\": \"...\"}}, ...]\n\n"
            f"Be thorough. Extract EVERY factual claim, named entity, and relationship.\n\n"
            f"Text:\n{chunk}"
        )
        try:
            result = query_model(extractor, extract_prompt)
            yield sse({"type": "response", "model": extractor, "text": result, "role": f"extraction {ci+1}"})
            # Try to parse JSON from response; on failure, fall back to
            # keeping the raw text head as a single "fact".
            try:
                json_match = re.search(r'\{[\s\S]*\}', result)
                if json_match:
                    parsed = json.loads(json_match.group())
                    all_facts.extend(parsed.get("facts", []))
                    all_entities.extend(parsed.get("entities", []))
                    all_relations.extend(parsed.get("relationships", []))
            except Exception:
                all_facts.append(result[:500])
        except Exception as e:
            yield sse({"type": "response", "model": extractor, "text": str(e), "role": "error"})
    steps.append({"step": "extraction", "facts": len(all_facts), "entities": len(all_entities), "relations": len(all_relations)})
    # Verify key facts (up to 20; the verifier is still asked even when the
    # sample is empty so the UI always sees a verifier step)
    yield sse({"type": "status", "message": f"{verifier} verifying {len(all_facts)} facts..."})
    facts_sample = all_facts[:20]  # verify up to 20
    verify_prompt = (
        f"Verify these extracted facts. For each, mark CORRECT, INCORRECT, or UNVERIFIABLE.\n"
        f"If incorrect, provide the correction.\n\n"
    )
    # Loop variable renamed from `f` to `fact` — `f` shadowed the file
    # handle name used above and reads poorly.
    for i, fact in enumerate(facts_sample):
        fact_str = fact if isinstance(fact, str) else json.dumps(fact)
        verify_prompt += f"{i+1}. {fact_str}\n"
    try:
        verification = query_model(verifier, verify_prompt)
        yield sse({"type": "response", "model": verifier, "text": verification, "role": "verifier"})
        steps.append({"step": "verification", "model": verifier, "output": verification[:1000]})
    except Exception as e:
        verification = str(e)
        yield sse({"type": "response", "model": verifier, "text": str(e), "role": "error"})
    # Summary
    summary = (
        f"KNOWLEDGE EXTRACTION SUMMARY\n{'='*40}\n\n"
        f"Source: {source}\n"
        f"Facts extracted: {len(all_facts)}\n"
        f"Entities found: {len(all_entities)}\n"
        f"Relationships mapped: {len(all_relations)}\n\n"
        f"TOP ENTITIES:\n"
    )
    for e in all_entities[:15]:
        if isinstance(e, dict):
            summary += f" [{e.get('type','?')}] {e.get('name','?')} — {e.get('description','')[:60]}\n"
    summary += f"\nTOP RELATIONSHIPS:\n"
    for r in all_relations[:15]:
        if isinstance(r, dict):
            summary += f" {r.get('from','?')} --[{r.get('type','?')}]--> {r.get('to','?')}\n"
    yield sse({"type": "response", "model": "system", "text": summary, "role": "final"})
    result_data = {
        "facts": all_facts[:100],
        "entities": all_entities[:50],
        "relationships": all_relations[:50],
        "verification": verification[:1000],
        "source": source,
    }
    _save_pipeline("extract", prompt or source, steps, result_data, all_models, start)
For each, mark CORRECT, INCORRECT, or UNVERIFIABLE.\n" f"If incorrect, provide the correction.\n\n" ) for i, f in enumerate(facts_sample): fact_str = f if isinstance(f, str) else json.dumps(f) verify_prompt += f"{i+1}. {fact_str}\n" try: verification = query_model(verifier, verify_prompt) yield sse({"type": "response", "model": verifier, "text": verification, "role": "verifier"}) steps.append({"step": "verification", "model": verifier, "output": verification[:1000]}) except Exception as e: verification = str(e) yield sse({"type": "response", "model": verifier, "text": str(e), "role": "error"}) # Summary summary = ( f"KNOWLEDGE EXTRACTION SUMMARY\n{'='*40}\n\n" f"Source: {source}\n" f"Facts extracted: {len(all_facts)}\n" f"Entities found: {len(all_entities)}\n" f"Relationships mapped: {len(all_relations)}\n\n" f"TOP ENTITIES:\n" ) for e in all_entities[:15]: if isinstance(e, dict): summary += f" [{e.get('type','?')}] {e.get('name','?')} — {e.get('description','')[:60]}\n" summary += f"\nTOP RELATIONSHIPS:\n" for r in all_relations[:15]: if isinstance(r, dict): summary += f" {r.get('from','?')} --[{r.get('type','?')}]--> {r.get('to','?')}\n" yield sse({"type": "response", "model": "system", "text": summary, "role": "final"}) result_data = { "facts": all_facts[:100], "entities": all_entities[:50], "relationships": all_relations[:50], "verification": verification[:1000], "source": source, } _save_pipeline("extract", prompt or source, steps, result_data, all_models, start) # ─── AI SECURITY SENTINEL ───────────────────────────────────── SENTINEL_LOG = "/var/log/llm-team-sentinel.log" SENTINEL_MODEL = "qwen2.5:latest" SENTINEL_INTERVAL = 300 # 5 minutes _sentinel_last_pos = 0 _sentinel_results = [] # last 50 analyses _sentinel_stats = {"scans": 0, "bans": 0, "last_run": None, "last_error": None, "next_scan_ts": 0} def _sentinel_log_entry(msg): """Write to sentinel log file.""" try: with open(SENTINEL_LOG, "a") as f: f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} 
def _sentinel_scan():
    """Read new security log entries and analyze with local AI.

    Tails /var/log/llm-team-security.log from the position saved in the
    module global `_sentinel_last_pos`, groups new events by external IP,
    asks the local Ollama model (SENTINEL_MODEL) for a JSON verdict per
    IP, and bans via fail2ban-client where instructed.  Updates
    `_sentinel_stats` / `_sentinel_results` as a side effect.
    """
    global _sentinel_last_pos
    import subprocess, collections
    _sentinel_stats["last_run"] = time.strftime("%Y-%m-%d %H:%M:%S")
    _sentinel_stats["last_run_ts"] = time.time()
    _sentinel_stats["scans"] += 1
    # Read new lines since last scan
    try:
        with open("/var/log/llm-team-security.log") as f:
            f.seek(0, 2)  # end of file
            file_size = f.tell()
            if _sentinel_last_pos > file_size:
                _sentinel_last_pos = 0  # log rotated
            f.seek(_sentinel_last_pos)
            new_lines = f.readlines()
            _sentinel_last_pos = f.tell()
    except Exception as e:
        _sentinel_stats["last_error"] = str(e)
        return
    if not new_lines:
        _sentinel_log_entry("SCAN_COMPLETE new_lines=0 action=none")
        return
    # Aggregate by IP — events are scraped from "ip=..." tokens in each
    # log line; LAN addresses (192.168.*) are excluded from analysis.
    # NOTE(review): other private ranges (10.x, 172.16-31.x, 127.x) are
    # NOT excluded here — confirm that is intentional.
    ip_activity = collections.defaultdict(list)
    for line in new_lines:
        line = line.strip()
        if not line:
            continue
        ip = None
        for token in line.split():
            if token.startswith("ip="):
                ip = token[3:]
                break
        if ip and not ip.startswith("192.168."):
            ip_activity[ip].append(line)
    if not ip_activity:
        _sentinel_log_entry(f"SCAN_COMPLETE new_lines={len(new_lines)} external_ips=0 action=none")
        return
    # Get currently banned IPs to skip (best-effort; fail2ban may be absent)
    banned = set()
    try:
        for jail in ["llm-team-exploit", "llm-team-login"]:
            result = subprocess.run(["fail2ban-client", "status", jail], capture_output=True, text=True, timeout=5)
            for line in result.stdout.split("\n"):
                if "Banned IP list" in line:
                    for ip in line.split(":", 1)[1].strip().split():
                        banned.add(ip.strip())
    except Exception:
        pass
    # Build analysis prompt for the AI — one summary per not-yet-banned IP
    analysis_items = []
    for ip, lines in ip_activity.items():
        if ip in banned:
            continue
        summary = f"IP {ip} ({len(lines)} events):\n"
        for l in lines[:8]:  # cap at 8 lines per IP
            summary += f" {l}\n"
        analysis_items.append((ip, summary, lines))
    if not analysis_items:
        _sentinel_log_entry(f"SCAN_COMPLETE new_lines={len(new_lines)} all_banned_or_lan action=none")
        return
    # Batch analysis prompt
    prompt = (
        "You are an aggressive cybersecurity sentinel protecting a PRIVATE production web application. "
        "There is NO legitimate reason for unknown IPs to probe this server. "
        "Analyze these log entries and classify each IP. Respond with ONLY a JSON array:\n"
        '[{"ip": "x.x.x.x", "threat": "none|low|medium|high|critical", "action": "ignore|monitor|ban", '
        '"reason": "brief reason", "attack_type": "scanner|bruteforce|exploit|bot|compromised_host|legitimate"}]\n\n'
        "RULES (follow strictly — err on the side of banning):\n"
        "- ANY probe for /.git, /.env, /wp-admin, /phpmyadmin, /xmlrpc.php, /admin.php, /config = BAN immediately\n"
        "- ANY probe for .env.production, .env.local, .env.development = BAN — this is targeted recon\n"
        "- Multiple different user agents from same IP = rotating scanner = BAN\n"
        "- HeadlessChrome, curl, python-requests doing probing = automated scanner = BAN\n"
        "- Failed logins >= 2 = BAN\n"
        "- /robots.txt or /favicon.ico ALONE from a known bot UA = ignore\n"
        "- Everything else = BAN if it looks automated, monitor if genuinely ambiguous\n"
        "- When in doubt, BAN. This is a private server.\n\n"
        "Log entries:\n\n"
    )
    for ip, summary, _ in analysis_items[:15]:  # max 15 IPs per scan
        prompt += summary + "\n"
    # Query local AI (Ollama /api/generate, non-streaming)
    try:
        cfg = load_config()
        base = cfg["providers"]["ollama"].get("base_url", "http://localhost:11434")
        resp = requests.post(f"{base}/api/generate", json={
            "model": SENTINEL_MODEL, "prompt": prompt, "stream": False,
            "options": {"num_ctx": 4096, "temperature": 0.1}
        }, timeout=60)
        resp.raise_for_status()
        ai_response = resp.json()["response"]
    except Exception as e:
        _sentinel_stats["last_error"] = f"AI query failed: {e}"
        _sentinel_log_entry(f"AI_ERROR error={e}")
        return
    # Parse AI response
    try:
        # Extract JSON from response (handle markdown code blocks)
        text = ai_response.strip()
        if "```" in text:
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]
        # Find the JSON array
        start_idx = text.find("[")
        end_idx = text.rfind("]") + 1
        if start_idx >= 0 and end_idx > start_idx:
            text = text[start_idx:end_idx]
        verdicts = json.loads(text)
    except Exception as e:
        _sentinel_stats["last_error"] = f"Parse failed: {e}"
        _sentinel_log_entry(f"PARSE_ERROR response={ai_response[:200]}")
        return
    # Execute actions — record every verdict, ban where instructed
    ban_count = 0
    for v in verdicts:
        ip = v.get("ip", "")
        action = v.get("action", "ignore")
        threat = v.get("threat", "low")
        reason = v.get("reason", "")
        attack_type = v.get("attack_type", "unknown")
        result_entry = {
            "ip": ip, "threat": threat, "action": action, "reason": reason,
            "attack_type": attack_type, "time": time.strftime("%Y-%m-%d %H:%M:%S")
        }
        _sentinel_results.append(result_entry)
        # Ring buffer: keep only the 50 most recent verdicts
        if len(_sentinel_results) > 50:
            _sentinel_results.pop(0)
        # Safety net: never ban LAN addresses even if the AI says so
        if action == "ban" and ip and not ip.startswith("192.168."):
            try:
                subprocess.run(["fail2ban-client", "set", "llm-team-exploit", "banip", ip], capture_output=True, text=True, timeout=5)
                ban_count += 1
                sec_log.warning("AI_BAN ip=%s threat=%s reason=%s attack=%s", ip, threat, reason, attack_type)
                _sentinel_log_entry(f"AI_BAN ip={ip} threat={threat} reason={reason} attack_type={attack_type}")
            except Exception as e:
                _sentinel_log_entry(f"BAN_FAILED ip={ip} error={e}")
        else:
            _sentinel_log_entry(f"AI_VERDICT ip={ip} threat={threat} action={action} reason={reason} attack_type={attack_type}")
    _sentinel_stats["bans"] += ban_count
    _sentinel_log_entry(f"SCAN_COMPLETE new_lines={len(new_lines)} ips_analyzed={len(analysis_items)} verdicts={len(verdicts)} bans={ban_count}")
def _sentinel_loop():
    """Background loop running every SENTINEL_INTERVAL seconds.

    Skips history on startup by seeking to the end of the security log,
    then sleeps/scans forever.  Scan errors are recorded and the loop
    continues.
    """
    global _sentinel_last_pos
    # Start from end of file (only analyze new entries)
    try:
        with open("/var/log/llm-team-security.log") as f:
            f.seek(0, 2)
            _sentinel_last_pos = f.tell()
    except Exception:
        pass
    _sentinel_log_entry("SENTINEL_START model=" + SENTINEL_MODEL + " interval=" + str(SENTINEL_INTERVAL) + "s")
    while True:
        # Publish the next scan time before sleeping so the status API
        # can show an accurate countdown.
        _sentinel_stats["next_scan_ts"] = time.time() + SENTINEL_INTERVAL
        time.sleep(SENTINEL_INTERVAL)
        try:
            _sentinel_scan()
        except Exception as e:
            _sentinel_stats["last_error"] = str(e)
            _sentinel_log_entry(f"SENTINEL_ERROR {e}")


# API for sentinel status
@app.route("/api/admin/sentinel")
@admin_required
def admin_sentinel_status():
    """Admin-only JSON status endpoint: scan stats, the 20 most recent
    verdicts (newest first), and the countdown to the next scan."""
    now = time.time()
    next_ts = _sentinel_stats.get("next_scan_ts", 0)
    next_in = max(0, next_ts - now)
    return jsonify({
        "stats": _sentinel_stats,
        "recent_verdicts": list(reversed(_sentinel_results[-20:])),
        "model": SENTINEL_MODEL,
        "interval": SENTINEL_INTERVAL,
        "next_scan_in": round(next_in, 1),
        "server_time": round(now, 1)
    })


# Start sentinel thread (daemon: dies with the main process)
_sentinel_thread = threading.Thread(target=_sentinel_loop, daemon=True)
_sentinel_thread.start()


if __name__ == "__main__":
    print("\n LLM Team UI running at http://localhost:5000\n")
    print(f" AI Sentinel active: {SENTINEL_MODEL} scanning every {SENTINEL_INTERVAL}s\n")
    # Bound to loopback only — expected to sit behind a reverse proxy
    # (the before_request hook reads X-Real-IP).
    app.run(host="127.0.0.1", port=5000, debug=False, threaded=True)