profit 8c6e7831e9 Add Phase 10-12 implementation: multi-tenant, marketplace, observability
Major additions:
- marketplace/: Agent template registry with FTS5 search, ratings, versioning
- observability/: Prometheus metrics, distributed tracing, structured logging
- ledger/migrations/: Database migration scripts for multi-tenant support
- tests/governance/: 15 new test files for phases 6-12 (295 total tests)
- bin/validate-phases: Full 12-phase validation script

New features:
- Multi-tenant support with tenant isolation and quota enforcement
- Agent marketplace with semantic versioning and search
- Observability with metrics, tracing, and log correlation
- Tier-1 agent bootstrap scripts

Updated components:
- ledger/api.py: Extended API for tenants, marketplace, observability
- ledger/schema.sql: Added tenant, project, marketplace tables
- testing/framework.ts: Enhanced test framework
- checkpoint/checkpoint.py: Improved checkpoint management

Archived:
- External integrations (Slack/GitHub/PagerDuty) moved to .archive/
- Old checkpoint files cleaned up

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 18:39:47 -05:00

590 lines
21 KiB
Python

"""
Error Injector
==============
Controlled fault injection for testing the oversight system.
Features:
- Inject known errors to test watcher detection
- Validate suggestion/council response
- Clean up injected errors after tests
- Safe mode to prevent production impact
"""
import json
import os
import hashlib
import shutil
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
from typing import Any, Optional
import redis
class InjectionType(str, Enum):
    """Kinds of fault the injector can record or introduce.

    Because the enum mixes in ``str``, every member compares equal to its
    plain string value (e.g. ``InjectionType.MISSING_FILE == "missing_file"``).

    Members: ``missing_file``, ``corrupted_config``, ``invalid_status``,
    ``dependency_failure``, ``permission_error``, ``state_inconsistency``,
    ``timeout_simulation``, ``security_violation``.
    """

    MISSING_FILE = "missing_file"
    CORRUPTED_CONFIG = "corrupted_config"
    INVALID_STATUS = "invalid_status"
    DEPENDENCY_FAILURE = "dependency_failure"
    PERMISSION_ERROR = "permission_error"
    STATE_INCONSISTENCY = "state_inconsistency"
    TIMEOUT_SIMULATION = "timeout_simulation"
    SECURITY_VIOLATION = "security_violation"
class InjectionScope(str, Enum):
    """Where an injected fault lives.

    Values:
        ``file``      -- a single file
        ``directory`` -- directory level
        ``service``   -- a service / dependency
        ``database``  -- a database entry
        ``memory``    -- in-memory state
    """

    FILE = "file"
    DIRECTORY = "directory"
    SERVICE = "service"
    DATABASE = "database"
    MEMORY = "memory"
@dataclass
class Injection:
    """A single tracked error injection and its test lifecycle.

    Leave ``injected_at`` and/or ``id`` empty to have them filled in on
    construction: ``injected_at`` becomes the current UTC time in ISO-8601
    form, and ``id`` becomes ``"inj-"`` followed by the first 12 hex digits
    of a SHA-256 digest over the type, target and injection time.
    """

    # --- what was injected, and where ---
    id: str
    type: InjectionType
    scope: InjectionScope
    target: str
    description: str
    phase_affected: int
    # --- state captured beforehand, for restoration ---
    original_state: Optional[dict] = None
    # --- lifecycle bookkeeping ---
    injected_at: str = ""
    cleaned_up: bool = False
    cleanup_at: Optional[str] = None
    detected: bool = False
    detected_at: Optional[str] = None
    suggestion_generated: bool = False
    council_reviewed: bool = False

    def __post_init__(self):
        """Populate ``injected_at`` and ``id`` when the caller left them blank."""
        if not self.injected_at:
            self.injected_at = datetime.now(timezone.utc).isoformat()
        if self.id:
            return
        # The timestamp is set first so it participates in the fingerprint.
        fingerprint = f"{self.type}{self.target}{self.injected_at}"
        digest = hashlib.sha256(fingerprint.encode()).hexdigest()
        self.id = f"inj-{digest[:12]}"
