profit 77655c298c Initial commit: Agent Governance System Phase 8
Phase 8 Production Hardening with complete governance infrastructure:

- Vault integration with tiered policies (T0-T4)
- DragonflyDB state management
- SQLite audit ledger
- Pipeline DSL and templates
- Promotion/revocation engine
- Checkpoint system for session persistence
- Health manager and circuit breaker for fault tolerance
- GitHub/Slack integrations
- Architectural test pipeline with bug watcher, suggestion engine, council review
- Multi-agent chaos testing framework

Test Results:
- Governance tests: 68/68 passing
- E2E workflow: 16/16 passing
- Phase 2 Vault: 14/14 passing
- Integration tests: 27/27 passing

Coverage: 57.6% average across 12 phases

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-23 22:07:06 -05:00

515 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Learning from History System
Analyzes past task completions to:
- Identify success/failure patterns
- Suggest optimizations
- Predict potential failures
- Recommend agent improvements
"""
import json
import os
import sqlite3
import statistics
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple

import redis
# Connection settings. Each value can be overridden through a GOVERNANCE_*
# environment variable; the fallbacks are the values this module has always
# used, so existing deployments keep working without any configuration.

# SQLite ledger file opened by HistoryAnalyzer.
LEDGER_PATH = Path(
    os.environ.get(
        "GOVERNANCE_LEDGER_PATH",
        "/opt/agent-governance/ledger/governance.db",
    )
)

# Redis-protocol endpoint (the analyzer's comments refer to it as DragonflyDB).
REDIS_HOST = os.environ.get("GOVERNANCE_REDIS_HOST", "127.0.0.1")
REDIS_PORT = int(os.environ.get("GOVERNANCE_REDIS_PORT", "6379"))

# SECURITY: the fallback below is a credential committed to source control.
# Supply GOVERNANCE_REDIS_PASSWORD in real deployments and rotate this value.
REDIS_PASSWORD = os.environ.get("GOVERNANCE_REDIS_PASSWORD", "governance2026")
@dataclass
class AgentStats:
    """Aggregated activity figures for one agent over an analysis window.

    Attributes:
        agent_id: Identifier of the agent the figures describe.
        total_actions: Number of actions counted for the agent.
        successful_actions: How many of those actions succeeded.
        failed_actions: How many of those actions did not succeed.
        avg_confidence: Mean confidence across the counted actions.
        action_distribution: Action name -> number of occurrences.
        error_types: Error type -> number of occurrences.
        promotion_potential: Promotion score for the agent.
    """

    # Required: every stats record belongs to exactly one agent.
    agent_id: str

    # Counters default to "nothing observed yet".
    total_actions: int = 0
    successful_actions: int = 0
    failed_actions: int = 0
    avg_confidence: float = 0.0

    # default_factory gives each instance its own dict instead of a shared one.
    action_distribution: Dict[str, int] = field(default_factory=dict)
    error_types: Dict[str, int] = field(default_factory=dict)

    promotion_potential: float = 0.0
@dataclass
class Pattern:
    """One behavioural pattern found across agents.

    Attributes:
        pattern_type: Machine-readable tag for the pattern.
        description: Human-readable summary of what was detected.
        frequency: How often the pattern was observed.
        confidence: Confidence assigned to the detection.
        agents_affected: Identifiers of the agents that match the pattern.
        recommendation: Suggested follow-up for the pattern.
    """

    pattern_type: str
    description: str
    frequency: int
    confidence: float
    agents_affected: List[str]
    recommendation: str
@dataclass
class Prediction:
    """Failure-risk assessment for one agent.

    Attributes:
        agent_id: Identifier of the assessed agent.
        risk_level: One of "low", "medium", "high" or "critical".
        risk_score: Numeric score behind ``risk_level``.
        factors: Human-readable reasons that contributed to the score.
        recommended_actions: Suggested follow-ups for the agent.
    """

    agent_id: str
    risk_level: str  # low, medium, high, critical
    risk_score: float
    factors: List[str]
    recommended_actions: List[str]
class HistoryAnalyzer:
    """
    Analyzes historical agent data to extract insights.

    Action history is read from the SQLite ledger at ``LEDGER_PATH`` (tables
    ``agent_actions`` and ``agent_metrics``); revocation flags are read from
    the Redis-protocol store configured by the ``REDIS_*`` constants.

    Every time-window filter passes both ``created_at`` and the cutoff through
    SQLite's ``datetime()`` so the comparison is made on normalised values and
    does not depend on whether the ledger stores ``YYYY-MM-DD HH:MM:SS`` or
    ISO ``YYYY-MM-DDTHH:MM:SS[.ffffff]`` text.

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`close`.
    """

    def __init__(self):
        """Open the ledger database and create the Redis client."""
        self.conn = sqlite3.connect(LEDGER_PATH)
        # Rows are addressed by column name throughout this class.
        self.conn.row_factory = sqlite3.Row
        self.redis = redis.Redis(
            host=REDIS_HOST,
            port=REDIS_PORT,
            password=REDIS_PASSWORD,
            # Return str instead of bytes so values can be compared to "1".
            decode_responses=True,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Release the SQLite connection and the Redis client."""
        try:
            self.conn.close()
        finally:
            # Close Redis even if closing SQLite raised.
            self.redis.close()

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Stands in for the deprecated ``datetime.utcnow()`` and yields the
        same kind of value (no tzinfo), so ``isoformat()`` output keeps its
        previous shape.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _cutoff(self, days: int) -> str:
        """Return the ISO-8601 timestamp ``days`` days before now (UTC)."""
        return (self._utc_now() - timedelta(days=days)).isoformat()

    def get_agent_stats(
        self, agent_id: Optional[str] = None, days: int = 30
    ) -> "List[AgentStats]":
        """Aggregate per-agent statistics over the last ``days`` days.

        Args:
            agent_id: Restrict the result to this agent. A falsy value
                (``None`` or ``""``) selects every agent.
            days: Size of the look-back window in days.

        Returns:
            One ``AgentStats`` per agent with at least one action in the
            window. Rows whose ``confidence`` is NULL are left out of the
            confidence average (matching SQL ``AVG``); an agent with no
            recorded confidence at all gets an average of 0. A row whose
            ``success`` is falsy (including NULL) counts as a failure.
        """
        query = """
            SELECT agent_id, action, decision, confidence, success, error_type
            FROM agent_actions
            WHERE datetime(created_at) > datetime(?)
        """
        params = [self._cutoff(days)]
        if agent_id:
            query += " AND agent_id = ?"
            params.append(agent_id)

        # Aggregate by agent
        per_agent = defaultdict(lambda: {
            "total": 0,
            "successes": 0,
            "failures": 0,
            "confidences": [],
            "action_types": defaultdict(int),
            "error_types": defaultdict(int),
        })
        for row in self.conn.execute(query, params):
            data = per_agent[row["agent_id"]]
            data["total"] += 1
            if row["confidence"] is not None:
                data["confidences"].append(row["confidence"])
            data["action_types"][row["action"]] += 1
            if row["success"]:
                data["successes"] += 1
            else:
                data["failures"] += 1
                if row["error_type"]:
                    data["error_types"][row["error_type"]] += 1

        # Build stats objects
        stats = []
        for aid, data in per_agent.items():
            total = data["total"]
            success_rate = data["successes"] / total if total > 0 else 0
            confidences = data["confidences"]
            stats.append(AgentStats(
                agent_id=aid,
                total_actions=total,
                successful_actions=data["successes"],
                failed_actions=data["failures"],
                avg_confidence=statistics.mean(confidences) if confidences else 0,
                action_distribution=dict(data["action_types"]),
                error_types=dict(data["error_types"]),
                promotion_potential=self._calculate_promotion_potential(success_rate, total),
            ))
        return stats

    def _calculate_promotion_potential(self, success_rate: float, total_actions: int) -> float:
        """Calculate promotion potential score (0-1).

        Agents with fewer than 5 actions always score 0.0. Otherwise the score
        is the sum of three parts: success rate (up to 0.5), volume (up to
        0.3, saturating at 50 actions) and consistency (0.2 above a 90%
        success rate, 0.1 above 80%, else 0).
        """
        if total_actions < 5:
            return 0.0
        rate_score = min(success_rate, 1.0) * 0.5
        volume_score = min(total_actions / 50, 1.0) * 0.3
        if success_rate > 0.9:
            consistency_score = 0.2
        elif success_rate > 0.8:
            consistency_score = 0.1
        else:
            consistency_score = 0
        return rate_score + volume_score + consistency_score

    def detect_patterns(self, days: int = 30) -> "List[Pattern]":
        """Detect patterns in agent behavior over the last ``days`` days.

        Runs the four detectors in a fixed order and returns one ``Pattern``
        for each detector that matched at least one agent. ``frequency`` is
        the number of matching agents.
        """
        # (pattern_type, description, confidence, finder, recommendation)
        detectors = (
            (
                "REPEATED_FAILURES",
                "Agents with multiple consecutive failures",
                0.9,
                self._find_repeated_failures,
                "Review error logs and consider additional training or constraints",
            ),
            (
                "LOW_CONFIDENCE",
                "Agents consistently making low-confidence decisions",
                0.85,
                self._find_low_confidence_agents,
                "Provide clearer instructions or reduce task complexity",
            ),
            (
                "ACTION_CONCENTRATION",
                "Agents heavily focused on single action type",
                0.7,
                self._find_action_concentration,
                "Consider diversifying agent responsibilities or creating specialists",
            ),
            (
                "SUCCESS_STREAK",
                "Agents with high success streaks (promotion candidates)",
                0.95,
                self._find_success_streaks,
                "Consider promoting these agents to higher tiers",
            ),
        )

        patterns = []
        for pattern_type, description, confidence, finder, recommendation in detectors:
            agents = finder(days)
            if agents:
                patterns.append(Pattern(
                    pattern_type=pattern_type,
                    description=description,
                    frequency=len(agents),
                    confidence=confidence,
                    agents_affected=agents,
                    recommendation=recommendation,
                ))
        return patterns

    def _find_repeated_failures(self, days: int) -> List[str]:
        """Return agents with at least 3 failed actions in the window.

        The query counts failures (``success = 0``) in total; it does not
        check that they were consecutive.
        """
        query = """
            SELECT agent_id, COUNT(*) as fail_count
            FROM agent_actions
            WHERE success = 0 AND datetime(created_at) > datetime(?)
            GROUP BY agent_id
            HAVING fail_count >= 3
        """
        cursor = self.conn.execute(query, (self._cutoff(days),))
        return [row["agent_id"] for row in cursor]

    def _find_low_confidence_agents(self, days: int) -> List[str]:
        """Return agents whose mean confidence in the window is below 0.7.

        Only rows with a non-NULL confidence are considered, and an agent
        needs at least 3 such rows to be reported.
        """
        query = """
            SELECT agent_id, AVG(confidence) as avg_conf
            FROM agent_actions
            WHERE datetime(created_at) > datetime(?) AND confidence IS NOT NULL
            GROUP BY agent_id
            HAVING avg_conf < 0.7 AND COUNT(*) >= 3
        """
        cursor = self.conn.execute(query, (self._cutoff(days),))
        return [row["agent_id"] for row in cursor]

    def _find_action_concentration(self, days: int) -> List[str]:
        """Return agents for which one action type exceeds 80% of their actions.

        Agents with fewer than 5 actions in the window are ignored.
        """
        concentrated = []
        for stat in self.get_agent_stats(days=days):
            if stat.total_actions < 5:
                continue
            distribution = stat.action_distribution
            max_action = max(distribution.values()) if distribution else 0
            if max_action / stat.total_actions > 0.8:
                concentrated.append(stat.agent_id)
        return concentrated

    def _find_success_streaks(self, days: int) -> List[str]:
        """Return agents with at least 5 actions and a success rate of 90%+.

        Despite the name, this is a rate over the whole window, not a run of
        consecutive successes.
        """
        query = """
            SELECT agent_id,
                   SUM(success) as successes,
                   COUNT(*) as total
            FROM agent_actions
            WHERE datetime(created_at) > datetime(?)
            GROUP BY agent_id
            HAVING total >= 5 AND (successes * 1.0 / total) >= 0.9
        """
        cursor = self.conn.execute(query, (self._cutoff(days),))
        return [row["agent_id"] for row in cursor]

    def predict_failures(self, days: int = 7) -> "List[Prediction]":
        """Predict potential failures based on recent trends.

        Each agent active in the last ``days`` days is scored on four
        factors: failure rate above 30%, average confidence below 0.6, any
        error type seen at least twice, and an active revocation flag in
        Redis. Agents with no triggered factor are omitted.

        Returns:
            Predictions sorted by descending risk score. The stored score is
            capped at 1.0; the level is derived from the uncapped score
            (critical > 0.7, high > 0.5, medium > 0.3, else low).

        Raises:
            Whatever the Redis client raises if the store is unreachable.
        """
        predictions = []
        for stat in self.get_agent_stats(days=days):
            risk_factors = []
            risk_score = 0.0

            # Factor 1: Recent failure rate
            if stat.total_actions > 0:
                failure_rate = stat.failed_actions / stat.total_actions
                if failure_rate > 0.3:
                    risk_factors.append(f"High failure rate: {failure_rate:.1%}")
                    risk_score += failure_rate * 0.4

            # Factor 2: Low average confidence
            if stat.avg_confidence < 0.6:
                risk_factors.append(f"Low avg confidence: {stat.avg_confidence:.2f}")
                risk_score += (1 - stat.avg_confidence) * 0.3

            # Factor 3: Recurring error types
            if stat.error_types:
                recurring = [e for e, c in stat.error_types.items() if c >= 2]
                if recurring:
                    risk_factors.append(f"Recurring errors: {', '.join(recurring)}")
                    risk_score += 0.2

            # Factor 4: Check DragonflyDB for recent revoke signals
            revoke_signal = self.redis.get(f"agent:{stat.agent_id}:revoke_signal")
            if revoke_signal == "1":
                risk_factors.append("Revocation signal active")
                risk_score += 0.3

            if not risk_factors:
                continue

            if risk_score > 0.7:
                risk_level = "critical"
            elif risk_score > 0.5:
                risk_level = "high"
            elif risk_score > 0.3:
                risk_level = "medium"
            else:
                risk_level = "low"

            predictions.append(Prediction(
                agent_id=stat.agent_id,
                risk_level=risk_level,
                risk_score=min(risk_score, 1.0),
                factors=risk_factors,
                recommended_actions=self._generate_recommendations(stat, risk_factors),
            ))

        # Sort by risk score
        predictions.sort(key=lambda p: p.risk_score, reverse=True)
        return predictions

    def _generate_recommendations(self, stat: "AgentStats", factors: List[str]) -> List[str]:
        """Generate recommendations based on an agent's statistics.

        ``factors`` is accepted for interface compatibility but is not
        consulted; every recommendation is derived from ``stat``. A generic
        monitoring hint is returned when no specific rule applies.
        """
        recommendations = []
        if stat.failed_actions > stat.successful_actions:
            recommendations.append("Consider reducing task complexity or scope")
        if stat.avg_confidence < 0.7:
            recommendations.append("Provide more detailed instructions")
        if stat.error_types:
            most_common = max(stat.error_types, key=stat.error_types.get)
            recommendations.append(f"Investigate root cause of '{most_common}' errors")
        if stat.promotion_potential < 0.3:
            recommendations.append("Agent needs more successful runs before promotion")
        if not recommendations:
            recommendations.append("Monitor agent closely for next few runs")
        return recommendations

    def suggest_optimizations(self) -> List[Dict[str, Any]]:
        """Suggest system-wide optimizations from the last 30 days of data.

        Returns:
            A list of dicts with the keys ``category``, ``current``,
            ``target``, ``suggestion`` and ``priority``, covering (when
            applicable) overall success rate, average confidence,
            underutilized agents and promotion candidates.
        """
        suggestions = []

        # Get overall stats
        query = """
            SELECT
                COUNT(*) as total_actions,
                SUM(success) as successes,
                AVG(confidence) as avg_confidence,
                COUNT(DISTINCT agent_id) as unique_agents
            FROM agent_actions
            WHERE datetime(created_at) > datetime('now', '-30 days')
        """
        totals = self.conn.execute(query).fetchone()

        if totals["total_actions"] > 0:
            # SUM() is NULL when every success value in the window is NULL.
            success_rate = (totals["successes"] or 0) / totals["total_actions"]

            # Suggestion 1: Overall success rate
            if success_rate < 0.8:
                suggestions.append({
                    "category": "Success Rate",
                    "current": f"{success_rate:.1%}",
                    "target": "80%+",
                    "suggestion": "Review failing agents and consider additional constraints",
                    "priority": "high" if success_rate < 0.6 else "medium",
                })

        # Suggestion 2: Confidence levels
        avg_confidence = totals["avg_confidence"]
        if avg_confidence and avg_confidence < 0.75:
            suggestions.append({
                "category": "Confidence",
                "current": f"{avg_confidence:.2f}",
                "target": "0.75+",
                "suggestion": "Improve task clarity and agent training",
                "priority": "medium",
            })

        # Suggestion 3: Agent utilization
        metrics_query = """
            SELECT agent_id, total_runs, compliant_runs
            FROM agent_metrics
            WHERE total_runs > 0
        """
        idle_agents = [
            metrics["agent_id"]
            for metrics in self.conn.execute(metrics_query)
            if metrics["total_runs"] < 5
        ]
        if idle_agents:
            suggestions.append({
                "category": "Agent Utilization",
                "current": f"{len(idle_agents)} underutilized agents",
                "target": "All agents active",
                "suggestion": f"Consider assigning more tasks to: {', '.join(idle_agents[:3])}",
                "priority": "low",
            })

        # Suggestion 4: Promotion queue
        promotable = self._find_success_streaks(30)
        if promotable:
            suggestions.append({
                "category": "Promotions",
                "current": f"{len(promotable)} agents ready",
                "target": "Process promotion queue",
                "suggestion": f"Review for promotion: {', '.join(promotable[:3])}",
                "priority": "medium",
            })

        return suggestions

    def generate_report(self) -> Dict[str, Any]:
        """Generate a comprehensive analytics report.

        Statistics, patterns and suggestions cover the last 30 days;
        failure predictions use the last 7 days.

        Returns:
            A dict with the keys ``generated_at``, ``period_days``,
            ``summary``, ``patterns_detected``, ``patterns``,
            ``risk_predictions``, ``high_risk_agents``,
            ``optimization_suggestions`` and ``top_performers``.
        """
        stats = self.get_agent_stats(days=30)
        patterns = self.detect_patterns(days=30)
        predictions = self.predict_failures(days=7)
        suggestions = self.suggest_optimizations()

        # Calculate summaries
        total_actions = sum(s.total_actions for s in stats)
        total_successes = sum(s.successful_actions for s in stats)
        # Unweighted mean of the per-agent averages.
        avg_confidence = statistics.mean([s.avg_confidence for s in stats]) if stats else 0
        by_potential = sorted(stats, key=lambda s: s.promotion_potential, reverse=True)

        return {
            "generated_at": self._utc_now().isoformat(),
            "period_days": 30,
            "summary": {
                "total_agents": len(stats),
                "total_actions": total_actions,
                "success_rate": total_successes / total_actions if total_actions > 0 else 0,
                "avg_confidence": avg_confidence,
            },
            "patterns_detected": len(patterns),
            "patterns": [
                {
                    "type": p.pattern_type,
                    "description": p.description,
                    "agents_count": len(p.agents_affected),
                    "recommendation": p.recommendation,
                }
                for p in patterns
            ],
            "risk_predictions": sum(
                1 for p in predictions if p.risk_level in ("high", "critical")
            ),
            # NOTE(review): this lists the five highest-scoring predictions of
            # any level, while "risk_predictions" counts only high/critical.
            # Confirm which is intended before filtering here.
            "high_risk_agents": [
                {
                    "agent_id": p.agent_id,
                    "risk_level": p.risk_level,
                    "risk_score": p.risk_score,
                    "top_factor": p.factors[0] if p.factors else "Unknown",
                }
                for p in predictions[:5]
            ],
            "optimization_suggestions": suggestions,
            "top_performers": [
                {
                    "agent_id": s.agent_id,
                    "success_rate": (
                        s.successful_actions / s.total_actions if s.total_actions > 0 else 0
                    ),
                }
                for s in by_potential[:3]
            ],
        }
