Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 153 additions & 83 deletions rotel_python_processor_sdk/processors/context_processor.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,41 @@
"""
ContextProcessor demonstrates how to pull HTTP/gRPC headers from message metadata
(context) and add them as span attributes. This follows the OTel Collector
pattern where headers/metadata are stored in context by the receiver and processors pull
from context to add attributes.
ContextProcessor - Generic processor for extracting HTTP/gRPC headers from request
context and adding them as span/log/metric attributes.

This processor extracts custom headers from context and adds them as span
attributes following OpenTelemetry semantic conventions (http.request.header.*).
This is a reusable library. To use with rotel, create a config file that imports
this module and sets up the specific headers you need.

Usage:
For HTTP, configure the receiver to include metadata:
ROTEL_OTLP_HTTP_INCLUDE_METADATA=true
ROTEL_OTLP_HTTP_HEADERS_TO_INCLUDE=my-custom-header
from context_processor import ContextProcessor, ContextProcessorConfig

For gRPC, configure the receiver to include metadata:
ROTEL_OTLP_GRPC_INCLUDE_METADATA=true
ROTEL_OTLP_GRPC_HEADERS_TO_INCLUDE=my-custom-header
config = ContextProcessorConfig(
headers=["x-request-id", "x-correlation-id", "traceparent"],
attribute_pattern="http.request.header.{header}",
)

Then use this processor to add those headers as span attributes.
processor = ContextProcessor(config)

Message metadata is now exposed to Python processors. Processors can
access headers via:
resource_spans.message_metadata # Returns dict[str, str] or None
# Then expose module-level functions for rotel:
def process_spans(resource_spans):
return processor.process_spans(resource_spans)

This processor works with both HTTP headers and gRPC metadata - they are
both exposed as the same dictionary format to Python processors.
def process_metrics(resource_metrics):
return processor.process_metrics(resource_metrics)

This processor demonstrates how to extract headers from context and add
them as span attributes following OpenTelemetry semantic conventions.
def process_logs(resource_logs):
return processor.process_logs(resource_logs)

For rotel receiver configuration, ensure headers are included:
HTTP:
--otlp-http-include-metadata
--otlp-http-headers-to-include x-request-id,x-correlation-id

gRPC:
--otlp-grpc-include-metadata
--otlp-grpc-headers-to-include x-request-id,x-correlation-id
"""

from typing import Optional
from typing import List, Optional

from rotel_sdk.open_telemetry.common.v1 import AnyValue, KeyValue
from rotel_sdk.open_telemetry.logs.v1 import ResourceLogs
Expand All @@ -38,18 +44,58 @@
from rotel_sdk.open_telemetry.trace.v1 import ResourceSpans


def _get_header_from_context(
class ContextProcessorConfig:
"""Configuration for the ContextProcessor."""

def __init__(
self,
headers: List[str] = None,
attribute_pattern: str = "http.request.header.{header}",
span_attribute_target: str = "span",
log_attribute_target: str = "log",
metric_attribute_target: str = "resource",
):
"""
Initialize the ContextProcessor configuration.

Args:
headers: List of header names to extract from request context.
Headers are matched case-insensitively.
attribute_pattern: Pattern for naming attributes. Use {header} as placeholder.
Default: "http.request.header.{header}"
span_attribute_target: Where to add attributes for spans - "span" or "resource"
log_attribute_target: Where to add attributes for logs - "log" or "resource"
metric_attribute_target: Where to add attributes for metrics - "resource"
"""
self.headers = [h.lower() for h in (headers or [])]
self.attribute_pattern = attribute_pattern

if span_attribute_target not in ("span", "resource"):
raise ValueError(
f"span_attribute_target must be 'span' or 'resource', got '{span_attribute_target}'"
)
self.span_attribute_target = span_attribute_target

if log_attribute_target not in ("log", "resource"):
raise ValueError(
f"log_attribute_target must be 'log' or 'resource', got '{log_attribute_target}'"
)
self.log_attribute_target = log_attribute_target

if metric_attribute_target != "resource":
raise ValueError(
f"metric_attribute_target must be 'resource', got '{metric_attribute_target}'"
)
self.metric_attribute_target = metric_attribute_target


def get_header_from_context(
request_context: Optional[RequestContext], header_name: str
) -> Optional[str]:
"""
Get a header value from message metadata (context).

Accesses headers via:
resource_spans.request_context.get(header_name)
Get a header value from request context (HTTP headers or gRPC metadata).

The pattern:
- Headers/metadata keys are stored with lowercase keys
- Works for both HTTP headers and gRPC metadata
Headers are stored with lowercase keys in both HTTP and gRPC contexts.
"""
if request_context is not None:
if isinstance(request_context, RequestContext.HttpContext):
Expand All @@ -59,83 +105,107 @@ def _get_header_from_context(
return None


def get_all_headers_from_context(
request_context: Optional[RequestContext], header_names: List[str]
) -> dict:
"""
Get multiple header values from request context.

Returns a dict of {header_name: value} for headers that exist.
"""
result = {}
for header_name in header_names:
value = get_header_from_context(request_context, header_name)
if value is not None:
result[header_name] = value
return result


class ContextProcessor:
def process_spans(self, resource_spans: ResourceSpans):
"""
Processor that extracts headers from request context and adds them as attributes.
"""

def __init__(self, config: ContextProcessorConfig):
"""
Process ResourceSpans by extracting a custom header from context
and adding it as a span attribute.
Initialize the processor with configuration.

This function extracts "my-custom-header" from context and adds it as
a span attribute following OTel semantic convention: http.request.header.*
Args:
config: ContextProcessorConfig specifying which headers to extract
"""
self.config = config

Example: If the receiver is configured with:
ROTEL_OTLP_HTTP_INCLUDE_METADATA=true
ROTEL_OTLP_HTTP_HEADERS_TO_INCLUDE=my-custom-header
def _create_attribute(self, header_name: str, value: str) -> KeyValue:
"""Create a KeyValue attribute for a header."""
attr_name = self.config.attribute_pattern.format(header=header_name)
return KeyValue(key=attr_name, value=AnyValue(value))

Or for gRPC:
ROTEL_OTLP_GRPC_INCLUDE_METADATA=true
ROTEL_OTLP_GRPC_HEADERS_TO_INCLUDE=my-custom-header
def _create_attributes(self, headers: dict) -> List[KeyValue]:
"""Create KeyValue attributes for all extracted headers."""
return [self._create_attribute(name, value) for name, value in headers.items()]

And the request includes "my-custom-header: test-value-123", then
this processor will add the attribute
"http.request.header.my-custom-header" = "test-value-123" to all spans.
def process_spans(self, resource_spans: ResourceSpans):
"""
# Header to extract from context
header_name = "my-custom-header"

# Get header value from context
header_value = _get_header_from_context(
resource_spans.request_context, header_name
Process ResourceSpans by extracting configured headers from context
and adding them as attributes.
"""
headers = get_all_headers_from_context(
resource_spans.request_context, self.config.headers
)

if header_value:
# Create attribute following OTel semantic convention
attr = KeyValue(
key=f"http.request.header.{header_name}",
value=AnyValue(header_value),
)
if not headers:
return

attributes = self._create_attributes(headers)

# Add attribute to all spans
if self.config.span_attribute_target == "resource":
if resource_spans.resource:
for attr in attributes:
resource_spans.resource.attributes.append(attr)
else:
for scope_spans in resource_spans.scope_spans:
for span in scope_spans.spans:
span.attributes.append(attr)

# Example: You can also add to resource attributes instead:
# if header_value and resource_spans.resource:
# resource_spans.resource.attributes.append(attr)
for attr in attributes:
span.attributes.append(attr)

def process_metrics(self, resource_metrics: ResourceMetrics):
"""
Process metrics - add custom header to resource attributes.
Metrics typically use resource attributes rather than per-metric
attributes.
Process ResourceMetrics by extracting configured headers from context
and adding them as resource attributes.
"""
header_name = "my-custom-header"
header_value = _get_header_from_context(
resource_metrics.request_context, header_name
headers = get_all_headers_from_context(
resource_metrics.request_context, self.config.headers
)

if header_value and resource_metrics.resource:
attr = KeyValue(
key=f"http.request.header.{header_name}",
value=AnyValue(header_value),
)
resource_metrics.resource.attributes.append(attr)
if not headers:
return

attributes = self._create_attributes(headers)

if resource_metrics.resource:
for attr in attributes:
resource_metrics.resource.attributes.append(attr)

def process_logs(self, resource_logs: ResourceLogs):
"""
Process logs - add custom header to log record attributes.
Process ResourceLogs by extracting configured headers from context
and adding them as attributes.
"""
header_name = "my-custom-header"
header_value = _get_header_from_context(
resource_logs.request_context, header_name
headers = get_all_headers_from_context(
resource_logs.request_context, self.config.headers
)
if header_value:
attr = KeyValue(
key=f"http.request.header.{header_name}",
value=AnyValue(header_value),
)

if not headers:
return

attributes = self._create_attributes(headers)

if self.config.log_attribute_target == "resource":
if resource_logs.resource:
for attr in attributes:
resource_logs.resource.attributes.append(attr)
else:
for scope_logs in resource_logs.scope_logs:
for log_record in scope_logs.log_records:
log_record.attributes.append(attr)
for attr in attributes:
log_record.attributes.append(attr)
Loading