#!/usr/bin/env python3
"""
Phase 4: Promotion and Revocation Engine Tests
===============================================
Tests for promotion logic, revocation triggers, and monitor daemon.

Required tests:
- promotion_logic: Verify tier promotion rules work correctly
- revocation_triggers: Verify revocation conditions trigger properly
- monitor_daemon: Verify continuous monitoring is active

The checks are structural: each test verifies that a runtime module file
exists under ``RUNTIME_PATH``, can be imported, exposes at least one expected
attribute (or any class), and that its source text mentions expected keywords.
Results are accumulated in the module-level ``PASSED`` / ``FAILED`` counters.
"""

import importlib
import json
import os
import sqlite3
import subprocess
import sys
from pathlib import Path
from typing import Optional, Sequence

# Configuration
BASE_PATH = Path("/opt/agent-governance")
RUNTIME_PATH = BASE_PATH / "runtime"
LEDGER_DB = BASE_PATH / "ledger" / "governance.db"

# Test results
PASSED = 0
FAILED = 0

# ANSI-coloured markers printed in front of each log line, keyed by status.
_STATUS_ICONS = {
    "pass": "\033[92m✓\033[0m",
    "fail": "\033[91m✗\033[0m",
    "info": "→",
}


def log(msg: str, status: str = "info") -> None:
    """Print *msg* prefixed with the icon for *status*.

    Args:
        msg: Text to print.
        status: One of ``"pass"``, ``"fail"`` or ``"info"``; any other value
            falls back to a neutral bullet.
    """
    print(f" {_STATUS_ICONS.get(status, '•')} {msg}")


def _record(passed: bool, msg: str) -> None:
    """Log one check result and bump the matching global counter."""
    global PASSED, FAILED
    if passed:
        log(msg, "pass")
        PASSED += 1
    else:
        log(msg, "fail")
        FAILED += 1


def _import_runtime_module(module_name: str):
    """Import *module_name* with ``RUNTIME_PATH`` temporarily first on sys.path.

    The path entry is removed by value rather than with ``pop(0)``: the
    imported module may itself prepend to ``sys.path``, in which case popping
    index 0 would drop the wrong entry and leak the runtime directory.

    Raises:
        Whatever the import raises (ImportError, SyntaxError, ...).
    """
    runtime_dir = str(RUNTIME_PATH)
    sys.path.insert(0, runtime_dir)
    try:
        return importlib.import_module(module_name)
    finally:
        if runtime_dir in sys.path:
            sys.path.remove(runtime_dir)


def _check_module(module_name: str, key_attrs: Sequence[str]) -> Optional[str]:
    """Run the shared exists / importable / has-components checks.

    Args:
        module_name: Bare module name; the file checked is
            ``RUNTIME_PATH / f"{module_name}.py"``.
        key_attrs: Attribute names of which at least one is expected on the
            imported module. If none is present, any class defined or
            imported in the module is accepted instead.

    Returns:
        The module's source text when the file exists and imports, otherwise
        ``None`` (the failure has already been recorded).
    """
    label = module_name.capitalize()
    module_path = RUNTIME_PATH / f"{module_name}.py"

    # 1. The module file must exist.
    if not module_path.exists():
        _record(False, f"{label} module not found: {module_path}")
        return None
    _record(True, f"{label} module exists")

    # 2. The module must import. Any exception counts as an import failure so
    #    that a broken module (e.g. a SyntaxError) fails this check instead of
    #    aborting the whole test run.
    try:
        module = _import_runtime_module(module_name)
    except Exception as exc:
        _record(False, f"Failed to import {module_name}: {exc}")
        return None
    _record(True, f"{label} module importable")

    # 3. The module must expose a known attribute, or at least one class.
    try:
        found = [attr for attr in key_attrs if hasattr(module, attr)]
        if found:
            _record(True, f"{label} has key components: {found}")
        else:
            classes = [
                name
                for name in dir(module)
                if isinstance(getattr(module, name, None), type)
            ]
            if classes:
                _record(True, f"{label} has classes: {classes[:5]}")
            else:
                _record(False, f"{label} module missing key components")
    except Exception as exc:
        _record(False, f"{label} inspection failed: {exc}")

    # Decode explicitly so the keyword scans do not depend on the locale and a
    # stray undecodable byte cannot abort the test.
    return module_path.read_text(encoding="utf-8", errors="replace")


