#!/usr/bin/env python3
"""
Prometheus Metrics Module
=========================

Exposes metrics in Prometheus format for monitoring agent governance system.

Metrics:
- agent_executions_total - Total agent executions by tier and status
- agent_execution_duration_seconds - Execution duration histogram
- agent_violations_total - Violations by type and severity
- agent_promotions_total - Promotions by tier transition
- api_requests_total - API requests by endpoint and status
- tenant_quota_usage - Quota usage by tenant and resource
- system_health - Component health status
"""

import sqlite3
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, List
from dataclasses import dataclass, field
from contextlib import contextmanager
import threading

# =============================================================================
# Configuration
# =============================================================================

DB_PATH = Path("/opt/agent-governance/ledger/governance.db")

# =============================================================================
# Metric Types
# =============================================================================


@dataclass
class Counter:
    """Prometheus counter metric (monotonically increasing, per label set)."""

    name: str
    help: str
    # Label *names*, in the order their values appear in the `values` keys.
    labels: List[str] = field(default_factory=list)
    # Maps a tuple of label values -> current count. Empty tuple = no labels.
    values: Dict[tuple, float] = field(default_factory=dict)
    _lock: threading.Lock = field(default_factory=threading.Lock)

    def inc(self, labels: dict = None, value: float = 1):
        """Increment the counter for the given label set by *value*.

        NOTE(review): the key is built from labels.values(), so callers must
        pass label dicts with keys in the same order as `self.labels`.
        """
        key = tuple(labels.values()) if labels else ()
        with self._lock:
            self.values[key] = self.values.get(key, 0) + value

    def to_prometheus(self) -> str:
        """Format as Prometheus exposition text."""
        lines = [f"# HELP {self.name} {self.help}", f"# TYPE {self.name} counter"]
        # Snapshot under the lock so a concurrent inc() cannot resize the
        # dict while we iterate (RuntimeError: dict changed size).
        with self._lock:
            snapshot = dict(self.values)
        for key, value in snapshot.items():
            if self.labels and key:
                label_str = ",".join(f'{l}="{v}"' for l, v in zip(self.labels, key))
                lines.append(f"{self.name}{{{label_str}}} {value}")
            else:
                lines.append(f"{self.name} {value}")
        return "\n".join(lines)


@dataclass
class Gauge:
    """Prometheus gauge metric (value that can go up and down)."""

    name: str
    help: str
    labels: List[str] = field(default_factory=list)
    values: Dict[tuple, float] = field(default_factory=dict)
    _lock: threading.Lock = field(default_factory=threading.Lock)

    def set(self, value: float, labels: dict = None):
        """Set the gauge for the given label set to *value*."""
        key = tuple(labels.values()) if labels else ()
        with self._lock:
            self.values[key] = value

    def inc(self, labels: dict = None, value: float = 1):
        """Increment the gauge for the given label set by *value*."""
        key = tuple(labels.values()) if labels else ()
        with self._lock:
            self.values[key] = self.values.get(key, 0) + value

    def dec(self, labels: dict = None, value: float = 1):
        """Decrement the gauge for the given label set by *value*."""
        key = tuple(labels.values()) if labels else ()
        with self._lock:
            self.values[key] = self.values.get(key, 0) - value

    def to_prometheus(self) -> str:
        """Format as Prometheus exposition text."""
        lines = [f"# HELP {self.name} {self.help}", f"# TYPE {self.name} gauge"]
        # Snapshot under the lock; see Counter.to_prometheus.
        with self._lock:
            snapshot = dict(self.values)
        for key, value in snapshot.items():
            if self.labels and key:
                label_str = ",".join(f'{l}="{v}"' for l, v in zip(self.labels, key))
                lines.append(f"{self.name}{{{label_str}}} {value}")
            else:
                lines.append(f"{self.name} {value}")
        return "\n".join(lines)


