profit 8c6e7831e9 Add Phase 10-12 implementation: multi-tenant, marketplace, observability
Major additions:
- marketplace/: Agent template registry with FTS5 search, ratings, versioning
- observability/: Prometheus metrics, distributed tracing, structured logging
- ledger/migrations/: Database migration scripts for multi-tenant support
- tests/governance/: 15 new test files for phases 6-12 (295 total tests)
- bin/validate-phases: Full 12-phase validation script

New features:
- Multi-tenant support with tenant isolation and quota enforcement
- Agent marketplace with semantic versioning and search
- Observability with metrics, tracing, and log correlation
- Tier-1 agent bootstrap scripts

Updated components:
- ledger/api.py: Extended API for tenants, marketplace, observability
- ledger/schema.sql: Added tenant, project, marketplace tables
- testing/framework.ts: Enhanced test framework
- checkpoint/checkpoint.py: Improved checkpoint management

Archived:
- External integrations (Slack/GitHub/PagerDuty) moved to .archive/
- Old checkpoint files cleaned up

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 18:39:47 -05:00

399 lines
14 KiB
Python

"""
PagerDuty Integration for Agent Governance System
Provides:
- Critical incident alerts
- On-call escalation
- Service health status
- Incident lifecycle management
"""
import os
import json
import hashlib
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import sys
sys.path.insert(0, str(__file__).rsplit("/", 2)[0])
from common.base import BaseIntegration, IntegrationConfig, IntegrationEvent
class IncidentSeverity(Enum):
    """PagerDuty incident severity levels.

    Each member's value is the lowercase string that is written into the
    ``severity`` field of an outgoing event payload.
    """

    CRITICAL = "critical"
    ERROR = "error"
    WARNING = "warning"
    INFO = "info"
@dataclass
class PagerDutyIncident:
    """Represents a PagerDuty incident.

    Plain value object holding everything needed to build one event
    payload; it performs no validation of its own.
    """

    # Integration key identifying the PagerDuty service that receives the event.
    routing_key: str
    # One of: trigger, acknowledge, resolve
    event_action: str
    # Identifier tying acknowledge/resolve actions to an earlier trigger.
    dedup_key: str
    severity: IncidentSeverity
    # Short human-readable description of the incident.
    summary: str
    # Name of the system reporting the incident.
    source: str
    # Optional fields: each is copied into the payload only when truthy.
    component: Optional[str] = None
    group: Optional[str] = None
    custom_details: Optional[Dict[str, Any]] = None
    links: Optional[List[Dict[str, str]]] = None
    images: Optional[List[Dict[str, str]]] = None
