Fix IP banning: nginx deny list + connection kill for instant enforcement

fail2ban was using nftables action while UFW uses iptables-nft, so bans
were recorded but never enforced. Added three-layer ban enforcement:
1. nginx deny list (/etc/nginx/banned_ips.conf) for instant 403
2. ss -K to kill existing TCP connections on ban
3. Auto-sync nginx deny file on ban/unban (manual, mass, AI sentinel)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
root 2026-03-28 13:05:49 -05:00
parent eea8ff46db
commit 411040f206

View File

@ -4826,6 +4826,49 @@ def admin_security_data():
return jsonify({"ips": result[:100], "total_banned": len(banned), "banned_list": sorted(banned)})
_NGINX_BAN_FILE = "/etc/nginx/banned_ips.conf"
def _kill_connections(ip):
"""Kill existing TCP connections from an IP so bans take effect instantly."""
import subprocess
try:
subprocess.run(["ss", "-K", "dst", ip], capture_output=True, text=True, timeout=5)
except Exception:
pass
def _nginx_ban(ip):
"""Add IP to nginx deny list and reload."""
import subprocess
try:
line = f"deny {ip};\n"
try:
with open(_NGINX_BAN_FILE) as f:
if line in f.read():
return
except FileNotFoundError:
pass
with open(_NGINX_BAN_FILE, "a") as f:
f.write(line)
subprocess.run(["systemctl", "reload", "nginx"], capture_output=True, timeout=5)
except Exception:
pass
def _nginx_unban(ip):
"""Remove IP from nginx deny list and reload."""
import subprocess
try:
with open(_NGINX_BAN_FILE) as f:
lines = f.readlines()
line = f"deny {ip};\n"
if line in lines:
lines.remove(line)
with open(_NGINX_BAN_FILE, "w") as f:
f.writelines(lines)
subprocess.run(["systemctl", "reload", "nginx"], capture_output=True, timeout=5)
except Exception:
pass
@app.route("/api/admin/security/ban", methods=["POST"])
@admin_required
def admin_ban_ip():
@ -4842,12 +4885,15 @@ def admin_ban_ip():
if action == "ban":
subprocess.run(["fail2ban-client", "set", "llm-team-exploit", "banip", ip],
capture_output=True, text=True, timeout=5)
_nginx_ban(ip)
_kill_connections(ip)
sec_log.warning("MANUAL_BAN ip=%s by=%s", ip, session.get("username", "admin"))
return jsonify({"ok": True, "message": f"Banned {ip}"})
elif action == "unban":
for jail in ["llm-team-exploit", "llm-team-login", "nginx-botsearch", "nginx-bad-request", "nginx-forbidden"]:
subprocess.run(["fail2ban-client", "set", jail, "unbanip", ip],
capture_output=True, text=True, timeout=5)
_nginx_unban(ip)
sec_log.warning("MANUAL_UNBAN ip=%s by=%s", ip, session.get("username", "admin"))
return jsonify({"ok": True, "message": f"Unbanned {ip}"})
except Exception as e:
@ -5101,11 +5147,14 @@ def admin_mass_ban():
if action == "ban":
subprocess.run(["fail2ban-client", "set", "llm-team-exploit", "banip", ip],
capture_output=True, text=True, timeout=5)
_nginx_ban(ip)
_kill_connections(ip)
sec_log.warning("MASS_BAN ip=%s by=%s", ip, session.get("username", "admin"))
elif action == "unban":
for jail in ["llm-team-exploit", "llm-team-login", "nginx-botsearch", "nginx-bad-request", "nginx-forbidden"]:
subprocess.run(["fail2ban-client", "set", jail, "unbanip", ip],
capture_output=True, text=True, timeout=5)
_nginx_unban(ip)
sec_log.warning("MASS_UNBAN ip=%s by=%s", ip, session.get("username", "admin"))
results["success"] += 1
except Exception:
@ -8017,6 +8066,8 @@ def _sentinel_scan():
try:
subprocess.run(["fail2ban-client", "set", "llm-team-exploit", "banip", ip],
capture_output=True, text=True, timeout=5)
_nginx_ban(ip)
_kill_connections(ip)
ban_count += 1
sec_log.warning("AI_BAN ip=%s threat=%s reason=%s attack=%s", ip, threat, reason, attack_type)
_sentinel_log_entry(f"AI_BAN ip={ip} threat={threat} reason={reason} attack_type={attack_type}")