From 979b98cbfe67a6dad5bedee71c5a3abcaa4ecb70 Mon Sep 17 00:00:00 2001 From: Vivek Nair Date: Sat, 7 Jun 2025 14:02:53 -0400 Subject: [PATCH 1/6] feat: Add automatic AST-based tracing for Gentrace API --- CLAUDE.md | 48 ---- examples/auto_trace/README.md | 110 ++++++++ .../auto_trace/auto_trace_pipeline_example.py | 105 +++++++ .../pipeline_example_app/__init__.py | 1 + .../pipeline_example_app/workflow.py | 105 +++++++ src/gentrace/__init__.py | 4 + src/gentrace/lib/auto_trace/__init__.py | 91 ++++++ src/gentrace/lib/auto_trace/ast_utils.py | 108 ++++++++ src/gentrace/lib/auto_trace/import_hook.py | 127 +++++++++ src/gentrace/lib/auto_trace/rewrite_ast.py | 238 ++++++++++++++++ src/gentrace/lib/auto_trace/types.py | 22 ++ tests/test_auto_trace.py | 259 ++++++++++++++++++ 12 files changed, 1170 insertions(+), 48 deletions(-) delete mode 100644 CLAUDE.md create mode 100644 examples/auto_trace/README.md create mode 100644 examples/auto_trace/auto_trace_pipeline_example.py create mode 100644 examples/auto_trace/pipeline_example_app/__init__.py create mode 100644 examples/auto_trace/pipeline_example_app/workflow.py create mode 100644 src/gentrace/lib/auto_trace/__init__.py create mode 100644 src/gentrace/lib/auto_trace/ast_utils.py create mode 100644 src/gentrace/lib/auto_trace/import_hook.py create mode 100644 src/gentrace/lib/auto_trace/rewrite_ast.py create mode 100644 src/gentrace/lib/auto_trace/types.py create mode 100644 tests/test_auto_trace.py diff --git a/CLAUDE.md b/CLAUDE.md deleted file mode 100644 index dd6330ca..00000000 --- a/CLAUDE.md +++ /dev/null @@ -1,48 +0,0 @@ -# Claude Guidelines for gentrace-python - -## Project Overview -gentrace-python is a Python SDK for the Gentrace API, which provides tools for evaluating and monitoring AI applications. - -## Code Style Guidelines -- Follow PEP 8 style guidelines -- Use type hints where appropriate -- Maintain consistent indentation (4 spaces) -- Use meaningful variable and function names -- Add docstrings for public APIs -- Follow the existing error handling patterns - -## Testing Requirements -- Write unit tests for new functionality using pytest -- Ensure all tests pass before submitting PRs -- Maintain or improve test coverage - -## PR Guidelines -- Keep PRs focused on a single feature or bug fix -- Include clear descriptions of changes -- Reference related issues -- Update documentation as needed - -## Commit Message Format -- Use clear, descriptive commit messages -- Start with a verb in the present tense (e.g., "Add", "Fix", "Update") -- Reference issue numbers when applicable - -## Dependencies -- Minimize adding new dependencies -- Prefer well-maintained, widely-used packages -- Consider compatibility with different Python versions - -## Security Considerations -- Never expose API keys or sensitive information -- Follow secure coding practices -- Validate user inputs - -## Documentation -- Update README.md for significant changes -- Document new features and APIs -- Keep code comments up-to-date - -## Performance -- Consider performance implications of changes -- Avoid unnecessary computations or memory usage - diff --git a/examples/auto_trace/README.md b/examples/auto_trace/README.md new file mode 100644 index 00000000..cea761e4 --- /dev/null +++ b/examples/auto_trace/README.md @@ -0,0 +1,110 @@ +# Gentrace Auto-Tracing Example + +This example demonstrates Gentrace's automatic AST-based tracing feature with pipeline ID support, which instruments all functions in specified modules without requiring manual decoration. + +## Prerequisites + +Before running the example, ensure you have the following environment variables set: + +```bash +export GENTRACE_API_KEY="your-gentrace-api-key" +export GENTRACE_BASE_URL="http://localhost:3000/api" # or your Gentrace instance URL +export GENTRACE_PIPELINE_ID="your-pipeline-uuid" +export OPENAI_API_KEY="your-openai-api-key" # Optional +``` + +## Overview + +The example simulates a data processing pipeline with multiple layers of function calls, creating a deep tree of spans to showcase the automatic tracing capabilities with pipeline association. + +## Structure + +``` +auto_trace/ +├── README.md # This file +├── auto_trace_pipeline_example.py # Main example script +└── pipeline_example_app/ # Example application package + ├── __init__.py + └── workflow.py # Pipeline workflow functions +``` + +## Features Demonstrated + +1. **Automatic Function Tracing**: All functions in the `pipeline_example_app` module are automatically traced without decorators +2. **Pipeline ID Support**: All traces are associated with a specific Gentrace pipeline +3. **Deep Span Trees**: The pipeline creates a multi-level hierarchy of spans showing the call flow +4. **OpenTelemetry Integration**: Proper span context propagation with parent-child relationships +5. **Root Span Attribution**: Only the root span includes the `gentrace.pipeline_id` attribute + +## Running the Example + +```bash +cd examples/auto_trace +python auto_trace_pipeline_example.py +``` + +## Expected Output + +The example will: +1. Initialize OpenTelemetry with Gentrace configuration +2. Install auto-tracing for the `pipeline_example_app` module +3. Execute a multi-step data processing pipeline +4. Export spans to the Gentrace backend via OTLP + +## Span Tree Structure + +The automatic tracing creates a span tree like this: + +``` +run_data_processing_pipeline [root - has pipeline_id] +├── extract_data +│ └── process_record (multiple calls) +├── transform_data +│ ├── apply_transformation (multiple calls) +│ └── validate_transformation (multiple calls) +└── load_results + └── generate_summary +``` + +## Key Concepts + +### Installing Auto-Tracing with Pipeline ID + +```python +gentrace.install_auto_tracing( + ['pipeline_example_app'], + min_duration=0, + pipeline_id=pipeline_id +) +``` + +This must be called BEFORE importing the modules you want to trace. + +### How Pipeline ID Works + +When a pipeline_id is provided: +- The root span (first span in the trace) includes the `gentrace.pipeline_id` attribute +- Child spans do NOT include the pipeline_id attribute +- This allows the Gentrace backend to associate the entire trace tree with the pipeline +- All spans in the trace are connected via the same `traceId` + +### OpenTelemetry Configuration + +The example shows proper OpenTelemetry setup with: +- Resource configuration with service name +- Gentrace sampler for sampling decisions +- Gentrace span processor for baggage propagation +- OTLP exporter configured for the Gentrace backend + +### Minimum Duration + +The `min_duration` parameter (in seconds) can be used to only trace functions that take longer than the specified duration. This helps reduce noise from very fast functions. + +## Troubleshooting + +If spans don't appear in the Gentrace UI: +1. Check that all environment variables are set correctly +2. Verify the Gentrace backend is running and accessible +3. Ensure the ClickHouse replication task is running +4. Check the console output for any error messages +5. Verify the pipeline ID exists in your Gentrace instance \ No newline at end of file diff --git a/examples/auto_trace/auto_trace_pipeline_example.py b/examples/auto_trace/auto_trace_pipeline_example.py new file mode 100644 index 00000000..30228643 --- /dev/null +++ b/examples/auto_trace/auto_trace_pipeline_example.py @@ -0,0 +1,105 @@ +"""Example demonstrating Gentrace's automatic AST-based tracing with pipeline_id support. + +This example shows how auto-traced functions can be associated with a specific +Gentrace pipeline and exported to the Gentrace backend. + +To run this example, ensure the following environment variables are set: + GENTRACE_API_KEY: Your Gentrace API token for authentication. + GENTRACE_BASE_URL: The base URL for your Gentrace instance (e.g., http://localhost:3000/api). + GENTRACE_PIPELINE_ID: The UUID of the pipeline to associate traces with. + OPENAI_API_KEY: Your OpenAI API key (optional, for OpenAI examples). +""" + +import os +import atexit +from typing import Dict + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + +import gentrace +from gentrace import GentraceSampler, GentraceSpanProcessor + +# Get configuration from environment +api_key = os.getenv("GENTRACE_API_KEY", "") +gentrace_base_url = os.getenv("GENTRACE_BASE_URL", "") +pipeline_id = os.getenv("GENTRACE_PIPELINE_ID", "") + +if not api_key: + raise ValueError("GENTRACE_API_KEY environment variable not set.") +if not gentrace_base_url: + raise ValueError("GENTRACE_BASE_URL environment variable not set.") +if not pipeline_id: + raise ValueError("GENTRACE_PIPELINE_ID environment variable not set.") + +print(f"Gentrace Base URL: {gentrace_base_url}") +print(f"Pipeline ID: {pipeline_id}") +print("=" * 60) + +# Setup OpenTelemetry with Gentrace +resource = Resource(attributes={"service.name": "auto-trace-pipeline-example"}) +tracer_provider = TracerProvider(resource=resource, sampler=GentraceSampler()) + +# Configure OTLP exporter to send traces to Gentrace +otlp_headers: Dict[str, str] = {} +if api_key: + otlp_headers["Authorization"] = f"Bearer {api_key}" + +span_exporter = OTLPSpanExporter( + endpoint=f"{gentrace_base_url}/otel/v1/traces", + headers=otlp_headers, +) + +# Add Gentrace span processor for enrichment +gentrace_baggage_processor = GentraceSpanProcessor() +tracer_provider.add_span_processor(gentrace_baggage_processor) + +# Add the export processor +simple_export_processor = SimpleSpanProcessor(span_exporter) +tracer_provider.add_span_processor(simple_export_processor) + +# Set the global tracer provider +trace.set_tracer_provider(tracer_provider) + +# Install auto-tracing with pipeline_id for our example modules +# All functions in these modules will be automatically associated with this pipeline +print("Installing auto-tracing with pipeline_id...") +gentrace.install_auto_tracing( + ['pipeline_example_app'], + min_duration=0, + pipeline_id=pipeline_id +) + +# Now import the modules - they will be automatically instrumented with pipeline_id +from pipeline_example_app import workflow + +if __name__ == "__main__": + # Register shutdown handler + atexit.register(tracer_provider.shutdown) + + print("\nStarting Gentrace Auto-Trace Pipeline Example") + print("=" * 60) + + # Run the workflow - all traced functions will include the pipeline_id + try: + result = workflow.run_data_processing_pipeline() + + print("\n" + "=" * 60) + print(f"Pipeline completed successfully!") + print(f"Result: {result}") + print(f"\nAll spans in this trace include 'gentrace.pipeline_id': '{pipeline_id}'") + print("Check the Gentrace dashboard to see all traces associated with this pipeline!") + except Exception as e: + print(f"\nError running pipeline: {e}") + raise + + print("\nShutting down...") + # Force flush to ensure all spans are sent + tracer_provider.force_flush() + + # Add a small delay to ensure spans are processed + import time + time.sleep(1) \ No newline at end of file diff --git a/examples/auto_trace/pipeline_example_app/__init__.py b/examples/auto_trace/pipeline_example_app/__init__.py new file mode 100644 index 00000000..c304885f --- /dev/null +++ b/examples/auto_trace/pipeline_example_app/__init__.py @@ -0,0 +1 @@ +"""Example application demonstrating pipeline-aware auto-tracing.""" \ No newline at end of file diff --git a/examples/auto_trace/pipeline_example_app/workflow.py b/examples/auto_trace/pipeline_example_app/workflow.py new file mode 100644 index 00000000..f98703c1 --- /dev/null +++ b/examples/auto_trace/pipeline_example_app/workflow.py @@ -0,0 +1,105 @@ +"""Example workflow that will be auto-traced with pipeline_id.""" + +import time +from typing import Any, Dict, List + + +def run_data_processing_pipeline() -> Dict[str, Any]: + """Main pipeline entry point - this and all called functions will have pipeline_id.""" + print("Starting data processing pipeline...") + + # Step 1: Extract data + print("Step 1: Extracting data...") + data = extract_data() + print(f" Extracted {len(data)} records") + + # Step 2: Transform data + print("Step 2: Transforming data...") + transformed = transform_data(data) + print(f" Transformed {len(transformed)} records") + + # Step 3: Load results + print("Step 3: Loading results...") + result = load_results(transformed) + print(" Results loaded successfully") + + return result + + +def extract_data() -> List[Dict[str, Any]]: + """Extract data from source.""" + time.sleep(0.05) # Simulate work + + # Simulate extracting data + data = [] + for i in range(5): # Reduced for faster example + record = process_record(i) + data.append(record) + + return data + + +def process_record(record_id: int) -> Dict[str, Any]: + """Process individual record.""" + # This will also be traced with the pipeline_id + return { + 'id': record_id, + 'value': record_id * 10, + 'status': 'extracted' + } + + +def transform_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Transform the extracted data.""" + transformed = [] + for record in data: + # Apply transformation + result = apply_transformation(record) + if validate_transformation(result): + transformed.append(result) + + return transformed + + +def apply_transformation(record: Dict[str, Any]) -> Dict[str, Any]: + """Apply transformation to a single record.""" + time.sleep(0.01) # Simulate work + + return { + **record, + 'value': record['value'] * 2, + 'status': 'transformed', + 'transform_timestamp': time.time() + } + + +def validate_transformation(record: Dict[str, Any]) -> bool: + """Validate the transformed record.""" + return record.get('value', 0) > 0 and 'transform_timestamp' in record + + +def load_results(data: List[Dict[str, Any]]) -> Dict[str, Any]: + """Load the results to destination.""" + time.sleep(0.05) # Simulate work + + summary = generate_summary(data) + + return { + 'status': 'success', + 'records_processed': len(data), + 'summary': summary, + 'pipeline_complete': True + } + + +def generate_summary(data: List[Dict[str, Any]]) -> Dict[str, Any]: + """Generate summary statistics.""" + if not data: + return {'total': 0, 'average': 0} + + values = [r.get('value', 0) for r in data] + return { + 'total': sum(values), + 'average': sum(values) / len(values), + 'count': len(data) + } \ No newline at end of file diff --git a/src/gentrace/__init__.py b/src/gentrace/__init__.py index 5afdf300..31efee8e 100644 --- a/src/gentrace/__init__.py +++ b/src/gentrace/__init__.py @@ -106,10 +106,12 @@ def __getattr__(self, name: str) -> _t.Any: ATTR_GENTRACE_FN_ARGS_EVENT_NAME, ATTR_GENTRACE_FN_OUTPUT_EVENT_NAME, ) +from .lib.auto_trace import install_auto_tracing from .lib.experiment import experiment from .lib.interaction import interaction from .lib.eval_dataset import TestInput, eval_dataset from .lib.span_processor import GentraceSpanProcessor +from .lib.auto_trace.rewrite_ast import no_auto_trace ### End custom Gentrace imports @@ -173,6 +175,8 @@ def __getattr__(self, name: str) -> _t.Any: "Experiment", "Dataset", "Pipeline", + "install_auto_tracing", + "no_auto_trace", # End custom Gentrace exports ] diff --git a/src/gentrace/lib/auto_trace/__init__.py b/src/gentrace/lib/auto_trace/__init__.py new file mode 100644 index 00000000..cb4b8ff3 --- /dev/null +++ b/src/gentrace/lib/auto_trace/__init__.py @@ -0,0 +1,91 @@ +"""Automatic tracing functionality for Gentrace using AST transformation.""" + +from __future__ import annotations + +import sys +import uuid +import warnings +from typing import TYPE_CHECKING, Union, Literal, Callable, Optional, Sequence + +from .types import AutoTraceModule +from .import_hook import GentraceFinder + +if TYPE_CHECKING: + from opentelemetry.trace import Tracer + + +def install_auto_tracing( + modules: Union[Sequence[str], Callable[[AutoTraceModule], bool]], + *, + min_duration: float = 0, + check_imported_modules: Literal['error', 'warn', 'ignore'] = 'error', + tracer: Optional['Tracer'] = None, + pipeline_id: Optional[str] = None, +) -> None: + """Install automatic tracing for specified modules. + + Args: + modules: Either a list of module name prefixes to trace, or a callable + that returns True for modules that should be traced + min_duration: Minimum duration in seconds for a function to be traced + check_imported_modules: How to handle modules that have already been imported + tracer: OpenTelemetry tracer to use (defaults to gentrace tracer) + pipeline_id: Optional Gentrace pipeline ID to associate all auto-traced spans with. + Must be a valid UUID string if provided. + """ + if isinstance(modules, Sequence): + modules = modules_func_from_sequence(modules) # type: ignore + + if not callable(modules): + raise TypeError('modules must be a list of strings or a callable') + + if check_imported_modules not in ('error', 'warn', 'ignore'): + raise ValueError('check_imported_modules must be one of "error", "warn", or "ignore"') + + # Validate pipeline_id if provided + if pipeline_id is not None: + try: + uuid.UUID(pipeline_id) + except ValueError as e: + raise ValueError( + f"pipeline_id must be a valid UUID string. Received: '{pipeline_id}'" + ) from e + + if check_imported_modules != 'ignore': + for module in list(sys.modules.values()): + try: + auto_trace_module = AutoTraceModule(module.__name__, module.__file__) + except Exception: + continue + + if modules(auto_trace_module): + if check_imported_modules == 'error': + raise AutoTraceModuleAlreadyImportedException( + f'The module {module.__name__!r} matches modules to trace, but it has already been imported. ' + f'Either call `install_auto_tracing` earlier, ' + f"or set `check_imported_modules` to 'warn' or 'ignore'." + ) + else: + warnings.warn( + f'The module {module.__name__!r} matches modules to trace, but it has already been imported. ' + f'Either call `install_auto_tracing` earlier, ' + f"or set `check_imported_modules` to 'ignore'.", + AutoTraceModuleAlreadyImportedWarning, + stacklevel=2, + ) + + min_duration_ns = int(min_duration * 1_000_000_000) + finder = GentraceFinder(modules, min_duration_ns, tracer, pipeline_id) + sys.meta_path.insert(0, finder) + + +def modules_func_from_sequence(modules: Sequence[str]) -> Callable[[AutoTraceModule], bool]: + return lambda module: module.parts_start_with(modules) + + +class AutoTraceModuleAlreadyImportedException(Exception): + pass + + +class AutoTraceModuleAlreadyImportedWarning(Warning): + pass \ No newline at end of file diff --git a/src/gentrace/lib/auto_trace/ast_utils.py b/src/gentrace/lib/auto_trace/ast_utils.py new file mode 100644 index 00000000..fb7b7b1b --- /dev/null +++ b/src/gentrace/lib/auto_trace/ast_utils.py @@ -0,0 +1,108 @@ +"""AST transformation utilities for Gentrace auto-tracing.""" + +from __future__ import annotations + +import ast +from typing import Any, Dict, List, Union, cast +from dataclasses import dataclass + + +@dataclass +class BaseTransformer(ast.NodeTransformer): + """Helper for rewriting ASTs to wrap function bodies in `with tracer.start_as_current_span(...):`.""" + + span_name_prefix: str + filename: str + module_name: str + + def __post_init__(self): + # Names of functions and classes that we're currently inside, + # so we can construct the qualified name of the current function. + self.qualname_stack: List[str] = [] + + def visit_ClassDef(self, node: ast.ClassDef): + self.qualname_stack.append(node.name) + # We need to call generic_visit here to modify any functions defined inside the class. + node = cast(ast.ClassDef, self.generic_visit(node)) + self.qualname_stack.pop() + return node + + def visit_FunctionDef(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef]): + self.qualname_stack.append(node.name) + qualname = '.'.join(self.qualname_stack) + self.qualname_stack.append('') + # We need to call generic_visit here to modify any classes/functions nested inside. + self.generic_visit(node) + self.qualname_stack.pop() # + self.qualname_stack.pop() # node.name + + return self.rewrite_function(node, qualname) + + def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef): + return self.visit_FunctionDef(node) + + def rewrite_function(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef], qualname: str) -> ast.AST: + # Replace the body of the function with: + # with : + # + body = node.body.copy() + new_body: List[ast.stmt] = [] + if ( + body + and isinstance(body[0], ast.Expr) + and isinstance(body[0].value, ast.Constant) + and isinstance(body[0].value.value, str) + ): + # If the first statement is just a string literal, it's a docstring. + # Keep it as the first statement in the new body, not wrapped in a span, + # so it's still recognized as a docstring. + new_body.append(body.pop(0)) + + # Ignore functions with a trivial/empty body: + # - If `body` is empty, that means it originally was just a docstring that got popped above. + # - If `body` is just a single `pass` statement + # - If `body` is just a constant expression, particularly an ellipsis (`...`) + if not body or ( + len(body) == 1 + and ( + isinstance(body[0], ast.Pass) + or (isinstance(body[0], ast.Expr) and isinstance(body[0].value, ast.Constant)) + ) + ): + return node + + span = ast.With( + items=[ + ast.withitem( + context_expr=self.create_span_call_node(node, qualname), + ) + ], + body=body, + type_comment=node.type_comment, + ) + new_body.append(span) + + return ast.fix_missing_locations( + ast.copy_location( + type(node)( # type: ignore + name=node.name, + args=node.args, + body=new_body, + decorator_list=node.decorator_list, + returns=node.returns, + type_comment=node.type_comment, + ), + node, + ) + ) + + def create_span_call_node(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef], qualname: str) -> ast.Call: + raise NotImplementedError() + + def get_span_attributes(self, qualname: str, lineno: int) -> Dict[str, Any]: + """Get the attributes to set on the span.""" + return { + 'code.filepath': self.filename, + 'code.lineno': lineno, + 'code.function': qualname, + } \ No newline at end of file diff --git a/src/gentrace/lib/auto_trace/import_hook.py b/src/gentrace/lib/auto_trace/import_hook.py new file mode 100644 index 00000000..4c7f3c12 --- /dev/null +++ b/src/gentrace/lib/auto_trace/import_hook.py @@ -0,0 +1,127 @@ +"""Import hook for Gentrace auto-tracing.""" + +from __future__ import annotations + +import ast +import sys +from types import ModuleType +from typing import TYPE_CHECKING, Any, Dict, Callable, Iterator, Optional, Sequence, cast +from dataclasses import dataclass +from importlib.abc import Loader, MetaPathFinder +from importlib.util import spec_from_loader +from importlib.machinery import ModuleSpec + +from .types import AutoTraceModule +from .rewrite_ast import compile_source + +if TYPE_CHECKING: + from opentelemetry.trace import Tracer + + +@dataclass +class GentraceFinder(MetaPathFinder): + """The import hook entry point, inserted into `sys.meta_path` to apply AST rewriting to matching modules.""" + + modules_filter: Callable[[AutoTraceModule], bool] + min_duration: int + tracer: Optional['Tracer'] + pipeline_id: Optional[str] = None + + def find_spec( + self, fullname: str, path: Optional[Sequence[str]], target: Optional[ModuleType] = None + ) -> Optional[ModuleSpec]: + """This is the method that is called by the import system.""" + for plain_spec in self._find_plain_specs(fullname, path, target): + # Not all loaders have get_source, but it's an abstract method of the standard ABC InspectLoader. + # In particular it's implemented by `importlib.machinery.SourceFileLoader` + # which is provided by default. + get_source = getattr(plain_spec.loader, 'get_source', None) + if not callable(get_source): + continue + + try: + source = cast(str, get_source(fullname)) + except Exception: + continue + + if not source: + continue + + # We fully expect plain_spec.origin and self.get_filename(...) + # to be the same thing (a valid filename), but they're optional. + filename = plain_spec.origin + if not filename: + try: + filename = cast('Optional[str]', plain_spec.loader.get_filename(fullname)) # type: ignore + except Exception: + pass + + if not self.modules_filter(AutoTraceModule(fullname, filename)): + return None # tell the import system to try the next meta path finder + + try: + tree = ast.parse(source) + except Exception: + # The plain finder gave us invalid source code. Try another one. + continue + + filename = filename or f'<{fullname}>' + + try: + execute = compile_source(tree, filename, fullname, self.min_duration, self.tracer, self.pipeline_id) + except Exception: + # Auto-tracing failed with an unexpected error. Ensure that this doesn't crash the whole application. + return None # tell the import system to try the next meta path finder + + loader = GentraceLoader(plain_spec, execute) + return spec_from_loader(fullname, loader) + + def _find_plain_specs( + self, fullname: str, path: Optional[Sequence[str]], target: Optional[ModuleType] + ) -> Iterator[ModuleSpec]: + """Yield module specs returned by other finders on `sys.meta_path`.""" + for finder in sys.meta_path: + # Skip this finder or any like it to avoid infinite recursion. + if isinstance(finder, GentraceFinder): + continue + + try: + plain_spec = finder.find_spec(fullname, path, target) + except Exception: + continue + + if plain_spec: + yield plain_spec + + +@dataclass +class GentraceLoader(Loader): + """An import loader produced by GentraceFinder which executes a modified AST of the module's source code.""" + + plain_spec: ModuleSpec + """A spec for the module that was returned by another meta path finder.""" + + execute: Callable[[Dict[str, Any]], None] + """A function which accepts module globals and executes the compiled code.""" + + def exec_module(self, module: ModuleType): + """Execute a modified AST of the module's source code in the module's namespace.""" + self.execute(module.__dict__) + + # This is required when `exec_module` is defined. + # It returns None to indicate that the usual module creation process should be used. + def create_module(self, spec: ModuleSpec): + return None + + def get_code(self, _name: str): + # `python -m` uses the `runpy` module which calls this method instead of going through the normal protocol. + # So return some code which can be executed with the module namespace. + # Here `__loader__` will be this object, i.e. `self`. + source = '__loader__.execute(globals())' + return compile(source, '', 'exec', dont_inherit=True) + + def __getattr__(self, item: str): + """Forward some methods to the plain spec's loader (likely a `SourceFileLoader`) if they exist.""" + if item in {'get_filename', 'is_package'}: + return getattr(self.plain_spec.loader, item) + raise AttributeError(item) \ No newline at end of file diff --git a/src/gentrace/lib/auto_trace/rewrite_ast.py b/src/gentrace/lib/auto_trace/rewrite_ast.py new file mode 100644 index 00000000..8ab32cd1 --- /dev/null +++ b/src/gentrace/lib/auto_trace/rewrite_ast.py @@ -0,0 +1,238 @@ +"""AST rewriting for Gentrace auto-tracing.""" + +from __future__ import annotations + +import ast +import time +import uuid +from typing import TYPE_CHECKING, Any, Dict, List, Union, TypeVar, Callable, Optional, ContextManager +from collections import deque +from dataclasses import dataclass + +from opentelemetry import trace + +from .ast_utils import BaseTransformer + +if TYPE_CHECKING: + from opentelemetry.trace import Tracer + + +def compile_source( + tree: ast.AST, + filename: str, + module_name: str, + min_duration: int, + tracer: Optional['Tracer'] = None, + pipeline_id: Optional[str] = None, +) -> Callable[[Dict[str, Any]], None]: + """Compile a modified AST of the module's source code. + + Returns a function which accepts module globals and executes the compiled code. + """ + gentrace_name = f'gentrace_{uuid.uuid4().hex}' + context_factories: List[Callable[[], ContextManager[Any]]] = [] + tree = rewrite_ast(tree, filename, gentrace_name, module_name, context_factories, min_duration, tracer, pipeline_id) + assert isinstance(tree, ast.Module) # for type checking + # dont_inherit=True is necessary to prevent the module from inheriting the __future__ import from this module. + code = compile(tree, filename, 'exec', dont_inherit=True) + + def execute(globs: Dict[str, Any]): + globs[gentrace_name] = context_factories + exec(code, globs, globs) + + return execute + + +def rewrite_ast( + tree: ast.AST, + filename: str, + gentrace_name: str, + module_name: str, + context_factories: List[Callable[[], ContextManager[Any]]], + min_duration: int, + tracer: Optional['Tracer'] = None, + pipeline_id: Optional[str] = None, +) -> ast.AST: + transformer = AutoTraceTransformer( + gentrace_name, filename, module_name, context_factories, min_duration, tracer, pipeline_id + ) + return transformer.visit(tree) + + +@dataclass +class AutoTraceTransformer(BaseTransformer): + """Trace all encountered functions except those explicitly marked with `@no_auto_trace`.""" + + context_factories: List[Callable[[], ContextManager[Any]]] + min_duration: int + tracer: Optional['Tracer'] + pipeline_id: Optional[str] = None + + def check_no_auto_trace(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef]) -> bool: + """Return true if the node has a `@no_auto_trace` decorator.""" + return any( + ( + isinstance(node, ast.Name) + and node.id == 'no_auto_trace' + or ( + isinstance(node, ast.Attribute) + and node.attr == 'no_auto_trace' + and isinstance(node.value, ast.Name) + and node.value.id == 'gentrace' + ) + ) + for node in node.decorator_list + ) + + def visit_ClassDef(self, node: ast.ClassDef): + if self.check_no_auto_trace(node): + return node + + return super().visit_ClassDef(node) + + def visit_FunctionDef(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef]): + if self.check_no_auto_trace(node): + return node + + return super().visit_FunctionDef(node) + + def rewrite_function(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef], qualname: str) -> ast.AST: + if has_yield(node): + return node + + return super().rewrite_function(node, qualname) + + def get_span_attributes(self, qualname: str, lineno: int) -> Dict[str, Any]: + """Get the attributes to set on the span.""" + # Note: pipeline_id will be added dynamically only to root spans + return super().get_span_attributes(qualname, lineno) + + def create_span_call_node(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef], qualname: str) -> ast.Call: + # See the compile_source docstring + index = len(self.context_factories) + + # Get or create tracer + tracer = self.tracer or trace.get_tracer("gentrace") + + # Create span factory + span_name = f'{self.module_name}.{qualname}' + attributes = self.get_span_attributes(qualname, node.lineno) + + # When pipeline_id is set, we need special handling + if self.pipeline_id is not None: + from opentelemetry import trace as otel_trace, baggage as otel_baggage, context as otel_context + + from ..constants import ATTR_GENTRACE_SAMPLE_KEY, ATTR_GENTRACE_PIPELINE_ID + + def create_span_with_pipeline_support(): + """Create span with pipeline_id support.""" + current_context = otel_context.get_current() + + # Check if this is a root span (no active span in context) + current_span = otel_trace.get_current_span(current_context) + is_root_span = current_span is None or not current_span.is_recording() + + # Prepare attributes - only add pipeline_id to root spans + span_attributes = attributes.copy() + if is_root_span: + span_attributes[ATTR_GENTRACE_PIPELINE_ID] = self.pipeline_id + + # For root spans, set baggage context + if is_root_span: + # Check if baggage is already set + existing_baggage = otel_baggage.get_baggage(ATTR_GENTRACE_SAMPLE_KEY, context=current_context) + if existing_baggage is None: + # Set baggage context (similar to @interaction decorator) + context_with_baggage = otel_baggage.set_baggage( + ATTR_GENTRACE_SAMPLE_KEY, "true", context=current_context + ) + token = otel_context.attach(context_with_baggage) + + # Create span with the appropriate attributes + span = tracer.start_as_current_span(span_name, attributes=span_attributes) + + # Wrap to ensure we detach the baggage context + class SpanWithBaggageCleanup: + def __init__(self, span, token): + self.span = span + self.token = token + + def __enter__(self): + return self.span.__enter__() + + def __exit__(self, exc_type, exc_val, exc_tb): + try: + return self.span.__exit__(exc_type, exc_val, exc_tb) + finally: + otel_context.detach(self.token) + + return SpanWithBaggageCleanup(span, token) + + # For non-root spans or if baggage already set, create normal span + return tracer.start_as_current_span(span_name, attributes=span_attributes) + + span_factory = create_span_with_pipeline_support + else: + # No pipeline_id, use standard span creation + span_factory = lambda: tracer.start_as_current_span(span_name, attributes=attributes) + + if self.min_duration > 0: + # This needs to be as fast as possible since it's the cost of auto-tracing a function + # that never actually gets instrumented because its calls are all faster than `min_duration`. + class MeasureTime: + __slots__ = 'start' + + def __enter__(_self): + _self.start = time.perf_counter_ns() + + def __exit__(_self, *_): + if time.perf_counter_ns() - _self.start >= self.min_duration: + self.context_factories[index] = span_factory + + self.context_factories.append(MeasureTime) + else: + self.context_factories.append(span_factory) + + # This node means: + # context_factories[index]() + # where `context_factories` is a global variable with the name `self.span_name_prefix` + # pointing to the `self.context_factories` list. + return ast.Call( + func=ast.Subscript( + value=ast.Name(id=self.span_name_prefix, ctx=ast.Load()), + slice=ast.Index(value=ast.Constant(value=index)), # type: ignore + ctx=ast.Load(), + ), + args=[], + keywords=[], + ) + + +T = TypeVar('T') + + +def no_auto_trace(x: T) -> T: + """Decorator to prevent a function/class from being traced by `gentrace.install_auto_tracing`. + + This is useful for small functions that are called very frequently and would generate too much noise. + + The decorator is detected at import time. + Only `@no_auto_trace` or `@gentrace.no_auto_trace` are supported. + + Any decorated function, or any function defined anywhere inside a decorated function/class, + will be completely ignored by auto-tracing. + + This decorator simply returns the argument unchanged, so there is zero runtime overhead. + """ + return x + + +def has_yield(node: ast.AST): + queue = deque([node]) + while queue: + node = queue.popleft() + for child in ast.iter_child_nodes(node): + if isinstance(child, (ast.Yield, ast.YieldFrom)): + return True + if not isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)): + queue.append(child) \ No newline at end of file diff --git a/src/gentrace/lib/auto_trace/types.py b/src/gentrace/lib/auto_trace/types.py new file mode 100644 index 00000000..33b97e44 --- /dev/null +++ b/src/gentrace/lib/auto_trace/types.py @@ -0,0 +1,22 @@ +"""Types for auto-tracing functionality.""" + +from __future__ import annotations + +from typing import List, Tuple, Union, Optional + + +class AutoTraceModule: + """Information about a module that might be auto-traced.""" + + def __init__(self, name: str, filename: Optional[str]): + self.name = name + self.filename = filename + + def parts_start_with(self, prefixes: Union[Tuple[str, ...], List[str]]) -> bool: + """Check if the module name starts with any of the given prefixes.""" + parts = self.name.split('.') + for prefix in prefixes: + prefix_parts = prefix.split('.') + if len(parts) >= len(prefix_parts) and parts[:len(prefix_parts)] == prefix_parts: + return True + return False \ No newline at end of file diff --git a/tests/test_auto_trace.py b/tests/test_auto_trace.py new file mode 100644 index 00000000..ff21587b --- /dev/null +++ b/tests/test_auto_trace.py @@ -0,0 +1,259 @@ +"""Tests for the auto-trace functionality.""" + +import ast +import sys +from unittest.mock import Mock + +import pytest + + +class TestAutoTrace: + """Test the auto-trace AST transformation functionality.""" + + @pytest.fixture + def mock_tracer(self): + """Create a mock tracer for testing.""" + tracer = Mock() + span = Mock() + span.__enter__ = Mock(return_value=span) + span.__exit__ = Mock(return_value=None) + tracer.start_as_current_span.return_value = span + return tracer + + def test_compile_source_basic(self, mock_tracer): + """Test basic AST compilation with auto-tracing.""" + from gentrace.lib.auto_trace.rewrite_ast import compile_source + + # Simple test code + code = """ +def add(a, b): + return a + b + +result = add(1, 2) +""" + + # Parse and compile + tree = ast.parse(code) + execute = compile_source(tree, '', 'test_module', min_duration=0, tracer=mock_tracer) + + # Execute + globs = {} + execute(globs) + + # Check that the function was defined and works + assert 'add' in globs + assert globs['result'] == 3 + + # Check that tracer was called + mock_tracer.start_as_current_span.assert_called() + + def test_no_auto_trace_decorator(self, mock_tracer): + """Test that @no_auto_trace decorator prevents tracing.""" + from gentrace.lib.auto_trace.rewrite_ast import no_auto_trace, compile_source + + # Code with no_auto_trace decorator + code = """ +from gentrace.lib.auto_trace.rewrite_ast import no_auto_trace + +@no_auto_trace +def skipped(): + return "not traced" + +def traced(): + return "traced" + +r1 = skipped() +r2 = traced() +""" + + # Parse and compile + tree = ast.parse(code) + execute = compile_source(tree, '', 'test_module', min_duration=0, tracer=mock_tracer) + + # Execute with the decorator available + globs = {'no_auto_trace': no_auto_trace} + execute(globs) + + # Check results + assert globs['r1'] == "not traced" + assert globs['r2'] == "traced" + + # Check that only one function was traced + assert mock_tracer.start_as_current_span.call_count == 1 + + def test_class_methods_traced(self, mock_tracer): + """Test that class methods are traced.""" + from gentrace.lib.auto_trace.rewrite_ast import compile_source + + code = """ +class Calculator: + def add(self, a, b): + return a + b + + @classmethod + def multiply(cls, a, b): + return a * b + +calc = Calculator() +r1 = calc.add(2, 3) +r2 = Calculator.multiply(4, 5) +""" + + # Parse and compile + tree = ast.parse(code) + execute = compile_source(tree, '', 'test_module', min_duration=0, tracer=mock_tracer) + + # Execute + globs = {} + execute(globs) + + # Check results + assert globs['r1'] == 5 + assert globs['r2'] == 20 + + # Check that methods were traced + assert mock_tracer.start_as_current_span.call_count >= 2 + + def test_generator_functions_not_traced(self, mock_tracer): + """Test that generator functions are not traced (they have yield).""" + from gentrace.lib.auto_trace.rewrite_ast import compile_source + + code = """ +def generator(): + yield 1 + yield 2 + +def regular(): + return list(generator()) + +result = regular() +""" + + # Parse and compile + tree = ast.parse(code) + execute = compile_source(tree, '', 'test_module', min_duration=0, tracer=mock_tracer) + + # Execute + globs = {} + execute(globs) + + # Check results + assert globs['result'] == [1, 2] + + # Only regular() should be traced, not generator() + assert mock_tracer.start_as_current_span.call_count == 1 + + def test_install_auto_tracing(self): + """Test the install_auto_tracing function.""" + from gentrace.lib.auto_trace import GentraceFinder, install_auto_tracing + + # Install auto-tracing for a test module pattern + install_auto_tracing(['test_dummy_module'], check_imported_modules='ignore') + + # Check that the finder was added to sys.meta_path + finder = None + for item in sys.meta_path: + if isinstance(item, GentraceFinder): + finder = item + break + + assert finder is not None + + # Clean up + sys.meta_path.remove(finder) + + def test_install_auto_tracing_with_pipeline_id(self): + """Test install_auto_tracing with pipeline_id.""" + import uuid + + from gentrace.lib.auto_trace import GentraceFinder, install_auto_tracing + + pipeline_id = str(uuid.uuid4()) + + # Install auto-tracing with pipeline_id + install_auto_tracing( + ['test_dummy_module'], + check_imported_modules='ignore', + pipeline_id=pipeline_id + ) + + # Check that the finder was added with pipeline_id + finder = None + for item in sys.meta_path: + if isinstance(item, GentraceFinder): + finder = item + break + + assert finder is not None + assert finder.pipeline_id == pipeline_id + + # Clean up + sys.meta_path.remove(finder) + + def test_install_auto_tracing_invalid_pipeline_id(self): + """Test that invalid pipeline_id raises ValueError.""" + from gentrace.lib.auto_trace import install_auto_tracing + + with pytest.raises(ValueError, match="pipeline_id must be a valid UUID"): + install_auto_tracing( + ['test_dummy_module'], + check_imported_modules='ignore', + pipeline_id="not-a-uuid" + ) + + def test_compile_source_with_pipeline_id(self, mock_tracer): + """Test that pipeline_id is handled correctly for auto-traced functions.""" + import uuid + + from gentrace.lib.auto_trace.rewrite_ast import compile_source + + pipeline_id = str(uuid.uuid4()) + + # Simple test code + code = """ +def test_func(): + return "hello" + +result = test_func() +""" + + # Parse and compile with pipeline_id + tree = ast.parse(code) + execute = compile_source( + tree, '', 'test_module', + min_duration=0, + tracer=mock_tracer, + pipeline_id=pipeline_id + ) + + # Execute + globs = {} + execute(globs) + + # The function should have been called to create a span + mock_tracer.start_as_current_span.assert_called() + + # Note: The actual pipeline_id attribute is added dynamically at runtime + # based on whether it's a root span or not, so we can't test it directly + # without mocking the OpenTelemetry context + + def test_ast_transformation_preserves_docstrings(self, mock_tracer): + """Test that docstrings are preserved during AST transformation.""" + from gentrace.lib.auto_trace.rewrite_ast import compile_source + + code = ''' +def documented(): + """This is a docstring.""" + return 42 +''' + + # Parse and compile + tree = ast.parse(code) + execute = compile_source(tree, '', 'test_module', min_duration=0, tracer=mock_tracer) + + # Execute + globs = {} + execute(globs) + + # Check that docstring is preserved + assert globs['documented'].__doc__ == "This is a docstring." \ No newline at end of file From 1b3d2bc7da8fccf4445c9ec5f5ba5164cef0773c Mon Sep 17 00:00:00 2001 From: Vivek Nair Date: Sat, 7 Jun 2025 14:23:47 -0400 Subject: [PATCH 2/6] feat: Add decorator compatibility for auto-tracing code --- examples/auto_trace/README.md | 15 +++++++- .../auto_trace/auto_trace_pipeline_example.py | 6 ++- .../pipeline_example_app/workflow.py | 37 +++++++++++++++++-- src/gentrace/lib/auto_trace/import_hook.py | 2 +- 4 files changed, 52 insertions(+), 8 deletions(-) diff --git a/examples/auto_trace/README.md b/examples/auto_trace/README.md index cea761e4..39725279 100644 --- a/examples/auto_trace/README.md +++ b/examples/auto_trace/README.md @@ -35,6 +35,7 @@ auto_trace/ 3. **Deep Span Trees**: The pipeline creates a multi-level hierarchy of spans showing the call flow 4. **OpenTelemetry Integration**: Proper span context propagation with parent-child relationships 5. **Root Span Attribution**: Only the root span includes the `gentrace.pipeline_id` attribute +6. **Decorator Compatibility**: The `@interaction` decorator can be used alongside auto-tracing, with decorator settings taking precedence ## Running the Example @@ -56,14 +57,15 @@ The example will: The automatic tracing creates a span tree like this: ``` -run_data_processing_pipeline [root - has pipeline_id] +run_data_processing_pipeline [root - has pipeline_id from auto-tracing] ├── extract_data │ └── process_record (multiple calls) ├── transform_data │ ├── apply_transformation (multiple calls) │ └── validate_transformation (multiple calls) └── load_results - └── generate_summary + ├── generate_summary [auto-traced] + └── Manual Summary Enrichment [@traced decorator - custom attributes] ``` ## Key Concepts @@ -88,6 +90,15 @@ When a pipeline_id is provided: - This allows the Gentrace backend to associate the entire trace tree with the pipeline - All spans in the trace are connected via the same `traceId` +### Mixing Auto-Tracing with Manual Tracing + +The example demonstrates that manual tracing decorators can be used within auto-traced code: +- Functions marked with `@no_auto_trace` are excluded from automatic tracing +- The `@traced` decorator can be used within auto-traced code for fine-grained control +- Manual traces can add custom attributes and names +- Both auto-traced and manually traced spans maintain proper parent-child relationships +- This allows selective manual instrumentation while maintaining automatic tracing for the rest of the code + ### OpenTelemetry Configuration The example shows proper OpenTelemetry setup with: diff --git a/examples/auto_trace/auto_trace_pipeline_example.py b/examples/auto_trace/auto_trace_pipeline_example.py index 30228643..1f00231a 100644 --- a/examples/auto_trace/auto_trace_pipeline_example.py +++ b/examples/auto_trace/auto_trace_pipeline_example.py @@ -90,8 +90,10 @@ print("\n" + "=" * 60) print(f"Pipeline completed successfully!") print(f"Result: {result}") - print(f"\nAll spans in this trace include 'gentrace.pipeline_id': '{pipeline_id}'") - print("Check the Gentrace dashboard to see all traces associated with this pipeline!") + print(f"\nMost spans include 'gentrace.pipeline_id': '{pipeline_id}'") + print("Note: The 'enrich_summary_manually' function uses @traced decorator") + print("to demonstrate manual tracing within auto-traced code.") + print("\nCheck the Gentrace dashboard to see the mixed trace!") except Exception as e: print(f"\nError running pipeline: {e}") raise diff --git a/examples/auto_trace/pipeline_example_app/workflow.py b/examples/auto_trace/pipeline_example_app/workflow.py index f98703c1..b29fd848 100644 --- a/examples/auto_trace/pipeline_example_app/workflow.py +++ b/examples/auto_trace/pipeline_example_app/workflow.py @@ -3,6 +3,8 @@ import time from typing import Any, Dict, List +import gentrace + def run_data_processing_pipeline() -> Dict[str, Any]: """Main pipeline entry point - this and all called functions will have pipeline_id.""" @@ -82,24 +84,53 @@ def load_results(data: List[Dict[str, Any]]) -> Dict[str, Any]: """Load the results to destination.""" time.sleep(0.05) # Simulate work + # Generate summary with auto-tracing summary = generate_summary(data) + # Also demonstrate manual tracing within auto-traced code + enriched_summary = enrich_summary_manually(summary) + return { 'status': 'success', 'records_processed': len(data), - 'summary': summary, + 'summary': enriched_summary, 'pipeline_complete': True } +@gentrace.no_auto_trace # Exclude from auto-tracing +def enrich_summary_manually(summary: Dict[str, Any]) -> Dict[str, Any]: + """Function that uses @traced decorator for manual tracing.""" + # Use the traced decorator directly + @gentrace.traced( + name="Manual Summary Enrichment", + attributes={"custom_attribute": "manual_trace", "enrichment_type": "statistics"} + ) + def _enrich() -> Dict[str, Any]: + time.sleep(0.01) # Simulate work + return { + **summary, + 'enriched': True, + 'percentile_95': summary.get('average', 0) * 1.5 if summary else 0, + 'traced_manually': True + } + + return _enrich() + + def generate_summary(data: List[Dict[str, Any]]) -> Dict[str, Any]: - """Generate summary statistics.""" + """Generate summary statistics (auto-traced).""" if not data: return {'total': 0, 'average': 0} values = [r.get('value', 0) for r in data] + + # Add a small simulated calculation + time.sleep(0.02) # Simulate computation time + return { 'total': sum(values), 'average': sum(values) / len(values), - 'count': len(data) + 'count': len(data), + 'interaction_wrapped': True # Marker to show this was wrapped } \ No newline at end of file diff --git a/src/gentrace/lib/auto_trace/import_hook.py b/src/gentrace/lib/auto_trace/import_hook.py index 4c7f3c12..462cadd0 100644 --- a/src/gentrace/lib/auto_trace/import_hook.py +++ b/src/gentrace/lib/auto_trace/import_hook.py @@ -110,7 +110,7 @@ def exec_module(self, module: ModuleType): # This is required when `exec_module` is defined. # It returns None to indicate that the usual module creation process should be used. - def create_module(self, spec: ModuleSpec): + def create_module(self, spec: ModuleSpec): # noqa: ARG002 return None def get_code(self, _name: str): From 30caa29e87dcaa50b4110cd2228639770de32d41 Mon Sep 17 00:00:00 2001 From: Vivek Nair Date: Sat, 7 Jun 2025 14:29:22 -0400 Subject: [PATCH 3/6] chore: Update type annotations and ignore pyright warnings --- tests/test_auto_trace.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/tests/test_auto_trace.py b/tests/test_auto_trace.py index ff21587b..b9296e46 100644 --- a/tests/test_auto_trace.py +++ b/tests/test_auto_trace.py @@ -2,16 +2,19 @@ import ast import sys +from typing import Any from unittest.mock import Mock import pytest +# pyright: reportUnknownMemberType=false, reportUnknownParameterType=false, reportUnknownArgumentType=false + class TestAutoTrace: """Test the auto-trace AST transformation functionality.""" @pytest.fixture - def mock_tracer(self): + def mock_tracer(self) -> Mock: """Create a mock tracer for testing.""" tracer = Mock() span = Mock() @@ -20,7 +23,7 @@ def mock_tracer(self): tracer.start_as_current_span.return_value = span return tracer - def test_compile_source_basic(self, mock_tracer): + def test_compile_source_basic(self, mock_tracer: Mock) -> None: """Test basic AST compilation with auto-tracing.""" from gentrace.lib.auto_trace.rewrite_ast import compile_source @@ -37,7 +40,7 @@ def add(a, b): execute = compile_source(tree, '', 'test_module', min_duration=0, tracer=mock_tracer) # Execute - globs = {} + globs: dict[str, Any] = {} execute(globs) # Check that the function was defined and works @@ -47,7 +50,7 @@ def add(a, b): # Check that tracer was called mock_tracer.start_as_current_span.assert_called() - def test_no_auto_trace_decorator(self, mock_tracer): + def test_no_auto_trace_decorator(self, mock_tracer: Mock) -> None: """Test that @no_auto_trace decorator prevents tracing.""" from gentrace.lib.auto_trace.rewrite_ast import no_auto_trace, compile_source @@ -81,7 +84,7 @@ def traced(): # Check that only one function was traced assert mock_tracer.start_as_current_span.call_count == 1 - def test_class_methods_traced(self, mock_tracer): + def test_class_methods_traced(self, mock_tracer: Mock) -> None: """Test that class methods are traced.""" from gentrace.lib.auto_trace.rewrite_ast import compile_source @@ -104,7 +107,7 @@ def multiply(cls, a, b): execute = compile_source(tree, '', 'test_module', min_duration=0, tracer=mock_tracer) # Execute - globs = {} + globs: dict[str, Any] = {} execute(globs) # Check results @@ -114,7 +117,7 @@ def multiply(cls, a, b): # Check that methods were traced assert mock_tracer.start_as_current_span.call_count >= 2 - def test_generator_functions_not_traced(self, mock_tracer): + def test_generator_functions_not_traced(self, mock_tracer: Mock) -> None: """Test that generator functions are not traced (they have yield).""" from gentrace.lib.auto_trace.rewrite_ast import compile_source @@ -134,7 +137,7 @@ def regular(): execute = compile_source(tree, '', 'test_module', min_duration=0, tracer=mock_tracer) # Execute - globs = {} + globs: dict[str, Any] = {} execute(globs) # Check results @@ -143,7 +146,7 @@ def regular(): # Only regular() should be traced, not generator() assert mock_tracer.start_as_current_span.call_count == 1 - def test_install_auto_tracing(self): + def test_install_auto_tracing(self) -> None: """Test the install_auto_tracing function.""" from gentrace.lib.auto_trace import GentraceFinder, install_auto_tracing @@ -162,7 +165,7 @@ def test_install_auto_tracing(self): # Clean up sys.meta_path.remove(finder) - def test_install_auto_tracing_with_pipeline_id(self): + def test_install_auto_tracing_with_pipeline_id(self) -> None: """Test install_auto_tracing with pipeline_id.""" import uuid @@ -190,7 +193,7 @@ def test_install_auto_tracing_with_pipeline_id(self): # Clean up sys.meta_path.remove(finder) - def test_install_auto_tracing_invalid_pipeline_id(self): + def test_install_auto_tracing_invalid_pipeline_id(self) -> None: """Test that invalid pipeline_id raises ValueError.""" from gentrace.lib.auto_trace import install_auto_tracing @@ -201,7 +204,7 @@ def test_install_auto_tracing_invalid_pipeline_id(self): pipeline_id="not-a-uuid" ) - def test_compile_source_with_pipeline_id(self, mock_tracer): + def test_compile_source_with_pipeline_id(self, mock_tracer: Mock) -> None: """Test that pipeline_id is handled correctly for auto-traced functions.""" import uuid @@ -227,7 +230,7 @@ def test_func(): ) # Execute - globs = {} + globs: dict[str, Any] = {} execute(globs) # The function should have been called to create a span @@ -237,7 +240,7 @@ def test_func(): # based on whether it's a root span or not, so we can't test it directly # without mocking the OpenTelemetry context - def test_ast_transformation_preserves_docstrings(self, mock_tracer): + def test_ast_transformation_preserves_docstrings(self, mock_tracer: Mock) -> None: """Test that docstrings are preserved during AST transformation.""" from gentrace.lib.auto_trace.rewrite_ast import compile_source @@ -252,7 +255,7 @@ def documented(): execute = compile_source(tree, '', 'test_module', min_duration=0, tracer=mock_tracer) # Execute - globs = {} + globs: dict[str, Any] = {} execute(globs) # Check that docstring is preserved From 236bb6c261b533c000222c9656a996f15b2c5621 Mon Sep 17 00:00:00 2001 From: Vivek Nair Date: Sat, 7 Jun 2025 14:31:44 -0400 Subject: [PATCH 4/6] fix: Update README.md with correct environment variable information --- examples/auto_trace/README.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/examples/auto_trace/README.md b/examples/auto_trace/README.md index 39725279..b5096586 100644 --- a/examples/auto_trace/README.md +++ b/examples/auto_trace/README.md @@ -8,9 +8,8 @@ Before running the example, ensure you have the following environment variables ```bash export GENTRACE_API_KEY="your-gentrace-api-key" -export GENTRACE_BASE_URL="http://localhost:3000/api" # or your Gentrace instance URL export GENTRACE_PIPELINE_ID="your-pipeline-uuid" -export OPENAI_API_KEY="your-openai-api-key" # Optional +export OPENAI_API_KEY="your-openai-api-key" ``` ## Overview @@ -47,6 +46,7 @@ python auto_trace_pipeline_example.py ## Expected Output The example will: + 1. Initialize OpenTelemetry with Gentrace configuration 2. Install auto-tracing for the `pipeline_example_app` module 3. Execute a multi-step data processing pipeline @@ -85,6 +85,7 @@ This must be called BEFORE importing the modules you want to trace. ### How Pipeline ID Works When a pipeline_id is provided: + - The root span (first span in the trace) includes the `gentrace.pipeline_id` attribute - Child spans do NOT include the pipeline_id attribute - This allows the Gentrace backend to associate the entire trace tree with the pipeline @@ -93,6 +94,7 @@ When a pipeline_id is provided: ### Mixing Auto-Tracing with Manual Tracing The example demonstrates that manual tracing decorators can be used within auto-traced code: + - Functions marked with `@no_auto_trace` are excluded from automatic tracing - The `@traced` decorator can be used within auto-traced code for fine-grained control - Manual traces can add custom attributes and names @@ -102,6 +104,7 @@ The example demonstrates that manual tracing decorators can be used within auto- ### OpenTelemetry Configuration The example shows proper OpenTelemetry setup with: + - Resource configuration with service name - Gentrace sampler for sampling decisions - Gentrace span processor for baggage propagation @@ -114,8 +117,9 @@ The `min_duration` parameter (in seconds) can be used to only trace functions th ## Troubleshooting If spans don't appear in the Gentrace UI: + 1. Check that all environment variables are set correctly 2. Verify the Gentrace backend is running and accessible 3. Ensure the ClickHouse replication task is running 4. Check the console output for any error messages -5. Verify the pipeline ID exists in your Gentrace instance \ No newline at end of file +5. Verify the pipeline ID exists in your Gentrace instance From caf12c47ca4c581ff2758514a792687d28c7ad64 Mon Sep 17 00:00:00 2001 From: Vivek Nair Date: Sat, 7 Jun 2025 14:40:19 -0400 Subject: [PATCH 5/6] fix: Add missing type hints in ast_utils and import_hook --- .../auto_trace/pipeline_example_app/workflow.py | 4 ++-- src/gentrace/lib/auto_trace/__init__.py | 2 +- src/gentrace/lib/auto_trace/ast_utils.py | 4 ++++ src/gentrace/lib/auto_trace/import_hook.py | 4 ++++ src/gentrace/lib/auto_trace/rewrite_ast.py | 14 ++++++++++---- 5 files changed, 21 insertions(+), 7 deletions(-) diff --git a/examples/auto_trace/pipeline_example_app/workflow.py b/examples/auto_trace/pipeline_example_app/workflow.py index b29fd848..d853d808 100644 --- a/examples/auto_trace/pipeline_example_app/workflow.py +++ b/examples/auto_trace/pipeline_example_app/workflow.py @@ -33,7 +33,7 @@ def extract_data() -> List[Dict[str, Any]]: time.sleep(0.05) # Simulate work # Simulate extracting data - data = [] + data: List[Dict[str, Any]] = [] for i in range(5): # Reduced for faster example record = process_record(i) data.append(record) @@ -53,7 +53,7 @@ def process_record(record_id: int) -> Dict[str, Any]: def transform_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Transform the extracted data.""" - transformed = [] + transformed: List[Dict[str, Any]] = [] for record in data: # Apply transformation result = apply_transformation(record) diff --git a/src/gentrace/lib/auto_trace/__init__.py b/src/gentrace/lib/auto_trace/__init__.py index cb4b8ff3..7cd3313c 100644 --- a/src/gentrace/lib/auto_trace/__init__.py +++ b/src/gentrace/lib/auto_trace/__init__.py @@ -80,7 +80,7 @@ def install_auto_tracing( def modules_func_from_sequence(modules: Sequence[str]) -> Callable[[AutoTraceModule], bool]: - return lambda module: module.parts_start_with(modules) + return lambda module: module.parts_start_with(list(modules)) class AutoTraceModuleAlreadyImportedException(Exception): diff --git a/src/gentrace/lib/auto_trace/ast_utils.py b/src/gentrace/lib/auto_trace/ast_utils.py index fb7b7b1b..85b24941 100644 --- a/src/gentrace/lib/auto_trace/ast_utils.py +++ b/src/gentrace/lib/auto_trace/ast_utils.py @@ -5,6 +5,7 @@ import ast from typing import Any, Dict, List, Union, cast from dataclasses import dataclass +from typing_extensions import override @dataclass @@ -20,6 +21,7 @@ def __post_init__(self): # so we can construct the qualified name of the current function. self.qualname_stack: List[str] = [] + @override def visit_ClassDef(self, node: ast.ClassDef): self.qualname_stack.append(node.name) # We need to call generic_visit here to modify any functions defined inside the class. @@ -27,6 +29,7 @@ def visit_ClassDef(self, node: ast.ClassDef): self.qualname_stack.pop() return node + @override def visit_FunctionDef(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef]): self.qualname_stack.append(node.name) qualname = '.'.join(self.qualname_stack) @@ -38,6 +41,7 @@ def visit_FunctionDef(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef]): return self.rewrite_function(node, qualname) + @override def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef): return self.visit_FunctionDef(node) diff --git a/src/gentrace/lib/auto_trace/import_hook.py b/src/gentrace/lib/auto_trace/import_hook.py index 462cadd0..59e2bcf8 100644 --- a/src/gentrace/lib/auto_trace/import_hook.py +++ b/src/gentrace/lib/auto_trace/import_hook.py @@ -9,6 +9,7 @@ from dataclasses import dataclass from importlib.abc import Loader, MetaPathFinder from importlib.util import spec_from_loader +from typing_extensions import override from importlib.machinery import ModuleSpec from .types import AutoTraceModule @@ -27,6 +28,7 @@ class GentraceFinder(MetaPathFinder): tracer: Optional['Tracer'] pipeline_id: Optional[str] = None + @override def find_spec( self, fullname: str, path: Optional[Sequence[str]], target: Optional[ModuleType] = None ) -> Optional[ModuleSpec]: @@ -104,12 +106,14 @@ class GentraceLoader(Loader): execute: Callable[[Dict[str, Any]], None] """A function which accepts module globals and executes the compiled code.""" + @override def exec_module(self, module: ModuleType): """Execute a modified AST of the module's source code in the module's namespace.""" self.execute(module.__dict__) # This is required when `exec_module` is defined. # It returns None to indicate that the usual module creation process should be used. + @override def create_module(self, spec: ModuleSpec): # noqa: ARG002 return None diff --git a/src/gentrace/lib/auto_trace/rewrite_ast.py b/src/gentrace/lib/auto_trace/rewrite_ast.py index 8ab32cd1..e9085581 100644 --- a/src/gentrace/lib/auto_trace/rewrite_ast.py +++ b/src/gentrace/lib/auto_trace/rewrite_ast.py @@ -8,6 +8,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Union, TypeVar, Callable, Optional, ContextManager from collections import deque from dataclasses import dataclass +from typing_extensions import override from opentelemetry import trace @@ -84,29 +85,34 @@ def check_no_auto_trace(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef, for node in node.decorator_list ) + @override def visit_ClassDef(self, node: ast.ClassDef): if self.check_no_auto_trace(node): return node return super().visit_ClassDef(node) + @override def visit_FunctionDef(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef]): if self.check_no_auto_trace(node): return node return super().visit_FunctionDef(node) + @override def rewrite_function(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef], qualname: str) -> ast.AST: if has_yield(node): return node return super().rewrite_function(node, qualname) + @override def get_span_attributes(self, qualname: str, lineno: int) -> Dict[str, Any]: """Get the attributes to set on the span.""" # Note: pipeline_id will be added dynamically only to root spans return super().get_span_attributes(qualname, lineno) + @override def create_span_call_node(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef], qualname: str) -> ast.Call: # See the compile_source docstring index = len(self.context_factories) @@ -130,7 +136,7 @@ def create_span_with_pipeline_support(): # Check if this is a root span (no active span in context) current_span = otel_trace.get_current_span(current_context) - is_root_span = current_span is None or not current_span.is_recording() + is_root_span = not current_span.is_recording() # Prepare attributes - only add pipeline_id to root spans span_attributes = attributes.copy() @@ -153,14 +159,14 @@ def create_span_with_pipeline_support(): # Wrap to ensure we detach the baggage context class SpanWithBaggageCleanup: - def __init__(self, span, token): + def __init__(self, span: Any, token: Any) -> None: self.span = span self.token = token - def __enter__(self): + def __enter__(self) -> Any: return self.span.__enter__() - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Any: try: return self.span.__exit__(exc_type, exc_val, exc_tb) finally: From 8a111baee0ba4b23bacaea59ed1308457a5e0dd4 Mon Sep 17 00:00:00 2001 From: Vivek Nair Date: Sat, 7 Jun 2025 15:21:29 -0400 Subject: [PATCH 6/6] fix: Add return type annotations to methods --- src/gentrace/lib/auto_trace/ast_utils.py | 8 ++++---- src/gentrace/lib/auto_trace/import_hook.py | 12 ++++++----- src/gentrace/lib/auto_trace/rewrite_ast.py | 24 ++++++++++++---------- 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/src/gentrace/lib/auto_trace/ast_utils.py b/src/gentrace/lib/auto_trace/ast_utils.py index 85b24941..2eb0d0e8 100644 --- a/src/gentrace/lib/auto_trace/ast_utils.py +++ b/src/gentrace/lib/auto_trace/ast_utils.py @@ -16,13 +16,13 @@ class BaseTransformer(ast.NodeTransformer): filename: str module_name: str - def __post_init__(self): + def __post_init__(self) -> None: # Names of functions and classes that we're currently inside, # so we can construct the qualified name of the current function. self.qualname_stack: List[str] = [] @override - def visit_ClassDef(self, node: ast.ClassDef): + def visit_ClassDef(self, node: ast.ClassDef) -> ast.ClassDef: self.qualname_stack.append(node.name) # We need to call generic_visit here to modify any functions defined inside the class. node = cast(ast.ClassDef, self.generic_visit(node)) @@ -30,7 +30,7 @@ def visit_ClassDef(self, node: ast.ClassDef): return node @override - def visit_FunctionDef(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef]): + def visit_FunctionDef(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef]) -> ast.AST: self.qualname_stack.append(node.name) qualname = '.'.join(self.qualname_stack) self.qualname_stack.append('') @@ -42,7 +42,7 @@ def visit_FunctionDef(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef]): return self.rewrite_function(node, qualname) @override - def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef): + def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> ast.AST: return self.visit_FunctionDef(node) def rewrite_function(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef], qualname: str) -> ast.AST: diff --git a/src/gentrace/lib/auto_trace/import_hook.py b/src/gentrace/lib/auto_trace/import_hook.py index 59e2bcf8..14035dc4 100644 --- a/src/gentrace/lib/auto_trace/import_hook.py +++ b/src/gentrace/lib/auto_trace/import_hook.py @@ -77,7 +77,9 @@ def find_spec( loader = GentraceLoader(plain_spec, execute) return spec_from_loader(fullname, loader) - + + return None + def _find_plain_specs( self, fullname: str, path: Optional[Sequence[str]], target: Optional[ModuleType] ) -> Iterator[ModuleSpec]: @@ -107,24 +109,24 @@ class GentraceLoader(Loader): """A function which accepts module globals and executes the compiled code.""" @override - def exec_module(self, module: ModuleType): + def exec_module(self, module: ModuleType) -> None: """Execute a modified AST of the module's source code in the module's namespace.""" self.execute(module.__dict__) # This is required when `exec_module` is defined. # It returns None to indicate that the usual module creation process should be used. @override - def create_module(self, spec: ModuleSpec): # noqa: ARG002 + def create_module(self, spec: ModuleSpec) -> Optional[ModuleType]: # noqa: ARG002 return None - def get_code(self, _name: str): + def get_code(self, _name: str) -> Any: # `python -m` uses the `runpy` module which calls this method instead of going through the normal protocol. # So return some code which can be executed with the module namespace. # Here `__loader__` will be this object, i.e. `self`. source = '__loader__.execute(globals())' return compile(source, '', 'exec', dont_inherit=True) - def __getattr__(self, item: str): + def __getattr__(self, item: str) -> Any: """Forward some methods to the plain spec's loader (likely a `SourceFileLoader`) if they exist.""" if item in {'get_filename', 'is_package'}: return getattr(self.plain_spec.loader, item) diff --git a/src/gentrace/lib/auto_trace/rewrite_ast.py b/src/gentrace/lib/auto_trace/rewrite_ast.py index e9085581..1d181246 100644 --- a/src/gentrace/lib/auto_trace/rewrite_ast.py +++ b/src/gentrace/lib/auto_trace/rewrite_ast.py @@ -5,7 +5,7 @@ import ast import time import uuid -from typing import TYPE_CHECKING, Any, Dict, List, Union, TypeVar, Callable, Optional, ContextManager +from typing import TYPE_CHECKING, Any, Dict, List, Union, TypeVar, Callable, Optional, ContextManager, cast from collections import deque from dataclasses import dataclass from typing_extensions import override @@ -37,7 +37,7 @@ def compile_source( # dont_inherit=True is necessary to prevent the module from inheriting the __future__ import from this module. code = compile(tree, filename, 'exec', dont_inherit=True) - def execute(globs: Dict[str, Any]): + def execute(globs: Dict[str, Any]) -> None: globs[gentrace_name] = context_factories exec(code, globs, globs) @@ -57,7 +57,7 @@ def rewrite_ast( transformer = AutoTraceTransformer( gentrace_name, filename, module_name, context_factories, min_duration, tracer, pipeline_id ) - return transformer.visit(tree) + return cast(ast.AST, transformer.visit(tree)) @dataclass @@ -86,14 +86,14 @@ def check_no_auto_trace(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef, ) @override - def visit_ClassDef(self, node: ast.ClassDef): + def visit_ClassDef(self, node: ast.ClassDef) -> ast.ClassDef: if self.check_no_auto_trace(node): return node return super().visit_ClassDef(node) @override - def visit_FunctionDef(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef]): + def visit_FunctionDef(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef]) -> ast.AST: if self.check_no_auto_trace(node): return node @@ -130,7 +130,7 @@ def create_span_call_node(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDe from ..constants import ATTR_GENTRACE_SAMPLE_KEY, ATTR_GENTRACE_PIPELINE_ID - def create_span_with_pipeline_support(): + def create_span_with_pipeline_support() -> ContextManager[Any]: """Create span with pipeline_id support.""" current_context = otel_context.get_current() @@ -188,13 +188,14 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Any: class MeasureTime: __slots__ = 'start' - def __enter__(_self): + def __enter__(_self) -> 'MeasureTime': _self.start = time.perf_counter_ns() + return _self - def __exit__(_self, *_): + def __exit__(_self, *_: Any) -> None: if time.perf_counter_ns() - _self.start >= self.min_duration: self.context_factories[index] = span_factory - + self.context_factories.append(MeasureTime) else: self.context_factories.append(span_factory) @@ -233,7 +234,7 @@ def no_auto_trace(x: T) -> T: return x -def has_yield(node: ast.AST): +def has_yield(node: ast.AST) -> bool: queue = deque([node]) while queue: node = queue.popleft() @@ -241,4 +242,5 @@ def has_yield(node: ast.AST): if isinstance(child, (ast.Yield, ast.YieldFrom)): return True if not isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)): - queue.append(child) \ No newline at end of file + queue.append(child) + return False \ No newline at end of file