commit 77655c298c Initial commit: Agent Governance System Phase 8
Phase 8 Production Hardening with complete governance infrastructure:

- Vault integration with tiered policies (T0-T4)
- DragonflyDB state management
- SQLite audit ledger
- Pipeline DSL and templates
- Promotion/revocation engine
- Checkpoint system for session persistence
- Health manager and circuit breaker for fault tolerance
- GitHub/Slack integrations
- Architectural test pipeline with bug watcher, suggestion engine, council review
- Multi-agent chaos testing framework

Test Results:
- Governance tests: 68/68 passing
- E2E workflow: 16/16 passing
- Phase 2 Vault: 14/14 passing
- Integration tests: 27/27 passing

Coverage: 57.6% average across 12 phases

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-23 22:07:06 -05:00

477 lines
18 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Architectural Test Pipeline
===========================
Multi-layer oversight system for continuous validation across all 12 phases.
Layers:
1. Bug Window Watcher - Real-time anomaly detection
2. Suggestion Engine - AI-driven fix recommendations
3. Council Review - Multi-agent decision making
4. Phase Validator - Coverage across all phases
5. Error Injector - Controlled fault injection
6. Reporter - Comprehensive reporting
Usage:
# Run full validation
python pipeline.py run
# Run with injection tests
python pipeline.py run --inject
# Validate specific phase
python pipeline.py validate --phase 5
# Generate report only
python pipeline.py report
"""
import json
import sys
import time
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Optional
from .bug_watcher import BugWindowWatcher, Anomaly, Severity
from .suggestion_engine import SuggestionEngine, Suggestion
from .council import CouncilReview, Decision, DecisionType
from .phase_validator import PhaseValidator, ValidationLevel
from .error_injector import ErrorInjector
from .reporter import OversightReporter, OversightReport
@dataclass
class PipelineConfig:
    """Configuration for the pipeline.

    Controls which oversight layers run (injection tests, auto-fix,
    reporting) and where the governance tree lives on disk.
    """
    # Root directory of the governance installation; handed to every
    # component as a string (see ArchitecturalTestPipeline.__init__).
    base_path: str = "/opt/agent-governance"
    # Depth of phase validation (e.g. BASIC/STANDARD, declared in phase_validator).
    validation_level: ValidationLevel = ValidationLevel.STANDARD
    # When True, the error-injection layer runs as part of the pipeline.
    run_injections: bool = False
    # Forwarded to ErrorInjector(safe_mode=...) to keep injections non-destructive.
    safe_mode: bool = True
    focus_phase: Optional[int] = None  # Special attention phase (default: 5)
    # Cap on suggestions kept per anomaly during suggestion generation.
    max_suggestions_per_anomaly: int = 3
    # When True, run() enters the auto-fix step (currently a dry-run; it
    # only counts approved fixes, it does not apply them).
    auto_fix_enabled: bool = False
    # When True, run() asks the reporter for an oversight report.
    generate_report: bool = True
    # When True, _log() also prints info/success/step-level messages.
    verbose: bool = False
@dataclass
class PipelineResult:
    """Result of pipeline execution.

    Aggregated counters and timing for one ArchitecturalTestPipeline.run();
    JSON-serializable via dataclasses.asdict() (used by the CLI's --json).
    """
    success: bool                    # True when no layer recorded an error
    started_at: str                  # UTC ISO-8601 timestamp
    completed_at: str                # UTC ISO-8601 timestamp
    duration_ms: int                 # wall-clock duration of run()
    phases_validated: int            # phases checked by the phase validator
    anomalies_detected: int          # anomalies found by the bug watcher
    suggestions_generated: int       # fix suggestions produced
    council_decisions: int           # suggestions reviewed by the council
    auto_fixes_applied: int          # always 0 today — auto-fix is a dry-run
    injection_tests_run: int         # 0 unless run_injections was enabled
    injection_tests_passed: int
    report_id: Optional[str] = None  # set only when report generation succeeds
    errors: list[str] = field(default_factory=list)  # one entry per failed layer
class ArchitecturalTestPipeline:
    """
    Main orchestrator for the architectural test pipeline.

    Runs all oversight layers in sequence:
    1. Phase Validation - Ensure all phases have required components
    2. Bug Detection - Scan for anomalies across all phases
    3. Suggestion Generation - Create fix recommendations
    4. Council Review - Multi-perspective decision making
    5. Auto-Fix (if enabled) - Apply approved low-risk fixes
    6. Injection Testing (if enabled) - Verify oversight works
    7. Reporting - Generate comprehensive report

    Each layer is wrapped in its own try/except so one failing layer does
    not abort the rest; failures are collected into PipelineResult.errors.
    """

    def __init__(self, config: Optional[PipelineConfig] = None):
        """Build all oversight components rooted at ``config.base_path``.

        Args:
            config: Pipeline configuration; defaults to PipelineConfig().
        """
        self.config = config or PipelineConfig()
        self.base_path = Path(self.config.base_path)
        # Initialize components — each takes the base path as a string.
        self.watcher = BugWindowWatcher(str(self.base_path))
        self.suggestion_engine = SuggestionEngine(str(self.base_path))
        self.council = CouncilReview(str(self.base_path))
        self.phase_validator = PhaseValidator(str(self.base_path))
        self.error_injector = ErrorInjector(str(self.base_path), safe_mode=self.config.safe_mode)
        self.reporter = OversightReporter(str(self.base_path))
        # Results tracking, populated by run() / validate_phase().
        self.anomalies: list[Anomaly] = []
        self.suggestions: list[Suggestion] = []
        self.decisions: list[Decision] = []

    def _now(self) -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def _log(self, message: str, level: str = "info") -> None:
        """Print a timestamped message to stdout.

        Warnings and errors always print; other levels print only when
        ``config.verbose`` is set.
        """
        if self.config.verbose or level in ["error", "warning"]:
            timestamp = datetime.now().strftime("%H:%M:%S")
            # NOTE(review): several icon entries are empty strings — the
            # original emoji may have been stripped in transit; confirm.
            icons = {"info": "", "success": "", "warning": "⚠️", "error": "", "step": "➡️"}
            icon = icons.get(level, "")
            print(f"[{timestamp}] {icon} {message}")

    def run(self) -> PipelineResult:
        """Run the full pipeline.

        Executes all seven layers in order, accumulating per-layer errors
        instead of aborting, and returns a PipelineResult summary.
        ``success`` is True only when no layer raised.
        """
        start_time = time.time()
        started_at = self._now()
        errors: list[str] = []
        self._log("Starting Architectural Test Pipeline", "step")
        print("=" * 60)

        # 1. Phase Validation
        self._log("Phase 1/7: Validating all phases...", "step")
        try:
            validation_results = self.phase_validator.validate_all_phases(self.config.validation_level)
            phases_validated = len(validation_results)
            self._log(f"Validated {phases_validated} phases", "success")
            # Special attention to focus phase (default: 5). Explicit
            # None-check so a hypothetical phase 0 is not treated as unset.
            focus = self.config.focus_phase if self.config.focus_phase is not None else 5
            if focus in validation_results:
                focus_result = validation_results[focus]
                self._log(f"Phase {focus} ({focus_result.phase_name}): {focus_result.status.value}, {focus_result.coverage_percent:.1f}% coverage", "info")
        except Exception as e:
            errors.append(f"Phase validation failed: {e}")
            self._log(f"Phase validation error: {e}", "error")
            phases_validated = 0

        # 2. Bug Detection
        self._log("Phase 2/7: Scanning for anomalies...", "step")
        try:
            self.watcher.start()
            # `is not None` (not truthiness) so phase 0 would still be honored.
            if self.config.focus_phase is not None:
                self.anomalies = self.watcher.scan_phase(self.config.focus_phase)
            else:
                self.anomalies = self.watcher.scan_all_phases()
            self._log(f"Detected {len(self.anomalies)} anomalies", "success")
            # Surface critical anomalies immediately.
            critical = [a for a in self.anomalies if a.severity == Severity.CRITICAL]
            if critical:
                self._log(f"⚠️ {len(critical)} CRITICAL anomalies found!", "warning")
        except Exception as e:
            errors.append(f"Bug detection failed: {e}")
            self._log(f"Bug detection error: {e}", "error")

        # 3. Suggestion Generation
        self._log("Phase 3/7: Generating suggestions...", "step")
        try:
            for anomaly in self.anomalies[:20]:  # Limit for performance
                suggestions = self.suggestion_engine.generate_suggestions(anomaly)
                self.suggestions.extend(suggestions[:self.config.max_suggestions_per_anomaly])
            self._log(f"Generated {len(self.suggestions)} suggestions", "success")
        except Exception as e:
            errors.append(f"Suggestion generation failed: {e}")
            self._log(f"Suggestion generation error: {e}", "error")

        # 4. Council Review
        self._log("Phase 4/7: Council reviewing suggestions...", "step")
        try:
            for suggestion in self.suggestions[:15]:  # Limit for performance
                decision = self.council.review_suggestion(suggestion)
                self.decisions.append(decision)
            auto_approved = sum(1 for d in self.decisions if d.decision == DecisionType.AUTO_APPROVE)
            human_approved = sum(1 for d in self.decisions if d.decision == DecisionType.HUMAN_APPROVE)
            rejected = sum(1 for d in self.decisions if d.decision == DecisionType.REJECT)
            self._log(f"Council decisions: {auto_approved} auto-approve, {human_approved} human-approve, {rejected} rejected", "success")
        except Exception as e:
            errors.append(f"Council review failed: {e}")
            self._log(f"Council review error: {e}", "error")

        # 5. Auto-Fix (if enabled). Dry-run only: approved fixes are
        # counted but never applied, so auto_fixes_applied stays 0.
        auto_fixes_applied = 0
        if self.config.auto_fix_enabled:
            self._log("Phase 5/7: Applying auto-fixes...", "step")
            auto_approved = [d for d in self.decisions if d.auto_fix_approved]
            self._log(f"Auto-fix disabled in safe mode. {len(auto_approved)} fixes would be applied.", "info")
        else:
            self._log("Phase 5/7: Auto-fix disabled, skipping...", "step")

        # 6. Injection Testing
        injection_tests_run = 0
        injection_tests_passed = 0
        if self.config.run_injections:
            self._log("Phase 6/7: Running injection tests...", "step")
            try:
                scenarios = list(self.error_injector.SCENARIOS.keys())[:4]  # Limit
                for scenario in scenarios:
                    result = self.error_injector.run_scenario(scenario)
                    injection_tests_run += 1
                    if result.test_passed:
                        injection_tests_passed += 1
                        self._log(f"{scenario}: PASSED", "info")
                    else:
                        self._log(f"{scenario}: FAILED", "warning")
                self._log(f"Injection tests: {injection_tests_passed}/{injection_tests_run} passed", "success")
            except Exception as e:
                errors.append(f"Injection testing failed: {e}")
                self._log(f"Injection testing error: {e}", "error")
        else:
            self._log("Phase 6/7: Injection tests disabled, skipping...", "step")

        # 7. Generate Report
        report_id = None
        if self.config.generate_report:
            self._log("Phase 7/7: Generating report...", "step")
            try:
                report = self.reporter.generate_report(include_injections=self.config.run_injections)
                report_id = report.report_id
                self._log(f"Report generated: {report_id}", "success")
            except Exception as e:
                errors.append(f"Report generation failed: {e}")
                self._log(f"Report generation error: {e}", "error")
        else:
            self._log("Phase 7/7: Report generation disabled, skipping...", "step")

        # Calculate duration and overall success.
        duration_ms = int((time.time() - start_time) * 1000)
        completed_at = self._now()
        success = len(errors) == 0
        print("=" * 60)
        self._log(f"Pipeline {'completed successfully' if success else 'completed with errors'}", "success" if success else "warning")
        self._log(f"Duration: {duration_ms}ms", "info")
        return PipelineResult(
            success=success,
            started_at=started_at,
            completed_at=completed_at,
            duration_ms=duration_ms,
            phases_validated=phases_validated,
            anomalies_detected=len(self.anomalies),
            suggestions_generated=len(self.suggestions),
            council_decisions=len(self.decisions),
            auto_fixes_applied=auto_fixes_applied,
            injection_tests_run=injection_tests_run,
            injection_tests_passed=injection_tests_passed,
            report_id=report_id,
            errors=errors
        )

    def run_quick_validation(self) -> dict:
        """Run a quick validation without the full pipeline.

        Performs a BASIC phase validation plus a full anomaly scan, then
        returns the component summaries (the scans' direct return values
        are intentionally discarded — presumably the summaries reflect
        them; confirm against the validator/watcher implementations).
        """
        self._log("Running quick validation...", "step")
        # Just validate phases and scan for anomalies, for their side effects.
        self.phase_validator.validate_all_phases(ValidationLevel.BASIC)
        self.watcher.start()
        self.watcher.scan_all_phases()
        summary = self.phase_validator.get_summary()
        watcher_summary = self.watcher.get_summary()
        return {
            "phases": summary,
            "anomalies": watcher_summary,
            "critical_issues": summary.get('critical_gaps', []),
            "phase_5_status": summary.get('phase_5_status', 'unknown')
        }

    def validate_phase(self, phase_num: int) -> dict:
        """Validate a specific phase in detail.

        Runs validation, an anomaly scan, suggestion generation, and a
        council review for one phase; returns a summary dict.
        """
        self._log(f"Validating Phase {phase_num}...", "step")
        # Validate phase
        result = self.phase_validator.validate_phase(phase_num, self.config.validation_level)
        # Scan for anomalies
        self.watcher.start()
        anomalies = self.watcher.scan_phase(phase_num)
        # Generate suggestions for anomalies (bounded for performance)
        suggestions = []
        for anomaly in anomalies[:10]:
            sugs = self.suggestion_engine.generate_suggestions(anomaly)
            suggestions.extend(sugs[:2])
        # Council review (bounded for performance)
        decisions = []
        for sug in suggestions[:5]:
            decision = self.council.review_suggestion(sug)
            decisions.append({
                "suggestion": sug.title,
                "decision": decision.decision.value,
                "auto_fix": decision.auto_fix_approved
            })
        return {
            "phase": phase_num,
            "name": result.phase_name,
            "status": result.status.value,
            "coverage": result.coverage_percent,
            "anomalies": len(anomalies),
            "suggestions": len(suggestions),
            "decisions": decisions,
            "gaps": result.gaps,
            "recommendations": result.recommendations
        }

    def get_status(self) -> dict:
        """Return the current pipeline status as a JSON-serializable dict."""
        return {
            "config": asdict(self.config),
            "watcher": self.watcher.get_summary() if self.watcher else {},
            "suggestions": self.suggestion_engine.get_summary() if self.suggestion_engine else {},
            "council": self.council.get_summary() if self.council else {},
            "phases": self.phase_validator.get_summary() if self.phase_validator else {}
        }
def main():
    """CLI entry point.

    Parses arguments, builds a PipelineConfig from the flags, and
    dispatches to the requested subcommand. Output is human-readable
    text by default, or JSON with --json.
    """
    import argparse
    parser = argparse.ArgumentParser(
        description="Architectural Test Pipeline - Multi-layer oversight system",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python pipeline.py run                 # Full pipeline
  python pipeline.py run --inject        # With injection tests
  python pipeline.py run --phase 5       # Focus on Phase 5
  python pipeline.py validate --phase 5  # Validate specific phase
  python pipeline.py quick               # Quick validation
  python pipeline.py report              # Generate report only
  python pipeline.py matrix              # Show phase matrix
"""
    )
    parser.add_argument("command", choices=["run", "quick", "validate", "report", "matrix", "status"],
                        help="Command to execute")
    parser.add_argument("--phase", type=int, help="Focus on specific phase")
    parser.add_argument("--inject", action="store_true", help="Run injection tests")
    parser.add_argument("--unsafe", action="store_true", help="Disable safe mode")
    parser.add_argument("--auto-fix", action="store_true", help="Enable auto-fix")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    args = parser.parse_args()

    # Fix: `validate` without --phase previously fell through every elif
    # and exited silently. Fail loudly instead (parser.error exits 2).
    # `is None` rather than truthiness also keeps a phase of 0 usable.
    if args.command == "validate" and args.phase is None:
        parser.error("the 'validate' command requires --phase")

    # Build config from the CLI flags.
    config = PipelineConfig(
        run_injections=args.inject,
        safe_mode=not args.unsafe,
        focus_phase=args.phase,
        auto_fix_enabled=args.auto_fix,
        verbose=args.verbose
    )
    pipeline = ArchitecturalTestPipeline(config)

    if args.command == "run":
        result = pipeline.run()
        if args.json:
            print(json.dumps(asdict(result), indent=2))
        else:
            print(f"\n{'='*60}")
            print("PIPELINE RESULT SUMMARY")
            print(f"{'='*60}")
            print(f"Status: {'✅ SUCCESS' if result.success else '❌ FAILED'}")
            print(f"Duration: {result.duration_ms}ms")
            print(f"Phases Validated: {result.phases_validated}")
            print(f"Anomalies Detected: {result.anomalies_detected}")
            print(f"Suggestions Generated: {result.suggestions_generated}")
            print(f"Council Decisions: {result.council_decisions}")
            if result.injection_tests_run > 0:
                print(f"Injection Tests: {result.injection_tests_passed}/{result.injection_tests_run} passed")
            if result.report_id:
                print(f"\nReport: testing/oversight/reports/{result.report_id}.md")
            if result.errors:
                print(f"\nErrors:")
                for err in result.errors:
                    print(f"  - {err}")
    elif args.command == "quick":
        result = pipeline.run_quick_validation()
        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print(f"\n{'='*60}")
            print("QUICK VALIDATION SUMMARY")
            print(f"{'='*60}")
            print(f"Phases: {result['phases'].get('phases_validated', 0)}")
            print(f"Coverage: {result['phases'].get('average_coverage', 0)}%")
            print(f"Anomalies: {result['anomalies'].get('total_anomalies', 0)}")
            print(f"Phase 5: {result['phase_5_status']}")
            if result['critical_issues']:
                print(f"\nCritical Issues:")
                for issue in result['critical_issues'][:5]:
                    print(f"  - {issue}")
    elif args.command == "validate":
        # --phase presence already enforced above.
        result = pipeline.validate_phase(args.phase)
        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print(f"\n{'='*60}")
            print(f"PHASE {result['phase']} VALIDATION: {result['name']}")
            print(f"{'='*60}")
            print(f"Status: {result['status']}")
            print(f"Coverage: {result['coverage']:.1f}%")
            print(f"Anomalies: {result['anomalies']}")
            print(f"Suggestions: {result['suggestions']}")
            if result['decisions']:
                print(f"\nCouncil Decisions:")
                for d in result['decisions']:
                    icon = "🤖" if d['auto_fix'] else "👤"
                    print(f"  {icon} {d['decision']}: {d['suggestion'][:50]}...")
            if result['gaps']:
                print(f"\nGaps:")
                for gap in result['gaps'][:5]:
                    print(f"  - {gap}")
    elif args.command == "report":
        report = pipeline.reporter.generate_report(include_injections=args.inject)
        if args.json:
            print(json.dumps(asdict(report), indent=2, default=str))
        else:
            print(report.to_markdown())
    elif args.command == "matrix":
        pipeline.phase_validator.validate_all_phases()
        print(pipeline.phase_validator.get_phase_matrix())
    elif args.command == "status":
        status = pipeline.get_status()
        if args.json:
            print(json.dumps(status, indent=2))
        else:
            print(f"\n{'='*60}")
            print("PIPELINE STATUS")
            print(f"{'='*60}")
            print(f"Safe Mode: {status['config']['safe_mode']}")
            print(f"Focus Phase: {status['config']['focus_phase'] or 'All'}")
            print(f"Auto-Fix: {status['config']['auto_fix_enabled']}")
# Allow running this module directly as a script (it is normally part of a
# package, given the relative imports at the top of the file).
if __name__ == "__main__":
    main()