Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 30 additions & 84 deletions src/microsoft/opentelemetry/_sdkstats/_network_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@
# license information.
# --------------------------------------------------------------------------

"""Network SDKStats gauges.
"""Distro-owned network SDKStats observations.

The upstream statsbeat metrics only count requests sent to the Breeze
endpoint (the Azure Monitor exporter's destination). Microsoft OpenTelemetry also
ships OTLP and Agent365 exporters, whose per-export success counters
live in its own ``_REQUESTS_MAP``. This module registers an
observable gauge on the upstream ``StatsbeatManager``'s ``MeterProvider``
so those counters are exported on the same statsbeat pipeline.
endpoint. This distro also ships OTLP and Agent365 exporters whose
per-export counters live in the distro's own ``_REQUESTS_MAP``. We
contribute extra rows to the upstream ``Request_Success_Count`` metric
via ``add_metric_callback`` so the backend treats them as part of the
same metric stream (same name + InstrumentationScope).
"""

from __future__ import annotations

import logging
import threading
from typing import Iterable, List

from opentelemetry.metrics import CallbackOptions, Observation
Expand All @@ -33,6 +32,14 @@
)

try:
from azure.monitor.opentelemetry.exporter._constants import (
_REQ_DURATION_NAME,
_REQ_EXCEPTION_NAME,
_REQ_FAILURE_NAME,
_REQ_RETRY_NAME,
_REQ_SUCCESS_NAME,
_REQ_THROTTLE_NAME,
)
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat_metrics import (
_StatsbeatMetrics,
)
Expand All @@ -43,9 +50,6 @@

logger = logging.getLogger(__name__)

_REGISTER_LOCK = threading.Lock()
_registered = False


def _get_common_attributes() -> dict:
if _StatsbeatMetrics is None:
Expand All @@ -56,7 +60,6 @@ def _get_common_attributes() -> dict:
def _observe_request_success_count(options: CallbackOptions) -> Iterable[Observation]:
"""Drain the per-endpoint success counts and emit one observation each."""
common = _get_common_attributes()

observations: List[Observation] = []
for key, value in drain(REQUEST_SUCCESS_NAME).items():
attributes = dict(common)
Expand Down Expand Up @@ -148,76 +151,19 @@ def _observe_request_exception_count(options: CallbackOptions) -> Iterable[Obser
return observations


def register_network_gauges() -> bool:
"""Attach our network-stats callbacks to upstream's gauges.

We emit per-endpoint (``Request_Success_Count``, ``Request_Duration``,
``Request_Failure_Count``, ``Retry_Count``, ``Throttle_Count``,
``Exception_Count``) observations via the upstream statsbeat pipeline. We cannot
create separate gauges with the same names because the stats backend identifies
metric streams by InstrumentationScope, and rows from an unknown scope are silently
dropped. Instead we append our callbacks to the already-registered
upstream observable gauges so our observations are emitted on the
exact same instrument/scope as upstream's breeze rows.

Idempotent — subsequent calls are no-ops. Returns ``True`` on the
call that performs registration, ``False`` if registration was
skipped (already registered, upstream unavailable, or upstream
hasn't created the gauges yet).
"""
global _registered # pylint: disable=global-statement

with _REGISTER_LOCK:
if _registered:
return False

try:
from azure.monitor.opentelemetry.exporter.statsbeat._manager import (
StatsbeatManager,
)
except ImportError:
logger.debug("Upstream statsbeat unavailable; skipping network gauges.")
return False

manager = StatsbeatManager()
meter_provider = manager._meter_provider # pylint: disable=protected-access
metrics = manager._metrics # pylint: disable=protected-access
if meter_provider is None or metrics is None:
logger.debug("StatsbeatManager not initialised; skipping network gauges.")
return False

attached: List[str] = []
for gauge_attr, callback in (
("_success_count", _observe_request_success_count),
("_average_duration", _observe_request_duration),
("_failure_count", _observe_request_failure_count),
("_retry_count", _observe_request_retry_count),
("_throttle_count", _observe_request_throttle_count),
("_exception_count", _observe_request_exception_count),
):
gauge = getattr(metrics, gauge_attr, None)
if gauge is None:
logger.debug("Upstream %s gauge not yet created; skipping.", gauge_attr)
continue
try:
gauge._callbacks.append(callback) # pylint: disable=protected-access
except AttributeError:
logger.debug(
"Upstream %s gauge has no _callbacks list; cannot attach.",
gauge_attr,
)
continue
attached.append(gauge_attr)

if not attached:
return False

_registered = True
return True


def _reset_for_tests() -> None:
"""Reset the module-level registration guard. Test-only."""
global _registered # pylint: disable=global-statement
with _REGISTER_LOCK:
_registered = False
def register_network_gauges() -> None:
try:
from azure.monitor.opentelemetry.exporter.statsbeat._manager import StatsbeatManager # type: ignore[import-not-found] # pylint: disable=line-too-long
except ImportError:
logger.debug("Upstream statsbeat unavailable; skipping network gauges.")
return
manager = StatsbeatManager()
for metric, callback in (
(_REQ_SUCCESS_NAME[0], _observe_request_success_count),
(_REQ_DURATION_NAME[0], _observe_request_duration),
(_REQ_FAILURE_NAME[0], _observe_request_failure_count),
(_REQ_RETRY_NAME[0], _observe_request_retry_count),
(_REQ_THROTTLE_NAME[0], _observe_request_throttle_count),
(_REQ_EXCEPTION_NAME[0], _observe_request_exception_count),
):
manager.add_additional_metric_callbacks(metric, callback)
109 changes: 70 additions & 39 deletions tests/test_sdkstats.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
record_throttle,
reset_all,
)

# Network sdkstats wrappers are temporarily disabled. See
# microsoft/opentelemetry/_sdkstats/_otlp_wrapper.py.
# from microsoft.opentelemetry._sdkstats._otlp_wrapper import (
Expand Down Expand Up @@ -86,9 +87,8 @@ def _reset_upstream_singleton():


def _reset_network_metrics_guard():
from microsoft.opentelemetry._sdkstats import _network_metrics

_network_metrics._reset_for_tests()
"""Reset callback registration by recreating the upstream singleton."""
_reset_upstream_singleton()


class TestSdkStatsEnabled(unittest.TestCase):
Expand Down Expand Up @@ -320,29 +320,55 @@ def tearDown(self):
_reset_upstream_singleton()
_reset_network_metrics_guard()

def test_returns_false_when_manager_has_no_meter_provider(self):
def test_can_be_called_multiple_times_without_error(self):
from microsoft.opentelemetry._sdkstats._network_metrics import (
register_network_gauges,
)

self.assertFalse(register_network_gauges())
# register_network_gauges returns None; calling it twice must not raise
register_network_gauges()
register_network_gauges()

def test_returns_true_then_false_on_repeat(self):
from azure.monitor.opentelemetry.exporter.statsbeat._manager import (
StatsbeatManager,
def test_register_calls_additional_callback_api_for_each_metric(self):
from azure.monitor.opentelemetry.exporter.statsbeat._manager import StatsbeatManager
from microsoft.opentelemetry._sdkstats._network_metrics import (
register_network_gauges,
)
from microsoft.opentelemetry._sdkstats._config import (
_build_default_sdkstats_config,

with patch.object(StatsbeatManager, "add_additional_metric_callbacks") as mock_add:
register_network_gauges()
self.assertEqual(mock_add.call_count, 6)

def test_registers_callback_under_request_success_metric_name(self):
from azure.monitor.opentelemetry.exporter._constants import _REQ_SUCCESS_NAME
from azure.monitor.opentelemetry.exporter.statsbeat._manager import StatsbeatManager
from microsoft.opentelemetry._sdkstats._network_metrics import (
_observe_request_success_count,
register_network_gauges,
)

register_network_gauges()
callbacks = StatsbeatManager().get_additional_metric_callbacks(_REQ_SUCCESS_NAME[0])
self.assertIn(_observe_request_success_count, callbacks)

def test_returns_none_when_upstream_unavailable(self):
# Simulate upstream import failure by patching the import in
# ``register_network_gauges``'s try block.
import builtins

from microsoft.opentelemetry._sdkstats._network_metrics import (
register_network_gauges,
)

config = _build_default_sdkstats_config()
self.assertTrue(StatsbeatManager().initialize(config))
real_import = builtins.__import__

self.assertTrue(register_network_gauges())
self.assertFalse(register_network_gauges())
def fake_import(name, *args, **kwargs):
if name == "azure.monitor.opentelemetry.exporter.statsbeat._manager":
raise ImportError("simulated")
return real_import(name, *args, **kwargs)

with patch("builtins.__import__", side_effect=fake_import):
self.assertFalse(register_network_gauges())


class TestObserveRequestSuccessCount(unittest.TestCase):
Expand Down Expand Up @@ -646,45 +672,50 @@ def tearDown(self):
_reset_network_metrics_guard()

def test_attaches_to_all_six_gauges(self):
from azure.monitor.opentelemetry.exporter._constants import (
_REQ_DURATION_NAME,
_REQ_EXCEPTION_NAME,
_REQ_FAILURE_NAME,
_REQ_RETRY_NAME,
_REQ_SUCCESS_NAME,
_REQ_THROTTLE_NAME,
)
from azure.monitor.opentelemetry.exporter.statsbeat._manager import (
StatsbeatManager,
)
from microsoft.opentelemetry._sdkstats._config import (
_build_default_sdkstats_config,
)
from microsoft.opentelemetry._sdkstats._network_metrics import (
_observe_request_duration,
_observe_request_exception_count,
_observe_request_failure_count,
_observe_request_retry_count,
_observe_request_success_count,
_observe_request_throttle_count,
register_network_gauges,
)

config = _build_default_sdkstats_config()
self.assertTrue(StatsbeatManager().initialize(config))

# The six upstream gauges the distro must attach to.
gauge_attrs = [
"_success_count",
"_average_duration",
"_failure_count",
"_retry_count",
"_throttle_count",
"_exception_count",
]

# Snapshot pre-registration callback counts so we can assert exactly
# one new callback per gauge.
metrics = StatsbeatManager()._metrics # pylint: disable=protected-access
assert metrics is not None
before = {
name: len(getattr(metrics, name)._callbacks) for name in gauge_attrs # pylint: disable=protected-access
}
register_network_gauges()

self.assertTrue(register_network_gauges())

for name in gauge_attrs:
after = len(getattr(metrics, name)._callbacks) # pylint: disable=protected-access
self.assertEqual(
after,
before[name] + 1,
f"Expected one distro callback appended to {name}",
manager = StatsbeatManager()
expected = {
_REQ_SUCCESS_NAME[0]: _observe_request_success_count,
_REQ_DURATION_NAME[0]: _observe_request_duration,
_REQ_FAILURE_NAME[0]: _observe_request_failure_count,
_REQ_RETRY_NAME[0]: _observe_request_retry_count,
_REQ_THROTTLE_NAME[0]: _observe_request_throttle_count,
_REQ_EXCEPTION_NAME[0]: _observe_request_exception_count,
}
for metric_name, callback in expected.items():
callbacks = manager.get_additional_metric_callbacks(metric_name)
self.assertIn(
callback,
callbacks,
f"Expected distro callback registered under {metric_name}",
)


Expand Down
Loading