diff --git a/docker-compose.yml b/docker-compose.yml index d809ac8..ed6053a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -48,6 +48,19 @@ services: timeout: 10s retries: 10 + otel-collector: + image: otel/opentelemetry-collector-contrib:0.112.0 + container_name: rootpilot-otel-collector + ports: + - "4318:4318" + - "4317:4317" + - "8888:8888" + - "8889:8889" + environment: + - OTEL_LOG_LEVEL=debug + volumes: + - ./infrastructure/monitoring/otel/otel-collector.yml:/etc/otelcol-contrib/config.yaml + volumes: rootpilot-postgres-data: rootpilot-elasticsearch-data: diff --git a/infrastructure/monitoring/__init__.py b/infrastructure/monitoring/__init__.py new file mode 100644 index 0000000..65aff4f --- /dev/null +++ b/infrastructure/monitoring/__init__.py @@ -0,0 +1 @@ +"""Monitoring infrastructure — OTel, Prometheus, and Grafana adapters.""" diff --git a/infrastructure/monitoring/otel/__init__.py b/infrastructure/monitoring/otel/__init__.py new file mode 100644 index 0000000..e882d24 --- /dev/null +++ b/infrastructure/monitoring/otel/__init__.py @@ -0,0 +1,21 @@ +"""OpenTelemetry tracer provider implementation.""" + +from infrastructure.monitoring.otel.instrumentation import ( + OpenTelemetryMiddleware, + setup_tracing, + get_trace_context, +) +from infrastructure.monitoring.otel.otel_tracer_provider import ( + OTelSpan, + OTelTracer, + OTelTracerProvider, +) + +__all__ = [ + "OTelSpan", + "OTelTracer", + "OTelTracerProvider", + "OpenTelemetryMiddleware", + "setup_tracing", + "get_trace_context", +] diff --git a/infrastructure/monitoring/otel/instrumentation.py b/infrastructure/monitoring/otel/instrumentation.py new file mode 100644 index 0000000..07f1c24 --- /dev/null +++ b/infrastructure/monitoring/otel/instrumentation.py @@ -0,0 +1,111 @@ +"""Tracing setup utilities and ASGI middleware for FastAPI services.""" + +import logging +from collections.abc import Awaitable, Callable +from typing import Any + +from fastapi import FastAPI +from opentelemetry import trace as otel_trace +from starlette.types import ASGIApp, Message, Receive, Scope, Send + +from infrastructure.monitoring.otel.otel_tracer_provider import ( + OTelTracerProvider, + _from_otel_span_context, +) +from shared.config import BaseAppSettings +from shared.observability.tracing import SpanContext, SpanKind + +logger = logging.getLogger(__name__) + + +def setup_tracing( + app: FastAPI, + settings: BaseAppSettings, + endpoint: str | None = None, +) -> OTelTracerProvider: + provider = OTelTracerProvider( + service_name=settings.resolved_otel_service_name, + endpoint=endpoint or settings.otel_exporter_otlp_endpoint, + ) + app.state.tracer_provider = provider + app.state.tracer = provider.get_tracer( + name=settings.resolved_otel_service_name, + version="0.1.0", + ) + logger.info( + "Tracing initialised", + extra={ + "service_name": settings.resolved_otel_service_name, + "otlp_endpoint": endpoint or settings.otel_exporter_otlp_endpoint, + }, + ) + return provider + + +def get_trace_context() -> SpanContext | None: + current_span = otel_trace.get_current_span() + octx = current_span.get_span_context() + if octx.trace_id == 0: + return None + return _from_otel_span_context(octx) + + +class OpenTelemetryMiddleware: + def __init__(self, app: ASGIApp) -> None: + self.app = app + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + provider: OTelTracerProvider | None = getattr( + getattr(self.app, "state", None), "tracer_provider", None + ) + if provider is None: + await self.app(scope, receive, send) + return + + headers = _build_header_dict(scope.get("headers", [])) + parent_ctx = provider.extract(headers) + + tracer = provider.get_tracer("rootpilot-http") + span = tracer.start_span( + name=f"{scope.get('method', 'UNKNOWN')} {scope.get('path', '/')}", + context=parent_ctx, + kind=SpanKind.SERVER, + attributes={ + "http.method": scope.get("method", "UNKNOWN"), + "http.url": scope.get("path", "/"), + "http.scheme": scope.get("scheme", "http"), + "http.host": _get_header(headers, "host", ""), + "http.user_agent": _get_header(headers, "user-agent", ""), + }, + ) + + async def _send_wrapper(message: Message) -> None: + if message.get("type") == "http.response.start": + status = message.get("status", 0) + span.set_attribute("http.status_code", status) + if status >= 500: + span.set_status(2, f"HTTP {status}") + else: + span.set_status(1) + await send(message) + + try: + await self.app(scope, receive, _send_wrapper) + except BaseException as exc: + span.set_status(2, str(exc)) + span.add_event("exception", {"exception.message": str(exc)}) + raise + finally: + span.end() + + +def _build_header_dict(raw_headers: list[tuple[bytes, bytes]]) -> dict[str, str]: + return {k.decode("utf-8", errors="replace"): v.decode("utf-8", errors="replace") for k, v in raw_headers} + + +def _get_header(headers: dict[str, str], key: str, default: str = "") -> str: + return headers.get(key, headers.get(key.replace("-", "_"), default)) diff --git a/infrastructure/monitoring/otel/otel-collector.yml b/infrastructure/monitoring/otel/otel-collector.yml new file mode 100644 index 0000000..814917f --- /dev/null +++ b/infrastructure/monitoring/otel/otel-collector.yml @@ -0,0 +1,35 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +exporters: + debug: + verbosity: detailed + otlp: + endpoint: "localhost:4317" + tls: + insecure: true + +processors: + batch: + timeout: 1s + send_batch_size: 1024 + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [debug] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [debug] + logs: + receivers: [otlp] + processors: [batch] + exporters: [debug] diff --git a/infrastructure/monitoring/otel/otel_tracer_provider.py b/infrastructure/monitoring/otel/otel_tracer_provider.py new file mode 100644 index 0000000..368e4ed --- /dev/null +++ b/infrastructure/monitoring/otel/otel_tracer_provider.py @@ -0,0 +1,172 @@ +"""Concrete OpenTelemetry implementation of the TracerProvider abstraction.""" + +from typing import Any + +from opentelemetry import context as otel_context +from opentelemetry import trace as otel_trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.propagators.composite import CompositeHTTPPropagator +from opentelemetry.propagators.textmap import Setter, TextMapPropagator +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import Span as OTelSDKSpan +from opentelemetry.sdk.trace import TracerProvider as OTelSDKTracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace import NonRecordingSpan, SpanContext as OTelSpanContext +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + +from shared.observability.tracing import Span, SpanContext, SpanKind, SpanStatus, Tracer, TracerProvider + +_OTEL_KIND_MAP: dict[SpanKind, otel_trace.SpanKind] = { + SpanKind.INTERNAL: otel_trace.SpanKind.INTERNAL, + SpanKind.SERVER: otel_trace.SpanKind.SERVER, + SpanKind.CLIENT: otel_trace.SpanKind.CLIENT, + SpanKind.PRODUCER: otel_trace.SpanKind.PRODUCER, + SpanKind.CONSUMER: otel_trace.SpanKind.CONSUMER, +} + +_OTEL_STATUS_MAP: dict[int, otel_trace.StatusCode] = { + 0: otel_trace.StatusCode.UNSET, + 1: otel_trace.StatusCode.OK, + 2: otel_trace.StatusCode.ERROR, +} + + +def _to_otel_kind(kind: SpanKind) -> otel_trace.SpanKind: + return _OTEL_KIND_MAP.get(kind, otel_trace.SpanKind.INTERNAL) + + +def _to_otel_status_code(code: int) -> otel_trace.StatusCode: + return _OTEL_STATUS_MAP.get(code, otel_trace.StatusCode.UNSET) + + +def _to_span_context(sc: SpanContext) -> OTelSpanContext | None: + try: + trace_id = int(sc.trace_id, 16) + span_id = int(sc.span_id, 16) + except ValueError: + return None + return OTelSpanContext( + trace_id=trace_id, + span_id=span_id, + is_remote=False, + trace_flags=otel_trace.TraceFlags(sc.trace_flags), + ) + + +def _from_otel_span_context(octx: OTelSpanContext) -> SpanContext: + return SpanContext( + trace_id=format(octx.trace_id, "032x"), + span_id=format(octx.span_id, "016x"), + trace_flags=octx.trace_flags, + ) + + +class _DictSetter(Setter[dict[str, str]]): + def set(self, carrier: dict[str, str], key: str, value: str) -> None: + carrier[key] = value + + +class OTelSpan(Span): + def __init__(self, otel_span_instance: otel_trace.Span) -> None: + self._span = otel_span_instance + + def set_attribute(self, key: str, value: str | bool | float | int) -> None: + self._span.set_attribute(key, value) + + def set_status(self, status: SpanStatus | int, description: str | None = None) -> None: + if isinstance(status, SpanStatus): + sc = _to_otel_status_code(int(status.status_code)) + self._span.set_status(otel_trace.Status(status_code=sc, description=status.description)) + else: + sc = _to_otel_status_code(status) + self._span.set_status(otel_trace.Status(status_code=sc, description=description or "")) + + def add_event(self, name: str, attributes: dict[str, Any] | None = None) -> None: + self._span.add_event(name, attributes or {}) + + def end(self) -> None: + self._span.end() + + @property + def context(self) -> SpanContext: + octx = self._span.get_span_context() + return _from_otel_span_context(octx) + + +class OTelTracer(Tracer): + def __init__(self, otel_tracer_instance: otel_trace.Tracer) -> None: + self._tracer = otel_tracer_instance + + def start_span( + self, + name: str, + context: SpanContext | None = None, + kind: SpanKind = SpanKind.INTERNAL, + attributes: dict[str, Any] | None = None, + ) -> Span: + otel_ctx: otel_context.Context | None = None + if context is not None: + parent_octx = _to_span_context(context) + if parent_octx is not None: + otel_ctx = otel_trace.set_span_in_context( + NonRecordingSpan(parent_octx) + ) + + otel_span = self._tracer.start_span( + name=name, + context=otel_ctx, + kind=_to_otel_kind(kind), + attributes=attributes, + ) + + return OTelSpan(otel_span) + + +class OTelTracerProvider(TracerProvider): + def __init__( + self, + service_name: str, + endpoint: str | None = None, + ) -> None: + resource = Resource.create({"service.name": service_name}) + sdk_provider = OTelSDKTracerProvider(resource=resource) + + if endpoint: + exporter = OTLPSpanExporter(endpoint=endpoint) + span_processor = BatchSpanProcessor(exporter) + sdk_provider.add_span_processor(span_processor) + + self._provider = sdk_provider + self._propagator: TextMapPropagator = CompositeHTTPPropagator([ + TraceContextTextMapPropagator(), + ]) + + otel_trace.set_tracer_provider(sdk_provider) + + def get_tracer(self, name: str, version: str | None = None) -> Tracer: + otel_tracer = self._provider.get_tracer(name, version or "") + return OTelTracer(otel_tracer) + + def inject(self, context: SpanContext, headers: dict[str, str]) -> dict[str, str]: + octx = _to_span_context(context) + if octx is None: + return headers + carrier: dict[str, str] = {} + otel_ctx = otel_trace.set_span_in_context(NonRecordingSpan(octx)) + self._propagator.inject(carrier, context=otel_ctx, setter=_DictSetter()) + headers.update(carrier) + return headers + + def extract(self, headers: dict[str, str]) -> SpanContext | None: + otel_ctx = self._propagator.extract(carrier=headers) + otel_span = otel_trace.get_current_span(otel_ctx) + octx = otel_span.get_span_context() + if octx.trace_id == 0: + return None + return _from_otel_span_context(octx) + + async def force_flush(self) -> None: + self._provider.force_flush() + + async def shutdown(self) -> None: + self._provider.shutdown() diff --git a/pyproject.toml b/pyproject.toml index 32daacd..fcc748f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,10 @@ dependencies = [ "openai", "aio-pika>=9.5,<10", "elasticsearch>=8.15,<9", + "opentelemetry-api>=1.28,<2", + "opentelemetry-sdk>=1.28,<2", + "opentelemetry-exporter-otlp-proto-http>=1.28,<2", + "opentelemetry-propagator-aws-xray>=1.0,<2", ] [project.optional-dependencies] diff --git a/services/ai-investigation-service/requirements.txt b/services/ai-investigation-service/requirements.txt index 993008f..06ca932 100644 --- a/services/ai-investigation-service/requirements.txt +++ b/services/ai-investigation-service/requirements.txt @@ -4,3 +4,6 @@ pydantic pydantic-settings langgraph openai +opentelemetry-api>=1.28,<2 +opentelemetry-sdk>=1.28,<2 +opentelemetry-exporter-otlp-proto-http>=1.28,<2 diff --git a/services/correlation-service/app/main.py b/services/correlation-service/app/main.py index 70f2601..a3d8f8f 100644 --- a/services/correlation-service/app/main.py +++ b/services/correlation-service/app/main.py @@ -6,6 +6,7 @@ from app.config import CorrelationServiceSettings from app.routers import correlation, health, timeline +from infrastructure.monitoring.otel import OpenTelemetryMiddleware, setup_tracing from shared.config import load_settings from shared.domain.correlation.engine import CorrelationEngine from shared.domain.timeline.services import TimelineReconstructor @@ -23,6 +24,8 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: format="%(levelname)s\t%(name)s\t%(message)s", ) + setup_tracing(app, settings) + app.state.reconstructor = TimelineReconstructor( window_duration_seconds=settings.timeline_window_duration, ) @@ -41,6 +44,7 @@ def create_app() -> FastAPI: version="0.1.0", lifespan=lifespan, ) + app.add_middleware(OpenTelemetryMiddleware) app.include_router(health.router) app.include_router(timeline.router) app.include_router(correlation.router) diff --git a/services/correlation-service/requirements.txt b/services/correlation-service/requirements.txt index b4ecae1..b350f03 100644 --- a/services/correlation-service/requirements.txt +++ b/services/correlation-service/requirements.txt @@ -2,3 +2,6 @@ fastapi uvicorn[standard] pydantic pydantic-settings +opentelemetry-api>=1.28,<2 +opentelemetry-sdk>=1.28,<2 +opentelemetry-exporter-otlp-proto-http>=1.28,<2 diff --git a/services/gateway-service/requirements.txt b/services/gateway-service/requirements.txt index b4ecae1..b350f03 100644 --- a/services/gateway-service/requirements.txt +++ b/services/gateway-service/requirements.txt @@ -2,3 +2,6 @@ fastapi uvicorn[standard] pydantic pydantic-settings +opentelemetry-api>=1.28,<2 +opentelemetry-sdk>=1.28,<2 +opentelemetry-exporter-otlp-proto-http>=1.28,<2 diff --git a/services/incident-service/requirements.txt b/services/incident-service/requirements.txt index b4ecae1..b350f03 100644 --- a/services/incident-service/requirements.txt +++ b/services/incident-service/requirements.txt @@ -2,3 +2,6 @@ fastapi uvicorn[standard] pydantic pydantic-settings +opentelemetry-api>=1.28,<2 +opentelemetry-sdk>=1.28,<2 +opentelemetry-exporter-otlp-proto-http>=1.28,<2 diff --git a/services/ingestion-service/app/main.py b/services/ingestion-service/app/main.py index 72a33d3..7fdc63c 100644 --- a/services/ingestion-service/app/main.py +++ b/services/ingestion-service/app/main.py @@ -6,6 +6,7 @@ from app.config import IngestionServiceSettings from app.routers import health, ingest +from infrastructure.monitoring.otel import OpenTelemetryMiddleware, setup_tracing from infrastructure.rabbitmq import RabbitMQConfig, RabbitMQEventBus from shared.config import load_settings @@ -22,6 +23,8 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: format="%(levelname)s\t%(name)s\t%(message)s", ) + setup_tracing(app, settings) + config = RabbitMQConfig(url=settings.event_bus_url) event_bus = RabbitMQEventBus(config=config) try: @@ -44,6 +47,7 @@ def create_app() -> FastAPI: version="0.1.0", lifespan=lifespan, ) + app.add_middleware(OpenTelemetryMiddleware) app.include_router(health.router) app.include_router(ingest.router) return app diff --git a/services/ingestion-service/requirements.txt b/services/ingestion-service/requirements.txt index 3032ef9..cfe9a0c 100644 --- a/services/ingestion-service/requirements.txt +++ b/services/ingestion-service/requirements.txt @@ -4,3 +4,6 @@ pydantic pydantic-settings aio-pika>=9.5,<10 elasticsearch>=8.15,<9 +opentelemetry-api>=1.28,<2 +opentelemetry-sdk>=1.28,<2 +opentelemetry-exporter-otlp-proto-http>=1.28,<2 diff --git a/shared/contracts/events/__init__.py b/shared/contracts/events/__init__.py index 521a4e7..0b4fbb5 100644 --- a/shared/contracts/events/__init__.py +++ b/shared/contracts/events/__init__.py @@ -5,6 +5,7 @@ from shared.contracts.events.incident import IncidentDetectedEvent from shared.contracts.events.investigation import InvestigationRequestedEvent from shared.contracts.events.telemetry import TelemetryEvent +from shared.observability.tracing.models import SpanContext as TraceContext __all__ = [ "Event", @@ -13,5 +14,6 @@ "Severity", "ServiceName", "TelemetryEvent", + "TraceContext", ] diff --git a/shared/contracts/events/base.py b/shared/contracts/events/base.py index ea69752..432e808 100644 --- a/shared/contracts/events/base.py +++ b/shared/contracts/events/base.py @@ -6,6 +6,8 @@ from pydantic import BaseModel, Field +from shared.observability.tracing.models import SpanContext + def _new_id() -> str: return uuid4().hex @@ -20,3 +22,7 @@ class Event(BaseModel): default_factory=lambda: datetime.now(timezone.utc), description="When the event was created (UTC).", ) + trace_context: SpanContext | None = Field( + default=None, + description="Carried trace context for distributed tracing propagation.", + ) diff --git a/shared/observability/__init__.py b/shared/observability/__init__.py new file mode 100644 index 0000000..5d9265b --- /dev/null +++ b/shared/observability/__init__.py @@ -0,0 +1 @@ +"""Observability abstractions — tracing, logging, and metrics interfaces.""" diff --git a/shared/observability/tracing/__init__.py b/shared/observability/tracing/__init__.py new file mode 100644 index 0000000..2ca8f25 --- /dev/null +++ b/shared/observability/tracing/__init__.py @@ -0,0 +1,13 @@ +"""Tracing provider abstraction for provider-agnostic distributed tracing.""" + +from shared.observability.tracing.models import SpanContext, SpanKind, SpanStatus +from shared.observability.tracing.provider import Span, Tracer, TracerProvider + +__all__ = [ + "SpanContext", + "SpanKind", + "SpanStatus", + "Span", + "Tracer", + "TracerProvider", +] diff --git a/shared/observability/tracing/models.py b/shared/observability/tracing/models.py new file mode 100644 index 0000000..8fb2424 --- /dev/null +++ b/shared/observability/tracing/models.py @@ -0,0 +1,35 @@ +from enum import StrEnum +from typing import Any + +from pydantic import BaseModel, Field + + +class SpanContext(BaseModel): + """Serializable trace context for propagation across service boundaries.""" + + trace_id: str = Field(description="32-character hex-encoded trace ID.") + span_id: str = Field(description="16-character hex-encoded span ID.") + trace_flags: int = Field(default=1, ge=0, le=255, description="W3C trace flags byte.") + + +class SpanKind(StrEnum): + INTERNAL = "internal" + SERVER = "server" + CLIENT = "client" + PRODUCER = "producer" + CONSUMER = "consumer" + + +SpanStatusValue = int | str + + +class SpanStatus: + """Status representation for a span.""" + + UNSET: SpanStatusValue = 0 + OK: SpanStatusValue = 1 + ERROR: SpanStatusValue = 2 + + def __init__(self, status_code: SpanStatusValue, description: str = "") -> None: + self.status_code = status_code + self.description = description diff --git a/shared/observability/tracing/provider.py b/shared/observability/tracing/provider.py new file mode 100644 index 0000000..f2d85a7 --- /dev/null +++ b/shared/observability/tracing/provider.py @@ -0,0 +1,67 @@ +from abc import ABC, abstractmethod +from typing import Any + +from shared.observability.tracing.models import SpanContext, SpanKind, SpanStatus + + +class Span(ABC): + """Represents a single unit of work within a distributed trace.""" + + @abstractmethod + def set_attribute(self, key: str, value: str | bool | float | int) -> None: + ... + + @abstractmethod + def set_status(self, status: SpanStatus | int, description: str | None = None) -> None: + ... + + @abstractmethod + def add_event(self, name: str, attributes: dict[str, Any] | None = None) -> None: + ... + + @abstractmethod + def end(self) -> None: + ... + + @property + @abstractmethod + def context(self) -> SpanContext: + ... + + +class Tracer(ABC): + """Creates spans and manages trace context.""" + + @abstractmethod + def start_span( + self, + name: str, + context: SpanContext | None = None, + kind: SpanKind = SpanKind.INTERNAL, + attributes: dict[str, Any] | None = None, + ) -> Span: + ... + + +class TracerProvider(ABC): + """Provider-agnostic entry point for distributed tracing.""" + + @abstractmethod + def get_tracer(self, name: str, version: str | None = None) -> Tracer: + ... + + @abstractmethod + def inject(self, context: SpanContext, headers: dict[str, str]) -> dict[str, str]: + ... + + @abstractmethod + def extract(self, headers: dict[str, str]) -> SpanContext | None: + ... + + @abstractmethod + async def force_flush(self) -> None: + ... + + @abstractmethod + async def shutdown(self) -> None: + ...