@dataclass
class Histogram:
    """Prometheus histogram metric.

    Stores raw observations per label set and renders cumulative bucket
    counts, _sum and _count at scrape time. Observations are kept in memory
    unbounded — acceptable for a process scraped/restarted regularly, but a
    NOTE(review): long-lived high-traffic processes will grow without limit.
    """

    name: str
    help: str
    labels: List[str] = field(default_factory=list)
    buckets: List[float] = field(
        default_factory=lambda: [0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60]
    )
    observations: Dict[tuple, List[float]] = field(default_factory=dict)
    _lock: threading.Lock = field(default_factory=threading.Lock)

    def observe(self, value: float, labels: dict = None):
        """Record a single observation for the given label set."""
        key = tuple(labels.values()) if labels else ()
        with self._lock:
            if key not in self.observations:
                self.observations[key] = []
            self.observations[key].append(value)

    def to_prometheus(self) -> str:
        """Format as Prometheus exposition text (buckets, +Inf, sum, count)."""
        lines = [f"# HELP {self.name} {self.help}", f"# TYPE {self.name} histogram"]
        # Snapshot (including copies of the per-key lists) under the lock so
        # concurrent observe() calls cannot mutate lists while we aggregate.
        with self._lock:
            snapshot = {k: list(v) for k, v in self.observations.items()}
        for key, values in snapshot.items():
            label_str = (
                ",".join(f'{l}="{v}"' for l, v in zip(self.labels, key))
                if self.labels and key
                else ""
            )
            # Cumulative bucket counts (Prometheus buckets are cumulative).
            for bucket in self.buckets:
                count = sum(1 for v in values if v <= bucket)
                if label_str:
                    lines.append(f'{self.name}_bucket{{{label_str},le="{bucket}"}} {count}')
                else:
                    lines.append(f'{self.name}_bucket{{le="{bucket}"}} {count}')
            # +Inf bucket always equals the total number of observations.
            if label_str:
                lines.append(f'{self.name}_bucket{{{label_str},le="+Inf"}} {len(values)}')
            else:
                lines.append(f'{self.name}_bucket{{le="+Inf"}} {len(values)}')
            # Sum and count
            total = sum(values)
            if label_str:
                lines.append(f"{self.name}_sum{{{label_str}}} {total}")
                lines.append(f"{self.name}_count{{{label_str}}} {len(values)}")
            else:
                lines.append(f"{self.name}_sum {total}")
                lines.append(f"{self.name}_count {len(values)}")
        return "\n".join(lines)


# =============================================================================
# Metrics Registry
# =============================================================================


class MetricsRegistry:
    """Central registry for all metrics exposed by this module."""

    def __init__(self):
        # Agent execution metrics
        self.agent_executions_total = Counter(
            name="agent_executions_total",
            help="Total number of agent executions",
            labels=["tier", "action", "status"],
        )
        self.agent_execution_duration = Histogram(
            name="agent_execution_duration_seconds",
            help="Agent execution duration in seconds",
            labels=["tier", "action"],
            buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 300],
        )
        self.agent_active = Gauge(
            name="agent_active_count",
            help="Number of currently active agents",
            labels=["tier"],
        )
        # Violation metrics
        self.violations_total = Counter(
            name="agent_violations_total",
            help="Total number of violations",
            labels=["type", "severity"],
        )
        # Promotion metrics
        self.promotions_total = Counter(
            name="agent_promotions_total",
            help="Total number of promotions",
            labels=["from_tier", "to_tier"],
        )
        # API metrics
        self.api_requests_total = Counter(
            name="api_requests_total",
            help="Total API requests",
            labels=["method", "endpoint", "status"],
        )
        self.api_request_duration = Histogram(
            name="api_request_duration_seconds",
            help="API request duration in seconds",
            labels=["method", "endpoint"],
            buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
        )
        # Tenant/quota metrics
        self.tenant_quota_usage = Gauge(
            name="tenant_quota_usage_ratio",
            help="Tenant quota usage as ratio (0-1)",
            labels=["tenant_id", "resource"],
        )
        self.tenant_api_calls = Counter(
            name="tenant_api_calls_total",
            help="API calls by tenant",
            labels=["tenant_id"],
        )
        # System health metrics
        self.component_health = Gauge(
            name="component_health",
            help="Component health status (1=healthy, 0=unhealthy)",
            labels=["component"],
        )
        # NOTE(review): db_connections is registered and exported but never
        # updated anywhere in this module — confirm a caller sets it.
        self.db_connections = Gauge(
            name="db_connections_active",
            help="Active database connections",
        )
        # Marketplace metrics
        self.template_downloads = Counter(
            name="marketplace_template_downloads_total",
            help="Template downloads",
            labels=["template_id", "category"],
        )
        self.template_installs = Gauge(
            name="marketplace_template_installs_active",
            help="Active template installations",
            labels=["category"],
        )
        # Orchestration metrics
        self.orchestration_requests = Counter(
            name="orchestration_requests_total",
            help="Orchestration requests by model",
            labels=["model", "status"],
        )
        self.orchestration_tokens = Counter(
            name="orchestration_tokens_total",
            help="Tokens used in orchestration",
            labels=["model"],
        )
        # Uptime
        self._start_time = time.time()
        self.uptime = Gauge(
            name="governance_uptime_seconds",
            help="System uptime in seconds",
        )

    def collect_from_db(self):
        """Refresh DB-derived metrics (agents, violations, promotions, quotas).

        Best-effort: any failure is logged and swallowed so a broken DB does
        not take the /metrics endpoint down with it.
        """
        try:
            conn = sqlite3.connect(DB_PATH)
            try:
                conn.row_factory = sqlite3.Row
                # Agent counts by tier
                cursor = conn.execute("""
                    SELECT current_tier, COUNT(*) as count
                    FROM agent_metrics
                    GROUP BY current_tier
                """)
                for row in cursor.fetchall():
                    self.agent_active.set(row["count"], {"tier": str(row["current_tier"])})
                # Recent violations by type/severity. Written directly into the
                # counter's value map: each scrape *replaces* the value with the
                # trailing-1h count rather than accumulating.
                cursor = conn.execute("""
                    SELECT violation_type, severity, COUNT(*) as count
                    FROM violations
                    WHERE timestamp > datetime('now', '-1 hour')
                    GROUP BY violation_type, severity
                """)
                for row in cursor.fetchall():
                    self.violations_total.values[(row["violation_type"], row["severity"])] = row["count"]
                # Promotions (all-time counts, replaced on every scrape)
                cursor = conn.execute("""
                    SELECT from_tier, to_tier, COUNT(*) as count
                    FROM promotions
                    GROUP BY from_tier, to_tier
                """)
                for row in cursor.fetchall():
                    self.promotions_total.values[(str(row["from_tier"]), str(row["to_tier"]))] = row["count"]
                # Tenant usage vs. daily quota
                cursor = conn.execute("""
                    SELECT tu.tenant_id, tu.api_calls, tu.tokens_used,
                           tq.max_api_calls_per_day, tq.max_tokens_per_day
                    FROM tenant_usage tu
                    JOIN tenant_quotas tq ON tu.tenant_id = tq.tenant_id
                    WHERE tu.period_start = date('now')
                """)
                for row in cursor.fetchall():
                    if row["max_api_calls_per_day"] > 0:
                        ratio = row["api_calls"] / row["max_api_calls_per_day"]
                        self.tenant_quota_usage.set(ratio, {"tenant_id": row["tenant_id"], "resource": "api_calls"})
                    if row["max_tokens_per_day"] > 0:
                        ratio = row["tokens_used"] / row["max_tokens_per_day"]
                        self.tenant_quota_usage.set(ratio, {"tenant_id": row["tenant_id"], "resource": "tokens"})
                # Marketplace stats
                cursor = conn.execute("""
                    SELECT category, SUM(active_installs) as installs
                    FROM template_stats ts
                    JOIN agent_templates t ON ts.template_id = t.template_id
                    GROUP BY category
                """)
                for row in cursor.fetchall():
                    self.template_installs.set(row["installs"] or 0, {"category": row["category"]})
            finally:
                # Close even when a query fails (original leaked on error).
                conn.close()
        except Exception as e:
            print(f"Error collecting DB metrics: {e}")

    def check_health(self):
        """Probe each external component and update component_health gauges."""
        import subprocess

        # Database
        try:
            conn = sqlite3.connect(DB_PATH)
            conn.execute("SELECT 1")
            conn.close()
            self.component_health.set(1, {"component": "database"})
        except Exception:
            self.component_health.set(0, {"component": "database"})
        # Vault (probed via docker exec; list-form argv, no shell)
        try:
            result = subprocess.run(
                ["docker", "exec", "vault", "vault", "status", "-format=json"],
                capture_output=True,
                timeout=5,
            )
            self.component_health.set(1 if result.returncode == 0 else 0, {"component": "vault"})
        except Exception:
            self.component_health.set(0, {"component": "vault"})
        # DragonflyDB
        # NOTE(review): credentials are hardcoded here — move to config/env.
        try:
            import redis
            r = redis.Redis(host='127.0.0.1', port=6379, password='governance2026')
            r.ping()
            self.component_health.set(1, {"component": "dragonfly"})
        except Exception:
            self.component_health.set(0, {"component": "dragonfly"})
        # Uptime
        self.uptime.set(time.time() - self._start_time)

    def to_prometheus(self) -> str:
        """Export all metrics in Prometheus exposition format.

        Refreshes DB-derived metrics and component health first, then renders
        every registered metric. Metrics with no samples yet are still listed
        with their HELP/TYPE header so dashboards can discover them.
        """
        self.collect_from_db()
        self.check_health()
        metrics = [
            self.agent_executions_total,
            self.agent_execution_duration,
            self.agent_active,
            self.violations_total,
            self.promotions_total,
            self.api_requests_total,
            self.api_request_duration,
            self.tenant_quota_usage,
            self.tenant_api_calls,
            self.component_health,
            self.db_connections,
            self.template_downloads,
            self.template_installs,
            self.orchestration_requests,
            self.orchestration_tokens,
            self.uptime,
        ]
        output = []
        for metric in metrics:
            # BUGFIX: the original read `metric.values` unconditionally, but
            # Histogram has no `values` attribute -> AttributeError on every
            # scrape. Branch on the metric type before touching either field.
            if isinstance(metric, Histogram):
                has_data = bool(metric.observations)
            else:
                has_data = bool(metric.values)
            if has_data:
                output.append(metric.to_prometheus())
            else:
                # Keep HELP and TYPE on adjacent lines as one entry (the
                # original appended them separately, so the "\n\n" join put a
                # blank line between them — malformed exposition text).
                output.append(
                    f"# HELP {metric.name} {metric.help}\n"
                    f"# TYPE {metric.name} {type(metric).__name__.lower()}"
                )
        return "\n\n".join(output)


