#!/usr/bin/env python3
"""
Unified Preflight System
========================
Combines all preflight checks into a single gate.

Part of Phase 3: Execution Pipeline - Preflight System.

This is the main entry point for preflight validation.
All execution must pass through this gate.
"""

import argparse
import json
import sys
from dataclasses import dataclass  # noqa: F401  (kept: file-level import, may be relied on elsewhere)
from datetime import datetime, timezone
from typing import Optional

from inventory_check import InventoryChecker
from dependency_check import DependencyChecker
from sandbox_assert import SandboxAsserter, AssertionStatus


# NOTE: this class is intentionally NOT a dataclass. It declares no fields and
# defines its own __init__, so @dataclass would only add a field-less __eq__
# (every instance compares equal) and set __hash__ to None.
class PreflightGate:
    """
    Unified preflight gate that blocks execution on any failure.

    Gate Order:
    1. Sandbox Assertion (CRITICAL - must pass first)
    2. Inventory Validation
    3. Dependency Check

    If any check fails, execution is BLOCKED. A sandbox failure
    short-circuits: the inventory and dependency gates are not run.
    """

    def __init__(self):
        self.inventory_checker = InventoryChecker()
        self.dependency_checker = DependencyChecker()
        self.sandbox_asserter = SandboxAsserter()

    def _now(self) -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    @staticmethod
    def _elapsed_ms(start_time: datetime) -> int:
        """Return whole milliseconds elapsed since *start_time* (UTC-aware)."""
        elapsed = datetime.now(timezone.utc) - start_time
        return int(elapsed.total_seconds() * 1000)

    @staticmethod
    def _block(report: dict, gate_name: str, reason: str) -> None:
        """Record *gate_name* as failed; the first failure sets the reason."""
        report["can_proceed"] = False
        report["failed_gates"].append(gate_name)
        if report["decision"] == "PENDING":
            report["decision"] = "BLOCKED"
            report["blocking_reason"] = reason

    def run_preflight(
        self,
        targets: list[str],
        action_type: str,
        agent_tier: int,
        agent_id: str,
        extra_secrets: Optional[list[str]] = None,
        allow_staging: bool = False,
        production_override: bool = False,
        verbose: bool = True,
    ) -> dict:
        """
        Run full preflight validation.

        Args:
            targets: List of target nodes
            action_type: Type of action (terraform, ansible, docker, generic)
            agent_tier: Agent's trust tier (0-4)
            agent_id: Agent identifier for audit
            extra_secrets: Additional secrets to validate
            allow_staging: Allow staging targets
            production_override: DANGEROUS: Allow production
            verbose: Print per-gate progress lines to stdout (default True).
                Only this method's own output is affected; anything the
                individual checkers print is outside its control.

        Returns:
            Comprehensive preflight report with gate decision. Each gate's
            sub-report is stored under ``report["gates"][<gate name>]`` and is
            expected to carry a ``"can_proceed"`` key. ``"decision"`` ends up
            as ``"APPROVED"`` or ``"BLOCKED"``.
        """

        def log(message: str) -> None:
            if verbose:
                print(message)

        start_time = datetime.now(timezone.utc)

        report = {
            "report_type": "unified_preflight",
            "agent_id": agent_id,
            "agent_tier": agent_tier,
            "action_type": action_type,
            "targets": targets,
            "timestamp": self._now(),
            "gates": {},
            "gate_order": ["sandbox", "inventory", "dependencies"],
            "failed_gates": [],
            "can_proceed": True,
            "decision": "PENDING",
            "blocking_reason": None,
        }

        # =====================================================================
        # GATE 1: Sandbox Assertion (CRITICAL)
        # =====================================================================
        log("\n[GATE 1/3] Sandbox Assertion...")

        sandbox_report = self.sandbox_asserter.assert_all_targets(
            targets,
            allow_staging=allow_staging,
            production_override=production_override,
        )
        report["gates"]["sandbox"] = sandbox_report

        if not sandbox_report["can_proceed"]:
            self._block(
                report,
                "sandbox",
                f"Sandbox assertion failed: {sandbox_report['blocked_targets']}",
            )
            log(" [BLOCKED] Sandbox assertion failed")

            # Short-circuit: Don't run other checks if sandbox fails
            report["duration_ms"] = self._elapsed_ms(start_time)
            return report

        log(" [PASS] All targets within sandbox boundaries")

        # =====================================================================
        # GATE 2: Inventory Validation
        # =====================================================================
        log("\n[GATE 2/3] Inventory Validation...")

        inventory_report = self.inventory_checker.preflight_report(targets, agent_tier)
        report["gates"]["inventory"] = inventory_report

        if not inventory_report["can_proceed"]:
            self._block(report, "inventory", "Inventory validation failed")
            log(" [BLOCKED] Inventory validation failed")
        else:
            log(" [PASS] All targets validated in inventory")

        # =====================================================================
        # GATE 3: Dependency Check
        # =====================================================================
        # Runs even when inventory failed, so the report lists every failed
        # gate; blocking_reason keeps the first failure.
        log("\n[GATE 3/3] Dependency Check...")

        dependency_report = self.dependency_checker.preflight_report(action_type, extra_secrets)
        report["gates"]["dependencies"] = dependency_report

        if not dependency_report["can_proceed"]:
            self._block(report, "dependencies", "Dependency check failed")
            log(" [BLOCKED] Dependency check failed")
        else:
            log(" [PASS] All dependencies available")

        # =====================================================================
        # Final Decision
        # =====================================================================
        if report["can_proceed"]:
            report["decision"] = "APPROVED"
            report["blocking_reason"] = None

        report["duration_ms"] = self._elapsed_ms(start_time)
        return report

    def print_report(self, report: dict):
        """Print a human-readable report.

        Args:
            report: A report dict as returned by ``run_preflight``.

        Gates listed in ``report["gate_order"]`` but absent from
        ``report["gates"]`` (never run, e.g. after a sandbox short-circuit)
        are shown as ``[SKIP]`` rather than ``[FAIL]``.
        """
        print("\n" + "=" * 70)
        print("UNIFIED PREFLIGHT REPORT")
        print("=" * 70)
        print(f"Agent ID: {report['agent_id']}")
        print(f"Agent Tier: {report['agent_tier']}")
        print(f"Action Type: {report['action_type']}")
        print(f"Targets: {report['targets']}")
        print(f"Timestamp: {report['timestamp']}")
        print(f"Duration: {report['duration_ms']}ms")
        print()

        print("GATE RESULTS:")
        print("-" * 70)

        for gate_name in report["gate_order"]:
            if gate_name not in report["gates"]:
                # The gate was never executed; do not report it as a failure.
                print(f" [SKIP] {gate_name.upper()}")
                print(" Not run")
                continue

            gate = report["gates"][gate_name]
            status = "[PASS]" if gate.get("can_proceed", False) else "[FAIL]"
            print(f" {status} {gate_name.upper()}")

            summary = gate.get("summary", {})
            if gate_name == "sandbox":
                print(
                    f" Safe: {summary.get('safe', 0)}, "
                    f"Unsafe: {summary.get('unsafe', 0)}, "
                    f"Risk: {summary.get('max_risk_level', 0)}/4"
                )
            elif gate_name in ("inventory", "dependencies"):
                print(
                    f" Checks: {summary.get('total_checks', 0)}, "
                    f"Passed: {summary.get('passed', 0)}, "
                    f"Failed: {summary.get('failed', 0)}"
                )

        print()
        print("-" * 70)
        print("DECISION")
        print("-" * 70)

        if report["can_proceed"]:
            print(" [APPROVED] Preflight checks passed - execution authorized")
        else:
            print(" [BLOCKED] Preflight checks failed - execution denied")
            print(f" Failed Gates: {report['failed_gates']}")
            print(f" Reason: {report['blocking_reason']}")

        print("=" * 70)


