profit 77655c298c Initial commit: Agent Governance System Phase 8
Phase 8 Production Hardening with complete governance infrastructure:

- Vault integration with tiered policies (T0-T4)
- DragonflyDB state management
- SQLite audit ledger
- Pipeline DSL and templates
- Promotion/revocation engine
- Checkpoint system for session persistence
- Health manager and circuit breaker for fault tolerance
- GitHub/Slack integrations
- Architectural test pipeline with bug watcher, suggestion engine, council review
- Multi-agent chaos testing framework

Test Results:
- Governance tests: 68/68 passing
- E2E workflow: 16/16 passing
- Phase 2 Vault: 14/14 passing
- Integration tests: 27/27 passing

Coverage: 57.6% average across 12 phases

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-23 22:07:06 -05:00

649 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Council/Review Layer
====================
Multi-agent review system that aggregates knowledge, debates suggestions,
and makes decisions on fixes. Tracks outcomes for future learning.
Features:
- Aggregates past knowledge and quality metrics
- Multi-perspective review (safety, performance, architecture)
- Decision tracking with outcomes
- Learning from past decisions
"""
import hashlib
import json
import logging
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Optional

import redis

from .suggestion_engine import Suggestion, SuggestionStatus, RiskLevel, ImpactLevel
class ReviewerRole(str, Enum):
    """Perspective from which a council reviewer evaluates a suggestion."""

    # Security and stability
    SAFETY = "safety"
    # Efficiency and speed
    PERFORMANCE = "performance"
    # Design and maintainability
    ARCHITECTURE = "architecture"
    # Policy and governance
    COMPLIANCE = "compliance"
    # Code quality and testing
    QUALITY = "quality"
class VoteType(str, Enum):
    """Ballot options available to a single reviewer."""

    APPROVE = "approve"
    REJECT = "reject"
    ABSTAIN = "abstain"
    NEEDS_MORE_INFO = "needs_more_info"
class DecisionType(str, Enum):
    """Outcomes the council as a whole can reach on a suggestion."""

    # Safe to auto-implement
    AUTO_APPROVE = "auto_approve"
    # Approved, needs human to implement
    HUMAN_APPROVE = "human_approve"
    # Needs more discussion
    DEFER = "defer"
    # Do not implement
    REJECT = "reject"
    # Needs higher authority
    ESCALATE = "escalate"
@dataclass
class Vote:
    """One reviewer's ballot on a suggestion, together with its justification."""

    reviewer_role: ReviewerRole
    vote: VoteType
    reasoning: str
    concerns: list[str] = field(default_factory=list)
    # Preconditions attached to the vote ("Approve if...")
    conditions: list[str] = field(default_factory=list)
    # Reviewer certainty, 0-1
    confidence: float = 0.8
    timestamp: str = ""

    def __post_init__(self):
        """Stamp the vote with the current UTC time unless one was supplied."""
        self.timestamp = self.timestamp or datetime.now(timezone.utc).isoformat()
@dataclass
class Decision:
    """The council's verdict on one suggestion, plus follow-up outcome fields."""

    id: str
    suggestion_id: str
    decision: DecisionType
    votes: list[Vote]
    final_reasoning: str
    conditions: list[str] = field(default_factory=list)
    requires_manual_oversight: bool = True
    auto_fix_approved: bool = False
    decided_at: str = ""
    implemented_at: Optional[str] = None
    # "success", "failure", "partial"
    outcome: Optional[str] = None
    outcome_notes: Optional[str] = None
    lessons_learned: Optional[str] = None

    def __post_init__(self):
        """Fill in ``decided_at`` and ``id`` when the caller left them empty."""
        self.decided_at = self.decided_at or datetime.now(timezone.utc).isoformat()
        if self.id:
            return
        # Derive the id from the suggestion and decision time: "dec-" followed
        # by the first 12 hex characters of their SHA-256 digest.
        seed = f"{self.suggestion_id}{self.decided_at}".encode()
        self.id = "dec-" + hashlib.sha256(seed).hexdigest()[:12]