# =============================================================================
# Global Registry
# =============================================================================

registry = MetricsRegistry()

# =============================================================================
# FastAPI Router
# =============================================================================

from fastapi import APIRouter, Response

router = APIRouter(tags=["Observability"])


@router.get("/metrics")
async def get_metrics():
    """Prometheus metrics endpoint."""
    return Response(
        content=registry.to_prometheus(),
        media_type="text/plain; version=0.0.4; charset=utf-8",
    )


@router.get("/health/detailed")
async def detailed_health():
    """Detailed health check covering database, Vault and DragonflyDB.

    Returns overall "healthy" status that degrades to "degraded" when any
    component probe fails.
    """
    import subprocess

    health = {
        "status": "healthy",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "uptime_seconds": time.time() - registry._start_time,
        "components": {},
    }
    # Database
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.execute("SELECT COUNT(*) FROM agent_metrics")
        count = cursor.fetchone()[0]
        conn.close()
        health["components"]["database"] = {"status": "healthy", "agent_count": count}
    except Exception as e:
        health["components"]["database"] = {"status": "unhealthy", "error": str(e)}
        health["status"] = "degraded"
    # Vault
    try:
        result = subprocess.run(
            ["docker", "exec", "vault", "vault", "status", "-format=json"],
            capture_output=True,
            text=True,
            timeout=5,
        )
        if result.returncode == 0:
            import json
            vault_status = json.loads(result.stdout)
            health["components"]["vault"] = {
                "status": "healthy",
                "sealed": vault_status.get("sealed", True),
                "version": vault_status.get("version", "unknown"),
            }
        else:
            health["components"]["vault"] = {"status": "unhealthy", "error": "non-zero exit"}
            health["status"] = "degraded"
    except Exception as e:
        health["components"]["vault"] = {"status": "unhealthy", "error": str(e)}
        health["status"] = "degraded"
    # DragonflyDB
    # NOTE(review): credentials are hardcoded here — move to config/env.
    try:
        import redis
        r = redis.Redis(host='127.0.0.1', port=6379, password='governance2026')
        info = r.info()
        health["components"]["dragonfly"] = {
            "status": "healthy",
            "connected_clients": info.get("connected_clients", 0),
            "used_memory_human": info.get("used_memory_human", "unknown"),
        }
    except Exception as e:
        health["components"]["dragonfly"] = {"status": "unhealthy", "error": str(e)}
        health["status"] = "degraded"
    return health


