Skip to content

Added parallel processing of subscribers in EventBroker during asynchronous execution#14

Merged
mom1 merged 2 commits into
mainfrom
feature/AI-3-async-parallel
Apr 13, 2026
Merged

Added parallel processing of subscribers in EventBroker during asynchronous execution#14
mom1 merged 2 commits into
mainfrom
feature/AI-3-async-parallel

Conversation

@mom1
Copy link
Copy Markdown
Owner

@mom1 mom1 commented Mar 30, 2026

Summary

Add parallel async execution support for EventBroker subscribers. This significantly improves performance for I/O-bound handlers by running them concurrently.

Core Changes

parallel=True parameter in EventBroker.__init__():

  • Async handlers execute concurrently via asyncio.gather()
  • Sync handlers run in thread pool via asyncio.to_thread() — don't block event loop
  • Full backward compatibility: parallel=False by default

Implementation Details

  • Deduplication: handlers matching multiple predicates receive event only once
  • Clean error handling: NotImplementedError when no handlers match
  • Mixed handlers support: both sync and async in same broker

Tests & Refactoring

  • Applied readable tests methodology (Given-When-Then, Mock > nonlocal)
  • Unified factories: removed basket fixture, now using BasketFactory
  • Replaced EventBrokerFactory class with pytest fixtures
  • Coverage: 100% (added missing parallel+sync handler test)

Example Usage

broker = EventBroker(parallel=True)

@broker.instance(MyEvent)
async def handler_a(event): ...

@broker.instance(MyEvent)
async def handler_b(event): ...

# Both handlers run concurrently:
await broker(my_event)  # ~max(duration_a, duration_b) instead of sum

@mom1 mom1 force-pushed the feature/AI-3-async-parallel branch from fb3770c to f7c4ff6 Compare March 30, 2026 16:36
@mom1 mom1 force-pushed the feature/AI-3-async-parallel branch 4 times, most recently from 4d72f8e to 3b9d3af Compare April 9, 2026 20:57
- Add parallel parameter to EventBroker.__init__ for parallel async handling
- Execute async handlers concurrently with asyncio.gather when parallel=True
- Separate coroutines and sync handlers upfront for reuse
- Add tests for parallel and sequential execution modes
- Update README with usage examples
- Fix type annotations and use _ naming for unused handlers
- Unify factories, refactoring tests, 100% coverage.
@mom1 mom1 force-pushed the feature/AI-3-async-parallel branch from 3b9d3af to 2d128fa Compare April 9, 2026 20:59
@mom1 mom1 changed the title Add parallel async subscribers in EventBroker Added parallel processing of subscribers in EventBroker during asynchronous execution Apr 9, 2026
@mom1 mom1 merged commit d3d3c1e into main Apr 13, 2026
5 checks passed
@mom1 mom1 deleted the feature/AI-3-async-parallel branch April 13, 2026 12:29
@github-actions
Copy link
Copy Markdown

🎉 This PR is included in version 0.5.0 🎉

The release is available on GitHub release

Your semantic-release bot 📦🚀

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant