#!/usr/bin/env python3
"""
Phase 1: Foundation Tests
=========================
Tests for ledger connection, vault status, and audit logging.

Required tests:
- ledger_connection: Verify SQLite ledger is accessible
- vault_status: Verify Vault is healthy and unsealed
- audit_logging: Verify audit trail is working
"""

import json
import os
import sqlite3
import subprocess
import sys
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Optional

# Configuration
VAULT_ADDR = os.getenv("VAULT_ADDR", "https://127.0.0.1:8200")
VAULT_TOKEN_FILE = "/opt/vault/init-keys.json"
LEDGER_DB = "/opt/agent-governance/ledger/governance.db"

# Test results
PASSED = 0
FAILED = 0


def log(msg: str, status: str = "info"):
    """Print *msg* prefixed with the icon for *status*.

    Known statuses are "pass", "fail" and "info"; any other value gets a
    neutral bullet.
    """
    icons = {"pass": "\033[92m✓\033[0m", "fail": "\033[91m✗\033[0m", "info": "→"}
    print(f" {icons.get(status, '•')} {msg}")


def _quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier."""
    # Table names come from sqlite_master and may contain spaces or quotes.
    return '"' + name.replace('"', '""') + '"'


def _list_tables(cursor) -> list:
    """Return the names of all tables visible through *cursor*."""
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
    return [row[0] for row in cursor.fetchall()]


def get_root_token() -> str:
    """Return the Vault root token.

    The token is read from the "root_token" key of VAULT_TOKEN_FILE. If the
    file is missing, unreadable, malformed, or holds no usable token, the
    VAULT_TOKEN environment variable is used instead (empty string if unset).
    """
    try:
        with open(VAULT_TOKEN_FILE, encoding="utf-8") as f:
            token = json.load(f)["root_token"]
    except (OSError, ValueError, KeyError, TypeError):
        # ValueError covers invalid JSON; TypeError covers a non-object payload.
        token = None
    if isinstance(token, str) and token:
        return token
    return os.getenv("VAULT_TOKEN", "")


def vault_request(method: str, path: str, token: str) -> dict:
    """Call the Vault HTTP API through curl and return the decoded JSON object.

    Args:
        method: HTTP method passed to ``curl -X``.
        path: API path appended to ``{VAULT_ADDR}/v1/``.
        token: Value sent in the ``X-Vault-Token`` header.

    Returns:
        The parsed response, or ``{}`` when curl cannot be run, times out,
        produces no output, or returns something that is not a JSON object.
    """
    # NOTE(review): "-k" disables TLS verification and the token is passed on
    # the command line, where it is visible in the process list.
    cmd = [
        "curl", "-sk", "-X", method,
        "-H", f"X-Vault-Token: {token}",
        f"{VAULT_ADDR}/v1/{path}",
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
    except (OSError, subprocess.SubprocessError):
        return {}
    if not result.stdout:
        return {}
    try:
        payload = json.loads(result.stdout)
    except ValueError:
        return {}
    return payload if isinstance(payload, dict) else {}


def test_ledger_connection(db_path: Optional[str] = None):
    """Check that the SQLite ledger exists, opens, has tables and is readable.

    Args:
        db_path: Database file to check; defaults to LEDGER_DB.

    Returns:
        True when every step passed, False as soon as one fails. Each step
        also updates the module-level PASSED / FAILED counters.
    """
    global PASSED, FAILED
    ledger = LEDGER_DB if db_path is None else str(db_path)

    print("\n[TEST] ledger_connection")

    # 1. Check database file exists (sqlite3.connect would otherwise create it)
    if not Path(ledger).exists():
        log(f"Ledger database not found: {ledger}", "fail")
        FAILED += 1
        return False
    log("Ledger database file exists", "pass")
    PASSED += 1

    # 2. Connect to database
    try:
        conn = sqlite3.connect(ledger)
    except sqlite3.Error as e:
        log(f"Failed to connect to ledger: {e}", "fail")
        FAILED += 1
        return False

    # closing() releases the connection on every exit path below.
    with closing(conn):
        cursor = conn.cursor()
        log("Successfully connected to ledger", "pass")
        PASSED += 1

        # 3. Check required tables exist. sqlite3.connect is lazy, so a
        # corrupt or non-database file only fails on this first query.
        try:
            tables = _list_tables(cursor)
        except sqlite3.Error as e:
            log(f"Failed to read ledger schema: {e}", "fail")
            FAILED += 1
            return False

        required_tables = ["governance_events", "agent_states", "violations"]
        missing = [t for t in required_tables if t not in tables]
        if not missing:
            log("All required tables present", "pass")
            PASSED += 1
        elif tables:
            # Tables might have different names; accept any non-empty schema.
            log(f"Found tables: {tables}", "pass")
            PASSED += 1
        else:
            log("No tables found in ledger", "fail")
            FAILED += 1
            return False

        # 4. Test read capability by counting rows in the first table
        first_table = tables[0]
        try:
            cursor.execute(f"SELECT COUNT(*) FROM {_quote_identifier(first_table)}")
            count = cursor.fetchone()[0]
        except sqlite3.Error as e:
            log(f"Ledger read test failed: {e}", "fail")
            FAILED += 1
            return False
        log(f"Ledger readable, {count} rows in {first_table}", "pass")
        PASSED += 1

    return True


def test_vault_status():
    """Check that Vault is initialized and unsealed, and that the token works.

    Returns:
        False when Vault is unreachable, uninitialized or sealed, or when the
        token lookup fails; True otherwise (including when no token is
        available and the authentication step is skipped).
    """
    global PASSED, FAILED

    print("\n[TEST] vault_status")

    # 1. Check Vault health endpoint (doesn't need token)
    try:
        result = subprocess.run(
            ["curl", "-sk", f"{VAULT_ADDR}/v1/sys/health"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        health = json.loads(result.stdout) if result.stdout else {}
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        log(f"Cannot reach Vault: {e}", "fail")
        FAILED += 1
        return False
    if not isinstance(health, dict):
        # An unexpected payload is treated like an empty health response.
        health = {}

    # 2. Check initialized
    if not health.get("initialized", False):
        log("Vault is not initialized", "fail")
        FAILED += 1
        return False
    log("Vault is initialized", "pass")
    PASSED += 1

    # 3. Check unsealed
    if health.get("sealed", True):
        log("Vault is sealed", "fail")
        FAILED += 1
        return False
    log("Vault is unsealed", "pass")
    PASSED += 1

    # 4. Check standby status (informational; either value passes)
    standby = health.get("standby", False)
    log(f"Vault standby: {standby}", "pass")
    PASSED += 1

    # 5. Verify we can authenticate
    token = get_root_token()
    if not token:
        log("No Vault token available (skipping auth test)", "info")
        return True

    resp = vault_request("GET", "auth/token/lookup-self", token)
    if "data" not in resp:
        log("Token authentication failed", "fail")
        FAILED += 1
        return False
    log("Token authentication working", "pass")
    PASSED += 1
    return True


def test_audit_logging(db_path: Optional[str] = None):
    """Check Vault audit devices, ledger audit tables and log directories.

    Args:
        db_path: Ledger database to inspect; defaults to LEDGER_DB.

    Returns:
        False when the ledger check recorded a failure, True otherwise.
        Missing audit devices, audit tables or log directories are reported
        as informational only.
    """
    global PASSED, FAILED
    ledger = LEDGER_DB if db_path is None else str(db_path)
    ok = True

    print("\n[TEST] audit_logging")

    # 1. Check Vault audit backends
    token = get_root_token()
    if not token:
        log("No Vault token, skipping Vault audit check", "info")
    else:
        resp = vault_request("GET", "sys/audit", token)
        audit_devices = resp.get("data") or {}
        if audit_devices:
            log(f"Vault audit devices: {list(audit_devices)}", "pass")
            PASSED += 1
        else:
            log("No Vault audit devices configured", "info")

    # 2. Check ledger has audit entries. The existence check matters:
    # sqlite3.connect would silently create an empty database file.
    if not Path(ledger).exists():
        log(f"Ledger database not found: {ledger}", "fail")
        FAILED += 1
        ok = False
    else:
        try:
            with closing(sqlite3.connect(ledger)) as conn:
                cursor = conn.cursor()
                tables = _list_tables(cursor)

                # Look for audit/event related tables ("governance_events"
                # matches through the "event" keyword).
                keywords = ("audit", "event", "log")
                audit_tables = [
                    t for t in tables if any(k in t.lower() for k in keywords)
                ]

                if audit_tables:
                    log(f"Found audit tables: {audit_tables}", "pass")
                    PASSED += 1

                    # Only the first matching table is counted.
                    table = audit_tables[0]
                    cursor.execute(
                        f"SELECT COUNT(*) FROM {_quote_identifier(table)}"
                    )
                    count = cursor.fetchone()[0]
                    log(f"Audit entries in {table}: {count}", "pass")
                    PASSED += 1
                else:
                    log("No dedicated audit tables found", "info")
        except sqlite3.Error as e:
            log(f"Ledger audit check failed: {e}", "fail")
            FAILED += 1
            ok = False

    # 3. Verify audit directory exists for file-based logging
    audit_dirs = [
        "/var/log/vault",
        "/opt/agent-governance/logs",
        "/opt/agent-governance/orchestrator/logs",
    ]
    found_log_dir = next((d for d in audit_dirs if Path(d).exists()), None)
    if found_log_dir is not None:
        log(f"Log directory exists: {found_log_dir}", "pass")
        PASSED += 1
    else:
        log("No log directory found (may use centralized logging)", "info")

    return ok


# These checks report through return values and the PASSED/FAILED counters
# rather than assertions, so keep pytest from collecting them by name.
# NOTE(review): drop this if the script is intentionally collected by pytest.
for _check in (test_ledger_connection, test_vault_status, test_audit_logging):
    _check.__test__ = False


def main():
    """Run all Phase 1 tests and return True when none of them failed."""
    global PASSED, FAILED

    print("\n" + "=" * 60)
    print("PHASE 1: FOUNDATION TESTS")
    print("=" * 60)

    try:
        test_ledger_connection()
        test_vault_status()
        test_audit_logging()
    except Exception as e:
        # Top-level boundary: report the crash as a failure, still summarise.
        print(f"\n\033[91mTest execution error: {e}\033[0m")
        FAILED += 1

    print("\n" + "=" * 60)
    print(f"RESULTS: {PASSED} passed, {FAILED} failed")
    print("=" * 60 + "\n")

    return FAILED == 0


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)