# =============================================================================
# Middleware for request metrics
# =============================================================================


class MetricsMiddleware:
    """ASGI middleware that records request count and duration metrics."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        # Pass through non-HTTP traffic (websocket, lifespan) untouched.
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        start_time = time.time()
        path = scope.get("path", "unknown")
        method = scope.get("method", "unknown")
        # Normalize path (collapse hex IDs and numeric IDs to "{id}") so the
        # endpoint label stays low-cardinality.
        import re
        normalized_path = re.sub(r'/[a-f0-9-]{8,}', '/{id}', path)
        normalized_path = re.sub(r'/\d+', '/{id}', normalized_path)
        # Default to 500 so a crash before the response starts is still
        # recorded as a server error.
        status_code = 500

        async def send_wrapper(message):
            nonlocal status_code
            if message["type"] == "http.response.start":
                status_code = message["status"]
            await send(message)

        try:
            await self.app(scope, receive, send_wrapper)
        finally:
            duration = time.time() - start_time
            # Record metrics even when the downstream app raised.
            registry.api_requests_total.inc({
                "method": method,
                "endpoint": normalized_path,
                "status": str(status_code),
            })
            registry.api_request_duration.observe(duration, {
                "method": method,
                "endpoint": normalized_path,
            })


# =============================================================================
# Helper functions for use in other modules
# =============================================================================


def record_agent_execution(tier: int, action: str, success: bool, duration: float):
    """Record one agent execution (counter + duration histogram)."""
    registry.agent_executions_total.inc({
        "tier": str(tier),
        "action": action,
        "status": "success" if success else "error",
    })
    registry.agent_execution_duration.observe(duration, {
        "tier": str(tier),
        "action": action,
    })


def record_violation(violation_type: str, severity: str):
    """Record one violation occurrence."""
    registry.violations_total.inc({
        "type": violation_type,
        "severity": severity,
    })


def record_promotion(from_tier: int, to_tier: int):
    """Record one tier promotion."""
    registry.promotions_total.inc({
        "from_tier": str(from_tier),
        "to_tier": str(to_tier),
    })


def record_orchestration(model: str, success: bool, tokens: int = 0):
    """Record an orchestration request and (optionally) its token usage."""
    registry.orchestration_requests.inc({
        "model": model,
        "status": "success" if success else "error",
    })
    if tokens > 0:
        registry.orchestration_tokens.inc({"model": model}, tokens)


def record_template_download(template_id: str, category: str):
    """Record one marketplace template download."""
    registry.template_downloads.inc({
        "template_id": template_id,
        "category": category,
    })