#!/usr/bin/env python3
"""
Phase 5: Agent Bootstrapping Tests (PRIORITY)
==============================================

Tests for checkpoint operations, tier0 agent constraints, orchestrator
delegation, and context preservation.

Required tests:
- checkpoint_create_load: Verify checkpoint create/load operations
- tier0_agent_constraints: Verify T0 agent has proper restrictions
- orchestrator_delegation: Verify orchestrator delegates correctly
- context_preservation: Verify context is preserved across sessions
"""

import json
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path

# Configuration
BASE_PATH = Path("/opt/agent-governance")
CHECKPOINT_PATH = BASE_PATH / "checkpoint"
AGENTS_PATH = BASE_PATH / "agents"
ORCHESTRATOR_PATH = BASE_PATH / "orchestrator"

# Test results: running totals of individual checks, shared by every test
# function in this module and reported by main().
PASSED = 0
FAILED = 0


def log(msg: str, status: str = "info") -> None:
    """Print *msg* prefixed with the icon for *status*.

    Known statuses are "pass", "fail" and "info"; any other value falls
    back to a bullet.
    """
    icons = {"pass": "\033[92m✓\033[0m", "fail": "\033[91m✗\033[0m", "info": "→"}
    print(f" {icons.get(status, '•')} {msg}")


def _pass(msg: str) -> None:
    """Log *msg* as a passed check and bump the global PASSED counter."""
    global PASSED
    log(msg, "pass")
    PASSED += 1


def _fail(msg: str) -> None:
    """Log *msg* as a failed check and bump the global FAILED counter."""
    global FAILED
    log(msg, "fail")
    FAILED += 1


def _load_latest_checkpoint(checkpoints: list) -> dict:
    """Parse the most recently modified file in *checkpoints*.

    Raises:
        OSError: a file could not be stat'ed or opened.
        ValueError: the content is not valid UTF-8 / JSON
            (json.JSONDecodeError is a ValueError subclass).
        TypeError: the JSON document is not an object.
    """
    latest = max(checkpoints, key=lambda p: p.stat().st_mtime)
    with open(latest, encoding="utf-8") as handle:
        ckpt = json.load(handle)
    if not isinstance(ckpt, dict):
        # Every caller does key lookups on the result; reject other JSON
        # types once here rather than failing later on `.get`.
        raise TypeError(f"expected a JSON object, got {type(ckpt).__name__}")
    return ckpt


def _check_checkpoint_structure(checkpoints: list) -> None:
    """Validate the required fields and phase layout of the newest checkpoint."""
    try:
        ckpt = _load_latest_checkpoint(checkpoints)
    except json.JSONDecodeError as e:
        _fail(f"Checkpoint JSON invalid: {e}")
        return
    except (OSError, TypeError, ValueError) as e:
        _fail(f"Checkpoint read error: {e}")
        return

    required_fields = ['checkpoint_id', 'created_at', 'phase']
    missing = [field for field in required_fields if field not in ckpt]
    if missing:
        _fail(f"Checkpoint missing fields: {missing}")
    else:
        _pass("Checkpoint has required fields")

    # Check phase structure
    phase = ckpt.get('phase')
    if not isinstance(phase, dict):
        _fail("Checkpoint phase not a dict")
    elif 'number' in phase and 'name' in phase:
        _pass(f"Checkpoint phase: {phase['number']} - {phase['name']}")
    else:
        _fail("Checkpoint phase missing number/name")


def test_checkpoint_create_load():
    """Test that checkpoint create/load operations work correctly.

    Returns False when the checkpoint module or its storage directory is
    missing (the remaining checks are skipped), True otherwise. Individual
    check outcomes are recorded in the PASSED/FAILED counters, not in the
    return value.
    """
    print("\n[TEST] checkpoint_create_load")

    # 1. Check checkpoint module exists
    checkpoint_module = CHECKPOINT_PATH / "checkpoint.py"
    if not checkpoint_module.exists():
        _fail(f"Checkpoint module not found: {checkpoint_module}")
        return False
    _pass("Checkpoint module exists")

    # 2. Check checkpoint storage directory
    storage_dir = CHECKPOINT_PATH / "storage"
    if not storage_dir.exists():
        _fail(f"Checkpoint storage not found: {storage_dir}")
        return False
    _pass("Checkpoint storage directory exists")

    # 3. Check for existing checkpoints
    checkpoints = list(storage_dir.glob("ckpt-*.json"))
    if checkpoints:
        _pass(f"Found {len(checkpoints)} existing checkpoints")
        # 4. Validate a checkpoint structure
        _check_checkpoint_structure(checkpoints)
    else:
        log("No checkpoints found (may be first run)", "info")

    # 5. Check checkpoint module has required functions
    try:
        # Read the module content to check for functions
        content = checkpoint_module.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        _fail(f"Checkpoint inspection failed: {e}")
    else:
        # Plain substring search of the source text: one hit is enough.
        key_functions = ['save', 'load', 'create', 'restore', 'Checkpoint']
        found = [name for name in key_functions if name in content]
        if found:
            _pass(f"Checkpoint has key functions: {found}")
        else:
            _fail("Checkpoint missing key functions")

    return True


