#!/usr/bin/env python3
"""
Comprehensive Governance System Tests
======================================

Tests all components of the agent governance framework:
- Vault connectivity and policies
- DragonflyDB state management
- SQLite ledger
- Preflight system
- Promotion engine
- Revocation engine
- Checkpoint skill
- Model Controller
- Tier 0 agent
- Governance wrappers
- Evidence system

Every check is reported through ``log_test`` / ``log_skip`` and tallied in
``RESULTS``; the individual ``test_*`` functions never raise on an
environmental problem (missing binary, missing file, unreachable service),
they record a failure instead so the remaining sections still run.
"""

import json
import os
import subprocess
import sys
import sqlite3
from pathlib import Path
from datetime import datetime

# Colors for output
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
RESET = '\033[0m'

# Running tally of check outcomes, updated by log_test() and log_skip().
RESULTS = {"passed": 0, "failed": 0, "skipped": 0}

# Installation layout exercised by the tests.
GOVERNANCE_ROOT = Path("/opt/agent-governance")
VAULT_KEYS_PATH = "/opt/vault/init-keys.json"

# Upper bound (seconds) for any single external command so that a hung
# container or CLI cannot stall the whole run.
COMMAND_TIMEOUT = 120


def log_test(name: str, passed: bool, message: str = "") -> None:
    """Record one check in RESULTS and print a colored PASS/FAIL line.

    Args:
        name: Human-readable name of the check.
        passed: Truthy when the check succeeded.
        message: Optional detail appended after the name.
    """
    if passed:
        RESULTS["passed"] += 1
        status = f"{GREEN}PASS{RESET}"
    else:
        RESULTS["failed"] += 1
        status = f"{RED}FAIL{RESET}"

    msg = f" - {message}" if message else ""
    print(f"  [{status}] {name}{msg}")


def log_skip(name: str, reason: str) -> None:
    """Record a skipped check in RESULTS and print a SKIP line with its reason."""
    RESULTS["skipped"] += 1
    print(f"  [{YELLOW}SKIP{RESET}] {name} - {reason}")


def log_section(title: str) -> None:
    """Print a banner that introduces a group of related checks."""
    print(f"\n{BLUE}{'='*60}{RESET}")
    print(f"{BLUE}{title}{RESET}")
    print(f"{BLUE}{'='*60}{RESET}")


# =============================================================================
# Internal helpers
# =============================================================================

def _run(cmd, cwd=None):
    """Run *cmd* and always return a ``subprocess.CompletedProcess``.

    If the command cannot be started (e.g. the executable is missing) or it
    exceeds COMMAND_TIMEOUT, no exception escapes. Instead the returned object
    has ``returncode`` set to ``None``, empty ``stdout`` and the error text in
    ``stderr``. ``None`` never equals 0, so positive checks fail naturally;
    negative checks must use ``_exited_nonzero`` to avoid a false pass.
    """
    try:
        return subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            cwd=cwd,
            timeout=COMMAND_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        error = f"timed out after {COMMAND_TIMEOUT}s"
    except OSError as exc:
        error = str(exc)
    return subprocess.CompletedProcess(cmd, None, stdout="", stderr=error)


def _exited_nonzero(result) -> bool:
    """Return True only when the command really ran and exited with a non-zero code."""
    return result.returncode not in (0, None)


def _parse_json(text, kinds):
    """Decode *text* as JSON; return the value if it is an instance of *kinds*, else None."""
    try:
        value = json.loads(text)
    except (ValueError, TypeError):
        return None
    return value if isinstance(value, kinds) else None


def _load_root_token() -> str:
    """Read the Vault root token from the init-keys file.

    Raises:
        OSError: the file cannot be opened.
        ValueError: the file is not valid JSON.
        KeyError, TypeError: the JSON has no "root_token" entry.
    """
    with open(VAULT_KEYS_PATH, encoding="utf-8") as f:
        return json.load(f)["root_token"]


def _vault_cli(root_token: str, *args: str):
    """Run ``vault <args> -format=json`` inside the "vault" container, authenticated with *root_token*."""
    return _run([
        "docker", "exec", "-e", f"VAULT_TOKEN={root_token}",
        "vault", "vault", *args, "-format=json",
    ])


