#!/usr/bin/env python3 """ Sandbox Assertion ================= Verifies that execution targets are within sandbox boundaries. CRITICAL: This is a safety gate that prevents accidental production access. Part of Phase 3: Execution Pipeline - Preflight System. """ import json import subprocess import sys from dataclasses import dataclass from datetime import datetime, timezone from enum import Enum from typing import Optional class AssertionStatus(str, Enum): SAFE = "SAFE" # Definitely sandbox UNSAFE = "UNSAFE" # Definitely production UNKNOWN = "UNKNOWN" # Cannot determine OVERRIDE = "OVERRIDE" # Production with explicit approval @dataclass class AssertionResult: target: str status: AssertionStatus environment: str message: str risk_level: int # 0=none, 1=low, 2=medium, 3=high, 4=critical requires_approval: bool details: dict timestamp: str def to_dict(self) -> dict: return { "target": self.target, "status": self.status.value, "environment": self.environment, "message": self.message, "risk_level": self.risk_level, "requires_approval": self.requires_approval, "details": self.details, "timestamp": self.timestamp } class SandboxAsserter: """ Ensures execution stays within sandbox boundaries. This is a CRITICAL safety component that: - Identifies environment type from target name/IP/metadata - Blocks production access by default - Requires explicit approval for non-sandbox environments - Logs all assertions for audit """ # Environment detection patterns ENVIRONMENT_PATTERNS = { "production": [ "prod", "prd", "production", "live", "primary", "master", "main-db", "primary-db" ], "staging": [ "staging", "stg", "stage", "preprod", "pre-prod", "uat", "qa", "test-prod" ], "sandbox": [ "sandbox", "sbx", "dev", "development", "local", "test", "lab", "experiment", "demo" ] } # IP range classifications IP_CLASSIFICATIONS = { "10.77.0.0/24": "wireguard-internal", "10.0.0.0/8": "internal-sandbox", "172.16.0.0/12": "internal-sandbox", "192.168.0.0/16": "internal-sandbox", "0.0.0.0/0": "external-unknown" } # Risk levels by environment RISK_LEVELS = { "sandbox": 0, "development": 0, "staging": 2, "production": 4, "unknown": 3 } def __init__(self): self.vault_token = self._get_vault_token() def _get_vault_token(self) -> str: with open("/opt/vault/init-keys.json") as f: return json.load(f)["root_token"] def _now(self) -> str: return datetime.now(timezone.utc).isoformat() def _vault_read(self, path: str) -> Optional[dict]: """Read from Vault KV""" result = subprocess.run([ "curl", "-sk", "-H", f"X-Vault-Token: {self.vault_token}", f"https://127.0.0.1:8200/v1/secret/data/{path}" ], capture_output=True, text=True) try: data = json.loads(result.stdout) if "data" in data and "data" in data["data"]: return data["data"]["data"] except: pass return None def _detect_environment_from_name(self, target: str) -> tuple[str, float]: """Detect environment from target name. Returns (env, confidence)""" target_lower = target.lower() for env, patterns in self.ENVIRONMENT_PATTERNS.items(): for pattern in patterns: if pattern in target_lower: # Higher confidence for exact matches confidence = 0.95 if pattern == target_lower else 0.8 return env, confidence return "unknown", 0.3 def _check_pool_classification(self, target: str) -> Optional[str]: """Check pool classification from inventory""" inventory = self._vault_read("inventory/proxmox") if not inventory: return None for pool_name, pool_data in inventory.get("pools", {}).items(): nodes = pool_data.get("nodes", []) if target in nodes: # Classify based on pool name pool_lower = pool_name.lower() if "prod" in pool_lower: return "production" elif "staging" in pool_lower or "stg" in pool_lower: return "staging" elif "sandbox" in pool_lower or "dev" in pool_lower: return "sandbox" return None def assert_sandbox(self, target: str, allow_staging: bool = False, production_override: bool = False) -> AssertionResult: """ Assert that target is within sandbox boundaries. Args: target: Target name/identifier allow_staging: Allow staging environment (default: False) production_override: Explicit production approval (default: False) Returns: AssertionResult with safety determination """ timestamp = self._now() # Step 1: Check pool classification from Vault pool_env = self._check_pool_classification(target) # Step 2: Detect from name patterns name_env, name_confidence = self._detect_environment_from_name(target) # Step 3: Determine final environment if pool_env: environment = pool_env confidence = 0.99 # Pool classification is authoritative else: environment = name_env confidence = name_confidence risk_level = self.RISK_LEVELS.get(environment, 3) # Step 4: Make safety determination if environment == "sandbox" or environment == "development": return AssertionResult( target=target, status=AssertionStatus.SAFE, environment=environment, message=f"Target '{target}' is in SANDBOX environment - safe to proceed", risk_level=risk_level, requires_approval=False, details={ "pool_classification": pool_env, "name_detection": name_env, "confidence": confidence }, timestamp=timestamp ) elif environment == "staging": if allow_staging: return AssertionResult( target=target, status=AssertionStatus.SAFE, environment=environment, message=f"Target '{target}' is in STAGING - allowed by flag", risk_level=risk_level, requires_approval=False, details={ "pool_classification": pool_env, "name_detection": name_env, "confidence": confidence, "staging_allowed": True }, timestamp=timestamp ) else: return AssertionResult( target=target, status=AssertionStatus.UNSAFE, environment=environment, message=f"Target '{target}' is in STAGING - requires --allow-staging flag", risk_level=risk_level, requires_approval=True, details={ "pool_classification": pool_env, "name_detection": name_env, "confidence": confidence, "staging_allowed": False }, timestamp=timestamp ) elif environment == "production": if production_override: return AssertionResult( target=target, status=AssertionStatus.OVERRIDE, environment=environment, message=f"PRODUCTION OVERRIDE: Target '{target}' access with explicit approval", risk_level=risk_level, requires_approval=False, # Already approved details={ "pool_classification": pool_env, "name_detection": name_env, "confidence": confidence, "production_override": True, "WARNING": "PRODUCTION ACCESS ENABLED" }, timestamp=timestamp ) else: return AssertionResult( target=target, status=AssertionStatus.UNSAFE, environment=environment, message=f"BLOCKED: Target '{target}' is in PRODUCTION - requires explicit approval", risk_level=risk_level, requires_approval=True, details={ "pool_classification": pool_env, "name_detection": name_env, "confidence": confidence, "production_override": False, "BLOCKED": "Production access denied" }, timestamp=timestamp ) else: # unknown return AssertionResult( target=target, status=AssertionStatus.UNKNOWN, environment=environment, message=f"UNKNOWN: Cannot determine environment for '{target}'", risk_level=risk_level, requires_approval=True, details={ "pool_classification": pool_env, "name_detection": name_env, "confidence": confidence, "WARNING": "Unable to classify - treating as high risk" }, timestamp=timestamp ) def assert_all_targets(self, targets: list[str], allow_staging: bool = False, production_override: bool = False) -> dict: """Assert sandbox for multiple targets""" report = { "report_type": "sandbox_assertion", "timestamp": self._now(), "settings": { "allow_staging": allow_staging, "production_override": production_override }, "results": [], "summary": { "total": len(targets), "safe": 0, "unsafe": 0, "unknown": 0, "override": 0, "max_risk_level": 0 }, "can_proceed": True, "blocked_targets": [] } for target in targets: result = self.assert_sandbox(target, allow_staging, production_override) report["results"].append(result.to_dict()) if result.status == AssertionStatus.SAFE: report["summary"]["safe"] += 1 elif result.status == AssertionStatus.UNSAFE: report["summary"]["unsafe"] += 1 report["can_proceed"] = False report["blocked_targets"].append(target) elif result.status == AssertionStatus.UNKNOWN: report["summary"]["unknown"] += 1 report["can_proceed"] = False report["blocked_targets"].append(target) elif result.status == AssertionStatus.OVERRIDE: report["summary"]["override"] += 1 report["summary"]["max_risk_level"] = max( report["summary"]["max_risk_level"], result.risk_level ) return report # ============================================================================= # CLI # ============================================================================= if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Sandbox Assertion Check") parser.add_argument("targets", nargs="+", help="Target nodes to assert") parser.add_argument("--allow-staging", action="store_true", help="Allow staging environment targets") parser.add_argument("--production-override", action="store_true", help="DANGEROUS: Override production block with explicit approval") parser.add_argument("--json", action="store_true", help="Output JSON") args = parser.parse_args() asserter = SandboxAsserter() report = asserter.assert_all_targets( args.targets, allow_staging=args.allow_staging, production_override=args.production_override ) if args.json: print(json.dumps(report, indent=2)) else: print("\n" + "=" * 60) print("SANDBOX ASSERTION REPORT") print("=" * 60) print(f"Timestamp: {report['timestamp']}") print(f"Allow Staging: {report['settings']['allow_staging']}") print(f"Production Override: {report['settings']['production_override']}") print() risk_icons = ["[SAFE]", "[LOW]", "[MED]", "[HIGH]", "[CRIT]"] for result in report["results"]: status_icon = { "SAFE": "[OK]", "UNSAFE": "[BLOCKED]", "UNKNOWN": "[???]", "OVERRIDE": "[OVERRIDE]" }.get(result["status"], "[?]") risk_icon = risk_icons[min(result["risk_level"], 4)] print(f"Target: {result['target']}") print(f" Status: {status_icon} {result['status']}") print(f" Environment: {result['environment']}") print(f" Risk Level: {risk_icon} ({result['risk_level']}/4)") print(f" Message: {result['message']}") if result["requires_approval"]: print(f" !!! REQUIRES APPROVAL !!!") print() print("-" * 60) print("SUMMARY") print("-" * 60) s = report["summary"] print(f" Total Targets: {s['total']}") print(f" Safe: {s['safe']}") print(f" Unsafe (blocked): {s['unsafe']}") print(f" Unknown: {s['unknown']}") print(f" Override: {s['override']}") print(f" Max Risk Level: {s['max_risk_level']}/4") print() if report["blocked_targets"]: print(f"BLOCKED TARGETS: {report['blocked_targets']}") print() if report["can_proceed"]: print("[OK] SANDBOX ASSERTION PASSED - All targets are safe") else: print("[BLOCKED] SANDBOX ASSERTION FAILED - Cannot proceed") print(" Some targets are outside sandbox boundaries") print("=" * 60) sys.exit(0 if report["can_proceed"] else 1)