#!/usr/bin/env python3
"""
Distributed Tracing Module
==========================

Provides trace ID propagation and span tracking for agent operations.

Usage:
    from observability.tracing import Tracer, get_current_trace_id

    tracer = Tracer()

    with tracer.span("agent_execution", agent_id="agent-001") as span:
        # Do work
        span.set_attribute("result", "success")

        with tracer.span("sub_operation") as child:
            # Child span automatically linked to parent
            pass

    # When the outermost span implicitly started the trace (as above), the
    # trace is ended and persisted as soon as that outermost span ends.
"""

import json
import logging
import secrets
import sqlite3
import threading
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterator, List, Mapping, Optional

from fastapi import APIRouter, HTTPException, Query

logger = logging.getLogger(__name__)

# =============================================================================
# Configuration
# =============================================================================

DB_PATH = Path("/opt/agent-governance/ledger/governance.db")

# Thread-local storage for the current trace context; read by the
# module-level get_current_trace_id() / get_current_span() helpers.
_trace_context = threading.local()


def _to_iso(timestamp: Optional[float]) -> Optional[str]:
    """Convert an epoch timestamp (seconds) to an ISO-8601 UTC string; None passes through."""
    if timestamp is None:
        return None
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()


# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class Span:
    """A single timed operation within a trace."""

    trace_id: str
    span_id: str
    parent_span_id: Optional[str]  # None for the root span of a trace
    operation_name: str
    service_name: str = "agent-governance"
    start_time: float = field(default_factory=time.time)  # epoch seconds
    end_time: Optional[float] = None  # set by finish()
    duration_ms: Optional[float] = None  # set by finish()
    status: str = "ok"  # "ok" or "error"
    attributes: Dict[str, Any] = field(default_factory=dict)
    events: List[Dict[str, Any]] = field(default_factory=list)

    def set_attribute(self, key: str, value: Any) -> None:
        """Set (or overwrite) a span attribute."""
        self.attributes[key] = value

    def add_event(self, name: str, attributes: Optional[Dict[str, Any]] = None) -> None:
        """Append a timestamped event to the span."""
        self.events.append({
            "name": name,
            "timestamp": time.time(),
            "attributes": attributes or {},
        })

    def set_error(self, error: Exception) -> None:
        """Mark the span as failed and record the exception type and message."""
        self.status = "error"
        self.attributes["error.type"] = type(error).__name__
        self.attributes["error.message"] = str(error)

    def finish(self) -> None:
        """Record the end time and compute the duration in milliseconds."""
        self.end_time = time.time()
        self.duration_ms = (self.end_time - self.start_time) * 1000

    def to_dict(self) -> dict:
        """Return a JSON-friendly dict with ISO-8601 UTC timestamps."""
        return {
            "trace_id": self.trace_id,
            "span_id": self.span_id,
            "parent_span_id": self.parent_span_id,
            "operation_name": self.operation_name,
            "service_name": self.service_name,
            "start_time": _to_iso(self.start_time),
            "end_time": _to_iso(self.end_time),
            "duration_ms": self.duration_ms,
            "status": self.status,
            "attributes": self.attributes,
            "events": self.events,
        }


@dataclass
class Trace:
    """A complete trace consisting of a root span and its descendants."""

    trace_id: str
    root_span_id: str
    spans: List[Span] = field(default_factory=list)
    start_time: float = field(default_factory=time.time)  # epoch seconds
    end_time: Optional[float] = None  # set by finish()

    def add_span(self, span: Span) -> None:
        """Add a span to the trace."""
        self.spans.append(span)

    def finish(self) -> None:
        """Record the trace end time."""
        self.end_time = time.time()

    @property
    def duration_ms(self) -> Optional[float]:
        """Elapsed time in milliseconds, or None while the trace is still open."""
        if self.end_time is None:
            return None
        return (self.end_time - self.start_time) * 1000

    def to_dict(self) -> dict:
        """Return a JSON-friendly dict including every span's dict form."""
        return {
            "trace_id": self.trace_id,
            "root_span_id": self.root_span_id,
            "span_count": len(self.spans),
            "start_time": _to_iso(self.start_time),
            "end_time": _to_iso(self.end_time),
            "duration_ms": self.duration_ms,
            "spans": [s.to_dict() for s in self.spans],
        }


# =============================================================================
# Trace Storage
# =============================================================================

