""" GitHub Integration for Agent Governance System Provides: - PR creation and management - Issue tracking - Commit status updates - Webhook handling """ import os import json from typing import Dict, Any, Optional, List from dataclasses import dataclass from datetime import datetime import sys sys.path.insert(0, str(__file__).rsplit("/", 2)[0]) from common.base import BaseIntegration, IntegrationConfig, IntegrationEvent @dataclass class PullRequest: """Represents a GitHub Pull Request""" number: int title: str body: str head: str base: str state: str url: str created_at: datetime author: str @dataclass class CommitStatus: """Represents a GitHub Commit Status""" sha: str state: str # pending, success, failure, error context: str description: str target_url: Optional[str] = None class GitHubIntegration(BaseIntegration): """ GitHub integration for agent governance. Capabilities: - Create/update PRs from agent plans - Post commit statuses for governance checks - Create issues for violations - Read PR/Issue comments for human feedback """ def __init__(self, token: str = None, repo: str = None): config = IntegrationConfig( name="github", enabled=token is not None, api_key=token or os.environ.get("GITHUB_TOKEN"), api_url=os.environ.get("GITHUB_API_URL", "https://api.github.com"), extra={"repo": repo or os.environ.get("GITHUB_REPO")} ) super().__init__(config) self._repo = config.extra.get("repo") def test_connection(self) -> bool: """Test GitHub API connection""" if self._dry_run: self._audit("test_connection", True, {"dry_run": True}) return True if not self.config.api_key: self._audit("test_connection", False, {"error": "no_token"}) return False # Would make API call: GET /user self._audit("test_connection", True, {"repo": self._repo}) return True def send_event(self, event: IntegrationEvent) -> bool: """Route event to appropriate GitHub action""" handlers = { "plan_created": self._handle_plan_created, "execution_complete": self._handle_execution_complete, "violation_detected": self._handle_violation, "promotion_requested": self._handle_promotion, "agent_revoked": self._handle_revocation, } handler = handlers.get(event.event_type) if handler: return handler(event) self._audit("send_event", False, {"error": f"unknown_event_type: {event.event_type}"}) return False def _handle_plan_created(self, event: IntegrationEvent) -> bool: """Create a PR for an agent plan""" plan = event.data.get("plan", {}) agent_id = event.source if self._dry_run: self._audit("create_pr", True, { "dry_run": True, "title": f"[Agent] {plan.get('title', 'Plan')}", "agent": agent_id }) return True # Would create PR via API pr_data = { "title": f"[Agent {agent_id}] {plan.get('title', 'Automated Plan')}", "body": self._format_plan_body(plan, agent_id), "head": f"agent/{agent_id}/{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}", "base": "main" } self._audit("create_pr", True, pr_data) return True def _handle_execution_complete(self, event: IntegrationEvent) -> bool: """Update commit status after execution""" result = event.data.get("result", {}) success = result.get("success", False) status = CommitStatus( sha=event.data.get("commit_sha", "HEAD"), state="success" if success else "failure", context="agent-governance/execution", description=f"Agent execution {'completed' if success else 'failed'}" ) if self._dry_run: self._audit("commit_status", True, {"dry_run": True, "status": status.state}) return True self._audit("commit_status", True, { "sha": status.sha, "state": status.state, "context": status.context }) return True def _handle_violation(self, event: IntegrationEvent) -> bool: """Create an issue for a governance violation""" violation = event.data.get("violation", {}) agent_id = event.source issue_data = { "title": f"[Violation] {violation.get('type', 'Unknown')} - Agent {agent_id}", "body": self._format_violation_body(violation, agent_id), "labels": ["governance", "violation", violation.get("severity", "medium")] } if self._dry_run: self._audit("create_issue", True, {"dry_run": True, **issue_data}) return True self._audit("create_issue", True, issue_data) return True def _handle_promotion(self, event: IntegrationEvent) -> bool: """Create PR for tier promotion request""" promotion = event.data.get("promotion", {}) agent_id = event.source pr_data = { "title": f"[Promotion] {agent_id}: Tier {promotion.get('from_tier')} → {promotion.get('to_tier')}", "body": self._format_promotion_body(promotion, agent_id), "labels": ["governance", "promotion"] } if self._dry_run: self._audit("create_promotion_pr", True, {"dry_run": True, **pr_data}) return True self._audit("create_promotion_pr", True, pr_data) return True def _handle_revocation(self, event: IntegrationEvent) -> bool: """Create issue for agent revocation""" revocation = event.data.get("revocation", {}) agent_id = event.source issue_data = { "title": f"[Revoked] Agent {agent_id}", "body": self._format_revocation_body(revocation, agent_id), "labels": ["governance", "revocation", "critical"] } if self._dry_run: self._audit("create_revocation_issue", True, {"dry_run": True, **issue_data}) return True self._audit("create_revocation_issue", True, issue_data) return True def _format_plan_body(self, plan: Dict, agent_id: str) -> str: """Format plan as PR body""" return f"""## Automated Plan by Agent `{agent_id}` ### Objective {plan.get('objective', 'N/A')} ### Steps {self._format_steps(plan.get('steps', []))} ### Constraints {self._format_list(plan.get('constraints', []))} ### Risk Assessment - **Confidence:** {plan.get('confidence', 'N/A')} - **Tier Required:** {plan.get('tier_required', 'N/A')} --- *Generated by Agent Governance System* """ def _format_violation_body(self, violation: Dict, agent_id: str) -> str: """Format violation as issue body""" return f"""## Governance Violation Detected **Agent:** `{agent_id}` **Type:** {violation.get('type', 'Unknown')} **Severity:** {violation.get('severity', 'medium')} **Timestamp:** {violation.get('timestamp', datetime.utcnow().isoformat())} ### Details {violation.get('description', 'No details provided')} ### Evidence ```json {json.dumps(violation.get('evidence', {}), indent=2)} ``` ### Recommended Actions {self._format_list(violation.get('recommendations', ['Review agent configuration']))} --- *Generated by Agent Governance System* """ def _format_promotion_body(self, promotion: Dict, agent_id: str) -> str: """Format promotion request as PR body""" return f"""## Tier Promotion Request **Agent:** `{agent_id}` **Current Tier:** {promotion.get('from_tier', 'N/A')} **Requested Tier:** {promotion.get('to_tier', 'N/A')} ### Metrics - Compliant Runs: {promotion.get('compliant_runs', 0)} - Consecutive Compliant: {promotion.get('consecutive_compliant', 0)} - Violations: {promotion.get('violations', 0)} ### Evidence {promotion.get('evidence_summary', 'See attached artifacts')} ### Approval Required - [ ] Review agent history - [ ] Verify metrics - [ ] Approve promotion --- *Generated by Agent Governance System* """ def _format_revocation_body(self, revocation: Dict, agent_id: str) -> str: """Format revocation as issue body""" return f"""## Agent Revoked **Agent:** `{agent_id}` **Reason:** {revocation.get('reason', 'Unknown')} **Timestamp:** {revocation.get('timestamp', datetime.utcnow().isoformat())} ### Details {revocation.get('description', 'No details provided')} ### Last Known State ```json {json.dumps(revocation.get('last_state', {}), indent=2)} ``` ### Required Actions - [ ] Review violation - [ ] Determine root cause - [ ] Decide on re-promotion eligibility --- *Generated by Agent Governance System* """ def _format_steps(self, steps: List) -> str: """Format steps as numbered list""" if not steps: return "No steps defined" return "\n".join([f"{i+1}. {step}" for i, step in enumerate(steps)]) def _format_list(self, items: List) -> str: """Format items as bullet list""" if not items: return "None" return "\n".join([f"- {item}" for item in items]) # === Stub API Methods === def create_pr(self, title: str, body: str, head: str, base: str = "main") -> Optional[PullRequest]: """Create a pull request (stub)""" if self._dry_run or not self.config.api_key: return None # Would make API call: POST /repos/{owner}/{repo}/pulls self._audit("create_pr_api", True, {"title": title, "head": head, "base": base}) return None def update_commit_status(self, status: CommitStatus) -> bool: """Update commit status (stub)""" if self._dry_run or not self.config.api_key: return True # Would make API call: POST /repos/{owner}/{repo}/statuses/{sha} self._audit("update_status_api", True, {"sha": status.sha, "state": status.state}) return True def create_issue(self, title: str, body: str, labels: List[str] = None) -> bool: """Create an issue (stub)""" if self._dry_run or not self.config.api_key: return True # Would make API call: POST /repos/{owner}/{repo}/issues self._audit("create_issue_api", True, {"title": title, "labels": labels}) return True def get_pr_comments(self, pr_number: int) -> List[Dict]: """Get PR comments (stub)""" if self._dry_run or not self.config.api_key: return [] # Would make API call: GET /repos/{owner}/{repo}/pulls/{pr_number}/comments return []