Skip to content
Merged
54 changes: 53 additions & 1 deletion litellm/integrations/otel/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections import OrderedDict
from contextlib import contextmanager
from datetime import datetime
from typing import TYPE_CHECKING, Any, Iterator, Mapping, cast
from typing import TYPE_CHECKING, Any, Callable, Iterator, Mapping, Sequence, cast

from opentelemetry.context import attach, get_current
from opentelemetry.sdk.trace import TracerProvider
Expand Down Expand Up @@ -546,6 +546,58 @@ def create_litellm_proxy_request_started_span(
return span


def select_global_otel_v2_logger(
in_memory_loggers: Sequence[object],
registered: "OpenTelemetryV2 | None" = None,
) -> "OpenTelemetryV2":
"""The single ``OpenTelemetryV2`` whose provider should become the OTel global.

The callback factory designates one logger as canonical the moment it builds
the first one (``_init_otel_logger_on_litellm_proxy`` sets
``proxy_server.open_telemetry_logger``), and every other v2 entry point —
guardrail, identity seeding, phase spans — already routes through that same
``registered`` owner. Reuse it here too so the global provider has one source
of truth instead of a second, independently-derived guess; this is the logger
a preset (arize, langfuse, …) folds the ``OTEL_*`` base exporter and its own
exporter into, so the FastAPI server span and the gen-ai spans share one
provider and one trace.

Fall back to ``in_memory_loggers`` for the SDK path, where no proxy global is
set (selecting from there, not ``service_callback``, which a preset logger does
not always reach), and build a generic logger from ``OTEL_*`` only when none was
configured at all. Each fallback still avoids the second generic logger that
orphaned the gen-ai spans onto a different backend than the server span.
"""
if registered is not None:
return registered
existing = next(
(cb for cb in in_memory_loggers if isinstance(cb, OpenTelemetryV2)), None
)
Comment on lines +573 to +575

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why cant the callback factory just publish the provider?

return existing if existing is not None else OpenTelemetryV2()


def publish_global_otel_v2_provider(
in_memory_loggers: Sequence[object],
set_global_provider: Callable[[TracerProvider], None],
registered: "OpenTelemetryV2 | None" = None,
) -> "OpenTelemetryV2":
"""Select the single v2 logger and publish its provider as the OTel global.

The proxy calls this once at startup, after callbacks are initialized, so the
preset logger already exists; it passes ``registered`` (the canonical owner the
factory designated as ``proxy_server.open_telemetry_logger``) so the global
provider reuses the same logger the rest of the v2 code emits through (see
:func:`select_global_otel_v2_logger`). Both ``registered`` and
``set_global_provider`` (the proxy passes
``opentelemetry.trace.set_tracer_provider``) are injected so the publish step is
unit-testable without reading or mutating real global OTel state. Returns the
logger whose provider was published.
"""
logger = select_global_otel_v2_logger(in_memory_loggers, registered=registered)
set_global_provider(logger._tracer_provider)
return logger


def _registered_v2_logger() -> "OpenTelemetryV2 | None":
try:
from litellm.proxy import proxy_server
Expand Down
27 changes: 27 additions & 0 deletions litellm/integrations/otel/model/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Typed configuration for the OpenTelemetry instrumentation."""

from enum import Enum
from typing import Any, List

from pydantic import AliasChoices, BaseModel, Field, field_validator, model_validator
Expand All @@ -23,6 +24,23 @@ class CaptureMessageContent(str):
SPAN_AND_EVENT = "span_and_event"


class ExporterOwner(str, Enum):
"""The preset that contributed an exporter. Values match the callback names
in ``presets.PRESET_BY_CALLBACK`` so per-request dynamic-credential routing
can match an exporter's owner against the credential source's callback name.
A ``str`` enum so the value compares equal to the bare callback-name string."""

# Arize AX (the hosted platform) and Arize Phoenix (the open-source / Phoenix
# Cloud tracer) are distinct backends with separate config and auth, so they
# are separate owners. The member value stays the public callback name.
ARIZE_AX = "arize"
ARIZE_PHOENIX = "arize_phoenix"
LANGFUSE_OTEL = "langfuse_otel"
WEAVE_OTEL = "weave_otel"
LEVO = "levo"
AGENTOPS = "agentops"


class _OTelV2Flag(BaseSettings):
model_config = SettingsConfigDict(extra="ignore")

Expand All @@ -49,6 +67,15 @@ class ExporterSpec(BaseModel):
)
endpoint: str | None = None
headers: str | None = None
owner: ExporterOwner | None = Field(
default=None,
description=(
"The preset that contributed this exporter. Per-request dynamic OTLP "
"credentials are applied only to the exporter whose owner matches the "
"credential source, so one tenant's vendor key never lands on a "
"different backend's exporter."
),
)
options: dict[str, str] | None = Field(
default=None,
description=(
Expand Down
18 changes: 14 additions & 4 deletions litellm/integrations/otel/plumbing/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,23 @@ def tracer_for(self, default: Tracer, dynamic_params: Any) -> Tracer:
return get_tracer(provider, self._tracer_name)

def _config_with_headers(self, headers: Mapping[str, str]) -> OpenTelemetryV2Config:
"""Clone the config, replacing OTLP exporter headers with ``headers``."""
"""Clone the config, stamping ``headers`` onto the credential's own exporter.

``headers`` are the per-request credentials of ``self._callback_name`` (the
integration that built this cache), so they apply only to the exporter that
integration contributed (``spec.owner``). A request that carries one
tenant's Arize key must never rewrite the headers of a co-configured
Langfuse or self-hosted collector exporter, which would leak that key to a
different backend.
"""
header_str = ",".join(f"{key}={value}" for key, value in headers.items())
header_update: dict[str, str] = {"headers": header_str}
exporters = [
(
spec
if spec.kind.lower() in _NON_OTLP_KINDS
else spec.model_copy(update={"headers": header_str})
spec.model_copy(update=header_update)
if spec.owner == self._callback_name
and spec.kind.lower() not in _NON_OTLP_KINDS
else spec
)
for spec in self._config.exporters
]
Expand Down
7 changes: 6 additions & 1 deletion litellm/integrations/otel/presets/agentops.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
from pydantic_settings import BaseSettings, SettingsConfigDict

from litellm._logging import verbose_logger
from litellm.integrations.otel.model.config import ExporterSpec, OpenTelemetryV2Config
from litellm.integrations.otel.model.config import (
ExporterOwner,
ExporterSpec,
OpenTelemetryV2Config,
)
from litellm.integrations.otel.plumbing.providers import register_exporter_factory

_AGENTOPS_ENDPOINT = "https://otlp.agentops.cloud/v1/traces"
Expand Down Expand Up @@ -59,6 +63,7 @@ def agentops_preset(
options=(
{"api_key": settings.api_key} if settings.api_key else None
),
owner=ExporterOwner.AGENTOPS,
),
],
"resource_attributes": {
Expand Down
7 changes: 6 additions & 1 deletion litellm/integrations/otel/presets/arize.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from pydantic_settings import BaseSettings, SettingsConfigDict

from litellm.integrations.arize.arize import ArizeLogger as _V1ArizeLogger
from litellm.integrations.otel.model.config import ExporterSpec, OpenTelemetryV2Config
from litellm.integrations.otel.model.config import (
ExporterOwner,
ExporterSpec,
OpenTelemetryV2Config,
)
from litellm.integrations.otel.presets.utils import ensure_mappers
from litellm.types.utils import StandardCallbackDynamicParams

Expand Down Expand Up @@ -34,6 +38,7 @@ def arize_preset(
kind=arize_cfg.protocol or "otlp_grpc",
endpoint=arize_cfg.endpoint or "https://otlp.arize.com/v1",
headers=headers,
owner=ExporterOwner.ARIZE_AX,
),
],
"mapper_names": ensure_mappers(base.mapper_names, "openinference"),
Expand Down
7 changes: 6 additions & 1 deletion litellm/integrations/otel/presets/langfuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
from litellm.integrations.langfuse.langfuse_otel import (
LangfuseOtelLogger as _V1Langfuse,
)
from litellm.integrations.otel.model.config import ExporterSpec, OpenTelemetryV2Config
from litellm.integrations.otel.model.config import (
ExporterOwner,
ExporterSpec,
OpenTelemetryV2Config,
)
from litellm.integrations.otel.presets.utils import ensure_mappers
from litellm.types.utils import StandardCallbackDynamicParams

Expand All @@ -23,6 +27,7 @@ def langfuse_preset(
kind=kind,
endpoint=cfg.endpoint,
headers=cfg.headers,
owner=ExporterOwner.LANGFUSE_OTEL,
),
],
"mapper_names": ensure_mappers(base.mapper_names, "langfuse"),
Expand Down
7 changes: 6 additions & 1 deletion litellm/integrations/otel/presets/levo.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
"""Levo preset — OTLP/HTTP to a Levo collector with org+workspace headers."""

from litellm.integrations.levo.levo import LevoLogger as _V1Levo
from litellm.integrations.otel.model.config import ExporterSpec, OpenTelemetryV2Config
from litellm.integrations.otel.model.config import (
ExporterOwner,
ExporterSpec,
OpenTelemetryV2Config,
)


def levo_preset(
Expand All @@ -18,6 +22,7 @@ def levo_preset(
kind="otlp_http",
endpoint=cfg.endpoint,
headers=cfg.otlp_auth_headers,
owner=ExporterOwner.LEVO,
),
],
}
Expand Down
7 changes: 6 additions & 1 deletion litellm/integrations/otel/presets/phoenix.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
from litellm.integrations.arize.arize_phoenix import (
ArizePhoenixLogger as _V1Phoenix,
)
from litellm.integrations.otel.model.config import ExporterSpec, OpenTelemetryV2Config
from litellm.integrations.otel.model.config import (
ExporterOwner,
ExporterSpec,
OpenTelemetryV2Config,
)
from litellm.integrations.otel.presets.utils import ensure_mappers


Expand Down Expand Up @@ -37,6 +41,7 @@ def phoenix_preset(
kind=cfg.protocol if hasattr(cfg, "protocol") else "otlp_http",
endpoint=cfg.endpoint,
headers=headers,
owner=ExporterOwner.ARIZE_PHOENIX,
),
],
"mapper_names": ensure_mappers(base.mapper_names, "openinference"),
Expand Down
7 changes: 6 additions & 1 deletion litellm/integrations/otel/presets/weave.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
"""Weave (W&B) preset."""

from litellm.integrations.otel.model.config import ExporterSpec, OpenTelemetryV2Config
from litellm.integrations.otel.model.config import (
ExporterOwner,
ExporterSpec,
OpenTelemetryV2Config,
)
from litellm.integrations.otel.presets.utils import ensure_mappers
from litellm.integrations.weave.weave_otel import (
_get_weave_authorization_header,
Expand All @@ -23,6 +27,7 @@ def weave_preset(
kind=weave_cfg.protocol or "otlp_http",
endpoint=weave_cfg.endpoint,
headers=weave_cfg.otlp_auth_headers,
owner=ExporterOwner.WEAVE_OTEL,
),
],
# Weave consumes OpenInference + a small Weave-specific overlay.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: owner should be an enum

Expand Down
67 changes: 36 additions & 31 deletions litellm/proxy/proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,37 +840,6 @@ async def proxy_startup_event(app: FastAPI):
if isinstance(worker_config, dict):
await initialize(**worker_config)

## V2 OTEL: now that config (and therefore the callbacks) is loaded, publish
## the chosen V2 logger's TracerProvider as the OTel global. The FastAPI
## instrumentation mounted at app-creation binds to the global provider, so
## this is what makes server spans and gen-ai spans share one provider and
## land in the same trace. Prefer an already-registered preset logger
## (arize, langfuse, …) so server spans export to that backend too; otherwise
## build a generic one from OTEL_* envs. ``set_tracer_provider`` only takes
## effect once, so the first configured logger wins.
try:
from litellm.integrations.otel.model.config import is_otel_v2_enabled

if is_otel_v2_enabled():
from opentelemetry import trace as _otel_trace

from litellm.integrations.otel.logger import OpenTelemetryV2

_otel_v2_logger = (
next(
(
cb
for cb in litellm.service_callback
if isinstance(cb, OpenTelemetryV2)
),
None,
)
or OpenTelemetryV2()
)
_otel_trace.set_tracer_provider(_otel_v2_logger._tracer_provider)
except Exception as e:
verbose_proxy_logger.debug("Skipping OTel V2 provider setup: %s", e)

# check if DATABASE_URL in environment - load from there
if prisma_client is None:
_db_url: Optional[str] = get_secret("DATABASE_URL", None) # type: ignore
Expand Down Expand Up @@ -907,6 +876,42 @@ async def _run_pw_migration():
redis_usage_cache=transaction_buffer_redis_cache,
)

## V2 OTEL: publish the chosen V2 logger's TracerProvider as the OTel global.
## This MUST run after callback initialization above: a preset (arize, langfuse,
## …) builds its logger there, folding the OTEL_* base exporter and its own
## exporter into one logger. The FastAPI instrumentation mounted at app-creation
## binds to the global provider, so reusing that one logger is what makes the
## server span and the gen-ai spans share one provider and land in the same
## trace, exporting to every configured backend. Running before callback init
## (when no logger exists yet) would build a second, generic logger whose
## provider became the global, orphaning the gen-ai spans onto a different
## backend than the server span. A generic logger is built only when none was
## configured.
try:
from litellm.integrations.otel.model.config import is_otel_v2_enabled

if is_otel_v2_enabled():
from opentelemetry import trace as _otel_trace

from litellm.litellm_core_utils.litellm_logging import _in_memory_loggers
from litellm.integrations.otel.logger import (
OpenTelemetryV2,
publish_global_otel_v2_provider,
)

registered = (
open_telemetry_logger
if isinstance(open_telemetry_logger, OpenTelemetryV2)
else None
)
Comment on lines +902 to +906

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use select_global_otel_v2_logger? tbh this can be a single method called select_and_publish_global_otel_provider

publish_global_otel_v2_provider(
_in_memory_loggers, # any-ok: pre-existing untyped List[Any] global
_otel_trace.set_tracer_provider,
registered=registered,
)
except Exception as e:
verbose_proxy_logger.debug("Skipping OTel V2 provider setup: %s", e)

## Validate use_redis_transaction_buffer requires Redis cache ##
ProxyStartupEvent._validate_redis_transaction_buffer_config(
general_settings=general_settings,
Expand Down
Loading
Loading