@@ -153,8 +153,19 @@ async def add_client(self, airplay_player: AirPlayPlayer) -> None:
153153 return
154154
155155 async with self ._lock :
156- # Get all buffered chunks to send
157- buffered_chunks = list (self ._chunk_buffer )
156+ # Get buffered chunks to send, but limit to ~5 seconds to avoid
157+ # blocking real-time streaming to other players (causes packet loss)
158+ max_late_join_buffer_seconds = 5.0
159+ all_buffered = list (self ._chunk_buffer )
160+
161+ # Filter to only include chunks within the time limit
162+ if all_buffered :
163+ min_position = self .seconds_streamed - max_late_join_buffer_seconds
164+ buffered_chunks = [
165+ (chunk , pos ) for chunk , pos in all_buffered if pos >= min_position
166+ ]
167+ else :
168+ buffered_chunks = []
158169
159170 if buffered_chunks :
160171 # Calculate how much buffer we're sending
@@ -209,7 +220,10 @@ async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> No
209220 if not self .sync_clients :
210221 break
211222
212- await self ._write_chunk_to_all_players (chunk )
223+ has_running_clients = await self ._write_chunk_to_all_players (chunk )
224+ if not has_running_clients :
225+ self .prov .logger .debug ("No running clients remaining, stopping audio streamer" )
226+ break
213227 self .seconds_streamed += len (chunk ) / pcm_sample_size
214228 finally :
215229 if not watchdog_task .done ():
@@ -237,7 +251,9 @@ async def _silence_watchdog(self, pcm_sample_size: int) -> None:
237251 silence_duration = 0.1
238252 silence_bytes = int (pcm_sample_size * silence_duration )
239253 silence_chunk = bytes (silence_bytes )
240- await self ._write_chunk_to_all_players (silence_chunk )
254+ has_running_clients = await self ._write_chunk_to_all_players (silence_chunk )
255+ if not has_running_clients :
256+ break
241257 self .seconds_streamed += silence_duration
242258 silence_inserted += silence_duration
243259 await asyncio .sleep (0.05 )
@@ -248,12 +264,15 @@ async def _silence_watchdog(self, pcm_sample_size: int) -> None:
248264 silence_inserted ,
249265 )
250266
251- async def _write_chunk_to_all_players (self , chunk : bytes ) -> None :
252- """Write a chunk to all connected players."""
267+ async def _write_chunk_to_all_players (self , chunk : bytes ) -> bool :
268+ """Write a chunk to all connected players.
269+
270+ :return: True if there are still running clients, False otherwise.
271+ """
253272 async with self ._lock :
254273 sync_clients = [x for x in self .sync_clients if x .stream and x .stream .running ]
255274 if not sync_clients :
256- return
275+ return False
257276
258277 # Add chunk to ring buffer for late joiners (before seconds_streamed is updated)
259278 chunk_position = self .seconds_streamed
@@ -287,6 +306,10 @@ async def _write_chunk_to_all_players(self, chunk: bytes) -> None:
287306 for player in players_to_remove :
288307 self .mass .create_task (self .remove_client (player ))
289308
309+ # Return False if all clients were removed (or scheduled for removal)
310+ remaining_clients = len (sync_clients ) - len (players_to_remove )
311+ return remaining_clients > 0
312+
290313 async def _write_chunk_to_player (self , airplay_player : AirPlayPlayer , chunk : bytes ) -> None :
291314 """Write audio chunk to a player's ffmpeg process."""
292315 player_id = airplay_player .player_id
0 commit comments