diff --git a/rotel_python_processor_sdk/processors/context_processor.py b/rotel_python_processor_sdk/processors/context_processor.py index ad754a09..f9c90ce4 100644 --- a/rotel_python_processor_sdk/processors/context_processor.py +++ b/rotel_python_processor_sdk/processors/context_processor.py @@ -1,35 +1,41 @@ """ -ContextProcessor demonstrates how to pull HTTP/gRPC headers from message metadata -(context) and add them as span attributes. This follows the OTel Collector -pattern where headers/metadata are stored in context by the receiver and processors pull -from context to add attributes. +ContextProcessor - Generic processor for extracting HTTP/gRPC headers from request +context and adding them as span/log/metric attributes. -This processor extracts custom headers from context and adds them as span -attributes following OpenTelemetry semantic conventions (http.request.header.*). +This is a reusable library. To use with rotel, create a config file that imports +this module and sets up the specific headers you need. Usage: - For HTTP, configure the receiver to include metadata: - ROTEL_OTLP_HTTP_INCLUDE_METADATA=true - ROTEL_OTLP_HTTP_HEADERS_TO_INCLUDE=my-custom-header + from context_processor import ContextProcessor, ContextProcessorConfig - For gRPC, configure the receiver to include metadata: - ROTEL_OTLP_GRPC_INCLUDE_METADATA=true - ROTEL_OTLP_GRPC_HEADERS_TO_INCLUDE=my-custom-header + config = ContextProcessorConfig( + headers=["x-request-id", "x-correlation-id", "traceparent"], + attribute_pattern="http.request.header.{header}", + ) - Then use this processor to add those headers as span attributes. + processor = ContextProcessor(config) -Message metadata is now exposed to Python processors. Processors can -access headers via: - resource_spans.message_metadata # Returns dict[str, str] or None + # Then expose module-level functions for rotel: + def process_spans(resource_spans): + return processor.process_spans(resource_spans) -This processor works with both HTTP headers and gRPC metadata - they are -both exposed as the same dictionary format to Python processors. + def process_metrics(resource_metrics): + return processor.process_metrics(resource_metrics) -This processor demonstrates how to extract headers from context and add -them as span attributes following OpenTelemetry semantic conventions. + def process_logs(resource_logs): + return processor.process_logs(resource_logs) + +For rotel receiver configuration, ensure headers are included: + HTTP: + --otlp-http-include-metadata + --otlp-http-headers-to-include x-request-id,x-correlation-id + + gRPC: + --otlp-grpc-include-metadata + --otlp-grpc-headers-to-include x-request-id,x-correlation-id """ -from typing import Optional +from typing import List, Optional from rotel_sdk.open_telemetry.common.v1 import AnyValue, KeyValue from rotel_sdk.open_telemetry.logs.v1 import ResourceLogs @@ -38,18 +44,58 @@ from rotel_sdk.open_telemetry.trace.v1 import ResourceSpans -def _get_header_from_context( +class ContextProcessorConfig: + """Configuration for the ContextProcessor.""" + + def __init__( + self, + headers: List[str] = None, + attribute_pattern: str = "http.request.header.{header}", + span_attribute_target: str = "span", + log_attribute_target: str = "log", + metric_attribute_target: str = "resource", + ): + """ + Initialize the ContextProcessor configuration. + + Args: + headers: List of header names to extract from request context. + Headers are matched case-insensitively. + attribute_pattern: Pattern for naming attributes. Use {header} as placeholder. + Default: "http.request.header.{header}" + span_attribute_target: Where to add attributes for spans - "span" or "resource" + log_attribute_target: Where to add attributes for logs - "log" or "resource" + metric_attribute_target: Where to add attributes for metrics - "resource" + """ + self.headers = [h.lower() for h in (headers or [])] + self.attribute_pattern = attribute_pattern + + if span_attribute_target not in ("span", "resource"): + raise ValueError( + f"span_attribute_target must be 'span' or 'resource', got '{span_attribute_target}'" + ) + self.span_attribute_target = span_attribute_target + + if log_attribute_target not in ("log", "resource"): + raise ValueError( + f"log_attribute_target must be 'log' or 'resource', got '{log_attribute_target}'" + ) + self.log_attribute_target = log_attribute_target + + if metric_attribute_target != "resource": + raise ValueError( + f"metric_attribute_target must be 'resource', got '{metric_attribute_target}'" + ) + self.metric_attribute_target = metric_attribute_target + + +def get_header_from_context( request_context: Optional[RequestContext], header_name: str ) -> Optional[str]: """ - Get a header value from message metadata (context). - - Accesses headers via: - resource_spans.request_context.get(header_name) + Get a header value from request context (HTTP headers or gRPC metadata). - The pattern: - - Headers/metadata keys are stored with lowercase keys - - Works for both HTTP headers and gRPC metadata + Headers are stored with lowercase keys in both HTTP and gRPC contexts. """ if request_context is not None: if isinstance(request_context, RequestContext.HttpContext): @@ -59,83 +105,107 @@ def _get_header_from_context( return None +def get_all_headers_from_context( + request_context: Optional[RequestContext], header_names: List[str] +) -> dict: + """ + Get multiple header values from request context. + + Returns a dict of {header_name: value} for headers that exist. + """ + result = {} + for header_name in header_names: + value = get_header_from_context(request_context, header_name) + if value is not None: + result[header_name] = value + return result + + class ContextProcessor: - def process_spans(self, resource_spans: ResourceSpans): + """ + Processor that extracts headers from request context and adds them as attributes. + """ + + def __init__(self, config: ContextProcessorConfig): """ - Process ResourceSpans by extracting a custom header from context - and adding it as a span attribute. + Initialize the processor with configuration. - This function extracts "my-custom-header" from context and adds it as - a span attribute following OTel semantic convention: http.request.header.* + Args: + config: ContextProcessorConfig specifying which headers to extract + """ + self.config = config - Example: If the receiver is configured with: - ROTEL_OTLP_HTTP_INCLUDE_METADATA=true - ROTEL_OTLP_HTTP_HEADERS_TO_INCLUDE=my-custom-header + def _create_attribute(self, header_name: str, value: str) -> KeyValue: + """Create a KeyValue attribute for a header.""" + attr_name = self.config.attribute_pattern.format(header=header_name) + return KeyValue(key=attr_name, value=AnyValue(value)) - Or for gRPC: - ROTEL_OTLP_GRPC_INCLUDE_METADATA=true - ROTEL_OTLP_GRPC_HEADERS_TO_INCLUDE=my-custom-header + def _create_attributes(self, headers: dict) -> List[KeyValue]: + """Create KeyValue attributes for all extracted headers.""" + return [self._create_attribute(name, value) for name, value in headers.items()] - And the request includes "my-custom-header: test-value-123", then - this processor will add the attribute - "http.request.header.my-custom-header" = "test-value-123" to all spans. + def process_spans(self, resource_spans: ResourceSpans): """ - # Header to extract from context - header_name = "my-custom-header" - - # Get header value from context - header_value = _get_header_from_context( - resource_spans.request_context, header_name + Process ResourceSpans by extracting configured headers from context + and adding them as attributes. + """ + headers = get_all_headers_from_context( + resource_spans.request_context, self.config.headers ) - if header_value: - # Create attribute following OTel semantic convention - attr = KeyValue( - key=f"http.request.header.{header_name}", - value=AnyValue(header_value), - ) + if not headers: + return + + attributes = self._create_attributes(headers) - # Add attribute to all spans + if self.config.span_attribute_target == "resource": + if resource_spans.resource: + for attr in attributes: + resource_spans.resource.attributes.append(attr) + else: for scope_spans in resource_spans.scope_spans: for span in scope_spans.spans: - span.attributes.append(attr) - - # Example: You can also add to resource attributes instead: - # if header_value and resource_spans.resource: - # resource_spans.resource.attributes.append(attr) + for attr in attributes: + span.attributes.append(attr) def process_metrics(self, resource_metrics: ResourceMetrics): """ - Process metrics - add custom header to resource attributes. - Metrics typically use resource attributes rather than per-metric - attributes. + Process ResourceMetrics by extracting configured headers from context + and adding them as resource attributes. """ - header_name = "my-custom-header" - header_value = _get_header_from_context( - resource_metrics.request_context, header_name + headers = get_all_headers_from_context( + resource_metrics.request_context, self.config.headers ) - if header_value and resource_metrics.resource: - attr = KeyValue( - key=f"http.request.header.{header_name}", - value=AnyValue(header_value), - ) - resource_metrics.resource.attributes.append(attr) + if not headers: + return + + attributes = self._create_attributes(headers) + + if resource_metrics.resource: + for attr in attributes: + resource_metrics.resource.attributes.append(attr) def process_logs(self, resource_logs: ResourceLogs): """ - Process logs - add custom header to log record attributes. + Process ResourceLogs by extracting configured headers from context + and adding them as attributes. """ - header_name = "my-custom-header" - header_value = _get_header_from_context( - resource_logs.request_context, header_name + headers = get_all_headers_from_context( + resource_logs.request_context, self.config.headers ) - if header_value: - attr = KeyValue( - key=f"http.request.header.{header_name}", - value=AnyValue(header_value), - ) + if not headers: + return + + attributes = self._create_attributes(headers) + + if self.config.log_attribute_target == "resource": + if resource_logs.resource: + for attr in attributes: + resource_logs.resource.attributes.append(attr) + else: for scope_logs in resource_logs.scope_logs: for log_record in scope_logs.log_records: - log_record.attributes.append(attr) + for attr in attributes: + log_record.attributes.append(attr)