def _extract_checkpoint_id(output: str):
    """Return the value of the first ``ID:`` line in *output*, or None.

    Everything after the first colon is kept, so identifiers that themselves
    contain colons (timestamps, for instance) are not truncated. An empty
    value is treated as "no ID".
    """
    for line in output.splitlines():
        if line.startswith("ID:"):
            return line.partition(":")[2].strip() or None
    return None


# =============================================================================
# Test: Vault Connectivity
# =============================================================================

def test_vault():
    """Check Vault status, root token access, tier policies, secrets engines and AppRole auth."""
    log_section("VAULT TESTS")

    # Test 1: Vault is running.
    # The JSON on stdout is what matters here, not the exit code.
    result = _run(["docker", "exec", "vault", "vault", "status", "-format=json"])
    status = _parse_json(result.stdout, dict)
    if status is None:
        log_test("Vault is running", False,
                 result.stderr[:100] if result.stderr else "Connection failed")
        return
    log_test("Vault is running", True)
    log_test("Vault is initialized", status.get("initialized", False))
    log_test("Vault is unsealed", not status.get("sealed", True))

    # Test 2: Get root token
    try:
        root_token = _load_root_token()
    except (OSError, ValueError, KeyError, TypeError) as e:
        log_test("Root token accessible", False, str(e))
        return
    log_test("Root token accessible", True)

    # Test 3: List policies
    result = _vault_cli(root_token, "policy", "list")
    policies = _parse_json(result.stdout, (list, dict))
    if policies is None:
        log_test("Tier policies loaded", False)
    else:
        expected = ["t0-observer", "t1-operator", "t2-builder", "t3-executor", "t4-architect"]
        found = [p for p in expected if p in policies]
        log_test("Tier policies loaded", len(found) == len(expected),
                 f"{len(found)}/{len(expected)} policies")

    # Test 4: Secrets engines
    result = _vault_cli(root_token, "secrets", "list")
    engines = _parse_json(result.stdout, (list, dict))
    if engines is None:
        log_test("Secrets engines", False)
    else:
        log_test("SSH secrets engine enabled", "ssh/" in engines)
        log_test("Proxmox KV engine enabled", "proxmox/" in engines)
        log_test("Secret KV engine enabled", "secret/" in engines)

    # Test 5: AppRole auth
    result = _vault_cli(root_token, "auth", "list")
    auth = _parse_json(result.stdout, (list, dict))
    log_test("AppRole auth enabled", auth is not None and "approle/" in auth)


# =============================================================================
# Test: DragonflyDB Connectivity
# =============================================================================

def test_dragonfly():
    """Fetch DragonflyDB credentials from Vault, connect, and exercise basic key and list operations."""
    log_section("DRAGONFLYDB TESTS")

    try:
        import redis
    except ImportError:
        log_skip("DragonflyDB tests", "redis module not installed")
        return

    # Get credentials from Vault
    try:
        root_token = _load_root_token()
        result = _run([
            "curl", "-sk",
            "-H", f"X-Vault-Token: {root_token}",
            "https://127.0.0.1:8200/v1/secret/data/services/dragonfly",
        ])
        if not result.stdout:
            # Surface curl's own error rather than a generic JSON decode message.
            raise ValueError(result.stderr or "empty response from Vault")
        creds = json.loads(result.stdout)["data"]["data"]
    except (OSError, ValueError, KeyError, TypeError) as e:
        log_test("DragonflyDB credentials in Vault", False, str(e))
        return
    log_test("DragonflyDB credentials in Vault", True)

    # Connect to DragonflyDB.
    # Broad catches below are deliberate: this is the harness boundary and the
    # client library's exception hierarchy is not under this file's control.
    try:
        r = redis.Redis(
            host=creds["host"],
            port=int(creds["port"]),
            password=creds["password"],
            decode_responses=True,
        )
        r.ping()
        log_test("DragonflyDB connection", True)
    except Exception as e:
        log_test("DragonflyDB connection", False, str(e))
        return

    # Test basic operations (the key expires on its own after 60 seconds)
    try:
        r.set("test:governance:ping", "pong", ex=60)
        value = r.get("test:governance:ping")
        log_test("DragonflyDB read/write", value == "pong")
    except Exception as e:
        log_test("DragonflyDB read/write", False, str(e))

    # Test list operations
    try:
        r.delete("test:governance:list")
        r.lpush("test:governance:list", "item1", "item2")
        items = r.lrange("test:governance:list", 0, -1)
        log_test("DragonflyDB list operations", len(items) == 2)
        r.delete("test:governance:list")
    except Exception as e:
        log_test("DragonflyDB list operations", False, str(e))


