#!/usr/bin/env python3
"""
Test Runner for Agent Governance System

Runs unit tests, integration tests, and scenario tests.
"""

import sys
import os
import unittest
import argparse
import traceback
from pathlib import Path
from datetime import datetime
from typing import Callable, List, Optional, Sequence, Tuple

# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
sys.path.insert(0, str(Path(__file__).parent))

from mocks import MockVault, MockDragonfly, MockLLM, MockBlackboard


class TestResult:
    """Custom test result tracker.

    Counts passed, failed and errored tests, prints one line per test as
    it is recorded, and keeps ``(test_name, message)`` pairs for every
    failure or error in ``failures``.
    """

    def __init__(self):
        self.passed = 0
        self.failed = 0
        self.errors = 0
        # Initialised for completeness; nothing in this file increments it.
        self.skipped = 0
        self.failures: List[Tuple[str, str]] = []

    def add_success(self, test_name: str):
        """Record a passing test and print its check mark line."""
        self.passed += 1
        print(f" ✓ {test_name}")

    def add_failure(self, test_name: str, message: str):
        """Record a failed assertion together with its message."""
        self.failed += 1
        self.failures.append((test_name, message))
        print(f" ✗ {test_name}")
        print(f" {message}")

    def add_error(self, test_name: str, error: Exception):
        """Record an unexpected (non-assertion) exception.

        The exception class name is included because ``str(error)`` is
        empty for exceptions raised without arguments, which would
        otherwise leave the report line blank.
        """
        self.errors += 1
        description = f"{type(error).__name__}: {error}"
        self.failures.append((test_name, description))
        print(f" ✗ {test_name} (ERROR)")
        print(f" {description}")

    def summary(self) -> str:
        """Return a one-line summary; the total excludes skipped tests."""
        total = self.passed + self.failed + self.errors
        return f"{self.passed}/{total} passed, {self.failed} failed, {self.errors} errors"


def _describe_assertion(exc: AssertionError) -> str:
    """Return a non-empty description for a failed assertion.

    Bare ``assert cond`` statements raise ``AssertionError`` with an empty
    message, so fall back to the line number (and source text, when it is
    available) of the innermost traceback frame.
    """
    message = str(exc)
    if message:
        return message
    frames = traceback.extract_tb(exc.__traceback__)
    if not frames:
        return "assertion failed"
    innermost = frames[-1]
    source = f": {innermost.line}" if innermost.line else ""
    return f"assertion failed at line {innermost.lineno}{source}"


def _print_traceback() -> None:
    """Print the traceback of the exception currently being handled."""
    for line in traceback.format_exc().rstrip().splitlines():
        print(f"      {line}")


def run_test(name: str, test_func: Callable[[], None], result: TestResult,
             verbose: bool = False):
    """Run a single test function and record its outcome in ``result``.

    Args:
        name: Label used when reporting the test.
        test_func: Zero-argument callable; it passes if it returns
            without raising.
        result: Tracker that receives the outcome.
        verbose: When true, print the full traceback of a failing or
            erroring test after the summary line.

    ``AssertionError`` is recorded as a failure; any other ``Exception``
    is recorded as an error. Neither propagates to the caller.
    """
    try:
        test_func()
    except AssertionError as e:
        result.add_failure(name, _describe_assertion(e))
        if verbose:
            _print_traceback()
    except Exception as e:
        # Top-level boundary of the runner: one broken test must not stop
        # the remaining tests from running.
        result.add_error(name, e)
        if verbose:
            _print_traceback()
    else:
        result.add_success(name)


# === Unit Tests: MockVault ===

def test_vault_approle_auth():
    """Test AppRole authentication"""
    vault = MockVault()

    # Get credentials
    role_id = vault.get_role_id("tier0-agent")
    assert role_id is not None, "Should get role_id"

    secret_id = vault.generate_secret_id("tier0-agent")
    assert secret_id is not None, "Should generate secret_id"

    # Authenticate
    success, token, msg = vault.approle_login(role_id, secret_id)
    assert success, f"Should authenticate: {msg}"
    assert token is not None, "Should receive token"


def test_vault_policy_check():
    """Test policy enforcement"""
    vault = MockVault()

    # Create token with t0-observer policy
    vault.inject_token("test-token", ["t0-observer"])

    # Should be able to read docs
    assert vault.check_policy("test-token", "secret/data/docs/readme", "read")

    # Should not be able to write
    assert not vault.check_policy("test-token", "secret/data/docs/readme", "create")

    # Should not access SSH
    assert not vault.check_policy("test-token", "ssh/creds/sandbox-user", "read")


