profit 77655c298c Initial commit: Agent Governance System Phase 8
Phase 8 Production Hardening with complete governance infrastructure:

- Vault integration with tiered policies (T0-T4)
- DragonflyDB state management
- SQLite audit ledger
- Pipeline DSL and templates
- Promotion/revocation engine
- Checkpoint system for session persistence
- Health manager and circuit breaker for fault tolerance
- GitHub/Slack integrations
- Architectural test pipeline with bug watcher, suggestion engine, council review
- Multi-agent chaos testing framework

Test Results:
- Governance tests: 68/68 passing
- E2E workflow: 16/16 passing
- Phase 2 Vault: 14/14 passing
- Integration tests: 27/27 passing

Coverage: 57.6% average across 12 phases

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-23 22:07:06 -05:00

528 lines
19 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Promotion Engine
================
Manages agent tier progression based on compliance metrics.
Part of Phase 4: Promotion and Revocation Engine.
Promotion Rules:
- T0 → T1: 5 compliant plan-only tasks, correct scoping, clear rollbacks
- T1 → T2: N consecutive compliant sandbox runs, 2+ successful modifications
- T2 → T3: Demonstrated complexity reduction, correct escalation behavior
- T3 → T4: Rare, manual review required
"""
import json
import sqlite3
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
# =============================================================================
# Configuration
# =============================================================================
LEDGER_DB = "/opt/agent-governance/ledger/governance.db"
# Promotion requirements per tier
PROMOTION_REQUIREMENTS = {
0: { # T0 → T1
"min_compliant_runs": 5,
"min_consecutive_compliant": 3,
"required_action_types": ["generate_plan"],
"max_violations_30d": 0,
"description": "5 compliant plan-only tasks, 3 consecutive, no violations"
},
1: { # T1 → T2
"min_compliant_runs": 10,
"min_consecutive_compliant": 5,
"required_action_types": ["ansible_check", "terraform_plan"],
"min_successful_modifications": 2,
"max_violations_30d": 0,
"description": "10 compliant runs, 5 consecutive, 2+ mods, no violations"
},
2: { # T2 → T3
"min_compliant_runs": 25,
"min_consecutive_compliant": 10,
"required_action_types": ["ansible_run", "terraform_apply"],
"min_successful_modifications": 5,
"max_violations_30d": 0,
"requires_manual_review": True,
"description": "25 compliant runs, demonstrated complexity handling"
},
3: { # T3 → T4
"min_compliant_runs": 50,
"min_consecutive_compliant": 25,
"requires_manual_review": True,
"requires_admin_approval": True,
"description": "Rare, requires admin approval and extensive track record"
}
}
class PromotionStatus(str, Enum):
ELIGIBLE = "ELIGIBLE"
NOT_ELIGIBLE = "NOT_ELIGIBLE"
PENDING_REVIEW = "PENDING_REVIEW"
APPROVED = "APPROVED"
DENIED = "DENIED"
@dataclass
class PromotionEvaluation:
agent_id: str
current_tier: int
target_tier: int
status: PromotionStatus
requirements_met: dict
requirements_missing: list
recommendation: str
evidence: dict
timestamp: str
def to_dict(self) -> dict:
return {
"agent_id": self.agent_id,
"current_tier": self.current_tier,
"target_tier": self.target_tier,
"status": self.status.value,
"requirements_met": self.requirements_met,
"requirements_missing": self.requirements_missing,
"recommendation": self.recommendation,
"evidence": self.evidence,
"timestamp": self.timestamp
}
class PromotionEngine:
"""
Evaluates and processes agent tier promotions.
"""
def __init__(self, db_path: str = LEDGER_DB):
self.db_path = db_path
def _now(self) -> str:
return datetime.now(timezone.utc).isoformat()
def _get_conn(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def get_agent_metrics(self, agent_id: str) -> Optional[dict]:
"""Get current metrics for an agent"""
conn = self._get_conn()
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM agent_metrics WHERE agent_id = ?
""", (agent_id,))
row = cursor.fetchone()
conn.close()
if row:
return dict(row)
return None
def get_agent_history(self, agent_id: str, days: int = 30) -> dict:
"""Get agent's action history for evaluation"""
conn = self._get_conn()
cursor = conn.cursor()
# Get total runs and successes
cursor.execute("""
SELECT
COUNT(*) as total_actions,
SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as successful_actions,
SUM(CASE WHEN decision = 'EXECUTE' AND success = 1 THEN 1 ELSE 0 END) as successful_executions
FROM agent_actions
WHERE agent_id = ?
AND datetime(timestamp) >= datetime('now', ?)
""", (agent_id, f'-{days} days'))
action_stats = dict(cursor.fetchone())
# Get action types used
cursor.execute("""
SELECT DISTINCT action FROM agent_actions
WHERE agent_id = ?
AND datetime(timestamp) >= datetime('now', ?)
""", (agent_id, f'-{days} days'))
action_types = [row['action'] for row in cursor.fetchall()]
# Get violation count
cursor.execute("""
SELECT COUNT(*) as violation_count
FROM violations
WHERE agent_id = ?
AND datetime(timestamp) >= datetime('now', ?)
""", (agent_id, f'-{days} days'))
violations = cursor.fetchone()['violation_count']
conn.close()
return {
"total_actions": action_stats['total_actions'],
"successful_actions": action_stats['successful_actions'],
"successful_executions": action_stats['successful_executions'],
"action_types": action_types,
"violations_30d": violations
}
def evaluate_promotion(self, agent_id: str) -> PromotionEvaluation:
"""Evaluate if an agent is eligible for promotion"""
metrics = self.get_agent_metrics(agent_id)
if not metrics:
return PromotionEvaluation(
agent_id=agent_id,
current_tier=0,
target_tier=1,
status=PromotionStatus.NOT_ELIGIBLE,
requirements_met={},
requirements_missing=["Agent not found in metrics"],
recommendation="Agent has no recorded activity",
evidence={},
timestamp=self._now()
)
current_tier = metrics['current_tier']
target_tier = current_tier + 1
if target_tier > 4:
return PromotionEvaluation(
agent_id=agent_id,
current_tier=current_tier,
target_tier=current_tier,
status=PromotionStatus.NOT_ELIGIBLE,
requirements_met={},
requirements_missing=["Already at maximum tier"],
recommendation="Agent is at Tier 4 (maximum)",
evidence={"current_tier": current_tier},
timestamp=self._now()
)
requirements = PROMOTION_REQUIREMENTS.get(current_tier, {})
history = self.get_agent_history(agent_id)
# Evaluate each requirement
met = {}
missing = []
# Check compliant runs
min_runs = requirements.get('min_compliant_runs', 0)
if metrics['compliant_runs'] >= min_runs:
met['compliant_runs'] = f"{metrics['compliant_runs']} >= {min_runs}"
else:
missing.append(f"Need {min_runs - metrics['compliant_runs']} more compliant runs")
# Check consecutive compliant
min_consecutive = requirements.get('min_consecutive_compliant', 0)
if metrics['consecutive_compliant'] >= min_consecutive:
met['consecutive_compliant'] = f"{metrics['consecutive_compliant']} >= {min_consecutive}"
else:
missing.append(f"Need {min_consecutive - metrics['consecutive_compliant']} more consecutive compliant runs")
# Check violations
max_violations = requirements.get('max_violations_30d', 0)
if history['violations_30d'] <= max_violations:
met['violations_30d'] = f"{history['violations_30d']} <= {max_violations}"
else:
missing.append(f"Too many violations ({history['violations_30d']} > {max_violations})")
# Check action types
required_types = requirements.get('required_action_types', [])
for action_type in required_types:
if action_type in history['action_types']:
met[f'action_{action_type}'] = f"Has performed {action_type}"
else:
missing.append(f"Needs to perform action type: {action_type}")
# Check successful modifications (for T1+)
min_mods = requirements.get('min_successful_modifications', 0)
if min_mods > 0:
if history['successful_executions'] >= min_mods:
met['modifications'] = f"{history['successful_executions']} >= {min_mods}"
else:
missing.append(f"Need {min_mods - history['successful_executions']} more successful modifications")
# Determine status
if len(missing) == 0:
if requirements.get('requires_manual_review', False):
status = PromotionStatus.PENDING_REVIEW
recommendation = "All automatic criteria met. Manual review required."
else:
status = PromotionStatus.ELIGIBLE
recommendation = "Agent meets all promotion criteria"
else:
status = PromotionStatus.NOT_ELIGIBLE
recommendation = f"Agent needs to complete {len(missing)} requirements"
return PromotionEvaluation(
agent_id=agent_id,
current_tier=current_tier,
target_tier=target_tier,
status=status,
requirements_met=met,
requirements_missing=missing,
recommendation=recommendation,
evidence={
"metrics": metrics,
"history": history,
"requirements": requirements
},
timestamp=self._now()
)
def process_promotion(self, agent_id: str, approved_by: str,
rationale: str = None) -> tuple[bool, str]:
"""Process an approved promotion"""
evaluation = self.evaluate_promotion(agent_id)
if evaluation.status not in [PromotionStatus.ELIGIBLE, PromotionStatus.PENDING_REVIEW]:
return False, f"Agent not eligible: {evaluation.recommendation}"
conn = self._get_conn()
cursor = conn.cursor()
try:
# Update agent tier in metrics
cursor.execute("""
UPDATE agent_metrics
SET current_tier = ?,
updated_at = ?
WHERE agent_id = ?
""", (evaluation.target_tier, self._now(), agent_id))
# Record promotion
cursor.execute("""
INSERT INTO promotions (timestamp, agent_id, from_tier, to_tier,
approved_by, rationale, evidence)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
self._now(),
agent_id,
evaluation.current_tier,
evaluation.target_tier,
approved_by,
rationale or evaluation.recommendation,
json.dumps(evaluation.evidence)
))
conn.commit()
# Update Vault policy
self._update_vault_policy(agent_id, evaluation.target_tier)
return True, f"Promoted {agent_id} from Tier {evaluation.current_tier} to Tier {evaluation.target_tier}"
except Exception as e:
conn.rollback()
return False, f"Promotion failed: {str(e)}"
finally:
conn.close()
def _update_vault_policy(self, agent_id: str, new_tier: int):
"""Update agent's Vault policy to new tier"""
# In production, this would update the AppRole's token_policies
# For now, just log the action
print(f"[VAULT] Would update {agent_id} to tier{new_tier}-agent role")
# Example Vault API call (commented out for safety):
# curl -sk -X POST \
# -H "X-Vault-Token: $TOKEN" \
# -d '{"token_policies": ["t{new_tier}-observer"]}' \
# https://127.0.0.1:8200/v1/auth/approle/role/tier{new_tier}-agent
def get_promotion_history(self, agent_id: str = None, limit: int = 20) -> list:
"""Get promotion history"""
conn = self._get_conn()
cursor = conn.cursor()
if agent_id:
cursor.execute("""
SELECT * FROM promotions
WHERE agent_id = ?
ORDER BY timestamp DESC
LIMIT ?
""", (agent_id, limit))
else:
cursor.execute("""
SELECT * FROM promotions
ORDER BY timestamp DESC
LIMIT ?
""", (limit,))
rows = [dict(row) for row in cursor.fetchall()]
conn.close()
return rows
def get_all_eligible_agents(self) -> list[PromotionEvaluation]:
"""Find all agents eligible for promotion"""
conn = self._get_conn()
cursor = conn.cursor()
cursor.execute("SELECT agent_id FROM agent_metrics")
agents = [row['agent_id'] for row in cursor.fetchall()]
conn.close()
eligible = []
for agent_id in agents:
eval = self.evaluate_promotion(agent_id)
if eval.status in [PromotionStatus.ELIGIBLE, PromotionStatus.PENDING_REVIEW]:
eligible.append(eval)
return eligible
# =============================================================================
# CLI
# =============================================================================
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Promotion Engine")
subparsers = parser.add_subparsers(dest="command", required=True)
# Evaluate command
eval_parser = subparsers.add_parser("evaluate", help="Evaluate agent for promotion")
eval_parser.add_argument("agent_id", help="Agent ID to evaluate")
eval_parser.add_argument("--json", action="store_true", help="Output JSON")
# Promote command
promote_parser = subparsers.add_parser("promote", help="Process promotion")
promote_parser.add_argument("agent_id", help="Agent ID to promote")
promote_parser.add_argument("--approved-by", required=True, help="Approver name")
promote_parser.add_argument("--rationale", help="Promotion rationale")
# List eligible command
eligible_parser = subparsers.add_parser("eligible", help="List eligible agents")
eligible_parser.add_argument("--json", action="store_true", help="Output JSON")
# History command
history_parser = subparsers.add_parser("history", help="View promotion history")
history_parser.add_argument("--agent-id", help="Filter by agent")
history_parser.add_argument("--limit", type=int, default=20)
history_parser.add_argument("--json", action="store_true", help="Output JSON")
# Requirements command
req_parser = subparsers.add_parser("requirements", help="Show promotion requirements")
req_parser.add_argument("--tier", type=int, help="Specific tier")
args = parser.parse_args()
engine = PromotionEngine()
if args.command == "evaluate":
result = engine.evaluate_promotion(args.agent_id)
if args.json:
print(json.dumps(result.to_dict(), indent=2))
else:
print("\n" + "=" * 60)
print("PROMOTION EVALUATION")
print("=" * 60)
print(f"Agent: {result.agent_id}")
print(f"Current Tier: {result.current_tier}")
print(f"Target Tier: {result.target_tier}")
print(f"Status: {result.status.value}")
print()
print("Requirements Met:")
for key, value in result.requirements_met.items():
print(f" [OK] {key}: {value}")
if result.requirements_missing:
print("\nRequirements Missing:")
for req in result.requirements_missing:
print(f" [X] {req}")
print(f"\nRecommendation: {result.recommendation}")
print("=" * 60)
elif args.command == "promote":
success, message = engine.process_promotion(
args.agent_id,
args.approved_by,
args.rationale
)
if success:
print(f"[OK] {message}")
else:
print(f"[FAILED] {message}")
sys.exit(1)
elif args.command == "eligible":
eligible = engine.get_all_eligible_agents()
if args.json:
print(json.dumps([e.to_dict() for e in eligible], indent=2))
else:
print("\n" + "=" * 60)
print("AGENTS ELIGIBLE FOR PROMOTION")
print("=" * 60)
if not eligible:
print("No agents currently eligible for promotion")
else:
for e in eligible:
status_icon = "[READY]" if e.status == PromotionStatus.ELIGIBLE else "[REVIEW]"
print(f"\n{status_icon} {e.agent_id}")
print(f" Tier {e.current_tier} → Tier {e.target_tier}")
print(f" {e.recommendation}")
print("=" * 60)
elif args.command == "history":
history = engine.get_promotion_history(args.agent_id, args.limit)
if args.json:
print(json.dumps(history, indent=2))
else:
print("\n" + "=" * 60)
print("PROMOTION HISTORY")
print("=" * 60)
if not history:
print("No promotion history found")
else:
for p in history:
print(f"\n{p['timestamp']}")
print(f" {p['agent_id']}: Tier {p['from_tier']} → Tier {p['to_tier']}")
print(f" Approved by: {p['approved_by']}")
if p['rationale']:
print(f" Rationale: {p['rationale']}")
print("=" * 60)
elif args.command == "requirements":
print("\n" + "=" * 60)
print("PROMOTION REQUIREMENTS")
print("=" * 60)
tiers = [args.tier] if args.tier is not None else range(4)
for tier in tiers:
req = PROMOTION_REQUIREMENTS.get(tier, {})
print(f"\nTier {tier} → Tier {tier + 1}:")
print(f" {req.get('description', 'No description')}")
print(f" - Min compliant runs: {req.get('min_compliant_runs', 'N/A')}")
print(f" - Min consecutive: {req.get('min_consecutive_compliant', 'N/A')}")
print(f" - Max violations (30d): {req.get('max_violations_30d', 'N/A')}")
if req.get('requires_manual_review'):
print(f" - Requires manual review: Yes")
if req.get('requires_admin_approval'):
print(f" - Requires admin approval: Yes")
print("=" * 60)