Phase 8 Production Hardening with complete governance infrastructure: - Vault integration with tiered policies (T0-T4) - DragonflyDB state management - SQLite audit ledger - Pipeline DSL and templates - Promotion/revocation engine - Checkpoint system for session persistence - Health manager and circuit breaker for fault tolerance - GitHub/Slack integrations - Architectural test pipeline with bug watcher, suggestion engine, council review - Multi-agent chaos testing framework Test Results: - Governance tests: 68/68 passing - E2E workflow: 16/16 passing - Phase 2 Vault: 14/14 passing - Integration tests: 27/27 passing Coverage: 57.6% average across 12 phases Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
638 lines
22 KiB
Python
Executable File
638 lines
22 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Revocation Engine
|
|
=================
|
|
Real-time violation detection and immediate credential revocation.
|
|
Part of Phase 4: Promotion and Revocation Engine.
|
|
|
|
Immediate Revocation Events:
|
|
- Resource created outside approved pool
|
|
- Terraform apply without stored plan
|
|
- Ansible run without check-mode (no waiver)
|
|
- Prod access without gate approval
|
|
- Unrecorded root session
|
|
- Direct baseline mutation
|
|
- Error budget exceeded
|
|
- Procedure violations
|
|
"""
|
|
|
|
import json
|
|
import sqlite3
|
|
import subprocess
|
|
import sys
|
|
import redis
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from enum import Enum
|
|
from typing import Optional
|
|
|
|
|
|
# =============================================================================
|
|
# Configuration
|
|
# =============================================================================
|
|
|
|
LEDGER_DB = "/opt/agent-governance/ledger/governance.db"
|
|
|
|
|
|
class ViolationType(str, Enum):
|
|
# Critical - Immediate revocation
|
|
UNAUTHORIZED_POOL = "UNAUTHORIZED_POOL"
|
|
APPLY_WITHOUT_PLAN = "APPLY_WITHOUT_PLAN"
|
|
RUN_WITHOUT_CHECK = "RUN_WITHOUT_CHECK"
|
|
UNAUTHORIZED_PROD = "UNAUTHORIZED_PROD"
|
|
UNRECORDED_ROOT = "UNRECORDED_ROOT"
|
|
BASELINE_MUTATION = "BASELINE_MUTATION"
|
|
|
|
# High - Immediate revocation
|
|
ERROR_BUDGET_EXCEEDED = "ERROR_BUDGET_EXCEEDED"
|
|
PROCEDURE_VIOLATION = "PROCEDURE_VIOLATION"
|
|
HEARTBEAT_TIMEOUT = "HEARTBEAT_TIMEOUT"
|
|
LOCK_EXPIRED = "LOCK_EXPIRED"
|
|
|
|
# Medium - Warning then revocation
|
|
SCOPE_VIOLATION = "SCOPE_VIOLATION"
|
|
FORBIDDEN_ACTION = "FORBIDDEN_ACTION"
|
|
|
|
# Low - Warning only
|
|
CONFIDENCE_BELOW_THRESHOLD = "CONFIDENCE_BELOW_THRESHOLD"
|
|
MISSING_ARTIFACT = "MISSING_ARTIFACT"
|
|
|
|
|
|
class Severity(str, Enum):
|
|
CRITICAL = "critical" # Immediate revocation, alert
|
|
HIGH = "high" # Immediate revocation
|
|
MEDIUM = "medium" # Warning, second offense = revoke
|
|
LOW = "low" # Warning only
|
|
|
|
|
|
# Violation severity mapping
|
|
VIOLATION_SEVERITY = {
|
|
ViolationType.UNAUTHORIZED_POOL: Severity.CRITICAL,
|
|
ViolationType.APPLY_WITHOUT_PLAN: Severity.CRITICAL,
|
|
ViolationType.RUN_WITHOUT_CHECK: Severity.CRITICAL,
|
|
ViolationType.UNAUTHORIZED_PROD: Severity.CRITICAL,
|
|
ViolationType.UNRECORDED_ROOT: Severity.CRITICAL,
|
|
ViolationType.BASELINE_MUTATION: Severity.CRITICAL,
|
|
ViolationType.ERROR_BUDGET_EXCEEDED: Severity.HIGH,
|
|
ViolationType.PROCEDURE_VIOLATION: Severity.HIGH,
|
|
ViolationType.HEARTBEAT_TIMEOUT: Severity.HIGH,
|
|
ViolationType.LOCK_EXPIRED: Severity.HIGH,
|
|
ViolationType.SCOPE_VIOLATION: Severity.MEDIUM,
|
|
ViolationType.FORBIDDEN_ACTION: Severity.MEDIUM,
|
|
ViolationType.CONFIDENCE_BELOW_THRESHOLD: Severity.LOW,
|
|
ViolationType.MISSING_ARTIFACT: Severity.LOW,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class Violation:
|
|
agent_id: str
|
|
violation_type: ViolationType
|
|
severity: Severity
|
|
description: str
|
|
triggering_action: str
|
|
evidence: dict
|
|
timestamp: str
|
|
remediation: str = ""
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"agent_id": self.agent_id,
|
|
"violation_type": self.violation_type.value,
|
|
"severity": self.severity.value,
|
|
"description": self.description,
|
|
"triggering_action": self.triggering_action,
|
|
"evidence": self.evidence,
|
|
"timestamp": self.timestamp,
|
|
"remediation": self.remediation
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class RevocationResult:
|
|
agent_id: str
|
|
success: bool
|
|
action_taken: str
|
|
violation: Violation
|
|
vault_revoked: bool
|
|
dragonfly_revoked: bool
|
|
timestamp: str
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"agent_id": self.agent_id,
|
|
"success": self.success,
|
|
"action_taken": self.action_taken,
|
|
"violation": self.violation.to_dict(),
|
|
"vault_revoked": self.vault_revoked,
|
|
"dragonfly_revoked": self.dragonfly_revoked,
|
|
"timestamp": self.timestamp
|
|
}
|
|
|
|
|
|
class RevocationEngine:
|
|
"""
|
|
Detects violations and revokes agent access.
|
|
"""
|
|
|
|
def __init__(self, db_path: str = LEDGER_DB):
|
|
self.db_path = db_path
|
|
self.vault_token = self._get_vault_token()
|
|
self.redis = self._get_redis()
|
|
|
|
def _now(self) -> str:
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
def _get_vault_token(self) -> str:
|
|
try:
|
|
with open("/opt/vault/init-keys.json") as f:
|
|
return json.load(f)["root_token"]
|
|
except:
|
|
return ""
|
|
|
|
def _get_redis(self) -> Optional[redis.Redis]:
|
|
try:
|
|
# Get credentials from Vault
|
|
result = subprocess.run([
|
|
"curl", "-sk",
|
|
"-H", f"X-Vault-Token: {self.vault_token}",
|
|
"https://127.0.0.1:8200/v1/secret/data/services/dragonfly"
|
|
], capture_output=True, text=True)
|
|
|
|
creds = json.loads(result.stdout)["data"]["data"]
|
|
return redis.Redis(
|
|
host=creds["host"],
|
|
port=int(creds["port"]),
|
|
password=creds["password"],
|
|
decode_responses=True
|
|
)
|
|
except:
|
|
return None
|
|
|
|
def _get_conn(self) -> sqlite3.Connection:
|
|
conn = sqlite3.connect(self.db_path)
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
def record_violation(self, violation: Violation) -> int:
|
|
"""Record a violation in the ledger"""
|
|
conn = self._get_conn()
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
INSERT INTO violations
|
|
(timestamp, agent_id, violation_type, severity, description,
|
|
triggering_action, evidence, remediation)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
violation.timestamp,
|
|
violation.agent_id,
|
|
violation.violation_type.value,
|
|
violation.severity.value,
|
|
violation.description,
|
|
violation.triggering_action,
|
|
json.dumps(violation.evidence),
|
|
violation.remediation
|
|
))
|
|
|
|
violation_id = cursor.lastrowid
|
|
|
|
# Update agent metrics
|
|
cursor.execute("""
|
|
UPDATE agent_metrics
|
|
SET consecutive_compliant = 0,
|
|
last_violation_at = ?,
|
|
promotion_eligible = 0,
|
|
updated_at = ?
|
|
WHERE agent_id = ?
|
|
""", (violation.timestamp, violation.timestamp, violation.agent_id))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return violation_id
|
|
|
|
def revoke_vault_token(self, agent_id: str) -> bool:
|
|
"""Revoke agent's Vault token"""
|
|
# In production, this would look up the agent's token accessor
|
|
# and revoke it via Vault API
|
|
|
|
# For now, set a revocation signal that the agent must respect
|
|
try:
|
|
result = subprocess.run([
|
|
"curl", "-sk", "-X", "POST",
|
|
"-H", f"X-Vault-Token: {self.vault_token}",
|
|
"-d", json.dumps({"revoked_at": self._now(), "agent_id": agent_id}),
|
|
f"https://127.0.0.1:8200/v1/secret/data/revocations/{agent_id}"
|
|
], capture_output=True, text=True)
|
|
|
|
return "errors" not in result.stdout
|
|
except:
|
|
return False
|
|
|
|
def revoke_dragonfly_access(self, agent_id: str) -> bool:
|
|
"""Revoke agent's access in DragonflyDB"""
|
|
if not self.redis:
|
|
return False
|
|
|
|
try:
|
|
# Set revocation signal
|
|
self.redis.set(f"agent:{agent_id}:revoke_signal", "1", ex=86400)
|
|
|
|
# Update agent state to REVOKED
|
|
state_key = f"agent:{agent_id}:state"
|
|
state_data = self.redis.get(state_key)
|
|
if state_data:
|
|
state = json.loads(state_data)
|
|
state["status"] = "REVOKED"
|
|
state["revoked_at"] = self._now()
|
|
self.redis.set(state_key, json.dumps(state))
|
|
|
|
# Release any locks
|
|
self.redis.delete(f"agent:{agent_id}:lock")
|
|
|
|
# Add to revocation ledger
|
|
revocation_event = {
|
|
"agent_id": agent_id,
|
|
"revoked_at": self._now(),
|
|
"reason": "VIOLATION"
|
|
}
|
|
self.redis.rpush("revocations:ledger", json.dumps(revocation_event))
|
|
|
|
return True
|
|
except Exception as e:
|
|
print(f"[ERROR] Failed to revoke DragonflyDB access: {e}")
|
|
return False
|
|
|
|
def send_alert(self, violation: Violation):
|
|
"""Send alert for critical violations"""
|
|
# In production, this would integrate with PagerDuty, Slack, etc.
|
|
print(f"\n{'!'*60}")
|
|
print(f"CRITICAL ALERT: {violation.violation_type.value}")
|
|
print(f"{'!'*60}")
|
|
print(f"Agent: {violation.agent_id}")
|
|
print(f"Description: {violation.description}")
|
|
print(f"Severity: {violation.severity.value}")
|
|
print(f"Time: {violation.timestamp}")
|
|
print(f"{'!'*60}\n")
|
|
|
|
# Store alert for dashboard
|
|
if self.redis:
|
|
alert = {
|
|
"type": "VIOLATION",
|
|
"violation_type": violation.violation_type.value,
|
|
"agent_id": violation.agent_id,
|
|
"severity": violation.severity.value,
|
|
"message": violation.description,
|
|
"timestamp": violation.timestamp
|
|
}
|
|
self.redis.rpush("alerts:queue", json.dumps(alert))
|
|
self.redis.ltrim("alerts:queue", -100, -1) # Keep last 100 alerts
|
|
|
|
def process_violation(self, violation: Violation) -> RevocationResult:
|
|
"""Process a violation and take appropriate action"""
|
|
print(f"\n[VIOLATION] Processing: {violation.violation_type.value}")
|
|
print(f"[VIOLATION] Agent: {violation.agent_id}")
|
|
print(f"[VIOLATION] Severity: {violation.severity.value}")
|
|
|
|
# Record the violation
|
|
violation_id = self.record_violation(violation)
|
|
|
|
# Determine action based on severity
|
|
vault_revoked = False
|
|
dragonfly_revoked = False
|
|
action_taken = "RECORDED"
|
|
|
|
if violation.severity in [Severity.CRITICAL, Severity.HIGH]:
|
|
# Immediate revocation
|
|
print(f"[REVOKE] Initiating immediate revocation for {violation.agent_id}")
|
|
|
|
vault_revoked = self.revoke_vault_token(violation.agent_id)
|
|
dragonfly_revoked = self.revoke_dragonfly_access(violation.agent_id)
|
|
|
|
action_taken = "REVOKED"
|
|
|
|
if violation.severity == Severity.CRITICAL:
|
|
self.send_alert(violation)
|
|
|
|
elif violation.severity == Severity.MEDIUM:
|
|
# Check for prior warnings
|
|
conn = self._get_conn()
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
SELECT COUNT(*) as count FROM violations
|
|
WHERE agent_id = ?
|
|
AND severity = 'medium'
|
|
AND datetime(timestamp) >= datetime('now', '-7 days')
|
|
""", (violation.agent_id,))
|
|
|
|
prior_warnings = cursor.fetchone()['count']
|
|
conn.close()
|
|
|
|
if prior_warnings >= 2: # This is the 3rd+ medium violation
|
|
print(f"[REVOKE] Multiple medium violations, revoking {violation.agent_id}")
|
|
vault_revoked = self.revoke_vault_token(violation.agent_id)
|
|
dragonfly_revoked = self.revoke_dragonfly_access(violation.agent_id)
|
|
action_taken = "REVOKED"
|
|
else:
|
|
print(f"[WARNING] Warning issued to {violation.agent_id}")
|
|
action_taken = "WARNING"
|
|
|
|
else: # LOW severity
|
|
print(f"[WARNING] Low severity violation recorded for {violation.agent_id}")
|
|
action_taken = "WARNING"
|
|
|
|
return RevocationResult(
|
|
agent_id=violation.agent_id,
|
|
success=True,
|
|
action_taken=action_taken,
|
|
violation=violation,
|
|
vault_revoked=vault_revoked,
|
|
dragonfly_revoked=dragonfly_revoked,
|
|
timestamp=self._now()
|
|
)
|
|
|
|
def create_violation(
|
|
self,
|
|
agent_id: str,
|
|
violation_type: ViolationType,
|
|
description: str,
|
|
triggering_action: str = "",
|
|
evidence: dict = None,
|
|
remediation: str = ""
|
|
) -> Violation:
|
|
"""Helper to create a violation object"""
|
|
return Violation(
|
|
agent_id=agent_id,
|
|
violation_type=violation_type,
|
|
severity=VIOLATION_SEVERITY[violation_type],
|
|
description=description,
|
|
triggering_action=triggering_action,
|
|
evidence=evidence or {},
|
|
timestamp=self._now(),
|
|
remediation=remediation
|
|
)
|
|
|
|
def get_violations(self, agent_id: str = None, severity: str = None,
|
|
limit: int = 50) -> list:
|
|
"""Get violation history"""
|
|
conn = self._get_conn()
|
|
cursor = conn.cursor()
|
|
|
|
query = "SELECT * FROM violations WHERE 1=1"
|
|
params = []
|
|
|
|
if agent_id:
|
|
query += " AND agent_id = ?"
|
|
params.append(agent_id)
|
|
|
|
if severity:
|
|
query += " AND severity = ?"
|
|
params.append(severity)
|
|
|
|
query += " ORDER BY timestamp DESC LIMIT ?"
|
|
params.append(limit)
|
|
|
|
cursor.execute(query, params)
|
|
rows = [dict(row) for row in cursor.fetchall()]
|
|
conn.close()
|
|
|
|
return rows
|
|
|
|
def get_active_revocations(self) -> list:
|
|
"""Get currently revoked agents from DragonflyDB"""
|
|
if not self.redis:
|
|
return []
|
|
|
|
revoked = []
|
|
keys = self.redis.keys("agent:*:state")
|
|
|
|
for key in keys:
|
|
data = self.redis.get(key)
|
|
if data:
|
|
state = json.loads(data)
|
|
if state.get("status") == "REVOKED":
|
|
revoked.append({
|
|
"agent_id": state.get("agent_id"),
|
|
"revoked_at": state.get("revoked_at"),
|
|
"notes": state.get("notes", "")
|
|
})
|
|
|
|
return revoked
|
|
|
|
def acknowledge_violation(self, violation_id: int, acknowledged_by: str) -> bool:
|
|
"""Acknowledge a violation (for remediation tracking)"""
|
|
conn = self._get_conn()
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("""
|
|
UPDATE violations
|
|
SET acknowledged = 1,
|
|
acknowledged_by = ?
|
|
WHERE id = ?
|
|
""", (acknowledged_by, violation_id))
|
|
|
|
success = cursor.rowcount > 0
|
|
conn.commit()
|
|
conn.close()
|
|
return success
|
|
|
|
|
|
# =============================================================================
|
|
# Violation Detectors
|
|
# =============================================================================
|
|
|
|
class ViolationDetector:
|
|
"""
|
|
Detects violations in real-time.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.engine = RevocationEngine()
|
|
|
|
def check_plan_artifact(self, agent_id: str, action: str, artifact_exists: bool) -> Optional[Violation]:
|
|
"""Check if apply/run has a corresponding plan artifact"""
|
|
if action in ["terraform_apply", "ansible_run"] and not artifact_exists:
|
|
return self.engine.create_violation(
|
|
agent_id=agent_id,
|
|
violation_type=ViolationType.APPLY_WITHOUT_PLAN,
|
|
description=f"Attempted {action} without required plan artifact",
|
|
triggering_action=action,
|
|
remediation="Always run plan/check before apply/run"
|
|
)
|
|
return None
|
|
|
|
def check_pool_authorization(self, agent_id: str, agent_tier: int,
|
|
target_pool: str) -> Optional[Violation]:
|
|
"""Check if agent is authorized for target pool"""
|
|
forbidden_pools = {
|
|
0: ["pve-sandbox", "pve-staging", "pve-prod"],
|
|
1: ["pve-staging", "pve-prod"],
|
|
2: ["pve-prod"],
|
|
3: [],
|
|
4: []
|
|
}
|
|
|
|
if target_pool in forbidden_pools.get(agent_tier, []):
|
|
return self.engine.create_violation(
|
|
agent_id=agent_id,
|
|
violation_type=ViolationType.UNAUTHORIZED_POOL,
|
|
description=f"Tier {agent_tier} agent accessed forbidden pool: {target_pool}",
|
|
evidence={"agent_tier": agent_tier, "target_pool": target_pool},
|
|
remediation="Request promotion or use authorized pools only"
|
|
)
|
|
return None
|
|
|
|
def check_production_access(self, agent_id: str, target: str,
|
|
has_approval: bool) -> Optional[Violation]:
|
|
"""Check if production access has gate approval"""
|
|
if "prod" in target.lower() and not has_approval:
|
|
return self.engine.create_violation(
|
|
agent_id=agent_id,
|
|
violation_type=ViolationType.UNAUTHORIZED_PROD,
|
|
description=f"Production access attempted without approval: {target}",
|
|
evidence={"target": target, "approval": False},
|
|
remediation="Request production access through gate approval process"
|
|
)
|
|
return None
|
|
|
|
|
|
# =============================================================================
|
|
# CLI
|
|
# =============================================================================
|
|
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(description="Revocation Engine")
|
|
subparsers = parser.add_subparsers(dest="command", required=True)
|
|
|
|
# Report violation command
|
|
report_parser = subparsers.add_parser("report", help="Report a violation")
|
|
report_parser.add_argument("agent_id", help="Agent ID")
|
|
report_parser.add_argument("--type", required=True,
|
|
choices=[v.value for v in ViolationType],
|
|
help="Violation type")
|
|
report_parser.add_argument("--description", required=True, help="Description")
|
|
report_parser.add_argument("--action", default="", help="Triggering action")
|
|
|
|
# List violations command
|
|
list_parser = subparsers.add_parser("list", help="List violations")
|
|
list_parser.add_argument("--agent-id", help="Filter by agent")
|
|
list_parser.add_argument("--severity", choices=["critical", "high", "medium", "low"])
|
|
list_parser.add_argument("--limit", type=int, default=20)
|
|
list_parser.add_argument("--json", action="store_true")
|
|
|
|
# List revoked command
|
|
revoked_parser = subparsers.add_parser("revoked", help="List revoked agents")
|
|
revoked_parser.add_argument("--json", action="store_true")
|
|
|
|
# Acknowledge command
|
|
ack_parser = subparsers.add_parser("acknowledge", help="Acknowledge violation")
|
|
ack_parser.add_argument("violation_id", type=int)
|
|
ack_parser.add_argument("--by", required=True, help="Acknowledger name")
|
|
|
|
# Types command
|
|
types_parser = subparsers.add_parser("types", help="List violation types")
|
|
|
|
args = parser.parse_args()
|
|
|
|
engine = RevocationEngine()
|
|
|
|
if args.command == "report":
|
|
violation_type = ViolationType(args.type)
|
|
violation = engine.create_violation(
|
|
agent_id=args.agent_id,
|
|
violation_type=violation_type,
|
|
description=args.description,
|
|
triggering_action=args.action
|
|
)
|
|
|
|
result = engine.process_violation(violation)
|
|
|
|
print(f"\n{'='*60}")
|
|
print("VIOLATION PROCESSED")
|
|
print(f"{'='*60}")
|
|
print(f"Agent: {result.agent_id}")
|
|
print(f"Type: {result.violation.violation_type.value}")
|
|
print(f"Severity: {result.violation.severity.value}")
|
|
print(f"Action Taken: {result.action_taken}")
|
|
print(f"Vault Revoked: {result.vault_revoked}")
|
|
print(f"DragonflyDB Revoked: {result.dragonfly_revoked}")
|
|
print(f"{'='*60}")
|
|
|
|
elif args.command == "list":
|
|
violations = engine.get_violations(
|
|
agent_id=args.agent_id,
|
|
severity=args.severity,
|
|
limit=args.limit
|
|
)
|
|
|
|
if args.json:
|
|
print(json.dumps(violations, indent=2))
|
|
else:
|
|
print(f"\n{'='*60}")
|
|
print("VIOLATION HISTORY")
|
|
print(f"{'='*60}")
|
|
|
|
if not violations:
|
|
print("No violations found")
|
|
else:
|
|
for v in violations:
|
|
severity_icon = {
|
|
"critical": "[CRIT]",
|
|
"high": "[HIGH]",
|
|
"medium": "[MED]",
|
|
"low": "[LOW]"
|
|
}.get(v['severity'], "[?]")
|
|
|
|
ack = "[ACK]" if v['acknowledged'] else ""
|
|
print(f"\n{severity_icon} {v['violation_type']} {ack}")
|
|
print(f" Agent: {v['agent_id']}")
|
|
print(f" {v['description']}")
|
|
print(f" Time: {v['timestamp']}")
|
|
|
|
print(f"{'='*60}")
|
|
|
|
elif args.command == "revoked":
|
|
revoked = engine.get_active_revocations()
|
|
|
|
if args.json:
|
|
print(json.dumps(revoked, indent=2))
|
|
else:
|
|
print(f"\n{'='*60}")
|
|
print("CURRENTLY REVOKED AGENTS")
|
|
print(f"{'='*60}")
|
|
|
|
if not revoked:
|
|
print("No agents currently revoked")
|
|
else:
|
|
for r in revoked:
|
|
print(f"\n {r['agent_id']}")
|
|
print(f" Revoked: {r['revoked_at']}")
|
|
if r['notes']:
|
|
print(f" Notes: {r['notes']}")
|
|
|
|
print(f"{'='*60}")
|
|
|
|
elif args.command == "acknowledge":
|
|
success = engine.acknowledge_violation(args.violation_id, args.by)
|
|
if success:
|
|
print(f"[OK] Violation {args.violation_id} acknowledged by {args.by}")
|
|
else:
|
|
print(f"[FAILED] Could not acknowledge violation {args.violation_id}")
|
|
sys.exit(1)
|
|
|
|
elif args.command == "types":
|
|
print(f"\n{'='*60}")
|
|
print("VIOLATION TYPES")
|
|
print(f"{'='*60}")
|
|
|
|
for vtype in ViolationType:
|
|
severity = VIOLATION_SEVERITY[vtype]
|
|
print(f"\n {vtype.value}")
|
|
print(f" Severity: {severity.value}")
|
|
|
|
print(f"{'='*60}")
|