commit 77655c298c Initial commit: Agent Governance System Phase 8
Phase 8 Production Hardening with complete governance infrastructure:

- Vault integration with tiered policies (T0-T4)
- DragonflyDB state management
- SQLite audit ledger
- Pipeline DSL and templates
- Promotion/revocation engine
- Checkpoint system for session persistence
- Health manager and circuit breaker for fault tolerance
- GitHub/Slack integrations
- Architectural test pipeline with bug watcher, suggestion engine, council review
- Multi-agent chaos testing framework

Test Results:
- Governance tests: 68/68 passing
- E2E workflow: 16/16 passing
- Phase 2 Vault: 14/14 passing
- Integration tests: 27/27 passing

Coverage: 57.6% average across 12 phases

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-23 22:07:06 -05:00

574 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Comprehensive Governance System Tests
======================================
Tests all components of the agent governance framework:
- Vault connectivity and policies
- DragonflyDB state management
- SQLite ledger
- Preflight system
- Promotion engine
- Revocation engine
- Checkpoint skill
- Model Controller
- Tier 0 agent
"""
import json
import os
import subprocess
import sys
import sqlite3
from pathlib import Path
from datetime import datetime
# Colors for output
# ANSI escape sequences used to colorize terminal output.
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
RESET = '\033[0m'
# Global tally of test outcomes; incremented by the log_* helpers below
# and summarized at the end of the run.
RESULTS = {"passed": 0, "failed": 0, "skipped": 0}
def log_test(name: str, passed: bool, message: str = ""):
    """Record one test outcome in RESULTS and print a colored PASS/FAIL line.

    Args:
        name: Human-readable test name.
        passed: Whether the test succeeded.
        message: Optional detail appended after the name.
    """
    # `global` is unnecessary here: we mutate RESULTS, never rebind it.
    if passed:
        bucket, tag = "passed", f"{GREEN}PASS{RESET}"
    else:
        bucket, tag = "failed", f"{RED}FAIL{RESET}"
    RESULTS[bucket] += 1
    suffix = f" - {message}" if message else ""
    print(f" [{tag}] {name}{suffix}")
def log_skip(name: str, reason: str):
    """Count a skipped test in RESULTS and print a yellow SKIP line with the reason."""
    RESULTS["skipped"] += 1
    tag = f"{YELLOW}SKIP{RESET}"
    print(f" [{tag}] {name} - {reason}")
def log_section(title: str):
    """Print a blue banner delimiting a group of related tests."""
    bar = f"{BLUE}{'='*60}{RESET}"
    print(f"\n{bar}")
    print(f"{BLUE}{title}{RESET}")
    print(bar)
# =============================================================================
# Test: Vault Connectivity
# =============================================================================
def test_vault():
    """Exercise the Vault container: liveness, root token, policies, engines, auth.

    Talks to Vault via `docker exec`; aborts early (after logging a failure)
    when the server is unreachable or the root token cannot be read.
    """
    log_section("VAULT TESTS")
    # Test 1: Vault is running — parse `vault status` JSON from inside the container.
    result = subprocess.run(
        ["docker", "exec", "vault", "vault", "status", "-format=json"],
        capture_output=True, text=True
    )
    try:
        status = json.loads(result.stdout)
    except json.JSONDecodeError:
        # Was a bare `except:`; narrow it so Ctrl-C and real bugs propagate.
        log_test("Vault is running", False,
                 result.stderr[:100] if result.stderr else "Connection failed")
        return
    log_test("Vault is running", True)
    log_test("Vault is initialized", status.get("initialized", False))
    log_test("Vault is unsealed", not status.get("sealed", True))
    # Test 2: Root token from the init keyfile (required for the admin calls below).
    try:
        with open("/opt/vault/init-keys.json") as f:
            keys = json.load(f)
        root_token = keys["root_token"]
        log_test("Root token accessible", True)
    except Exception as e:
        log_test("Root token accessible", False, str(e))
        return

    def run_vault_json(*args):
        """Run an authenticated vault CLI command in the container.

        Returns the parsed JSON output, or None when the output is not JSON
        (e.g. the command failed and wrote an error to stdout/stderr).
        """
        proc = subprocess.run(
            ["docker", "exec", "-e", f"VAULT_TOKEN={root_token}", "vault",
             "vault", *args, "-format=json"],
            capture_output=True, text=True
        )
        try:
            return json.loads(proc.stdout)
        except json.JSONDecodeError:
            return None

    # Test 3: All five tier policies are loaded.
    policies = run_vault_json("policy", "list")
    if policies is None:
        log_test("Tier policies loaded", False)
    else:
        expected = ["t0-observer", "t1-operator", "t2-builder", "t3-executor", "t4-architect"]
        found = [p for p in expected if p in policies]
        log_test("Tier policies loaded", len(found) == len(expected),
                 f"{len(found)}/{len(expected)} policies")
    # Test 4: Required secrets engines are mounted.
    engines = run_vault_json("secrets", "list")
    if engines is None:
        log_test("Secrets engines", False)
    else:
        log_test("SSH secrets engine enabled", "ssh/" in engines)
        log_test("Proxmox KV engine enabled", "proxmox/" in engines)
        log_test("Secret KV engine enabled", "secret/" in engines)
    # Test 5: AppRole auth method is enabled.
    auth = run_vault_json("auth", "list")
    log_test("AppRole auth enabled", auth is not None and "approle/" in auth)
# =============================================================================
# Test: DragonflyDB Connectivity
# =============================================================================
def test_dragonfly():
    """Check DragonflyDB: Vault-stored credentials, connection, and basic ops.

    Skips entirely when the `redis` client library is unavailable.
    """
    log_section("DRAGONFLYDB TESTS")
    try:
        import redis
    except ImportError:
        log_skip("DragonflyDB tests", "redis module not installed")
        return
    # Fetch connection credentials from Vault's KV store using curl.
    try:
        with open("/opt/vault/init-keys.json") as f:
            root_token = json.load(f)["root_token"]
        result = subprocess.run([
            "curl", "-sk",
            "-H", f"X-Vault-Token: {root_token}",
            "https://127.0.0.1:8200/v1/secret/data/services/dragonfly"
        ], capture_output=True, text=True)
        creds = json.loads(result.stdout)["data"]["data"]
    except Exception as e:
        log_test("DragonflyDB credentials in Vault", False, str(e))
        return
    log_test("DragonflyDB credentials in Vault", True)
    # Establish a connection and verify the server answers PING.
    try:
        client = redis.Redis(
            host=creds["host"],
            port=int(creds["port"]),
            password=creds["password"],
            decode_responses=True
        )
        client.ping()
    except Exception as e:
        log_test("DragonflyDB connection", False, str(e))
        return
    log_test("DragonflyDB connection", True)
    # Basic key round-trip (expires on its own after 60s).
    try:
        client.set("test:governance:ping", "pong", ex=60)
        log_test("DragonflyDB read/write",
                 client.get("test:governance:ping") == "pong")
    except Exception as e:
        log_test("DragonflyDB read/write", False, str(e))
    # List round-trip; the key is deleted before and after to stay clean.
    try:
        client.delete("test:governance:list")
        client.lpush("test:governance:list", "item1", "item2")
        items = client.lrange("test:governance:list", 0, -1)
        log_test("DragonflyDB list operations", len(items) == 2)
        client.delete("test:governance:list")
    except Exception as e:
        log_test("DragonflyDB list operations", False, str(e))
# =============================================================================
# Test: SQLite Ledger
# =============================================================================
def test_ledger():
    """Validate the SQLite governance ledger: schema plus an insert/query round-trip.

    Fix: the connection is now closed in a `finally` block, so it no longer
    leaks when any intermediate step raises.
    """
    log_section("LEDGER TESTS")
    db_path = Path("/opt/agent-governance/ledger/governance.db")
    log_test("Ledger database exists", db_path.exists())
    if not db_path.exists():
        return
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        # Schema check: every expected governance table must be present.
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [row[0] for row in cursor.fetchall()]
        expected_tables = ["agent_metrics", "violations", "promotions", "orchestration_log"]
        for table in expected_tables:
            log_test(f"Table '{table}' exists", table in tables)
        # Round-trip: insert a sentinel row, read it back, then remove it.
        cursor.execute("""
            INSERT OR REPLACE INTO agent_metrics
            (agent_id, current_tier, compliant_runs, consecutive_compliant, total_runs, updated_at)
            VALUES ('test-agent-pytest', 0, 0, 0, 0, datetime('now'))
        """)
        conn.commit()
        cursor.execute("SELECT * FROM agent_metrics WHERE agent_id='test-agent-pytest'")
        row = cursor.fetchone()
        log_test("Ledger insert/query", row is not None)
        # Cleanup the sentinel row so repeated runs don't accumulate state.
        cursor.execute("DELETE FROM agent_metrics WHERE agent_id='test-agent-pytest'")
        conn.commit()
    except Exception as e:
        log_test("Ledger operations", False, str(e))
    finally:
        # Always release the connection, even when a step above failed.
        if conn is not None:
            conn.close()
# =============================================================================
# Test: Preflight System
# =============================================================================
def test_preflight():
    """Verify the preflight gate: approves a sandbox target, blocks prod for tier 1."""
    log_section("PREFLIGHT TESTS")
    preflight_dir = Path("/opt/agent-governance/preflight")
    log_test("Preflight directory exists", preflight_dir.exists())
    if not preflight_dir.exists():
        return
    preflight_py = preflight_dir / "preflight.py"
    log_test("preflight.py exists", preflight_py.exists())
    if not preflight_py.exists():
        return

    def run_preflight(target):
        """Invoke preflight.py against `target` as a tier-1 test agent."""
        return subprocess.run(
            ["python3", str(preflight_py), target, "--action", "generic",
             "--tier", "1", "--agent-id", "test"],
            capture_output=True, text=True,
            cwd=str(preflight_dir)
        )

    # Sandbox targets must be approved (exit 0); production must be refused.
    sandbox = run_preflight("sandbox-vm-01")
    log_test("Preflight approves sandbox target", sandbox.returncode == 0)
    prod = run_preflight("prod-db-01")
    log_test("Preflight blocks prod for tier 1", prod.returncode != 0)
# =============================================================================
# Test: Promotion Engine
# =============================================================================
def test_promotion():
    """Smoke-test the promotion engine's `requirements` subcommand."""
    log_section("PROMOTION ENGINE TESTS")
    promotion_py = Path("/opt/agent-governance/runtime/promotion.py")
    log_test("promotion.py exists", promotion_py.exists())
    if not promotion_py.exists():
        return
    runtime_dir = Path("/opt/agent-governance/runtime")
    result = subprocess.run(
        ["python3", str(promotion_py), "requirements"],
        capture_output=True, text=True,
        cwd=str(runtime_dir)
    )
    log_test("Promotion requirements command", result.returncode == 0)
    # The listing should mention the tier ladder in at least one known form.
    output = result.stdout
    has_tiers = any(marker in output for marker in ("T0 → T1", "Tier 0"))
    log_test("Promotion requirements output valid", has_tiers)
