""" Oversight Reporter ================== Comprehensive reporting for the architectural test pipeline. Features: - Aggregates data from all oversight layers - Generates detailed reports per phase - Tracks pending actions - Links to checkpoints/status/memory """ import json from datetime import datetime, timezone from dataclasses import dataclass, field, asdict from pathlib import Path from typing import Any, Optional import redis @dataclass class PhaseReport: """Report for a single phase""" phase_number: int phase_name: str status: str coverage_percent: float bugs_detected: int suggestions_generated: int council_decisions: dict pending_actions: list[str] critical_issues: list[str] recommendations: list[str] @dataclass class OversightReport: """Complete oversight report""" report_id: str generated_at: str summary: dict phase_reports: list[PhaseReport] watcher_summary: dict suggestion_summary: dict council_summary: dict injection_results: list[dict] pending_actions: list[dict] checkpoint_link: Optional[str] = None memory_entries: int = 0 def to_markdown(self) -> str: """Convert report to markdown format""" lines = [] # Header lines.append("# Architectural Test Pipeline Report") lines.append(f"\n**Generated:** {self.generated_at}") lines.append(f"**Report ID:** {self.report_id}") if self.checkpoint_link: lines.append(f"**Checkpoint:** {self.checkpoint_link}") # Executive Summary lines.append("\n## Executive Summary\n") lines.append(f"- **Phases Validated:** {self.summary.get('phases_validated', 0)}") lines.append(f"- **Average Coverage:** {self.summary.get('average_coverage', 0)}%") lines.append(f"- **Total Anomalies:** {self.summary.get('total_anomalies', 0)}") lines.append(f"- **Critical Gaps:** {len(self.summary.get('critical_gaps', []))}") # Phase Status Matrix lines.append("\n## Phase Status Matrix\n") lines.append("| Phase | Name | Status | Coverage | Bugs |") lines.append("|-------|------|--------|----------|------|") status_icons = { "complete": "✅", "in_progress": "🚧", "blocked": "❌", "needs_review": "⚠️", "not_started": "⬜" } for pr in self.phase_reports: icon = status_icons.get(pr.status, "❓") lines.append(f"| {pr.phase_number} | {pr.phase_name[:40]} | {icon} {pr.status} | {pr.coverage_percent:.1f}% | {pr.bugs_detected} |") # Bug Watcher Summary lines.append("\n## Bug Watcher Summary\n") lines.append(f"- **Total Anomalies:** {self.watcher_summary.get('total_anomalies', 0)}") lines.append(f"- **Unresolved:** {self.watcher_summary.get('unresolved', 0)}") by_severity = self.watcher_summary.get('by_severity', {}) if by_severity: lines.append("\n**By Severity:**") for sev, count in by_severity.items(): if count > 0: lines.append(f"- {sev}: {count}") # Suggestion Engine Summary lines.append("\n## Suggestion Engine Summary\n") lines.append(f"- **Total Suggestions:** {self.suggestion_summary.get('total', 0)}") lines.append(f"- **Pending:** {self.suggestion_summary.get('pending', 0)}") lines.append(f"- **Auto-fixable:** {self.suggestion_summary.get('auto_fixable', 0)}") # Council Decisions lines.append("\n## Council Decisions\n") lines.append(f"- **Total Decisions:** {self.council_summary.get('total_decisions', 0)}") lines.append(f"- **Auto-Approved:** {self.council_summary.get('auto_approved', 0)}") lines.append(f"- **Lessons Learned:** {self.council_summary.get('learning_entries', 0)}") by_type = self.council_summary.get('by_type', {}) if by_type: lines.append("\n**By Decision Type:**") for dtype, count in by_type.items(): if count > 0: lines.append(f"- {dtype}: {count}") # Injection Test Results if self.injection_results: lines.append("\n## Injection Test Results\n") lines.append("| Scenario | Detected | Suggestion | Council | Result |") lines.append("|----------|----------|------------|---------|--------|") for r in self.injection_results: detected = "✅" if r.get('detected_by_watcher') else "❌" result = "✅ PASS" if r.get('test_passed') else "❌ FAIL" lines.append(f"| {r.get('notes', 'Unknown')[:30]} | {detected} | {r.get('suggestion_quality', 'n/a')} | {r.get('council_decision', 'n/a')} | {result} |") # Pending Actions if self.pending_actions: lines.append("\n## Pending Actions\n") for i, action in enumerate(self.pending_actions[:20], 1): priority = action.get('priority', 'medium') icon = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🔵"}.get(priority, "⚪") lines.append(f"{i}. {icon} **{action.get('title', 'Unknown')}**") if action.get('phase'): lines.append(f" - Phase: {action['phase']}") if action.get('description'): lines.append(f" - {action['description'][:100]}") # Critical Issues critical = self.summary.get('critical_gaps', []) if critical: lines.append("\n## Critical Issues\n") for issue in critical[:10]: lines.append(f"- ❌ {issue}") # Footer lines.append("\n---") lines.append(f"*Report generated by Architectural Test Pipeline*") lines.append(f"*Memory entries available: {self.memory_entries}*") return "\n".join(lines) class OversightReporter: """ Generates comprehensive reports from all oversight layers. """ def __init__(self, base_path: str = "/opt/agent-governance"): self.base_path = Path(base_path) self._redis: Optional[redis.Redis] = None self._setup_redis() def _setup_redis(self): """Connect to DragonflyDB""" try: self._redis = redis.Redis( host='127.0.0.1', port=6379, password='governance2026', decode_responses=True ) self._redis.ping() except Exception: self._redis = None def _now(self) -> str: return datetime.now(timezone.utc).isoformat() def generate_report(self, include_injections: bool = False) -> OversightReport: """Generate a complete oversight report""" from .bug_watcher import BugWindowWatcher from .suggestion_engine import SuggestionEngine from .council import CouncilReview from .phase_validator import PhaseValidator from .error_injector import ErrorInjector # Initialize components watcher = BugWindowWatcher(str(self.base_path)) engine = SuggestionEngine(str(self.base_path)) council = CouncilReview(str(self.base_path)) validator = PhaseValidator(str(self.base_path)) # Run validation watcher.start() validation_results = validator.validate_all_phases() # Gather summaries watcher_summary = watcher.get_summary() suggestion_summary = engine.get_summary() council_summary = council.get_summary() phase_summary = validator.get_summary() # Build phase reports phase_reports = [] for phase_num, result in validation_results.items(): # Get council decisions for this phase phase_anomalies = watcher.scan_phase(phase_num) decisions = {"auto_approve": 0, "human_approve": 0, "reject": 0, "defer": 0} # Handle both enum and string status status_val = result.status.value if hasattr(result.status, 'value') else result.status pr = PhaseReport( phase_number=phase_num, phase_name=result.phase_name, status=status_val, coverage_percent=result.coverage_percent, bugs_detected=result.anomalies_found, suggestions_generated=len(phase_anomalies), council_decisions=decisions, pending_actions=[f"Address: {g}" for g in result.gaps[:3]], critical_issues=[g for g in result.gaps if "critical" in g.lower() or "missing" in g.lower()][:3], recommendations=result.recommendations ) phase_reports.append(pr) # Run injection tests if requested injection_results = [] if include_injections: injector = ErrorInjector(str(self.base_path), safe_mode=True) for scenario in list(injector.SCENARIOS.keys())[:3]: # Limit for speed result = injector.run_scenario(scenario) injection_results.append(asdict(result)) # Gather pending actions pending_actions = self._gather_pending_actions(phase_reports, watcher_summary) # Get checkpoint link checkpoint_link = self._get_latest_checkpoint() # Count memory entries memory_entries = self._count_memory_entries() # Generate report ID report_id = f"rpt-{datetime.now().strftime('%Y%m%d-%H%M%S')}" report = OversightReport( report_id=report_id, generated_at=self._now(), summary=phase_summary, phase_reports=phase_reports, watcher_summary=watcher_summary, suggestion_summary=suggestion_summary, council_summary=council_summary, injection_results=injection_results, pending_actions=pending_actions, checkpoint_link=checkpoint_link, memory_entries=memory_entries ) # Persist report self._persist_report(report) return report def _gather_pending_actions(self, phase_reports: list[PhaseReport], watcher_summary: dict) -> list[dict]: """Gather all pending actions from various sources""" actions = [] # From phase reports for pr in phase_reports: for action in pr.pending_actions: actions.append({ "title": action, "phase": pr.phase_number, "priority": "high" if pr.status in ["blocked", "needs_review"] else "medium", "source": "phase_validator" }) for rec in pr.recommendations[:2]: actions.append({ "title": rec, "phase": pr.phase_number, "priority": "medium", "source": "recommendation" }) # From watcher (unresolved anomalies) unresolved = watcher_summary.get('unresolved', 0) if unresolved > 0: by_severity = watcher_summary.get('by_severity', {}) if by_severity.get('critical', 0) > 0: actions.append({ "title": f"Address {by_severity['critical']} critical anomalies", "priority": "critical", "source": "bug_watcher" }) if by_severity.get('high', 0) > 0: actions.append({ "title": f"Address {by_severity['high']} high-severity anomalies", "priority": "high", "source": "bug_watcher" }) # Sort by priority priority_order = {"critical": 0, "high": 1, "medium": 2, "low": 3} actions.sort(key=lambda x: priority_order.get(x.get('priority', 'medium'), 2)) return actions[:30] # Limit def _get_latest_checkpoint(self) -> Optional[str]: """Get latest checkpoint ID""" checkpoint_dir = self.base_path / "checkpoint" / "storage" if not checkpoint_dir.exists(): return None checkpoints = sorted(checkpoint_dir.glob("ckpt-*.json"), reverse=True) if checkpoints: try: data = json.loads(checkpoints[0].read_text()) return data.get("checkpoint_id") except Exception: pass return None def _count_memory_entries(self) -> int: """Count memory entries""" memory_dir = self.base_path / "memory" / "summaries" if not memory_dir.exists(): return 0 return len(list(memory_dir.glob("*.json"))) def _persist_report(self, report: OversightReport): """Persist report to storage""" # Save to file reports_dir = self.base_path / "testing" / "oversight" / "reports" reports_dir.mkdir(exist_ok=True) report_file = reports_dir / f"{report.report_id}.json" report_file.write_text(json.dumps(asdict(report), indent=2, default=str)) # Save markdown version md_file = reports_dir / f"{report.report_id}.md" md_file.write_text(report.to_markdown()) # Update Redis if self._redis: self._redis.set("oversight:latest_report", report.report_id) self._redis.lpush("oversight:reports", report.report_id) self._redis.ltrim("oversight:reports", 0, 99) def get_latest_report(self) -> Optional[OversightReport]: """Get the latest report""" reports_dir = self.base_path / "testing" / "oversight" / "reports" if not reports_dir.exists(): return None reports = sorted(reports_dir.glob("rpt-*.json"), reverse=True) if reports: try: data = json.loads(reports[0].read_text()) # Convert phase_reports back to objects phase_reports = [PhaseReport(**pr) for pr in data.pop('phase_reports', [])] return OversightReport(**data, phase_reports=phase_reports) except Exception: pass return None def list_reports(self, limit: int = 20) -> list[dict]: """List available reports""" reports_dir = self.base_path / "testing" / "oversight" / "reports" if not reports_dir.exists(): return [] reports = sorted(reports_dir.glob("rpt-*.json"), reverse=True)[:limit] result = [] for report_file in reports: try: data = json.loads(report_file.read_text()) result.append({ "report_id": data.get("report_id"), "generated_at": data.get("generated_at"), "phases_validated": data.get("summary", {}).get("phases_validated", 0), "total_anomalies": data.get("summary", {}).get("total_anomalies", 0) }) except Exception: continue return result if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Oversight Reporter") parser.add_argument("command", choices=["generate", "latest", "list", "view"]) parser.add_argument("--inject", action="store_true", help="Include injection tests") parser.add_argument("--markdown", action="store_true", help="Output as markdown") parser.add_argument("--json", action="store_true") parser.add_argument("--report-id", help="Report ID to view") args = parser.parse_args() reporter = OversightReporter() if args.command == "generate": print("Generating oversight report...") report = reporter.generate_report(include_injections=args.inject) if args.markdown: print(report.to_markdown()) elif args.json: print(json.dumps(asdict(report), indent=2, default=str)) else: print(f"\n{'='*60}") print(f"OVERSIGHT REPORT: {report.report_id}") print(f"{'='*60}") print(f"Generated: {report.generated_at}") print(f"Phases: {report.summary.get('phases_validated', 0)}") print(f"Coverage: {report.summary.get('average_coverage', 0)}%") print(f"Anomalies: {report.summary.get('total_anomalies', 0)}") print(f"\nReport saved to: testing/oversight/reports/{report.report_id}.md") elif args.command == "latest": report = reporter.get_latest_report() if report: if args.markdown: print(report.to_markdown()) else: print(f"Latest Report: {report.report_id}") print(f"Generated: {report.generated_at}") else: print("No reports found") elif args.command == "list": reports = reporter.list_reports() if args.json: print(json.dumps(reports, indent=2)) else: for r in reports: print(f"[{r['report_id']}] {r['generated_at']} - {r['total_anomalies']} anomalies") elif args.command == "view" and args.report_id: reports_dir = Path("/opt/agent-governance/testing/oversight/reports") report_file = reports_dir / f"{args.report_id}.md" if report_file.exists(): print(report_file.read_text()) else: print(f"Report not found: {args.report_id}")