diff --git a/CLAUDE.md b/CLAUDE.md deleted file mode 100644 index dd6330ca..00000000 --- a/CLAUDE.md +++ /dev/null @@ -1,48 +0,0 @@ -# Claude Guidelines for gentrace-python - -## Project Overview -gentrace-python is a Python SDK for the Gentrace API, which provides tools for evaluating and monitoring AI applications. - -## Code Style Guidelines -- Follow PEP 8 style guidelines -- Use type hints where appropriate -- Maintain consistent indentation (4 spaces) -- Use meaningful variable and function names -- Add docstrings for public APIs -- Follow the existing error handling patterns - -## Testing Requirements -- Write unit tests for new functionality using pytest -- Ensure all tests pass before submitting PRs -- Maintain or improve test coverage - -## PR Guidelines -- Keep PRs focused on a single feature or bug fix -- Include clear descriptions of changes -- Reference related issues -- Update documentation as needed - -## Commit Message Format -- Use clear, descriptive commit messages -- Start with a verb in the present tense (e.g., "Add", "Fix", "Update") -- Reference issue numbers when applicable - -## Dependencies -- Minimize adding new dependencies -- Prefer well-maintained, widely-used packages -- Consider compatibility with different Python versions - -## Security Considerations -- Never expose API keys or sensitive information -- Follow secure coding practices -- Validate user inputs - -## Documentation -- Update README.md for significant changes -- Document new features and APIs -- Keep code comments up-to-date - -## Performance -- Consider performance implications of changes -- Avoid unnecessary computations or memory usage - diff --git a/examples/auto_trace/README.md b/examples/auto_trace/README.md new file mode 100644 index 00000000..b5096586 --- /dev/null +++ b/examples/auto_trace/README.md @@ -0,0 +1,125 @@ +# Gentrace Auto-Tracing Example + +This example demonstrates Gentrace's automatic AST-based tracing feature with pipeline ID support, which instruments all functions in specified modules without requiring manual decoration. + +## Prerequisites + +Before running the example, ensure you have the following environment variables set: + +```bash +export GENTRACE_API_KEY="your-gentrace-api-key" +export GENTRACE_PIPELINE_ID="your-pipeline-uuid" +export OPENAI_API_KEY="your-openai-api-key" +``` + +## Overview + +The example simulates a data processing pipeline with multiple layers of function calls, creating a deep tree of spans to showcase the automatic tracing capabilities with pipeline association. + +## Structure + +``` +auto_trace/ +├── README.md # This file +├── auto_trace_pipeline_example.py # Main example script +└── pipeline_example_app/ # Example application package + ├── __init__.py + └── workflow.py # Pipeline workflow functions +``` + +## Features Demonstrated + +1. **Automatic Function Tracing**: All functions in the `pipeline_example_app` module are automatically traced without decorators +2. **Pipeline ID Support**: All traces are associated with a specific Gentrace pipeline +3. **Deep Span Trees**: The pipeline creates a multi-level hierarchy of spans showing the call flow +4. **OpenTelemetry Integration**: Proper span context propagation with parent-child relationships +5. **Root Span Attribution**: Only the root span includes the `gentrace.pipeline_id` attribute +6. **Decorator Compatibility**: The `@interaction` decorator can be used alongside auto-tracing, with decorator settings taking precedence + +## Running the Example + +```bash +cd examples/auto_trace +python auto_trace_pipeline_example.py +``` + +## Expected Output + +The example will: + +1. Initialize OpenTelemetry with Gentrace configuration +2. Install auto-tracing for the `pipeline_example_app` module +3. Execute a multi-step data processing pipeline +4. Export spans to the Gentrace backend via OTLP + +## Span Tree Structure + +The automatic tracing creates a span tree like this: + +``` +run_data_processing_pipeline [root - has pipeline_id from auto-tracing] +├── extract_data +│ └── process_record (multiple calls) +├── transform_data +│ ├── apply_transformation (multiple calls) +│ └── validate_transformation (multiple calls) +└── load_results + ├── generate_summary [auto-traced] + └── Manual Summary Enrichment [@traced decorator - custom attributes] +``` + +## Key Concepts + +### Installing Auto-Tracing with Pipeline ID + +```python +gentrace.install_auto_tracing( + ['pipeline_example_app'], + min_duration=0, + pipeline_id=pipeline_id +) +``` + +This must be called BEFORE importing the modules you want to trace. + +### How Pipeline ID Works + +When a pipeline_id is provided: + +- The root span (first span in the trace) includes the `gentrace.pipeline_id` attribute +- Child spans do NOT include the pipeline_id attribute +- This allows the Gentrace backend to associate the entire trace tree with the pipeline +- All spans in the trace are connected via the same `traceId` + +### Mixing Auto-Tracing with Manual Tracing + +The example demonstrates that manual tracing decorators can be used within auto-traced code: + +- Functions marked with `@no_auto_trace` are excluded from automatic tracing +- The `@traced` decorator can be used within auto-traced code for fine-grained control +- Manual traces can add custom attributes and names +- Both auto-traced and manually traced spans maintain proper parent-child relationships +- This allows selective manual instrumentation while maintaining automatic tracing for the rest of the code + +### OpenTelemetry Configuration + +The example shows proper OpenTelemetry setup with: + +- Resource configuration with service name +- Gentrace sampler for sampling decisions +- Gentrace span processor for baggage propagation +- OTLP exporter configured for the Gentrace backend + +### Minimum Duration + +The `min_duration` parameter (in seconds) can be used to only trace functions that take longer than the specified duration. This helps reduce noise from very fast functions. + +## Troubleshooting + +If spans don't appear in the Gentrace UI: + +1. Check that all environment variables are set correctly +2. Verify the Gentrace backend is running and accessible +3. Ensure the ClickHouse replication task is running +4. Check the console output for any error messages +5. Verify the pipeline ID exists in your Gentrace instance diff --git a/examples/auto_trace/auto_trace_pipeline_example.py b/examples/auto_trace/auto_trace_pipeline_example.py new file mode 100644 index 00000000..1f00231a --- /dev/null +++ b/examples/auto_trace/auto_trace_pipeline_example.py @@ -0,0 +1,107 @@ +"""Example demonstrating Gentrace's automatic AST-based tracing with pipeline_id support. + +This example shows how auto-traced functions can be associated with a specific +Gentrace pipeline and exported to the Gentrace backend. + +To run this example, ensure the following environment variables are set: + GENTRACE_API_KEY: Your Gentrace API token for authentication. + GENTRACE_BASE_URL: The base URL for your Gentrace instance (e.g., http://localhost:3000/api). + GENTRACE_PIPELINE_ID: The UUID of the pipeline to associate traces with. + OPENAI_API_KEY: Your OpenAI API key (optional, for OpenAI examples). +""" + +import os +import atexit +from typing import Dict + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + +import gentrace +from gentrace import GentraceSampler, GentraceSpanProcessor + +# Get configuration from environment +api_key = os.getenv("GENTRACE_API_KEY", "") +gentrace_base_url = os.getenv("GENTRACE_BASE_URL", "") +pipeline_id = os.getenv("GENTRACE_PIPELINE_ID", "") + +if not api_key: + raise ValueError("GENTRACE_API_KEY environment variable not set.") +if not gentrace_base_url: + raise ValueError("GENTRACE_BASE_URL environment variable not set.") +if not pipeline_id: + raise ValueError("GENTRACE_PIPELINE_ID environment variable not set.") + +print(f"Gentrace Base URL: {gentrace_base_url}") +print(f"Pipeline ID: {pipeline_id}") +print("=" * 60) + +# Setup OpenTelemetry with Gentrace +resource = Resource(attributes={"service.name": "auto-trace-pipeline-example"}) +tracer_provider = TracerProvider(resource=resource, sampler=GentraceSampler()) + +# Configure OTLP exporter to send traces to Gentrace +otlp_headers: Dict[str, str] = {} +if api_key: + otlp_headers["Authorization"] = f"Bearer {api_key}" + +span_exporter = OTLPSpanExporter( + endpoint=f"{gentrace_base_url}/otel/v1/traces", + headers=otlp_headers, +) + +# Add Gentrace span processor for enrichment +gentrace_baggage_processor = GentraceSpanProcessor() +tracer_provider.add_span_processor(gentrace_baggage_processor) + +# Add the export processor +simple_export_processor = SimpleSpanProcessor(span_exporter) +tracer_provider.add_span_processor(simple_export_processor) + +# Set the global tracer provider +trace.set_tracer_provider(tracer_provider) + +# Install auto-tracing with pipeline_id for our example modules +# All functions in these modules will be automatically associated with this pipeline +print("Installing auto-tracing with pipeline_id...") +gentrace.install_auto_tracing( + ['pipeline_example_app'], + min_duration=0, + pipeline_id=pipeline_id +) + +# Now import the modules - they will be automatically instrumented with pipeline_id +from pipeline_example_app import workflow + +if __name__ == "__main__": + # Register shutdown handler + atexit.register(tracer_provider.shutdown) + + print("\nStarting Gentrace Auto-Trace Pipeline Example") + print("=" * 60) + + # Run the workflow - all traced functions will include the pipeline_id + try: + result = workflow.run_data_processing_pipeline() + + print("\n" + "=" * 60) + print(f"Pipeline completed successfully!") + print(f"Result: {result}") + print(f"\nMost spans include 'gentrace.pipeline_id': '{pipeline_id}'") + print("Note: The 'enrich_summary_manually' function uses @traced decorator") + print("to demonstrate manual tracing within auto-traced code.") + print("\nCheck the Gentrace dashboard to see the mixed trace!") + except Exception as e: + print(f"\nError running pipeline: {e}") + raise + + print("\nShutting down...") + # Force flush to ensure all spans are sent + tracer_provider.force_flush() + + # Add a small delay to ensure spans are processed + import time + time.sleep(1) \ No newline at end of file diff --git a/examples/auto_trace/pipeline_example_app/__init__.py b/examples/auto_trace/pipeline_example_app/__init__.py new file mode 100644 index 00000000..c304885f --- /dev/null +++ b/examples/auto_trace/pipeline_example_app/__init__.py @@ -0,0 +1 @@ +"""Example application demonstrating pipeline-aware auto-tracing.""" \ No newline at end of file diff --git a/examples/auto_trace/pipeline_example_app/workflow.py b/examples/auto_trace/pipeline_example_app/workflow.py new file mode 100644 index 00000000..d853d808 --- /dev/null +++ b/examples/auto_trace/pipeline_example_app/workflow.py @@ -0,0 +1,136 @@ +"""Example workflow that will be auto-traced with pipeline_id.""" + +import time +from typing import Any, Dict, List + +import gentrace + + +def run_data_processing_pipeline() -> Dict[str, Any]: + """Main pipeline entry point - this and all called functions will have pipeline_id.""" + print("Starting data processing pipeline...") + + # Step 1: Extract data + print("Step 1: Extracting data...") + data = extract_data() + print(f" Extracted {len(data)} records") + + # Step 2: Transform data + print("Step 2: Transforming data...") + transformed = transform_data(data) + print(f" Transformed {len(transformed)} records") + + # Step 3: Load results + print("Step 3: Loading results...") + result = load_results(transformed) + print(" Results loaded successfully") + + return result + + +def extract_data() -> List[Dict[str, Any]]: + """Extract data from source.""" + time.sleep(0.05) # Simulate work + + # Simulate extracting data + data: List[Dict[str, Any]] = [] + for i in range(5): # Reduced for faster example + record = process_record(i) + data.append(record) + + return data + + +def process_record(record_id: int) -> Dict[str, Any]: + """Process individual record.""" + # This will also be traced with the pipeline_id + return { + 'id': record_id, + 'value': record_id * 10, + 'status': 'extracted' + } + + +def transform_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Transform the extracted data.""" + transformed: List[Dict[str, Any]] = [] + for record in data: + # Apply transformation + result = apply_transformation(record) + if validate_transformation(result): + transformed.append(result) + + return transformed + + +def apply_transformation(record: Dict[str, Any]) -> Dict[str, Any]: + """Apply transformation to a single record.""" + time.sleep(0.01) # Simulate work + + return { + **record, + 'value': record['value'] * 2, + 'status': 'transformed', + 'transform_timestamp': time.time() + } + + +def validate_transformation(record: Dict[str, Any]) -> bool: + """Validate the transformed record.""" + return record.get('value', 0) > 0 and 'transform_timestamp' in record + + +def load_results(data: List[Dict[str, Any]]) -> Dict[str, Any]: + """Load the results to destination.""" + time.sleep(0.05) # Simulate work + + # Generate summary with auto-tracing + summary = generate_summary(data) + + # Also demonstrate manual tracing within auto-traced code + enriched_summary = enrich_summary_manually(summary) + + return { + 'status': 'success', + 'records_processed': len(data), + 'summary': enriched_summary, + 'pipeline_complete': True + } + + +@gentrace.no_auto_trace # Exclude from auto-tracing +def enrich_summary_manually(summary: Dict[str, Any]) -> Dict[str, Any]: + """Function that uses @traced decorator for manual tracing.""" + # Use the traced decorator directly + @gentrace.traced( + name="Manual Summary Enrichment", + attributes={"custom_attribute": "manual_trace", "enrichment_type": "statistics"} + ) + def _enrich() -> Dict[str, Any]: + time.sleep(0.01) # Simulate work + return { + **summary, + 'enriched': True, + 'percentile_95': summary.get('average', 0) * 1.5 if summary else 0, + 'traced_manually': True + } + + return _enrich() + + +def generate_summary(data: List[Dict[str, Any]]) -> Dict[str, Any]: + """Generate summary statistics (auto-traced).""" + if not data: + return {'total': 0, 'average': 0} + + values = [r.get('value', 0) for r in data] + + # Add a small simulated calculation + time.sleep(0.02) # Simulate computation time + + return { + 'total': sum(values), + 'average': sum(values) / len(values), + 'count': len(data), + 'interaction_wrapped': True # Marker to show this was wrapped + } \ No newline at end of file diff --git a/src/gentrace/__init__.py b/src/gentrace/__init__.py index 5afdf300..31efee8e 100644 --- a/src/gentrace/__init__.py +++ b/src/gentrace/__init__.py @@ -106,10 +106,12 @@ def __getattr__(self, name: str) -> _t.Any: ATTR_GENTRACE_FN_ARGS_EVENT_NAME, ATTR_GENTRACE_FN_OUTPUT_EVENT_NAME, ) +from .lib.auto_trace import install_auto_tracing from .lib.experiment import experiment from .lib.interaction import interaction from .lib.eval_dataset import TestInput, eval_dataset from .lib.span_processor import GentraceSpanProcessor +from .lib.auto_trace.rewrite_ast import no_auto_trace ### End custom Gentrace imports @@ -173,6 +175,8 @@ def __getattr__(self, name: str) -> _t.Any: "Experiment", "Dataset", "Pipeline", + "install_auto_tracing", + "no_auto_trace", # End custom Gentrace exports ] diff --git a/src/gentrace/lib/auto_trace/__init__.py b/src/gentrace/lib/auto_trace/__init__.py new file mode 100644 index 00000000..7cd3313c --- /dev/null +++ b/src/gentrace/lib/auto_trace/__init__.py @@ -0,0 +1,91 @@ +"""Automatic tracing functionality for Gentrace using AST transformation.""" + +from __future__ import annotations + +import sys +import uuid +import warnings +from typing import TYPE_CHECKING, Union, Literal, Callable, Optional, Sequence + +from .types import AutoTraceModule +from .import_hook import GentraceFinder + +if TYPE_CHECKING: + from opentelemetry.trace import Tracer + + +def install_auto_tracing( + modules: Union[Sequence[str], Callable[[AutoTraceModule], bool]], + *, + min_duration: float = 0, + check_imported_modules: Literal['error', 'warn', 'ignore'] = 'error', + tracer: Optional['Tracer'] = None, + pipeline_id: Optional[str] = None, +) -> None: + """Install automatic tracing for specified modules. + + Args: + modules: Either a list of module name prefixes to trace, or a callable + that returns True for modules that should be traced + min_duration: Minimum duration in seconds for a function to be traced + check_imported_modules: How to handle modules that have already been imported + tracer: OpenTelemetry tracer to use (defaults to gentrace tracer) + pipeline_id: Optional Gentrace pipeline ID to associate all auto-traced spans with. + Must be a valid UUID string if provided. + """ + if isinstance(modules, Sequence): + modules = modules_func_from_sequence(modules) # type: ignore + + if not callable(modules): + raise TypeError('modules must be a list of strings or a callable') + + if check_imported_modules not in ('error', 'warn', 'ignore'): + raise ValueError('check_imported_modules must be one of "error", "warn", or "ignore"') + + # Validate pipeline_id if provided + if pipeline_id is not None: + try: + uuid.UUID(pipeline_id) + except ValueError as e: + raise ValueError( + f"pipeline_id must be a valid UUID string. Received: '{pipeline_id}'" + ) from e + + if check_imported_modules != 'ignore': + for module in list(sys.modules.values()): + try: + auto_trace_module = AutoTraceModule(module.__name__, module.__file__) + except Exception: + continue + + if modules(auto_trace_module): + if check_imported_modules == 'error': + raise AutoTraceModuleAlreadyImportedException( + f'The module {module.__name__!r} matches modules to trace, but it has already been imported. ' + f'Either call `install_auto_tracing` earlier, ' + f"or set `check_imported_modules` to 'warn' or 'ignore'." + ) + else: + warnings.warn( + f'The module {module.__name__!r} matches modules to trace, but it has already been imported. ' + f'Either call `install_auto_tracing` earlier, ' + f"or set `check_imported_modules` to 'ignore'.", + AutoTraceModuleAlreadyImportedWarning, + stacklevel=2, + ) + + min_duration_ns = int(min_duration * 1_000_000_000) + finder = GentraceFinder(modules, min_duration_ns, tracer, pipeline_id) + sys.meta_path.insert(0, finder) + + +def modules_func_from_sequence(modules: Sequence[str]) -> Callable[[AutoTraceModule], bool]: + return lambda module: module.parts_start_with(list(modules)) + + +class AutoTraceModuleAlreadyImportedException(Exception): + pass + + +class AutoTraceModuleAlreadyImportedWarning(Warning): + pass \ No newline at end of file diff --git a/src/gentrace/lib/auto_trace/ast_utils.py b/src/gentrace/lib/auto_trace/ast_utils.py new file mode 100644 index 00000000..2eb0d0e8 --- /dev/null +++ b/src/gentrace/lib/auto_trace/ast_utils.py @@ -0,0 +1,112 @@ +"""AST transformation utilities for Gentrace auto-tracing.""" + +from __future__ import annotations + +import ast +from typing import Any, Dict, List, Union, cast +from dataclasses import dataclass +from typing_extensions import override + + +@dataclass +class BaseTransformer(ast.NodeTransformer): + """Helper for rewriting ASTs to wrap function bodies in `with tracer.start_as_current_span(...):`.""" + + span_name_prefix: str + filename: str + module_name: str + + def __post_init__(self) -> None: + # Names of functions and classes that we're currently inside, + # so we can construct the qualified name of the current function. + self.qualname_stack: List[str] = [] + + @override + def visit_ClassDef(self, node: ast.ClassDef) -> ast.ClassDef: + self.qualname_stack.append(node.name) + # We need to call generic_visit here to modify any functions defined inside the class. + node = cast(ast.ClassDef, self.generic_visit(node)) + self.qualname_stack.pop() + return node + + @override + def visit_FunctionDef(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef]) -> ast.AST: + self.qualname_stack.append(node.name) + qualname = '.'.join(self.qualname_stack) + self.qualname_stack.append('') + # We need to call generic_visit here to modify any classes/functions nested inside. + self.generic_visit(node) + self.qualname_stack.pop() # + self.qualname_stack.pop() # node.name + + return self.rewrite_function(node, qualname) + + @override + def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> ast.AST: + return self.visit_FunctionDef(node) + + def rewrite_function(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef], qualname: str) -> ast.AST: + # Replace the body of the function with: + # with : + # + body = node.body.copy() + new_body: List[ast.stmt] = [] + if ( + body + and isinstance(body[0], ast.Expr) + and isinstance(body[0].value, ast.Constant) + and isinstance(body[0].value.value, str) + ): + # If the first statement is just a string literal, it's a docstring. + # Keep it as the first statement in the new body, not wrapped in a span, + # so it's still recognized as a docstring. + new_body.append(body.pop(0)) + + # Ignore functions with a trivial/empty body: + # - If `body` is empty, that means it originally was just a docstring that got popped above. + # - If `body` is just a single `pass` statement + # - If `body` is just a constant expression, particularly an ellipsis (`...`) + if not body or ( + len(body) == 1 + and ( + isinstance(body[0], ast.Pass) + or (isinstance(body[0], ast.Expr) and isinstance(body[0].value, ast.Constant)) + ) + ): + return node + + span = ast.With( + items=[ + ast.withitem( + context_expr=self.create_span_call_node(node, qualname), + ) + ], + body=body, + type_comment=node.type_comment, + ) + new_body.append(span) + + return ast.fix_missing_locations( + ast.copy_location( + type(node)( # type: ignore + name=node.name, + args=node.args, + body=new_body, + decorator_list=node.decorator_list, + returns=node.returns, + type_comment=node.type_comment, + ), + node, + ) + ) + + def create_span_call_node(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef], qualname: str) -> ast.Call: + raise NotImplementedError() + + def get_span_attributes(self, qualname: str, lineno: int) -> Dict[str, Any]: + """Get the attributes to set on the span.""" + return { + 'code.filepath': self.filename, + 'code.lineno': lineno, + 'code.function': qualname, + } \ No newline at end of file diff --git a/src/gentrace/lib/auto_trace/import_hook.py b/src/gentrace/lib/auto_trace/import_hook.py new file mode 100644 index 00000000..14035dc4 --- /dev/null +++ b/src/gentrace/lib/auto_trace/import_hook.py @@ -0,0 +1,133 @@ +"""Import hook for Gentrace auto-tracing.""" + +from __future__ import annotations + +import ast +import sys +from types import ModuleType +from typing import TYPE_CHECKING, Any, Dict, Callable, Iterator, Optional, Sequence, cast +from dataclasses import dataclass +from importlib.abc import Loader, MetaPathFinder +from importlib.util import spec_from_loader +from typing_extensions import override +from importlib.machinery import ModuleSpec + +from .types import AutoTraceModule +from .rewrite_ast import compile_source + +if TYPE_CHECKING: + from opentelemetry.trace import Tracer + + +@dataclass +class GentraceFinder(MetaPathFinder): + """The import hook entry point, inserted into `sys.meta_path` to apply AST rewriting to matching modules.""" + + modules_filter: Callable[[AutoTraceModule], bool] + min_duration: int + tracer: Optional['Tracer'] + pipeline_id: Optional[str] = None + + @override + def find_spec( + self, fullname: str, path: Optional[Sequence[str]], target: Optional[ModuleType] = None + ) -> Optional[ModuleSpec]: + """This is the method that is called by the import system.""" + for plain_spec in self._find_plain_specs(fullname, path, target): + # Not all loaders have get_source, but it's an abstract method of the standard ABC InspectLoader. + # In particular it's implemented by `importlib.machinery.SourceFileLoader` + # which is provided by default. + get_source = getattr(plain_spec.loader, 'get_source', None) + if not callable(get_source): + continue + + try: + source = cast(str, get_source(fullname)) + except Exception: + continue + + if not source: + continue + + # We fully expect plain_spec.origin and self.get_filename(...) + # to be the same thing (a valid filename), but they're optional. + filename = plain_spec.origin + if not filename: + try: + filename = cast('Optional[str]', plain_spec.loader.get_filename(fullname)) # type: ignore + except Exception: + pass + + if not self.modules_filter(AutoTraceModule(fullname, filename)): + return None # tell the import system to try the next meta path finder + + try: + tree = ast.parse(source) + except Exception: + # The plain finder gave us invalid source code. Try another one. + continue + + filename = filename or f'<{fullname}>' + + try: + execute = compile_source(tree, filename, fullname, self.min_duration, self.tracer, self.pipeline_id) + except Exception: + # Auto-tracing failed with an unexpected error. Ensure that this doesn't crash the whole application. + return None # tell the import system to try the next meta path finder + + loader = GentraceLoader(plain_spec, execute) + return spec_from_loader(fullname, loader) + + return None + + def _find_plain_specs( + self, fullname: str, path: Optional[Sequence[str]], target: Optional[ModuleType] + ) -> Iterator[ModuleSpec]: + """Yield module specs returned by other finders on `sys.meta_path`.""" + for finder in sys.meta_path: + # Skip this finder or any like it to avoid infinite recursion. + if isinstance(finder, GentraceFinder): + continue + + try: + plain_spec = finder.find_spec(fullname, path, target) + except Exception: + continue + + if plain_spec: + yield plain_spec + + +@dataclass +class GentraceLoader(Loader): + """An import loader produced by GentraceFinder which executes a modified AST of the module's source code.""" + + plain_spec: ModuleSpec + """A spec for the module that was returned by another meta path finder.""" + + execute: Callable[[Dict[str, Any]], None] + """A function which accepts module globals and executes the compiled code.""" + + @override + def exec_module(self, module: ModuleType) -> None: + """Execute a modified AST of the module's source code in the module's namespace.""" + self.execute(module.__dict__) + + # This is required when `exec_module` is defined. + # It returns None to indicate that the usual module creation process should be used. + @override + def create_module(self, spec: ModuleSpec) -> Optional[ModuleType]: # noqa: ARG002 + return None + + def get_code(self, _name: str) -> Any: + # `python -m` uses the `runpy` module which calls this method instead of going through the normal protocol. + # So return some code which can be executed with the module namespace. + # Here `__loader__` will be this object, i.e. `self`. + source = '__loader__.execute(globals())' + return compile(source, '', 'exec', dont_inherit=True) + + def __getattr__(self, item: str) -> Any: + """Forward some methods to the plain spec's loader (likely a `SourceFileLoader`) if they exist.""" + if item in {'get_filename', 'is_package'}: + return getattr(self.plain_spec.loader, item) + raise AttributeError(item) \ No newline at end of file diff --git a/src/gentrace/lib/auto_trace/rewrite_ast.py b/src/gentrace/lib/auto_trace/rewrite_ast.py new file mode 100644 index 00000000..1d181246 --- /dev/null +++ b/src/gentrace/lib/auto_trace/rewrite_ast.py @@ -0,0 +1,246 @@ +"""AST rewriting for Gentrace auto-tracing.""" + +from __future__ import annotations + +import ast +import time +import uuid +from typing import TYPE_CHECKING, Any, Dict, List, Union, TypeVar, Callable, Optional, ContextManager, cast +from collections import deque +from dataclasses import dataclass +from typing_extensions import override + +from opentelemetry import trace + +from .ast_utils import BaseTransformer + +if TYPE_CHECKING: + from opentelemetry.trace import Tracer + + +def compile_source( + tree: ast.AST, + filename: str, + module_name: str, + min_duration: int, + tracer: Optional['Tracer'] = None, + pipeline_id: Optional[str] = None, +) -> Callable[[Dict[str, Any]], None]: + """Compile a modified AST of the module's source code. + + Returns a function which accepts module globals and executes the compiled code. + """ + gentrace_name = f'gentrace_{uuid.uuid4().hex}' + context_factories: List[Callable[[], ContextManager[Any]]] = [] + tree = rewrite_ast(tree, filename, gentrace_name, module_name, context_factories, min_duration, tracer, pipeline_id) + assert isinstance(tree, ast.Module) # for type checking + # dont_inherit=True is necessary to prevent the module from inheriting the __future__ import from this module. + code = compile(tree, filename, 'exec', dont_inherit=True) + + def execute(globs: Dict[str, Any]) -> None: + globs[gentrace_name] = context_factories + exec(code, globs, globs) + + return execute + + +def rewrite_ast( + tree: ast.AST, + filename: str, + gentrace_name: str, + module_name: str, + context_factories: List[Callable[[], ContextManager[Any]]], + min_duration: int, + tracer: Optional['Tracer'] = None, + pipeline_id: Optional[str] = None, +) -> ast.AST: + transformer = AutoTraceTransformer( + gentrace_name, filename, module_name, context_factories, min_duration, tracer, pipeline_id + ) + return cast(ast.AST, transformer.visit(tree)) + + +@dataclass +class AutoTraceTransformer(BaseTransformer): + """Trace all encountered functions except those explicitly marked with `@no_auto_trace`.""" + + context_factories: List[Callable[[], ContextManager[Any]]] + min_duration: int + tracer: Optional['Tracer'] + pipeline_id: Optional[str] = None + + def check_no_auto_trace(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef]) -> bool: + """Return true if the node has a `@no_auto_trace` decorator.""" + return any( + ( + isinstance(node, ast.Name) + and node.id == 'no_auto_trace' + or ( + isinstance(node, ast.Attribute) + and node.attr == 'no_auto_trace' + and isinstance(node.value, ast.Name) + and node.value.id == 'gentrace' + ) + ) + for node in node.decorator_list + ) + + @override + def visit_ClassDef(self, node: ast.ClassDef) -> ast.ClassDef: + if self.check_no_auto_trace(node): + return node + + return super().visit_ClassDef(node) + + @override + def visit_FunctionDef(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef]) -> ast.AST: + if self.check_no_auto_trace(node): + return node + + return super().visit_FunctionDef(node) + + @override + def rewrite_function(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef], qualname: str) -> ast.AST: + if has_yield(node): + return node + + return super().rewrite_function(node, qualname) + + @override + def get_span_attributes(self, qualname: str, lineno: int) -> Dict[str, Any]: + """Get the attributes to set on the span.""" + # Note: pipeline_id will be added dynamically only to root spans + return super().get_span_attributes(qualname, lineno) + + @override + def create_span_call_node(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef], qualname: str) -> ast.Call: + # See the compile_source docstring + index = len(self.context_factories) + + # Get or create tracer + tracer = self.tracer or trace.get_tracer("gentrace") + + # Create span factory + span_name = f'{self.module_name}.{qualname}' + attributes = self.get_span_attributes(qualname, node.lineno) + + # When pipeline_id is set, we need special handling + if self.pipeline_id is not None: + from opentelemetry import trace as otel_trace, baggage as otel_baggage, context as otel_context + + from ..constants import ATTR_GENTRACE_SAMPLE_KEY, ATTR_GENTRACE_PIPELINE_ID + + def create_span_with_pipeline_support() -> ContextManager[Any]: + """Create span with pipeline_id support.""" + current_context = otel_context.get_current() + + # Check if this is a root span (no active span in context) + current_span = otel_trace.get_current_span(current_context) + is_root_span = not current_span.is_recording() + + # Prepare attributes - only add pipeline_id to root spans + span_attributes = attributes.copy() + if is_root_span: + span_attributes[ATTR_GENTRACE_PIPELINE_ID] = self.pipeline_id + + # For root spans, set baggage context + if is_root_span: + # Check if baggage is already set + existing_baggage = otel_baggage.get_baggage(ATTR_GENTRACE_SAMPLE_KEY, context=current_context) + if existing_baggage is None: + # Set baggage context (similar to @interaction decorator) + context_with_baggage = otel_baggage.set_baggage( + ATTR_GENTRACE_SAMPLE_KEY, "true", context=current_context + ) + token = otel_context.attach(context_with_baggage) + + # Create span with the appropriate attributes + span = tracer.start_as_current_span(span_name, attributes=span_attributes) + + # Wrap to ensure we detach the baggage context + class SpanWithBaggageCleanup: + def __init__(self, span: Any, token: Any) -> None: + self.span = span + self.token = token + + def __enter__(self) -> Any: + return self.span.__enter__() + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Any: + try: + return self.span.__exit__(exc_type, exc_val, exc_tb) + finally: + otel_context.detach(self.token) + + return SpanWithBaggageCleanup(span, token) + + # For non-root spans or if baggage already set, create normal span + return tracer.start_as_current_span(span_name, attributes=span_attributes) + + span_factory = create_span_with_pipeline_support + else: + # No pipeline_id, use standard span creation + span_factory = lambda: tracer.start_as_current_span(span_name, attributes=attributes) + + if self.min_duration > 0: + # This needs to be as fast as possible since it's the cost of auto-tracing a function + # that never actually gets instrumented because its calls are all faster than `min_duration`. + class MeasureTime: + __slots__ = 'start' + + def __enter__(_self) -> 'MeasureTime': + _self.start = time.perf_counter_ns() + return _self + + def __exit__(_self, *_: Any) -> None: + if time.perf_counter_ns() - _self.start >= self.min_duration: + self.context_factories[index] = span_factory + + self.context_factories.append(MeasureTime) + else: + self.context_factories.append(span_factory) + + # This node means: + # context_factories[index]() + # where `context_factories` is a global variable with the name `self.span_name_prefix` + # pointing to the `self.context_factories` list. + return ast.Call( + func=ast.Subscript( + value=ast.Name(id=self.span_name_prefix, ctx=ast.Load()), + slice=ast.Index(value=ast.Constant(value=index)), # type: ignore + ctx=ast.Load(), + ), + args=[], + keywords=[], + ) + + +T = TypeVar('T') + + +def no_auto_trace(x: T) -> T: + """Decorator to prevent a function/class from being traced by `gentrace.install_auto_tracing`. + + This is useful for small functions that are called very frequently and would generate too much noise. + + The decorator is detected at import time. + Only `@no_auto_trace` or `@gentrace.no_auto_trace` are supported. + + Any decorated function, or any function defined anywhere inside a decorated function/class, + will be completely ignored by auto-tracing. + + This decorator simply returns the argument unchanged, so there is zero runtime overhead. + """ + return x + + +def has_yield(node: ast.AST) -> bool: + queue = deque([node]) + while queue: + node = queue.popleft() + for child in ast.iter_child_nodes(node): + if isinstance(child, (ast.Yield, ast.YieldFrom)): + return True + if not isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)): + queue.append(child) + return False \ No newline at end of file diff --git a/src/gentrace/lib/auto_trace/types.py b/src/gentrace/lib/auto_trace/types.py new file mode 100644 index 00000000..33b97e44 --- /dev/null +++ b/src/gentrace/lib/auto_trace/types.py @@ -0,0 +1,22 @@ +"""Types for auto-tracing functionality.""" + +from __future__ import annotations + +from typing import List, Tuple, Union, Optional + + +class AutoTraceModule: + """Information about a module that might be auto-traced.""" + + def __init__(self, name: str, filename: Optional[str]): + self.name = name + self.filename = filename + + def parts_start_with(self, prefixes: Union[Tuple[str, ...], List[str]]) -> bool: + """Check if the module name starts with any of the given prefixes.""" + parts = self.name.split('.') + for prefix in prefixes: + prefix_parts = prefix.split('.') + if len(parts) >= len(prefix_parts) and parts[:len(prefix_parts)] == prefix_parts: + return True + return False \ No newline at end of file diff --git a/tests/test_auto_trace.py b/tests/test_auto_trace.py new file mode 100644 index 00000000..b9296e46 --- /dev/null +++ b/tests/test_auto_trace.py @@ -0,0 +1,262 @@ +"""Tests for the auto-trace functionality.""" + +import ast +import sys +from typing import Any +from unittest.mock import Mock + +import pytest + +# pyright: reportUnknownMemberType=false, reportUnknownParameterType=false, reportUnknownArgumentType=false + + +class TestAutoTrace: + """Test the auto-trace AST transformation functionality.""" + + @pytest.fixture + def mock_tracer(self) -> Mock: + """Create a mock tracer for testing.""" + tracer = Mock() + span = Mock() + span.__enter__ = Mock(return_value=span) + span.__exit__ = Mock(return_value=None) + tracer.start_as_current_span.return_value = span + return tracer + + def test_compile_source_basic(self, mock_tracer: Mock) -> None: + """Test basic AST compilation with auto-tracing.""" + from gentrace.lib.auto_trace.rewrite_ast import compile_source + + # Simple test code + code = """ +def add(a, b): + return a + b + +result = add(1, 2) +""" + + # Parse and compile + tree = ast.parse(code) + execute = compile_source(tree, '', 'test_module', min_duration=0, tracer=mock_tracer) + + # Execute + globs: dict[str, Any] = {} + execute(globs) + + # Check that the function was defined and works + assert 'add' in globs + assert globs['result'] == 3 + + # Check that tracer was called + mock_tracer.start_as_current_span.assert_called() + + def test_no_auto_trace_decorator(self, mock_tracer: Mock) -> None: + """Test that @no_auto_trace decorator prevents tracing.""" + from gentrace.lib.auto_trace.rewrite_ast import no_auto_trace, compile_source + + # Code with no_auto_trace decorator + code = """ +from gentrace.lib.auto_trace.rewrite_ast import no_auto_trace + +@no_auto_trace +def skipped(): + return "not traced" + +def traced(): + return "traced" + +r1 = skipped() +r2 = traced() +""" + + # Parse and compile + tree = ast.parse(code) + execute = compile_source(tree, '', 'test_module', min_duration=0, tracer=mock_tracer) + + # Execute with the decorator available + globs = {'no_auto_trace': no_auto_trace} + execute(globs) + + # Check results + assert globs['r1'] == "not traced" + assert globs['r2'] == "traced" + + # Check that only one function was traced + assert mock_tracer.start_as_current_span.call_count == 1 + + def test_class_methods_traced(self, mock_tracer: Mock) -> None: + """Test that class methods are traced.""" + from gentrace.lib.auto_trace.rewrite_ast import compile_source + + code = """ +class Calculator: + def add(self, a, b): + return a + b + + @classmethod + def multiply(cls, a, b): + return a * b + +calc = Calculator() +r1 = calc.add(2, 3) +r2 = Calculator.multiply(4, 5) +""" + + # Parse and compile + tree = ast.parse(code) + execute = compile_source(tree, '', 'test_module', min_duration=0, tracer=mock_tracer) + + # Execute + globs: dict[str, Any] = {} + execute(globs) + + # Check results + assert globs['r1'] == 5 + assert globs['r2'] == 20 + + # Check that methods were traced + assert mock_tracer.start_as_current_span.call_count >= 2 + + def test_generator_functions_not_traced(self, mock_tracer: Mock) -> None: + """Test that generator functions are not traced (they have yield).""" + from gentrace.lib.auto_trace.rewrite_ast import compile_source + + code = """ +def generator(): + yield 1 + yield 2 + +def regular(): + return list(generator()) + +result = regular() +""" + + # Parse and compile + tree = ast.parse(code) + execute = compile_source(tree, '', 'test_module', min_duration=0, tracer=mock_tracer) + + # Execute + globs: dict[str, Any] = {} + execute(globs) + + # Check results + assert globs['result'] == [1, 2] + + # Only regular() should be traced, not generator() + assert mock_tracer.start_as_current_span.call_count == 1 + + def test_install_auto_tracing(self) -> None: + """Test the install_auto_tracing function.""" + from gentrace.lib.auto_trace import GentraceFinder, install_auto_tracing + + # Install auto-tracing for a test module pattern + install_auto_tracing(['test_dummy_module'], check_imported_modules='ignore') + + # Check that the finder was added to sys.meta_path + finder = None + for item in sys.meta_path: + if isinstance(item, GentraceFinder): + finder = item + break + + assert finder is not None + + # Clean up + sys.meta_path.remove(finder) + + def test_install_auto_tracing_with_pipeline_id(self) -> None: + """Test install_auto_tracing with pipeline_id.""" + import uuid + + from gentrace.lib.auto_trace import GentraceFinder, install_auto_tracing + + pipeline_id = str(uuid.uuid4()) + + # Install auto-tracing with pipeline_id + install_auto_tracing( + ['test_dummy_module'], + check_imported_modules='ignore', + pipeline_id=pipeline_id + ) + + # Check that the finder was added with pipeline_id + finder = None + for item in sys.meta_path: + if isinstance(item, GentraceFinder): + finder = item + break + + assert finder is not None + assert finder.pipeline_id == pipeline_id + + # Clean up + sys.meta_path.remove(finder) + + def test_install_auto_tracing_invalid_pipeline_id(self) -> None: + """Test that invalid pipeline_id raises ValueError.""" + from gentrace.lib.auto_trace import install_auto_tracing + + with pytest.raises(ValueError, match="pipeline_id must be a valid UUID"): + install_auto_tracing( + ['test_dummy_module'], + check_imported_modules='ignore', + pipeline_id="not-a-uuid" + ) + + def test_compile_source_with_pipeline_id(self, mock_tracer: Mock) -> None: + """Test that pipeline_id is handled correctly for auto-traced functions.""" + import uuid + + from gentrace.lib.auto_trace.rewrite_ast import compile_source + + pipeline_id = str(uuid.uuid4()) + + # Simple test code + code = """ +def test_func(): + return "hello" + +result = test_func() +""" + + # Parse and compile with pipeline_id + tree = ast.parse(code) + execute = compile_source( + tree, '', 'test_module', + min_duration=0, + tracer=mock_tracer, + pipeline_id=pipeline_id + ) + + # Execute + globs: dict[str, Any] = {} + execute(globs) + + # The function should have been called to create a span + mock_tracer.start_as_current_span.assert_called() + + # Note: The actual pipeline_id attribute is added dynamically at runtime + # based on whether it's a root span or not, so we can't test it directly + # without mocking the OpenTelemetry context + + def test_ast_transformation_preserves_docstrings(self, mock_tracer: Mock) -> None: + """Test that docstrings are preserved during AST transformation.""" + from gentrace.lib.auto_trace.rewrite_ast import compile_source + + code = ''' +def documented(): + """This is a docstring.""" + return 42 +''' + + # Parse and compile + tree = ast.parse(code) + execute = compile_source(tree, '', 'test_module', min_duration=0, tracer=mock_tracer) + + # Execute + globs: dict[str, Any] = {} + execute(globs) + + # Check that docstring is preserved + assert globs['documented'].__doc__ == "This is a docstring." \ No newline at end of file