Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 51 additions & 46 deletions src/parxy_cli/commands/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,10 @@ def parse(
typer.Option(
'--workers',
'-w',
help='Number of parallel workers to use. Defaults to 2.',
help='Number of parallel workers to use. Defaults to cpu count.',
min=1,
),
] = 2,
] = None,
):
"""
Parse documents using one or more drivers.
Expand Down Expand Up @@ -315,55 +315,60 @@ def parse(
error_count = 0

# Show info
with console.shimmer(
f'Processing {len(files)} file{"s" if len(files) > 1 else ""} with {len(drivers)} driver{"s" if len(drivers) > 1 else ""}...'
):
# Process files with progress bar
with console.progress('Processing documents') as progress:
task = progress.add_task('', total=total_tasks)

batch_tasks = [str(f) for f in files]

for result in Parxy.batch_iter(
tasks=batch_tasks,
drivers=drivers,
level=level.value,
workers=workers,
):
file_name = (
Path(result.file).name
if isinstance(result.file, str)
else 'document'
)

if result.success:
output_file, page_count = save_batch_result(
result=result,
mode=mode,
output_dir=output_path,
show=show,
)
console.print(
f'[faint]⎿ [/faint] {file_name} via {result.driver} to [success]{output_file}[/success] [faint]({page_count} pages)[/faint]'
)
else:
console.print(
f'[faint]⎿ [/faint] {file_name} via {result.driver} error. [error]{result.error}[/error]'
try:
with console.shimmer(
f'Processing {len(files)} file{"s" if len(files) > 1 else ""} with {len(drivers)} driver{"s" if len(drivers) > 1 else ""}...'
):
# Process files with progress bar
with console.progress('Processing documents') as progress:
task = progress.add_task('', total=total_tasks)

batch_tasks = [str(f) for f in files]

for result in Parxy.batch_iter(
tasks=batch_tasks,
drivers=drivers,
level=level.value,
workers=workers,
):
file_name = (
Path(result.file).name
if isinstance(result.file, str)
else 'document'
)
error_count += 1

if stop_on_failure:
console.newline()
console.info(
'Stopping due to error (--stop-on-failure flag is set)'
if result.success:
output_file, page_count = save_batch_result(
result=result,
mode=mode,
output_dir=output_path,
show=show,
)
raise typer.Exit(1)
console.print(
f'[faint]⎿ [/faint] {file_name} via {result.driver} to [success]{output_file}[/success] [faint]({page_count} pages)[/faint]'
)
else:
console.print(
f'[faint]⎿ [/faint] {file_name} via {result.driver} error. [error]{result.error}[/error]'
)
error_count += 1

if stop_on_failure:
console.newline()
console.info(
'Stopping due to error (--stop-on-failure flag is set)'
)
raise typer.Exit(1)

progress.update(task, advance=1)
progress.update(task, advance=1)

elapsed_time = format_timedelta(
timedelta(seconds=max(0, progress.tasks[0].elapsed))
)
elapsed_time = format_timedelta(
timedelta(seconds=max(0, progress.tasks[0].elapsed))
)
except KeyboardInterrupt:
console.newline()
console.warning('Interrupted by user')
raise typer.Exit(130)

console.newline()
if error_count == len(files):
Expand Down
30 changes: 13 additions & 17 deletions src/parxy_core/drivers/llamaparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ class LlamaParseDriver(Driver):
def _initialize_driver(self):
"""Initialize the Llama Parse driver.

Validates that dependencies are installed and creates the default client.
Client creation is also available per-call to support configuration overrides.
Validates that dependencies are installed.
A fresh client is created per ``_handle`` call to avoid sharing asyncio
event-loop state across threads (LlamaParse uses ``asyncio.run``
internally, so a single client instance cannot be used concurrently).

Raises
------
Expand All @@ -142,12 +144,6 @@ def _initialize_driver(self):
"Install with 'pip install parxy[llama]'"
) from e

# Create default client for calls without overrides
self.__default_client = self._create_client(
self._config.model_dump() if self._config else {},
api_key=self._config.api_key if self._config else None,
)

def _create_client(
self, config_dict: dict, api_key: Optional['SecretStr'] = None
) -> 'LlamaParse':
Expand Down Expand Up @@ -323,17 +319,17 @@ def _handle(
k: v for k, v in kwargs.items() if k not in _PER_CALL_OPTIONS
}

# Determine which client to use
# Always create a fresh client per call. LlamaParse uses
# ``asyncio.run`` internally which binds objects to a thread-local
# event loop, so sharing a single client across threads causes
# "bound to a different event loop" errors.
merged_config = self._config.model_dump() if self._config else {}
if overrides:
# Merge base config with overrides
merged_config = self._config.model_dump() if self._config else {}
merged_config.update(overrides)
client = self._create_client(
merged_config,
api_key=self._config.api_key if self._config else None,
)
else:
client = self.__default_client
client = self._create_client(
merged_config,
api_key=self._config.api_key if self._config else None,
)

try:
filename, stream = self.handle_file_input(file)
Expand Down
27 changes: 26 additions & 1 deletion src/parxy_core/facade/parxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional, Dict, Callable, List, Union, Iterator

Expand Down Expand Up @@ -234,10 +235,19 @@ def batch_iter(
]

breaker = CircuitBreakerState()
cancel_event = threading.Event()

def process_task(
file: Union[str, io.BytesIO, bytes], driver_name: str, task_level: str
) -> BatchResult:
if cancel_event.is_set():
return BatchResult(
file=file,
driver=driver_name,
document=None,
error='Cancelled',
)

trip_exc = breaker.get_trip_exception(driver_name)
if trip_exc is not None:
return BatchResult(
Expand All @@ -254,6 +264,13 @@ def process_task(
file=file, driver=driver_name, document=doc, error=None
)
except Exception as e:
if cancel_event.is_set():
return BatchResult(
file=file,
driver=driver_name,
document=None,
error='Cancelled',
)
breaker.record_failure(driver_name, e)
return BatchResult(
file=file,
Expand All @@ -263,14 +280,22 @@ def process_task(
exception=e,
)

with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor = ThreadPoolExecutor(max_workers=max_workers)
try:
futures = []
for file, driver_name, task_level in work_items:
future = executor.submit(process_task, file, driver_name, task_level)
futures.append(future)

for future in as_completed(futures):
yield future.result()
except KeyboardInterrupt:
cancel_event.set()
for future in futures:
future.cancel()
raise
finally:
executor.shutdown(wait=False, cancel_futures=True)

@classmethod
def batch(
Expand Down
4 changes: 2 additions & 2 deletions tests/commands/test_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_parse_command_calls_facade_correctly(runner, mock_document, tmp_path):
tasks=[str(test_file)],
drivers=['pymupdf'],
level='block',
workers=2,
workers=None,
)


Expand Down Expand Up @@ -92,7 +92,7 @@ def test_parse_command_with_custom_options(runner, mock_document, tmp_path):
tasks=[str(test_file)],
drivers=['llamaparse'],
level='block',
workers=2,
workers=None,
)


Expand Down
18 changes: 10 additions & 8 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.