-
Notifications
You must be signed in to change notification settings - Fork 102
Description
The streaming state machine when having a raised error inside one of the actions doesn't have a way of gracefully shutting down the stream in the "finally" clause of try/except/finally block.
Steps to replicate behavior
This is the self-contained snipper to reproduce the behaviour. It's a FastAPI endpoint which you can run with python and then curl it. The code raises an error suddenly when the counter is equal 5, but you can comment it out to see that it should finish without this: curl: (18) transfer closed with outstanding read data remaining
Python sippet:
import datetime
from typing import AsyncGenerator
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import uvicorn
from burr.core import State, ApplicationBuilder, expr
from burr.core.action import streaming_action
app = FastAPI(title="Bug Report Demo")
@streaming_action(reads=["counter"], writes=["counter"])
async def increment(state: State):
"""Mock action for report - increment the counter by 1"""
current_count = state["counter"]
exception_occurred = False
try:
if current_count == 5:
raise ValueError("Raising to show some exception from some function in the action!")
except ValueError:
yield {"response": "error! some helpful info for the frontend..."}, None
exception_occurred = True
raise
finally:
if exception_occurred:
yield {"response": "gracefully stopping the stream..."}, None
yield {}, state
current_count += 1
print("Count: ", current_count)
yield {"response": "increment"}, None
yield {}, state.update(counter=current_count)
@streaming_action(reads=["counter"], writes=[])
async def exit_counter(state: State):
"""Print the current count and the current time"""
current_count = state["counter"]
print(f"Finished counting to {current_count} at {datetime.datetime.now():%H:%M:%S %Y-%m-%d}")
yield {"response": "exit_counter"}, None
yield {}, state
async def build_app():
"""Build the state machine application"""
return await (
ApplicationBuilder()
.with_actions(increment, exit_counter)
.with_transitions(
("increment", "increment", expr("counter < 10")),
("increment", "exit_counter"),
)
.with_state(counter=0)
.with_entrypoint("increment")
.abuild()
)
async def generate_stream() -> AsyncGenerator[str, None]:
"""Generate the stream of responses"""
app = await build_app()
async for action, streaming_container in app.astream_iterate(
inputs={},
halt_after=["exit_counter"],
):
async for item in streaming_container:
yield str(item) + "\n\n"
@app.get("/stream")
async def stream_endpoint():
"""Endpoint to demonstrate the bug report with streaming response"""
return StreamingResponse(
generate_stream(),
media_type="text/plain",
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
Stack Traces
Curl output:
`curl -N http://localhost:8000/stream
{'response': 'increment'}
{'response': 'increment'}
{'response': 'increment'}
{'response': 'increment'}
{'response': 'increment'}
{'response': 'error! some helpful info for the frontend...'}
{'response': 'gracefully stopping the stream...'}
curl: (18) transfer closed with outstanding read data remaining`
Expected behavior
I would expect the ability to gracefully close the stream i.e. stream back the error contents to the frontend (or any other consumer) after raising the Exception and then closing the burr operation by yielding the State instead of None. It seems now it's not really possible because the last yield in finally "gracefully stopping the stream..." is the last thing that is ever ran.
It would perhaps be good to be able to yield a textual response while closing the stream with:
yield response_with_error_info, state