profit 8c6e7831e9 Add Phase 10-12 implementation: multi-tenant, marketplace, observability
Major additions:
- marketplace/: Agent template registry with FTS5 search, ratings, versioning
- observability/: Prometheus metrics, distributed tracing, structured logging
- ledger/migrations/: Database migration scripts for multi-tenant support
- tests/governance/: 15 new test files for phases 6-12 (295 total tests)
- bin/validate-phases: Full 12-phase validation script

New features:
- Multi-tenant support with tenant isolation and quota enforcement
- Agent marketplace with semantic versioning and search
- Observability with metrics, tracing, and log correlation
- Tier-1 agent bootstrap scripts

Updated components:
- ledger/api.py: Extended API for tenants, marketplace, observability
- ledger/schema.sql: Added tenant, project, marketplace tables
- testing/framework.ts: Enhanced test framework
- checkpoint/checkpoint.py: Improved checkpoint management

Archived:
- External integrations (Slack/GitHub/PagerDuty) moved to .archive/
- Old checkpoint files cleaned up

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 18:39:47 -05:00

316 lines
10 KiB
Python

"""
Secrets Management for External Integrations
Provides secure credential loading from:
1. Environment variables (development/CI)
2. Vault secrets engine (production)
3. Mock credentials (testing)
Usage:
secrets = SecretsManager()
slack_token = secrets.get("SLACK_BOT_TOKEN")
pd_key = secrets.get("PAGERDUTY_ROUTING_KEY")
"""
import os
import json
import subprocess
from typing import Dict, Any, Optional
from dataclasses import dataclass
from pathlib import Path
@dataclass
class SecretConfig:
"""Configuration for a secret"""
name: str
env_var: str
vault_path: Optional[str] = None
required: bool = False
default: Optional[str] = None
description: str = ""
class SecretsManager:
"""
Manages secrets for external integrations.
Priority order:
1. Environment variables (highest)
2. Vault secrets (if available)
3. Mock/default values (testing only)
"""
# Known secrets with their configurations
KNOWN_SECRETS = {
# Slack
"SLACK_BOT_TOKEN": SecretConfig(
name="slack_bot_token",
env_var="SLACK_BOT_TOKEN",
vault_path="secret/data/integrations/slack",
description="Slack Bot OAuth Token (xoxb-...)"
),
"SLACK_WEBHOOK_URL": SecretConfig(
name="slack_webhook_url",
env_var="SLACK_WEBHOOK_URL",
vault_path="secret/data/integrations/slack",
description="Slack Incoming Webhook URL"
),
# GitHub
"GITHUB_TOKEN": SecretConfig(
name="github_token",
env_var="GITHUB_TOKEN",
vault_path="secret/data/integrations/github",
description="GitHub Personal Access Token"
),
"GITHUB_WEBHOOK_SECRET": SecretConfig(
name="github_webhook_secret",
env_var="GITHUB_WEBHOOK_SECRET",
vault_path="secret/data/integrations/github",
description="GitHub Webhook Secret"
),
# PagerDuty
"PAGERDUTY_ROUTING_KEY": SecretConfig(
name="pagerduty_routing_key",
env_var="PAGERDUTY_ROUTING_KEY",
vault_path="secret/data/integrations/pagerduty",
description="PagerDuty Events API v2 Routing Key"
),
"PAGERDUTY_API_TOKEN": SecretConfig(
name="pagerduty_api_token",
env_var="PAGERDUTY_API_TOKEN",
vault_path="secret/data/integrations/pagerduty",
description="PagerDuty REST API Token"
),
"PAGERDUTY_SERVICE_ID": SecretConfig(
name="pagerduty_service_id",
env_var="PAGERDUTY_SERVICE_ID",
vault_path="secret/data/integrations/pagerduty",
description="PagerDuty Service ID"
),
# Redis/DragonflyDB
"REDIS_PASSWORD": SecretConfig(
name="redis_password",
env_var="REDIS_PASSWORD",
vault_path="secret/data/infrastructure/redis",
default="", # Empty password for local dev
description="Redis/DragonflyDB password"
),
"REDIS_URL": SecretConfig(
name="redis_url",
env_var="REDIS_URL",
default="redis://localhost:6379",
description="Redis connection URL"
),
# Database
"DATABASE_URL": SecretConfig(
name="database_url",
env_var="DATABASE_URL",
vault_path="secret/data/infrastructure/database",
default="sqlite:///governance.db",
description="Database connection URL"
),
}
def __init__(
self,
use_vault: bool = None,
vault_addr: str = None,
vault_token: str = None,
mock_mode: bool = None
):
self._cache: Dict[str, str] = {}
# Auto-detect mode
self._mock_mode = mock_mode if mock_mode is not None else (
os.environ.get("INTEGRATION_DRY_RUN", "false").lower() == "true" or
os.environ.get("TESTING", "false").lower() == "true"
)
self._use_vault = use_vault if use_vault is not None else (
os.environ.get("USE_VAULT", "true").lower() == "true"
)
self._vault_addr = vault_addr or os.environ.get("VAULT_ADDR", "https://127.0.0.1:8200")
self._vault_token = vault_token or self._load_vault_token()
def _load_vault_token(self) -> Optional[str]:
"""Load Vault token from environment or file"""
# Environment variable first
token = os.environ.get("VAULT_TOKEN")
if token:
return token
# Try token file
token_files = [
"/opt/vault/init-keys.json",
os.path.expanduser("~/.vault-token")
]
for token_file in token_files:
if Path(token_file).exists():
try:
with open(token_file) as f:
data = json.load(f)
if isinstance(data, dict) and "root_token" in data:
return data["root_token"]
return str(data)
except (json.JSONDecodeError, KeyError):
# Try reading as plain text
with open(token_file) as f:
return f.read().strip()
return None
def get(self, key: str, default: str = None) -> Optional[str]:
"""
Get a secret value.
Args:
key: Secret key (e.g., "SLACK_BOT_TOKEN")
default: Default value if secret not found
Returns:
Secret value or default
"""
# Check cache first
if key in self._cache:
return self._cache[key]
# Get secret config
config = self.KNOWN_SECRETS.get(key, SecretConfig(
name=key.lower(),
env_var=key
))
# Try environment variable first
value = os.environ.get(config.env_var)
if value:
self._cache[key] = value
return value
# In mock mode, skip Vault and return mock values directly
# This ensures consistent test behavior regardless of Vault state
if self._mock_mode:
# Use default if available, otherwise generate mock value
if default is not None:
self._cache[key] = default
return default
if config.default:
self._cache[key] = config.default
return config.default
mock_value = f"mock-{key.lower()}"
self._cache[key] = mock_value
return mock_value
# Try Vault if enabled (only in non-mock mode)
if self._use_vault and config.vault_path and self._vault_token:
vault_value = self._get_from_vault(config.vault_path, config.name)
# Skip placeholder values that indicate unconfigured secrets
if vault_value and not vault_value.startswith("PLACEHOLDER_"):
self._cache[key] = vault_value
return vault_value
# Use default
final_default = default if default is not None else config.default
if final_default:
self._cache[key] = final_default
return final_default
return None
def _get_from_vault(self, path: str, key: str) -> Optional[str]:
"""Get a secret from Vault"""
try:
cmd = [
"curl", "-sk",
"-H", f"X-Vault-Token: {self._vault_token}",
f"{self._vault_addr}/v1/{path}"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
if result.returncode == 0 and result.stdout:
data = json.loads(result.stdout)
# KV v2 path
secret_data = data.get("data", {}).get("data", {})
return secret_data.get(key)
except Exception:
pass
return None
def get_all(self, *keys: str) -> Dict[str, Optional[str]]:
"""Get multiple secrets at once"""
return {key: self.get(key) for key in keys}
def has(self, key: str) -> bool:
"""Check if a secret is available"""
return self.get(key) is not None
def require(self, key: str) -> str:
"""Get a required secret, raising if not found"""
value = self.get(key)
if value is None:
raise ValueError(f"Required secret not found: {key}")
return value
def list_configured(self) -> Dict[str, bool]:
"""List all known secrets and whether they're configured"""
return {
key: self.has(key)
for key in self.KNOWN_SECRETS
}
def validate_integration(self, integration_name: str) -> Dict[str, Any]:
"""Validate all secrets for an integration are configured"""
prefixes = {
"slack": ["SLACK_"],
"github": ["GITHUB_"],
"pagerduty": ["PAGERDUTY_"],
"redis": ["REDIS_"],
}
prefix_list = prefixes.get(integration_name.lower(), [integration_name.upper() + "_"])
results = {
"integration": integration_name,
"configured": [],
"missing": [],
"mock_mode": self._mock_mode
}
for key, config in self.KNOWN_SECRETS.items():
if any(key.startswith(p) for p in prefix_list):
if self.has(key):
results["configured"].append(key)
elif config.required:
results["missing"].append(key)
results["ready"] = len(results["missing"]) == 0
return results
# Singleton instance
_secrets_manager = None
def get_secrets() -> SecretsManager:
"""Get the global secrets manager instance"""
global _secrets_manager
if _secrets_manager is None:
_secrets_manager = SecretsManager()
return _secrets_manager
# Convenience functions
def get_secret(key: str, default: str = None) -> Optional[str]:
"""Get a secret value"""
return get_secrets().get(key, default)
def require_secret(key: str) -> str:
"""Get a required secret"""
return get_secrets().require(key)