def test_vault_token_lifecycle():
    """Test token creation, validation, revocation"""
    vault = MockVault()
    vault.inject_token("test-token", ["t0-observer"], ttl=3600)

    # Should be valid
    valid, _ = vault.validate_token("test-token")
    assert valid, "Token should be valid"

    # Revoke
    assert vault.revoke_token("test-token"), "Should revoke"

    # Should be invalid after revocation
    valid, _ = vault.validate_token("test-token")
    assert not valid, "Token should be invalid after revocation"


# === Unit Tests: MockDragonfly ===

def test_dragonfly_strings():
    """Test string operations"""
    db = MockDragonfly()

    db.set("key1", "value1")
    assert db.get("key1") == "value1"

    db.set("counter", "0")
    assert db.incr("counter") == 1
    assert db.incr("counter", 5) == 6


def test_dragonfly_hashes():
    """Test hash operations"""
    db = MockDragonfly()

    db.hset("agent:001:state", mapping={
        "phase": "EXECUTE",
        "step": "1",
        "status": "running",
    })

    assert db.hget("agent:001:state", "phase") == "EXECUTE"
    assert db.hgetall("agent:001:state")["status"] == "running"


def test_dragonfly_locks():
    """Test distributed locks"""
    db = MockDragonfly()

    # Acquire lock
    assert db.acquire_lock("task:001", "agent-A", ttl=30)

    # Different owner should fail
    assert not db.acquire_lock("task:001", "agent-B", ttl=30)

    # Same owner should succeed (refresh)
    assert db.acquire_lock("task:001", "agent-A", ttl=30)

    # Release
    assert db.release_lock("task:001", "agent-A")

    # Now agent-B can acquire
    assert db.acquire_lock("task:001", "agent-B", ttl=30)


def test_dragonfly_expiry():
    """Test key expiration"""
    db = MockDragonfly()

    db.set("temp", "value", ex=1)
    assert db.get("temp") == "value"

    # Manually expire (in real usage, would wait)
    # NOTE(review): presumably MockDragonfly compares expiry against naive
    # UTC datetimes - confirm before switching to a timezone-aware clock.
    db._expiry["temp"] = datetime.utcnow()
    assert db.get("temp") is None


# === Unit Tests: MockLLM ===

def test_llm_basic_response():
    """Test basic LLM response matching"""
    llm = MockLLM()

    response, meta = llm.complete("Please read this document")
    assert "EXECUTE" in response
    assert meta["confidence"] >= 0.5


def test_llm_pattern_matching():
    """Test custom pattern matching"""
    llm = MockLLM()

    llm.add_response(
        pattern="deploy.*nginx",
        response='{"action": "deploy_nginx", "confidence": 0.9}',
        confidence=0.9,
    )

    response, meta = llm.complete("Please deploy nginx to the server")
    assert "deploy_nginx" in response
    assert meta["confidence"] == 0.9


def test_llm_error_injection():
    """Test error injection for failure testing"""
    llm = MockLLM()
    llm.set_error_mode("timeout", after_calls=2)

    # First two calls succeed
    llm.complete("test 1")
    llm.complete("test 2")

    # Third call should fail
    try:
        llm.complete("test 3")
    except TimeoutError:
        pass
    else:
        # Raised explicitly (not ``assert False``) so the guard is an
        # ordinary statement kept out of the ``try`` body.
        raise AssertionError("Should have raised TimeoutError")


# === Unit Tests: MockBlackboard ===

def test_blackboard_write_read():
    """Test blackboard write and read"""
    bb = MockBlackboard("task-001")

    bb.write("problem", "objective", {"goal": "Test the system"}, "agent-A")
    result = bb.read("problem", "objective")

    assert result["goal"] == "Test the system"


def test_blackboard_consensus():
    """Test consensus voting"""
    bb = MockBlackboard("task-001")

    # Submit proposal
    bb.submit_proposal("prop-1", {"action": "deploy"}, "agent-A")

    # Vote
    bb.vote("prop-1", "agent-A", "ACCEPT", "Looks good")
    bb.vote("prop-1", "agent-B", "ACCEPT", "Agreed")
    bb.vote("prop-1", "agent-C", "REJECT", "Need more testing")

    # Check consensus
    status = bb.check_consensus("prop-1", ["agent-A", "agent-B", "agent-C"])

    assert status["reached"], "Consensus should be reached"
    assert status["result"] == "ACCEPT", "Should be accepted (2-1)"
    assert status["votes"]["ACCEPT"] == 2
    assert status["votes"]["REJECT"] == 1


