Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ services:
timeout: 10s
retries: 10

otel-collector:
image: otel/opentelemetry-collector-contrib:0.112.0
container_name: rootpilot-otel-collector
ports:
- "4318:4318"
- "4317:4317"
- "8888:8888"
- "8889:8889"
environment:
- OTEL_LOG_LEVEL=debug
volumes:
- ./infrastructure/monitoring/otel/otel-collector.yml:/etc/otelcol-contrib/config.yaml

volumes:
rootpilot-postgres-data:
rootpilot-elasticsearch-data:
1 change: 1 addition & 0 deletions infrastructure/monitoring/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Monitoring infrastructure — OTel, Prometheus, and Grafana adapters."""
21 changes: 21 additions & 0 deletions infrastructure/monitoring/otel/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""OpenTelemetry tracer provider implementation."""

from infrastructure.monitoring.otel.instrumentation import (
OpenTelemetryMiddleware,
setup_tracing,
get_trace_context,
)
from infrastructure.monitoring.otel.otel_tracer_provider import (
OTelSpan,
OTelTracer,
OTelTracerProvider,
)

__all__ = [
"OTelSpan",
"OTelTracer",
"OTelTracerProvider",
"OpenTelemetryMiddleware",
"setup_tracing",
"get_trace_context",
]
111 changes: 111 additions & 0 deletions infrastructure/monitoring/otel/instrumentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Tracing setup utilities and ASGI middleware for FastAPI services."""

import logging
from collections.abc import Awaitable, Callable
from typing import Any

from fastapi import FastAPI
from opentelemetry import trace as otel_trace
from starlette.types import ASGIApp, Message, Receive, Scope, Send

from infrastructure.monitoring.otel.otel_tracer_provider import (
OTelTracerProvider,
_from_otel_span_context,
)
from shared.config import BaseAppSettings
from shared.observability.tracing import SpanContext, SpanKind

logger = logging.getLogger(__name__)


def setup_tracing(
app: FastAPI,
settings: BaseAppSettings,
endpoint: str | None = None,
) -> OTelTracerProvider:
provider = OTelTracerProvider(
service_name=settings.resolved_otel_service_name,
endpoint=endpoint or settings.otel_exporter_otlp_endpoint,
)
app.state.tracer_provider = provider
app.state.tracer = provider.get_tracer(
name=settings.resolved_otel_service_name,
version="0.1.0",
)
logger.info(
"Tracing initialised",
extra={
"service_name": settings.resolved_otel_service_name,
"otlp_endpoint": endpoint or settings.otel_exporter_otlp_endpoint,
},
)
return provider


def get_trace_context() -> SpanContext | None:
current_span = otel_trace.get_current_span()
octx = current_span.get_span_context()
if octx.trace_id == 0:
return None
return _from_otel_span_context(octx)


class OpenTelemetryMiddleware:
def __init__(self, app: ASGIApp) -> None:
self.app = app

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] != "http":
await self.app(scope, receive, send)
return

provider: OTelTracerProvider | None = getattr(
getattr(self.app, "state", None), "tracer_provider", None
)
if provider is None:
await self.app(scope, receive, send)
return

headers = _build_header_dict(scope.get("headers", []))
parent_ctx = provider.extract(headers)

tracer = provider.get_tracer("rootpilot-http")
span = tracer.start_span(
name=f"{scope.get('method', 'UNKNOWN')} {scope.get('path', '/')}",
context=parent_ctx,
kind=SpanKind.SERVER,
attributes={
"http.method": scope.get("method", "UNKNOWN"),
"http.url": scope.get("path", "/"),
"http.scheme": scope.get("scheme", "http"),
"http.host": _get_header(headers, "host", ""),
"http.user_agent": _get_header(headers, "user-agent", ""),
},
)

async def _send_wrapper(message: Message) -> None:
if message.get("type") == "http.response.start":
status = message.get("status", 0)
span.set_attribute("http.status_code", status)
if status >= 500:
span.set_status(2, f"HTTP {status}")
else:
span.set_status(1)
await send(message)

try:
await self.app(scope, receive, _send_wrapper)
except BaseException as exc:
span.set_status(2, str(exc))
span.add_event("exception", {"exception.message": str(exc)})
raise
finally:
span.end()


def _build_header_dict(raw_headers: list[tuple[bytes, bytes]]) -> dict[str, str]:
return {k.decode("utf-8", errors="replace"): v.decode("utf-8", errors="replace") for k, v in raw_headers}


def _get_header(headers: dict[str, str], key: str, default: str = "") -> str:
return headers.get(key, headers.get(key.replace("-", "_"), default))
35 changes: 35 additions & 0 deletions infrastructure/monitoring/otel/otel-collector.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318

exporters:
debug:
verbosity: detailed
otlp:
endpoint: "localhost:4317"
tls:
insecure: true

processors:
batch:
timeout: 1s
send_batch_size: 1024

service:
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [debug]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [debug]
logs:
receivers: [otlp]
processors: [batch]
exporters: [debug]
172 changes: 172 additions & 0 deletions infrastructure/monitoring/otel/otel_tracer_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
"""Concrete OpenTelemetry implementation of the TracerProvider abstraction."""

from typing import Any

from opentelemetry import context as otel_context
from opentelemetry import trace as otel_trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.propagators.composite import CompositeHTTPPropagator
from opentelemetry.propagators.textmap import Setter, TextMapPropagator
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import Span as OTelSDKSpan
from opentelemetry.sdk.trace import TracerProvider as OTelSDKTracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import NonRecordingSpan, SpanContext as OTelSpanContext
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from shared.observability.tracing import Span, SpanContext, SpanKind, SpanStatus, Tracer, TracerProvider

_OTEL_KIND_MAP: dict[SpanKind, otel_trace.SpanKind] = {
SpanKind.INTERNAL: otel_trace.SpanKind.INTERNAL,
SpanKind.SERVER: otel_trace.SpanKind.SERVER,
SpanKind.CLIENT: otel_trace.SpanKind.CLIENT,
SpanKind.PRODUCER: otel_trace.SpanKind.PRODUCER,
SpanKind.CONSUMER: otel_trace.SpanKind.CONSUMER,
}

_OTEL_STATUS_MAP: dict[int, otel_trace.StatusCode] = {
0: otel_trace.StatusCode.UNSET,
1: otel_trace.StatusCode.OK,
2: otel_trace.StatusCode.ERROR,
}


def _to_otel_kind(kind: SpanKind) -> otel_trace.SpanKind:
return _OTEL_KIND_MAP.get(kind, otel_trace.SpanKind.INTERNAL)


def _to_otel_status_code(code: int) -> otel_trace.StatusCode:
return _OTEL_STATUS_MAP.get(code, otel_trace.StatusCode.UNSET)


def _to_span_context(sc: SpanContext) -> OTelSpanContext | None:
try:
trace_id = int(sc.trace_id, 16)
span_id = int(sc.span_id, 16)
except ValueError:
return None
return OTelSpanContext(
trace_id=trace_id,
span_id=span_id,
is_remote=False,
trace_flags=otel_trace.TraceFlags(sc.trace_flags),
)


def _from_otel_span_context(octx: OTelSpanContext) -> SpanContext:
return SpanContext(
trace_id=format(octx.trace_id, "032x"),
span_id=format(octx.span_id, "016x"),
trace_flags=octx.trace_flags,
)


class _DictSetter(Setter[dict[str, str]]):
def set(self, carrier: dict[str, str], key: str, value: str) -> None:
carrier[key] = value


class OTelSpan(Span):
def __init__(self, otel_span_instance: otel_trace.Span) -> None:
self._span = otel_span_instance

def set_attribute(self, key: str, value: str | bool | float | int) -> None:
self._span.set_attribute(key, value)

def set_status(self, status: SpanStatus | int, description: str | None = None) -> None:
if isinstance(status, SpanStatus):
sc = _to_otel_status_code(int(status.status_code))
self._span.set_status(otel_trace.Status(status_code=sc, description=status.description))
else:
sc = _to_otel_status_code(status)
self._span.set_status(otel_trace.Status(status_code=sc, description=description or ""))

def add_event(self, name: str, attributes: dict[str, Any] | None = None) -> None:
self._span.add_event(name, attributes or {})

def end(self) -> None:
self._span.end()

@property
def context(self) -> SpanContext:
octx = self._span.get_span_context()
return _from_otel_span_context(octx)


class OTelTracer(Tracer):
def __init__(self, otel_tracer_instance: otel_trace.Tracer) -> None:
self._tracer = otel_tracer_instance

def start_span(
self,
name: str,
context: SpanContext | None = None,
kind: SpanKind = SpanKind.INTERNAL,
attributes: dict[str, Any] | None = None,
) -> Span:
otel_ctx: otel_context.Context | None = None
if context is not None:
parent_octx = _to_span_context(context)
if parent_octx is not None:
otel_ctx = otel_trace.set_span_in_context(
NonRecordingSpan(parent_octx)
)

otel_span = self._tracer.start_span(
name=name,
context=otel_ctx,
kind=_to_otel_kind(kind),
attributes=attributes,
)

return OTelSpan(otel_span)


class OTelTracerProvider(TracerProvider):
def __init__(
self,
service_name: str,
endpoint: str | None = None,
) -> None:
resource = Resource.create({"service.name": service_name})
sdk_provider = OTelSDKTracerProvider(resource=resource)

if endpoint:
exporter = OTLPSpanExporter(endpoint=endpoint)
span_processor = BatchSpanProcessor(exporter)
sdk_provider.add_span_processor(span_processor)

self._provider = sdk_provider
self._propagator: TextMapPropagator = CompositeHTTPPropagator([
TraceContextTextMapPropagator(),
])

otel_trace.set_tracer_provider(sdk_provider)

def get_tracer(self, name: str, version: str | None = None) -> Tracer:
otel_tracer = self._provider.get_tracer(name, version or "")
return OTelTracer(otel_tracer)

def inject(self, context: SpanContext, headers: dict[str, str]) -> dict[str, str]:
octx = _to_span_context(context)
if octx is None:
return headers
carrier: dict[str, str] = {}
otel_ctx = otel_trace.set_span_in_context(NonRecordingSpan(octx))
self._propagator.inject(carrier, context=otel_ctx, setter=_DictSetter())
headers.update(carrier)
return headers

def extract(self, headers: dict[str, str]) -> SpanContext | None:
otel_ctx = self._propagator.extract(carrier=headers)
otel_span = otel_trace.get_current_span(otel_ctx)
octx = otel_span.get_span_context()
if octx.trace_id == 0:
return None
return _from_otel_span_context(octx)

async def force_flush(self) -> None:
self._provider.force_flush()

async def shutdown(self) -> None:
self._provider.shutdown()
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ dependencies = [
"openai",
"aio-pika>=9.5,<10",
"elasticsearch>=8.15,<9",
"opentelemetry-api>=1.28,<2",
"opentelemetry-sdk>=1.28,<2",
"opentelemetry-exporter-otlp-proto-http>=1.28,<2",
"opentelemetry-propagator-aws-xray>=1.0,<2",
]

[project.optional-dependencies]
Expand Down
3 changes: 3 additions & 0 deletions services/ai-investigation-service/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ pydantic
pydantic-settings
langgraph
openai
opentelemetry-api>=1.28,<2
opentelemetry-sdk>=1.28,<2
opentelemetry-exporter-otlp-proto-http>=1.28,<2
Loading
Loading