Phase 8 Production Hardening with complete governance infrastructure: - Vault integration with tiered policies (T0-T4) - DragonflyDB state management - SQLite audit ledger - Pipeline DSL and templates - Promotion/revocation engine - Checkpoint system for session persistence - Health manager and circuit breaker for fault tolerance - GitHub/Slack integrations - Architectural test pipeline with bug watcher, suggestion engine, council review - Multi-agent chaos testing framework Test Results: - Governance tests: 68/68 passing - E2E workflow: 16/16 passing - Phase 2 Vault: 14/14 passing - Integration tests: 27/27 passing Coverage: 57.6% average across 12 phases Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
174 lines
4.9 KiB
Python
174 lines
4.9 KiB
Python
"""
|
|
Base classes for external integrations.
|
|
|
|
Provides common patterns for authenticated API clients,
|
|
rate limiting, retries, and audit logging.
|
|
"""
|
|
|
|
from abc import ABC, abstractmethod
|
|
from typing import Dict, Any, Optional, List
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
import json
|
|
import time
|
|
import os
|
|
|
|
|
|
@dataclass
|
|
class IntegrationConfig:
|
|
"""Configuration for an external integration"""
|
|
name: str
|
|
enabled: bool = False
|
|
api_key: Optional[str] = None
|
|
api_url: Optional[str] = None
|
|
timeout: int = 30
|
|
retries: int = 3
|
|
retry_delay: float = 1.0
|
|
rate_limit_per_minute: int = 60
|
|
extra: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
|
|
class IntegrationEvent:
|
|
"""An event to be sent to an integration"""
|
|
event_type: str
|
|
source: str # agent_id or system
|
|
data: Dict[str, Any]
|
|
timestamp: datetime = field(default_factory=datetime.utcnow)
|
|
priority: str = "normal" # low, normal, high, critical
|
|
|
|
|
|
class BaseIntegration(ABC):
|
|
"""
|
|
Base class for external integrations.
|
|
|
|
Provides:
|
|
- Configuration management
|
|
- Rate limiting
|
|
- Retry logic
|
|
- Audit logging
|
|
- Dry-run mode
|
|
"""
|
|
|
|
def __init__(self, config: IntegrationConfig):
|
|
self.config = config
|
|
self._request_times: List[float] = []
|
|
self._audit_log: List[Dict[str, Any]] = []
|
|
self._dry_run = os.environ.get("INTEGRATION_DRY_RUN", "false").lower() == "true"
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return self.config.name
|
|
|
|
@property
|
|
def enabled(self) -> bool:
|
|
return self.config.enabled and self.config.api_key is not None
|
|
|
|
def _check_rate_limit(self) -> bool:
|
|
"""Check if we're within rate limits"""
|
|
now = time.time()
|
|
minute_ago = now - 60
|
|
|
|
# Remove old timestamps
|
|
self._request_times = [t for t in self._request_times if t > minute_ago]
|
|
|
|
return len(self._request_times) < self.config.rate_limit_per_minute
|
|
|
|
def _record_request(self):
|
|
"""Record a request for rate limiting"""
|
|
self._request_times.append(time.time())
|
|
|
|
def _audit(self, operation: str, success: bool, details: Dict = None):
|
|
"""Record an audit entry"""
|
|
self._audit_log.append({
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"integration": self.name,
|
|
"operation": operation,
|
|
"success": success,
|
|
"details": details or {},
|
|
"dry_run": self._dry_run
|
|
})
|
|
|
|
def _with_retry(self, func, *args, **kwargs) -> Any:
|
|
"""Execute function with retry logic"""
|
|
last_error = None
|
|
|
|
for attempt in range(self.config.retries):
|
|
try:
|
|
if not self._check_rate_limit():
|
|
time.sleep(self.config.retry_delay)
|
|
continue
|
|
|
|
self._record_request()
|
|
return func(*args, **kwargs)
|
|
|
|
except Exception as e:
|
|
last_error = e
|
|
if attempt < self.config.retries - 1:
|
|
time.sleep(self.config.retry_delay * (attempt + 1))
|
|
|
|
raise last_error
|
|
|
|
@abstractmethod
|
|
def test_connection(self) -> bool:
|
|
"""Test if integration is properly configured"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def send_event(self, event: IntegrationEvent) -> bool:
|
|
"""Send an event to the integration"""
|
|
pass
|
|
|
|
def get_audit_log(self) -> List[Dict[str, Any]]:
|
|
"""Get audit log for this integration"""
|
|
return self._audit_log.copy()
|
|
|
|
|
|
class IntegrationManager:
|
|
"""
|
|
Manages multiple integrations.
|
|
|
|
Provides unified interface for sending events to all
|
|
configured integrations.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._integrations: Dict[str, BaseIntegration] = {}
|
|
|
|
def register(self, integration: BaseIntegration):
|
|
"""Register an integration"""
|
|
self._integrations[integration.name] = integration
|
|
|
|
def get(self, name: str) -> Optional[BaseIntegration]:
|
|
"""Get an integration by name"""
|
|
return self._integrations.get(name)
|
|
|
|
def list_enabled(self) -> List[str]:
|
|
"""List enabled integrations"""
|
|
return [
|
|
name for name, integ in self._integrations.items()
|
|
if integ.enabled
|
|
]
|
|
|
|
def broadcast(self, event: IntegrationEvent) -> Dict[str, bool]:
|
|
"""
|
|
Send event to all enabled integrations.
|
|
|
|
Returns: {integration_name: success}
|
|
"""
|
|
results = {}
|
|
for name, integration in self._integrations.items():
|
|
if integration.enabled:
|
|
try:
|
|
results[name] = integration.send_event(event)
|
|
except Exception as e:
|
|
results[name] = False
|
|
return results
|
|
|
|
def test_all(self) -> Dict[str, bool]:
|
|
"""Test all registered integrations"""
|
|
return {
|
|
name: integration.test_connection()
|
|
for name, integration in self._integrations.items()
|
|
}
|