"""
MockVault - Simulates HashiCorp Vault for testing.

Provides deterministic policy evaluation, token management, and secrets
access without requiring a real Vault instance.
"""

import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class MockToken:
    """Represents a Vault token."""

    accessor: str
    policies: List[str]
    ttl: int  # lifetime in seconds, measured from created_at
    created_at: datetime  # naive UTC timestamp (compared against datetime.utcnow())
    renewable: bool = True
    revoked: bool = False

    def is_valid(self) -> bool:
        """Return True while the token is neither revoked nor past its TTL."""
        if self.revoked:
            return False
        expires_at = self.created_at + timedelta(seconds=self.ttl)
        return datetime.utcnow() <= expires_at


@dataclass
class MockAppRole:
    """Represents an AppRole configuration."""

    role_id: str
    policies: List[str]
    token_ttl: int
    token_max_ttl: int
    secret_id_ttl: int


class MockVault:
    """
    Mock Vault implementation for testing.

    Simulates:
    - AppRole authentication
    - Token lifecycle (creation, renewal, revocation)
    - Policy evaluation
    - KV secrets access
    - Audit logging
    """

    def __init__(self):
        self.kv_store: Dict[str, Dict[str, Any]] = {}
        self.policies: Dict[str, Dict[str, List[str]]] = {}
        self.approles: Dict[str, MockAppRole] = {}
        self.tokens: Dict[str, MockToken] = {}
        self.secret_ids: Dict[str, Dict[str, Any]] = {}
        self.audit_log: List[Dict[str, Any]] = []
        # Monotonic counter mixed into generated secret_ids/tokens so that two
        # IDs minted within the same clock tick can never collide.
        self._id_sequence: int = 0

        # Initialize default tier policies
        self._setup_default_policies()
        self._setup_default_approles()

    def _setup_default_policies(self):
        """Set up default trust tier policies (replaces any existing policies)."""
        self.policies = {
            "t0-observer": {
                "secret/data/docs/*": ["read", "list"],
                "secret/data/inventory/*": ["read", "list"],
                "secret/data/logs/*": ["read", "list"],
            },
            "t1-operator": {
                "secret/data/docs/*": ["read", "list"],
                "secret/data/inventory/*": ["read", "list"],
                "ssh/creds/sandbox-*": ["read"],
                "secret/data/ansible/sandbox/*": ["read"],
            },
            "t2-builder": {
                "secret/data/docs/*": ["read", "list"],
                "secret/data/inventory/*": ["read", "list"],
                "ssh/creds/sandbox-*": ["read"],
                "secret/data/frameworks/*": ["create", "read", "update", "list"],
            },
            "t3-executor": {
                "secret/data/*": ["read", "list"],
                "ssh/creds/*/root-controlled": ["read"],
                "proxmox/creds/staging": ["read"],
            },
            "t4-architect": {
                "secret/data/*": ["read", "list", "create", "update"],
                "sys/policies/acl/*": ["read", "list"],
                "secret/data/governance/*": ["create", "read", "update"],
            },
        }

    def _setup_default_approles(self):
        """Set up default AppRole configurations."""
        # (role name, policies, token_ttl, token_max_ttl, secret_id_ttl)
        tier_configs = [
            ("tier0-agent", ["t0-observer"], 3600, 14400, 86400),
            ("tier1-agent", ["t1-operator"], 1800, 7200, 86400),
            ("tier2-agent", ["t2-builder"], 1800, 7200, 86400),
            ("tier3-agent", ["t3-executor"], 900, 3600, 86400),
            ("tier4-agent", ["t4-architect"], 900, 3600, 86400),
        ]

        for role_name, policies, ttl, max_ttl, secret_ttl in tier_configs:
            self.approles[role_name] = MockAppRole(
                role_id=self._generate_id(f"role-{role_name}"),
                policies=policies,
                token_ttl=ttl,
                token_max_ttl=max_ttl,
                secret_id_ttl=secret_ttl,
            )

    def _generate_id(self, prefix: str) -> str:
        """Generate a deterministic ID: the prefix plus 8 hex chars of its SHA-256."""
        h = hashlib.sha256(prefix.encode()).hexdigest()[:8]
        return f"{prefix}-{h}"

    def _generate_unique_id(self, prefix: str) -> str:
        """Generate an ID that is unique for the lifetime of this instance.

        The current timestamp alone is not enough: two calls inside the same
        clock tick would produce identical IDs and the later credential would
        overwrite the earlier one. A per-instance sequence number removes that
        possibility.
        """
        self._id_sequence += 1
        stamp = datetime.utcnow().timestamp()
        return self._generate_id(f"{prefix}-{stamp}-{self._id_sequence}")

    def _audit(self, operation: str, path: str, success: bool,
               details: Optional[Dict] = None):
        """Record audit log entry."""
        self.audit_log.append({
            "timestamp": datetime.utcnow().isoformat(),
            "operation": operation,
            "path": path,
            "success": success,
            "details": details or {},
        })

    # === AppRole Authentication ===

    def get_role_id(self, role_name: str) -> Optional[str]:
        """Get role_id for an AppRole, or None if the role is unknown."""
        approle = self.approles.get(role_name)
        return approle.role_id if approle is not None else None

    def generate_secret_id(self, role_name: str) -> Optional[str]:
        """Generate a new secret_id for an AppRole.

        Returns None if the role is unknown.
        """
        approle = self.approles.get(role_name)
        if approle is None:
            return None

        secret_id = self._generate_unique_id(f"secret-{role_name}")
        self.secret_ids[secret_id] = {
            "role_name": role_name,
            "created_at": datetime.utcnow(),
            "ttl": approle.secret_id_ttl,
        }
        return secret_id

    def approle_login(self, role_id: str, secret_id: str) -> Tuple[bool, Optional[str], str]:
        """
        Authenticate with AppRole credentials.

        Every attempt, successful or not, is written to the audit log.

        Returns:
            (success, token, message)
        """
        login_path = "auth/approle/login"

        # Find role by role_id
        role_name = next(
            (name for name, approle in self.approles.items()
             if approle.role_id == role_id),
            None,
        )
        if role_name is None:
            self._audit("approle_login", login_path, False, {"error": "invalid_role_id"})
            return False, None, "Invalid role_id"

        # Validate secret_id
        secret_info = self.secret_ids.get(secret_id)
        if secret_info is None:
            self._audit("approle_login", login_path, False, {"error": "invalid_secret_id"})
            return False, None, "Invalid secret_id"

        if secret_info["role_name"] != role_name:
            self._audit("approle_login", login_path, False, {"error": "secret_id_mismatch"})
            return False, None, "secret_id does not match role"

        # Check secret_id TTL
        age = (datetime.utcnow() - secret_info["created_at"]).total_seconds()
        if age > secret_info["ttl"]:
            self._audit("approle_login", login_path, False, {"error": "secret_id_expired"})
            return False, None, "secret_id expired"

        # Create token
        approle = self.approles[role_name]
        token = self._generate_unique_id(f"token-{role_name}")
        accessor = self._generate_id(f"accessor-{token}")

        self.tokens[token] = MockToken(
            accessor=accessor,
            # Copy so later edits to the token's policies cannot leak back
            # into the AppRole definition (and vice versa).
            policies=list(approle.policies),
            ttl=approle.token_ttl,
            created_at=datetime.utcnow(),
        )

        self._audit("approle_login", login_path, True, {
            "role": role_name,
            "token_accessor": accessor,
        })
        return True, token, f"Authenticated as {role_name}"

    # === Token Management ===

    def validate_token(self, token: str) -> Tuple[bool, Optional[MockToken]]:
        """Validate a token.

        Returns:
            (True, MockToken) for a known, unrevoked, unexpired token;
            (False, None) otherwise.
        """
        mock_token = self.tokens.get(token)
        if mock_token is None or not mock_token.is_valid():
            return False, None
        return True, mock_token

    def revoke_token(self, token: str) -> bool:
        """Revoke a token. Returns False if the token is unknown."""
        if token not in self.tokens:
            return False
        self.tokens[token].revoked = True
        # Only a short prefix of the token is written to the audit log.
        self._audit("revoke", f"auth/token/revoke/{token[:8]}...", True)
        return True

    def renew_token(self, token: str, increment: Optional[int] = None) -> Tuple[bool, int]:
        """
        Renew a token.

        Renewal restarts the token's existing TTL window from the current
        time. ``increment`` is accepted for API compatibility but is not used,
        and no maximum TTL is enforced by this mock.

        Returns:
            (success, new_ttl) - (False, 0) if the token is invalid or not
            renewable.
        """
        valid, mock_token = self.validate_token(token)
        if not valid or not mock_token.renewable:
            return False, 0

        mock_token.created_at = datetime.utcnow()

        self._audit("renew", "auth/token/renew", True, {"new_ttl": mock_token.ttl})
        return True, mock_token.ttl

    # === Policy Evaluation ===

    def check_policy(self, token: str, path: str, capability: str) -> bool:
        """
        Check if token has capability for path.

        Supports glob patterns (* at end of path). An invalid token yields
        False without an audit entry; every evaluated check is audited.
        """
        valid, mock_token = self.validate_token(token)
        if not valid:
            return False

        for policy_name in mock_token.policies:
            policy = self.policies.get(policy_name)
            if policy is None:
                # Token references a policy that is not defined; skip it.
                continue

            for policy_path, capabilities in policy.items():
                if capability in capabilities and self._path_matches(path, policy_path):
                    self._audit("policy_check", path, True, {
                        "policy": policy_name,
                        "capability": capability,
                    })
                    return True

        self._audit("policy_check", path, False, {"capability": capability})
        return False

    def _path_matches(self, actual_path: str, policy_path: str) -> bool:
        """Check if actual path matches policy path pattern.

        - ``a/b/*`` matches ``a/b`` itself (so the directory can be listed)
          and anything below ``a/b/``, but NOT siblings that merely share the
          textual prefix such as ``a/b-other/x``.
        - ``a/b*`` matches any path starting with ``a/b``.
        - Anything else must match exactly; a ``*`` that is not the final
          character is treated literally.
        """
        if policy_path.endswith("/*"):
            directory = policy_path[:-2]
            return actual_path == directory or actual_path.startswith(directory + "/")
        if policy_path.endswith("*"):
            return actual_path.startswith(policy_path[:-1])
        return actual_path == policy_path

    # === KV Secrets ===

    def kv_put(self, path: str, data: Dict[str, Any], token: Optional[str] = None) -> bool:
        """Store a secret.

        When ``token`` is None the write is unauthenticated (test setup).
        Otherwise the token needs ``update`` on an existing path or ``create``
        on a new one. Any non-None token, including an empty string, is
        policy-checked.

        Returns:
            True if stored, False if the policy check denied the write.
        """
        if token is not None:
            required = "update" if path in self.kv_store else "create"
            if not self.check_policy(token, path, required):
                return False

        previous_version = self.kv_store.get(path, {}).get("metadata", {}).get("version", 0)
        self.kv_store[path] = {
            "data": data,
            "metadata": {
                "created_time": datetime.utcnow().isoformat(),
                "version": previous_version + 1,
            },
        }
        self._audit("kv_put", path, True)
        return True

    def kv_get(self, path: str, token: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Retrieve a secret.

        When ``token`` is None the read is unauthenticated; any other value
        (including an empty string) must pass a ``read`` policy check.

        Returns:
            The stored data dict, or None if access is denied or the path
            does not exist.
        """
        if token is not None and not self.check_policy(token, path, "read"):
            return None

        entry = self.kv_store.get(path)
        if entry is None:
            self._audit("kv_get", path, False, {"error": "not_found"})
            return None

        self._audit("kv_get", path, True)
        return entry["data"]

    def kv_list(self, path: str, token: Optional[str] = None) -> List[str]:
        """List secrets under path.

        Returns the distinct next path components below ``path`` in the order
        they were first stored, or an empty list if access is denied. When
        ``token`` is None the listing is unauthenticated.
        """
        if token is not None and not self.check_policy(token, path, "list"):
            return []

        prefix = path.rstrip("/") + "/"
        # dict keys give O(1) de-duplication while preserving insertion order.
        children: Dict[str, None] = {}
        for key in self.kv_store:
            if not key.startswith(prefix):
                continue
            # Get the next path component
            component = key[len(prefix):].split("/", 1)[0]
            if component:
                children.setdefault(component, None)
        return list(children)

    # === Test Helpers ===

    def reset(self):
        """Reset all state for testing and restore the default policies/AppRoles."""
        self.kv_store.clear()
        self.tokens.clear()
        self.secret_ids.clear()
        self.audit_log.clear()
        # Drop any custom roles too; the defaults are rebuilt just below.
        self.approles.clear()
        self._setup_default_policies()
        self._setup_default_approles()

    def get_audit_log(self) -> List[Dict[str, Any]]:
        """Get audit log for test assertions (a shallow copy of the entry list)."""
        return self.audit_log.copy()

    def inject_policy(self, name: str, rules: Dict[str, List[str]]):
        """Inject a custom policy for testing (overwrites a same-named policy)."""
        self.policies[name] = rules

    def inject_token(self, token: str, policies: List[str], ttl: int = 3600):
        """Inject a pre-created token for testing."""
        accessor = self._generate_id(f"accessor-{token}")
        self.tokens[token] = MockToken(
            accessor=accessor,
            # Copy so the caller's list and the token's list stay independent.
            policies=list(policies),
            ttl=ttl,
            created_at=datetime.utcnow(),
        )