From eda3606097c7ea44b1dda3694030ba0bffe9e921 Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Mon, 30 Mar 2026 15:45:08 +0100 Subject: [PATCH] :sparkles: Add parallel async subscribers in EventBroker - Run async event handlers in parallel using asyncio.gather - Maintain backward compatibility for sync handlers - Added documentation note in README --- README.md | 2 ++ src/dddkit/dataclasses/events.py | 9 +++++++-- src/dddkit/pydantic/events.py | 9 +++++++-- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 11340ff..9e3d366 100644 --- a/README.md +++ b/README.md @@ -295,6 +295,8 @@ async def context(): await handle_event(product_event) ``` +> **Note**: Async event handlers are executed in parallel using `asyncio.gather`. Sync handlers are called sequentially. + ### Stories Stories provide a pattern for defining sequential business operations with optional hooks for execution tracking, diff --git a/src/dddkit/dataclasses/events.py b/src/dddkit/dataclasses/events.py index 177e2a4..1751245 100644 --- a/src/dddkit/dataclasses/events.py +++ b/src/dddkit/dataclasses/events.py @@ -1,3 +1,4 @@ +import asyncio from asyncio import get_running_loop from collections.abc import Awaitable, Callable from dataclasses import dataclass, field @@ -84,11 +85,15 @@ def publish(self, event: DomainEvent) -> None: handler(event) async def async_publish(self, event: DomainEvent) -> None: - for handler in self._get_subscribers(event): + handlers = self._get_subscribers(event) + async_handlers = [] + for handler in handlers: if iscoroutinefunction(handler): - await handler(event) + async_handlers.append(handler(event)) else: handler(event) + if async_handlers: + await asyncio.gather(*async_handlers) def instance(self, obj_type: type[ET] | tuple[type[ET], ...] | None) -> Callable[[HandlerEvent], HandlerEvent]: _type = obj_type if obj_type is not None else type(None) diff --git a/src/dddkit/pydantic/events.py b/src/dddkit/pydantic/events.py index 960819f..01a8ba0 100644 --- a/src/dddkit/pydantic/events.py +++ b/src/dddkit/pydantic/events.py @@ -1,3 +1,4 @@ +import asyncio from asyncio import get_running_loop from collections.abc import Awaitable, Callable from datetime import datetime @@ -86,11 +87,15 @@ def publish(self, event: DomainEvent) -> None: handler(event) async def async_publish(self, event: DomainEvent) -> None: - for handler in self._get_subscribers(event): + handlers = self._get_subscribers(event) + async_handlers = [] + for handler in handlers: if iscoroutinefunction(handler): - await handler(event) + async_handlers.append(handler(event)) else: handler(event) + if async_handlers: + await asyncio.gather(*async_handlers) def instance(self, obj_type: type[ET] | tuple[type[ET], ...] | None) -> Callable[[HandlerEvent], HandlerEvent]: _type = obj_type if obj_type is not None else type(None)