# =============================================================================
# Test: Revocation Engine
# =============================================================================
def test_revocation():
    """Smoke-test the revocation engine's `types` subcommand."""
    log_section("REVOCATION ENGINE TESTS")
    revocation_py = Path("/opt/agent-governance/runtime/revocation.py")
    log_test("revocation.py exists", revocation_py.exists())
    if not revocation_py.exists():
        return
    runtime_dir = Path("/opt/agent-governance/runtime")
    result = subprocess.run(
        ["python3", str(revocation_py), "types"],
        capture_output=True, text=True,
        cwd=str(runtime_dir)
    )
    log_test("Revocation types command", result.returncode == 0)
    # Output must name at least one known violation category.
    out = result.stdout
    has_types = ("UNAUTHORIZED" in out
                 or "VIOLATION" in out
                 or "critical" in out.lower())
    log_test("Revocation types output valid", has_types)
# =============================================================================
# Test: Checkpoint Skill
# =============================================================================
def test_checkpoint():
    """Exercise the checkpoint CLI: create, load (JSON), list, and queue list.

    Fixes: the bare `except:` around JSON parsing is narrowed to
    json.JSONDecodeError, and ID extraction uses split(":", 1) so checkpoint
    IDs containing ':' are not truncated.
    """
    log_section("CHECKPOINT SKILL TESTS")
    checkpoint_bin = Path("/opt/agent-governance/bin/checkpoint")
    log_test("checkpoint CLI exists", checkpoint_bin.exists())
    if not checkpoint_bin.exists():
        return
    # Create a checkpoint and capture the ID it reports on stdout.
    result = subprocess.run(
        [str(checkpoint_bin), "now", "--notes", "pytest run"],
        capture_output=True, text=True
    )
    log_test("Checkpoint create", result.returncode == 0)
    checkpoint_id = None
    for line in result.stdout.split("\n"):
        if line.startswith("ID:"):
            # maxsplit=1 keeps IDs that themselves contain ':' intact.
            checkpoint_id = line.split(":", 1)[1].strip()
            break
    log_test("Checkpoint ID generated", checkpoint_id is not None)
    # Load the latest checkpoint as JSON and sanity-check its fields.
    result = subprocess.run(
        [str(checkpoint_bin), "load", "--json"],
        capture_output=True, text=True
    )
    log_test("Checkpoint load", result.returncode == 0)
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        log_test("Checkpoint JSON valid", False)
    else:
        log_test("Checkpoint JSON valid", True)
        log_test("Checkpoint has phase", data.get("phase") is not None)
        log_test("Checkpoint has dependencies", len(data.get("dependencies", [])) > 0)
    # Listing commands should succeed with no further arguments.
    result = subprocess.run(
        [str(checkpoint_bin), "list"],
        capture_output=True, text=True
    )
    log_test("Checkpoint list", result.returncode == 0)
    result = subprocess.run(
        [str(checkpoint_bin), "queue", "list"],
        capture_output=True, text=True
    )
    log_test("Checkpoint queue list", result.returncode == 0)
# =============================================================================
# Test: Model Controller
# =============================================================================
def test_model_controller():
    """Smoke-test the model-controller CLI's `status` and `config` commands."""
    log_section("MODEL CONTROLLER TESTS")
    controller_bin = Path("/opt/agent-governance/bin/model-controller")
    log_test("model-controller CLI exists", controller_bin.exists())
    if not controller_bin.exists():
        return

    def run_controller(command):
        """Run one model-controller subcommand, capturing its output."""
        return subprocess.run(
            [str(controller_bin), command],
            capture_output=True, text=True
        )

    status_result = run_controller("status")
    log_test("Model controller status", status_result.returncode == 0)
    # Status output should report the current mode in some casing.
    has_mode = ("Mode:" in status_result.stdout
                or "mode:" in status_result.stdout.lower())
    log_test("Model controller status output valid", has_mode)
    config_result = run_controller("config")
    log_test("Model controller config", config_result.returncode == 0)
# =============================================================================
# Test: Tier 0 Agent
# =============================================================================
def test_tier0_agent():
    """Check the Tier 0 agent: status, allowed/forbidden reads, guard coverage."""
    log_section("TIER 0 AGENT TESTS")
    agent_dir = Path("/opt/agent-governance/agents/tier0-agent")
    agent_py = agent_dir / "agent.py"
    log_test("Tier 0 agent directory exists", agent_dir.exists())
    log_test("agent.py exists", agent_py.exists())
    if not agent_py.exists():
        return

    def run_agent(*args):
        """Invoke the agent CLI from its own directory with the given arguments."""
        return subprocess.run(
            ["python3", str(agent_py), *args],
            capture_output=True, text=True,
            cwd=str(agent_dir)
        )

    # The agent must identify itself as tier 0.
    result = run_agent("status")
    log_test("Agent status command", result.returncode == 0)
    log_test("Agent shows as Tier 0",
             "Tier: 0" in result.stdout or "Tier 0" in result.stdout)
    # Reading its own docs is allowed...
    result = run_agent("read", "/opt/agent-governance/docs/tier0-guide.md")
    log_test("Agent can read docs", result.returncode == 0)
    # ...but reading system files must be refused.
    result = run_agent("read", "/etc/passwd")
    log_test("Agent blocked from /etc",
             result.returncode != 0 or "BLOCKED" in result.stdout)
    # The agent's own self-check of its forbidden-action list.
    result = run_agent("test-forbidden")
    log_test("Agent test-forbidden command", result.returncode == 0)
    log_test("All forbidden actions blocked",
             "All forbidden actions correctly blocked" in result.stdout)
# =============================================================================
# Test: Governance Wrappers
# =============================================================================
def test_wrappers():
    """Confirm the governed Terraform/Ansible wrappers exist and are executable."""
    log_section("GOVERNANCE WRAPPERS TESTS")
    wrappers_dir = Path("/opt/agent-governance/wrappers")
    log_test("Wrappers directory exists", wrappers_dir.exists())
    wrappers = [
        ("Terraform wrapper", wrappers_dir / "tf-governed.sh"),
        ("Ansible wrapper", wrappers_dir / "ansible-governed.sh"),
    ]
    # First pass: existence; second pass: executable bit (same output order
    # as before — all existence lines, then all executable lines).
    for label, path in wrappers:
        log_test(f"{label} exists", path.exists())
    for label, path in wrappers:
        if path.exists():
            log_test(f"{label} executable", os.access(path, os.X_OK))
# =============================================================================
# Test: Evidence System
# =============================================================================
def test_evidence():
    """Check the evidence system layout and that at least one package exists."""
    log_section("EVIDENCE SYSTEM TESTS")
    evidence_dir = Path("/opt/agent-governance/evidence")
    evidence_py = evidence_dir / "evidence.py"
    log_test("Evidence directory exists", evidence_dir.exists())
    log_test("evidence.py exists", evidence_py.exists())
    packages_dir = evidence_dir / "packages"
    log_test("Evidence packages directory exists", packages_dir.exists())
    if not packages_dir.exists():
        return
    package_count = len(list(packages_dir.iterdir()))
    log_test("Evidence packages created", package_count > 0,
             f"{package_count} packages")
