#!/usr/bin/env python3 """ Agent Tests =========== Comprehensive tests for all agent implementations: - tier0-agent (Observer) - tier1-agent (Operator) - llm-planner (Python) - multi-agent (TypeScript orchestration) Tests verify: - Agent initialization and configuration - Governance integration (ledger, heartbeat) - Action constraints (allowed/forbidden) - Multi-agent coordination infrastructure """ import json import os import sqlite3 import subprocess import sys from pathlib import Path from datetime import datetime # Paths BASE_PATH = Path("/opt/agent-governance") AGENTS_PATH = BASE_PATH / "agents" LEDGER_PATH = BASE_PATH / "ledger" / "governance.db" # Test results PASSED = 0 FAILED = 0 def log(msg: str, status: str = "info"): """Log a message""" icons = {"pass": "\033[92m✓\033[0m", "fail": "\033[91m✗\033[0m", "info": "→"} print(f" {icons.get(status, '•')} {msg}") # ============================================================================= # Tier 0 Agent Tests # ============================================================================= def test_tier0_agent(): """Test Tier 0 Observer agent""" global PASSED, FAILED print("\n[TEST] tier0_agent") tier0_path = AGENTS_PATH / "tier0-agent" # 1. Check agent files exist agent_file = tier0_path / "agent.py" if agent_file.exists(): log("agent.py exists", "pass") PASSED += 1 else: log("agent.py missing", "fail") FAILED += 1 return # 2. Check config exists config_file = tier0_path / "config" / "agent.json" if config_file.exists(): log("config/agent.json exists", "pass") PASSED += 1 # Validate config structure try: with open(config_file) as f: config = json.load(f) if "agent_id" in config: log(f"Agent ID: {config['agent_id']}", "pass") PASSED += 1 if config.get("tier") == 0: log("Tier correctly set to 0", "pass") PASSED += 1 if "constraints" in config: constraints = config["constraints"] if "allowed_actions" in constraints: log(f"Allowed actions: {len(constraints['allowed_actions'])}", "pass") PASSED += 1 if "forbidden_actions" in constraints: log(f"Forbidden actions: {len(constraints['forbidden_actions'])}", "pass") PASSED += 1 except Exception as e: log(f"Config validation error: {e}", "fail") FAILED += 1 else: log("config/agent.json missing", "fail") FAILED += 1 # 3. Check bootstrap script bootstrap = tier0_path / "bootstrap.sh" if bootstrap.exists() and os.access(bootstrap, os.X_OK): log("bootstrap.sh exists and is executable", "pass") PASSED += 1 else: log("bootstrap.sh missing or not executable", "fail") FAILED += 1 # 4. Check run script run_script = tier0_path / "run-agent.sh" if run_script.exists() and os.access(run_script, os.X_OK): log("run-agent.sh exists and is executable", "pass") PASSED += 1 else: log("run-agent.sh missing or not executable", "fail") FAILED += 1 # 5. Test agent import try: sys.path.insert(0, str(tier0_path)) import importlib.util spec = importlib.util.spec_from_file_location("agent", agent_file) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) log("Agent module importable", "pass") PASSED += 1 # Check for key classes if hasattr(module, "Tier0Agent") or hasattr(module, "ActionResult"): log("Agent classes defined", "pass") PASSED += 1 except Exception as e: log(f"Agent import error: {e}", "fail") FAILED += 1 # ============================================================================= # Tier 1 Agent Tests # ============================================================================= def test_tier1_agent(): """Test Tier 1 Operator agent""" global PASSED, FAILED print("\n[TEST] tier1_agent") tier1_path = AGENTS_PATH / "tier1-agent" # 1. Check agent files exist agent_file = tier1_path / "agent.py" if agent_file.exists(): log("agent.py exists", "pass") PASSED += 1 else: log("agent.py missing", "fail") FAILED += 1 return # 2. Check agent size (should be substantial) file_size = agent_file.stat().st_size if file_size > 20000: # > 20KB log(f"Agent implementation size: {file_size // 1024}KB", "pass") PASSED += 1 else: log(f"Agent seems small: {file_size // 1024}KB", "fail") FAILED += 1 # 3. Check config config_file = tier1_path / "config" / "agent.json" if config_file.exists(): log("config/agent.json exists", "pass") PASSED += 1 try: with open(config_file) as f: config = json.load(f) if config.get("tier") == 1: log("Tier correctly set to 1", "pass") PASSED += 1 # Tier 1 should have execution capabilities allowed = config.get("constraints", {}).get("allowed_actions", []) if "execute_command" in allowed or "write_file" in allowed: log("Execution capabilities enabled", "pass") PASSED += 1 except Exception as e: log(f"Config error: {e}", "fail") FAILED += 1 # 4. Test forbidden actions are defined try: with open(agent_file) as f: content = f.read() if "FORBIDDEN_ACTIONS" in content or "forbidden_actions" in content: log("Forbidden actions defined", "pass") PASSED += 1 if "delete_production" in content.lower(): log("Production deletion blocked", "pass") PASSED += 1 if "access_vault_root" in content.lower(): log("Vault root access blocked", "pass") PASSED += 1 except Exception as e: log(f"Content check error: {e}", "fail") FAILED += 1 # 5. Test agent import try: import importlib.util spec = importlib.util.spec_from_file_location("agent", agent_file) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) log("Agent module importable", "pass") PASSED += 1 if hasattr(module, "Tier1Agent"): log("Tier1Agent class defined", "pass") PASSED += 1 # Check for execution methods cls = getattr(module, "Tier1Agent") methods = dir(cls) exec_methods = ["execute_command", "write_file", "terraform_plan", "ansible_run"] found = sum(1 for m in exec_methods if m in methods) log(f"Execution methods found: {found}/{len(exec_methods)}", "pass") PASSED += 1 except Exception as e: log(f"Agent import error: {e}", "fail") FAILED += 1 # ============================================================================= # LLM Planner Tests # ============================================================================= def test_llm_planner(): """Test LLM Planner agent""" global PASSED, FAILED print("\n[TEST] llm_planner") planner_path = AGENTS_PATH / "llm-planner" # 1. Check module files exist agent_file = planner_path / "agent.py" governance_file = planner_path / "governance.py" governed_file = planner_path / "governed_agent.py" for f, name in [(agent_file, "agent.py"), (governance_file, "governance.py"), (governed_file, "governed_agent.py")]: if f.exists(): log(f"{name} exists", "pass") PASSED += 1 else: log(f"{name} missing", "fail") FAILED += 1 # 2. Check venv exists venv_path = planner_path / ".venv" if venv_path.exists(): log("Virtual environment exists", "pass") PASSED += 1 else: log("Virtual environment missing", "fail") FAILED += 1 # 3. Check content for LLM integration try: with open(agent_file) as f: content = f.read() if "openai" in content.lower() or "OpenAI" in content: log("OpenAI SDK integration", "pass") PASSED += 1 if "confidence" in content.lower(): log("Confidence scoring implemented", "pass") PASSED += 1 if "AGENT_METADATA" in content: log("Agent metadata defined", "pass") PASSED += 1 except Exception as e: log(f"Content check error: {e}", "fail") FAILED += 1 # 4. Check governance module try: with open(governance_file) as f: content = f.read() if "DragonflyDB" in content or "dragonfly" in content.lower() or "redis" in content.lower(): log("DragonflyDB integration", "pass") PASSED += 1 if "AgentPhase" in content: log("AgentPhase enum defined", "pass") PASSED += 1 if "revocation" in content.lower() or "revoke" in content.lower(): log("Revocation handling", "pass") PASSED += 1 except Exception as e: log(f"Governance check error: {e}", "fail") FAILED += 1 # ============================================================================= # Multi-Agent Orchestration Tests # ============================================================================= def test_multi_agent(): """Test Multi-Agent orchestration system""" global PASSED, FAILED print("\n[TEST] multi_agent") multi_path = AGENTS_PATH / "multi-agent" # 1. Check TypeScript files exist ts_files = ["orchestrator.ts", "agents.ts", "coordination.ts", "types.ts", "governance.ts"] for ts_file in ts_files: path = multi_path / ts_file if path.exists(): log(f"{ts_file} exists", "pass") PASSED += 1 else: if ts_file == "governance.ts": log(f"{ts_file} missing (optional)", "info") else: log(f"{ts_file} missing", "fail") FAILED += 1 # 2. Check package.json package_json = multi_path / "package.json" if package_json.exists(): try: with open(package_json) as f: pkg = json.load(f) log(f"package.json valid (name: {pkg.get('name', 'N/A')})", "pass") PASSED += 1 # Check dependencies deps = pkg.get("dependencies", {}) if "redis" in deps or "@redis/client" in deps: log("Redis client dependency", "pass") PASSED += 1 if "openai" in deps: log("OpenAI dependency", "pass") PASSED += 1 except Exception as e: log(f"package.json error: {e}", "fail") FAILED += 1 # 3. Check node_modules node_modules = multi_path / "node_modules" if node_modules.exists() and node_modules.is_dir(): module_count = len(list(node_modules.iterdir())) log(f"node_modules installed ({module_count} packages)", "pass") PASSED += 1 else: log("node_modules not installed", "fail") FAILED += 1 # 4. Check orchestrator content orchestrator_file = multi_path / "orchestrator.ts" if orchestrator_file.exists(): content = orchestrator_file.read_text() if "MultiAgentOrchestrator" in content: log("MultiAgentOrchestrator class defined", "pass") PASSED += 1 if "AgentAlpha" in content and "AgentBeta" in content: log("Alpha/Beta agents imported", "pass") PASSED += 1 if "AgentGamma" in content: log("Gamma agent (conditional spawn) implemented", "pass") PASSED += 1 if "spawnGamma" in content or "SpawnController" in content: log("Spawn controller logic present", "pass") PASSED += 1 if "monitorConditions" in content: log("Condition monitoring implemented", "pass") PASSED += 1 # 5. Check coordination patterns coordination_file = multi_path / "coordination.ts" if coordination_file.exists(): content = coordination_file.read_text() patterns = [ ("Blackboard", "Blackboard pattern"), ("MessageBus", "Message bus"), ("AgentStateManager", "State management"), ("MetricsCollector", "Metrics collection"), ] for pattern, desc in patterns: if pattern in content: log(f"{desc} implemented", "pass") PASSED += 1 # 6. Check agents implementation agents_file = multi_path / "agents.ts" if agents_file.exists(): content = agents_file.read_text() agent_classes = ["AgentAlpha", "AgentBeta", "AgentGamma", "BaseAgent"] for cls in agent_classes: if f"class {cls}" in content or f"export class {cls}" in content: log(f"{cls} class defined", "pass") PASSED += 1 # ============================================================================= # Governance Integration Tests # ============================================================================= def test_governance_integration(): """Test governance ledger integration across agents""" global PASSED, FAILED print("\n[TEST] governance_integration") # 1. Check ledger exists if LEDGER_PATH.exists(): log("Governance ledger exists", "pass") PASSED += 1 else: log("Governance ledger missing", "fail") FAILED += 1 return # 2. Check ledger tables try: conn = sqlite3.connect(LEDGER_PATH) cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") tables = [row[0] for row in cursor.fetchall()] expected_tables = ["agent_actions", "agent_metrics"] for table in expected_tables: if table in tables: log(f"Table '{table}' exists", "pass") PASSED += 1 else: log(f"Table '{table}' missing", "fail") FAILED += 1 # 3. Check for orchestration table if "orchestration_log" in tables: log("orchestration_log table exists", "pass") PASSED += 1 else: log("orchestration_log table missing (may be created on first use)", "info") # 4. Check agent_actions has data cursor.execute("SELECT COUNT(*) FROM agent_actions") count = cursor.fetchone()[0] log(f"agent_actions has {count} records", "pass") PASSED += 1 conn.close() except Exception as e: log(f"Ledger check error: {e}", "fail") FAILED += 1 # 5. Check tier agents log to ledger tier0_file = AGENTS_PATH / "tier0-agent" / "agent.py" tier1_file = AGENTS_PATH / "tier1-agent" / "agent.py" for agent_file, name in [(tier0_file, "tier0"), (tier1_file, "tier1")]: if agent_file.exists(): content = agent_file.read_text() if "governance.db" in content or "log_action" in content or "ledger" in content.lower(): log(f"{name}-agent logs to ledger", "pass") PASSED += 1 else: log(f"{name}-agent may not log to ledger", "info") # ============================================================================= # Agent Health Check Tests # ============================================================================= def test_agent_health(): """Test agent health and diagnostics""" global PASSED, FAILED print("\n[TEST] agent_health") # 1. Check tier0 agent status command tier0_run = AGENTS_PATH / "tier0-agent" / "run-agent.sh" if tier0_run.exists(): try: result = subprocess.run( [str(tier0_run), "status"], capture_output=True, text=True, timeout=10, cwd=str(AGENTS_PATH / "tier0-agent") ) if result.returncode == 0 or "agent" in result.stdout.lower(): log("tier0 status command works", "pass") PASSED += 1 else: log(f"tier0 status returned: {result.returncode}", "info") except subprocess.TimeoutExpired: log("tier0 status timed out", "fail") FAILED += 1 except Exception as e: log(f"tier0 status error: {e}", "fail") FAILED += 1 # 2. Check tier1 agent status command tier1_run = AGENTS_PATH / "tier1-agent" / "run-agent.sh" if tier1_run.exists(): try: result = subprocess.run( [str(tier1_run), "status"], capture_output=True, text=True, timeout=10, cwd=str(AGENTS_PATH / "tier1-agent") ) if result.returncode == 0 or "agent" in result.stdout.lower(): log("tier1 status command works", "pass") PASSED += 1 else: log(f"tier1 status returned: {result.returncode}", "info") except subprocess.TimeoutExpired: log("tier1 status timed out", "fail") FAILED += 1 except Exception as e: log(f"tier1 status error: {e}", "fail") FAILED += 1 # 3. Check multi-agent TypeScript compiles multi_path = AGENTS_PATH / "multi-agent" try: result = subprocess.run( ["bun", "run", "tsc", "--noEmit"], capture_output=True, text=True, timeout=30, cwd=str(multi_path) ) if result.returncode == 0: log("multi-agent TypeScript compiles", "pass") PASSED += 1 else: # Try without tsc log("TypeScript check skipped (tsc not in scripts)", "info") except FileNotFoundError: log("bun not available for TypeScript check", "info") except Exception as e: log(f"TypeScript check error: {e}", "info") # ============================================================================= # Main # ============================================================================= def main(): """Run all agent tests""" global PASSED, FAILED print("\n" + "=" * 60) print("AGENT TESTS") print("=" * 60) try: test_tier0_agent() test_tier1_agent() test_llm_planner() test_multi_agent() test_governance_integration() test_agent_health() except Exception as e: print(f"\n\033[91mTest execution error: {e}\033[0m") import traceback traceback.print_exc() FAILED += 1 print("\n" + "=" * 60) print(f"RESULTS: {PASSED} passed, {FAILED} failed") print("=" * 60 + "\n") return FAILED == 0 if __name__ == "__main__": success = main() sys.exit(0 if success else 1)