def test_tier0_agent_constraints():
    """Test that T0 agent has proper restrictions.

    Returns False when the tier0 directory or its agent.py is missing or
    unreadable (the remaining checks are skipped), True otherwise.
    """
    print("\n[TEST] tier0_agent_constraints")

    # 1. Check tier0 agent directory exists
    tier0_path = AGENTS_PATH / "tier0-agent"
    if not tier0_path.exists():
        _fail(f"Tier0 agent directory not found: {tier0_path}")
        return False
    _pass("Tier0 agent directory exists")

    # 2. Check tier0 agent.py exists
    agent_py = tier0_path / "agent.py"
    if not agent_py.exists():
        _fail("Tier0 agent.py missing")
        return False
    _pass("Tier0 agent.py exists")

    # 3. Check for tier0 config
    config_dir = tier0_path / "config"
    if config_dir.exists():
        _pass("Tier0 config directory exists")
        # Check for policy files
        configs = [
            path
            for pattern in ("*.json", "*.yaml", "*.yml")
            for path in config_dir.glob(pattern)
        ]
        if configs:
            _pass(f"Tier0 has {len(configs)} config files")
        else:
            log("Tier0 config directory empty", "info")
    else:
        _fail("Tier0 config directory missing")

    # 4. Check agent has read-only/observer constraints
    try:
        content = agent_py.read_text(encoding="utf-8").lower()
    except (OSError, UnicodeDecodeError) as e:
        # Record the failure here instead of letting it abort the whole run.
        _fail(f"Tier0 agent.py unreadable: {e}")
        return False
    # Case-insensitive substring heuristics, not a semantic analysis.
    constraint_patterns = ['read', 'observe', 'readonly', 'read-only', 'no_write', 'restricted']
    found_constraints = [c for c in constraint_patterns if c in content]
    if found_constraints:
        _pass(f"Tier0 has constraint indicators: {found_constraints[:3]}")
    else:
        log("Tier0 constraint indicators not found", "info")

    # 5. Compare with tier1 to verify difference
    tier1_path = AGENTS_PATH / "tier1-agent"
    if not tier1_path.exists():
        log("Tier1 agent not found for comparison", "info")
        return True

    tier1_agent = tier1_path / "agent.py"
    if tier1_agent.exists():
        try:
            tier1_content = tier1_agent.read_text(encoding="utf-8").lower()
        except (OSError, UnicodeDecodeError) as e:
            # The comparison is optional, so an unreadable file is not a failure.
            log(f"Tier1 agent.py unreadable, comparison skipped: {e}", "info")
            return True
        # Tier1 should have more capabilities
        tier1_caps = ['write', 'execute', 'create', 'modify']
        tier1_found = [c for c in tier1_caps if c in tier1_content]
        if tier1_found:
            _pass(f"Tier1 has more capabilities than Tier0: {tier1_found[:3]}")
        else:
            log("Could not verify tier capability difference", "info")

    return True


