Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ jobs:
fetch-depth: 0

- name: Install uv (official Astral action)
uses: astral-sh/setup-uv@v7
uses: astral-sh/setup-uv@v8.0.0
with:
version: "0.9.7"
version: "0.11.6"
enable-cache: true
python-version: ${{ matrix.python-version }}

Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,18 @@ async def context():
await handle_event(product_event)
```

> **Note**: Async event handlers are executed sequentially by default. To enable parallel execution, create EventBroker with `parallel=True`:

```python
# Sequential (default, backward compatible)
broker = EventBroker()
await handle_event(product_event)

# Parallel
broker = EventBroker(parallel=True)
await handle_event(product_event)
```

### Stories

Stories provide a pattern for defining sequential business operations with optional hooks for execution tracking,
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ dev = [
"coverage==7.12.*",
"rich==14.2.*",
"funlog==0.2.*",
"polyfactory>=2.0.0",
]
lint = [
"ruff==0.14.*",
Expand Down
28 changes: 20 additions & 8 deletions src/dddkit/dataclasses/events.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from asyncio import get_running_loop
from asyncio import gather, get_running_loop, to_thread
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from datetime import datetime
from inspect import iscoroutinefunction
from typing import TypeAlias, TypeVar
from types import CoroutineType
from typing import Any, TypeAlias, TypeVar
from uuid import UUID
from zoneinfo import ZoneInfo

Expand All @@ -30,10 +31,11 @@ class DomainEvent:


class EventBroker:
__slots__: tuple[str, ...] = ('_event_handlers',)
__slots__: tuple[str, ...] = ('_event_handlers', '_parallel')

def __init__(self) -> None:
def __init__(self, parallel: bool = False) -> None:
self._event_handlers: dict[Predicate, set[HandlerEvent]] = {}
self._parallel: bool = parallel

def __call__(self, event: DomainEvent) -> Awaitable[None]:
try:
Expand Down Expand Up @@ -84,11 +86,21 @@ def publish(self, event: DomainEvent) -> None:
handler(event)

async def async_publish(self, event: DomainEvent) -> None:
for handler in self._get_subscribers(event):
if iscoroutinefunction(handler):
await handler(event)
coroutines: list[CoroutineType[Any, Any, Awaitable[None] | None]] = []

for h in self._get_subscribers(event):
if iscoroutinefunction(h):
coroutines.append(h(event))
elif self._parallel:
coroutines.append(to_thread(h, event))
else:
handler(event)
h(event)

if self._parallel:
await gather(*coroutines)
else:
for c in coroutines:
await c

def instance(self, obj_type: type[ET] | tuple[type[ET], ...] | None) -> Callable[[HandlerEvent], HandlerEvent]:
_type = obj_type if obj_type is not None else type(None)
Expand Down
28 changes: 20 additions & 8 deletions src/dddkit/pydantic/events.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from asyncio import get_running_loop
from asyncio import gather, get_running_loop, to_thread
from collections.abc import Awaitable, Callable
from datetime import datetime
from inspect import iscoroutinefunction
from typing import ClassVar, TypeAlias, TypeVar
from types import CoroutineType
from typing import Any, ClassVar, TypeAlias, TypeVar
from uuid import UUID
from zoneinfo import ZoneInfo

Expand Down Expand Up @@ -32,10 +33,11 @@ class DomainEvent(BaseModel):


class EventBroker:
__slots__: tuple[str, ...] = ('_event_handlers',)
__slots__: tuple[str, ...] = ('_event_handlers', '_parallel')

def __init__(self) -> None:
def __init__(self, parallel: bool = False) -> None:
self._event_handlers: dict[Predicate, set[HandlerEvent]] = {}
self._parallel: bool = parallel

def __call__(self, event: DomainEvent) -> Awaitable[None]:
try:
Expand Down Expand Up @@ -86,11 +88,21 @@ def publish(self, event: DomainEvent) -> None:
handler(event)

async def async_publish(self, event: DomainEvent) -> None:
for handler in self._get_subscribers(event):
if iscoroutinefunction(handler):
await handler(event)
coroutines: list[CoroutineType[Any, Any, Awaitable[None] | None]] = []

for h in self._get_subscribers(event):
if iscoroutinefunction(h):
coroutines.append(h(event))
elif self._parallel:
coroutines.append(to_thread(h, event))
else:
handler(event)
h(event)

if self._parallel:
await gather(*coroutines)
else:
for c in coroutines:
await c

def instance(self, obj_type: type[ET] | tuple[type[ET], ...] | None) -> Callable[[HandlerEvent], HandlerEvent]:
_type = obj_type if obj_type is not None else type(None)
Expand Down
70 changes: 58 additions & 12 deletions tests/dataclasses/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from collections.abc import Callable
from dataclasses import dataclass
from typing import NewType, cast
from typing import Any, NewType, cast, final
from unittest.mock import Mock
from uuid import UUID, uuid4

import pytest
from polyfactory.factories import DataclassFactory
from typing_extensions import override

from dddkit.dataclasses import Aggregate, AggregateEvent, DomainEvent, EventBroker

Expand All @@ -19,20 +23,17 @@ class Basket(Aggregate):
basket_id: BasketId

@dataclass(frozen=True, kw_only=True)
class Created(AggregateEvent):
"""Event for basket creation."""
class Created(AggregateEvent): ...

@dataclass(frozen=True, kw_only=True)
class Changed(AggregateEvent):
"""Event for basket change."""
class Changed(AggregateEvent): ...

@dataclass(frozen=True, kw_only=True)
class ChangedId(Changed):
basket_id: BasketId

@dataclass(frozen=True, kw_only=True)
class Deleted(AggregateEvent):
"""Event for basket deletion."""
class Deleted(AggregateEvent): ...

@classmethod
def new(cls, basket_id: BasketId) -> 'Basket':
Expand All @@ -49,10 +50,55 @@ def delete(self) -> None:


@pytest.fixture
def basket() -> Basket:
return Basket(basket_id=cast(BasketId, uuid4()))
def event_broker() -> EventBroker:
return EventBroker(parallel=False)


@pytest.fixture(name='handle_event')
def handle_event_factory() -> EventBroker:
return EventBroker()
@pytest.fixture
def parallel_event_broker() -> EventBroker:
return EventBroker(parallel=True)


@pytest.fixture
def event_broker_with_handler() -> tuple[EventBroker, Mock]:
broker = EventBroker()
handler = Mock()
broker.subscribe(lambda event: isinstance(event, BasketChanged), handler)
return broker, handler


@final
class BasketFactory(DataclassFactory[Basket]):
"""Factory for Basket aggregates."""

__model__: type[Basket] = Basket
__random_seed__ = 42

@classmethod
@override
def get_provider_map(cls) -> dict[type, Callable[[], Any]]:
providers = super().get_provider_map()
providers[BasketId] = lambda: cast(BasketId, uuid4())
return providers

@classmethod
def created(cls, basket_id: BasketId | None = None) -> Basket:
return Basket.new(basket_id=basket_id or cast(BasketId, uuid4()))


@pytest.fixture
def basket_factory() -> type[BasketFactory]:
return BasketFactory


@final
class BasketChangedFactory(DataclassFactory[BasketChanged]):
"""Factory for BasketChanged events."""

__model__: type[BasketChanged] = BasketChanged
__random_seed__ = 42


@pytest.fixture
def basket_changed_factory() -> type[BasketChangedFactory]:
return BasketChangedFactory
73 changes: 56 additions & 17 deletions tests/dataclasses/test_dataclasses_aggregates.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,67 @@
from typing import cast
from uuid import uuid4

from .conftest import Basket, BasketId
from .conftest import Basket, BasketFactory, BasketId


class TestAggregate:
def test_new_aggregate(self):
basket = Basket.new(basket_id=cast(BasketId, uuid4()))
def test_when_aggregate_created_via_factory_then_created_event_emitted(basket_factory: type[BasketFactory]):
"""Using factory method adds Created event to aggregate's event list."""
basket = basket_factory.created()

assert (events := basket.get_events())
assert isinstance(events[0], Basket.Created)
events = basket.get_events()
assert len(events) == 1
assert isinstance(events[0], Basket.Created)

def test_clear_events(self, basket: Basket) -> None:
basket.delete()

assert (events := basket.get_events())
assert isinstance(events[0], Basket.Deleted)
def test_when_aggregate_created_directly_then_no_events(basket_factory: type[BasketFactory]):
"""Direct instantiation does not emit domain events."""
basket = basket_factory.build()

basket.clear_events()
assert not basket.get_events()
assert not basket.get_events()

def test_add_event(self, basket: Basket) -> None:
basket.change_id(cast(BasketId, uuid4()))

assert (events := basket.get_events())
assert isinstance(events[0], Basket.ChangedId)
assert isinstance(events[0], Basket.Changed)
def test_when_aggregate_modified_then_domain_event_added(basket_factory: type[BasketFactory]):
"""Changing aggregate state adds corresponding domain event."""
basket = basket_factory.build()
new_id = cast(BasketId, uuid4())

basket.change_id(new_id)

events = basket.get_events()
assert len(events) == 1
assert isinstance(events[0], Basket.ChangedId)
assert events[0].basket_id == new_id


def test_when_aggregate_deleted_then_deleted_event_added(basket_factory: type[BasketFactory]):
"""Delete operation adds Deleted event to aggregate."""
basket = basket_factory.build()

basket.delete()

events = basket.get_events()
assert len(events) == 1
assert isinstance(events[0], Basket.Deleted)


def test_when_events_cleared_then_aggregate_has_no_events(basket_factory: type[BasketFactory]):
"""clear_events removes all pending domain events from aggregate."""
basket = basket_factory.created()
basket.delete()
assert len(basket.get_events()) == 2

basket.clear_events()

assert not basket.get_events()


def test_when_event_is_subtype_then_matches_parent_type(basket_factory: type[BasketFactory]):
"""Domain events maintain proper inheritance hierarchy."""
basket = basket_factory.build()
new_id = cast(BasketId, uuid4())

basket.change_id(new_id)

events = basket.get_events()
assert isinstance(events[0], Basket.Changed)
assert isinstance(events[0], Basket.ChangedId)
5 changes: 3 additions & 2 deletions tests/dataclasses/test_dataclasses_changes_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from dddkit.dataclasses import AggregateEvent, ChangesHandler, DomainEvent

from .conftest import Basket, BasketChanged, BasketId
from .conftest import Basket, BasketChanged, BasketFactory, BasketId


class Result(NamedTuple):
Expand Down Expand Up @@ -47,10 +47,11 @@ def _(self, _: Basket.Deleted, basket: Basket) -> None:


class TestChangeHandler:
def test_handle_changes(self, basket: Basket):
def test_handle_changes(self, basket_factory: type[BasketFactory]):
basket_changes_handler = BasketChangesHandler()
basket_changes_handler.created_fields = {'id': cast(BasketId, uuid4())}

basket = basket_factory.build()
new_basket_id = cast(BasketId, uuid4())
basket.change_id(new_basket_id)

Expand Down
Loading