Skip to content

Commit 4577e0b

Browse files
MehakBindraMehak Bindra
andauthored
[Fix] streaming: do not reset timeout for each emit, do not wait forever on close stream (#197)
Issue : microsoft/teams.ts#374 Main change:: - Emit behavior updated - If no flush is scheduled and no timeout is pending, emit flushes immediately. - Otherwise, messages are queued and flushed later. - Flush improvements - Explicitly acquire and release the lock to prevent concurrent flushes. - If the lock is unavailable, flush returns immediately without blocking. - Test fixes - Added _wait_for_id_and_queue to prevent hanging tests: ensures stream.close() does not wait forever if flush fails. - Removed asyncio.sleep delays in tests; replaced with mocked call_later for deterministic scheduling. - Added a test to verify concurrent emits are serialized and do not flush at the same time. - remove `timer.py` (Previously used to differentiate between ongoing tasks and tasks not yet started, which is no longer needed as we now only cancel timeout from flush, not emit) https://github.com/user-attachments/assets/bee7b959-b2fb-42a2-92a1-83082b2ef97e --------- Co-authored-by: Mehak Bindra <mehakbindra@microsoft.com>
1 parent 5151fad commit 4577e0b

4 files changed

Lines changed: 171 additions & 180 deletions

File tree

packages/apps/src/microsoft_teams/apps/http_stream.py

Lines changed: 43 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,20 @@
2121
from microsoft_teams.common import ConsoleLogger, EventEmitter
2222

2323
from .plugins.streamer import StreamerEvent, StreamerProtocol
24-
from .utils import RetryOptions, Timeout, retry
24+
from .utils import RetryOptions, retry
2525

2626

2727
class HttpStream(StreamerProtocol):
2828
"""
2929
HTTP-based streaming implementation for Microsoft Teams activities.
3030
3131
Flow:
32-
1. emit() adds activities to a queue and cancels any pending flush timeout
33-
2. emit() schedules _flush() to run after 0.5 seconds via Timeout
34-
3. If another emit() happens before flush executes, the timeout is cancelled and rescheduled
35-
4. _flush() starts by cancelling any pending timeout, then processes up to 10 queued activities under a lock
36-
5. _flush() combines text from MessageActivity and sends it as a Typing activity with streamType='streaming'
37-
6. _flush() schedules another flush if more items remain in queue
38-
7. close() waits for queue to empty, then sends final message with stream_type='stream_final'
32+
1. emit() adds activities to a queue
33+
2. _flush() processes up to 10 queued items under a lock.
34+
3. Informative typing updates are sent immediately if no message started.
35+
4. Message text are combined into a typing chunk.
36+
5. Another flush is scheduled if more items remain.
37+
6. close() waits for queue to empty, then sends final message with stream_type='stream_final'
3938
4039
The timeout cancellation ensures only one flush operation is scheduled at a time.
4140
The delays between flushes is to ensure we dont hit API rate limits with Microsoft Teams.
@@ -60,9 +59,9 @@ def __init__(self, client: ApiClient, ref: ConversationReference, logger: Option
6059

6160
self._result: Optional[SentActivity] = None
6261
self._lock = asyncio.Lock()
63-
self._timeout: Optional[Timeout] = None
64-
self._id_set_event = asyncio.Event()
65-
self._queue_empty_event = asyncio.Event()
62+
self._timeout: Optional[asyncio.TimerHandle] = None
63+
self._pending: Optional[asyncio.Task[None]] = None
64+
self._total_wait_timeout: float = 30.0
6665

6766
self._reset_state()
6867

@@ -104,18 +103,14 @@ def emit(self, activity: Union[MessageActivityInput, TypingActivityInput, str])
104103
Args:
105104
activity: The activity to emit.
106105
"""
107-
if self._timeout is not None:
108-
self._timeout.cancel()
109-
self._timeout = None
110106

111107
if isinstance(activity, str):
112108
activity = MessageActivityInput(text=activity, type="message")
113109
self._queue.append(activity)
114110

115-
# Clear the queue empty event since we just added an item
116-
self._queue_empty_event.clear()
117-
118-
self._timeout = Timeout(0.5, self._flush)
111+
if not self._pending and not self._timeout:
112+
# Schedule a flush immediately when no timeout is set (first emit)
113+
self._pending = asyncio.create_task(self._flush())
119114

120115
def update(self, text: str) -> None:
121116
"""
@@ -126,6 +121,20 @@ def update(self, text: str) -> None:
126121
"""
127122
self.emit(TypingActivityInput().with_text(text).with_channel_data(ChannelData(stream_type="informative")))
128123

124+
async def _wait_for_id_and_queue(self):
125+
"""Wait until _id is set and the queue is empty, with a total timeout."""
126+
127+
async def _poll():
128+
while self._queue or not self._id:
129+
self._logger.debug("waiting for _id to be set or queue to be empty")
130+
await asyncio.sleep(0.1)
131+
132+
try:
133+
await asyncio.wait_for(_poll(), timeout=self._total_wait_timeout)
134+
return True
135+
except asyncio.TimeoutError:
136+
return False
137+
129138
async def close(self) -> Optional[SentActivity]:
130139
# wait for lock to be free
131140
if self._result is not None:
@@ -137,13 +146,10 @@ async def close(self) -> Optional[SentActivity]:
137146
return None
138147

139148
# Wait until _id is set and queue is empty
140-
if not self._id:
141-
self._logger.debug("waiting for ID to be set")
142-
await self._id_set_event.wait()
143-
144-
while self._queue:
145-
self._logger.debug("waiting for queue to be empty...")
146-
await self._queue_empty_event.wait()
149+
result = await self._wait_for_id_and_queue()
150+
if not result:
151+
self._logger.warning("Timeout while waiting for _id to be set and queue to be empty, cannot close stream")
152+
return None
147153

148154
if self._text == "" and self._attachments == []:
149155
self._logger.warning("no text or attachments to send, cannot close stream")
@@ -171,10 +177,14 @@ async def _flush(self) -> None:
171177
Flush the current activity queue.
172178
"""
173179
# If there are no items in the queue, nothing to flush
174-
async with self._lock:
180+
if self._lock.locked():
181+
return
182+
183+
await self._lock.acquire()
184+
185+
try:
175186
if not self._queue:
176187
return
177-
178188
if self._timeout is not None:
179189
self._timeout.cancel()
180190
self._timeout = None
@@ -216,13 +226,14 @@ async def _flush(self) -> None:
216226
to_send = TypingActivityInput(text=self._text)
217227
await self._send_activity(to_send)
218228

219-
# Signal if queue is now empty
220-
if not self._queue:
221-
self._queue_empty_event.set()
222-
223229
# If more queued, schedule another flush
224230
if self._queue and not self._timeout:
225-
self._timeout = Timeout(0.5, self._flush)
231+
self._timeout = asyncio.get_running_loop().call_later(0.5, lambda: asyncio.create_task(self._flush()))
232+
233+
finally:
234+
# Reset flushing flag so future emits can trigger another flush
235+
self._pending = None
236+
self._lock.release()
226237

227238
async def _send_activity(self, to_send: TypingActivityInput):
228239
"""
@@ -240,8 +251,6 @@ async def _send_activity(self, to_send: TypingActivityInput):
240251
self._index += 1
241252
if self._id is None:
242253
self._id = res.id
243-
# Signal that ID has been set
244-
self._id_set_event.set()
245254

246255
async def _send(self, to_send: Union[TypingActivityInput, MessageActivityInput]) -> SentActivity:
247256
"""

packages/apps/src/microsoft_teams/apps/utils/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,5 @@
55

66
from .activity_utils import extract_tenant_id
77
from .retry import RetryOptions, retry
8-
from .timer import Timeout
98

10-
__all__ = ["extract_tenant_id", "retry", "Timeout", "RetryOptions"]
9+
__all__ = ["extract_tenant_id", "retry", "RetryOptions"]

packages/apps/src/microsoft_teams/apps/utils/timer.py

Lines changed: 0 additions & 49 deletions
This file was deleted.

0 commit comments

Comments
 (0)