def main():
    """Build the analytics report and write it to stdout, section by section."""
    rule = "=" * 60
    print(rule)
    print("AGENT GOVERNANCE ANALYTICS")
    print(rule)

    analyzer = HistoryAnalyzer()
    try:
        report = analyzer.generate_report()

        print(f"\nPeriod: Last {report['period_days']} days")
        print(f"Generated: {report['generated_at']}")

        # Headline numbers.
        print("\n--- SUMMARY ---")
        totals = report["summary"]
        print(f" Total Agents: {totals['total_agents']}")
        print(f" Total Actions: {totals['total_actions']}")
        print(f" Success Rate: {totals['success_rate']:.1%}")
        print(f" Avg Confidence: {totals['avg_confidence']:.2f}")

        # Behavioural patterns, one entry per detected pattern.
        print(f"\n--- PATTERNS DETECTED ({report['patterns_detected']}) ---")
        for pattern in report["patterns"]:
            print(f" [{pattern['type']}] {pattern['description']}")
            print(f" Affects {pattern['agents_count']} agent(s)")
            print(f"{pattern['recommendation']}")

        # Risk list; a placeholder line is printed when it is empty.
        print("\n--- RISK PREDICTIONS ---")
        risky = report["high_risk_agents"]
        if not risky:
            print(" No high-risk agents detected")
        for entry in risky:
            level = entry["risk_level"].upper()
            print(f" {level}: {entry['agent_id']} (score: {entry['risk_score']:.2f})")
            print(f" Factor: {entry['top_factor']}")

        print("\n--- OPTIMIZATION SUGGESTIONS ---")
        for item in report["optimization_suggestions"]:
            print(f" [{item['priority'].upper()}] {item['category']}")
            print(f" Current: {item['current']} → Target: {item['target']}")
            print(f"{item['suggestion']}")

        print("\n--- TOP PERFORMERS ---")
        for performer in report["top_performers"]:
            print(f" {performer['agent_id']}: {performer['success_rate']:.1%} success rate")

        print("\n" + rule)
    finally:
        # Always release the analyzer's connections, even if reporting failed.
        analyzer.close()


if __name__ == "__main__":
    main()