diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 445ffd96e8..e79521bc35 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -223,14 +223,24 @@ class ClientState: #: Output of :func:`distributed.versions.get_versions` on the client versions: dict[str, Any] + #: Remote address of the client connection as seen by the scheduler. + address: str | None + __slots__ = tuple(__annotations__) - def __init__(self, client: str, *, versions: dict[str, Any] | None = None): + def __init__( + self, + client: str, + *, + versions: dict[str, Any] | None = None, + address: str | None = None, + ): self.client_key = client self._hash = hash(client) self.wants_what = set() self.last_seen = time() self.versions = versions or {} + self.address = address def __hash__(self) -> int: return self._hash @@ -5907,9 +5917,12 @@ async def add_client( """ assert client is not None comm.name = "Scheduler->Client" - logger.info("Receive client connection: %s", client) + client_address = comm.peer_address + logger.info("Receive client connection: %s at %s", client, client_address) self.log_event(["all", client], {"action": "add-client", "client": client}) - self.clients[client] = ClientState(client, versions=versions) + self.clients[client] = ClientState( + client, versions=versions, address=client_address + ) self._client_connections_added_total += 1 for plugin in list(self.plugins.values()): @@ -5944,15 +5957,24 @@ async def add_client( await self.client_comms[client].close() del self.client_comms[client] if self.status == Status.running: - logger.info("Close client connection: %s", client) + logger.info( + "Close client connection: %s at %s", + client, + client_address, + ) except TypeError: # comm becomes None during GC pass def remove_client(self, client: str, stimulus_id: str | None = None) -> None: """Remove client from network""" stimulus_id = stimulus_id or f"remove-client-{time()}" + client_state = self.clients.get(client) + client_address = client_state.address if client_state is not None else None if self.status == Status.running: - logger.info("Remove client %s", client) + if client_address is not None: + logger.info("Remove client %s at %s", client, client_address) + else: + logger.info("Remove client %s", client) self.log_event(["all", client], {"action": "remove-client", "client": client}) try: cs: ClientState = self.clients[client] diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index ff4d4268d2..ad1073be17 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -878,6 +878,18 @@ async def test_clear_events_client_removal(c, s, a, b): assert time() < start + 2 +@gen_cluster(nthreads=[]) +async def test_client_connection_logs_include_address(s): + with captured_logger("distributed.scheduler", level=logging.INFO) as caplog: + async with Client(s.address, asynchronous=True) as c: + client_id = c.id + + logs = caplog.getvalue() + assert f"Receive client connection: {client_id} at " in logs + assert f"Remove client {client_id} at " in logs + assert f"Close client connection: {client_id} at " in logs + + @gen_cluster(client=True, nthreads=[]) async def test_add_worker(c, s): x = c.submit(inc, 1, key="x")