#!/usr/bin/env python3
"""
Phase 12: Observability Tests
=============================

Validates metrics, tracing, and logging systems.

Each ``test_*`` function raises ``AssertionError`` on failure and prints a
``[PASS]`` line on success.  ``run_all_tests`` drives them and the script
exits with status 0 only when every test passed.
"""

from contextlib import closing
import json
import sqlite3
import sys
import time
from pathlib import Path

# Keep ``from <this module> import *`` from re-exporting the test functions
# into another module, where a test collector would pick them up again.
__all__ = ["DB_PATH", "get_db", "table_exists", "run_all_tests"]

# Make the directory three levels above this file importable so that the
# ``observability`` package can be resolved (presumably the project root).
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

DB_PATH = Path("/opt/agent-governance/ledger/governance.db")


def get_db(db_path=DB_PATH) -> sqlite3.Connection:
    """Open a SQLite connection whose rows support access by column name.

    Args:
        db_path: Database location; defaults to the governance ledger at
            ``DB_PATH``.  Anything ``sqlite3.connect`` accepts is allowed.

    Returns:
        An open connection with ``row_factory`` set to ``sqlite3.Row``.
        The caller is responsible for closing it.
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    return conn


def table_exists(conn, table_name) -> bool:
    """Return True when *table_name* is listed as a table in ``sqlite_master``.

    Args:
        conn: An open SQLite connection.
        table_name: Exact name of the table to look for.
    """
    row = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
        (table_name,),
    ).fetchone()
    return row is not None


# =============================================================================
# Metrics Tests
# =============================================================================

def test_metrics_module_exists():
    """Verify the metrics module file exists and mentions its key names."""
    metrics_path = Path("/opt/agent-governance/observability/metrics.py")
    assert metrics_path.exists(), "Metrics module not found"

    # Textual check only: the names must appear somewhere in the source.
    content = metrics_path.read_text()
    assert "Counter" in content, "Counter class not found"
    assert "Gauge" in content, "Gauge class not found"
    assert "Histogram" in content, "Histogram class not found"
    assert "to_prometheus" in content, "Prometheus export not found"

    print("[PASS] metrics module exists with Counter, Gauge, Histogram")


def test_metrics_registry_exists():
    """Verify the metrics registry can be imported and is not None.

    Raises:
        AssertionError: If the import fails (chained to the ImportError) or
            the registry is None.
    """
    try:
        from observability.metrics import registry
    except ImportError as e:
        # Chain the original error so the real import failure stays visible.
        raise AssertionError(f"Failed to import metrics registry: {e}") from e

    assert registry is not None, "Registry is None"
    print("[PASS] metrics registry can be imported")


def test_counter_increment():
    """Check that Counter.inc tallies separately for each label value."""
    from observability.metrics import Counter

    counter = Counter(name="test_counter", help="Test counter", labels=["status"])
    counter.inc({"status": "success"})
    counter.inc({"status": "success"})
    counter.inc({"status": "error"})

    assert counter.values[("success",)] == 2, "Counter increment failed"
    assert counter.values[("error",)] == 1, "Counter increment failed"

    print("[PASS] counter increment works")


def test_gauge_set():
    """Check that Gauge.set stores the value given for each label value."""
    from observability.metrics import Gauge

    gauge = Gauge(name="test_gauge", help="Test gauge", labels=["component"])
    gauge.set(42, {"component": "database"})
    gauge.set(100, {"component": "cache"})

    assert gauge.values[("database",)] == 42, "Gauge set failed"
    assert gauge.values[("cache",)] == 100, "Gauge set failed"

    print("[PASS] gauge set works")


def test_histogram_observe():
    """Check that Histogram.observe records every observation for a label."""
    from observability.metrics import Histogram

    hist = Histogram(
        name="test_histogram",
        help="Test histogram",
        labels=["endpoint"],
        buckets=[0.1, 0.5, 1.0, 5.0],
    )
    hist.observe(0.05, {"endpoint": "/api"})
    hist.observe(0.3, {"endpoint": "/api"})
    hist.observe(2.0, {"endpoint": "/api"})

    observations = hist.observations[("/api",)]
    assert len(observations) == 3, "Histogram observe failed"
    assert 0.05 in observations, "Missing observation"

    print("[PASS] histogram observe works")


def test_prometheus_format():
    """Check that Counter.to_prometheus emits HELP, TYPE, label and value."""
    from observability.metrics import Counter

    counter = Counter(
        name="http_requests_total",
        help="Total HTTP requests",
        labels=["method", "status"],
    )
    counter.inc({"method": "GET", "status": "200"}, 10)

    output = counter.to_prometheus()
    assert "# HELP http_requests_total" in output, "Missing HELP line"
    assert "# TYPE http_requests_total counter" in output, "Missing TYPE line"
    assert 'method="GET"' in output, "Missing label"
    assert "10" in output, "Missing value"

    print("[PASS] Prometheus format output works")


# =============================================================================
# Tracing Tests
# =============================================================================

def test_tracing_module_exists():
    """Verify the tracing module file exists and mentions its key names."""
    tracing_path = Path("/opt/agent-governance/observability/tracing.py")
    assert tracing_path.exists(), "Tracing module not found"

    # Textual check only: the names must appear somewhere in the source.
    content = tracing_path.read_text()
    assert "Span" in content, "Span class not found"
    assert "Trace" in content, "Trace class not found"
    assert "Tracer" in content, "Tracer class not found"

    print("[PASS] tracing module exists with Span, Trace, Tracer")


def test_tracer_creation():
    """Check that a Tracer keeps the service name it was constructed with."""
    from observability.tracing import Tracer

    tracer = Tracer(service_name="test-service")
    assert tracer.service_name == "test-service", "Service name not set"

    print("[PASS] tracer creation works")


def test_span_creation():
    """Check that a finished span has ids, an end time, a duration and attributes."""
    from observability.tracing import Tracer

    tracer = Tracer()

    with tracer.trace("test_operation") as span:
        time.sleep(0.01)  # Small delay so the measured duration is non-zero
        span.set_attribute("test_key", "test_value")

    # Timing fields are checked after the context manager has exited.
    assert span.trace_id is not None, "Trace ID not set"
    assert span.span_id is not None, "Span ID not set"
    assert span.end_time is not None, "End time not set"
    assert span.duration_ms > 0, "Duration should be positive"
    assert span.attributes.get("test_key") == "test_value", "Attribute not set"

    print("[PASS] span creation and timing works")


def test_nested_spans():
    """Check that a child span links to its parent and shares its trace id."""
    from observability.tracing import Tracer

    tracer = Tracer()

    with tracer.trace("parent_operation") as parent:
        with tracer.span("child_operation") as child:
            child.set_attribute("level", "child")

            assert child.parent_span_id == parent.span_id, "Parent-child relationship not set"
            assert child.trace_id == parent.trace_id, "Trace ID mismatch"

    print("[PASS] nested spans with parent-child relationships work")


def test_span_error_handling():
    """Check that an exception raised inside a span is recorded on the span."""
    from observability.tracing import Tracer

    tracer = Tracer()

    try:
        with tracer.trace("failing_operation") as span:
            raise ValueError("Test error")
    except ValueError:
        # Expected: the error is raised on purpose to exercise error recording.
        pass

    assert span.status == "error", "Span status should be error"
    assert "ValueError" in span.attributes.get("error.type", ""), "Error type not recorded"

    print("[PASS] span error handling works")


def test_trace_context_propagation():
    """Check the current trace id is set inside a trace and cleared afterwards."""
    from observability.tracing import Tracer, get_current_trace_id

    tracer = Tracer()

    with tracer.trace("context_test"):
        trace_id = get_current_trace_id()
        assert trace_id is not None, "Trace ID not in context"

    # After trace ends, should be None
    assert get_current_trace_id() is None, "Trace ID should be cleared"

    print("[PASS] trace context propagation works")


def test_traces_table_creation():
    """Check that the ``traces`` table exists after creating a TraceStorage."""
    from observability.tracing import TraceStorage

    # Instantiated only for its side effect; this test expects construction
    # to have created the table.
    TraceStorage()

    # ``closing`` releases the connection even when the assertion fails.
    with closing(get_db()) as conn:
        assert table_exists(conn, "traces"), "traces table not created"

    print("[PASS] traces table exists")


# =============================================================================
# Logging Tests
# =============================================================================

def test_logging_module_exists():
    """Verify the logging module file exists and mentions its key names."""
    logging_path = Path("/opt/agent-governance/observability/logging.py")
    assert logging_path.exists(), "Logging module not found"

    # Textual check only: the names must appear somewhere in the source.
    content = logging_path.read_text()
    assert "LogEntry" in content, "LogEntry class not found"
    assert "LogStorage" in content, "LogStorage class not found"
    assert "get_logger" in content, "get_logger function not found"

    print("[PASS] logging module exists with LogEntry, LogStorage, get_logger")


def test_logger_creation():
    """Check that get_logger returns a logger carrying the requested name."""
    from observability.logging import get_logger

    logger = get_logger("test_module")
    assert logger is not None, "Logger not created"
    assert logger.name == "test_module", "Logger name not set"

    print("[PASS] logger creation works")


def test_log_entry_structure():
    """Check that LogEntry.to_dict exposes its fields and merges attributes."""
    from observability.logging import LogEntry

    entry = LogEntry(
        timestamp="2026-01-24T12:00:00Z",
        level="INFO",
        logger="test",
        message="Test message",
        trace_id="trace-123",
        attributes={"key": "value"},
    )

    d = entry.to_dict()
    assert d["level"] == "INFO", "Level not set"
    assert d["message"] == "Test message", "Message not set"
    assert d["trace_id"] == "trace-123", "Trace ID not set"
    # Attributes are expected at the top level of the dict, not nested.
    assert d["key"] == "value", "Attributes not merged"

    print("[PASS] log entry structure works")


def test_log_entry_json_serialization():
    """Check that LogEntry.to_json produces JSON that parses back correctly."""
    from observability.logging import LogEntry

    entry = LogEntry(
        timestamp="2026-01-24T12:00:00Z",
        level="ERROR",
        logger="test",
        message="Error occurred",
    )

    json_str = entry.to_json()
    parsed = json.loads(json_str)
    assert parsed["level"] == "ERROR", "JSON serialization failed"

    print("[PASS] log entry JSON serialization works")


def test_logs_table_creation():
    """Check that the ``logs`` table exists after creating a LogStorage."""
    from observability.logging import LogStorage

    # Instantiated only for its side effect; this test expects construction
    # to have created the table.
    LogStorage()

    # ``closing`` releases the connection even when the assertion fails.
    with closing(get_db()) as conn:
        assert table_exists(conn, "logs"), "logs table not created"

    print("[PASS] logs table exists")


def test_log_storage_save_and_search():
    """Check that a saved log entry can be found again through search."""
    from observability.logging import LogStorage, LogEntry

    storage = LogStorage()

    # Save a test log
    entry = LogEntry(
        timestamp="2026-01-24T12:00:00Z",
        level="INFO",
        logger="test_storage",
        message="Test log entry for search",
        tenant_id="default",
    )
    storage.save(entry)

    # Search for it
    logs = storage.search(logger="test_storage", limit=10)

    # Should find at least one log
    found = any("Test log entry for search" in log.get("message", "") for log in logs)
    assert found, "Log entry not found in search"

    print("[PASS] log storage save and search works")


# =============================================================================
# Integration Tests
# =============================================================================

def test_tracing_with_logging():
    """Check that the context trace id matches the active span's trace id."""
    from observability.tracing import Tracer, get_current_trace_id

    tracer = Tracer()

    with tracer.trace("correlated_operation") as span:
        trace_id = get_current_trace_id()
        assert trace_id is not None, "No trace ID in context"
        assert trace_id == span.trace_id, "Trace ID mismatch"

    print("[PASS] tracing and logging correlation works")


