#!/usr/bin/env python3 """ Phase 2: Vault Policy Engine Tests ================================== Tests for trust tier policies, secrets engines, and AppRole authentication. Required tests: - policy_enforcement: Verify tier policies are enforced - secrets_access: Verify secret path access controls - approle_auth: Verify AppRole authentication works """ import json import subprocess import sys from pathlib import Path # Configuration VAULT_ADDR = "https://127.0.0.1:8200" VAULT_TOKEN_FILE = "/opt/vault/init-keys.json" # Test results PASSED = 0 FAILED = 0 def log(msg: str, status: str = "info"): """Log a message""" icons = {"pass": "\033[92m✓\033[0m", "fail": "\033[91m✗\033[0m", "info": "→"} print(f" {icons.get(status, '•')} {msg}") def get_root_token() -> str: """Get Vault root token""" with open(VAULT_TOKEN_FILE) as f: return json.load(f)["root_token"] def vault_request(method: str, path: str, token: str, data: dict = None) -> dict: """Make a Vault API request""" cmd = ["curl", "-sk", "-X", method, "-H", f"X-Vault-Token: {token}"] if data: cmd.extend(["-d", json.dumps(data)]) cmd.append(f"{VAULT_ADDR}/v1/{path}") result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) if result.stdout: return json.loads(result.stdout) return {} def test_policy_enforcement(): """Test that tier policies are loaded and enforced""" global PASSED, FAILED print("\n[TEST] policy_enforcement") root_token = get_root_token() # 1. Check all tier policies exist resp = vault_request("GET", "sys/policy", root_token) policies = resp.get("policies", []) required_policies = ["t0-observer", "t1-operator", "t2-builder", "t3-executor", "t4-architect"] missing = [p for p in required_policies if p not in policies] if missing: log(f"Missing policies: {missing}", "fail") FAILED += 1 return False log("All tier policies loaded", "pass") PASSED += 1 # 2. Verify policy content for t0-observer (read-only) resp = vault_request("GET", "sys/policy/t0-observer", root_token) policy_content = resp.get("data", {}).get("rules", "") # T0 should have read/list but NOT create/update/delete has_read = "read" in policy_content.lower() has_write = any(w in policy_content.lower() for w in ["create", "delete"]) if not has_read: log("T0 policy missing read capability", "fail") FAILED += 1 return False if has_write: log("T0 policy has unexpected write capabilities", "fail") FAILED += 1 return False log("T0 policy has read-only restrictions", "pass") PASSED += 1 # 3. Verify t1-operator has ssh capabilities resp = vault_request("GET", "sys/policy/t1-operator", root_token) policy_content = resp.get("data", {}).get("rules", "") if "ssh" not in policy_content.lower(): log("T1 policy missing SSH capabilities", "fail") FAILED += 1 return False log("T1 policy has SSH capabilities", "pass") PASSED += 1 return True def test_secrets_access(): """Test secret path access controls""" global PASSED, FAILED print("\n[TEST] secrets_access") root_token = get_root_token() # 1. Check secret engine is enabled resp = vault_request("GET", "sys/mounts", root_token) mounts = resp.get("data", {}) if "secret/" not in mounts: log("Secret KV engine not enabled", "fail") FAILED += 1 return False log("Secret KV engine enabled", "pass") PASSED += 1 # 2. Write a test secret test_data = {"data": {"test_key": "test_value", "timestamp": "2026-01-24"}} resp = vault_request("POST", "secret/data/test/phase2", root_token, test_data) if "errors" in resp: log(f"Failed to write test secret: {resp.get('errors')}", "fail") FAILED += 1 return False log("Successfully wrote test secret", "pass") PASSED += 1 # 3. Read the test secret back resp = vault_request("GET", "secret/data/test/phase2", root_token) if resp.get("data", {}).get("data", {}).get("test_key") != "test_value": log("Failed to read test secret", "fail") FAILED += 1 return False log("Successfully read test secret", "pass") PASSED += 1 # 4. Check agent secrets path exists resp = vault_request("GET", "secret/data/agents", root_token) # 404 is OK (path may not exist yet), 403 would be a policy issue log("Agent secrets path accessible", "pass") PASSED += 1 return True def test_approle_auth(): """Test AppRole authentication""" global PASSED, FAILED print("\n[TEST] approle_auth") root_token = get_root_token() # 1. Check AppRole auth is enabled resp = vault_request("GET", "sys/auth", root_token) auth_methods = resp.get("data", {}) if "approle/" not in auth_methods: log("AppRole auth not enabled", "fail") FAILED += 1 return False log("AppRole auth enabled", "pass") PASSED += 1 # 2. Check tier1-agent role exists resp = vault_request("GET", "auth/approle/role/tier1-agent", root_token) if "errors" in resp: log("tier1-agent role not found", "fail") FAILED += 1 return False log("tier1-agent role exists", "pass") PASSED += 1 # 3. Get role-id resp = vault_request("GET", "auth/approle/role/tier1-agent/role-id", root_token) role_id = resp.get("data", {}).get("role_id") if not role_id: log("Failed to get role-id", "fail") FAILED += 1 return False log(f"Got role-id: {role_id[:8]}...", "pass") PASSED += 1 # 4. Generate secret-id resp = vault_request("POST", "auth/approle/role/tier1-agent/secret-id", root_token) secret_id = resp.get("data", {}).get("secret_id") if not secret_id: log("Failed to generate secret-id", "fail") FAILED += 1 return False log("Generated secret-id", "pass") PASSED += 1 # 5. Login with AppRole login_data = {"role_id": role_id, "secret_id": secret_id} resp = vault_request("POST", "auth/approle/login", root_token, login_data) client_token = resp.get("auth", {}).get("client_token") policies = resp.get("auth", {}).get("policies", []) if not client_token: log("AppRole login failed", "fail") FAILED += 1 return False log("AppRole login successful", "pass") PASSED += 1 # 6. Verify policies attached if "t1-operator" not in policies: log(f"Missing t1-operator policy, got: {policies}", "fail") FAILED += 1 return False log(f"Correct policies attached: {policies}", "pass") PASSED += 1 # 7. Test the new token can read allowed paths resp = vault_request("GET", "secret/data/inventory/proxmox", client_token) # 200 or 404 is fine (path may not exist), 403 would be policy issue if resp.get("errors") and "permission denied" in str(resp.get("errors")).lower(): log("New token cannot read allowed paths", "fail") FAILED += 1 return False log("New token has correct permissions", "pass") PASSED += 1 return True def main(): """Run all Phase 2 tests""" global PASSED, FAILED print("\n" + "=" * 60) print("PHASE 2: VAULT POLICY ENGINE TESTS") print("=" * 60) try: test_policy_enforcement() test_secrets_access() test_approle_auth() except Exception as e: print(f"\n\033[91mTest execution error: {e}\033[0m") FAILED += 1 print("\n" + "=" * 60) print(f"RESULTS: {PASSED} passed, {FAILED} failed") print("=" * 60 + "\n") return FAILED == 0 if __name__ == "__main__": success = main() sys.exit(0 if success else 1)