Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,13 @@ repos:
args: [--extension-pkg-whitelist=confluent_kafka, --load-plugins=pylint.extensions.bad_builtin, --load-plugins=pylint.extensions.mccabe]
additional_dependencies: ["click", "confluent_kafka", "cotyledon", "pytest", "pytest_mock"]


# - repo: https://github.com/pre-commit/mirrors-mypy
# rev: v0.902
# hooks:
# - id: mypy
# args: ["--config-file=pyproject.toml"]
# files: ^eventbusk/
# additional_dependencies: ["types-click"]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v2.1.0
hooks:
- id: mypy
args: ["--config-file=pyproject.toml"]
files: ^eventbusk/
additional_dependencies: ["types-click"]

- repo: local
hooks:
Expand Down
35 changes: 26 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ bus.send(foo)

## Contributing

Pre-requisites:
### Setting up locally

- [uv](https://docs.astral.sh/uv/getting-started/installation/)
- [Docker Desktop](https://www.docker.com/products/docker-desktop/)
Expand All @@ -55,6 +55,26 @@ uv sync --extra dev
pre-commit install
```

Now you can run the example project consumers. Ensure the topics in the example are created first.

```bash
cd examples
uv run eventbusk worker -A eventbus:bus
```

You can also publish:

```bash
uv run python

>>> from eventbus import bus, Fooey
>>> bus.send(Fooey(foo_val="lorem ipsum"))
```


### Checks
After making code changes you can run some basic sanity checks as follows.

Run the tests:

```bash
Expand All @@ -64,6 +84,7 @@ uv run task test
Run the linter:

```bash
uv run task ruff
uv run task pylint
```

Expand All @@ -73,18 +94,14 @@ Format the code:
uv run task format
```

Now you can run the example project consumers. Ensure the topics in the example are created first.
Run type checks:

```bash
cd examples
uv run eventbusk worker -A eventbus:bus
uv run task typecheck
```

You can also publish:
You can also choose running pre-commit manually, which runs all all of the above, among other things.

```bash
uv run python

>>> from eventbus import bus, Fooey
>>> bus.send(Fooey(foo_val="lorem ipsum"))
uv run pre-commit run --all-files
```
2 changes: 1 addition & 1 deletion eventbusk/brokers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
logger = logging.getLogger(__name__)


__all__ = ["Consumer", "DeliveryCallback", "Producer"]
__all__ = ["BaseProducer", "Consumer", "DeliveryCallback", "Producer"]


def consumer_factory(broker: str, topic: str, group: str) -> BaseConsumer:
Expand Down
10 changes: 5 additions & 5 deletions eventbusk/brokers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from contextlib import ContextDecorator
from typing import TYPE_CHECKING, Self

from confluent_kafka import cimpl # type: ignore
from confluent_kafka import cimpl

if TYPE_CHECKING:
from types import TracebackType
Expand All @@ -25,7 +25,7 @@
# Type hints
# callback method `on_delivery` on the producer
type DeliveryCallback = Callable[..., None]
type Message = str | bytes | cimpl.Message
type Message = bytes | cimpl.Message


class BaseBrokerURI(ABC):
Expand Down Expand Up @@ -68,7 +68,7 @@ def __exit__( # pylint: disable=too-many-positional-arguments
pass

@abstractmethod
def poll(self, timeout: int) -> Message | None: # type: ignore
def poll(self, timeout: int) -> Message | None:
"""Poll for a specified time in seconds for new messages."""

@abstractmethod
Expand All @@ -87,10 +87,10 @@ def __init__(self, broker: str) -> None:
super().__init__()

@abstractmethod
def produce( # type: ignore
def produce( # pylint: disable=too-many-arguments
self,
topic: str,
value: Message,
value: bytes,
*,
flush: bool = True,
on_delivery: DeliveryCallback = None,
Expand Down
2 changes: 2 additions & 0 deletions eventbusk/brokers/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ def __init__(self, broker: str, *, topic: str, group: str) -> None:
self.topic = topic
self.group = group

# pylint: disable-next=useless-return
def poll(self, timeout: int = 1) -> Message | None:
"""Sleeps for the required timeout, and returns no message."""
time.sleep(timeout)
return None

def ack(self, message: Message) -> None:
"""Acknowledge event."""
Expand Down
20 changes: 14 additions & 6 deletions eventbusk/brokers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Self
from typing import TYPE_CHECKING, Self, cast

from confluent_kafka import ( # type: ignore
from confluent_kafka import (
Consumer as CConsumer,
KafkaException,
Producer as CProducer,
Expand All @@ -18,6 +18,8 @@
if TYPE_CHECKING:
from types import TracebackType

from confluent_kafka import Message as CMessage

from .base import DeliveryCallback, Message


Expand Down Expand Up @@ -135,7 +137,7 @@ def __init__(self, broker: str, *, topic: str, group: str) -> None:
self.broker = BrokerURI.from_uri(broker)
self.topic = topic
self.group = group
self._consumer: CConsumer = None
self._consumer: CConsumer | None = None

def __repr__(self) -> str:
return (
Expand Down Expand Up @@ -164,7 +166,8 @@ def __exit__( # pylint: disable=too-many-positional-arguments
exc_value: BaseException | None,
exc_traceback: TracebackType | None,
) -> None:
self._consumer.close()
if self._consumer is not None:
self._consumer.close()

if exc_type and exc_value and exc_traceback:
logger.warning(
Expand All @@ -179,11 +182,16 @@ def __exit__( # pylint: disable=too-many-positional-arguments

def poll(self, timeout: int) -> Message | None:
"""Poll the topic for new messages."""
if self._consumer is None:
raise RuntimeError("Consumer is not open. Use as a context manager.")
return self._consumer.poll(timeout)

def ack(self, message: Message | None) -> None:
"""Acknowledge the message by explicitly committing."""
self._consumer.commit(message=message)
if self._consumer is None:
raise RuntimeError("Consumer is not open. Use as a context manager.")
if message is not None:
self._consumer.commit(message=cast("CMessage", message))


class Producer(BaseProducer):
Expand All @@ -198,7 +206,7 @@ def __init__(self, broker: str) -> None:
def produce(
self,
topic: str,
value: Message,
value: bytes,
*,
flush: bool = True,
on_delivery: DeliveryCallback = None,
Expand Down
38 changes: 26 additions & 12 deletions eventbusk/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
from collections.abc import Callable
from dataclasses import asdict, dataclass, field
from functools import wraps
from typing import Any

from .brokers import Consumer, DeliveryCallback, Producer
from .brokers import BaseProducer, Consumer, DeliveryCallback, Producer
from .exceptions import AlreadyRegistered, ConsumerError, ProducerError, UnknownEvent

logger = logging.getLogger(__name__)
Expand All @@ -36,7 +37,7 @@ class MyEvent(Event):
class EventJsonEncoder(json.JSONEncoder):
"""JSON encoder that additionally converts uuid to str."""

def default(self, o):
def default(self, o: Any) -> Any:
if isinstance(o, uuid.UUID):
return str(o)
return json.JSONEncoder.default(self, o)
Expand Down Expand Up @@ -97,7 +98,7 @@ def __init__(
# This is done to avoid issues forking, causing flush to fail.
# https://github.com/confluentinc/confluent-kafka-python/issues/1122
# https://github.com/dpkp/kafka-python/issues/1098
self.producer = None
self.producer: BaseProducer | None = None

# Registries
# Topic <--> Event type is a 1-1 relation right now, i.e. a topic can only
Expand Down Expand Up @@ -174,8 +175,18 @@ def receivers(self) -> set[ReceiverWorker]:
"""Returns a set of receivers(consumers) of events."""
return self._receivers

@property
def before_receive_hooks(self) -> list[Hook]:
"""Returns the list of before-receive hooks."""
return self._before_receive_hooks

@property
def after_receive_hooks(self) -> list[Hook]:
"""Returns the list of after-receive hooks."""
return self._after_receive_hooks

# TODO: add group parameter?
def receive( # pylint: disable=too-complex
def receive( # pylint: disable=too-complex,too-many-statements
self,
event_type: EventType,
poll_timeout: int = 1,
Expand Down Expand Up @@ -213,12 +224,14 @@ def wrapper() -> None: # pylint: disable=too-many-branches
try:
message = consumer.poll(poll_timeout)
except ConsumerError:
msg = (
error_msg = (
"Error while consuming message. "
"Topic might be blocked"
)
logger.exception(msg, exc_info=True, extra=log_context)
self.sleep(seconds=1, message=msg)
logger.exception(
error_msg, exc_info=True, extra=log_context
)
self.sleep(seconds=1, message=error_msg)
continue

# No message to consume.
Expand All @@ -231,18 +244,18 @@ def wrapper() -> None: # pylint: disable=too-many-branches
# "error
msg_error = message.error() # type: ignore
if msg_error:
msg = (
error_msg = (
"Error while consuming message. "
"Topic might be blocked"
)
logger.warning(
msg,
error_msg,
extra={
**log_context,
"error": msg_error,
},
)
self.sleep(seconds=1, message=msg)
self.sleep(seconds=1, message=error_msg)
continue

# Deserialise to the dataclass of the event
Expand All @@ -266,8 +279,9 @@ def wrapper() -> None: # pylint: disable=too-many-branches

# TODO: Fix following
# Too many arguments for "Event" [call-arg]
event = event_type(**event_data) # type: ignore
event.event_id = event_id
event = event_type(**event_data)
if event_id is not None:
event.event_id = event_id

for hook in self._before_receive_hooks:
try:
Expand Down
12 changes: 4 additions & 8 deletions eventbusk/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import threading
from contextlib import contextmanager, suppress
from pathlib import Path
from types import ModuleType
from typing import TYPE_CHECKING

import click
import cotyledon # type: ignore
import cotyledon
from cotyledon.types import WorkerId

from .bus import EventBus

Expand Down Expand Up @@ -59,10 +59,6 @@ def find_app(app: str, attr_name: str = "app") -> EventBus:
with cwd_in_path():
module = importlib.import_module(module_name, package=None)

if not isinstance(module, ModuleType):
msg = f"Module f{module_name} not found or cannot be imported"
raise AttributeError(msg)

# Find bus instance within the module
found = getattr(module, attr_name)
if not isinstance(found, EventBus):
Expand All @@ -79,11 +75,11 @@ def cli() -> None:
"""Main entry point."""


class Worker(cotyledon.Service): # type: ignore
class Worker(cotyledon.Service): # type: ignore[misc]
"""Process handling an event receiver."""

def __init__(self, worker_id: int, receiver: Callable[..., None]) -> None:
super().__init__(worker_id)
super().__init__(WorkerId(worker_id))
self._shutdown = threading.Event()
self.receiver = receiver
self.name = EventBus.to_fqn(receiver)
Expand Down
Loading