def test_orchestrator_delegation():
    """Test that orchestrator delegates correctly.

    Returns False when the orchestrator directory or model controller is
    missing or unreadable (the remaining checks are skipped), True otherwise.
    """
    print("\n[TEST] orchestrator_delegation")

    # 1. Check orchestrator directory exists
    if not ORCHESTRATOR_PATH.exists():
        _fail(f"Orchestrator directory not found: {ORCHESTRATOR_PATH}")
        return False
    _pass("Orchestrator directory exists")

    # 2. Check model controller exists
    controller = ORCHESTRATOR_PATH / "model_controller.py"
    if not controller.exists():
        _fail(f"Model controller not found: {controller}")
        return False
    _pass("Model controller exists")

    # 3. Check orchestrator config
    config = ORCHESTRATOR_PATH / "config.json"
    if config.exists():
        _pass("Orchestrator config exists")
        try:
            with open(config, encoding="utf-8") as handle:
                cfg = json.load(handle)
        except (OSError, ValueError) as e:
            _fail(f"Config parse error: {e}")
        else:
            # Check for delegation-related config
            if isinstance(cfg, dict):
                _pass(f"Config has {len(cfg)} top-level keys")
            else:
                log(f"Config top-level value is {type(cfg).__name__}, not an object", "info")
    else:
        _fail("Orchestrator config missing")

    # 4. Check model controller has delegation logic
    try:
        content = controller.read_text(encoding="utf-8").lower()
    except (OSError, UnicodeDecodeError) as e:
        # Record the failure here instead of letting it abort the whole run.
        _fail(f"Model controller unreadable: {e}")
        return False
    # Case-insensitive substring heuristics, not a semantic analysis.
    delegation_patterns = ['delegate', 'dispatch', 'route', 'assign', 'forward', 'agent']
    found = [p for p in delegation_patterns if p in content]
    if found:
        _pass(f"Controller has delegation patterns: {found[:4]}")
    else:
        _fail("Controller missing delegation patterns")

    # 5. Check for tier-aware routing
    tier_patterns = ['tier', 't0', 't1', 't2', 't3', 't4', 'trust']
    tier_found = [p for p in tier_patterns if p in content]
    if tier_found:
        _pass(f"Controller is tier-aware: {tier_found[:4]}")
    else:
        _fail("Controller not tier-aware")

    return True


def test_context_preservation():
    """Test that context is preserved across sessions.

    Inspects the most recently modified checkpoint in the storage
    directory. Always returns True; outcomes are recorded in the
    PASSED/FAILED counters.
    """
    print("\n[TEST] context_preservation")

    # 1. Check for context-related fields in checkpoints
    storage_dir = CHECKPOINT_PATH / "storage"
    checkpoints = list(storage_dir.glob("ckpt-*.json"))
    if not checkpoints:
        # Nothing was verified, so this is informational only and must not
        # be counted as a passed check.
        log("No checkpoints to verify context preservation", "info")
        return True

    try:
        ckpt = _load_latest_checkpoint(checkpoints)
    except (OSError, TypeError, ValueError) as e:
        _fail(f"Context verification error: {e}")
        return True

    # 2. Check for context fields
    context_fields = ['variables', 'recent_outputs', 'memory_refs',
                      'memory_summary', 'pending_instructions', 'last_model_response']
    found = [field for field in context_fields if field in ckpt]
    if found:
        _pass(f"Checkpoint preserves context: {found[:4]}")
    else:
        _fail("Checkpoint missing context fields")

    # 3. Check for session continuity
    session_id = ckpt.get('session_id')
    if session_id:
        _pass(f"Session ID preserved: {str(session_id)[:20]}...")
    else:
        log("Session ID not preserved", "info")

    # 4. Check for parent checkpoint reference
    if 'parent_checkpoint_id' in ckpt:
        _pass("Parent checkpoint reference exists")
    else:
        log("Parent checkpoint not referenced", "info")

    # 5. Check for directory statuses (state preservation)
    if 'directory_statuses' in ckpt:
        statuses = ckpt['directory_statuses']
        if isinstance(statuses, (dict, list)) and statuses:
            _pass(f"Directory statuses preserved: {len(statuses)} entries")
        else:
            log("Directory statuses empty", "info")
    else:
        _fail("Directory statuses not preserved")

    # 6. Check for estimated tokens (resource tracking)
    if 'estimated_tokens' in ckpt:
        _pass(f"Token count preserved: {ckpt['estimated_tokens']}")
    else:
        log("Token count not preserved", "info")

    return True


def main():
    """Run all Phase 5 tests and return True when no check failed."""
    global FAILED

    print("\n" + "=" * 60)
    print("PHASE 5: AGENT BOOTSTRAPPING TESTS (PRIORITY)")
    print("=" * 60)

    tests = (
        test_checkpoint_create_load,
        test_tier0_agent_constraints,
        test_orchestrator_delegation,
        test_context_preservation,
    )
    for test in tests:
        # Isolate each test so an unexpected error in one is counted as a
        # failure without preventing the remaining tests from running.
        try:
            test()
        except Exception as e:
            print(f"\n\033[91mTest execution error: {e}\033[0m")
            FAILED += 1

    print("\n" + "=" * 60)
    print(f"RESULTS: {PASSED} passed, {FAILED} failed")
    print("=" * 60 + "\n")

    return FAILED == 0


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)