def test_blackboard_progress():
    """Test progress tracking"""
    bb = MockBlackboard("task-001")

    bb.update_progress("agent-A", "EXECUTE", "step-1", {"status": "running"})
    bb.update_progress("agent-B", "PLAN", "analysis", {"status": "complete"})

    progress = bb.get_all_progress()

    assert "agent-A" in progress
    assert progress["agent-A"]["phase"] == "EXECUTE"
    assert "agent-B" in progress


# === Integration Tests ===

def test_agent_bootstrap_flow():
    """Test complete agent bootstrap flow using mocks"""
    vault = MockVault()
    db = MockDragonfly()

    # Simulate bootstrap
    role_id = vault.get_role_id("tier1-agent")
    secret_id = vault.generate_secret_id("tier1-agent")
    success, _, _ = vault.approle_login(role_id, secret_id)

    assert success, "Auth should succeed"

    # Store agent state
    db.hset("agent:test-001:state", mapping={
        "status": "bootstrapped",
        "tier": "1",
        "token_accessor": "test-accessor",
    })

    # Acquire execution lock
    assert db.acquire_lock("agent:test-001:lock", "test-001", ttl=300)

    # Verify state
    state = db.hgetall("agent:test-001:state")
    assert state["status"] == "bootstrapped"


def test_multi_agent_coordination():
    """Test multi-agent coordination via blackboard"""
    bb = MockBlackboard("task-001")

    # Agent A posts problem analysis
    bb.write("problem", "analysis", {
        "objective": "Deploy microservice",
        "constraints": ["sandbox only", "no prod access"],
    }, "agent-A")

    # Agent B posts solution
    bb.submit_proposal("solution-1", {
        "approach": "container deployment",
        "steps": ["build", "test", "deploy"],
    }, "agent-B")

    # Both agents vote
    bb.vote("solution-1", "agent-A", "ACCEPT", "Approach looks correct")
    bb.vote("solution-1", "agent-B", "ACCEPT", "Ready to proceed")

    # Check consensus
    status = bb.check_consensus("solution-1", ["agent-A", "agent-B"])
    assert status["reached"]
    assert status["result"] == "ACCEPT"

    # Update progress
    bb.update_progress("agent-B", "EXECUTE", "deploy", {"container": "nginx:latest"})

    progress = bb.get_all_progress()
    assert progress["agent-B"]["phase"] == "EXECUTE"


def test_error_budget_tracking():
    """Test error budget tracking across components"""
    db = MockDragonfly()
    agent_id = "test-agent-001"

    # Initialize error counters
    db.hset(f"agent:{agent_id}:errors", mapping={
        "total_errors": "0",
        "same_error_count": "0",
        "procedure_violations": "0",
    })

    # Simulate errors
    db.hincrby(f"agent:{agent_id}:errors", "total_errors")
    db.hincrby(f"agent:{agent_id}:errors", "total_errors")

    errors = db.hgetall(f"agent:{agent_id}:errors")
    assert int(errors["total_errors"]) == 2

    # Check if within budget (max 8)
    within_budget = int(errors["total_errors"]) < 8
    assert within_budget


# === Scenario Tests ===

def test_scenario_tier_promotion():
    """Scenario: Agent completes tasks and gets promoted"""
    db = MockDragonfly()
    agent_id = "promo-agent"

    # Initialize as T0
    db.hset(f"agent:{agent_id}:metrics", mapping={
        "tier": "0",
        "compliant_runs": "0",
        "consecutive_compliant": "0",
    })

    # Simulate 5 compliant runs
    for _ in range(5):
        db.hincrby(f"agent:{agent_id}:metrics", "compliant_runs")
        db.hincrby(f"agent:{agent_id}:metrics", "consecutive_compliant")

    metrics = db.hgetall(f"agent:{agent_id}:metrics")
    assert int(metrics["compliant_runs"]) == 5
    assert int(metrics["consecutive_compliant"]) == 5

    # Check promotion eligibility (T0->T1: 5 runs, 3 consecutive)
    eligible = (
        int(metrics["compliant_runs"]) >= 5
        and int(metrics["consecutive_compliant"]) >= 3
    )
    assert eligible, "Should be eligible for promotion"


