#!/usr/bin/env python3 """ Tier 1 Operator Agent ===================== A governed agent that can execute approved plans, run infrastructure commands (Terraform, Ansible), and access sandbox environments. This agent enforces Tier 1 constraints: - Read/write file access (within allowed paths) - Command execution (with governance logging) - Sandbox SSH/API access only - No production access - No Vault root access - All actions logged to governance ledger Tier 1 agents are promoted from Tier 0 after demonstrating compliant behavior. They can execute without pre-approval but all actions are logged and audited. """ import json import os import sys import hashlib import sqlite3 import subprocess import shlex from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Optional, Any, List, Dict import re try: import redis except ImportError: redis = None # ============================================================================= # Configuration # ============================================================================= AGENT_DIR = Path(__file__).parent CONFIG_FILE = AGENT_DIR / "config" / "agent.json" WORKSPACE_DIR = AGENT_DIR / "workspace" PLANS_DIR = AGENT_DIR / "plans" LOGS_DIR = AGENT_DIR / "logs" LEDGER_DB = Path("/opt/agent-governance/ledger/governance.db") SANDBOX_DIR = Path("/opt/agent-governance/sandbox") # Load agent config with open(CONFIG_FILE) as f: CONFIG = json.load(f) AGENT_ID = CONFIG["agent_id"] AGENT_TIER = CONFIG["tier"] TIER_NAME = CONFIG.get("tier_name", "Operator") ALLOWED_PATHS = [Path(p) for p in CONFIG["constraints"]["allowed_paths"]] FORBIDDEN_PATHS = CONFIG["constraints"]["forbidden_paths"] ALLOWED_ACTIONS = CONFIG["constraints"]["allowed_actions"] FORBIDDEN_ACTIONS = CONFIG["constraints"]["forbidden_actions"] ALLOWED_TARGETS = CONFIG["constraints"].get("allowed_targets", []) FORBIDDEN_TARGETS = CONFIG["constraints"].get("forbidden_targets", []) CAPABILITIES = CONFIG.get("capabilities", {}) # ============================================================================= # Data Classes # ============================================================================= @dataclass class ActionResult: """Result of an agent action""" action: str success: bool data: Any = None error: Optional[str] = None blocked: bool = False block_reason: Optional[str] = None execution_time: float = 0.0 @dataclass class Plan: """A generated or loaded plan""" plan_id: str title: str description: str target: str steps: list rollback_steps: list created_at: str agent_id: str status: str = "draft" approved_by: Optional[str] = None executed: bool = False execution_results: List[Dict] = field(default_factory=list) @dataclass class ExecutionContext: """Context for command execution""" command: str working_dir: str timeout: int = 300 env: Dict[str, str] = field(default_factory=dict) capture_output: bool = True # ============================================================================= # Governance Integration # ============================================================================= class GovernanceClient: """Interfaces with the governance system""" def __init__(self): self.redis = self._get_redis() self.session_id = os.environ.get("SESSION_ID", f"session-{datetime.now().strftime('%Y%m%d-%H%M%S')}") def _get_redis(self): if redis is None: return None try: password = os.environ.get("REDIS_PASSWORD") if not password: try: with open("/opt/vault/init-keys.json") as f: token = json.load(f)["root_token"] result = subprocess.run([ "curl", "-sk", "-H", f"X-Vault-Token: {token}", "https://127.0.0.1:8200/v1/secret/data/services/dragonfly" ], capture_output=True, text=True, timeout=5) if result.returncode == 0: creds = json.loads(result.stdout).get("data", {}).get("data", {}) password = creds.get("password", "") except Exception: password = "" return redis.Redis(host="127.0.0.1", port=6379, password=password, decode_responses=True) except Exception: return None def log_action(self, action: str, decision: str, target: str, success: bool, confidence: float = 1.0, error: str = None, details: Dict = None): """Log action to governance ledger""" try: conn = sqlite3.connect(LEDGER_DB) cursor = conn.cursor() cursor.execute(""" INSERT INTO agent_actions (timestamp, agent_id, agent_version, tier, action, decision, confidence, target, success, error_message, session_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( datetime.now(timezone.utc).isoformat(), AGENT_ID, CONFIG.get("agent_version", "1.0.0"), AGENT_TIER, action, decision, confidence, target, 1 if success else 0, error, self.session_id )) conn.commit() conn.close() except Exception as e: print(f"Warning: Could not log action: {e}", file=sys.stderr) def update_heartbeat(self): """Update agent heartbeat in DragonflyDB""" if self.redis: try: data = json.dumps({ "timestamp": int(datetime.now().timestamp()), "tier": AGENT_TIER, "session": self.session_id }) self.redis.set(f"agent:{AGENT_ID}:heartbeat", data, ex=60) except Exception: pass def check_revocation(self) -> bool: """Check if agent has been revoked""" if self.redis: try: signal = self.redis.get(f"agent:{AGENT_ID}:revoke_signal") return signal == "1" except Exception: pass return False def increment_compliant(self): """Increment compliant run counter""" try: conn = sqlite3.connect(LEDGER_DB) cursor = conn.cursor() cursor.execute(""" UPDATE agent_metrics SET compliant_runs = compliant_runs + 1, consecutive_compliant = consecutive_compliant + 1, total_runs = total_runs + 1, last_active_at = datetime('now'), updated_at = datetime('now') WHERE agent_id = ? """, (AGENT_ID,)) if cursor.rowcount == 0: # Insert if not exists cursor.execute(""" INSERT INTO agent_metrics (agent_id, current_tier, compliant_runs, consecutive_compliant, total_runs, last_active_at) VALUES (?, ?, 1, 1, 1, datetime('now')) """, (AGENT_ID, AGENT_TIER)) conn.commit() conn.close() except Exception as e: print(f"Warning: Could not update metrics: {e}", file=sys.stderr) def record_violation(self, violation_type: str, details: str): """Record a governance violation""" try: conn = sqlite3.connect(LEDGER_DB) cursor = conn.cursor() cursor.execute(""" UPDATE agent_metrics SET violation_count = violation_count + 1, consecutive_compliant = 0, updated_at = datetime('now') WHERE agent_id = ? """, (AGENT_ID,)) conn.commit() conn.close() self.log_action( action="violation", decision="BLOCKED", target=violation_type, success=False, error=details ) except Exception as e: print(f"Warning: Could not record violation: {e}", file=sys.stderr) # ============================================================================= # Tier 1 Agent # ============================================================================= class Tier1Agent: """ A Tier 1 Operator agent with execution capabilities. Can execute commands, run infrastructure tools, and access sandbox environments. """ def __init__(self): self.governance = GovernanceClient() self._check_not_revoked() def _now(self) -> str: return datetime.now(timezone.utc).isoformat() def _check_not_revoked(self): """Check revocation status before any action""" if self.governance.check_revocation(): print("[REVOKED] Agent has been revoked. Exiting.", file=sys.stderr) sys.exit(1) def _is_path_allowed(self, path: str) -> bool: """Check if path is within allowed paths""" target = Path(path).resolve() # Check forbidden patterns for pattern in FORBIDDEN_PATHS: if pattern.startswith("**/"): if pattern[3:] in str(target): return False elif "*" in pattern: if target.match(pattern): return False elif str(target).startswith(pattern) or str(target) == pattern: return False # Check allowed paths for allowed in ALLOWED_PATHS: allowed_resolved = Path(allowed).resolve() try: target.relative_to(allowed_resolved) return True except ValueError: continue return False def _is_target_allowed(self, target: str) -> bool: """Check if target (host) is allowed""" # Check forbidden targets first for pattern in FORBIDDEN_TARGETS: if pattern.endswith("*"): if target.startswith(pattern[:-1]): return False elif target == pattern: return False # Check allowed targets for pattern in ALLOWED_TARGETS: if pattern.endswith("*"): if target.startswith(pattern[:-1]): return True elif target == pattern: return True return False def _is_command_safe(self, command: str) -> tuple[bool, str]: """Check if command is safe to execute""" # Forbidden command patterns dangerous_patterns = [ r"rm\s+-rf\s+/", r"rm\s+-rf\s+\*", r"mkfs\.", r"dd\s+if=.*of=/dev/", r">\s*/dev/sd", r"chmod\s+-R\s+777\s+/", r"curl.*\|\s*sh", r"wget.*\|\s*sh", r"eval\s+", ] for pattern in dangerous_patterns: if re.search(pattern, command, re.IGNORECASE): return False, f"Command matches dangerous pattern: {pattern}" # Check for production targets in command for forbidden in FORBIDDEN_TARGETS: clean_pattern = forbidden.replace("*", "") if clean_pattern and clean_pattern in command: return False, f"Command references forbidden target: {forbidden}" return True, "" def _block_action(self, action: str, reason: str) -> ActionResult: """Record a blocked action""" self.governance.log_action( action=action, decision="BLOCKED", target="N/A", success=False, error=reason ) self.governance.record_violation(action, reason) return ActionResult( action=action, success=False, blocked=True, block_reason=reason ) # ------------------------------------------------------------------------- # Read Operations (inherited from Tier 0) # ------------------------------------------------------------------------- def read_file(self, path: str) -> ActionResult: """Read a file (if allowed)""" self._check_not_revoked() self.governance.update_heartbeat() if not self._is_path_allowed(path): return self._block_action("read_file", f"Path not allowed: {path}") try: with open(path) as f: content = f.read() self.governance.log_action( action="read_file", decision="EXECUTE", target=path, success=True ) return ActionResult( action="read_file", success=True, data={"path": path, "content": content, "size": len(content)} ) except Exception as e: self.governance.log_action( action="read_file", decision="EXECUTE", target=path, success=False, error=str(e) ) return ActionResult(action="read_file", success=False, error=str(e)) def list_directory(self, path: str) -> ActionResult: """List directory contents (if allowed)""" self._check_not_revoked() self.governance.update_heartbeat() if not self._is_path_allowed(path): return self._block_action("list_directory", f"Path not allowed: {path}") try: entries = [] for entry in Path(path).iterdir(): entries.append({ "name": entry.name, "is_dir": entry.is_dir(), "size": entry.stat().st_size if entry.is_file() else 0 }) self.governance.log_action( action="list_directory", decision="EXECUTE", target=path, success=True ) return ActionResult( action="list_directory", success=True, data={"path": path, "entries": entries} ) except Exception as e: return ActionResult(action="list_directory", success=False, error=str(e)) # ------------------------------------------------------------------------- # Write Operations (Tier 1+) # ------------------------------------------------------------------------- def write_file(self, path: str, content: str) -> ActionResult: """Write to a file (if allowed)""" self._check_not_revoked() self.governance.update_heartbeat() if not CAPABILITIES.get("modify_files", False): return self._block_action("write_file", "Agent does not have modify_files capability") if not self._is_path_allowed(path): return self._block_action("write_file", f"Path not allowed: {path}") try: # Ensure parent directory exists Path(path).parent.mkdir(parents=True, exist_ok=True) with open(path, "w") as f: f.write(content) self.governance.log_action( action="write_file", decision="EXECUTE", target=path, success=True ) return ActionResult( action="write_file", success=True, data={"path": path, "size": len(content)} ) except Exception as e: self.governance.log_action( action="write_file", decision="EXECUTE", target=path, success=False, error=str(e) ) return ActionResult(action="write_file", success=False, error=str(e)) # ------------------------------------------------------------------------- # Command Execution (Tier 1+) # ------------------------------------------------------------------------- def execute_command(self, command: str, working_dir: str = None, timeout: int = 300, env: Dict = None) -> ActionResult: """Execute a shell command (with governance checks)""" self._check_not_revoked() self.governance.update_heartbeat() if not CAPABILITIES.get("execute_commands", False): return self._block_action("execute_command", "Agent does not have execute_commands capability") # Safety check is_safe, reason = self._is_command_safe(command) if not is_safe: return self._block_action("execute_command", reason) # Working directory check if working_dir and not self._is_path_allowed(working_dir): return self._block_action("execute_command", f"Working directory not allowed: {working_dir}") work_dir = working_dir or str(WORKSPACE_DIR) start_time = datetime.now() try: # Prepare environment cmd_env = os.environ.copy() if env: cmd_env.update(env) # Execute command result = subprocess.run( command, shell=True, cwd=work_dir, capture_output=True, text=True, timeout=timeout, env=cmd_env ) execution_time = (datetime.now() - start_time).total_seconds() self.governance.log_action( action="execute_command", decision="EXECUTE", target=command[:100], # Truncate for logging success=result.returncode == 0, error=result.stderr[:500] if result.returncode != 0 else None ) if result.returncode == 0: self.governance.increment_compliant() return ActionResult( action="execute_command", success=result.returncode == 0, data={ "command": command, "returncode": result.returncode, "stdout": result.stdout, "stderr": result.stderr, "working_dir": work_dir }, error=result.stderr if result.returncode != 0 else None, execution_time=execution_time ) except subprocess.TimeoutExpired: return ActionResult( action="execute_command", success=False, error=f"Command timed out after {timeout}s" ) except Exception as e: self.governance.log_action( action="execute_command", decision="EXECUTE", target=command[:100], success=False, error=str(e) ) return ActionResult(action="execute_command", success=False, error=str(e)) # ------------------------------------------------------------------------- # Infrastructure Tools (Tier 1+) # ------------------------------------------------------------------------- def terraform_plan(self, directory: str, var_file: str = None) -> ActionResult: """Run terraform plan""" self._check_not_revoked() if "terraform_plan" not in ALLOWED_ACTIONS: return self._block_action("terraform_plan", "Action not allowed for this agent") if not self._is_path_allowed(directory): return self._block_action("terraform_plan", f"Directory not allowed: {directory}") cmd = f"terraform plan -no-color" if var_file: cmd += f" -var-file={shlex.quote(var_file)}" result = self.execute_command(cmd, working_dir=directory) # Re-log as terraform_plan self.governance.log_action( action="terraform_plan", decision="EXECUTE", target=directory, success=result.success, error=result.error ) return ActionResult( action="terraform_plan", success=result.success, data=result.data, error=result.error, execution_time=result.execution_time ) def terraform_apply(self, directory: str, var_file: str = None, auto_approve: bool = True) -> ActionResult: """Run terraform apply""" self._check_not_revoked() if "terraform_apply" not in ALLOWED_ACTIONS: return self._block_action("terraform_apply", "Action not allowed for this agent") if not self._is_path_allowed(directory): return self._block_action("terraform_apply", f"Directory not allowed: {directory}") cmd = f"terraform apply -no-color" if auto_approve: cmd += " -auto-approve" if var_file: cmd += f" -var-file={shlex.quote(var_file)}" result = self.execute_command(cmd, working_dir=directory) self.governance.log_action( action="terraform_apply", decision="EXECUTE", target=directory, success=result.success, error=result.error ) if result.success: self.governance.increment_compliant() return ActionResult( action="terraform_apply", success=result.success, data=result.data, error=result.error, execution_time=result.execution_time ) def ansible_check(self, playbook: str, inventory: str = None, extra_vars: Dict = None) -> ActionResult: """Run ansible-playbook in check mode (dry-run)""" self._check_not_revoked() if "ansible_check" not in ALLOWED_ACTIONS: return self._block_action("ansible_check", "Action not allowed for this agent") playbook_path = Path(playbook) if not self._is_path_allowed(str(playbook_path.parent)): return self._block_action("ansible_check", f"Playbook path not allowed: {playbook}") cmd = f"ansible-playbook {shlex.quote(playbook)} --check" if inventory: cmd += f" -i {shlex.quote(inventory)}" if extra_vars: vars_json = json.dumps(extra_vars) cmd += f" -e {shlex.quote(vars_json)}" result = self.execute_command(cmd, working_dir=str(playbook_path.parent)) self.governance.log_action( action="ansible_check", decision="EXECUTE", target=playbook, success=result.success, error=result.error ) return ActionResult( action="ansible_check", success=result.success, data=result.data, error=result.error, execution_time=result.execution_time ) def ansible_run(self, playbook: str, inventory: str = None, extra_vars: Dict = None, limit: str = None) -> ActionResult: """Run ansible-playbook""" self._check_not_revoked() if "ansible_run" not in ALLOWED_ACTIONS: return self._block_action("ansible_run", "Action not allowed for this agent") playbook_path = Path(playbook) if not self._is_path_allowed(str(playbook_path.parent)): return self._block_action("ansible_run", f"Playbook path not allowed: {playbook}") # Check target hosts if limit: if not self._is_target_allowed(limit): return self._block_action("ansible_run", f"Target not allowed: {limit}") cmd = f"ansible-playbook {shlex.quote(playbook)}" if inventory: cmd += f" -i {shlex.quote(inventory)}" if extra_vars: vars_json = json.dumps(extra_vars) cmd += f" -e {shlex.quote(vars_json)}" if limit: cmd += f" --limit {shlex.quote(limit)}" result = self.execute_command(cmd, working_dir=str(playbook_path.parent)) self.governance.log_action( action="ansible_run", decision="EXECUTE", target=playbook, success=result.success, error=result.error ) if result.success: self.governance.increment_compliant() return ActionResult( action="ansible_run", success=result.success, data=result.data, error=result.error, execution_time=result.execution_time ) def docker_run(self, image: str, command: str = None, volumes: List[str] = None, env: Dict = None, remove: bool = True) -> ActionResult: """Run a Docker container""" self._check_not_revoked() if "docker_run" not in ALLOWED_ACTIONS: return self._block_action("docker_run", "Action not allowed for this agent") # Build docker command cmd = f"docker run" if remove: cmd += " --rm" if env: for k, v in env.items(): cmd += f" -e {shlex.quote(f'{k}={v}')}" if volumes: for vol in volumes: # Check volume source paths if ":" in vol: src = vol.split(":")[0] if not self._is_path_allowed(src): return self._block_action("docker_run", f"Volume path not allowed: {src}") cmd += f" -v {shlex.quote(vol)}" cmd += f" {shlex.quote(image)}" if command: cmd += f" {command}" result = self.execute_command(cmd) self.governance.log_action( action="docker_run", decision="EXECUTE", target=image, success=result.success, error=result.error ) return ActionResult( action="docker_run", success=result.success, data=result.data, error=result.error, execution_time=result.execution_time ) # ------------------------------------------------------------------------- # Plan Management # ------------------------------------------------------------------------- def generate_plan(self, title: str, description: str, target: str, steps: list, rollback_steps: list = None) -> ActionResult: """Generate a plan""" self._check_not_revoked() self.governance.update_heartbeat() plan_id = f"plan-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{hashlib.sha256(title.encode()).hexdigest()[:8]}" plan_dict = { "plan_id": plan_id, "title": title, "description": description, "target": target, "steps": steps, "rollback_steps": rollback_steps or [], "created_at": self._now(), "agent_id": AGENT_ID, "agent_tier": AGENT_TIER, "status": "approved", # Tier 1 plans are pre-approved "requires_approval": False, "approved_by": "auto-tier1", "executed": False } plan_file = PLANS_DIR / f"{plan_id}.json" with open(plan_file, "w") as f: json.dump(plan_dict, f, indent=2) self.governance.log_action( action="generate_plan", decision="PLAN", target=target, success=True, confidence=0.9 ) return ActionResult( action="generate_plan", success=True, data={ "plan_id": plan_id, "plan_file": str(plan_file), "status": "approved", "message": "Plan generated and auto-approved for Tier 1 execution." } ) def execute_plan(self, plan_id: str) -> ActionResult: """Execute a plan by ID""" self._check_not_revoked() self.governance.update_heartbeat() plan_file = PLANS_DIR / f"{plan_id}.json" if not plan_file.exists(): return ActionResult( action="execute_plan", success=False, error=f"Plan not found: {plan_id}" ) with open(plan_file) as f: plan = json.load(f) if plan.get("executed"): return ActionResult( action="execute_plan", success=False, error=f"Plan already executed: {plan_id}" ) # Execute each step results = [] all_success = True for i, step in enumerate(plan.get("steps", [])): step_cmd = step.get("command", step) if isinstance(step, dict) else step step_result = self.execute_command(step_cmd) results.append({ "step": i + 1, "command": step_cmd, "success": step_result.success, "output": step_result.data.get("stdout", "") if step_result.data else "", "error": step_result.error }) if not step_result.success: all_success = False break # Update plan status plan["executed"] = True plan["execution_results"] = results plan["executed_at"] = self._now() plan["status"] = "completed" if all_success else "failed" with open(plan_file, "w") as f: json.dump(plan, f, indent=2) self.governance.log_action( action="execute_plan", decision="EXECUTE", target=plan_id, success=all_success, error=None if all_success else "One or more steps failed" ) if all_success: self.governance.increment_compliant() return ActionResult( action="execute_plan", success=all_success, data={ "plan_id": plan_id, "steps_executed": len(results), "results": results }, error=None if all_success else "Plan execution failed" ) # ------------------------------------------------------------------------- # Forbidden Actions # ------------------------------------------------------------------------- def delete_production(self, target: str) -> ActionResult: """FORBIDDEN: Delete production resources""" return self._block_action( "delete_production", "Tier 1 agents cannot delete production resources." ) def access_vault_root(self) -> ActionResult: """FORBIDDEN: Access Vault root token""" return self._block_action( "access_vault_root", "Tier 1 agents cannot access Vault root credentials." ) def modify_governance(self, target: str) -> ActionResult: """FORBIDDEN: Modify governance rules""" return self._block_action( "modify_governance", "Tier 1 agents cannot modify governance rules." ) # ============================================================================= # CLI Interface # ============================================================================= def main(): import argparse parser = argparse.ArgumentParser(description="Tier 1 Operator Agent") subparsers = parser.add_subparsers(dest="command", required=True) # Status subparsers.add_parser("status", help="Show agent status") # Read file read_parser = subparsers.add_parser("read", help="Read a file") read_parser.add_argument("path", help="File path to read") # List directory ls_parser = subparsers.add_parser("ls", help="List directory") ls_parser.add_argument("path", nargs="?", default=str(WORKSPACE_DIR)) # Write file write_parser = subparsers.add_parser("write", help="Write a file") write_parser.add_argument("path", help="File path") write_parser.add_argument("--content", help="Content to write") write_parser.add_argument("--stdin", action="store_true", help="Read content from stdin") # Execute command exec_parser = subparsers.add_parser("exec", help="Execute a command") exec_parser.add_argument("cmd", nargs="+", help="Command to execute") exec_parser.add_argument("--dir", help="Working directory") exec_parser.add_argument("--timeout", type=int, default=300) # Generate plan plan_parser = subparsers.add_parser("plan", help="Generate a plan") plan_parser.add_argument("--title", required=True) plan_parser.add_argument("--description", required=True) plan_parser.add_argument("--target", required=True) plan_parser.add_argument("--steps", required=True, help="JSON array of steps") plan_parser.add_argument("--rollback", help="JSON array of rollback steps") # Execute plan run_plan_parser = subparsers.add_parser("run-plan", help="Execute a plan") run_plan_parser.add_argument("plan_id", help="Plan ID to execute") # Terraform tf_plan_parser = subparsers.add_parser("tf-plan", help="Run terraform plan") tf_plan_parser.add_argument("directory", help="Terraform directory") tf_plan_parser.add_argument("--var-file", help="Variable file") tf_apply_parser = subparsers.add_parser("tf-apply", help="Run terraform apply") tf_apply_parser.add_argument("directory", help="Terraform directory") tf_apply_parser.add_argument("--var-file", help="Variable file") # Ansible ansible_check_parser = subparsers.add_parser("ansible-check", help="Run ansible-playbook --check") ansible_check_parser.add_argument("playbook", help="Playbook path") ansible_check_parser.add_argument("--inventory", "-i", help="Inventory file") ansible_run_parser = subparsers.add_parser("ansible-run", help="Run ansible-playbook") ansible_run_parser.add_argument("playbook", help="Playbook path") ansible_run_parser.add_argument("--inventory", "-i", help="Inventory file") ansible_run_parser.add_argument("--limit", "-l", help="Limit to hosts") # Docker docker_parser = subparsers.add_parser("docker", help="Run Docker container") docker_parser.add_argument("image", help="Docker image") docker_parser.add_argument("--cmd", help="Command to run") docker_parser.add_argument("-v", "--volume", action="append", help="Volume mount") docker_parser.add_argument("-e", "--env", action="append", help="Environment variable") # Test forbidden actions subparsers.add_parser("test-forbidden", help="Test that forbidden actions are blocked") args = parser.parse_args() agent = Tier1Agent() if args.command == "status": print(f"\n{'='*60}") print("TIER 1 AGENT STATUS") print(f"{'='*60}") print(f"Agent ID: {AGENT_ID}") print(f"Tier: {AGENT_TIER} ({TIER_NAME})") print(f"Session: {agent.governance.session_id}") print(f"\nCapabilities:") for cap, enabled in CAPABILITIES.items(): status = "YES" if enabled else "NO" print(f" {cap}: {status}") print(f"\nAllowed Actions: {', '.join(ALLOWED_ACTIONS)}") print(f"Forbidden Actions: {', '.join(FORBIDDEN_ACTIONS)}") print(f"Allowed Targets: {', '.join(ALLOWED_TARGETS)}") print(f"\nWorkspace: {WORKSPACE_DIR}") print(f"Plans: {PLANS_DIR}") if agent.governance.check_revocation(): print(f"\n[REVOKED] Agent has been revoked!") else: print(f"\n[ACTIVE] Agent is active and operational") print(f"{'='*60}") elif args.command == "read": result = agent.read_file(args.path) if result.success: print(result.data["content"]) elif result.blocked: print(f"[BLOCKED] {result.block_reason}", file=sys.stderr) sys.exit(1) else: print(f"[ERROR] {result.error}", file=sys.stderr) sys.exit(1) elif args.command == "ls": result = agent.list_directory(args.path) if result.success: for entry in result.data["entries"]: prefix = "d" if entry["is_dir"] else "-" print(f"{prefix} {entry['name']}") elif result.blocked: print(f"[BLOCKED] {result.block_reason}", file=sys.stderr) sys.exit(1) else: print(f"[ERROR] {result.error}", file=sys.stderr) sys.exit(1) elif args.command == "write": if args.stdin: content = sys.stdin.read() elif args.content: content = args.content else: print("[ERROR] Provide --content or --stdin", file=sys.stderr) sys.exit(1) result = agent.write_file(args.path, content) if result.success: print(f"[OK] Written {result.data['size']} bytes to {args.path}") elif result.blocked: print(f"[BLOCKED] {result.block_reason}", file=sys.stderr) sys.exit(1) else: print(f"[ERROR] {result.error}", file=sys.stderr) sys.exit(1) elif args.command == "exec": cmd = " ".join(args.cmd) result = agent.execute_command(cmd, working_dir=args.dir, timeout=args.timeout) if result.success: print(result.data["stdout"]) if result.data["stderr"]: print(result.data["stderr"], file=sys.stderr) elif result.blocked: print(f"[BLOCKED] {result.block_reason}", file=sys.stderr) sys.exit(1) else: print(f"[ERROR] {result.error}", file=sys.stderr) if result.data and result.data.get("stderr"): print(result.data["stderr"], file=sys.stderr) sys.exit(result.data.get("returncode", 1) if result.data else 1) elif args.command == "plan": steps = json.loads(args.steps) rollback = json.loads(args.rollback) if args.rollback else [] result = agent.generate_plan( title=args.title, description=args.description, target=args.target, steps=steps, rollback_steps=rollback ) if result.success: print(f"\n[OK] Plan generated: {result.data['plan_id']}") print(f"Status: {result.data['status']}") print(f"File: {result.data['plan_file']}") else: print(f"[ERROR] {result.error}", file=sys.stderr) sys.exit(1) elif args.command == "run-plan": result = agent.execute_plan(args.plan_id) if result.success: print(f"\n[OK] Plan executed: {args.plan_id}") print(f"Steps executed: {result.data['steps_executed']}") for r in result.data['results']: status = "OK" if r['success'] else "FAIL" print(f" Step {r['step']}: [{status}] {r['command'][:50]}...") else: print(f"[ERROR] {result.error}", file=sys.stderr) sys.exit(1) elif args.command == "tf-plan": result = agent.terraform_plan(args.directory, var_file=args.var_file) if result.success: print(result.data["stdout"]) elif result.blocked: print(f"[BLOCKED] {result.block_reason}", file=sys.stderr) sys.exit(1) else: print(f"[ERROR] {result.error}", file=sys.stderr) sys.exit(1) elif args.command == "tf-apply": result = agent.terraform_apply(args.directory, var_file=args.var_file) if result.success: print(result.data["stdout"]) elif result.blocked: print(f"[BLOCKED] {result.block_reason}", file=sys.stderr) sys.exit(1) else: print(f"[ERROR] {result.error}", file=sys.stderr) sys.exit(1) elif args.command == "ansible-check": result = agent.ansible_check(args.playbook, inventory=args.inventory) if result.success: print(result.data["stdout"]) elif result.blocked: print(f"[BLOCKED] {result.block_reason}", file=sys.stderr) sys.exit(1) else: print(f"[ERROR] {result.error}", file=sys.stderr) sys.exit(1) elif args.command == "ansible-run": result = agent.ansible_run(args.playbook, inventory=args.inventory, limit=args.limit) if result.success: print(result.data["stdout"]) elif result.blocked: print(f"[BLOCKED] {result.block_reason}", file=sys.stderr) sys.exit(1) else: print(f"[ERROR] {result.error}", file=sys.stderr) sys.exit(1) elif args.command == "docker": env_dict = {} if args.env: for e in args.env: if "=" in e: k, v = e.split("=", 1) env_dict[k] = v result = agent.docker_run( args.image, command=args.cmd, volumes=args.volume, env=env_dict if env_dict else None ) if result.success: print(result.data["stdout"]) elif result.blocked: print(f"[BLOCKED] {result.block_reason}", file=sys.stderr) sys.exit(1) else: print(f"[ERROR] {result.error}", file=sys.stderr) sys.exit(1) elif args.command == "test-forbidden": print("\n" + "="*60) print("TESTING FORBIDDEN ACTIONS") print("="*60) tests = [ ("delete_production", lambda: agent.delete_production("prod-db-01")), ("access_vault_root", lambda: agent.access_vault_root()), ("modify_governance", lambda: agent.modify_governance("policies")), ("dangerous_command", lambda: agent.execute_command("rm -rf /")), ("prod_target", lambda: agent.execute_command("ssh prod-server ls")), ] all_blocked = True for name, test_fn in tests: result = test_fn() if result.blocked: print(f"[BLOCKED] {name}: {result.block_reason}") else: print(f"[FAIL] {name} was NOT blocked!") all_blocked = False print("="*60) if all_blocked: print("[OK] All forbidden actions correctly blocked") else: print("[FAIL] Some actions were not blocked!") sys.exit(1) if __name__ == "__main__": main()