class TraceStorage:
    """Persists traces to the SQLite database at DB_PATH and queries them.

    All methods are best-effort: errors are logged and swallowed (reads return
    None / an empty list) so tracing never breaks the traced operation.
    """

    def __init__(self) -> None:
        self._ensure_table()

    @staticmethod
    @contextmanager
    def _connect() -> Iterator[sqlite3.Connection]:
        """Open a connection to DB_PATH that is always closed, even on error."""
        conn = sqlite3.connect(DB_PATH)
        try:
            yield conn
        finally:
            conn.close()

    def _ensure_table(self) -> None:
        """Create the ``traces`` table and its indexes if they do not exist."""
        try:
            with self._connect() as conn:
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS traces (
                        trace_id TEXT PRIMARY KEY,
                        root_operation TEXT,
                        service_name TEXT,
                        start_time TEXT,
                        end_time TEXT,
                        duration_ms REAL,
                        span_count INTEGER,
                        status TEXT,
                        spans TEXT,  -- JSON
                        tenant_id TEXT DEFAULT 'default',
                        created_at TEXT DEFAULT CURRENT_TIMESTAMP
                    )
                """)
                conn.execute("""
                    CREATE INDEX IF NOT EXISTS idx_traces_time ON traces(start_time)
                """)
                conn.execute("""
                    CREATE INDEX IF NOT EXISTS idx_traces_tenant ON traces(tenant_id)
                """)
                conn.execute("""
                    CREATE INDEX IF NOT EXISTS idx_traces_operation ON traces(root_operation)
                """)
                conn.commit()
        except Exception:
            logger.exception("Error creating traces table")

    def save(self, trace: Trace, tenant_id: str = "default") -> None:
        """Insert or replace ``trace`` (with all its spans as JSON) for ``tenant_id``.

        The trace status is "error" if any span has status "error", else "ok".
        """
        try:
            root_span = next((s for s in trace.spans if s.span_id == trace.root_span_id), None)
            root_operation = root_span.operation_name if root_span else "unknown"
            # Persist the service that actually produced the trace (Tracer may
            # be configured with a non-default service_name).
            service_name = root_span.service_name if root_span else "agent-governance"
            status = "error" if any(s.status == "error" for s in trace.spans) else "ok"
            # Attributes are arbitrary values; stringify non-JSON types (e.g.
            # datetimes) instead of losing the whole trace to a TypeError.
            spans_json = json.dumps([s.to_dict() for s in trace.spans], default=str)

            with self._connect() as conn:
                conn.execute("""
                    INSERT OR REPLACE INTO traces
                    (trace_id, root_operation, service_name, start_time, end_time,
                     duration_ms, span_count, status, spans, tenant_id)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    trace.trace_id,
                    root_operation,
                    service_name,
                    _to_iso(trace.start_time),
                    _to_iso(trace.end_time),
                    trace.duration_ms,
                    len(trace.spans),
                    status,
                    spans_json,
                    tenant_id,
                ))
                conn.commit()
        except Exception:
            logger.exception("Error saving trace")

    def get(self, trace_id: str) -> Optional[dict]:
        """Return the stored trace row as a dict with ``spans`` decoded, or None."""
        try:
            with self._connect() as conn:
                conn.row_factory = sqlite3.Row
                row = conn.execute(
                    "SELECT * FROM traces WHERE trace_id = ?", (trace_id,)
                ).fetchone()
            if row is None:
                return None
            result = dict(row)
            result["spans"] = json.loads(result["spans"]) if result.get("spans") else []
            return result
        except Exception:
            logger.exception("Error getting trace")
            return None

    def search(
        self,
        operation: Optional[str] = None,
        status: Optional[str] = None,
        min_duration_ms: Optional[float] = None,
        tenant_id: Optional[str] = None,
        start_after: Optional[str] = None,
        limit: int = 100,
    ) -> List[dict]:
        """Return trace summaries matching all given filters, newest first.

        Args:
            operation: substring match against the root operation name.
            status: exact status ("ok" / "error").
            min_duration_ms: only traces at least this long (0 is honoured).
            tenant_id: exact tenant.
            start_after: ISO timestamp; only traces starting strictly later.
            limit: maximum rows returned (passed straight to SQL LIMIT).
        """
        query = (
            "SELECT trace_id, root_operation, start_time, duration_ms, span_count, status "
            "FROM traces WHERE 1=1"
        )
        params: List[Any] = []

        if operation:
            query += " AND root_operation LIKE ?"
            params.append(f"%{operation}%")
        if status:
            query += " AND status = ?"
            params.append(status)
        if min_duration_ms is not None:
            query += " AND duration_ms >= ?"
            params.append(min_duration_ms)
        if tenant_id:
            query += " AND tenant_id = ?"
            params.append(tenant_id)
        if start_after:
            query += " AND start_time > ?"
            params.append(start_after)

        query += " ORDER BY start_time DESC LIMIT ?"
        params.append(limit)

        try:
            with self._connect() as conn:
                conn.row_factory = sqlite3.Row
                return [dict(row) for row in conn.execute(query, params).fetchall()]
        except Exception:
            logger.exception("Error searching traces")
            return []