def test_scenario_violation_revocation():
    """Scenario: Agent violates policy and gets revoked"""
    vault = MockVault()
    db = MockDragonfly()
    agent_id = "violator-agent"

    # Agent tries forbidden action
    vault.inject_token("agent-token", ["t0-observer"])
    allowed = vault.check_policy("agent-token", "ssh/creds/sandbox-user", "read")
    assert not allowed, "T0 should not access SSH"

    # Record violation
    db.hset(f"agent:{agent_id}:errors", "procedure_violations", "1")

    # Set revocation signal
    db.set(f"agent:{agent_id}:revoke_signal", "1")

    # Agent should detect revocation
    revoked = db.get(f"agent:{agent_id}:revoke_signal") == "1"
    assert revoked, "Agent should be revoked"

    # Token should be revoked
    vault.revoke_token("agent-token")
    valid, _ = vault.validate_token("agent-token")
    assert not valid, "Token should be invalid"


def test_scenario_multi_agent_conflict_resolution():
    """Scenario: Multiple agents resolve a conflict via mediator"""
    bb = MockBlackboard("conflict-task")

    # Agent A and B have different proposals
    bb.submit_proposal("prop-A", {"approach": "terraform"}, "agent-A")
    bb.submit_proposal("prop-B", {"approach": "ansible"}, "agent-B")

    # They vote on each other's proposals
    bb.vote("prop-A", "agent-A", "ACCEPT")
    bb.vote("prop-A", "agent-B", "REJECT")
    bb.vote("prop-B", "agent-A", "REJECT")
    bb.vote("prop-B", "agent-B", "ACCEPT")

    # Neither reaches consensus - need mediator
    status_a = bb.check_consensus("prop-A", ["agent-A", "agent-B"])
    status_b = bb.check_consensus("prop-B", ["agent-A", "agent-B"])

    assert not status_a["reached"] or status_a["result"] == "TIE"
    assert not status_b["reached"] or status_b["result"] == "TIE"

    # Agent GAMMA (mediator) makes final decision
    bb.vote("prop-A", "agent-GAMMA", "ACCEPT", "Terraform is more suitable for infrastructure")

    # Only the vote tally is checked after the mediator's vote
    final_status = bb.check_consensus("prop-A", ["agent-A", "agent-B", "agent-GAMMA"])
    assert final_status["votes"]["ACCEPT"] == 2
    assert final_status["votes"]["REJECT"] == 1


# Registry of test groups: (CLI flag attribute, ((heading, tests), ...)).
# Tests are reported under their own ``__name__`` so the label can never
# drift from the function that actually runs.
_TEST_SUITES = (
    ("unit", (
        ("Unit Tests: MockVault", (
            test_vault_approle_auth,
            test_vault_policy_check,
            test_vault_token_lifecycle,
        )),
        ("Unit Tests: MockDragonfly", (
            test_dragonfly_strings,
            test_dragonfly_hashes,
            test_dragonfly_locks,
            test_dragonfly_expiry,
        )),
        ("Unit Tests: MockLLM", (
            test_llm_basic_response,
            test_llm_pattern_matching,
            test_llm_error_injection,
        )),
        ("Unit Tests: MockBlackboard", (
            test_blackboard_write_read,
            test_blackboard_consensus,
            test_blackboard_progress,
        )),
    )),
    ("integration", (
        ("Integration Tests", (
            test_agent_bootstrap_flow,
            test_multi_agent_coordination,
            test_error_budget_tracking,
        )),
    )),
    ("scenario", (
        ("Scenario Tests", (
            test_scenario_tier_promotion,
            test_scenario_violation_revocation,
            test_scenario_multi_agent_conflict_resolution,
        )),
    )),
)


def main(argv: Optional[Sequence[str]] = None) -> int:
    """Parse command-line options, run the selected tests, print a summary.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Returns:
        ``0`` when no test failed or errored, otherwise ``1``.
    """
    parser = argparse.ArgumentParser(description="Run agent governance tests")
    parser.add_argument("--unit", action="store_true", help="Run unit tests only")
    parser.add_argument("--integration", action="store_true", help="Run integration tests only")
    parser.add_argument("--scenario", action="store_true", help="Run scenario tests only")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    args = parser.parse_args(argv)

    # With no category flag given, every category runs.
    run_all = not (args.unit or args.integration or args.scenario)

    result = TestResult()

    print("=" * 60)
    print("Agent Governance Test Suite")
    print("=" * 60)

    for category, groups in _TEST_SUITES:
        if not (run_all or getattr(args, category)):
            continue
        for heading, tests in groups:
            print(f"\n--- {heading} ---")
            for test_func in tests:
                run_test(test_func.__name__, test_func, result, verbose=args.verbose)

    print("\n" + "=" * 60)
    print(f"Results: {result.summary()}")
    print("=" * 60)

    return 0 if result.failed == 0 and result.errors == 0 else 1


if __name__ == "__main__":
    sys.exit(main())