# =============================================================================
# Main
# =============================================================================
def main():
    """Run every governance test suite, print a summary, and return an exit code.

    Returns:
        0 when all tests passed (or were skipped), 1 when any test failed.
    """
    banner = f"{BLUE}{'#'*60}{RESET}"
    print(f"\n{banner}")
    print(f"{BLUE}# AGENT GOVERNANCE SYSTEM - COMPREHENSIVE TESTS{RESET}")
    print(f"{BLUE}# {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{RESET}")
    print(banner)
    # Run every suite in order; each logs its own results into RESULTS.
    suites = (
        test_vault, test_dragonfly, test_ledger, test_preflight,
        test_promotion, test_revocation, test_checkpoint,
        test_model_controller, test_tier0_agent, test_wrappers,
        test_evidence,
    )
    for suite in suites:
        suite()
    # Summary banner and counts.
    rule = f"{BLUE}{'='*60}{RESET}"
    print(f"\n{rule}")
    print(f"{BLUE}TEST SUMMARY{RESET}")
    print(rule)
    total = sum(RESULTS.values())
    print(f" {GREEN}Passed:{RESET} {RESULTS['passed']}")
    print(f" {RED}Failed:{RESET} {RESULTS['failed']}")
    print(f" {YELLOW}Skipped:{RESET} {RESULTS['skipped']}")
    print(f" Total: {total}")
    if RESULTS["failed"] > 0:
        print(f"\n{RED}Some tests failed!{RESET}")
        return 1
    print(f"\n{GREEN}All tests passed!{RESET}")
    return 0
# Script entry point: run the full suite and exit non-zero on any failure.
if __name__ == "__main__":
    sys.exit(main())