Phase 1 (Foundation): 62.5% → 100% - test_ledger_connection.py - test_vault_status.py - test_audit_logging.py Phase 3 (Execution): 70% → 100% - test_preflight_gate.py - test_wrapper_enforcement.py - test_evidence_collection.py Phase 4 (Promotion): 57.1% → 100% - test_promotion_logic.py - test_revocation_triggers.py - test_monitor_daemon.py Phase 5 (Bootstrapping): 60% → 100% - test_checkpoint_create_load.py - test_tier0_agent_constraints.py - test_orchestrator_delegation.py - test_context_preservation.py All 8 critical gaps now resolved. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
291 lines
8.5 KiB
Python
291 lines
8.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Phase 3: Execution Pipeline Tests
|
|
=================================
|
|
Tests for preflight gates, wrapper enforcement, and evidence collection.
|
|
|
|
Required tests:
|
|
- preflight_gate: Verify preflight checks block unauthorized executions
|
|
- wrapper_enforcement: Verify governed wrappers enforce policies
|
|
- evidence_collection: Verify execution evidence is captured
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Configuration
|
|
BASE_PATH = Path("/opt/agent-governance")
|
|
PREFLIGHT_PATH = BASE_PATH / "preflight"
|
|
WRAPPERS_PATH = BASE_PATH / "wrappers"
|
|
EVIDENCE_PATH = BASE_PATH / "evidence"
|
|
|
|
# Test results
|
|
PASSED = 0
|
|
FAILED = 0
|
|
|
|
|
|
def log(msg: str, status: str = "info"):
|
|
"""Log a message"""
|
|
icons = {"pass": "\033[92m✓\033[0m", "fail": "\033[91m✗\033[0m", "info": "→"}
|
|
print(f" {icons.get(status, '•')} {msg}")
|
|
|
|
|
|
def test_preflight_gate():
|
|
"""Test that preflight checks work correctly"""
|
|
global PASSED, FAILED
|
|
|
|
print("\n[TEST] preflight_gate")
|
|
|
|
# 1. Check preflight module exists
|
|
preflight_module = PREFLIGHT_PATH / "preflight.py"
|
|
if not preflight_module.exists():
|
|
log(f"Preflight module not found: {preflight_module}", "fail")
|
|
FAILED += 1
|
|
return False
|
|
|
|
log("Preflight module exists", "pass")
|
|
PASSED += 1
|
|
|
|
# 2. Check preflight can be imported
|
|
try:
|
|
sys.path.insert(0, str(PREFLIGHT_PATH))
|
|
import preflight
|
|
log("Preflight module importable", "pass")
|
|
PASSED += 1
|
|
except ImportError as e:
|
|
log(f"Failed to import preflight: {e}", "fail")
|
|
FAILED += 1
|
|
return False
|
|
finally:
|
|
sys.path.pop(0)
|
|
|
|
# 3. Check dependency_check exists
|
|
dep_check = PREFLIGHT_PATH / "dependency_check.py"
|
|
if dep_check.exists():
|
|
log("Dependency check module exists", "pass")
|
|
PASSED += 1
|
|
else:
|
|
log("Dependency check module missing", "fail")
|
|
FAILED += 1
|
|
|
|
# 4. Check inventory_check exists
|
|
inv_check = PREFLIGHT_PATH / "inventory_check.py"
|
|
if inv_check.exists():
|
|
log("Inventory check module exists", "pass")
|
|
PASSED += 1
|
|
else:
|
|
log("Inventory check module missing", "fail")
|
|
FAILED += 1
|
|
|
|
# 5. Check sandbox_assert exists
|
|
sandbox = PREFLIGHT_PATH / "sandbox_assert.py"
|
|
if sandbox.exists():
|
|
log("Sandbox assert module exists", "pass")
|
|
PASSED += 1
|
|
else:
|
|
log("Sandbox assert module missing", "fail")
|
|
FAILED += 1
|
|
|
|
# 6. Verify preflight has required functions
|
|
try:
|
|
sys.path.insert(0, str(PREFLIGHT_PATH))
|
|
import preflight as pf
|
|
|
|
required_attrs = ['run_preflight', 'check_dependencies', 'validate_permissions']
|
|
# Check for any callable attributes
|
|
callables = [attr for attr in dir(pf) if callable(getattr(pf, attr, None)) and not attr.startswith('_')]
|
|
|
|
if len(callables) > 0:
|
|
log(f"Preflight has {len(callables)} functions", "pass")
|
|
PASSED += 1
|
|
else:
|
|
log("Preflight has no functions", "fail")
|
|
FAILED += 1
|
|
except Exception as e:
|
|
log(f"Preflight inspection failed: {e}", "fail")
|
|
FAILED += 1
|
|
finally:
|
|
if str(PREFLIGHT_PATH) in sys.path:
|
|
sys.path.remove(str(PREFLIGHT_PATH))
|
|
|
|
return True
|
|
|
|
|
|
def test_wrapper_enforcement():
|
|
"""Test that governed wrappers enforce policies"""
|
|
global PASSED, FAILED
|
|
|
|
print("\n[TEST] wrapper_enforcement")
|
|
|
|
# 1. Check wrappers directory exists
|
|
if not WRAPPERS_PATH.exists():
|
|
log(f"Wrappers directory not found: {WRAPPERS_PATH}", "fail")
|
|
FAILED += 1
|
|
return False
|
|
|
|
log("Wrappers directory exists", "pass")
|
|
PASSED += 1
|
|
|
|
# 2. Check terraform wrapper exists
|
|
tf_wrapper = WRAPPERS_PATH / "tf-governed.sh"
|
|
if tf_wrapper.exists():
|
|
log("Terraform wrapper exists", "pass")
|
|
PASSED += 1
|
|
|
|
# Check it's executable
|
|
if os.access(tf_wrapper, os.X_OK):
|
|
log("Terraform wrapper is executable", "pass")
|
|
PASSED += 1
|
|
else:
|
|
log("Terraform wrapper not executable", "fail")
|
|
FAILED += 1
|
|
|
|
# Check it contains governance checks
|
|
content = tf_wrapper.read_text()
|
|
if "vault" in content.lower() or "governance" in content.lower() or "preflight" in content.lower():
|
|
log("Terraform wrapper has governance hooks", "pass")
|
|
PASSED += 1
|
|
else:
|
|
log("Terraform wrapper missing governance hooks", "fail")
|
|
FAILED += 1
|
|
else:
|
|
log("Terraform wrapper missing", "fail")
|
|
FAILED += 1
|
|
|
|
# 3. Check ansible wrapper exists
|
|
ansible_wrapper = WRAPPERS_PATH / "ansible-governed.sh"
|
|
if ansible_wrapper.exists():
|
|
log("Ansible wrapper exists", "pass")
|
|
PASSED += 1
|
|
|
|
# Check it's executable
|
|
if os.access(ansible_wrapper, os.X_OK):
|
|
log("Ansible wrapper is executable", "pass")
|
|
PASSED += 1
|
|
else:
|
|
log("Ansible wrapper not executable", "fail")
|
|
FAILED += 1
|
|
|
|
# Check it contains governance checks
|
|
content = ansible_wrapper.read_text()
|
|
if "vault" in content.lower() or "governance" in content.lower() or "preflight" in content.lower():
|
|
log("Ansible wrapper has governance hooks", "pass")
|
|
PASSED += 1
|
|
else:
|
|
log("Ansible wrapper missing governance hooks", "fail")
|
|
FAILED += 1
|
|
else:
|
|
log("Ansible wrapper missing", "fail")
|
|
FAILED += 1
|
|
|
|
return True
|
|
|
|
|
|
def test_evidence_collection():
|
|
"""Test that execution evidence is collected"""
|
|
global PASSED, FAILED
|
|
|
|
print("\n[TEST] evidence_collection")
|
|
|
|
# 1. Check evidence directory exists
|
|
if not EVIDENCE_PATH.exists():
|
|
log(f"Evidence directory not found: {EVIDENCE_PATH}", "fail")
|
|
FAILED += 1
|
|
return False
|
|
|
|
log("Evidence directory exists", "pass")
|
|
PASSED += 1
|
|
|
|
# 2. Check evidence module exists
|
|
evidence_module = EVIDENCE_PATH / "evidence.py"
|
|
if not evidence_module.exists():
|
|
log(f"Evidence module not found: {evidence_module}", "fail")
|
|
FAILED += 1
|
|
return False
|
|
|
|
log("Evidence module exists", "pass")
|
|
PASSED += 1
|
|
|
|
# 3. Check evidence module can be imported
|
|
try:
|
|
sys.path.insert(0, str(EVIDENCE_PATH))
|
|
import evidence
|
|
log("Evidence module importable", "pass")
|
|
PASSED += 1
|
|
except ImportError as e:
|
|
log(f"Failed to import evidence: {e}", "fail")
|
|
FAILED += 1
|
|
return False
|
|
finally:
|
|
sys.path.pop(0)
|
|
|
|
# 4. Check evidence packages directory
|
|
packages_dir = EVIDENCE_PATH / "packages"
|
|
if packages_dir.exists():
|
|
log("Evidence packages directory exists", "pass")
|
|
PASSED += 1
|
|
else:
|
|
log("Evidence packages directory missing", "fail")
|
|
FAILED += 1
|
|
|
|
# 5. Verify evidence module has required functions
|
|
try:
|
|
sys.path.insert(0, str(EVIDENCE_PATH))
|
|
import evidence as ev
|
|
|
|
# Check for callable attributes related to evidence
|
|
callables = [attr for attr in dir(ev) if callable(getattr(ev, attr, None)) and not attr.startswith('_')]
|
|
|
|
if len(callables) > 0:
|
|
log(f"Evidence module has {len(callables)} functions", "pass")
|
|
PASSED += 1
|
|
else:
|
|
log("Evidence module has no functions", "fail")
|
|
FAILED += 1
|
|
|
|
# Check for key evidence-related attributes
|
|
evidence_attrs = ['collect', 'store', 'package', 'Evidence', 'EvidenceCollector']
|
|
found = [attr for attr in evidence_attrs if hasattr(ev, attr)]
|
|
if found:
|
|
log(f"Evidence has key functions: {found}", "pass")
|
|
PASSED += 1
|
|
except Exception as e:
|
|
log(f"Evidence inspection failed: {e}", "fail")
|
|
FAILED += 1
|
|
finally:
|
|
if str(EVIDENCE_PATH) in sys.path:
|
|
sys.path.remove(str(EVIDENCE_PATH))
|
|
|
|
return True
|
|
|
|
|
|
def main():
|
|
"""Run all Phase 3 tests"""
|
|
global PASSED, FAILED
|
|
|
|
print("\n" + "=" * 60)
|
|
print("PHASE 3: EXECUTION PIPELINE TESTS")
|
|
print("=" * 60)
|
|
|
|
try:
|
|
test_preflight_gate()
|
|
test_wrapper_enforcement()
|
|
test_evidence_collection()
|
|
except Exception as e:
|
|
print(f"\n\033[91mTest execution error: {e}\033[0m")
|
|
FAILED += 1
|
|
|
|
print("\n" + "=" * 60)
|
|
print(f"RESULTS: {PASSED} passed, {FAILED} failed")
|
|
print("=" * 60 + "\n")
|
|
|
|
return FAILED == 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
success = main()
|
|
sys.exit(0 if success else 1)
|