# =============================================================================
# Test: SQLite Ledger
# =============================================================================

def test_ledger():
    """Verify the ledger database exists, has the expected tables, and supports insert/query/delete."""
    log_section("LEDGER TESTS")

    db_path = GOVERNANCE_ROOT / "ledger" / "governance.db"
    log_test("Ledger database exists", db_path.exists())

    if not db_path.exists():
        return

    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        # Check tables exist
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = {row[0] for row in cursor.fetchall()}

        expected_tables = ["agent_metrics", "violations", "promotions", "orchestration_log"]
        for table in expected_tables:
            log_test(f"Table '{table}' exists", table in tables)

        # Test insert/query
        cursor.execute("""
            INSERT OR REPLACE INTO agent_metrics
            (agent_id, current_tier, compliant_runs, consecutive_compliant, total_runs, updated_at)
            VALUES ('test-agent-pytest', 0, 0, 0, 0, datetime('now'))
        """)
        conn.commit()

        cursor.execute("SELECT * FROM agent_metrics WHERE agent_id='test-agent-pytest'")
        row = cursor.fetchone()
        log_test("Ledger insert/query", row is not None)

        # Cleanup
        cursor.execute("DELETE FROM agent_metrics WHERE agent_id='test-agent-pytest'")
        conn.commit()
    except sqlite3.Error as e:
        log_test("Ledger operations", False, str(e))
    finally:
        # Release the database handle even when a statement above failed.
        if conn is not None:
            conn.close()


# =============================================================================
# Test: Preflight System
# =============================================================================

def test_preflight():
    """Run preflight.py against a sandbox target (expected to pass) and a production target (expected to be refused)."""
    log_section("PREFLIGHT TESTS")

    preflight_dir = GOVERNANCE_ROOT / "preflight"
    log_test("Preflight directory exists", preflight_dir.exists())

    if not preflight_dir.exists():
        return

    # Test preflight script exists
    preflight_py = preflight_dir / "preflight.py"
    log_test("preflight.py exists", preflight_py.exists())

    if not preflight_py.exists():
        return

    # Test sandbox target (should pass)
    result = _run(
        ["python3", str(preflight_py), "sandbox-vm-01",
         "--action", "generic", "--tier", "1", "--agent-id", "test"],
        cwd=str(preflight_dir),
    )
    log_test("Preflight approves sandbox target", result.returncode == 0)

    # Test production target (should fail for tier 1).
    # Only a real non-zero exit counts; a command that never ran proves nothing.
    result = _run(
        ["python3", str(preflight_py), "prod-db-01",
         "--action", "generic", "--tier", "1", "--agent-id", "test"],
        cwd=str(preflight_dir),
    )
    log_test("Preflight blocks prod for tier 1", _exited_nonzero(result))


# =============================================================================
# Test: Promotion Engine
# =============================================================================

def test_promotion():
    """Check that promotion.py exists and that its ``requirements`` command prints tier requirements."""
    log_section("PROMOTION ENGINE TESTS")

    runtime_dir = GOVERNANCE_ROOT / "runtime"
    promotion_py = runtime_dir / "promotion.py"
    log_test("promotion.py exists", promotion_py.exists())

    if not promotion_py.exists():
        return

    # Test requirements command
    result = _run(["python3", str(promotion_py), "requirements"], cwd=str(runtime_dir))
    log_test("Promotion requirements command", result.returncode == 0)

    # Check output contains tier requirements
    has_tiers = "T0 → T1" in result.stdout or "Tier 0" in result.stdout
    log_test("Promotion requirements output valid", has_tiers)