# =============================================================================
# Tracer
# =============================================================================

class Tracer:
    """Creates traces and spans and persists finished traces via TraceStorage.

    NOTE(review): the active trace and span stack live on the instance, not
    per thread, so a single Tracer shared between threads (e.g. the singleton
    returned by get_tracer()) will interleave their spans. Confirm tracers are
    only used from one thread at a time.
    """

    def __init__(self, service_name: str = "agent-governance", tenant_id: str = "default"):
        self.service_name = service_name
        self.tenant_id = tenant_id
        self.storage = TraceStorage()
        self._current_trace: Optional[Trace] = None
        self._span_stack: List[Span] = []
        # True when the current trace was implicitly started by start_span();
        # such a trace is ended and saved as soon as its root span is ended.
        self._auto_started = False

    def _generate_id(self, length: int = 16) -> str:
        """Return a random hex ID built from ``length`` bytes (2 * length hex chars)."""
        return secrets.token_hex(length)

    def start_trace(self, operation_name: str, **attributes) -> Trace:
        """Start a new trace whose root span is ``operation_name``.

        Any trace already active on this tracer is replaced without being saved.
        """
        trace_id = self._generate_id(16)
        span_id = self._generate_id(8)

        root_span = Span(
            trace_id=trace_id,
            span_id=span_id,
            parent_span_id=None,
            operation_name=operation_name,
            service_name=self.service_name,
            attributes=attributes,
        )

        trace = Trace(trace_id=trace_id, root_span_id=span_id)
        trace.add_span(root_span)

        self._current_trace = trace
        self._span_stack = [root_span]
        self._auto_started = False

        _trace_context.trace = trace
        _trace_context.current_span = root_span

        return trace

    def start_span(self, operation_name: str, **attributes) -> Span:
        """Start a child of the innermost open span.

        If no trace is active, a new trace is started and its root span is
        returned; that trace is ended and saved when the root span is ended.
        """
        if self._current_trace is None:
            self.start_trace(operation_name, **attributes)
            self._auto_started = True
            return self._span_stack[-1]

        parent_span = self._span_stack[-1] if self._span_stack else None

        span = Span(
            trace_id=self._current_trace.trace_id,
            span_id=self._generate_id(8),
            parent_span_id=parent_span.span_id if parent_span else None,
            operation_name=operation_name,
            service_name=self.service_name,
            attributes=attributes,
        )

        self._current_trace.add_span(span)
        self._span_stack.append(span)
        _trace_context.current_span = span

        return span

    def end_span(self, span: Optional[Span] = None) -> None:
        """Finish ``span`` (default: the innermost open span) and close it.

        Ending an already-finished span does not overwrite its timings. If the
        span is the root of an implicitly started trace, the trace is ended
        and saved.
        """
        if span is None:
            if not self._span_stack:
                return
            span = self._span_stack[-1]

        if span.end_time is None:
            span.finish()

        # Remove by identity (dataclass __eq__ compares every field); this also
        # drops spans ended out of order so end_trace() won't re-finish them.
        self._span_stack = [s for s in self._span_stack if s is not span]
        _trace_context.current_span = self._span_stack[-1] if self._span_stack else None

        trace = self._current_trace
        if (
            self._auto_started
            and trace is not None
            and span.trace_id == trace.trace_id
            and span.span_id == trace.root_span_id
        ):
            self.end_trace()

    def end_trace(self) -> None:
        """Finish all open spans, end the current trace and save it."""
        trace = self._current_trace
        if trace is None:
            return

        # Finish any spans still open, innermost first.
        while self._span_stack:
            self._span_stack.pop().finish()

        trace.finish()

        # Reset state before persisting so a storage failure cannot leave a
        # finished trace marked as active.
        self._current_trace = None
        self._auto_started = False
        _trace_context.trace = None
        _trace_context.current_span = None

        self.storage.save(trace, self.tenant_id)

    @contextmanager
    def span(self, operation_name: str, **attributes) -> Iterator[Span]:
        """Context manager for a span; exceptions mark it as error and propagate."""
        span = self.start_span(operation_name, **attributes)
        try:
            yield span
        except Exception as e:
            span.set_error(e)
            raise
        finally:
            self.end_span(span)

    @contextmanager
    def trace(self, operation_name: str, **attributes) -> Iterator[Optional[Span]]:
        """Context manager for a complete trace; yields the root span and saves on exit."""
        self.start_trace(operation_name, **attributes)
        root_span = self._span_stack[0] if self._span_stack else None
        try:
            yield root_span
        except Exception as e:
            if root_span is not None:
                root_span.set_error(e)
            raise
        finally:
            self.end_trace()

    def get_current_span(self) -> Optional[Span]:
        """Return the current span from thread-local context, if any."""
        return getattr(_trace_context, "current_span", None)

    def get_current_trace_id(self) -> Optional[str]:
        """Return the current trace ID from thread-local context, if any."""
        trace = getattr(_trace_context, "trace", None)
        return trace.trace_id if trace else None


