From f329240a635fdf7b50716c117847dd91f2bfcbfe Mon Sep 17 00:00:00 2001 From: Marek Szymutko Date: Thu, 9 Apr 2026 14:43:35 +0200 Subject: [PATCH] fix: retry lookup supports regex * handle sendErrors Signed-off-by: Marek Szymutko --- src/retriable_kafka_client/retry_utils.py | 51 +++++++++++++++++++++-- tests/integration/integration_utils.py | 4 +- tests/integration/test_retry.py | 18 ++++---- tests/unit/test_retry_utils.py | 12 +++++- 4 files changed, 72 insertions(+), 13 deletions(-) diff --git a/src/retriable_kafka_client/retry_utils.py b/src/retriable_kafka_client/retry_utils.py index 76a8959..8960965 100644 --- a/src/retriable_kafka_client/retry_utils.py +++ b/src/retriable_kafka_client/retry_utils.py @@ -19,10 +19,13 @@ import datetime import logging +import re from collections import defaultdict +from typing import TypeVar from confluent_kafka import Message, KafkaException, TopicPartition +from .error import SendError from .config import ProducerConfig, ConsumerConfig, ConsumeTopicConfig from .headers import ( TIMESTAMP_HEADER, @@ -36,6 +39,7 @@ LOGGER = logging.getLogger(__name__) +T = TypeVar("T") def _get_retry_timestamp(message: Message) -> float | None: @@ -76,6 +80,39 @@ def _get_current_timestamp() -> float: return datetime.datetime.now(tz=datetime.timezone.utc).timestamp() +def _regex_lookup_and_update_in_dict( + input_key: str, lookup_dict: dict[str, T] +) -> T | None: + """ + Lookup a key in a dictionary. If not present, try matching + the input key with a dictionary key. If matched, update the + lookup dict and return the corresponding value. + + This is useful for searching and updating lookup tables + for retry configs and producers. + + Args: + input_key: Usually topic name as present in the received message, + the key we search for. + lookup_dict: Lookup dict, it's keys can be either + direct matches or regex patterns + Returns: + The item from the lookup dict, or None if no item was found. + """ + result = lookup_dict.get(input_key, None) + if result is not None: + return result + for dict_key, dict_value in lookup_dict.items(): + try: + if re.match(dict_key, input_key): + # Store the lookup for next time + lookup_dict[input_key] = dict_value + return dict_value + except re.PatternError: + pass + return None + + class RetryScheduleCache: """ Class for storing information about messages that are blocked @@ -254,7 +291,9 @@ def _get_retry_headers( message: Kafka message that will be retried Returns: dictionary of retry headers used for next sending """ - relevant_config = self.__topic_lookup.get(message.topic) + relevant_config = _regex_lookup_and_update_in_dict( + message.topic, self.__topic_lookup + ) if relevant_config is None: return None previous_attempt = _get_retry_attempt(message) @@ -277,7 +316,9 @@ def resend_message(self, message: MessageGroup) -> None: message: the Kafka message that failed to be processed """ message_topic = message.topic - relevant_producer = self.__retry_producers.get(message_topic) + relevant_producer = _regex_lookup_and_update_in_dict( + message_topic, self.__retry_producers + ) if relevant_producer is None: LOGGER.debug( "Message %s from topic %s does not have configured retry topic.", @@ -287,7 +328,9 @@ def resend_message(self, message: MessageGroup) -> None: return # Check if we've exhausted retry attempts - relevant_config = self.__topic_lookup.get(message_topic) + relevant_config = _regex_lookup_and_update_in_dict( + message_topic, self.__topic_lookup + ) if relevant_config is not None: current_attempt = _get_retry_attempt(message) if current_attempt >= relevant_config.retries: @@ -309,7 +352,7 @@ def resend_message(self, message: MessageGroup) -> None: message_topic, extra={"message_raw": str(message.all_chunks)}, ) - except (TypeError, BufferError, KafkaException): + except (TypeError, BufferError, KafkaException, SendError): LOGGER.exception( "Cannot resend message from topic: %s to its retry topic %s", message_topic, diff --git a/tests/integration/integration_utils.py b/tests/integration/integration_utils.py index 48eb233..d99ceb1 100644 --- a/tests/integration/integration_utils.py +++ b/tests/integration/integration_utils.py @@ -337,6 +337,7 @@ def start_consumer( max_concurrency: int = 4, max_workers: int = 2, filter_function: Callable[[Any], bool] | None = None, + override_topics: list[ConsumeTopicConfig] | None = None, ) -> ConsumerHandle: """ Create and start a consumer with the specified configuration. @@ -348,6 +349,7 @@ def start_consumer( max_concurrency: Consumer concurrency limit max_workers: Thread pool size filter_function: Filters messages based on the user-provided function + override_topics: If provided, override the topic config Returns: A ConsumerHandle that can be used to stop the consumer. @@ -367,7 +369,7 @@ def start_consumer( # Create consumer consumer_config = ConsumerConfig( kafka_hosts=[self.kafka_config[KafkaOptions.KAFKA_NODES]], - topics=self.config.topics, + topics=self.config.topics if override_topics is None else override_topics, username=self.kafka_config[KafkaOptions.USERNAME], password=self.kafka_config[KafkaOptions.PASSWORD], group_id=self.config.group_id, diff --git a/tests/integration/test_retry.py b/tests/integration/test_retry.py index bed7bbe..70a87c9 100644 --- a/tests/integration/test_retry.py +++ b/tests/integration/test_retry.py @@ -28,20 +28,24 @@ async def test_retry_mechanism_on_failure( Test that when a message processing fails, it gets retried via the retry topic and eventually succeeds. """ + topic_config = ConsumeTopicConfig( + base_topic="test-base-topic", + retry_topic="test-retry-topic", + retries=3, + fallback_delay=1.0, + ) config = ScaffoldConfig( topics=[ - ConsumeTopicConfig( - base_topic="test-base-topic", - retry_topic="test-retry-topic", - retries=3, - fallback_delay=1.0, - ), + topic_config, ], group_id="test-retry-consumer-group", ) async with IntegrationTestScaffold(kafka_config, admin_client, config) as scaffold: - scaffold.start_consumer(fail_chance_on_first=1.0) + topic_config.base_topic = "^[t]est-base-topic$" + scaffold.start_consumer( + fail_chance_on_first=1.0, override_topics=[topic_config] + ) await asyncio.sleep(2) # Wait for consumer to be ready await scaffold.send_messages(1) diff --git a/tests/unit/test_retry_utils.py b/tests/unit/test_retry_utils.py index b9cc8b9..d259e91 100644 --- a/tests/unit/test_retry_utils.py +++ b/tests/unit/test_retry_utils.py @@ -470,7 +470,7 @@ def test__get_retry_headers_no_config() -> None: "topics,message_topic,send_error,expect_send", [ pytest.param( - [ConsumeTopicConfig(base_topic="t", retry_topic="t-retry")], + [ConsumeTopicConfig(base_topic="^t$", retry_topic="t-retry")], "t", None, True, @@ -497,6 +497,16 @@ def test__get_retry_headers_no_config() -> None: True, id="exception_logged", ), + pytest.param( + [ + ConsumeTopicConfig(base_topic="t[", retry_topic="t-retry"), + ConsumeTopicConfig(base_topic="^u$", retry_topic="u-retry"), + ], + "u", + None, + True, + id="regex_matching", + ), ], ) def test_resend_message(topics, message_topic, send_error, expect_send) -> None: