diff --git a/llm_team_ui.py b/llm_team_ui.py index 303fb4e..c79232b 100644 --- a/llm_team_ui.py +++ b/llm_team_ui.py @@ -109,8 +109,24 @@ _original_sentinel_interval = None # stash the normal interval during high-aler def _track_violation(ip, event_type="unknown"): - """Record a security violation. If velocity threshold exceeded, auto-escalate.""" + """Record a security violation. If velocity threshold exceeded, auto-escalate. + + Cross-lineage scrum 2026-04-30 follow-up: OB-4's path-bypass for + admins was incomplete — _track_violation is called from 3 sites + (exploit_scan, rate_limit, login_fail) and only the exploit one + had the bypass. An admin hitting rate-limit + login-typo + a + legit URL containing 'UNION' within 60s could still self-ban + via the OTHER paths. Now is_allowlisted bails early so allowlisted + IPs never accumulate violations from ANY path. Defense in depth — + _auto_escalate also re-checks below. + + Eviction sweep when tracker grows >10K entries (same pattern as + _rate_limit; both had identical unbounded-dict WARNs).""" + if is_allowlisted(ip): + return False now = time.time() + if len(_violation_tracker) > 10000: + _evict_stale_violation_tracker(now) if ip not in _violation_tracker: _violation_tracker[ip] = [] _violation_tracker[ip].append(now) @@ -123,8 +139,26 @@ def _track_violation(ip, event_type="unknown"): return False +def _evict_stale_violation_tracker(now): + """Drop _violation_tracker entries whose newest timestamp is past + the window. Called from _track_violation only when dict exceeds + 10K — the hot path stays untouched for normal traffic.""" + cutoff = now - VELOCITY_WINDOW + stale = [ip for ip, ts in _violation_tracker.items() if not ts or max(ts) < cutoff] + for ip in stale: + del _violation_tracker[ip] + + def _auto_escalate(ip, violation_count, event_type): - """Auto-ban IP and switch sentinel to high-alert mode.""" + """Auto-ban IP and switch sentinel to high-alert mode. + + Defense in depth (2026-04-30): _track_violation already short- + circuits for allowlisted IPs, but if a future code path calls + _auto_escalate directly we still want the allowlist guard. Bail + early; nothing escalates against a trusted IP.""" + if is_allowlisted(ip): + sec_log.info("AUTO_ESCALATE_BLOCKED ip=%s — allowlisted, refused to ban", ip) + return global _original_sentinel_interval, SENTINEL_INTERVAL sec_log.warning("AUTO_ESCALATE ip=%s violations=%d/%ds type=%s", ip, violation_count, VELOCITY_WINDOW, event_type) _sentinel_log_entry(f"AUTO_ESCALATE ip={ip} violations={violation_count}/{VELOCITY_WINDOW}s type={event_type}") @@ -7600,7 +7634,16 @@ def _kill_connections(ip): pass def _nginx_ban(ip): - """Add IP to nginx deny list and reload.""" + """Add IP to nginx deny list and reload. + + Defense in depth (2026-04-30): refuse to write allowlisted IPs + to the deny list under ANY circumstance — even a buggy caller + that bypassed _track_violation's allowlist check. The deny list + is the last write before nginx reload; this is the last place + we can stop a bad ban.""" + if is_allowlisted(ip): + sec_log.info("NGINX_BAN_BLOCKED ip=%s — allowlisted, refused to write deny rule", ip) + return import subprocess try: line = f"deny {ip};\n" @@ -7642,8 +7685,13 @@ def admin_ban_ip(): action = data.get("action", "ban") if not ip: return jsonify({"error": "IP required"}), 400 - if ip.startswith("192.168."): - return jsonify({"error": "Cannot ban LAN addresses"}), 400 + # Defense in depth (2026-04-30): use the canonical ALLOWLIST_IPS + # check rather than a substring on "192.168." which would let + # 10.0.0.0/8 LANs and IPv6 loopback ::1 through. Same allowlist + # the auto-ban paths now respect — operator can't accidentally + # cut off their own LAN gateway. + if is_allowlisted(ip): + return jsonify({"error": f"refusing to ban allowlisted IP {ip} (in ALLOWLIST_IPS)"}), 400 try: if action == "ban": subprocess.run(["fail2ban-client", "set", "llm-team-exploit", "banip", ip],