profit 77655c298c Initial commit: Agent Governance System Phase 8
Phase 8 Production Hardening with complete governance infrastructure:

- Vault integration with tiered policies (T0-T4)
- DragonflyDB state management
- SQLite audit ledger
- Pipeline DSL and templates
- Promotion/revocation engine
- Checkpoint system for session persistence
- Health manager and circuit breaker for fault tolerance
- GitHub/Slack integrations
- Architectural test pipeline with bug watcher, suggestion engine, council review
- Multi-agent chaos testing framework

Test Results:
- Governance tests: 68/68 passing
- E2E workflow: 16/16 passing
- Phase 2 Vault: 14/14 passing
- Integration tests: 27/27 passing

Coverage: 57.6% average across 12 phases

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-23 22:07:06 -05:00

361 lines
12 KiB
Python

"""
MockVault - Simulates HashiCorp Vault for testing.
Provides deterministic policy evaluation, token management,
and secrets access without requiring a real Vault instance.
"""
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import json
import hashlib
@dataclass
class MockToken:
"""Represents a Vault token"""
accessor: str
policies: List[str]
ttl: int
created_at: datetime
renewable: bool = True
revoked: bool = False
def is_valid(self) -> bool:
if self.revoked:
return False
if datetime.utcnow() > self.created_at + timedelta(seconds=self.ttl):
return False
return True
@dataclass
class MockAppRole:
"""Represents an AppRole configuration"""
role_id: str
policies: List[str]
token_ttl: int
token_max_ttl: int
secret_id_ttl: int
class MockVault:
"""
Mock Vault implementation for testing.
Simulates:
- AppRole authentication
- Token lifecycle (creation, renewal, revocation)
- Policy evaluation
- KV secrets access
- Audit logging
"""
def __init__(self):
self.kv_store: Dict[str, Dict[str, Any]] = {}
self.policies: Dict[str, Dict[str, List[str]]] = {}
self.approles: Dict[str, MockAppRole] = {}
self.tokens: Dict[str, MockToken] = {}
self.secret_ids: Dict[str, Dict[str, Any]] = {}
self.audit_log: List[Dict[str, Any]] = []
# Initialize default tier policies
self._setup_default_policies()
self._setup_default_approles()
def _setup_default_policies(self):
"""Set up default trust tier policies"""
self.policies = {
"t0-observer": {
"secret/data/docs/*": ["read", "list"],
"secret/data/inventory/*": ["read", "list"],
"secret/data/logs/*": ["read", "list"]
},
"t1-operator": {
"secret/data/docs/*": ["read", "list"],
"secret/data/inventory/*": ["read", "list"],
"ssh/creds/sandbox-*": ["read"],
"secret/data/ansible/sandbox/*": ["read"]
},
"t2-builder": {
"secret/data/docs/*": ["read", "list"],
"secret/data/inventory/*": ["read", "list"],
"ssh/creds/sandbox-*": ["read"],
"secret/data/frameworks/*": ["create", "read", "update", "list"]
},
"t3-executor": {
"secret/data/*": ["read", "list"],
"ssh/creds/*/root-controlled": ["read"],
"proxmox/creds/staging": ["read"]
},
"t4-architect": {
"secret/data/*": ["read", "list", "create", "update"],
"sys/policies/acl/*": ["read", "list"],
"secret/data/governance/*": ["create", "read", "update"]
}
}
def _setup_default_approles(self):
"""Set up default AppRole configurations"""
tier_configs = [
("tier0-agent", ["t0-observer"], 3600, 14400, 86400),
("tier1-agent", ["t1-operator"], 1800, 7200, 86400),
("tier2-agent", ["t2-builder"], 1800, 7200, 86400),
("tier3-agent", ["t3-executor"], 900, 3600, 86400),
("tier4-agent", ["t4-architect"], 900, 3600, 86400),
]
for role_name, policies, ttl, max_ttl, secret_ttl in tier_configs:
role_id = self._generate_id(f"role-{role_name}")
self.approles[role_name] = MockAppRole(
role_id=role_id,
policies=policies,
token_ttl=ttl,
token_max_ttl=max_ttl,
secret_id_ttl=secret_ttl
)
def _generate_id(self, prefix: str) -> str:
"""Generate a deterministic ID"""
h = hashlib.sha256(prefix.encode()).hexdigest()[:8]
return f"{prefix}-{h}"
def _audit(self, operation: str, path: str, success: bool, details: Dict = None):
"""Record audit log entry"""
self.audit_log.append({
"timestamp": datetime.utcnow().isoformat(),
"operation": operation,
"path": path,
"success": success,
"details": details or {}
})
# === AppRole Authentication ===
def get_role_id(self, role_name: str) -> Optional[str]:
"""Get role_id for an AppRole"""
if role_name in self.approles:
return self.approles[role_name].role_id
return None
def generate_secret_id(self, role_name: str) -> Optional[str]:
"""Generate a new secret_id for an AppRole"""
if role_name not in self.approles:
return None
secret_id = self._generate_id(f"secret-{role_name}-{datetime.utcnow().timestamp()}")
self.secret_ids[secret_id] = {
"role_name": role_name,
"created_at": datetime.utcnow(),
"ttl": self.approles[role_name].secret_id_ttl
}
return secret_id
def approle_login(self, role_id: str, secret_id: str) -> Tuple[bool, Optional[str], str]:
"""
Authenticate with AppRole credentials.
Returns: (success, token, message)
"""
# Find role by role_id
role_name = None
for name, approle in self.approles.items():
if approle.role_id == role_id:
role_name = name
break
if not role_name:
self._audit("approle_login", "auth/approle/login", False, {"error": "invalid_role_id"})
return False, None, "Invalid role_id"
# Validate secret_id
if secret_id not in self.secret_ids:
self._audit("approle_login", "auth/approle/login", False, {"error": "invalid_secret_id"})
return False, None, "Invalid secret_id"
secret_info = self.secret_ids[secret_id]
if secret_info["role_name"] != role_name:
self._audit("approle_login", "auth/approle/login", False, {"error": "secret_id_mismatch"})
return False, None, "secret_id does not match role"
# Check secret_id TTL
age = (datetime.utcnow() - secret_info["created_at"]).total_seconds()
if age > secret_info["ttl"]:
self._audit("approle_login", "auth/approle/login", False, {"error": "secret_id_expired"})
return False, None, "secret_id expired"
# Create token
approle = self.approles[role_name]
token = self._generate_id(f"token-{role_name}-{datetime.utcnow().timestamp()}")
accessor = self._generate_id(f"accessor-{token}")
self.tokens[token] = MockToken(
accessor=accessor,
policies=approle.policies,
ttl=approle.token_ttl,
created_at=datetime.utcnow()
)
self._audit("approle_login", "auth/approle/login", True, {
"role": role_name,
"token_accessor": accessor
})
return True, token, f"Authenticated as {role_name}"
# === Token Management ===
def validate_token(self, token: str) -> Tuple[bool, Optional[MockToken]]:
"""Validate a token"""
if token not in self.tokens:
return False, None
mock_token = self.tokens[token]
if not mock_token.is_valid():
return False, None
return True, mock_token
def revoke_token(self, token: str) -> bool:
"""Revoke a token"""
if token in self.tokens:
self.tokens[token].revoked = True
self._audit("revoke", f"auth/token/revoke/{token[:8]}...", True)
return True
return False
def renew_token(self, token: str, increment: int = None) -> Tuple[bool, int]:
"""
Renew a token.
Returns: (success, new_ttl)
"""
valid, mock_token = self.validate_token(token)
if not valid:
return False, 0
if not mock_token.renewable:
return False, 0
# Reset TTL (up to max)
mock_token.created_at = datetime.utcnow()
self._audit("renew", f"auth/token/renew", True, {"new_ttl": mock_token.ttl})
return True, mock_token.ttl
# === Policy Evaluation ===
def check_policy(self, token: str, path: str, capability: str) -> bool:
"""
Check if token has capability for path.
Supports glob patterns (* at end of path).
"""
valid, mock_token = self.validate_token(token)
if not valid:
return False
for policy_name in mock_token.policies:
if policy_name not in self.policies:
continue
policy = self.policies[policy_name]
for policy_path, capabilities in policy.items():
if self._path_matches(path, policy_path):
if capability in capabilities:
self._audit("policy_check", path, True, {
"policy": policy_name,
"capability": capability
})
return True
self._audit("policy_check", path, False, {"capability": capability})
return False
def _path_matches(self, actual_path: str, policy_path: str) -> bool:
"""Check if actual path matches policy path pattern"""
if policy_path.endswith("/*"):
prefix = policy_path[:-2]
return actual_path.startswith(prefix)
elif policy_path.endswith("*"):
prefix = policy_path[:-1]
return actual_path.startswith(prefix)
else:
return actual_path == policy_path
# === KV Secrets ===
def kv_put(self, path: str, data: Dict[str, Any], token: str = None) -> bool:
"""Store a secret"""
if token and not self.check_policy(token, path, "create"):
return False
self.kv_store[path] = {
"data": data,
"metadata": {
"created_time": datetime.utcnow().isoformat(),
"version": self.kv_store.get(path, {}).get("metadata", {}).get("version", 0) + 1
}
}
self._audit("kv_put", path, True)
return True
def kv_get(self, path: str, token: str = None) -> Optional[Dict[str, Any]]:
"""Retrieve a secret"""
if token and not self.check_policy(token, path, "read"):
return None
if path in self.kv_store:
self._audit("kv_get", path, True)
return self.kv_store[path]["data"]
self._audit("kv_get", path, False, {"error": "not_found"})
return None
def kv_list(self, path: str, token: str = None) -> List[str]:
"""List secrets under path"""
if token and not self.check_policy(token, path, "list"):
return []
prefix = path.rstrip("/") + "/"
keys = []
for key in self.kv_store.keys():
if key.startswith(prefix):
# Get the next path component
remainder = key[len(prefix):]
next_component = remainder.split("/")[0]
if next_component and next_component not in keys:
keys.append(next_component)
return keys
# === Test Helpers ===
def reset(self):
"""Reset all state for testing"""
self.kv_store.clear()
self.tokens.clear()
self.secret_ids.clear()
self.audit_log.clear()
self._setup_default_policies()
self._setup_default_approles()
def get_audit_log(self) -> List[Dict[str, Any]]:
"""Get audit log for test assertions"""
return self.audit_log.copy()
def inject_policy(self, name: str, rules: Dict[str, List[str]]):
"""Inject a custom policy for testing"""
self.policies[name] = rules
def inject_token(self, token: str, policies: List[str], ttl: int = 3600):
"""Inject a pre-created token for testing"""
accessor = self._generate_id(f"accessor-{token}")
self.tokens[token] = MockToken(
accessor=accessor,
policies=policies,
ttl=ttl,
created_at=datetime.utcnow()
)