#!/usr/bin/env python3 """ Learning from History System Analyzes past task completions to: - Identify success/failure patterns - Suggest optimizations - Predict potential failures - Recommend agent improvements """ import sqlite3 import json import statistics from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Any, Optional, Tuple import redis LEDGER_PATH = Path("/opt/agent-governance/ledger/governance.db") REDIS_HOST = "127.0.0.1" REDIS_PORT = 6379 REDIS_PASSWORD = "governance2026" @dataclass class AgentStats: """Statistics for a single agent""" agent_id: str total_actions: int = 0 successful_actions: int = 0 failed_actions: int = 0 avg_confidence: float = 0.0 action_distribution: Dict[str, int] = field(default_factory=dict) error_types: Dict[str, int] = field(default_factory=dict) promotion_potential: float = 0.0 @dataclass class Pattern: """A detected pattern in agent behavior""" pattern_type: str description: str frequency: int confidence: float agents_affected: List[str] recommendation: str @dataclass class Prediction: """A failure prediction""" agent_id: str risk_level: str # low, medium, high, critical risk_score: float factors: List[str] recommended_actions: List[str] class HistoryAnalyzer: """ Analyzes historical agent data to extract insights. """ def __init__(self): self.conn = sqlite3.connect(LEDGER_PATH) self.conn.row_factory = sqlite3.Row self.redis = redis.Redis( host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, decode_responses=True ) def close(self): self.conn.close() def get_agent_stats(self, agent_id: str = None, days: int = 30) -> List[AgentStats]: """Get statistics for agent(s)""" cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat() if agent_id: query = """ SELECT agent_id, action, decision, confidence, success, error_type FROM agent_actions WHERE agent_id = ? AND created_at > ? """ cursor = self.conn.execute(query, (agent_id, cutoff)) else: query = """ SELECT agent_id, action, decision, confidence, success, error_type FROM agent_actions WHERE created_at > ? """ cursor = self.conn.execute(query, (cutoff,)) # Aggregate by agent agent_data = defaultdict(lambda: { "actions": [], "successes": 0, "failures": 0, "confidences": [], "action_types": defaultdict(int), "error_types": defaultdict(int) }) for row in cursor: aid = row["agent_id"] data = agent_data[aid] data["actions"].append(row) data["confidences"].append(row["confidence"] or 0) data["action_types"][row["action"]] += 1 if row["success"]: data["successes"] += 1 else: data["failures"] += 1 if row["error_type"]: data["error_types"][row["error_type"]] += 1 # Build stats objects stats = [] for aid, data in agent_data.items(): total = len(data["actions"]) success_rate = data["successes"] / total if total > 0 else 0 stats.append(AgentStats( agent_id=aid, total_actions=total, successful_actions=data["successes"], failed_actions=data["failures"], avg_confidence=statistics.mean(data["confidences"]) if data["confidences"] else 0, action_distribution=dict(data["action_types"]), error_types=dict(data["error_types"]), promotion_potential=self._calculate_promotion_potential(success_rate, total) )) return stats def _calculate_promotion_potential(self, success_rate: float, total_actions: int) -> float: """Calculate promotion potential score (0-1)""" if total_actions < 5: return 0.0 # Base on success rate (0-0.5) + volume (0-0.3) + consistency (0-0.2) rate_score = min(success_rate, 1.0) * 0.5 volume_score = min(total_actions / 50, 1.0) * 0.3 consistency_score = 0.2 if success_rate > 0.9 else (0.1 if success_rate > 0.8 else 0) return rate_score + volume_score + consistency_score def detect_patterns(self, days: int = 30) -> List[Pattern]: """Detect patterns in agent behavior""" patterns = [] # Pattern 1: Repeated failures failure_agents = self._find_repeated_failures(days) if failure_agents: patterns.append(Pattern( pattern_type="REPEATED_FAILURES", description="Agents with multiple consecutive failures", frequency=len(failure_agents), confidence=0.9, agents_affected=failure_agents, recommendation="Review error logs and consider additional training or constraints" )) # Pattern 2: Low confidence decisions low_conf_agents = self._find_low_confidence_agents(days) if low_conf_agents: patterns.append(Pattern( pattern_type="LOW_CONFIDENCE", description="Agents consistently making low-confidence decisions", frequency=len(low_conf_agents), confidence=0.85, agents_affected=low_conf_agents, recommendation="Provide clearer instructions or reduce task complexity" )) # Pattern 3: Action concentration concentrated_agents = self._find_action_concentration(days) if concentrated_agents: patterns.append(Pattern( pattern_type="ACTION_CONCENTRATION", description="Agents heavily focused on single action type", frequency=len(concentrated_agents), confidence=0.7, agents_affected=concentrated_agents, recommendation="Consider diversifying agent responsibilities or creating specialists" )) # Pattern 4: Success streaks success_agents = self._find_success_streaks(days) if success_agents: patterns.append(Pattern( pattern_type="SUCCESS_STREAK", description="Agents with high success streaks (promotion candidates)", frequency=len(success_agents), confidence=0.95, agents_affected=success_agents, recommendation="Consider promoting these agents to higher tiers" )) return patterns def _find_repeated_failures(self, days: int) -> List[str]: """Find agents with repeated failures""" cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat() query = """ SELECT agent_id, COUNT(*) as fail_count FROM agent_actions WHERE success = 0 AND created_at > ? GROUP BY agent_id HAVING fail_count >= 3 """ cursor = self.conn.execute(query, (cutoff,)) return [row["agent_id"] for row in cursor] def _find_low_confidence_agents(self, days: int) -> List[str]: """Find agents with consistently low confidence""" cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat() query = """ SELECT agent_id, AVG(confidence) as avg_conf FROM agent_actions WHERE created_at > ? AND confidence IS NOT NULL GROUP BY agent_id HAVING avg_conf < 0.7 AND COUNT(*) >= 3 """ cursor = self.conn.execute(query, (cutoff,)) return [row["agent_id"] for row in cursor] def _find_action_concentration(self, days: int) -> List[str]: """Find agents concentrated on single action type""" stats = self.get_agent_stats(days=days) concentrated = [] for stat in stats: if stat.total_actions >= 5: max_action = max(stat.action_distribution.values()) if stat.action_distribution else 0 if max_action / stat.total_actions > 0.8: concentrated.append(stat.agent_id) return concentrated def _find_success_streaks(self, days: int) -> List[str]: """Find agents with high success streaks""" cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat() query = """ SELECT agent_id, SUM(success) as successes, COUNT(*) as total FROM agent_actions WHERE created_at > ? GROUP BY agent_id HAVING total >= 5 AND (successes * 1.0 / total) >= 0.9 """ cursor = self.conn.execute(query, (cutoff,)) return [row["agent_id"] for row in cursor] def predict_failures(self, days: int = 7) -> List[Prediction]: """Predict potential failures based on recent trends""" predictions = [] stats = self.get_agent_stats(days=days) for stat in stats: risk_factors = [] risk_score = 0.0 # Factor 1: Recent failure rate if stat.total_actions > 0: failure_rate = stat.failed_actions / stat.total_actions if failure_rate > 0.3: risk_factors.append(f"High failure rate: {failure_rate:.1%}") risk_score += failure_rate * 0.4 # Factor 2: Low average confidence if stat.avg_confidence < 0.6: risk_factors.append(f"Low avg confidence: {stat.avg_confidence:.2f}") risk_score += (1 - stat.avg_confidence) * 0.3 # Factor 3: Recurring error types if stat.error_types: recurring = [e for e, c in stat.error_types.items() if c >= 2] if recurring: risk_factors.append(f"Recurring errors: {', '.join(recurring)}") risk_score += 0.2 # Factor 4: Check DragonflyDB for recent revoke signals revoke_signal = self.redis.get(f"agent:{stat.agent_id}:revoke_signal") if revoke_signal == "1": risk_factors.append("Revocation signal active") risk_score += 0.3 if risk_factors: risk_level = ( "critical" if risk_score > 0.7 else "high" if risk_score > 0.5 else "medium" if risk_score > 0.3 else "low" ) recommendations = self._generate_recommendations(stat, risk_factors) predictions.append(Prediction( agent_id=stat.agent_id, risk_level=risk_level, risk_score=min(risk_score, 1.0), factors=risk_factors, recommended_actions=recommendations )) # Sort by risk score predictions.sort(key=lambda p: p.risk_score, reverse=True) return predictions def _generate_recommendations(self, stat: AgentStats, factors: List[str]) -> List[str]: """Generate recommendations based on analysis""" recommendations = [] if stat.failed_actions > stat.successful_actions: recommendations.append("Consider reducing task complexity or scope") if stat.avg_confidence < 0.7: recommendations.append("Provide more detailed instructions") if stat.error_types: most_common = max(stat.error_types, key=stat.error_types.get) recommendations.append(f"Investigate root cause of '{most_common}' errors") if stat.promotion_potential < 0.3: recommendations.append("Agent needs more successful runs before promotion") if not recommendations: recommendations.append("Monitor agent closely for next few runs") return recommendations def suggest_optimizations(self) -> List[Dict[str, Any]]: """Suggest system-wide optimizations""" suggestions = [] # Get overall stats query = """ SELECT COUNT(*) as total_actions, SUM(success) as successes, AVG(confidence) as avg_confidence, COUNT(DISTINCT agent_id) as unique_agents FROM agent_actions WHERE created_at > datetime('now', '-30 days') """ row = self.conn.execute(query).fetchone() if row["total_actions"] > 0: success_rate = row["successes"] / row["total_actions"] # Suggestion 1: Overall success rate if success_rate < 0.8: suggestions.append({ "category": "Success Rate", "current": f"{success_rate:.1%}", "target": "80%+", "suggestion": "Review failing agents and consider additional constraints", "priority": "high" if success_rate < 0.6 else "medium" }) # Suggestion 2: Confidence levels if row["avg_confidence"] and row["avg_confidence"] < 0.75: suggestions.append({ "category": "Confidence", "current": f"{row['avg_confidence']:.2f}", "target": "0.75+", "suggestion": "Improve task clarity and agent training", "priority": "medium" }) # Suggestion 3: Agent utilization metrics_query = """ SELECT agent_id, total_runs, compliant_runs FROM agent_metrics WHERE total_runs > 0 """ idle_agents = [] for row in self.conn.execute(metrics_query): if row["total_runs"] < 5: idle_agents.append(row["agent_id"]) if idle_agents: suggestions.append({ "category": "Agent Utilization", "current": f"{len(idle_agents)} underutilized agents", "target": "All agents active", "suggestion": f"Consider assigning more tasks to: {', '.join(idle_agents[:3])}", "priority": "low" }) # Suggestion 4: Promotion queue promotable = self._find_success_streaks(30) if promotable: suggestions.append({ "category": "Promotions", "current": f"{len(promotable)} agents ready", "target": "Process promotion queue", "suggestion": f"Review for promotion: {', '.join(promotable[:3])}", "priority": "medium" }) return suggestions def generate_report(self) -> Dict[str, Any]: """Generate a comprehensive analytics report""" stats = self.get_agent_stats(days=30) patterns = self.detect_patterns(days=30) predictions = self.predict_failures(days=7) suggestions = self.suggest_optimizations() # Calculate summaries total_agents = len(stats) total_actions = sum(s.total_actions for s in stats) total_successes = sum(s.successful_actions for s in stats) avg_confidence = statistics.mean([s.avg_confidence for s in stats]) if stats else 0 return { "generated_at": datetime.utcnow().isoformat(), "period_days": 30, "summary": { "total_agents": total_agents, "total_actions": total_actions, "success_rate": total_successes / total_actions if total_actions > 0 else 0, "avg_confidence": avg_confidence }, "patterns_detected": len(patterns), "patterns": [ { "type": p.pattern_type, "description": p.description, "agents_count": len(p.agents_affected), "recommendation": p.recommendation } for p in patterns ], "risk_predictions": len([p for p in predictions if p.risk_level in ["high", "critical"]]), "high_risk_agents": [ { "agent_id": p.agent_id, "risk_level": p.risk_level, "risk_score": p.risk_score, "top_factor": p.factors[0] if p.factors else "Unknown" } for p in predictions[:5] ], "optimization_suggestions": suggestions, "top_performers": [ {"agent_id": s.agent_id, "success_rate": s.successful_actions / s.total_actions if s.total_actions > 0 else 0} for s in sorted(stats, key=lambda x: x.promotion_potential, reverse=True)[:3] ] } def main(): """Run analytics and print report""" print("=" * 60) print("AGENT GOVERNANCE ANALYTICS") print("=" * 60) analyzer = HistoryAnalyzer() try: report = analyzer.generate_report() print(f"\nPeriod: Last {report['period_days']} days") print(f"Generated: {report['generated_at']}") print("\n--- SUMMARY ---") summary = report["summary"] print(f" Total Agents: {summary['total_agents']}") print(f" Total Actions: {summary['total_actions']}") print(f" Success Rate: {summary['success_rate']:.1%}") print(f" Avg Confidence: {summary['avg_confidence']:.2f}") print(f"\n--- PATTERNS DETECTED ({report['patterns_detected']}) ---") for p in report["patterns"]: print(f" [{p['type']}] {p['description']}") print(f" Affects {p['agents_count']} agent(s)") print(f" → {p['recommendation']}") print(f"\n--- RISK PREDICTIONS ---") if report["high_risk_agents"]: for agent in report["high_risk_agents"]: print(f" {agent['risk_level'].upper()}: {agent['agent_id']} (score: {agent['risk_score']:.2f})") print(f" Factor: {agent['top_factor']}") else: print(" No high-risk agents detected") print(f"\n--- OPTIMIZATION SUGGESTIONS ---") for s in report["optimization_suggestions"]: print(f" [{s['priority'].upper()}] {s['category']}") print(f" Current: {s['current']} → Target: {s['target']}") print(f" → {s['suggestion']}") print(f"\n--- TOP PERFORMERS ---") for p in report["top_performers"]: print(f" {p['agent_id']}: {p['success_rate']:.1%} success rate") print("\n" + "=" * 60) finally: analyzer.close() if __name__ == "__main__": main()