From ac64faff4fdd6d5c29d06f4d1da50aa87b0160fd Mon Sep 17 00:00:00 2001 From: Heikki Nousiainen Date: Wed, 7 May 2025 15:30:48 +0300 Subject: [PATCH 1/6] senders: drop dependency on kafka.RETRY_ERROR_TYPES Kafka-python has dropped RETRY_ERROR_TYPES definition. Catch generic KafkaError and check from the exception itself whether the error is retriable. Re-raise fatal errors. --- journalpump/senders/kafka.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/journalpump/senders/kafka.py b/journalpump/senders/kafka.py index 57c9a48..8486d8a 100644 --- a/journalpump/senders/kafka.py +++ b/journalpump/senders/kafka.py @@ -15,12 +15,6 @@ except ImportError: zstd = None -KAFKA_CONN_ERRORS = tuple(errors.RETRY_ERROR_TYPES) + ( - errors.UnknownError, - socket.timeout, - TimeoutError, -) - logging.getLogger("kafka").setLevel(logging.CRITICAL) # remove client-internal tracebacks from logging output @@ -83,7 +77,11 @@ def _init_kafka(self) -> None: try: kafka_producer = KafkaProducer(**producer_config) - except KAFKA_CONN_ERRORS as ex: + except (errors.KafkaError, socket.timeout, TimeoutError) as ex: + if isinstance(ex, errors.KafkaError): + # Reraise exceptions that are fatal + if not ex.retriable: + raise self.mark_disconnected(ex) self.log.warning( "Retriable error during Kafka initialization: %s: %s", @@ -131,7 +129,11 @@ def send_messages(self, *, messages, cursor): result_future.get(timeout=1) self.mark_sent(messages=messages, cursor=cursor) return True - except KAFKA_CONN_ERRORS as ex: + except (errors.KafkaError, socket.timeout, TimeoutError) as ex: + if isinstance(ex, errors.KafkaError): + # Reraise exceptions that are fatal + if not ex.retriable: + raise self.mark_disconnected(ex) self.log.info( "Kafka retriable error during send: %s: %s, waiting", From 532240b51fa8eb89806943c0b9937bc25f4082b3 Mon Sep 17 00:00:00 2001 From: Heikki Nousiainen Date: Wed, 7 May 2025 15:02:37 +0300 Subject: [PATCH 2/6] senders: refactor websocket 
connection error handling Replace legacy websockets.exceptions.InvalidStatusCode with websockets.exceptions.InvalidHandshake. Reorder exceptions to catch more specific timeout errors. --- journalpump/senders/websocket.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/journalpump/senders/websocket.py b/journalpump/senders/websocket.py index a48ec5c..aa09d85 100644 --- a/journalpump/senders/websocket.py +++ b/journalpump/senders/websocket.py @@ -244,24 +244,24 @@ async def comms_channel_round(self): for task in pending: task.cancel() - except ConnectionRefusedError as ex: - self.log.warning("Websocket connection refused: %r. Retrying.", ex) - except (ConnectionTimeoutError, asyncio.TimeoutError, CancelledError) as ex: - self.log.warning("Websocket connection timed out: %r. Retrying.", ex) except socket.gaierror as ex: self.log.error( "DNS lookup for websocket endpoint or SOCKS5 proxy failed: %r. Retrying.", ex, ) - except websockets.exceptions.InvalidStatusCode as ex: + except ConnectionRefusedError as ex: + self.log.warning("Websocket connection refused: %r. Retrying.", ex) + except ssl.SSLCertVerificationError as ex: + self.log.error("Websocket certificate verification error: %r. Retrying.", ex) + except (ProxyError, ProxyConnectionError, ProxyTimeoutError) as ex: + self.log.warning("SOCKS5 proxy connection error: %r. Retrying.", ex) + except (ConnectionTimeoutError, asyncio.TimeoutError, CancelledError) as ex: + self.log.warning("Websocket connection timed out: %r. Retrying.", ex) + except websockets.exceptions.InvalidHandshake as ex: self.log.error( - "Websocket server rejected connection with HTTP status code: %r. Retrying.", + "Websocket handshake failed: %r. Retrying.", ex, ) - except (ProxyError, ProxyConnectionError, ProxyTimeoutError) as ex: - self.log.warning("SOCKS5 proxy connection error: %r. 
Retrying.", ex) - except ssl.SSLCertVerificationError as ex: - self.log.error("Websocket certificate verification error: %r. Retrying.", ex) except OSError as ex: # Network unreachable, etc, may happen sporadically self.log.warning("Websocket connection error: %r. Retrying.", ex) except Exception as ex: # pylint:disable=broad-except From 8ced6b2166a1f26bf4f38139fb6b8b15fdc831de Mon Sep 17 00:00:00 2001 From: Heikki Nousiainen Date: Wed, 7 May 2025 21:31:46 +0300 Subject: [PATCH 3/6] senders: version compatibility fixes for websockets dependency The websockets library has changed interfaces somewhat between version 12.0 (shipped with Fedora) and the most recent 15.0 (available via PyPI). Dynamically figure out the correct keyword arguments to use between the known versions. --- journalpump/senders/websocket.py | 45 ++++++++++++++++++++++++-------- test/test_websocket.py | 1 - 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/journalpump/senders/websocket.py b/journalpump/senders/websocket.py index aa09d85..826c0c2 100644 --- a/journalpump/senders/websocket.py +++ b/journalpump/senders/websocket.py @@ -5,6 +5,7 @@ from journalpump import __version__ from journalpump.types import StrEnum from journalpump.util import ExponentialBackoff +from packaging.version import Version from threading import Thread from urllib.parse import urlparse @@ -177,17 +178,39 @@ async def websocket_connect_coro(self): socks_url_parsed.port, ) - ws_compr = None if self.websocket_compression == WebsocketCompression.none else str(self.websocket_compression) - return await websockets.connect( # pylint:disable=no-member - self.websocket_uri, - ssl=ssl_context, - compression=ws_compr, - extra_headers=headers, - sock=sock, - server_hostname=url_parsed.hostname if self.ssl_enabled else None, - close_timeout=20, - max_size=MAX_KAFKA_MESSAGE_SIZE * 2, - ) + # In order to support version transition in websockects, we generated kwargs dynamically + connect_kwargs = { + "close_timeout": 20, + 
"max_size": MAX_KAFKA_MESSAGE_SIZE * 2, + "ssl": ssl_context, + } + + if self.websocket_compression != WebsocketCompression.none: + connect_kwargs["compression"] = str(self.websocket_compression) + + if self.ssl_enabled: + connect_kwargs["server_hostname"] = url_parsed.hostname + + if sock: + connect_kwargs["sock"] = sock + + # Versions 13.0 and up switched to additional_headers, older versions expect extra_headers + # Versions 15.0 and up introduce a separate user_agent_header + if headers: + websockets_version = Version(websockets.__version__) + if websockets_version >= Version("13.0"): + if websockets_version >= Version("15.0"): + user_agent = headers.pop("User-Agent", None) + if user_agent: + connect_kwargs["user_agent_header"] = user_agent + if headers: + connect_kwargs["additional_headers"] = headers + else: + connect_kwargs["additional_headers"] = headers + else: + connect_kwargs["extra_headers"] = headers + + return await websockets.connect(self.websocket_uri, **connect_kwargs) async def websocket_connect(self, *, timeout=30): connect_task = asyncio.create_task(self.websocket_connect_coro()) diff --git a/test/test_websocket.py b/test/test_websocket.py index 8cd8131..d109f67 100644 --- a/test/test_websocket.py +++ b/test/test_websocket.py @@ -71,7 +71,6 @@ async def run_websocket_server(self): self.process_connection, "127.0.0.1", self.port, - loop=self.loop, ssl=ctx, close_timeout=10, ) as server: From 52a5864db8e24546a7fe1f4fe1915fe01c17b93f Mon Sep 17 00:00:00 2001 From: Heikki Nousiainen Date: Wed, 7 May 2025 22:48:19 +0300 Subject: [PATCH 4/6] test: adjust handler arguments in websocket tests The path argument in server connection handlers was removed in 10.1 with backwards compatibility removed in 13.0. 
--- test/test_websocket.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_websocket.py b/test/test_websocket.py index d109f67..7398c81 100644 --- a/test/test_websocket.py +++ b/test/test_websocket.py @@ -35,8 +35,8 @@ async def handle_incoming_websocket_message(self, *, connection): self.log.info("WS: Received message: %r", message) self.in_queue.append(message) - async def process_connection(self, websocket, path): - self.log.info("WS: Client connection accepted on %s", path) + async def process_connection(self, websocket): + self.log.info("WS: Client connection accepted") pending = set() try: From 6c3995e892435c2f4d295f37d5b772f818388087 Mon Sep 17 00:00:00 2001 From: Heikki Nousiainen Date: Wed, 7 May 2025 22:34:54 +0300 Subject: [PATCH 5/6] test: use random unallocated ports in websocket tests --- test/test_websocket.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/test/test_websocket.py b/test/test_websocket.py index 7398c81..680322a 100644 --- a/test/test_websocket.py +++ b/test/test_websocket.py @@ -13,18 +13,14 @@ class WebsocketMockServer(threading.Thread): - def __init__( - self, - *, - port, - ): + def __init__(self): super().__init__() self.daemon = True self.log = logging.getLogger(self.__class__.__name__) - self.port = port self.in_queue = deque() self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) + self.start_event = None self.stop_event = asyncio.Event() self.running = False self.websocket_server = None @@ -66,16 +62,20 @@ async def run_websocket_server(self): # ctx.load_verify_locations(self.ca_certs) # ctx.verify_mode = ssl.CERT_REQUIRED + self.start_event = asyncio.Event() + # websockets uses lazy_import and pylint doesn't quite get it async with websockets.serve( # pylint:disable=no-member self.process_connection, "127.0.0.1", - self.port, + None, ssl=ctx, close_timeout=10, ) as server: self.websocket_server = server self.log.info("WS: 
Started serving websocket connections") + self.start_event.set() + self.start_event = None await self.stop_event.wait() self.log.info("WS: Stopped serving websocket connections") @@ -105,6 +105,14 @@ def stop(self): task.cancel() self.log.info("WS: stopped") + async def get_port_task(self): + if self.start_event: + await self.start_event.wait() + return self.websocket_server.sockets[0].getsockname()[1] + + def get_port(self): + return asyncio.run_coroutine_threadsafe(self.get_port_task(), self.loop).result() + def assert_msgs_found(ws_server, *, messages, timeout): # Check that all of these messages were sent to the websocket server. @@ -156,16 +164,15 @@ def setup_pump(tmpdir, sender_config): def test_producer_nobatch(caplog, tmpdir): caplog.set_level(logging.INFO) - ws_server = WebsocketMockServer( - port=10111, - ) + ws_server = WebsocketMockServer() ws_server.start() + port = ws_server.get_port() pump, sender = setup_pump( tmpdir, { "output_type": "websocket", - "websocket_uri": "ws://127.0.0.1:10111/pump-pump", + "websocket_uri": f"ws://127.0.0.1:{port}/pump-pump", "compression": "none", "max_batch_size": 0, }, @@ -185,16 +192,15 @@ def test_producer_nobatch(caplog, tmpdir): def test_producer_batch(caplog, tmpdir): caplog.set_level(logging.INFO) - ws_server = WebsocketMockServer( - port=10111, - ) + ws_server = WebsocketMockServer() ws_server.start() + port = ws_server.get_port() pump, sender = setup_pump( tmpdir, { "output_type": "websocket", - "websocket_uri": "ws://127.0.0.1:10111/pump-pump", + "websocket_uri": f"ws://127.0.0.1:{port}/pump-pump", "compression": "snappy", "max_batch_size": 1024, }, From 6b1066dac52d79b7615bf53247e1ac736247289b Mon Sep 17 00:00:00 2001 From: Heikki Nousiainen Date: Wed, 7 May 2025 15:37:58 +0300 Subject: [PATCH 6/6] senders: resolve and use a random address on websockets connect Resolve websocket remote address and pick a random one as an explicit host to connect. 
The combination of asyncio websockets and asyncio create_connection() has some overlapping timeouts that cause connection attempts to run through the whole list of targets returned by getaddrinfo(). This breaks connectivity in some dual stack IPv6/IPv4 cases, when either of the address families has e.g. routing problems. We can fairly safely pick one at random here; we have retry logic built around the connections anyway that eventually leads to a connected state. --- journalpump/senders/websocket.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/journalpump/senders/websocket.py b/journalpump/senders/websocket.py index 826c0c2..2f64809 100644 --- a/journalpump/senders/websocket.py +++ b/journalpump/senders/websocket.py @@ -13,6 +13,7 @@ import contextlib import enum import logging +import random import snappy # pylint: disable=import-error import socket import ssl @@ -164,6 +165,8 @@ async def websocket_connect_coro(self): sock = None url_parsed = urlparse(self.websocket_uri) + preferred_host = None + if self.socks5_proxy: socks_url_parsed = urlparse(self.socks5_proxy_url) self.log.info( @@ -177,6 +180,24 @@ async def websocket_connect_coro(self): socks_url_parsed.hostname, socks_url_parsed.port, ) + else: + # Resolve hostname and pick one address at random + # Websockets.connect() and underlying asyncio.loop.create_connection() have some overlapping timeouts + # that lead to some difficulties with working through all addresses returned by getaddrinfo. + # We pick one at random here, and rely on our own outer loops to handle retries. 
+ present_addrs = [ + # getaddrinfo returns 5-tuple (family, type, proto, canonname, sockaddr) + # sockaddr is family dependent, but leads with address for both the IPv6 and IPv4 families + sockaddr[0] + for _, _, _, _, sockaddr in await self.websocket_loop.getaddrinfo( + url_parsed.hostname, 0, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP + ) + ] + if present_addrs: + preferred_host = random.choice(present_addrs) + else: + # We couldn't resolve a suitable name, fall back to asyncio.loop.create_connection() name handling + preferred_host = url_parsed.hostname # In order to support version transition in websockects, we generated kwargs dynamically connect_kwargs = { @@ -210,6 +231,9 @@ async def websocket_connect_coro(self): else: connect_kwargs["extra_headers"] = headers + if preferred_host: + connect_kwargs["host"] = preferred_host + return await websockets.connect(self.websocket_uri, **connect_kwargs) async def websocket_connect(self, *, timeout=30):