Skip to content

Commit 7af9ce5

Browse files
committed
Log client addresses in scheduler lifecycle messages
Signed-off-by: peter941221 <peter941221@gmail.com>
1 parent 945d7eb commit 7af9ce5

2 files changed

Lines changed: 37 additions & 5 deletions

File tree

distributed/scheduler.py

Lines changed: 26 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -223,14 +223,24 @@ class ClientState:
223223
#: Output of :func:`distributed.versions.get_versions` on the client
224224
versions: dict[str, Any]
225225

226+
#: Remote address of the client connection as seen by the scheduler.
227+
address: str | None
228+
226229
__slots__ = tuple(__annotations__)
227230

228-
def __init__(self, client: str, *, versions: dict[str, Any] | None = None):
231+
def __init__(
232+
self,
233+
client: str,
234+
*,
235+
versions: dict[str, Any] | None = None,
236+
address: str | None = None,
237+
):
229238
self.client_key = client
230239
self._hash = hash(client)
231240
self.wants_what = set()
232241
self.last_seen = time()
233242
self.versions = versions or {}
243+
self.address = address
234244

235245
def __hash__(self) -> int:
236246
return self._hash
@@ -5907,9 +5917,12 @@ async def add_client(
59075917
"""
59085918
assert client is not None
59095919
comm.name = "Scheduler->Client"
5910-
logger.info("Receive client connection: %s", client)
5920+
client_address = comm.peer_address
5921+
logger.info("Receive client connection: %s at %s", client, client_address)
59115922
self.log_event(["all", client], {"action": "add-client", "client": client})
5912-
self.clients[client] = ClientState(client, versions=versions)
5923+
self.clients[client] = ClientState(
5924+
client, versions=versions, address=client_address
5925+
)
59135926
self._client_connections_added_total += 1
59145927

59155928
for plugin in list(self.plugins.values()):
@@ -5944,15 +5957,23 @@ async def add_client(
59445957
await self.client_comms[client].close()
59455958
del self.client_comms[client]
59465959
if self.status == Status.running:
5947-
logger.info("Close client connection: %s", client)
5960+
logger.info(
5961+
"Close client connection: %s at %s",
5962+
client,
5963+
client_address,
5964+
)
59485965
except TypeError: # comm becomes None during GC
59495966
pass
59505967

59515968
def remove_client(self, client: str, stimulus_id: str | None = None) -> None:
59525969
"""Remove client from network"""
59535970
stimulus_id = stimulus_id or f"remove-client-{time()}"
5971+
client_address = self.clients.get(client).address if client in self.clients else None
59545972
if self.status == Status.running:
5955-
logger.info("Remove client %s", client)
5973+
if client_address is not None:
5974+
logger.info("Remove client %s at %s", client, client_address)
5975+
else:
5976+
logger.info("Remove client %s", client)
59565977
self.log_event(["all", client], {"action": "remove-client", "client": client})
59575978
try:
59585979
cs: ClientState = self.clients[client]

distributed/tests/test_scheduler.py

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -878,6 +878,17 @@ async def test_clear_events_client_removal(c, s, a, b):
878878
assert time() < start + 2
879879

880880

881+
@gen_cluster(client=True, nthreads=[])
882+
async def test_client_connection_logs_include_address(c, s):
883+
with captured_logger("distributed.scheduler", level=logging.INFO) as caplog:
884+
await c.close()
885+
886+
logs = caplog.getvalue()
887+
assert f"Receive client connection: {c.id} at " in logs
888+
assert f"Remove client {c.id} at " in logs
889+
assert f"Close client connection: {c.id} at " in logs
890+
891+
881892
@gen_cluster(client=True, nthreads=[])
882893
async def test_add_worker(c, s):
883894
x = c.submit(inc, 1, key="x")

0 commit comments

Comments
 (0)