diff --git a/llm_team_ui.py b/llm_team_ui.py index 8551bd0..6ee11dd 100644 --- a/llm_team_ui.py +++ b/llm_team_ui.py @@ -7,6 +7,8 @@ import time import threading import secrets import hashlib +import logging +import re import requests import random import psycopg2 @@ -19,6 +21,45 @@ from functools import wraps app = Flask(__name__) app.secret_key = os.environ.get("FLASK_SECRET", secrets.token_hex(32)) +# ─── SECURITY LOGGING ───────────────────────────────────────── +# Dedicated security log for fail2ban and audit trail +_sec_handler = logging.FileHandler("/var/log/llm-team-security.log") +_sec_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s")) +sec_log = logging.getLogger("security") +sec_log.addHandler(_sec_handler) +sec_log.setLevel(logging.WARNING) + +# ─── EMAIL ALERTS ────────────────────────────────────────────── +SMTP_HOST = os.environ.get("SMTP_HOST", "127.0.0.1") +SMTP_PORT = int(os.environ.get("SMTP_PORT", "1025")) +ALERT_FROM = os.environ.get("ALERT_FROM", "security@island37.com") +ALERT_TO = os.environ.get("ALERT_TO", "admin@island37.com") + +def send_security_alert(subject, body): + """Send security alert email (non-blocking).""" + def _send(): + try: + import smtplib + from email.message import EmailMessage + msg = EmailMessage() + msg["Subject"] = f"[LLM Team Security] {subject}" + msg["From"] = ALERT_FROM + msg["To"] = ALERT_TO + msg.set_content(body) + with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=5) as s: + s.send_message(msg) + except Exception as e: + sec_log.error("EMAIL_FAILED subject=%s error=%s", subject, str(e)) + threading.Thread(target=_send, daemon=True).start() + +# Known exploit paths that scanners probe +EXPLOIT_PATTERNS = re.compile( + r"(\.env|wp-admin|wp-login|phpmyadmin|\.git|/admin\.php|/config\.|" + r"\.asp|\.aspx|/cgi-bin|/shell|/eval|/exec|/passwd|/etc/shadow|" + r"\.\./|%2e%2e| (count, window_start) @@ -100,19 +141,34 @@ def admin_required(f): @app.before_request def security_checks(): - ip = request.remote_addr + ip = request.headers.get("X-Real-IP", request.remote_addr) + path = request.path + ua = request.headers.get("User-Agent", "") + + # Exploit scanner detection — log, alert, and block + if EXPLOIT_PATTERNS.search(path) or EXPLOIT_PATTERNS.search(request.query_string.decode("utf-8", errors="ignore")): + sec_log.warning("EXPLOIT_SCAN ip=%s path=%s ua=%s", ip, path, ua) + send_security_alert( + f"Exploit Scan from {ip}", + f"IP: {ip}\nPath: {path}\nUser-Agent: {ua}\nTime: {time.strftime('%Y-%m-%d %H:%M:%S')}" + ) + return "Not Found", 404 + # Rate limit (allowlisted IPs skip) if rate_limited(ip): + sec_log.warning("RATE_LIMITED ip=%s path=%s", ip, path) return jsonify({"error": "rate limited"}), 429 + # Always allow these - if request.path in ("/login", "/api/auth/login", "/api/auth/setup", "/api/demo/status"): + if path in ("/login", "/api/auth/login", "/api/auth/setup", "/api/demo/status"): return - if request.path.startswith("/static"): + if path.startswith("/static"): return + # In demo mode, block admin write routes for non-admins if is_demo() and not is_admin(): for route, methods in ADMIN_WRITE_ROUTES.items(): - if request.path == route and request.method in methods: + if path == route and request.method in methods: return jsonify({"error": "demo mode: admin settings are read-only", "demo": True}), 403 @@ -122,11 +178,98 @@ def security_headers(response): response.headers["X-Frame-Options"] = "DENY" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin" + response.headers["Content-Security-Policy"] = "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; connect-src 'self'" + response.headers["Permissions-Policy"] = "camera=(), microphone=(), geolocation=()" if request.path.startswith("/api/"): response.headers["Cache-Control"] = "no-store" return response +HONEYPOT_404_HTML = """ + +404 Not Found + + +
+

404

+

Page not found

+

The page you're looking for doesn't exist or has been moved.

+ Go to login +
+ + +
+
+ +""" + + +@app.errorhandler(404) +def page_not_found(e): + ip = request.headers.get("X-Real-IP", request.remote_addr) + ua = request.headers.get("User-Agent", "") + path = request.path + referer = request.headers.get("Referer", "") + method = request.method + accept_lang = request.headers.get("Accept-Language", "") + accept_enc = request.headers.get("Accept-Encoding", "") + + # Build fingerprint hash from request characteristics + fp_raw = f"{ua}|{accept_lang}|{accept_enc}|{request.headers.get('Connection','')}|{request.headers.get('DNT','')}" + fp_hash = hashlib.sha256(fp_raw.encode()).hexdigest()[:16] + + # Classify threat level + threat = "low" + if EXPLOIT_PATTERNS.search(path): + threat = "high" + elif any(s in path.lower() for s in ("/admin", "/config", "/api/", "/debug", "/console", "/server-status")): + threat = "medium" + elif not ua or "bot" in ua.lower() or "scanner" in ua.lower() or "nikto" in ua.lower() or "sqlmap" in ua.lower(): + threat = "high" + + sec_log.warning( + "404_HIT ip=%s fp=%s threat=%s method=%s path=%s referer=%s ua=%s", + ip, fp_hash, threat, method, path, referer, ua + ) + + html = HONEYPOT_404_HTML.replace("{{FINGERPRINT}}", fp_hash) + return html, 404 + + +@app.route("/api/fp", methods=["POST"]) +def fingerprint_collect(): + """Silent endpoint that collects browser fingerprint data from 404 pages.""" + ip = request.headers.get("X-Real-IP", request.remote_addr) + data = request.json or {} + ua = request.headers.get("User-Agent", "") + + # Build server-side fingerprint + fp_raw = f"{ua}|{request.headers.get('Accept-Language','')}|{request.headers.get('Accept-Encoding','')}|{request.headers.get('Connection','')}|{request.headers.get('DNT','')}" + fp_hash = hashlib.sha256(fp_raw.encode()).hexdigest()[:16] + + sec_log.warning( + "FINGERPRINT ip=%s fp=%s tz=%s lang=%s platform=%s cores=%s mem=%s touch=%s screen=%s dpr=%s plugins=%s webgl=%s", + ip, fp_hash, + data.get("tz", ""), data.get("lang", ""), data.get("plat", ""), + data.get("cores", ""), data.get("mem", ""), data.get("touch", ""), + data.get("screen", ""), data.get("dpr", ""), data.get("plugins", ""), + data.get("webgl", "") + ) + return "", 204 + + LOGIN_HTML = """ @@ -258,6 +401,11 @@ def auth_login(): cur.execute("SELECT * FROM users WHERE username = %s", (username,)) user = cur.fetchone() if not user or not bcrypt.checkpw(password.encode(), user["password_hash"].encode()): + sec_log.warning("LOGIN_FAILED ip=%s user=%s", ip, username) + send_security_alert( + f"Failed Login from {ip}", + f"IP: {ip}\nUsername attempted: {username}\nTime: {time.strftime('%Y-%m-%d %H:%M:%S')}" + ) return jsonify({"error": "Invalid credentials"}), 401 session["user_id"] = user["id"] @@ -288,7 +436,7 @@ def demo_status(): @app.route("/api/demo/toggle", methods=["POST"]) def demo_toggle(): - if not is_admin(): + if not session.get("user_id") or not is_admin(): return jsonify({"error": "admin only"}), 403 _demo_mode["active"] = not _demo_mode["active"] _demo_mode["started_by"] = session.get("username") if _demo_mode["active"] else None @@ -2994,8 +3142,21 @@ def lab_stream(eid): @app.route("/api/run", methods=["POST"]) @login_required def run_team(): + ip = request.remote_addr + if rate_limited(ip): + return jsonify({"error": "Rate limit exceeded. Wait a minute."}), 429 + config = request.json - mode = config["mode"] + if not config: + return jsonify({"error": "Request body required"}), 400 + + mode = config.get("mode", "") + if not mode: + return jsonify({"error": "Mode is required"}), 400 + + prompt = config.get("prompt", "").strip() + if not prompt: + return jsonify({"error": "Prompt cannot be empty"}), 400 RUNNERS = { "brainstorm": run_brainstorm, "pipeline": run_pipeline, "debate": run_debate, @@ -3066,7 +3227,12 @@ def run_pipeline(config): def run_debate(config): - prompt, d1, d2, judge = config["prompt"], config["debater1"], config["debater2"], config["judge"] + prompt = config.get("prompt", "") + d1, d2, judge = config.get("debater1"), config.get("debater2"), config.get("judge") + if not all([d1, d2, judge]): + yield sse({"type": "response", "model": "system", "text": "Debate mode requires 'debater1', 'debater2', and 'judge' model parameters.", "role": "error"}) + yield sse({"type": "done"}) + return rounds = config.get("rounds", 2) yield sse({"type": "clear"}) history = [] @@ -3143,7 +3309,12 @@ def run_roundrobin(config): def run_redteam(config): - prompt, author, attacker, patcher = config["prompt"], config["author"], config["attacker"], config["patcher"] + prompt = config.get("prompt", "") + author, attacker, patcher = config.get("author"), config.get("attacker"), config.get("patcher") + if not all([author, attacker, patcher]): + yield sse({"type": "response", "model": "system", "text": "Red team mode requires 'author', 'attacker', and 'patcher' model parameters.", "role": "error"}) + yield sse({"type": "done"}) + return rounds = config.get("rounds", 2) yield sse({"type": "clear"}) yield sse({"type": "status", "message": f"{author} writing..."}) @@ -3195,7 +3366,12 @@ def run_consensus(config): def run_codereview(config): - prompt, coder, reviewer, tester = config["prompt"], config["coder"], config["reviewer"], config["tester"] + prompt = config.get("prompt", "") + coder, reviewer, tester = config.get("coder"), config.get("reviewer"), config.get("tester") + if not all([coder, reviewer, tester]): + yield sse({"type": "response", "model": "system", "text": "Code review mode requires 'coder', 'reviewer', and 'tester' model parameters.", "role": "error"}) + yield sse({"type": "done"}) + return yield sse({"type": "clear"}) yield sse({"type": "status", "message": f"{coder} coding..."}) try: