agent-governance/preflight/sandbox_assert.py
commit 77655c298c Initial commit: Agent Governance System Phase 8
Phase 8 Production Hardening with complete governance infrastructure:

- Vault integration with tiered policies (T0-T4)
- DragonflyDB state management
- SQLite audit ledger
- Pipeline DSL and templates
- Promotion/revocation engine
- Checkpoint system for session persistence
- Health manager and circuit breaker for fault tolerance
- GitHub/Slack integrations
- Architectural test pipeline with bug watcher, suggestion engine, council review
- Multi-agent chaos testing framework

Test Results:
- Governance tests: 68/68 passing
- E2E workflow: 16/16 passing
- Phase 2 Vault: 14/14 passing
- Integration tests: 27/27 passing

Coverage: 57.6% average across 12 phases

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-23 22:07:06 -05:00

417 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Sandbox Assertion
=================
Verifies that execution targets are within sandbox boundaries.
CRITICAL: This is a safety gate that prevents accidental production access.
Part of Phase 3: Execution Pipeline - Preflight System.
"""
import json
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
class AssertionStatus(str, Enum):
    """Outcome of a sandbox boundary check for a single target."""

    SAFE = "SAFE"          # positively identified as sandbox -- safe to proceed
    UNSAFE = "UNSAFE"      # positively identified as production (or unapproved staging)
    UNKNOWN = "UNKNOWN"    # environment could not be determined
    OVERRIDE = "OVERRIDE"  # production access granted via explicit approval
@dataclass
class AssertionResult:
    """Outcome of asserting one target against sandbox boundaries."""

    target: str               # target identifier that was checked
    status: AssertionStatus   # safety verdict
    environment: str          # detected environment name
    message: str              # human-readable explanation of the verdict
    risk_level: int           # 0=none, 1=low, 2=medium, 3=high, 4=critical
    requires_approval: bool   # True when a human must explicitly sign off
    details: dict             # supporting evidence for the classification
    timestamp: str            # ISO-8601 UTC time of the assertion

    def to_dict(self) -> dict:
        """Serialize to a JSON-friendly dict (the status Enum is flattened to its string value)."""
        payload = vars(self).copy()
        payload["status"] = self.status.value
        return payload
class SandboxAsserter:
    """
    Ensures execution stays within sandbox boundaries.

    This is a CRITICAL safety component that:
    - Identifies environment type from target name/IP/metadata
    - Blocks production access by default
    - Requires explicit approval for non-sandbox environments
    - Logs all assertions for audit
    """

    # Environment detection patterns, matched as substrings of the lowercased
    # target name. Iteration order matters: "production" is checked first, so
    # an ambiguous name such as "test-prod" resolves to production (the more
    # restrictive classification) even though it also appears in the staging
    # list -- failing toward the safer verdict.
    ENVIRONMENT_PATTERNS = {
        "production": [
            "prod", "prd", "production", "live", "primary",
            "master", "main-db", "primary-db"
        ],
        "staging": [
            "staging", "stg", "stage", "preprod", "pre-prod",
            "uat", "qa", "test-prod"
        ],
        "sandbox": [
            "sandbox", "sbx", "dev", "development", "local",
            "test", "lab", "experiment", "demo"
        ]
    }

    # IP range classifications.
    # NOTE(review): not referenced by any method in this class -- presumably
    # reserved for future IP-based detection; confirm before removing.
    IP_CLASSIFICATIONS = {
        "10.77.0.0/24": "wireguard-internal",
        "10.0.0.0/8": "internal-sandbox",
        "172.16.0.0/12": "internal-sandbox",
        "192.168.0.0/16": "internal-sandbox",
        "0.0.0.0/0": "external-unknown"
    }

    # Risk levels by environment (0=none .. 4=critical). Unknown environments
    # default to high risk so they are blocked rather than waved through.
    RISK_LEVELS = {
        "sandbox": 0,
        "development": 0,
        "staging": 2,
        "production": 4,
        "unknown": 3
    }

    def __init__(self):
        # Fail loudly (FileNotFoundError/KeyError) if Vault init keys are
        # absent: asserting without inventory data would silently degrade
        # classification to name-pattern matching only.
        self.vault_token = self._get_vault_token()

    def _get_vault_token(self) -> str:
        """Load the Vault root token from the local init-keys file."""
        with open("/opt/vault/init-keys.json") as f:
            return json.load(f)["root_token"]

    def _now(self) -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def _vault_read(self, path: str) -> Optional[dict]:
        """
        Read a secret from Vault KV v2 at ``secret/data/<path>``.

        Returns the inner data dict, or None when the path is missing or the
        response cannot be parsed. Uses curl with ``-k`` (TLS verification
        disabled); tolerable only because the endpoint is loopback.
        """
        result = subprocess.run([
            "curl", "-sk",
            "-H", f"X-Vault-Token: {self.vault_token}",
            f"https://127.0.0.1:8200/v1/secret/data/{path}"
        ], capture_output=True, text=True)
        try:
            data = json.loads(result.stdout)
            # KV v2 wraps the payload: {"data": {"data": {...}, "metadata": ...}}
            if "data" in data and "data" in data["data"]:
                return data["data"]["data"]
        except (json.JSONDecodeError, TypeError):
            # Empty or garbled response (Vault down, curl failure, non-dict
            # payload) -> treat the secret as absent rather than crashing.
            pass
        return None

    def _detect_environment_from_name(self, target: str) -> tuple[str, float]:
        """
        Detect environment from the target name.

        Returns:
            (environment, confidence): confidence is 0.95 when a pattern
            equals the whole name, 0.8 for a substring hit, 0.3 when nothing
            matched (environment "unknown").
        """
        target_lower = target.lower()
        for env, patterns in self.ENVIRONMENT_PATTERNS.items():
            for pattern in patterns:
                if pattern in target_lower:
                    # Higher confidence for exact matches
                    confidence = 0.95 if pattern == target_lower else 0.8
                    return env, confidence
        return "unknown", 0.3

    def _check_pool_classification(self, target: str) -> Optional[str]:
        """
        Classify the target via the Proxmox pool inventory stored in Vault.

        Returns "production"/"staging"/"sandbox" based on the name of the
        pool containing the target, or None when the inventory is unavailable
        or no containing pool name matches a known keyword.
        """
        inventory = self._vault_read("inventory/proxmox")
        if not inventory:
            return None
        for pool_name, pool_data in inventory.get("pools", {}).items():
            if target in pool_data.get("nodes", []):
                # Classify based on pool name
                pool_lower = pool_name.lower()
                if "prod" in pool_lower:
                    return "production"
                elif "staging" in pool_lower or "stg" in pool_lower:
                    return "staging"
                elif "sandbox" in pool_lower or "dev" in pool_lower:
                    return "sandbox"
        return None

    def assert_sandbox(self, target: str, allow_staging: bool = False,
                       production_override: bool = False) -> AssertionResult:
        """
        Assert that target is within sandbox boundaries.

        Args:
            target: Target name/identifier
            allow_staging: Allow staging environment (default: False)
            production_override: Explicit production approval (default: False)

        Returns:
            AssertionResult with safety determination
        """
        timestamp = self._now()

        # Step 1: Check pool classification from Vault (authoritative when present)
        pool_env = self._check_pool_classification(target)
        # Step 2: Detect from name patterns
        name_env, name_confidence = self._detect_environment_from_name(target)

        # Step 3: Determine final environment
        if pool_env:
            environment = pool_env
            confidence = 0.99  # Pool classification is authoritative
        else:
            environment = name_env
            confidence = name_confidence

        risk_level = self.RISK_LEVELS.get(environment, 3)

        # Evidence shared by every outcome
        base_details = {
            "pool_classification": pool_env,
            "name_detection": name_env,
            "confidence": confidence
        }

        def build(status: AssertionStatus, message: str,
                  requires_approval: bool, extra: Optional[dict] = None) -> AssertionResult:
            # Assemble an AssertionResult carrying the shared evidence plus
            # any outcome-specific detail keys.
            details = dict(base_details)
            if extra:
                details.update(extra)
            return AssertionResult(
                target=target,
                status=status,
                environment=environment,
                message=message,
                risk_level=risk_level,
                requires_approval=requires_approval,
                details=details,
                timestamp=timestamp
            )

        # Step 4: Make safety determination. Fail-safe policy: anything not
        # positively identified as sandbox requires approval.
        if environment in ("sandbox", "development"):
            return build(
                AssertionStatus.SAFE,
                f"Target '{target}' is in SANDBOX environment - safe to proceed",
                False
            )
        elif environment == "staging":
            if allow_staging:
                return build(
                    AssertionStatus.SAFE,
                    f"Target '{target}' is in STAGING - allowed by flag",
                    False,
                    {"staging_allowed": True}
                )
            return build(
                AssertionStatus.UNSAFE,
                f"Target '{target}' is in STAGING - requires --allow-staging flag",
                True,
                {"staging_allowed": False}
            )
        elif environment == "production":
            if production_override:
                return build(
                    AssertionStatus.OVERRIDE,
                    f"PRODUCTION OVERRIDE: Target '{target}' access with explicit approval",
                    False,  # Already approved
                    {"production_override": True,
                     "WARNING": "PRODUCTION ACCESS ENABLED"}
                )
            return build(
                AssertionStatus.UNSAFE,
                f"BLOCKED: Target '{target}' is in PRODUCTION - requires explicit approval",
                True,
                {"production_override": False,
                 "BLOCKED": "Production access denied"}
            )
        else:  # unknown
            return build(
                AssertionStatus.UNKNOWN,
                f"UNKNOWN: Cannot determine environment for '{target}'",
                True,
                {"WARNING": "Unable to classify - treating as high risk"}
            )

    def assert_all_targets(self, targets: list[str], allow_staging: bool = False,
                           production_override: bool = False) -> dict:
        """
        Assert sandbox boundaries for multiple targets.

        Returns a JSON-serializable report: per-target results, per-status
        counters, the highest risk level seen, and ``can_proceed`` which
        turns False as soon as any target is UNSAFE or UNKNOWN.
        """
        report = {
            "report_type": "sandbox_assertion",
            "timestamp": self._now(),
            "settings": {
                "allow_staging": allow_staging,
                "production_override": production_override
            },
            "results": [],
            "summary": {
                "total": len(targets),
                "safe": 0,
                "unsafe": 0,
                "unknown": 0,
                "override": 0,
                "max_risk_level": 0
            },
            "can_proceed": True,
            "blocked_targets": []
        }
        for target in targets:
            result = self.assert_sandbox(target, allow_staging, production_override)
            report["results"].append(result.to_dict())
            summary = report["summary"]
            if result.status == AssertionStatus.SAFE:
                summary["safe"] += 1
            elif result.status == AssertionStatus.UNSAFE:
                summary["unsafe"] += 1
                report["can_proceed"] = False
                report["blocked_targets"].append(target)
            elif result.status == AssertionStatus.UNKNOWN:
                summary["unknown"] += 1
                report["can_proceed"] = False
                report["blocked_targets"].append(target)
            elif result.status == AssertionStatus.OVERRIDE:
                summary["override"] += 1
            summary["max_risk_level"] = max(
                summary["max_risk_level"],
                result.risk_level
            )
        return report
# =============================================================================
# CLI
# =============================================================================
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Sandbox Assertion Check")
parser.add_argument("targets", nargs="+", help="Target nodes to assert")
parser.add_argument("--allow-staging", action="store_true",
help="Allow staging environment targets")
parser.add_argument("--production-override", action="store_true",
help="DANGEROUS: Override production block with explicit approval")
parser.add_argument("--json", action="store_true", help="Output JSON")
args = parser.parse_args()
asserter = SandboxAsserter()
report = asserter.assert_all_targets(
args.targets,
allow_staging=args.allow_staging,
production_override=args.production_override
)
if args.json:
print(json.dumps(report, indent=2))
else:
print("\n" + "=" * 60)
print("SANDBOX ASSERTION REPORT")
print("=" * 60)
print(f"Timestamp: {report['timestamp']}")
print(f"Allow Staging: {report['settings']['allow_staging']}")
print(f"Production Override: {report['settings']['production_override']}")
print()
risk_icons = ["[SAFE]", "[LOW]", "[MED]", "[HIGH]", "[CRIT]"]
for result in report["results"]:
status_icon = {
"SAFE": "[OK]",
"UNSAFE": "[BLOCKED]",
"UNKNOWN": "[???]",
"OVERRIDE": "[OVERRIDE]"
}.get(result["status"], "[?]")
risk_icon = risk_icons[min(result["risk_level"], 4)]
print(f"Target: {result['target']}")
print(f" Status: {status_icon} {result['status']}")
print(f" Environment: {result['environment']}")
print(f" Risk Level: {risk_icon} ({result['risk_level']}/4)")
print(f" Message: {result['message']}")
if result["requires_approval"]:
print(f" !!! REQUIRES APPROVAL !!!")
print()
print("-" * 60)
print("SUMMARY")
print("-" * 60)
s = report["summary"]
print(f" Total Targets: {s['total']}")
print(f" Safe: {s['safe']}")
print(f" Unsafe (blocked): {s['unsafe']}")
print(f" Unknown: {s['unknown']}")
print(f" Override: {s['override']}")
print(f" Max Risk Level: {s['max_risk_level']}/4")
print()
if report["blocked_targets"]:
print(f"BLOCKED TARGETS: {report['blocked_targets']}")
print()
if report["can_proceed"]:
print("[OK] SANDBOX ASSERTION PASSED - All targets are safe")
else:
print("[BLOCKED] SANDBOX ASSERTION FAILED - Cannot proceed")
print(" Some targets are outside sandbox boundaries")
print("=" * 60)
sys.exit(0 if report["can_proceed"] else 1)