def test_observability_package_import():
    """Verify the observability package exposes its top-level names.

    Raises:
        AssertionError: If any name cannot be imported (chained to the
            ImportError) or the registry is None.
    """
    try:
        # Names other than ``registry`` are imported only to prove they exist.
        from observability import (
            registry,
            get_tracer,
            get_logger,
            record_agent_execution,
            record_violation,
        )
    except ImportError as e:
        # Chain the original error so the real import failure stays visible.
        raise AssertionError(f"Failed to import observability package: {e}") from e

    assert registry is not None
    print("[PASS] observability package imports cleanly")


def run_all_tests(tests=None) -> bool:
    """Run Phase 12 tests and print a pass/fail summary.

    Args:
        tests: Optional iterable of zero-argument callables to run instead
            of the full suite.  When omitted, every Phase 12 test runs in
            its original order.

    Returns:
        True when no test failed, False otherwise.
    """
    print("=" * 60)
    print("PHASE 12: OBSERVABILITY TESTS")
    print("=" * 60)
    print()

    if tests is None:
        tests = [
            # Metrics tests
            test_metrics_module_exists,
            test_metrics_registry_exists,
            test_counter_increment,
            test_gauge_set,
            test_histogram_observe,
            test_prometheus_format,
            # Tracing tests
            test_tracing_module_exists,
            test_tracer_creation,
            test_span_creation,
            test_nested_spans,
            test_span_error_handling,
            test_trace_context_propagation,
            test_traces_table_creation,
            # Logging tests
            test_logging_module_exists,
            test_logger_creation,
            test_log_entry_structure,
            test_log_entry_json_serialization,
            test_logs_table_creation,
            test_log_storage_save_and_search,
            # Integration tests
            test_tracing_with_logging,
            test_observability_package_import,
        ]

    passed = 0
    failed = 0

    for test in tests:
        # Callables without __name__ (e.g. functools.partial) still get a label.
        label = getattr(test, "__name__", repr(test))
        try:
            test()
            passed += 1
        except AssertionError as e:
            print(f"[FAIL] {label}: {e}")
            failed += 1
        except Exception as e:
            # Top-level boundary: one broken test must not abort the run.
            print(f"[ERROR] {label}: {e}")
            failed += 1

    print()
    print("=" * 60)
    print(f"RESULTS: {passed} passed, {failed} failed")
    print("=" * 60)

    return failed == 0


if __name__ == "__main__":
    success = run_all_tests()
    sys.exit(0 if success else 1)