@dataclass
class ReviewerProfile:
    """Static description of a council reviewer and what it guards against."""

    role: ReviewerRole
    expertise_areas: list[str]
    # 0 (very conservative) to 1 (risk-taking)
    risk_tolerance: float
    # What this reviewer cares most about
    priority_focus: list[str]
    # Conditions that trigger automatic rejection
    veto_conditions: list[str]
class CouncilReview:
    """
    Multi-agent council for reviewing and approving suggestions.

    Process:
    1. Each reviewer evaluates the suggestion from their perspective
    2. Votes are collected with reasoning
    3. Final decision is made based on votes
    4. Decision is tracked for learning

    Decisions are written to Redis (DragonflyDB) when a connection can be
    established at construction time; otherwise they are kept in memory on
    ``self.decisions``.
    """

    _log = logging.getLogger(__name__)

    # Redis keys shared by the read and write paths.
    DECISIONS_KEY = "oversight:decisions"
    LESSONS_KEY = "oversight:lessons"
    # Number of stored decisions kept in the Redis list after each write.
    MAX_STORED_DECISIONS = 500
    # Number of past decisions loaded into memory at start-up.
    HISTORY_PRELOAD = 100

    # Reviewer profiles
    REVIEWERS = {
        ReviewerRole.SAFETY: ReviewerProfile(
            role=ReviewerRole.SAFETY,
            expertise_areas=["security", "access_control", "data_protection", "audit"],
            risk_tolerance=0.2,
            priority_focus=["no_security_regression", "audit_trail", "access_control"],
            veto_conditions=[
                "removes_security_check",
                "bypasses_authentication",
                "exposes_secrets",
                "disables_audit_logging",
            ],
        ),
        ReviewerRole.PERFORMANCE: ReviewerProfile(
            role=ReviewerRole.PERFORMANCE,
            expertise_areas=["latency", "throughput", "resource_usage", "scalability"],
            risk_tolerance=0.6,
            priority_focus=["response_time", "memory_usage", "scalability"],
            veto_conditions=[
                "infinite_loop_risk",
                "memory_leak",
                "blocking_operation_in_critical_path",
            ],
        ),
        ReviewerRole.ARCHITECTURE: ReviewerProfile(
            role=ReviewerRole.ARCHITECTURE,
            expertise_areas=["design_patterns", "modularity", "maintainability", "coupling"],
            risk_tolerance=0.5,
            priority_focus=["clean_design", "separation_of_concerns", "extensibility"],
            veto_conditions=[
                "circular_dependency",
                "god_class_creation",
                "breaks_abstraction",
            ],
        ),
        ReviewerRole.COMPLIANCE: ReviewerProfile(
            role=ReviewerRole.COMPLIANCE,
            expertise_areas=["governance", "policies", "regulations", "standards"],
            risk_tolerance=0.3,
            priority_focus=["policy_compliance", "audit_requirements", "documentation"],
            veto_conditions=[
                "violates_governance_policy",
                "missing_audit_trail",
                "unauthorized_tier_access",
            ],
        ),
        ReviewerRole.QUALITY: ReviewerProfile(
            role=ReviewerRole.QUALITY,
            expertise_areas=["testing", "code_quality", "documentation", "reliability"],
            risk_tolerance=0.4,
            priority_focus=["test_coverage", "code_quality", "documentation"],
            veto_conditions=[
                "reduces_test_coverage",
                "removes_validation",
                "undocumented_behavior",
            ],
        ),
    }

    def __init__(
        self,
        base_path: str = "/opt/agent-governance",
        *,
        redis_host: str = "127.0.0.1",
        redis_port: int = 6379,
        redis_password: Optional[str] = "governance2026",
    ):
        """
        Create a council and try to attach it to the decision store.

        Args:
            base_path: Root directory of the governance installation.
            redis_host: Host of the Redis/DragonflyDB server.
            redis_port: Port of the Redis/DragonflyDB server.
            redis_password: Password for the server.
        """
        # NOTE(review): the default password is committed in source; prefer
        # passing one in from configuration or secret storage.
        self.base_path = Path(base_path)
        self.ledger_db = self.base_path / "ledger" / "governance.db"
        # Holds dicts loaded from Redis history and, in the in-memory fallback,
        # the Decision objects produced by this instance.
        self.decisions: list[Any] = []
        self._redis_host = redis_host
        self._redis_port = redis_port
        self._redis_password = redis_password
        self._redis: Optional[redis.Redis] = None
        self._setup_redis()
        self._load_historical_decisions()

    def _setup_redis(self):
        """Connect to DragonflyDB; leave ``self._redis`` as None if that fails."""
        try:
            client = redis.Redis(
                host=self._redis_host,
                port=self._redis_port,
                password=self._redis_password,
                decode_responses=True,
            )
            client.ping()
        except Exception as exc:
            # Deliberately best-effort: without a store the council keeps
            # decisions in memory instead of failing construction.
            self._log.warning("Decision store unavailable, using in-memory mode: %s", exc)
            self._redis = None
        else:
            self._redis = client

    def _now(self) -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def _load_historical_decisions(self):
        """Load past decisions for learning (no-op without a Redis connection)."""
        if not self._redis:
            return
        try:
            # LRANGE's stop index is inclusive, hence the "- 1".
            raw = self._redis.lrange(self.DECISIONS_KEY, 0, self.HISTORY_PRELOAD - 1)
        except Exception as exc:
            # History is optional; a failed read must not block construction.
            self._log.warning("Could not load historical decisions: %s", exc)
            return
        for item in raw:
            try:
                # Store for pattern matching
                self.decisions.append(json.loads(item))
            except (TypeError, ValueError):
                # Skip one corrupt entry rather than discarding the rest.
                self._log.warning("Skipping undecodable historical decision entry")

    def review_suggestion(self, suggestion: Suggestion) -> Decision:
        """
        Have the council review a suggestion.

        Every reviewer in ``REVIEWERS`` votes, the votes are combined into a
        final decision, and that decision is persisted before being returned.
        """
        votes = [
            self._get_reviewer_vote(suggestion, profile)
            for profile in self.REVIEWERS.values()
        ]
        decision = self._make_decision(suggestion, votes)
        self._persist_decision(decision)
        return decision

    def _get_reviewer_vote(self, suggestion: Suggestion, profile: ReviewerProfile) -> Vote:
        """
        Get the vote of one reviewer.

        A triggered veto condition short-circuits into a REJECT vote. Otherwise
        the role-specific review runs, and an APPROVE on a critical-risk
        suggestion is downgraded to NEEDS_MORE_INFO for reviewers whose risk
        tolerance is below 0.3.
        """
        for veto in profile.veto_conditions:
            if self._check_veto_condition(suggestion, veto):
                return Vote(
                    reviewer_role=profile.role,
                    vote=VoteType.REJECT,
                    reasoning=f"Veto condition triggered: {veto}",
                    concerns=[veto],
                    confidence=0.95,
                )

        role_reviews = {
            ReviewerRole.SAFETY: self._safety_review,
            ReviewerRole.PERFORMANCE: self._performance_review,
            ReviewerRole.ARCHITECTURE: self._architecture_review,
            ReviewerRole.COMPLIANCE: self._compliance_review,
            ReviewerRole.QUALITY: self._quality_review,
        }
        review = role_reviews.get(profile.role)
        if review is None:
            # Unknown roles approve by default with no concerns.
            vote_type, concerns, conditions, confidence = VoteType.APPROVE, [], [], 0.8
        else:
            vote_type, concerns, conditions, confidence = review(suggestion)

        # Adjust based on risk tolerance
        cautious = profile.risk_tolerance < 0.3
        if suggestion.risk == RiskLevel.CRITICAL and cautious and vote_type == VoteType.APPROVE:
            vote_type = VoteType.NEEDS_MORE_INFO
            concerns.append("Critical risk requires additional review")

        return Vote(
            reviewer_role=profile.role,
            vote=vote_type,
            reasoning=self._generate_reasoning(profile.role, suggestion, vote_type, concerns),
            concerns=concerns,
            conditions=conditions,
            confidence=confidence,
        )

    def _check_veto_condition(self, suggestion: Suggestion, veto: str) -> bool:
        """
        Check if a veto condition is triggered.

        The veto name is split on underscores and the condition fires when
        every resulting keyword occurs in the suggestion's title, description
        and reasoning (case-insensitive).
        """
        # NOTE(review): this is substring matching, so short keywords such as
        # "in" (from "blocking_operation_in_critical_path") match inside other
        # words, and a suggestion that merely mentions the keywords (e.g. one
        # proposing to fix the problem) also triggers the veto.
        text = f"{suggestion.title} {suggestion.description} {suggestion.reasoning}".lower()
        veto_keywords = veto.lower().replace("_", " ").split()
        return all(kw in text for kw in veto_keywords)

    def _safety_review(self, suggestion: Suggestion) -> tuple[VoteType, list, list, float]:
        """
        Safety reviewer evaluation.

        Returns:
            ``(vote, concerns, conditions, confidence)``.
        """
        concerns = []
        conditions = []

        security_keywords = ["credential", "secret", "password", "token", "auth", "access"]
        text = f"{suggestion.title} {suggestion.description}".lower()
        if any(kw in text for kw in security_keywords):
            concerns.append("Changes involve security-sensitive components")
            conditions.append("Must verify no credentials exposed")

        if suggestion.risk in [RiskLevel.CRITICAL, RiskLevel.HIGH]:
            concerns.append(f"High risk level: {suggestion.risk.value}")
            conditions.append("Requires security review before implementation")
            return VoteType.NEEDS_MORE_INFO, concerns, conditions, 0.6

        # Low-risk auto-fixable changes are approved with higher confidence.
        easy_win = suggestion.auto_fixable and suggestion.risk == RiskLevel.LOW
        return VoteType.APPROVE, concerns, conditions, 0.9 if easy_win else 0.8

    def _performance_review(self, suggestion: Suggestion) -> tuple[VoteType, list, list, float]:
        """
        Performance reviewer evaluation.

        Returns:
            ``(vote, concerns, conditions, confidence)``; always approves,
            attaching conditions for timeout changes and large efforts.
        """
        concerns = []
        conditions = []

        text = f"{suggestion.title} {suggestion.description}".lower()
        if "timeout" in text:
            concerns.append("Timeout changes may affect system responsiveness")
            conditions.append("Test under load before deploying")

        if suggestion.estimated_effort in ["large", "epic"]:
            concerns.append("Large change may have performance implications")
            conditions.append("Benchmark before and after")

        return VoteType.APPROVE, concerns, conditions, 0.75

    def _architecture_review(self, suggestion: Suggestion) -> tuple[VoteType, list, list, float]:
        """
        Architecture reviewer evaluation.

        Returns:
            ``(vote, concerns, conditions, confidence)``; transformative
            changes yield NEEDS_MORE_INFO, everything else is approved.
        """
        concerns = []
        conditions = []

        if len(suggestion.files_affected) > 5:
            concerns.append(f"Affects {len(suggestion.files_affected)} files - wide blast radius")
            conditions.append("Consider breaking into smaller changes")

        if suggestion.impact == ImpactLevel.TRANSFORMATIVE:
            concerns.append("Transformative change requires architecture review")
            return VoteType.NEEDS_MORE_INFO, concerns, conditions, 0.5

        return VoteType.APPROVE, concerns, conditions, 0.8

    def _compliance_review(self, suggestion: Suggestion) -> tuple[VoteType, list, list, float]:
        """
        Compliance reviewer evaluation.

        Returns:
            ``(vote, concerns, conditions, confidence)``; always approves,
            attaching conditions when governance-related words appear.
        """
        concerns = []
        conditions = []

        gov_keywords = ["policy", "tier", "permission", "violation", "audit"]
        text = f"{suggestion.title} {suggestion.description}".lower()
        if any(kw in text for kw in gov_keywords):
            concerns.append("Changes may affect governance policies")
            conditions.append("Verify compliance with tier policies")

        if "revocation" in text or "promotion" in text:
            concerns.append("Affects agent trust system")
            conditions.append("Must maintain audit trail")

        return VoteType.APPROVE, concerns, conditions, 0.85

    def _quality_review(self, suggestion: Suggestion) -> tuple[VoteType, list, list, float]:
        """
        Quality reviewer evaluation.

        Returns:
            ``(vote, concerns, conditions, confidence)``; always approves,
            attaching conditions for thin fix steps and auto-fixes.
        """
        concerns = []
        conditions = []

        if not suggestion.fix_steps or len(suggestion.fix_steps) < 2:
            concerns.append("Fix steps are not well-defined")
            conditions.append("Document implementation steps")

        if suggestion.auto_fixable:
            conditions.append("Auto-fix must include verification step")

        return VoteType.APPROVE, concerns, conditions, 0.8

    def _generate_reasoning(
        self,
        role: ReviewerRole,
        suggestion: Suggestion,
        vote: VoteType,
        concerns: list[str]
    ) -> str:
        """Generate the human-readable reasoning text for a vote."""
        # Handle both enum and string values
        risk_val = suggestion.risk.value if hasattr(suggestion.risk, 'value') else suggestion.risk
        role_val = role.value if hasattr(role, 'value') else role
        vote_val = vote.value if hasattr(vote, 'value') else vote

        tag = f"[{role_val.upper()}]"
        concern_text = '; '.join(concerns) if concerns else 'See details'
        templates = {
            VoteType.APPROVE: (
                f"{tag} Approved: {suggestion.title}. "
                f"Risk level ({risk_val}) is acceptable."
            ),
            VoteType.REJECT: (
                f"{tag} Rejected: {suggestion.title}. "
                f"Concerns: {concern_text}"
            ),
            VoteType.NEEDS_MORE_INFO: (
                f"{tag} Needs review: {suggestion.title}. "
                "Additional information required."
            ),
            VoteType.ABSTAIN: f"{tag} Abstained: Outside expertise area.",
        }
        return templates.get(vote, f"{tag} {vote_val}: {suggestion.title}")

    def _make_decision(self, suggestion: Suggestion, votes: list[Vote]) -> Decision:
        """
        Make the final decision based on the collected votes.

        Rules, first match wins: two or more rejections reject; two or more
        requests for information defer; four or more approvals on an
        auto-fixable low/trivial-risk suggestion auto-approve; three or more
        approvals approve for human implementation; anything else escalates.
        """
        approve_count = sum(1 for v in votes if v.vote == VoteType.APPROVE)
        reject_count = sum(1 for v in votes if v.vote == VoteType.REJECT)
        needs_info_count = sum(1 for v in votes if v.vote == VoteType.NEEDS_MORE_INFO)

        all_concerns = []
        all_conditions = []
        for v in votes:
            all_concerns.extend(v.concerns)
            all_conditions.extend(v.conditions)

        # Only the auto-approve branch relaxes these defaults.
        auto_fix = False
        manual = True

        # NOTE(review): a single REJECT - including one produced by a veto
        # condition - does not prevent AUTO_APPROVE when the remaining four
        # reviewers approve. Confirm that a lone veto is meant to be overridable.
        if reject_count >= 2:
            decision_type = DecisionType.REJECT
            reasoning = f"Rejected by {reject_count} reviewers. Concerns: {'; '.join(all_concerns[:3])}"
        elif needs_info_count >= 2:
            decision_type = DecisionType.DEFER
            reasoning = f"Deferred - {needs_info_count} reviewers need more information."
        elif (
            approve_count >= 4
            and suggestion.auto_fixable
            and suggestion.risk in [RiskLevel.LOW, RiskLevel.TRIVIAL]
        ):
            decision_type = DecisionType.AUTO_APPROVE
            reasoning = f"Auto-approved by council ({approve_count}/{len(votes)} votes). Low risk, auto-fixable."
            auto_fix = True
            manual = False
        elif approve_count >= 3:
            decision_type = DecisionType.HUMAN_APPROVE
            reasoning = f"Approved for human implementation ({approve_count}/{len(votes)} votes)."
        else:
            decision_type = DecisionType.ESCALATE
            reasoning = (
                f"No consensus reached. Votes: {approve_count} approve, "
                f"{reject_count} reject, {needs_info_count} need info."
            )

        return Decision(
            id="",
            suggestion_id=suggestion.id,
            decision=decision_type,
            votes=votes,
            final_reasoning=reasoning,
            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # the stored and displayed conditions are stable between runs.
            conditions=list(dict.fromkeys(all_conditions)),
            requires_manual_oversight=manual,
            auto_fix_approved=auto_fix,
        )

    @staticmethod
    def _serialize_decision(decision: Decision) -> str:
        """Encode a decision (votes included) as the JSON stored in Redis."""
        # asdict() already recurses into the nested Vote dataclasses.
        return json.dumps(asdict(decision), default=str)

    def _decision_as_dict(self, decision: Any) -> dict:
        """Return a stored decision as a plain dict, whatever form it is held in."""
        if isinstance(decision, dict):
            return decision
        # Round-trip through JSON so in-memory results have exactly the shape
        # the Redis path returns (enum members become their string values).
        return json.loads(self._serialize_decision(decision))

    def _attach_outcomes(self, decisions: list[dict]) -> None:
        """Merge outcome fields written by ``record_outcome`` into *decisions*."""
        with_id = [d for d in decisions if d.get("id")]
        if not with_id:
            return
        # One round trip for all lookups instead of one per decision.
        pipe = self._redis.pipeline(transaction=False)
        for d in with_id:
            pipe.hgetall(f"oversight:decision:{d['id']}")
        for d, recorded in zip(with_id, pipe.execute()):
            if recorded:
                d.update(recorded)

    def _persist_decision(self, decision: Decision):
        """
        Persist a decision to storage.

        Without Redis the decision is appended to ``self.decisions``. With
        Redis it is pushed to the front of the decisions list, indexed by
        decision type, and the list is trimmed to ``MAX_STORED_DECISIONS``.
        """
        if not self._redis:
            self.decisions.append(decision)
            return

        self._redis.lpush(self.DECISIONS_KEY, self._serialize_decision(decision))

        # Index by decision type
        decision_val = decision.decision.value if hasattr(decision.decision, 'value') else decision.decision
        self._redis.sadd(f"oversight:decisions:{decision_val}", decision.id)

        # Keep history manageable
        self._redis.ltrim(self.DECISIONS_KEY, 0, self.MAX_STORED_DECISIONS - 1)

    def record_outcome(
        self,
        decision_id: str,
        outcome: str,
        notes: str = "",
        lessons: str = ""
    ) -> bool:
        """
        Record the outcome of an implemented decision.

        Returns:
            True when the outcome was written to Redis, False when no Redis
            connection is available.
        """
        if not self._redis:
            return False

        # One timestamp so the outcome record and the lesson entry agree.
        recorded_at = self._now()
        self._redis.hset(f"oversight:decision:{decision_id}", mapping={
            "implemented_at": recorded_at,
            "outcome": outcome,
            "outcome_notes": notes,
            "lessons_learned": lessons,
        })

        # Update learning data
        self._redis.lpush(self.LESSONS_KEY, json.dumps({
            "decision_id": decision_id,
            "outcome": outcome,
            "lessons": lessons,
            "timestamp": recorded_at,
        }))
        return True

    def get_decisions(
        self,
        decision_type: Optional[DecisionType] = None,
        limit: int = 50
    ) -> list[dict]:
        """
        Get decisions as dicts, optionally filtered by decision type.

        Args:
            decision_type: Keep only decisions of this type when given.
            limit: Maximum number of stored entries to examine; with a filter
                fewer matching entries may be returned.

        Returns:
            Decision dicts. In Redis mode, outcome fields saved through
            ``record_outcome`` are merged into the matching decisions.
        """
        # LRANGE treats a stop index of -1 as "end of list", so limit=0 would
        # otherwise return everything.
        if limit <= 0:
            return []

        if not self._redis:
            decisions = [self._decision_as_dict(d) for d in self.decisions]
            if decision_type:
                decisions = [d for d in decisions if d.get('decision') == decision_type.value]
            return decisions[:limit]

        decisions = []
        for item in self._redis.lrange(self.DECISIONS_KEY, 0, limit - 1):
            try:
                data = json.loads(item)
            except (TypeError, ValueError):
                continue
            if not isinstance(data, dict):
                continue
            if decision_type and data.get('decision') != decision_type.value:
                continue
            decisions.append(data)

        self._attach_outcomes(decisions)
        return decisions

    def get_summary(self) -> dict:
        """
        Get a summary of council decisions.

        Returns:
            Dict with ``total_decisions``, counts ``by_type``, counts of
            ``outcomes`` (decisions without a known outcome count as
            "pending"), ``auto_approved`` and ``learning_entries``.
        """
        decisions = self.get_decisions(limit=self.MAX_STORED_DECISIONS)

        by_type = {d.value: 0 for d in DecisionType}
        outcomes = {"success": 0, "failure": 0, "partial": 0, "pending": 0}
        auto_approved = 0

        for d in decisions:
            kind = d.get('decision')
            by_type[kind] = by_type.get(kind, 0) + 1
            if d.get('auto_fix_approved'):
                auto_approved += 1
            outcome = d.get('outcome')
            outcomes[outcome if outcome in outcomes else "pending"] += 1

        return {
            "total_decisions": len(decisions),
            "by_type": by_type,
            "outcomes": outcomes,
            "auto_approved": auto_approved,
            "learning_entries": self._redis.llen(self.LESSONS_KEY) if self._redis else 0,
        }

    def get_lessons_learned(self, limit: int = 20) -> list[dict]:
        """
        Get lessons learned from past decisions, most recently recorded first.

        Returns an empty list without a Redis connection or for a
        non-positive *limit*; undecodable entries are skipped.
        """
        if not self._redis or limit <= 0:
            return []

        lessons = []
        for item in self._redis.lrange(self.LESSONS_KEY, 0, limit - 1):
            try:
                lessons.append(json.loads(item))
            except (TypeError, ValueError):
                continue
        return lessons