def test_promotion_logic():
    """Test that tier promotion rules work correctly.

    Returns:
        ``False`` if ``promotion.py`` is missing or cannot be imported,
        ``True`` otherwise (individual check failures are only counted).
    """
    print("\n[TEST] promotion_logic")

    # 1-3. Module exists, imports, and has promotion-related components.
    content = _check_module(
        "promotion",
        [
            "PromotionEngine",
            "PromotionRule",
            "TierPromotion",
            "evaluate",
            "promote",
            "check_eligibility",
        ],
    )
    if content is None:
        return False

    # 4. Check promotion rules are defined (case-sensitive tier names).
    tier_patterns = [
        "T0", "T1", "T2", "T3", "T4",
        "tier0", "tier1", "tier2", "tier3", "tier4",
    ]
    found_tiers = [tier for tier in tier_patterns if tier in content]
    if found_tiers:
        _record(True, f"Promotion defines tier rules: {found_tiers[:5]}")
    else:
        _record(False, "Promotion missing tier definitions")

    # 5. Check for trust score logic (optional: absence is informational).
    lowered = content.lower()
    if any(word in lowered for word in ("trust", "score", "eligib")):
        _record(True, "Promotion has trust/eligibility logic")
    else:
        log("Promotion missing trust score logic", "info")

    return True


def test_revocation_triggers():
    """Test that revocation conditions trigger properly.

    Returns:
        ``False`` if ``revocation.py`` is missing or cannot be imported,
        ``True`` otherwise (individual check failures are only counted).
    """
    print("\n[TEST] revocation_triggers")

    # 1-3. Module exists, imports, and has revocation-related components.
    content = _check_module(
        "revocation",
        [
            "RevocationEngine",
            "RevocationTrigger",
            "ViolationType",
            "revoke",
            "trigger",
            "check_violation",
        ],
    )
    if content is None:
        return False

    lowered = content.lower()

    # 4. Check for violation types.
    violation_patterns = ["violation", "breach", "unauthorized", "exceed", "limit"]
    if any(pattern in lowered for pattern in violation_patterns):
        _record(True, "Revocation defines violation types")
    else:
        _record(False, "Revocation missing violation definitions")

    # 5. Check for immediate revocation logic (optional: informational).
    if any(word in lowered for word in ("immediate", "critical", "emergency")):
        _record(True, "Revocation has immediate/critical handling")
    else:
        log("Revocation missing immediate handling", "info")

    return True


def test_monitor_daemon():
    """Test that continuous monitoring is active.

    Returns:
        ``False`` if ``monitors.py`` is missing or cannot be imported,
        ``True`` otherwise (individual check failures are only counted).
    """
    print("\n[TEST] monitor_daemon")

    # 1-3. Module exists, imports, and has monitor-related components.
    content = _check_module(
        "monitors",
        [
            "Monitor",
            "DaemonMonitor",
            "TrustMonitor",
            "ResourceMonitor",
            "start",
            "stop",
            "check",
            "watch",
        ],
    )
    if content is None:
        return False

    # 4. Check for daemon/background logic.
    lowered = content.lower()
    daemon_patterns = ["daemon", "thread", "async", "background", "loop", "schedule"]
    found_daemon = [pattern for pattern in daemon_patterns if pattern in lowered]
    if found_daemon:
        _record(True, f"Monitors has daemon patterns: {found_daemon[:3]}")
    else:
        _record(False, "Monitors missing daemon patterns")

    # 5. Check health manager integration (optional: informational).
    if (RUNTIME_PATH / "health_manager.py").exists():
        _record(True, "Health manager exists for monitoring")
    else:
        log("Health manager not found", "info")

    # 6. Check circuit breaker integration (optional: informational).
    if (RUNTIME_PATH / "circuit_breaker.py").exists():
        _record(True, "Circuit breaker exists for fault tolerance")
    else:
        log("Circuit breaker not found", "info")

    return True


def main():
    """Run all Phase 4 tests.

    Returns:
        ``True`` when no failure was recorded, ``False`` otherwise.
    """
    global PASSED, FAILED

    print("\n" + "=" * 60)
    print("PHASE 4: PROMOTION AND REVOCATION ENGINE TESTS")
    print("=" * 60)

    # Each test group is isolated so that an unexpected error in one of them
    # is counted as a failure without preventing the remaining groups from
    # running.
    for test in (test_promotion_logic, test_revocation_triggers, test_monitor_daemon):
        try:
            test()
        except Exception as e:
            print(f"\n\033[91mTest execution error: {e}\033[0m")
            FAILED += 1

    print("\n" + "=" * 60)
    print(f"RESULTS: {PASSED} passed, {FAILED} failed")
    print("=" * 60 + "\n")

    return FAILED == 0


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)