@dataclass
class InjectionResult:
    """Outcome of running one injection scenario end to end.

    Attributes:
        injection_id: Id of the injection that was tested ("" if none).
        detected_by_watcher: Whether the watcher reported the injected fault.
        detection_time_ms: Wall time of the watcher scan, in milliseconds.
        suggestion_quality: One of "accurate", "partial", "missed", "n/a".
        council_decision: The council's decision type, or "n/a".
        false_positives: Count of reported anomalies unrelated to the injection.
        test_passed: Overall verdict for the scenario.
        notes: Free-form remarks (defaults to empty).
    """

    injection_id: str
    detected_by_watcher: bool
    detection_time_ms: int
    suggestion_quality: str
    council_decision: str
    false_positives: int
    test_passed: bool
    notes: str = ""
class ErrorInjector:
    """
    Controlled error injection for testing oversight layers.

    Safety features:
    - All injections are tracked in ``self.injections`` and are reversible
      via ``cleanup`` / ``cleanup_all``
    - Safe mode (the default) skips the actual fault: target files, Redis
      keys and ledger rows are left untouched (the injection record itself
      is still pushed to Redis when Redis is reachable)
    - ``run_scenario`` cleans up its injection even if a stage raises

    Redis (DragonflyDB) is optional: when it cannot be reached, Redis-backed
    injections and injection persistence are skipped.

    NOTE(review): the Redis connection settings, including the password, are
    hard-coded in ``_setup_redis``; consider moving them to configuration.
    """

    # Predefined injection scenarios, keyed by scenario name. Each entry holds
    # the arguments that ``inject`` forwards to ``inject_custom``.
    # NOTE(review): "stale_checkpoint" is performed exactly like
    # "blocked_directory" (IN_PROGRESS -> BLOCKED in <target>/STATUS.md); no
    # old checkpoint is actually created, despite the description.
    SCENARIOS = {
        "missing_config": {
            "type": InjectionType.MISSING_FILE,
            "scope": InjectionScope.FILE,
            "target": "agents/tier0-agent/config/agent.json",
            "description": "Remove agent config to test missing file detection",
            "phase": 5
        },
        "corrupted_status": {
            "type": InjectionType.INVALID_STATUS,
            "scope": InjectionScope.FILE,
            "target": "checkpoint/STATUS.md",
            "description": "Corrupt STATUS.md to test status validation",
            "phase": 5
        },
        "stale_checkpoint": {
            "type": InjectionType.STATE_INCONSISTENCY,
            "scope": InjectionScope.DIRECTORY,
            "target": "checkpoint",
            "description": "Create very old checkpoint to test staleness detection",
            "phase": 5
        },
        "redis_key_missing": {
            "type": InjectionType.DEPENDENCY_FAILURE,
            "scope": InjectionScope.DATABASE,
            "target": "oversight:watcher",
            "description": "Delete Redis key to test dependency detection",
            "phase": 8
        },
        "violation_unacked": {
            "type": InjectionType.SECURITY_VIOLATION,
            "scope": InjectionScope.DATABASE,
            "target": "violations",
            "description": "Insert unacknowledged critical violation",
            "phase": 4
        },
        "blocked_directory": {
            "type": InjectionType.STATE_INCONSISTENCY,
            "scope": InjectionScope.DIRECTORY,
            "target": "preflight/",
            "description": "Mark directory as BLOCKED in STATUS.md",
            "phase": 3
        }
    }

    def __init__(self, base_path: str = "/opt/agent-governance", safe_mode: bool = True):
        """
        Args:
            base_path: Root of the governance tree; scenario targets that are
                paths are resolved relative to it.
            safe_mode: When True (default), injections are recorded but the
                fault itself is not applied.
        """
        self.base_path = Path(base_path)
        self.safe_mode = safe_mode
        self.injections: list[Injection] = []
        # Per-injection undo data for faults actually applied (non-safe mode).
        self.backups: dict[str, Any] = {}
        self._redis: Optional[redis.Redis] = None
        self._setup_redis()

    def _setup_redis(self):
        """Connect to DragonflyDB; leave ``self._redis`` as None if unreachable."""
        try:
            self._redis = redis.Redis(
                host='127.0.0.1',
                port=6379,
                password='governance2026',
                decode_responses=True
            )
            self._redis.ping()
        except Exception:
            # Redis is optional: every Redis-dependent path checks for None.
            self._redis = None

    def _now(self) -> str:
        """Current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def _find(self, injection_id: str) -> Optional[Injection]:
        """Return the tracked injection with *injection_id*, or None."""
        return next((inj for inj in self.injections if inj.id == injection_id), None)

    def _db_path(self) -> Path:
        """Path of the ledger SQLite database holding the ``violations`` table."""
        return self.base_path / "ledger" / "governance.db"

    def inject(self, scenario_name: str) -> Optional[Injection]:
        """Inject an error using a predefined scenario.

        Returns the new Injection, or None if *scenario_name* is not a key of
        ``SCENARIOS``.
        """
        scenario = self.SCENARIOS.get(scenario_name)
        if not scenario:
            return None
        return self.inject_custom(
            injection_type=scenario["type"],
            scope=scenario["scope"],
            target=scenario["target"],
            description=scenario["description"],
            phase=scenario["phase"]
        )

    def inject_custom(
        self,
        injection_type: InjectionType,
        scope: InjectionScope,
        target: str,
        description: str,
        phase: int
    ) -> Injection:
        """Record a custom injection and, outside safe mode, apply it.

        The pre-injection state is captured in ``original_state``; in safe
        mode it is replaced by ``{"simulated": True}``. The injection is
        tracked in ``self.injections`` and pushed to Redis when available.
        """
        injection = Injection(
            id="",
            type=injection_type,
            scope=scope,
            target=target,
            description=description,
            phase_affected=phase
        )
        injection.original_state = self._backup_state(scope, target)
        if not self.safe_mode:
            self._perform_injection(injection)
        else:
            # In safe mode, just simulate
            injection.original_state = {"simulated": True}
        self.injections.append(injection)
        self._persist_injection(injection)
        return injection

    def _snapshot_redis_key(self, key: str) -> dict:
        """Capture the type, value and remaining TTL of Redis *key*.

        Returns a dict with ``redis_type`` (as reported by TYPE, e.g.
        "string", "hash", "none"), ``redis_value`` and ``redis_pttl``
        (milliseconds, as reported by PTTL). ``redis_value`` is None for a
        missing key, for types other than string/hash/list/set, or when a
        Redis error occurs. Requires ``self._redis``.
        """
        snapshot: dict = {"redis_type": None, "redis_value": None, "redis_pttl": None}
        readers = {
            "string": self._redis.get,
            "hash": self._redis.hgetall,
            "list": lambda k: self._redis.lrange(k, 0, -1),
            "set": lambda k: list(self._redis.smembers(k)),
        }
        try:
            key_type = self._redis.type(key)
            snapshot["redis_type"] = key_type
            reader = readers.get(key_type)
            if reader is not None:
                snapshot["redis_value"] = reader(key)
                snapshot["redis_pttl"] = self._redis.pttl(key)
        except redis.RedisError:
            snapshot["redis_value"] = None
        return snapshot

    def _backup_state(self, scope: InjectionScope, target: str) -> dict:
        """Describe the state of *target* before injection (for the record)."""
        backup = {"scope": scope.value, "target": target}
        if scope == InjectionScope.FILE:
            file_path = self.base_path / target
            if file_path.exists():
                backup["content"] = file_path.read_text()
                backup["exists"] = True
            else:
                backup["exists"] = False
        elif scope == InjectionScope.DIRECTORY:
            status_file = self.base_path / target / "STATUS.md"
            if status_file.exists():
                backup["status_content"] = status_file.read_text()
        elif scope == InjectionScope.DATABASE and self._redis and ":" in target:
            # Targets containing ':' are Redis keys; others (e.g. the
            # "violations" table) need no pre-injection snapshot here.
            backup.update(self._snapshot_redis_key(target))
        return backup

    def _perform_injection(self, injection: Injection):
        """Actually apply the injection (only called outside safe mode).

        Undo data goes into ``self.backups[injection.id]``. Type/scope
        combinations not handled below are recorded but have no effect.
        """
        if injection.scope == InjectionScope.FILE:
            file_path = self.base_path / injection.target
            if not file_path.exists():
                return
            if injection.type == InjectionType.MISSING_FILE:
                # Move aside rather than delete so cleanup can put it back.
                backup_path = file_path.with_name(file_path.name + ".injection_backup")
                shutil.move(str(file_path), str(backup_path))
                self.backups[injection.id] = {"backup_path": str(backup_path)}
            elif injection.type in (InjectionType.CORRUPTED_CONFIG, InjectionType.INVALID_STATUS):
                self.backups[injection.id] = {"original": file_path.read_text()}
                file_path.write_text("CORRUPTED_BY_INJECTION_TEST")
        elif injection.scope == InjectionScope.DIRECTORY:
            if injection.type == InjectionType.STATE_INCONSISTENCY:
                status_file = self.base_path / injection.target / "STATUS.md"
                if status_file.exists():
                    original = status_file.read_text()
                    self.backups[injection.id] = {"original": original}
                    # Add BLOCKED marker
                    status_file.write_text(original.replace("IN_PROGRESS", "BLOCKED"))
        elif injection.scope == InjectionScope.DATABASE:
            if injection.type == InjectionType.DEPENDENCY_FAILURE:
                if self._redis:
                    self._remove_redis_key(injection)
            elif injection.type == InjectionType.SECURITY_VIOLATION:
                # The violation lives in SQLite; it must not depend on Redis.
                self._insert_violation(injection)

    def _remove_redis_key(self, injection: Injection):
        """Snapshot, then delete, the Redis key named by ``injection.target``."""
        key = injection.target
        snapshot = self._snapshot_redis_key(key)
        if snapshot["redis_value"] is None:
            # Missing, unreadable, or a type we cannot restore: leave it alone
            # rather than delete data that cleanup could not bring back.
            return
        self.backups[injection.id] = {
            "redis_key": key,
            "redis_type": snapshot["redis_type"],
            "original": snapshot["redis_value"],
            "redis_pttl": snapshot["redis_pttl"],
        }
        self._redis.delete(key)

    def _restore_redis_key(self, backup: dict):
        """Recreate a key removed by ``_remove_redis_key`` from its snapshot."""
        key = backup["redis_key"]
        value = backup.get("original")
        if value is None:
            return
        key_type = backup.get("redis_type", "string")
        # transaction=True (the default) wraps the commands in MULTI/EXEC, so
        # the key is replaced by the snapshot in one step.
        pipe = self._redis.pipeline()
        pipe.delete(key)
        if key_type == "string":
            pipe.set(key, value)
        elif key_type == "hash" and value:
            pipe.hset(key, mapping=value)
        elif key_type == "list" and value:
            pipe.rpush(key, *value)
        elif key_type == "set" and value:
            pipe.sadd(key, *value)
        pttl = backup.get("redis_pttl")
        if isinstance(pttl, int) and pttl > 0:
            # Re-apply the expiry the key had when it was snapshotted.
            pipe.pexpire(key, pttl)
        pipe.execute()

    def _insert_violation(self, injection: Injection):
        """Insert an unacknowledged critical test violation into the ledger DB."""
        import sqlite3
        from contextlib import closing

        with closing(sqlite3.connect(self._db_path())) as conn:
            with conn:  # commit on success, roll back on error
                cursor = conn.execute("""
                    INSERT INTO violations (agent_id, violation_type, severity, description, acknowledged, timestamp)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (
                    "injection-test-agent",
                    "INJECTION_TEST",
                    "critical",
                    "Test violation injected by ErrorInjector",
                    0,
                    self._now()
                ))
            self.backups[injection.id] = {"violation_id": cursor.lastrowid}

    def _delete_violation(self, violation_id: int):
        """Remove a violation row previously added by ``_insert_violation``."""
        import sqlite3
        from contextlib import closing

        with closing(sqlite3.connect(self._db_path())) as conn:
            with conn:
                conn.execute("DELETE FROM violations WHERE id = ?", (violation_id,))

    def _restore(self, injection: Injection):
        """Undo an applied injection using ``self.backups`` (non-safe mode)."""
        backup = self.backups.get(injection.id, {})
        if injection.scope == InjectionScope.FILE:
            original_path = self.base_path / injection.target
            if "backup_path" in backup:
                backup_path = Path(backup["backup_path"])
                if backup_path.exists():
                    shutil.move(str(backup_path), str(original_path))
            elif "original" in backup:
                original_path.write_text(backup["original"])
        elif injection.scope == InjectionScope.DIRECTORY:
            if "original" in backup:
                status_file = self.base_path / injection.target / "STATUS.md"
                status_file.write_text(backup["original"])
        elif injection.scope == InjectionScope.DATABASE:
            if "redis_key" in backup and self._redis:
                self._restore_redis_key(backup)
            if "violation_id" in backup:
                self._delete_violation(backup["violation_id"])

    def cleanup(self, injection_id: str) -> bool:
        """Clean up a specific injection.

        Returns False if no injection with that id is tracked; otherwise
        restores the original state (outside safe mode), marks the injection
        cleaned up, and returns True. Already-cleaned injections are a no-op.
        """
        injection = self._find(injection_id)
        if injection is None:
            return False
        if injection.cleaned_up:
            return True
        if not self.safe_mode:
            self._restore(injection)
        injection.cleaned_up = True
        injection.cleanup_at = self._now()
        return True

    def cleanup_all(self) -> int:
        """Clean up every active injection; return how many were cleaned."""
        cleaned = 0
        for injection in self.injections:
            if not injection.cleaned_up and self.cleanup(injection.id):
                cleaned += 1
        return cleaned

    def _persist_injection(self, injection: Injection):
        """Push the injection record onto the ``oversight:injections`` list."""
        if not self._redis:
            return
        try:
            self._redis.lpush(
                "oversight:injections",
                json.dumps(asdict(injection))
            )
        except redis.RedisError:
            # Best-effort, like the initial connection: the injection is
            # already tracked in memory and can still be cleaned up.
            pass

    def mark_detected(self, injection_id: str) -> bool:
        """Mark an injection as detected by the watcher."""
        injection = self._find(injection_id)
        if injection is None:
            return False
        injection.detected = True
        injection.detected_at = self._now()
        return True

    def mark_suggestion_generated(self, injection_id: str) -> bool:
        """Mark that a suggestion was generated for this injection."""
        injection = self._find(injection_id)
        if injection is None:
            return False
        injection.suggestion_generated = True
        return True

    def mark_council_reviewed(self, injection_id: str) -> bool:
        """Mark that council reviewed this injection."""
        injection = self._find(injection_id)
        if injection is None:
            return False
        injection.council_reviewed = True
        return True

    def get_injections(self, active_only: bool = False) -> list[Injection]:
        """Get all injections, or only those not yet cleaned up."""
        if active_only:
            return [i for i in self.injections if not i.cleaned_up]
        return self.injections

    @staticmethod
    def _anomaly_matches(target: str, anomaly) -> bool:
        """True if *anomaly*'s message or directory mentions *target*."""
        return target in (anomaly.message or "") or target in (anomaly.directory or "")

    def run_scenario(self, scenario_name: str) -> InjectionResult:
        """Run a complete injection test scenario.

        Injects *scenario_name*, scans the affected phase with the watcher,
        asks the suggestion engine about the anomaly that matches the
        injection, has the council review the first suggestion, and always
        cleans the injection up afterwards (exceptions still propagate).
        ``test_passed`` requires detection, an "accurate"/"partial"
        suggestion and a council decision.
        """
        import time

        injection = self.inject(scenario_name)
        if not injection:
            return InjectionResult(
                injection_id="",
                detected_by_watcher=False,
                detection_time_ms=0,
                suggestion_quality="n/a",
                council_decision="n/a",
                false_positives=0,
                test_passed=False,
                notes=f"Unknown scenario: {scenario_name}"
            )

        try:
            # Watcher: scan the phase this injection affects.
            from .bug_watcher import BugWindowWatcher
            start_time = time.perf_counter()
            watcher = BugWindowWatcher(str(self.base_path))
            anomalies = watcher.scan_phase(injection.phase_affected)
            detection_time = int((time.perf_counter() - start_time) * 1000)

            matched = next(
                (a for a in anomalies if self._anomaly_matches(injection.target, a)),
                None,
            )
            detected = matched is not None
            suggestion_quality = "n/a"
            council_decision = "n/a"
            if detected:
                self.mark_detected(injection.id)
                # Suggestion engine: judged on the anomaly we injected.
                from .suggestion_engine import SuggestionEngine
                engine = SuggestionEngine(str(self.base_path))
                suggestions = engine.generate_suggestions(matched)
                if suggestions:
                    self.mark_suggestion_generated(injection.id)
                    suggestion_quality = "accurate"
                    # Council: review the first suggestion produced above.
                    from .council import CouncilReview
                    council = CouncilReview(str(self.base_path))
                    decision = council.review_suggestion(suggestions[0])
                    self.mark_council_reviewed(injection.id)
                    council_decision = decision.decision.value
                else:
                    suggestion_quality = "missed"
        finally:
            # Restore state even if the watcher, engine or council raised.
            self.cleanup(injection.id)

        test_passed = (
            detected
            and suggestion_quality in ("accurate", "partial")
            and council_decision != "n/a"
        )
        return InjectionResult(
            injection_id=injection.id,
            detected_by_watcher=detected,
            detection_time_ms=detection_time,
            suggestion_quality=suggestion_quality,
            council_decision=council_decision,
            false_positives=sum(
                1 for a in anomalies if not self._anomaly_matches(injection.target, a)
            ),
            test_passed=test_passed,
            notes=f"Scenario: {scenario_name}"
        )

    def run_all_scenarios(self) -> list[InjectionResult]:
        """Run every predefined scenario in ``SCENARIOS`` order."""
        return [self.run_scenario(name) for name in self.SCENARIOS]

    def get_summary(self) -> dict:
        """Get summary counts of the injections tracked by this instance."""
        total = len(self.injections)
        detected = sum(1 for i in self.injections if i.detected)
        suggestion_generated = sum(1 for i in self.injections if i.suggestion_generated)
        council_reviewed = sum(1 for i in self.injections if i.council_reviewed)
        cleaned = sum(1 for i in self.injections if i.cleaned_up)
        return {
            "total_injections": total,
            "detected": detected,
            "detection_rate": f"{detected/total*100:.1f}%" if total > 0 else "0%",
            "suggestions_generated": suggestion_generated,
            "council_reviewed": council_reviewed,
            "cleaned_up": cleaned,
            "active": total - cleaned,
            "safe_mode": self.safe_mode
        }
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Error Injector")
    parser.add_argument("command", choices=["inject", "cleanup", "list", "test", "test-all"])
    parser.add_argument("--scenario", help="Scenario name")
    parser.add_argument("--unsafe", action="store_true", help="Disable safe mode (actually modify files)")
    parser.add_argument("--json", action="store_true")
    args = parser.parse_args()

    # 'inject' and 'test' operate on one named scenario; exit with a usage
    # error instead of silently doing nothing when it is missing.
    if args.command in ("inject", "test") and not args.scenario:
        parser.error(f"--scenario is required for '{args.command}'")

    # NOTE(review): injections (and their undo data in ErrorInjector.backups)
    # are tracked in memory per process, so 'cleanup' and 'list' only see
    # injections made by this same invocation, i.e. none. Faults applied by
    # 'inject --unsafe' cannot be undone by a later 'cleanup' run.
    injector = ErrorInjector(safe_mode=not args.unsafe)

    if args.command == "inject":
        injection = injector.inject(args.scenario)
        if injection:
            print(f"Injected: {injection.id}")
            print(f"Type: {injection.type.value}")
            print(f"Target: {injection.target}")
            print(f"Safe mode: {injector.safe_mode}")
        else:
            print(f"Unknown scenario: {args.scenario}")
            print(f"Available: {', '.join(injector.SCENARIOS.keys())}")
    elif args.command == "cleanup":
        cleaned = injector.cleanup_all()
        print(f"Cleaned up {cleaned} injections")
    elif args.command == "list":
        injections = injector.get_injections()
        if args.json:
            print(json.dumps([asdict(i) for i in injections], indent=2))
        else:
            for i in injections:
                status = "✅" if i.cleaned_up else "🔴"
                print(f"{status} [{i.id}] {i.type.value}: {i.target}")
    elif args.command == "test":
        # Keep --json output machine-parseable: no banner in that mode.
        if not args.json:
            print(f"\n{'='*60}")
            print(f"INJECTION TEST: {args.scenario}")
            print(f"{'='*60}")
        result = injector.run_scenario(args.scenario)
        if args.json:
            print(json.dumps(asdict(result), indent=2))
        else:
            print(f"Detected: {'✅' if result.detected_by_watcher else '❌'} ({result.detection_time_ms}ms)")
            print(f"Suggestion: {result.suggestion_quality}")
            print(f"Council: {result.council_decision}")
            print(f"Test: {'PASSED ✅' if result.test_passed else 'FAILED ❌'}")
    elif args.command == "test-all":
        print(f"\n{'='*60}")
        print("RUNNING ALL INJECTION SCENARIOS")
        print(f"{'='*60}")
        results = injector.run_all_scenarios()
        passed = sum(1 for r in results if r.test_passed)
        for r in results:
            icon = "✅" if r.test_passed else "❌"
            print(f"{icon} {r.notes}: detect={r.detected_by_watcher}, suggest={r.suggestion_quality}, council={r.council_decision}")
        print(f"\n{'='*60}")
        print(f"RESULTS: {passed}/{len(results)} tests passed")