@@ -1097,30 +1097,19 @@ async def _poll_audio_queues(
10971097 self ,
10981098 queues : dict [str , tuple [Participant , AudioQueue ]],
10991099 ) -> AsyncIterator [tuple [Participant , PcmData ]]:
1100- """Poll all participant audio queues in parallel .
1100+ """Poll all participant audio queues sequentially .
11011101
1102- Yields (Participant, PcmData) tuples as each queue produces a chunk.
1103- Queues that time out are silently skipped. On exit, any remaining
1104- in-flight tasks are cancelled to avoid unhandled exceptions.
1102+ Yields (Participant, PcmData) tuples for each queue that has data ready.
1103+ Queues that time out are silently skipped.
11051104 """
1106-
1107- async def _poll (participant : Participant , queue : AudioQueue ):
1108- pcm = await asyncio .wait_for (
1109- queue .get_duration (duration_ms = 20 ), timeout = 0.001
1110- )
1111- return participant , pcm
1112-
1113- tasks = [asyncio .create_task (_poll (p , q )) for p , q in queues .values ()]
1114- try :
1115- for fut in asyncio .as_completed (tasks ):
1116- try :
1117- yield await fut
1118- except (asyncio .TimeoutError , asyncio .QueueEmpty ):
1119- continue
1120- finally :
1121- for t in tasks :
1122- if not t .done ():
1123- t .cancel ()
1105+ for participant , queue in queues .values ():
1106+ try :
1107+ pcm = await asyncio .wait_for (
1108+ queue .get_duration (duration_ms = 20 ), timeout = 0.001
1109+ )
1110+ yield participant , pcm
1111+ except (TimeoutError , asyncio .QueueEmpty ):
1112+ continue
11241113
11251114 async def _consume_incoming_audio (self ) -> None :
11261115 """Consumer that continuously processes audio from per-participant queues."""
0 commit comments