# =============================================================================
# Global Tracer
# =============================================================================

_default_tracer: Optional[Tracer] = None


def get_tracer(service_name: str = "agent-governance", tenant_id: str = "default") -> Tracer:
    """Return the process-wide default Tracer, creating it on first call.

    The arguments only take effect on the first call; later calls return the
    existing instance unchanged.
    """
    global _default_tracer
    if _default_tracer is None:
        _default_tracer = Tracer(service_name, tenant_id)
    return _default_tracer


def get_current_trace_id() -> Optional[str]:
    """Return the current trace ID from thread-local context, if any."""
    trace = getattr(_trace_context, "trace", None)
    return trace.trace_id if trace else None


def get_current_span() -> Optional[Span]:
    """Return the current span from thread-local context, if any."""
    return getattr(_trace_context, "current_span", None)


# =============================================================================
# FastAPI Router
# =============================================================================

router = APIRouter(prefix="/traces", tags=["Tracing"])

# NOTE(review): these async handlers call blocking sqlite3 code directly,
# which blocks the event loop for the duration of each query.


@router.get("")
async def list_traces(
    operation: Optional[str] = None,
    status: Optional[str] = None,
    min_duration_ms: Optional[float] = None,
    # ge=1: SQLite treats a negative LIMIT as "no limit", which would bypass le=200.
    limit: int = Query(50, ge=1, le=200),
):
    """List recent traces, newest first."""
    storage = TraceStorage()
    traces = storage.search(
        operation=operation,
        status=status,
        min_duration_ms=min_duration_ms,
        limit=limit,
    )
    return {"traces": traces, "count": len(traces)}


@router.get("/{trace_id}")
async def get_trace(trace_id: str):
    """Return one trace with all spans, or 404 if it is unknown."""
    storage = TraceStorage()
    trace = storage.get(trace_id)
    if not trace:
        raise HTTPException(status_code=404, detail="Trace not found")
    return trace


# =============================================================================
# Trace ID Propagation Headers
# =============================================================================

TRACE_ID_HEADER = "X-Trace-ID"
SPAN_ID_HEADER = "X-Span-ID"
PARENT_SPAN_HEADER = "X-Parent-Span-ID"


def _get_header(headers: Mapping[str, str], name: str) -> Optional[str]:
    """Look up an HTTP header, falling back to a case-insensitive key match.

    HTTP header names are case-insensitive, but plain dicts (e.g. with
    lower-cased keys) are not.
    """
    value = headers.get(name)
    if value is not None:
        return value
    wanted = name.lower()
    return next(
        (v for k, v in headers.items() if isinstance(k, str) and k.lower() == wanted),
        None,
    )


def extract_trace_context(headers: dict) -> dict:
    """Extract trace/span/parent-span IDs from HTTP headers (missing ones are None)."""
    return {
        "trace_id": _get_header(headers, TRACE_ID_HEADER),
        "span_id": _get_header(headers, SPAN_ID_HEADER),
        "parent_span_id": _get_header(headers, PARENT_SPAN_HEADER),
    }


def inject_trace_context(headers: dict, span: Span) -> None:
    """Write ``span``'s trace context into ``headers`` in place."""
    headers[TRACE_ID_HEADER] = span.trace_id
    headers[SPAN_ID_HEADER] = span.span_id
    if span.parent_span_id:
        headers[PARENT_SPAN_HEADER] = span.parent_span_id