# =============================================================================
# CLI
# =============================================================================

def main(argv: Optional[list[str]] = None) -> int:
    """Run the preflight gate from the command line.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit code: 0 when the gate approves, 1 when it blocks.
    """
    parser = argparse.ArgumentParser(description="Unified Preflight Gate")
    parser.add_argument("targets", nargs="+", help="Target nodes")
    parser.add_argument("--action", "-a", default="generic",
                        choices=["terraform", "ansible", "docker", "generic"],
                        help="Action type")
    parser.add_argument("--tier", "-t", type=int, default=1, help="Agent tier (0-4)")
    parser.add_argument("--agent-id", default="cli-user", help="Agent identifier")
    parser.add_argument("--secret", action="append", dest="extra_secrets",
                        help="Additional secrets to check")
    parser.add_argument("--allow-staging", action="store_true", help="Allow staging targets")
    parser.add_argument("--production-override", action="store_true",
                        help="DANGEROUS: Allow production targets")
    parser.add_argument("--json", action="store_true", help="Output JSON only")
    parser.add_argument("--quiet", "-q", action="store_true", help="Quiet mode")

    args = parser.parse_args(argv)

    gate = PreflightGate()

    # --json promises JSON-only output and --quiet promises silence, so the
    # banner and the per-gate progress lines are suppressed for both.
    show_progress = not (args.quiet or args.json)

    if show_progress:
        print(f"\n{'=' * 70}")
        print("PREFLIGHT CHECK INITIATED")
        print(f"{'=' * 70}")
        print(f"Agent: {args.agent_id} (Tier {args.tier})")
        print(f"Action: {args.action}")
        print(f"Targets: {args.targets}")

    report = gate.run_preflight(
        targets=args.targets,
        action_type=args.action,
        agent_tier=args.tier,
        agent_id=args.agent_id,
        extra_secrets=args.extra_secrets,
        allow_staging=args.allow_staging,
        production_override=args.production_override,
        verbose=show_progress,
    )

    if args.json:
        print(json.dumps(report, indent=2))
    elif not args.quiet:
        gate.print_report(report)

    return 0 if report["can_proceed"] else 1


if __name__ == "__main__":
    sys.exit(main())