class PagerDutyIntegration(BaseIntegration):
    """
    PagerDuty integration for agent governance.

    Capabilities:
    - Trigger incidents for critical violations
    - Acknowledge in-progress remediation
    - Resolve when issues are fixed
    - Escalate based on severity

    Configuration comes from the constructor arguments, falling back to the
    PAGERDUTY_ROUTING_KEY, PAGERDUTY_API_TOKEN, PAGERDUTY_SERVICE_ID,
    PAGERDUTY_DEFAULT_SEVERITY and PAGERDUTY_SOURCE environment variables.

    NOTE(review): nothing in this class performs an HTTP request yet;
    ``test_connection`` and ``_send_incident`` only write audit entries.
    ``self.config``, ``self._dry_run`` and ``self._audit`` are presumably
    supplied by ``BaseIntegration`` -- confirm against common/base.py.
    """

    # Events V2 API endpoint
    EVENTS_API_URL = "https://events.pagerduty.com/v2/enqueue"

    def __init__(
        self,
        routing_key: Optional[str] = None,
        service_id: Optional[str] = None,
        api_token: Optional[str] = None
    ):
        """Build the integration config from arguments or the environment.

        Args:
            routing_key: Events API routing key; falls back to
                PAGERDUTY_ROUTING_KEY.
            service_id: PagerDuty service id; falls back to
                PAGERDUTY_SERVICE_ID.
            api_token: REST API token; falls back to PAGERDUTY_API_TOKEN.
        """
        # Support both Events API (routing_key) and REST API (api_token).
        # The trailing `or None` collapses empty strings (e.g. an env var
        # that is set but blank) so they are not mistaken for credentials.
        routing_key = routing_key or os.environ.get("PAGERDUTY_ROUTING_KEY") or None
        api_token = api_token or os.environ.get("PAGERDUTY_API_TOKEN") or None
        service_id = service_id or os.environ.get("PAGERDUTY_SERVICE_ID") or None

        config = IntegrationConfig(
            name="pagerduty",
            enabled=routing_key is not None or api_token is not None,
            api_key=api_token,
            api_url=self.EVENTS_API_URL,
            extra={
                "routing_key": routing_key,
                "service_id": service_id,
                "default_severity": os.environ.get("PAGERDUTY_DEFAULT_SEVERITY", "error"),
                "source": os.environ.get("PAGERDUTY_SOURCE", "agent-governance")
            }
        )
        super().__init__(config)

    @property
    def enabled(self) -> bool:
        """PagerDuty is enabled if routing key or API token is set"""
        # Truthiness (not `is not None`) so a blank credential counts as missing.
        has_routing_key = bool(self.config.extra.get("routing_key"))
        has_api_token = bool(self.config.api_key)
        return bool(self.config.enabled) and (has_routing_key or has_api_token)

    def test_connection(self) -> bool:
        """Test PagerDuty connection.

        Returns:
            True in dry-run mode or when credentials are configured, False
            when the integration is not enabled. Every outcome is audited.
        """
        if self._dry_run:
            self._audit("test_connection", True, {"dry_run": True})
            return True
        if not self.enabled:
            self._audit("test_connection", False, {"error": "no_credentials"})
            return False
        # In production, would make a test API call
        self._audit("test_connection", True)
        return True

    def send_event(self, event: IntegrationEvent) -> bool:
        """Route event to appropriate PagerDuty action.

        Only events whose priority is "critical" or "high" are processed;
        anything else is audited as skipped and reported as success.

        Returns:
            The handler's result for known event types, the generic
            incident result for unknown critical events, True otherwise.
        """
        # Only handle critical/high severity events.
        # NOTE(review): this filter also applies to "remediation_started"
        # and "issue_resolved", so a lower-priority lifecycle event never
        # acknowledges/resolves its incident -- confirm that is intended.
        priority = event.priority
        if priority not in ("critical", "high"):
            self._audit("send_event_skipped", True, {
                "reason": "low_priority",
                "priority": priority
            })
            return True

        handlers = {
            "violation_detected": self._trigger_violation_incident,
            "agent_revoked": self._trigger_revocation_incident,
            "system_failure": self._trigger_system_incident,
            "execution_failed": self._trigger_execution_incident,
            "remediation_started": self._acknowledge_incident,
            "issue_resolved": self._resolve_incident,
        }
        handler = handlers.get(event.event_type)
        if handler is not None:
            return handler(event)

        # For other high-priority events, trigger generic incident
        if priority == "critical":
            return self._trigger_generic_incident(event)
        return True

    @staticmethod
    def _event_section(event: IntegrationEvent, key: str) -> Dict[str, Any]:
        """Return ``event.data[key]``, or an empty dict when absent or falsy."""
        data = event.data or {}
        # `or {}` also covers a key that is present but explicitly null.
        return data.get(key) or {}

    def _trigger_violation_incident(self, event: IntegrationEvent) -> bool:
        """Trigger incident for governance violation"""
        violation = self._event_section(event, "violation")
        agent_id = event.source
        severity = self._map_severity(violation.get("severity", "high"))
        incident = self._build_incident(
            event_action="trigger",
            dedup_key=f"violation-{agent_id}-{violation.get('type', 'unknown')}",
            severity=severity,
            summary=f"Governance Violation: {violation.get('type', 'Unknown')} by agent {agent_id}",
            component=f"agent-{agent_id}",
            group="governance",
            custom_details={
                "agent_id": agent_id,
                "violation_type": violation.get("type"),
                "violation_severity": violation.get("severity"),
                "description": violation.get("description"),
                "evidence": violation.get("evidence", {})
            }
        )
        return self._send_incident(incident)

    def _trigger_revocation_incident(self, event: IntegrationEvent) -> bool:
        """Trigger incident for agent revocation"""
        revocation = self._event_section(event, "revocation")
        agent_id = event.source
        incident = self._build_incident(
            event_action="trigger",
            dedup_key=f"revocation-{agent_id}",
            severity=IncidentSeverity.CRITICAL,
            summary=f"Agent Revoked: {agent_id}",
            component=f"agent-{agent_id}",
            group="governance",
            custom_details={
                "agent_id": agent_id,
                "reason": revocation.get("reason"),
                "previous_tier": revocation.get("previous_tier"),
                # NOTE(review): datetime.utcnow() gives a naive timestamp and
                # is deprecated since Python 3.12; an aware replacement needs
                # a `timezone` import at file level.
                "revoked_at": datetime.utcnow().isoformat()
            }
        )
        return self._send_incident(incident)

    def _trigger_system_incident(self, event: IntegrationEvent) -> bool:
        """Trigger incident for system failure"""
        failure = self._event_section(event, "failure")
        incident = self._build_incident(
            event_action="trigger",
            dedup_key=f"system-{failure.get('component', 'unknown')}",
            severity=IncidentSeverity.CRITICAL,
            summary=f"System Failure: {failure.get('component', 'Unknown')}",
            component=failure.get("component"),
            group="infrastructure",
            custom_details={
                "component": failure.get("component"),
                "error": failure.get("error"),
                "timestamp": datetime.utcnow().isoformat()
            }
        )
        return self._send_incident(incident)

    def _trigger_execution_incident(self, event: IntegrationEvent) -> bool:
        """Trigger incident for execution failure"""
        result = self._event_section(event, "result")
        agent_id = event.source
        incident = self._build_incident(
            event_action="trigger",
            dedup_key=f"execution-{agent_id}-{result.get('task_id', 'unknown')}",
            severity=IncidentSeverity.ERROR,
            summary=f"Execution Failed: Agent {agent_id}",
            component=f"agent-{agent_id}",
            group="execution",
            custom_details={
                "agent_id": agent_id,
                "task_id": result.get("task_id"),
                "error": result.get("error"),
                "duration": result.get("duration")
            }
        )
        return self._send_incident(incident)

    def _acknowledge_incident(self, event: IntegrationEvent) -> bool:
        """Acknowledge an existing incident.

        Reads the dedup key from ``event.data["incident_key"]``; without it
        there is nothing to acknowledge, so the event is audited as skipped
        and reported as success.
        """
        incident_key = (event.data or {}).get("incident_key")
        if not incident_key:
            self._audit("send_event_skipped", True, {
                "reason": "missing_incident_key",
                "action": "acknowledge"
            })
            return True
        incident = self._build_incident(
            event_action="acknowledge",
            dedup_key=incident_key,
            severity=IncidentSeverity.INFO,
            summary="Remediation in progress"
        )
        return self._send_incident(incident)

    def _resolve_incident(self, event: IntegrationEvent) -> bool:
        """Resolve an existing incident.

        Reads the dedup key from ``event.data["incident_key"]``; without it
        there is nothing to resolve, so the event is audited as skipped and
        reported as success.
        """
        incident_key = (event.data or {}).get("incident_key")
        if not incident_key:
            self._audit("send_event_skipped", True, {
                "reason": "missing_incident_key",
                "action": "resolve"
            })
            return True
        incident = self._build_incident(
            event_action="resolve",
            dedup_key=incident_key,
            severity=IncidentSeverity.INFO,
            summary="Issue resolved"
        )
        return self._send_incident(incident)

    def _trigger_generic_incident(self, event: IntegrationEvent) -> bool:
        """Trigger a generic incident for critical events"""
        incident = self._build_incident(
            event_action="trigger",
            # Hashing the payload keeps identical events on one dedup key.
            dedup_key=f"generic-{event.event_type}-{self._hash_data(event.data)}",
            severity=IncidentSeverity.ERROR,
            summary=f"Critical Event: {event.event_type}",
            component=event.source,
            custom_details=event.data
        )
        return self._send_incident(incident)

    # === Incident Building Helpers ===

    def _build_incident(
        self,
        event_action: str,
        dedup_key: str,
        severity: IncidentSeverity,
        summary: str,
        component: Optional[str] = None,
        group: Optional[str] = None,
        custom_details: Optional[Dict[str, Any]] = None,
        links: Optional[List[Dict[str, str]]] = None,
        images: Optional[List[Dict[str, str]]] = None
    ) -> PagerDutyIncident:
        """Build a PagerDuty incident.

        The routing key and source are filled in from ``self.config.extra``;
        every other field is taken from the arguments unchanged.
        """
        return PagerDutyIncident(
            # "routing_key" is always present in extra but may be None, so a
            # .get() default would never apply; `or ""` keeps the field a str.
            routing_key=self.config.extra.get("routing_key") or "",
            event_action=event_action,
            dedup_key=dedup_key,
            severity=severity,
            summary=summary,
            source=self.config.extra.get("source", "agent-governance"),
            component=component,
            group=group,
            custom_details=custom_details,
            links=links,
            images=images
        )

    def _map_severity(self, violation_severity: str) -> IncidentSeverity:
        """Map violation severity to PagerDuty severity.

        Matching is case-insensitive and ignores surrounding whitespace.
        An ``IncidentSeverity`` is returned unchanged; ``None``, non-string
        or unrecognised values fall back to ``IncidentSeverity.ERROR``.
        """
        if isinstance(violation_severity, IncidentSeverity):
            return violation_severity
        mapping = {
            "critical": IncidentSeverity.CRITICAL,
            "high": IncidentSeverity.ERROR,
            "medium": IncidentSeverity.WARNING,
            "low": IncidentSeverity.INFO
        }
        if not isinstance(violation_severity, str):
            return IncidentSeverity.ERROR
        return mapping.get(violation_severity.strip().lower(), IncidentSeverity.ERROR)

    def _hash_data(self, data: Dict) -> str:
        """Create a short hash of data for dedup key.

        Returns:
            The first 8 hex characters of the MD5 digest of the key-sorted
            JSON encoding of ``data``.
        """
        # default=str is deliberate: the result is only a fingerprint, so
        # values JSON cannot encode natively (datetimes, enums, ...) are
        # stringified instead of raising TypeError.
        content = json.dumps(data, sort_keys=True, default=str)
        # MD5 is used for bucketing only, not for anything security related.
        return hashlib.md5(content.encode()).hexdigest()[:8]

    @staticmethod
    def _build_payload(incident: PagerDutyIncident) -> Dict[str, Any]:
        """Assemble the Events API v2 request body for ``incident``."""
        body: Dict[str, Any] = {
            "summary": incident.summary,
            "severity": incident.severity.value,
            "source": incident.source,
        }
        # Optional fields are included only when they carry a value.
        if incident.component:
            body["component"] = incident.component
        if incident.group:
            body["group"] = incident.group
        if incident.custom_details:
            body["custom_details"] = incident.custom_details

        payload: Dict[str, Any] = {
            "routing_key": incident.routing_key,
            "event_action": incident.event_action,
            "dedup_key": incident.dedup_key,
            "payload": body,
        }
        if incident.links:
            payload["links"] = incident.links
        if incident.images:
            payload["images"] = incident.images
        return payload

    def _send_incident(self, incident: PagerDutyIncident) -> bool:
        """Send incident to PagerDuty.

        Returns:
            True in dry-run mode or when the incident was processed, False
            when the integration is not enabled. Every outcome is audited.
        """
        audit_details = {
            "action": incident.event_action,
            "dedup_key": incident.dedup_key,
            "severity": incident.severity.value
        }
        if self._dry_run:
            self._audit("send_incident", True, {"dry_run": True, **audit_details})
            return True
        if not self.enabled:
            # Record the refusal, mirroring test_connection's failure audit.
            self._audit("send_incident", False, {
                "error": "no_credentials",
                **audit_details
            })
            return False

        # Build Events API v2 payload
        payload = self._build_payload(incident)  # noqa: F841 - unused until the POST exists

        # In production, would POST to Events API
        self._audit("send_incident", True, audit_details)
        return True

    # === Convenience Methods ===

    def trigger(
        self,
        summary: str,
        severity: IncidentSeverity = IncidentSeverity.ERROR,
        dedup_key: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None
    ) -> bool:
        """Convenience method to trigger an incident.

        Args:
            summary: Incident summary text.
            severity: Incident severity; defaults to ERROR.
            dedup_key: Key for later acknowledge/resolve calls. When
                omitted, a "manual-<hash>" key is derived from the summary
                and the current UTC timestamp.
            details: Optional custom details attached to the incident.
        """
        if dedup_key is None:
            seed = {"summary": summary, "ts": datetime.utcnow().isoformat()}
            dedup_key = f"manual-{self._hash_data(seed)}"
        incident = self._build_incident(
            event_action="trigger",
            dedup_key=dedup_key,
            severity=severity,
            summary=summary,
            custom_details=details
        )
        return self._send_incident(incident)

    def acknowledge(self, dedup_key: str) -> bool:
        """Convenience method to acknowledge an incident"""
        incident = self._build_incident(
            event_action="acknowledge",
            dedup_key=dedup_key,
            severity=IncidentSeverity.INFO,
            summary="Acknowledged"
        )
        return self._send_incident(incident)

    def resolve(self, dedup_key: str) -> bool:
        """Convenience method to resolve an incident"""
        incident = self._build_incident(
            event_action="resolve",
            dedup_key=dedup_key,
            severity=IncidentSeverity.INFO,
            summary="Resolved"
        )
        return self._send_incident(incident)