Skip to content

Commit 8627a99

Browse files
committed
[ECO-5698] fix: handle normal WebSocket close frames and improve reconnection logic
- Added local WebSocket proxy for testing (`WsProxy`) and corresponding tests for immediate reconnection on normal close. - Fixed missing reconnection on server-sent normal WebSocket close frames in `WebSocketTransport`. - Adjusted idle timer handling to avoid accidental reuse.
1 parent 5cdfeb3 commit 8627a99

2 files changed

Lines changed: 108 additions & 3 deletions

File tree

ably/transport/websockettransport.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ def __init__(self, connection_manager: ConnectionManager, host: str, params: dic
8080
def connect(self):
8181
headers = HttpUtils.default_headers()
8282
query_params = urllib.parse.urlencode(self.params)
83-
ws_url = (f'wss://{self.host}?{query_params}')
83+
scheme = 'wss' if self.options.tls else 'ws'
84+
ws_url = f'{scheme}://{self.host}?{query_params}'
8485
log.info(f'connect(): attempting to connect to {ws_url}')
8586
self.ws_connect_task = asyncio.create_task(self.ws_connect(ws_url, headers))
8687
self.ws_connect_task.add_done_callback(self.on_ws_connect_done)
@@ -124,6 +125,11 @@ async def _handle_websocket_connection(self, ws_url, websocket):
124125
if not self.is_disposed:
125126
await self.dispose()
126127
self.connection_manager.deactivate_transport(err)
128+
else:
129+
# Read loop exited normally (e.g., server sent normal WS close frame)
130+
if not self.is_disposed:
131+
await self.dispose()
132+
self.connection_manager.deactivate_transport()
127133

128134
async def on_protocol_message(self, msg):
129135
self.on_activity()
@@ -284,8 +290,9 @@ async def send(self, message: dict):
284290
await self.websocket.send(raw_msg)
285291

286292
def set_idle_timer(self, timeout: float):
287-
if not self.idle_timer:
288-
self.idle_timer = Timer(timeout, self.on_idle_timer_expire)
293+
if self.idle_timer:
294+
self.idle_timer.cancel()
295+
self.idle_timer = Timer(timeout, self.on_idle_timer_expire)
289296

290297
async def on_idle_timer_expire(self):
291298
self.idle_timer = None

test/ably/realtime/realtimeconnection_test.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import asyncio
22

33
import pytest
4+
from websockets import connect as _ws_connect
5+
from websockets.asyncio.server import serve as ws_serve
46

57
from ably.realtime.connection import ConnectionEvent, ConnectionState
68
from ably.transport.defaults import Defaults
@@ -10,6 +12,68 @@
1012
from test.ably.utils import BaseAsyncTestCase
1113

1214

15+
async def _relay(src, dst):
16+
try:
17+
async for msg in src:
18+
await dst.send(msg)
19+
except Exception:
20+
pass
21+
22+
23+
class WsProxy:
24+
"""Local WS proxy that forwards to real Ably and lets tests trigger a normal close."""
25+
26+
def __init__(self, target_host: str):
27+
self.target_host = target_host
28+
self.server = None
29+
self.port: int | None = None
30+
self._close_event: asyncio.Event | None = None
31+
32+
async def _handler(self, client_ws):
33+
# Create a fresh event for this connection; signal to drop the connection cleanly
34+
self._close_event = asyncio.Event()
35+
path = client_ws.request.path # e.g. "/?key=...&format=json"
36+
target_url = f"wss://{self.target_host}{path}"
37+
try:
38+
async with _ws_connect(target_url, ping_interval=None) as server_ws:
39+
c2s = asyncio.create_task(_relay(client_ws, server_ws))
40+
s2c = asyncio.create_task(_relay(server_ws, client_ws))
41+
close_task = asyncio.create_task(self._close_event.wait())
42+
try:
43+
await asyncio.wait([c2s, s2c, close_task], return_when=asyncio.FIRST_COMPLETED)
44+
finally:
45+
c2s.cancel()
46+
s2c.cancel()
47+
close_task.cancel()
48+
except Exception:
49+
pass
50+
# After _handler returns the websockets server sends a normal close frame (1000)
51+
52+
async def close_active_connection(self):
53+
"""Trigger a normal WS close (code 1000) on the currently active client connection.
54+
55+
Signals the handler to exit; the websockets server framework then sends the
56+
close frame automatically when the handler coroutine returns.
57+
"""
58+
if self._close_event:
59+
self._close_event.set()
60+
61+
@property
62+
def endpoint(self) -> str:
63+
"""Endpoint string to pass to AblyRealtime (combine with tls=False)."""
64+
return f"127.0.0.1:{self.port}"
65+
66+
async def __aenter__(self):
67+
self.server = await ws_serve(self._handler, "127.0.0.1", 0, ping_interval=None)
68+
self.port = self.server.sockets[0].getsockname()[1]
69+
return self
70+
71+
async def __aexit__(self, *args):
72+
if self.server:
73+
self.server.close()
74+
await self.server.wait_closed()
75+
76+
1377
class TestRealtimeConnection(BaseAsyncTestCase):
1478
@pytest.fixture(autouse=True)
1579
async def setup(self):
@@ -469,3 +533,37 @@ async def test_queue_messages_defaults_to_true(self):
469533
# TO3g: queueMessages defaults to true
470534
assert ably.options.queue_messages is True
471535
assert ably.connection.connection_manager.options.queue_messages is True
536+
537+
async def test_normal_ws_close_triggers_immediate_reconnection(self):
538+
"""Server normal WS close (code 1000) must trigger immediate reconnection.
539+
540+
Regression test: ConnectionClosedOK was silently swallowed and deactivate_transport
541+
was never called, leaving the client disconnected until the idle timer fired.
542+
"""
543+
async with WsProxy(self.test_vars["host"]) as proxy:
544+
ably = await TestApp.get_ably_realtime(
545+
disconnected_retry_timeout=500_000,
546+
suspended_retry_timeout=500_000,
547+
tls=False,
548+
endpoint=proxy.endpoint,
549+
)
550+
551+
try:
552+
await asyncio.wait_for(
553+
ably.connection.once_async(ConnectionState.CONNECTED), timeout=10
554+
)
555+
556+
# Simulate server sending a normal WS close frame
557+
await proxy.close_active_connection()
558+
559+
# Must go DISCONNECTED quickly — not after the 25 s idle timer
560+
await asyncio.wait_for(
561+
ably.connection.once_async(ConnectionState.CONNECTING), timeout=1
562+
)
563+
564+
# Must reconnect immediately — not after the 500 s retry timer
565+
await asyncio.wait_for(
566+
ably.connection.once_async(ConnectionState.CONNECTED), timeout=10
567+
)
568+
finally:
569+
await ably.close()

0 commit comments

Comments
 (0)