# =============================================================================
# Test: Revocation Engine
# =============================================================================

def test_revocation():
    """Check that revocation.py exists and that its ``types`` command lists violation types."""
    log_section("REVOCATION ENGINE TESTS")

    runtime_dir = GOVERNANCE_ROOT / "runtime"
    revocation_py = runtime_dir / "revocation.py"
    log_test("revocation.py exists", revocation_py.exists())

    if not revocation_py.exists():
        return

    # Test types command
    result = _run(["python3", str(revocation_py), "types"], cwd=str(runtime_dir))
    log_test("Revocation types command", result.returncode == 0)

    # Check output contains violation types
    has_types = (
        "UNAUTHORIZED" in result.stdout
        or "VIOLATION" in result.stdout
        or "critical" in result.stdout.lower()
    )
    log_test("Revocation types output valid", has_types)


# =============================================================================
# Test: Checkpoint Skill
# =============================================================================

def test_checkpoint():
    """Exercise the checkpoint CLI: create, load as JSON, list, and list the queue."""
    log_section("CHECKPOINT SKILL TESTS")

    checkpoint_bin = GOVERNANCE_ROOT / "bin" / "checkpoint"
    log_test("checkpoint CLI exists", checkpoint_bin.exists())

    if not checkpoint_bin.exists():
        return

    # Test checkpoint now
    result = _run([str(checkpoint_bin), "now", "--notes", "pytest run"])
    log_test("Checkpoint create", result.returncode == 0)

    # Extract checkpoint ID
    checkpoint_id = _extract_checkpoint_id(result.stdout)
    log_test("Checkpoint ID generated", checkpoint_id is not None)

    # Test checkpoint load
    result = _run([str(checkpoint_bin), "load", "--json"])
    log_test("Checkpoint load", result.returncode == 0)

    data = _parse_json(result.stdout, dict)
    if data is None:
        log_test("Checkpoint JSON valid", False)
    else:
        log_test("Checkpoint JSON valid", True)
        log_test("Checkpoint has phase", data.get("phase") is not None)
        # "or []" keeps an explicit null from raising in len().
        log_test("Checkpoint has dependencies", len(data.get("dependencies") or []) > 0)

    # Test checkpoint list
    result = _run([str(checkpoint_bin), "list"])
    log_test("Checkpoint list", result.returncode == 0)

    # Test queue commands
    result = _run([str(checkpoint_bin), "queue", "list"])
    log_test("Checkpoint queue list", result.returncode == 0)


# =============================================================================
# Test: Model Controller
# =============================================================================

def test_model_controller():
    """Exercise the model-controller CLI ``status`` and ``config`` commands."""
    log_section("MODEL CONTROLLER TESTS")

    controller_bin = GOVERNANCE_ROOT / "bin" / "model-controller"
    log_test("model-controller CLI exists", controller_bin.exists())

    if not controller_bin.exists():
        return

    # Test status command
    result = _run([str(controller_bin), "status"])
    log_test("Model controller status", result.returncode == 0)

    # Check output (case-insensitive, so "Mode:" and "mode:" both match)
    has_mode = "mode:" in result.stdout.lower()
    log_test("Model controller status output valid", has_mode)

    # Test config command
    result = _run([str(controller_bin), "config"])
    log_test("Model controller config", result.returncode == 0)


# =============================================================================
# Test: Tier 0 Agent
# =============================================================================

