agent-governance/tests/test_runner.py
commit 77655c298c Initial commit: Agent Governance System Phase 8
Phase 8 Production Hardening with complete governance infrastructure:

- Vault integration with tiered policies (T0-T4)
- DragonflyDB state management
- SQLite audit ledger
- Pipeline DSL and templates
- Promotion/revocation engine
- Checkpoint system for session persistence
- Health manager and circuit breaker for fault tolerance
- GitHub/Slack integrations
- Architectural test pipeline with bug watcher, suggestion engine, council review
- Multi-agent chaos testing framework

Test Results:
- Governance tests: 68/68 passing
- E2E workflow: 16/16 passing
- Phase 2 Vault: 14/14 passing
- Integration tests: 27/27 passing

Coverage: 57.6% average across 12 phases

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-23 22:07:06 -05:00

503 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Test Runner for Agent Governance System
Runs unit tests, integration tests, and scenario tests.
"""
import sys
import os
import unittest
import argparse
from pathlib import Path
from datetime import datetime
# Make both the project root and the tests directory importable so the
# package under test and the local `mocks` module resolve without install.
sys.path.insert(0, str(Path(__file__).parent.parent))  # project root
sys.path.insert(0, str(Path(__file__).parent))  # tests/ (provides mocks.py)
from mocks import MockVault, MockDragonfly, MockLLM, MockBlackboard
class TestResult:
    """Aggregates pass/fail/error counts and messages across the suite.

    Fix: the original printed success and failure lines identically (status
    markers were evidently lost), making console output ambiguous; each
    outcome now carries an explicit PASS/FAIL/ERROR prefix.  The
    ``summary()`` format is unchanged.
    """

    def __init__(self):
        self.passed = 0
        self.failed = 0
        self.errors = 0
        self.skipped = 0  # reserved: nothing increments this yet
        self.failures = []  # list of (test_name, message) pairs

    def add_success(self, test_name: str):
        """Record a passing test and echo it."""
        self.passed += 1
        print(f"PASS {test_name}")

    def add_failure(self, test_name: str, message: str):
        """Record an assertion failure with its message."""
        self.failed += 1
        self.failures.append((test_name, message))
        print(f"FAIL {test_name}")
        print(f" {message}")

    def add_error(self, test_name: str, error: Exception):
        """Record an unexpected exception (not an AssertionError)."""
        self.errors += 1
        self.failures.append((test_name, str(error)))
        print(f"ERROR {test_name}")
        print(f" {error}")

    def summary(self) -> str:
        """One-line tally; skipped tests are excluded from the total."""
        total = self.passed + self.failed + self.errors
        return f"{self.passed}/{total} passed, {self.failed} failed, {self.errors} errors"
def run_test(name: str, test_func, result: TestResult):
    """Execute one test callable and record the outcome on *result*.

    An AssertionError counts as a test failure; any other exception is
    recorded as an error so one crashing test cannot stop the run.
    """
    try:
        test_func()
        result.add_success(name)
    except AssertionError as failure:
        result.add_failure(name, str(failure))
    except Exception as unexpected:
        result.add_error(name, unexpected)
# === Unit Tests: MockVault ===
def test_vault_approle_auth():
    """AppRole flow: fetch role/secret ids for tier0, then log in."""
    vault = MockVault()
    rid = vault.get_role_id("tier0-agent")
    assert rid is not None, "Should get role_id"
    sid = vault.generate_secret_id("tier0-agent")
    assert sid is not None, "Should generate secret_id"
    # Exchange the credential pair for a client token.
    ok, token, msg = vault.approle_login(rid, sid)
    assert ok, f"Should authenticate: {msg}"
    assert token is not None, "Should receive token"
def test_vault_policy_check():
    """A t0-observer token may read docs but not write them or touch SSH."""
    vault = MockVault()
    vault.inject_token("test-token", ["t0-observer"])

    def can(path, capability):
        return vault.check_policy("test-token", path, capability)

    assert can("secret/data/docs/readme", "read")
    assert not can("secret/data/docs/readme", "create")
    assert not can("ssh/creds/sandbox-user", "read")
def test_vault_token_lifecycle():
    """A token validates while live and is rejected once revoked."""
    vault = MockVault()
    vault.inject_token("test-token", ["t0-observer"], ttl=3600)
    is_valid, _ = vault.validate_token("test-token")
    assert is_valid, "Token should be valid"
    assert vault.revoke_token("test-token"), "Should revoke"
    is_valid, _ = vault.validate_token("test-token")
    assert not is_valid, "Token should be invalid after revocation"
# === Unit Tests: MockDragonfly ===
def test_dragonfly_strings():
    """SET/GET round-trip plus INCR with default and explicit step."""
    store = MockDragonfly()
    store.set("key1", "value1")
    assert store.get("key1") == "value1"
    store.set("counter", "0")
    assert store.incr("counter") == 1  # default step of 1
    assert store.incr("counter", 5) == 6  # explicit step
def test_dragonfly_hashes():
    """HSET a mapping, then read back a single field and the full hash."""
    store = MockDragonfly()
    state = {"phase": "EXECUTE", "step": "1", "status": "running"}
    store.hset("agent:001:state", mapping=state)
    assert store.hget("agent:001:state", "phase") == "EXECUTE"
    assert store.hgetall("agent:001:state")["status"] == "running"
def test_dragonfly_locks():
    """Locks are exclusive per owner, refreshable by the holder, reusable after release."""
    store = MockDragonfly()
    assert store.acquire_lock("task:001", "agent-A", ttl=30)  # fresh acquire
    assert not store.acquire_lock("task:001", "agent-B", ttl=30)  # held by A
    assert store.acquire_lock("task:001", "agent-A", ttl=30)  # holder may refresh
    assert store.release_lock("task:001", "agent-A")
    assert store.acquire_lock("task:001", "agent-B", ttl=30)  # free again
def test_dragonfly_expiry():
    """A key written with a TTL disappears once its deadline passes.

    Fix: the original forced expiry by setting the deadline to exactly
    ``utcnow()``, which races with the mock's comparison (``<`` vs ``<=``).
    Backdating the deadline by a full second is deterministic either way.
    """
    from datetime import timedelta  # local import: only this test needs it

    db = MockDragonfly()
    db.set("temp", "value", ex=1)
    assert db.get("temp") == "value"
    # Force expiry without sleeping: put the deadline firmly in the past.
    # NOTE(review): pokes the mock's private _expiry dict — assumed naive-UTC
    # datetimes, matching the utcnow() the mock presumably uses.
    db._expiry["temp"] = datetime.utcnow() - timedelta(seconds=1)
    assert db.get("temp") is None
# === Unit Tests: MockLLM ===
def test_llm_basic_response():
    """The default canned completion mentions EXECUTE with decent confidence."""
    llm = MockLLM()
    text, meta = llm.complete("Please read this document")
    assert "EXECUTE" in text
    assert meta["confidence"] >= 0.5
def test_llm_pattern_matching():
    """A registered regex pattern overrides the default completion."""
    llm = MockLLM()
    canned = '{"action": "deploy_nginx", "confidence": 0.9}'
    llm.add_response(pattern="deploy.*nginx", response=canned, confidence=0.9)
    text, meta = llm.complete("Please deploy nginx to the server")
    assert "deploy_nginx" in text
    assert meta["confidence"] == 0.9
def test_llm_error_injection():
    """After the configured number of good calls, the error mode kicks in."""
    llm = MockLLM()
    llm.set_error_mode("timeout", after_calls=2)
    llm.complete("test 1")  # within budget
    llm.complete("test 2")  # last good call
    raised = False
    try:
        llm.complete("test 3")  # third call must time out
    except TimeoutError:
        raised = True
    assert raised, "Should have raised TimeoutError"
# === Unit Tests: MockBlackboard ===
def test_blackboard_write_read():
    """An entry written to a section/key can be read straight back."""
    board = MockBlackboard("task-001")
    board.write("problem", "objective", {"goal": "Test the system"}, "agent-A")
    entry = board.read("problem", "objective")
    assert entry["goal"] == "Test the system"
def test_blackboard_consensus():
    """A 2-1 ACCEPT majority reaches consensus with the right tallies."""
    board = MockBlackboard("task-001")
    board.submit_proposal("prop-1", {"action": "deploy"}, "agent-A")
    # Three agents weigh in: two for, one against.
    for voter, ballot, note in (
        ("agent-A", "ACCEPT", "Looks good"),
        ("agent-B", "ACCEPT", "Agreed"),
        ("agent-C", "REJECT", "Need more testing"),
    ):
        board.vote("prop-1", voter, ballot, note)
    outcome = board.check_consensus("prop-1", ["agent-A", "agent-B", "agent-C"])
    assert outcome["reached"], "Consensus should be reached"
    assert outcome["result"] == "ACCEPT", "Should be accepted (2-1)"
    assert outcome["votes"]["ACCEPT"] == 2
    assert outcome["votes"]["REJECT"] == 1
def test_blackboard_progress():
    """Per-agent progress updates all appear in the aggregate view."""
    board = MockBlackboard("task-001")
    board.update_progress("agent-A", "EXECUTE", "step-1", {"status": "running"})
    board.update_progress("agent-B", "PLAN", "analysis", {"status": "complete"})
    snapshot = board.get_all_progress()
    assert "agent-A" in snapshot
    assert snapshot["agent-A"]["phase"] == "EXECUTE"
    assert "agent-B" in snapshot
# === Integration Tests ===
def test_agent_bootstrap_flow():
    """End-to-end bootstrap: AppRole auth, state write, then lock acquisition."""
    vault = MockVault()
    store = MockDragonfly()
    # Authenticate as a tier-1 agent via AppRole.
    rid = vault.get_role_id("tier1-agent")
    sid = vault.generate_secret_id("tier1-agent")
    ok, _token, _ = vault.approle_login(rid, sid)
    assert ok, "Auth should succeed"
    # Persist the bootstrapped agent state.
    store.hset("agent:test-001:state", mapping={
        "status": "bootstrapped",
        "tier": "1",
        "token_accessor": "test-accessor",
    })
    # Take the per-agent execution lock.
    assert store.acquire_lock("agent:test-001:lock", "test-001", ttl=300)
    # Confirm the stored state survived the round trip.
    assert store.hgetall("agent:test-001:state")["status"] == "bootstrapped"
def test_multi_agent_coordination():
    """Two agents coordinate via the blackboard: analysis, proposal, vote, progress."""
    board = MockBlackboard("task-001")
    # Agent A shares its problem analysis.
    board.write("problem", "analysis", {
        "objective": "Deploy microservice",
        "constraints": ["sandbox only", "no prod access"],
    }, "agent-A")
    # Agent B proposes a solution.
    board.submit_proposal("solution-1", {
        "approach": "container deployment",
        "steps": ["build", "test", "deploy"],
    }, "agent-B")
    # Unanimous vote on the proposal.
    board.vote("solution-1", "agent-A", "ACCEPT", "Approach looks correct")
    board.vote("solution-1", "agent-B", "ACCEPT", "Ready to proceed")
    verdict = board.check_consensus("solution-1", ["agent-A", "agent-B"])
    assert verdict["reached"]
    assert verdict["result"] == "ACCEPT"
    # Execution progress becomes visible to everyone.
    board.update_progress("agent-B", "EXECUTE", "deploy", {"container": "nginx:latest"})
    assert board.get_all_progress()["agent-B"]["phase"] == "EXECUTE"
def test_error_budget_tracking():
    """Error counters accumulate in the hash and stay under the budget of 8."""
    store = MockDragonfly()
    agent_id = "test-agent-001"
    errors_key = f"agent:{agent_id}:errors"
    # Start every counter at zero.
    store.hset(errors_key, mapping={
        "total_errors": "0",
        "same_error_count": "0",
        "procedure_violations": "0",
    })
    # Two simulated errors.
    for _ in range(2):
        store.hincrby(errors_key, "total_errors")
    total = int(store.hgetall(errors_key)["total_errors"])
    assert total == 2
    # Budget check: revocation only triggers at 8 errors.
    assert total < 8
# === Scenario Tests ===
def test_scenario_tier_promotion():
    """Scenario: a T0 agent accrues compliant runs and becomes promotable.

    Fix: dropped an unused ``MockVault`` instance and the unused loop index;
    the repeated metrics key is now built once.
    """
    db = MockDragonfly()
    agent_id = "promo-agent"
    metrics_key = f"agent:{agent_id}:metrics"
    # Start at tier 0 with empty counters.
    db.hset(metrics_key, mapping={
        "tier": "0",
        "compliant_runs": "0",
        "consecutive_compliant": "0",
    })
    # Five clean runs in a row.
    for _ in range(5):
        db.hincrby(metrics_key, "compliant_runs")
        db.hincrby(metrics_key, "consecutive_compliant")
    metrics = db.hgetall(metrics_key)
    assert int(metrics["compliant_runs"]) == 5
    assert int(metrics["consecutive_compliant"]) == 5
    # T0 -> T1 requires >= 5 compliant runs and >= 3 consecutive.
    eligible = (
        int(metrics["compliant_runs"]) >= 5
        and int(metrics["consecutive_compliant"]) >= 3
    )
    assert eligible, "Should be eligible for promotion"
def test_scenario_violation_revocation():
    """Scenario: a policy violation raises the revoke signal and kills the token.

    Fix: dropped an unused ``MockLLM`` instance the original constructed.
    """
    vault = MockVault()
    db = MockDragonfly()
    agent_id = "violator-agent"
    # A T0 token must never reach SSH credentials.
    vault.inject_token("agent-token", ["t0-observer"])
    allowed = vault.check_policy("agent-token", "ssh/creds/sandbox-user", "read")
    assert not allowed, "T0 should not access SSH"
    # Record the violation and raise the revocation flag.
    db.hset(f"agent:{agent_id}:errors", "procedure_violations", "1")
    db.set(f"agent:{agent_id}:revoke_signal", "1")
    # The agent observes the flag...
    revoked = db.get(f"agent:{agent_id}:revoke_signal") == "1"
    assert revoked, "Agent should be revoked"
    # ...and its Vault token is torn down as well.
    vault.revoke_token("agent-token")
    valid, _ = vault.validate_token("agent-token")
    assert not valid, "Token should be invalid"
def test_scenario_multi_agent_conflict_resolution():
    """Scenario: a 1-1 deadlock between two agents is broken by a mediator vote."""
    board = MockBlackboard("conflict-task")
    board.submit_proposal("prop-A", {"approach": "terraform"}, "agent-A")
    board.submit_proposal("prop-B", {"approach": "ansible"}, "agent-B")
    # Each agent backs its own proposal and rejects the other's.
    board.vote("prop-A", "agent-A", "ACCEPT")
    board.vote("prop-A", "agent-B", "REJECT")
    board.vote("prop-B", "agent-A", "REJECT")
    board.vote("prop-B", "agent-B", "ACCEPT")
    # With only the two of them, neither proposal can win.
    pair = ["agent-A", "agent-B"]
    deadlock_a = board.check_consensus("prop-A", pair)
    deadlock_b = board.check_consensus("prop-B", pair)
    assert not deadlock_a["reached"] or deadlock_a["result"] == "TIE"
    assert not deadlock_b["reached"] or deadlock_b["result"] == "TIE"
    # The mediator's ACCEPT tips prop-A to a 2-1 majority.
    board.vote("prop-A", "agent-GAMMA", "ACCEPT", "Terraform is more suitable for infrastructure")
    final = board.check_consensus("prop-A", ["agent-A", "agent-B", "agent-GAMMA"])
    assert final["votes"]["ACCEPT"] == 2
    assert final["votes"]["REJECT"] == 1
def main():
    """Parse CLI flags, run the selected test groups, and print a summary.

    Returns 0 when every test passed, 1 otherwise (suitable for sys.exit).

    Fix: the original repeated ``run_test(name, fn, result)`` once per test;
    groups are now table-driven, so adding a test is a one-line change and
    the printed name can never drift from the function name.  Console
    output is unchanged.
    """
    parser = argparse.ArgumentParser(description="Run agent governance tests")
    parser.add_argument("--unit", action="store_true", help="Run unit tests only")
    parser.add_argument("--integration", action="store_true", help="Run integration tests only")
    parser.add_argument("--scenario", action="store_true", help="Run scenario tests only")
    # NOTE(review): --verbose is parsed but currently unused; kept for CLI
    # compatibility.
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    args = parser.parse_args()

    # No selector flag means "run everything".
    run_all = not (args.unit or args.integration or args.scenario)
    result = TestResult()

    def run_group(header, tests):
        """Print a section header and run each test under its own __name__."""
        print(f"\n--- {header} ---")
        for fn in tests:
            run_test(fn.__name__, fn, result)

    unit_groups = [
        ("MockVault", [
            test_vault_approle_auth,
            test_vault_policy_check,
            test_vault_token_lifecycle,
        ]),
        ("MockDragonfly", [
            test_dragonfly_strings,
            test_dragonfly_hashes,
            test_dragonfly_locks,
            test_dragonfly_expiry,
        ]),
        ("MockLLM", [
            test_llm_basic_response,
            test_llm_pattern_matching,
            test_llm_error_injection,
        ]),
        ("MockBlackboard", [
            test_blackboard_write_read,
            test_blackboard_consensus,
            test_blackboard_progress,
        ]),
    ]
    integration_tests = [
        test_agent_bootstrap_flow,
        test_multi_agent_coordination,
        test_error_budget_tracking,
    ]
    scenario_tests = [
        test_scenario_tier_promotion,
        test_scenario_violation_revocation,
        test_scenario_multi_agent_conflict_resolution,
    ]

    print("=" * 60)
    print("Agent Governance Test Suite")
    print("=" * 60)
    if run_all or args.unit:
        for mock_name, tests in unit_groups:
            run_group(f"Unit Tests: {mock_name}", tests)
    if run_all or args.integration:
        run_group("Integration Tests", integration_tests)
    if run_all or args.scenario:
        run_group("Scenario Tests", scenario_tests)
    print("\n" + "=" * 60)
    print(f"Results: {result.summary()}")
    print("=" * 60)
    return 0 if result.failed == 0 and result.errors == 0 else 1


if __name__ == "__main__":
    sys.exit(main())