if __name__ == "__main__":
    import argparse

    from .bug_watcher import BugWindowWatcher
    from .suggestion_engine import SuggestionEngine

    # Command-line entry point: run a review pass or inspect stored council data.
    cli = argparse.ArgumentParser(description="Council Review")
    cli.add_argument("command", choices=["review", "decisions", "lessons", "status"])
    cli.add_argument("--phase", type=int)
    cli.add_argument("--json", action="store_true")
    args = cli.parse_args()

    council = CouncilReview()

    if args.command == "review":
        # Display markers keyed by decision / vote value.
        decision_icons = {"auto_approve": "🤖", "human_approve": "", "defer": "", "reject": "", "escalate": "⚠️"}
        vote_icons = {"approve": "👍", "reject": "👎", "needs_more_info": "", "abstain": ""}

        # Get anomalies and suggestions
        watcher = BugWindowWatcher()
        engine = SuggestionEngine()
        if args.phase:
            anomalies = watcher.scan_phase(args.phase)
        else:
            anomalies = watcher.scan_all_phases()

        rule = "=" * 60
        print(f"\n{rule}")
        print("COUNCIL REVIEW")
        print(rule)

        # Limit for demo: first 5 anomalies, top 2 suggestions per anomaly.
        for anomaly in anomalies[:5]:
            for suggestion in engine.generate_suggestions(anomaly)[:2]:
                decision = council.review_suggestion(suggestion)
                verdict = decision.decision.value
                icon = decision_icons.get(verdict, "")
                print(f"\n{icon} {verdict.upper()}: {suggestion.title}")
                print(f" {decision.final_reasoning}")
                if decision.conditions:
                    print(f" Conditions: {', '.join(decision.conditions[:3])}")
                # One marker per vote: icon plus the reviewer role's initial.
                ballots = [
                    vote_icons.get(v.vote.value, "") + v.reviewer_role.value[0].upper() + " "
                    for v in decision.votes
                ]
                print(" Votes: " + "".join(ballots))

    elif args.command == "decisions":
        stored = council.get_decisions()
        if args.json:
            print(json.dumps(stored, indent=2, default=str))
        else:
            for entry in stored[:20]:
                print(f"[{entry['id']}] {entry['decision']}: {entry['final_reasoning'][:60]}...")

    elif args.command == "lessons":
        recorded = council.get_lessons_learned()
        if args.json:
            print(json.dumps(recorded, indent=2))
        else:
            for lesson in recorded:
                print(f"[{lesson['decision_id']}] {lesson['outcome']}: {lesson['lessons'][:80]}...")

    elif args.command == "status":
        summary = council.get_summary()
        if args.json:
            print(json.dumps(summary, indent=2))
        else:
            print("\nCouncil Status")
            print(f"Total Decisions: {summary['total_decisions']}")
            print(f"Auto-Approved: {summary['auto_approved']}")
            print(f"Lessons Learned: {summary['learning_entries']}")