def test_tier0_agent():
    """Check the Tier 0 agent's status, allowed read, forbidden read and self-test command."""
    log_section("TIER 0 AGENT TESTS")

    agent_dir = GOVERNANCE_ROOT / "agents" / "tier0-agent"
    agent_py = agent_dir / "agent.py"

    log_test("Tier 0 agent directory exists", agent_dir.exists())
    log_test("agent.py exists", agent_py.exists())

    if not agent_py.exists():
        return

    def run_agent(*args):
        """Invoke agent.py with *args* from inside the agent directory."""
        return _run(["python3", str(agent_py), *args], cwd=str(agent_dir))

    # Test status command
    result = run_agent("status")
    log_test("Agent status command", result.returncode == 0)
    log_test("Agent shows as Tier 0", "Tier: 0" in result.stdout or "Tier 0" in result.stdout)

    # Test reading allowed file
    result = run_agent("read", str(GOVERNANCE_ROOT / "docs" / "tier0-guide.md"))
    log_test("Agent can read docs", result.returncode == 0)

    # Test reading forbidden file.
    # A command that never started must not count as "blocked".
    result = run_agent("read", "/etc/passwd")
    log_test("Agent blocked from /etc", _exited_nonzero(result) or "BLOCKED" in result.stdout)

    # Test test-forbidden command
    result = run_agent("test-forbidden")
    log_test("Agent test-forbidden command", result.returncode == 0)
    log_test("All forbidden actions blocked",
             "All forbidden actions correctly blocked" in result.stdout)


# =============================================================================
# Test: Governance Wrappers
# =============================================================================

def test_wrappers():
    """Check that the Terraform and Ansible governance wrappers exist and are executable."""
    log_section("GOVERNANCE WRAPPERS TESTS")

    wrappers_dir = GOVERNANCE_ROOT / "wrappers"
    log_test("Wrappers directory exists", wrappers_dir.exists())

    tf_wrapper = wrappers_dir / "tf-governed.sh"
    ansible_wrapper = wrappers_dir / "ansible-governed.sh"

    log_test("Terraform wrapper exists", tf_wrapper.exists())
    log_test("Ansible wrapper exists", ansible_wrapper.exists())

    # Check they're executable
    if tf_wrapper.exists():
        log_test("Terraform wrapper executable", os.access(tf_wrapper, os.X_OK))
    if ansible_wrapper.exists():
        log_test("Ansible wrapper executable", os.access(ansible_wrapper, os.X_OK))


# =============================================================================
# Test: Evidence System
# =============================================================================

def test_evidence():
    """Check that the evidence directory, evidence.py and at least one evidence package exist."""
    log_section("EVIDENCE SYSTEM TESTS")

    evidence_dir = GOVERNANCE_ROOT / "evidence"
    evidence_py = evidence_dir / "evidence.py"

    log_test("Evidence directory exists", evidence_dir.exists())
    log_test("evidence.py exists", evidence_py.exists())

    packages_dir = evidence_dir / "packages"
    log_test("Evidence packages directory exists", packages_dir.exists())

    if packages_dir.exists():
        packages = list(packages_dir.iterdir())
        log_test("Evidence packages created", len(packages) > 0, f"{len(packages)} packages")


# =============================================================================
# Main
# =============================================================================

def main():
    """Run every test section, print a summary, and return the process exit code.

    Returns:
        1 if at least one check failed, otherwise 0.
    """
    print(f"\n{BLUE}{'#'*60}{RESET}")
    print(f"{BLUE}# AGENT GOVERNANCE SYSTEM - COMPREHENSIVE TESTS{RESET}")
    print(f"{BLUE}# {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{RESET}")
    print(f"{BLUE}{'#'*60}{RESET}")

    # Run all tests
    test_vault()
    test_dragonfly()
    test_ledger()
    test_preflight()
    test_promotion()
    test_revocation()
    test_checkpoint()
    test_model_controller()
    test_tier0_agent()
    test_wrappers()
    test_evidence()

    # Summary
    print(f"\n{BLUE}{'='*60}{RESET}")
    print(f"{BLUE}TEST SUMMARY{RESET}")
    print(f"{BLUE}{'='*60}{RESET}")

    total = RESULTS["passed"] + RESULTS["failed"] + RESULTS["skipped"]
    print(f"  {GREEN}Passed:{RESET}  {RESULTS['passed']}")
    print(f"  {RED}Failed:{RESET}  {RESULTS['failed']}")
    print(f"  {YELLOW}Skipped:{RESET} {RESULTS['skipped']}")
    print(f"  Total:   {total}")

    if RESULTS["failed"] > 0:
        print(f"\n{RED}Some tests failed!{RESET}")
        return 1

    print(f"\n{GREEN}All tests passed!{RESET}")
    return 0


if __name__ == "__main__":
    sys.exit(main())