diff --git a/src/parxy_cli/commands/parse.py b/src/parxy_cli/commands/parse.py index fd9cbaf..eace15c 100644 --- a/src/parxy_cli/commands/parse.py +++ b/src/parxy_cli/commands/parse.py @@ -257,10 +257,10 @@ def parse( typer.Option( '--workers', '-w', - help='Number of parallel workers to use. Defaults to 2.', + help='Number of parallel workers to use. Defaults to cpu count.', min=1, ), - ] = 2, + ] = None, ): """ Parse documents using one or more drivers. @@ -315,55 +315,60 @@ def parse( error_count = 0 # Show info - with console.shimmer( - f'Processing {len(files)} file{"s" if len(files) > 1 else ""} with {len(drivers)} driver{"s" if len(drivers) > 1 else ""}...' - ): - # Process files with progress bar - with console.progress('Processing documents') as progress: - task = progress.add_task('', total=total_tasks) - - batch_tasks = [str(f) for f in files] - - for result in Parxy.batch_iter( - tasks=batch_tasks, - drivers=drivers, - level=level.value, - workers=workers, - ): - file_name = ( - Path(result.file).name - if isinstance(result.file, str) - else 'document' - ) - - if result.success: - output_file, page_count = save_batch_result( - result=result, - mode=mode, - output_dir=output_path, - show=show, - ) - console.print( - f'[faint]⎿ [/faint] {file_name} via {result.driver} to [success]{output_file}[/success] [faint]({page_count} pages)[/faint]' - ) - else: - console.print( - f'[faint]⎿ [/faint] {file_name} via {result.driver} error. [error]{result.error}[/error]' + try: + with console.shimmer( + f'Processing {len(files)} file{"s" if len(files) > 1 else ""} with {len(drivers)} driver{"s" if len(drivers) > 1 else ""}...' + ): + # Process files with progress bar + with console.progress('Processing documents') as progress: + task = progress.add_task('', total=total_tasks) + + batch_tasks = [str(f) for f in files] + + for result in Parxy.batch_iter( + tasks=batch_tasks, + drivers=drivers, + level=level.value, + workers=workers, + ): + file_name = ( + Path(result.file).name + if isinstance(result.file, str) + else 'document' ) - error_count += 1 - if stop_on_failure: - console.newline() - console.info( - 'Stopping due to error (--stop-on-failure flag is set)' + if result.success: + output_file, page_count = save_batch_result( + result=result, + mode=mode, + output_dir=output_path, + show=show, ) - raise typer.Exit(1) + console.print( + f'[faint]⎿ [/faint] {file_name} via {result.driver} to [success]{output_file}[/success] [faint]({page_count} pages)[/faint]' + ) + else: + console.print( + f'[faint]⎿ [/faint] {file_name} via {result.driver} error. [error]{result.error}[/error]' + ) + error_count += 1 + + if stop_on_failure: + console.newline() + console.info( + 'Stopping due to error (--stop-on-failure flag is set)' + ) + raise typer.Exit(1) - progress.update(task, advance=1) + progress.update(task, advance=1) - elapsed_time = format_timedelta( - timedelta(seconds=max(0, progress.tasks[0].elapsed)) - ) + elapsed_time = format_timedelta( + timedelta(seconds=max(0, progress.tasks[0].elapsed)) + ) + except KeyboardInterrupt: + console.newline() + console.warning('Interrupted by user') + raise typer.Exit(130) console.newline() if error_count == len(files): diff --git a/src/parxy_core/drivers/llamaparse.py b/src/parxy_core/drivers/llamaparse.py index 947b524..e62440d 100644 --- a/src/parxy_core/drivers/llamaparse.py +++ b/src/parxy_core/drivers/llamaparse.py @@ -124,8 +124,10 @@ class LlamaParseDriver(Driver): def _initialize_driver(self): """Initialize the Llama Parse driver. - Validates that dependencies are installed and creates the default client. - Client creation is also available per-call to support configuration overrides. + Validates that dependencies are installed. + A fresh client is created per ``_handle`` call to avoid sharing asyncio + event-loop state across threads (LlamaParse uses ``asyncio.run`` + internally, so a single client instance cannot be used concurrently). Raises ------ @@ -142,12 +144,6 @@ def _initialize_driver(self): "Install with 'pip install parxy[llama]'" ) from e - # Create default client for calls without overrides - self.__default_client = self._create_client( - self._config.model_dump() if self._config else {}, - api_key=self._config.api_key if self._config else None, - ) - def _create_client( self, config_dict: dict, api_key: Optional['SecretStr'] = None ) -> 'LlamaParse': @@ -323,17 +319,17 @@ def _handle( k: v for k, v in kwargs.items() if k not in _PER_CALL_OPTIONS } - # Determine which client to use + # Always create a fresh client per call. LlamaParse uses + # ``asyncio.run`` internally which binds objects to a thread-local + # event loop, so sharing a single client across threads causes + # "bound to a different event loop" errors. + merged_config = self._config.model_dump() if self._config else {} if overrides: - # Merge base config with overrides - merged_config = self._config.model_dump() if self._config else {} merged_config.update(overrides) - client = self._create_client( - merged_config, - api_key=self._config.api_key if self._config else None, - ) - else: - client = self.__default_client + client = self._create_client( + merged_config, + api_key=self._config.api_key if self._config else None, + ) try: filename, stream = self.handle_file_input(file) diff --git a/src/parxy_core/facade/parxy.py b/src/parxy_core/facade/parxy.py index 428dec5..ddfe290 100644 --- a/src/parxy_core/facade/parxy.py +++ b/src/parxy_core/facade/parxy.py @@ -2,6 +2,7 @@ import io import os +import threading from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Optional, Dict, Callable, List, Union, Iterator @@ -234,10 +235,19 @@ def batch_iter( ] breaker = CircuitBreakerState() + cancel_event = threading.Event() def process_task( file: Union[str, io.BytesIO, bytes], driver_name: str, task_level: str ) -> BatchResult: + if cancel_event.is_set(): + return BatchResult( + file=file, + driver=driver_name, + document=None, + error='Cancelled', + ) + trip_exc = breaker.get_trip_exception(driver_name) if trip_exc is not None: return BatchResult( @@ -254,6 +264,13 @@ def process_task( file=file, driver=driver_name, document=doc, error=None ) except Exception as e: + if cancel_event.is_set(): + return BatchResult( + file=file, + driver=driver_name, + document=None, + error='Cancelled', + ) breaker.record_failure(driver_name, e) return BatchResult( file=file, @@ -263,7 +280,8 @@ def process_task( exception=e, ) - with ThreadPoolExecutor(max_workers=max_workers) as executor: + executor = ThreadPoolExecutor(max_workers=max_workers) + try: futures = [] for file, driver_name, task_level in work_items: future = executor.submit(process_task, file, driver_name, task_level) @@ -271,6 +289,13 @@ def process_task( for future in as_completed(futures): yield future.result() + except KeyboardInterrupt: + cancel_event.set() + for future in futures: + future.cancel() + raise + finally: + executor.shutdown(wait=False, cancel_futures=True) @classmethod def batch( diff --git a/tests/commands/test_parse.py b/tests/commands/test_parse.py index fa7249b..5fc27f4 100644 --- a/tests/commands/test_parse.py +++ b/tests/commands/test_parse.py @@ -50,7 +50,7 @@ def test_parse_command_calls_facade_correctly(runner, mock_document, tmp_path): tasks=[str(test_file)], drivers=['pymupdf'], level='block', - workers=2, + workers=None, ) @@ -92,7 +92,7 @@ def test_parse_command_with_custom_options(runner, mock_document, tmp_path): tasks=[str(test_file)], drivers=['llamaparse'], level='block', - workers=2, + workers=None, ) diff --git a/uv.lock b/uv.lock index 09237d0..2648079 100644 --- a/uv.lock +++ b/uv.lock @@ -511,7 +511,7 @@ name = "cuda-bindings" version = "12.9.4" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "cuda-pathfinder" }, + { name = "cuda-pathfinder", marker = "sys_platform != 'emscripten' and sys_platform != 'win32'" }, ] wheels = [ { url = "https://files.pythonhosted.org/packages/a9/c1/dabe88f52c3e3760d861401bb994df08f672ec893b8f7592dc91626adcf3/cuda_bindings-12.9.4-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:fda147a344e8eaeca0c6ff113d2851ffca8f7dfc0a6c932374ee5c47caa649c8", size = 12151019, upload-time = "2025-10-21T14:51:43.167Z" }, @@ -1914,7 +1914,7 @@ name = "nvidia-cudnn-cu12" version = "9.10.2.21" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "nvidia-cublas-cu12" }, + { name = "nvidia-cublas-cu12", marker = "sys_platform != 'emscripten' and sys_platform != 'win32'" }, ] wheels = [ { url = "https://files.pythonhosted.org/packages/ba/51/e123d997aa098c61d029f76663dedbfb9bc8dcf8c60cbd6adbe42f76d049/nvidia_cudnn_cu12-9.10.2.21-py3-none-manylinux_2_27_x86_64.whl", hash = "sha256:949452be657fa16687d0930933f032835951ef0892b37d2d53824d1a84dc97a8", size = 706758467, upload-time = "2025-06-06T21:54:08.597Z" }, @@ -1925,7 +1925,7 @@ name = "nvidia-cufft-cu12" version = "11.3.3.83" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "nvidia-nvjitlink-cu12" }, + { name = "nvidia-nvjitlink-cu12", marker = "sys_platform != 'emscripten' and sys_platform != 'win32'" }, ] wheels = [ { url = "https://files.pythonhosted.org/packages/1f/13/ee4e00f30e676b66ae65b4f08cb5bcbb8392c03f54f2d5413ea99a5d1c80/nvidia_cufft_cu12-11.3.3.83-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:4d2dd21ec0b88cf61b62e6b43564355e5222e4a3fb394cac0db101f2dd0d4f74", size = 193118695, upload-time = "2025-03-07T01:45:27.821Z" }, @@ -1952,9 +1952,9 @@ name = "nvidia-cusolver-cu12" version = "11.7.3.90" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "nvidia-cublas-cu12" }, - { name = "nvidia-cusparse-cu12" }, - { name = "nvidia-nvjitlink-cu12" }, + { name = "nvidia-cublas-cu12", marker = "sys_platform != 'emscripten' and sys_platform != 'win32'" }, + { name = "nvidia-cusparse-cu12", marker = "sys_platform != 'emscripten' and sys_platform != 'win32'" }, + { name = "nvidia-nvjitlink-cu12", marker = "sys_platform != 'emscripten' and sys_platform != 'win32'" }, ] wheels = [ { url = "https://files.pythonhosted.org/packages/85/48/9a13d2975803e8cf2777d5ed57b87a0b6ca2cc795f9a4f59796a910bfb80/nvidia_cusolver_cu12-11.7.3.90-py3-none-manylinux_2_27_x86_64.whl", hash = "sha256:4376c11ad263152bd50ea295c05370360776f8c3427b30991df774f9fb26c450", size = 267506905, upload-time = "2025-03-07T01:47:16.273Z" }, @@ -1965,7 +1965,7 @@ name = "nvidia-cusparse-cu12" version = "12.5.8.93" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "nvidia-nvjitlink-cu12" }, + { name = "nvidia-nvjitlink-cu12", marker = "sys_platform != 'emscripten' and sys_platform != 'win32'" }, ] wheels = [ { url = "https://files.pythonhosted.org/packages/c2/f5/e1854cb2f2bcd4280c44736c93550cc300ff4b8c95ebe370d0aa7d2b473d/nvidia_cusparse_cu12-12.5.8.93-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:1ec05d76bbbd8b61b06a80e1eaf8cf4959c3d4ce8e711b65ebd0443bb0ebb13b", size = 288216466, upload-time = "2025-03-07T01:48:13.779Z" }, @@ -2279,7 +2279,7 @@ wheels = [ [[package]] name = "parxy" -version = "0.11.0" +version = "0.12.0" source = { editable = "." } dependencies = [ { name = "importlib-resources" }, @@ -3603,6 +3603,8 @@ dependencies = [ { name = "typing-extensions" }, ] wheels = [ + { url = "https://files.pythonhosted.org/packages/d3/54/a2ba279afcca44bbd320d4e73675b282fcee3d81400ea1b53934efca6462/torch-2.10.0-2-cp312-none-macosx_11_0_arm64.whl", hash = "sha256:13ec4add8c3faaed8d13e0574f5cd4a323c11655546f91fbe6afa77b57423574", size = 79498202, upload-time = "2026-02-10T21:44:52.603Z" }, + { url = "https://files.pythonhosted.org/packages/ec/23/2c9fe0c9c27f7f6cb865abcea8a4568f29f00acaeadfc6a37f6801f84cb4/torch-2.10.0-2-cp313-none-macosx_11_0_arm64.whl", hash = "sha256:e521c9f030a3774ed770a9c011751fb47c4d12029a3d6522116e48431f2ff89e", size = 79498254, upload-time = "2026-02-10T21:44:44.095Z" }, { url = "https://files.pythonhosted.org/packages/cc/af/758e242e9102e9988969b5e621d41f36b8f258bb4a099109b7a4b4b50ea4/torch-2.10.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:5fd4117d89ffd47e3dcc71e71a22efac24828ad781c7e46aaaf56bf7f2796acf", size = 145996088, upload-time = "2026-01-21T16:24:44.171Z" }, { url = "https://files.pythonhosted.org/packages/23/8e/3c74db5e53bff7ed9e34c8123e6a8bfef718b2450c35eefab85bb4a7e270/torch-2.10.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:787124e7db3b379d4f1ed54dd12ae7c741c16a4d29b49c0226a89bea50923ffb", size = 915711952, upload-time = "2026-01-21T16:23:53.503Z" }, { url = "https://files.pythonhosted.org/packages/6e/01/624c4324ca01f66ae4c7cd1b74eb16fb52596dce66dbe51eff95ef9e7a4c/torch-2.10.0-cp312-cp312-win_amd64.whl", hash = "sha256:2c66c61f44c5f903046cc696d088e21062644cbe541c7f1c4eaae88b2ad23547", size = 113757972, upload-time = "2026-01-21T16:24:39.516Z" },