feat: extensions (#44)
Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
Force-pushed from 54ec2e0 to fe5d007.
```diff
@@ -56,6 +57,66 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None:
         except Exception:
             await self.storage.update_task(task_operation['params']['id'], state='failed')
```
🔴 Exception handler bypasses new update_task method, leaving stream subscribers hanging forever
When run_task or cancel_task raises an exception in _handle_task_operation, the except block at line 58 calls self.storage.update_task(...) directly instead of the new self.update_task(...) method introduced in this PR. The new self.update_task() method (fasta2a/worker.py:60-118) is documented as "the primary method workers should use to update task state" because it both persists the update to storage AND publishes streaming events to the broker via send_stream_event. By calling self.storage.update_task() directly, no TaskStatusUpdateEvent(final=True) is ever published to subscribers. Any active subscribe_to_stream iterator for that task will block indefinitely waiting for a final event, causing the SSE connection to hang.
Suggested change:

```diff
-            await self.storage.update_task(task_operation['params']['id'], state='failed')
+            await self.update_task(task_operation['params']['id'], state='failed')
```
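A hedged sketch of what the fixed handler tail could look like. The `run_task` dispatch and the `update_task(task_id, state=...)` signature follow the review comment; the surrounding structure and the logging call are simplifications for illustration, not the actual FastA2A code.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def handle_task_operation(task_operation, worker):
    """Simplified stand-in for Worker._handle_task_operation."""
    try:
        await worker.run_task(task_operation['params'])
    except Exception:
        logger.exception('task operation failed')
        # Route the failure through update_task so a final
        # TaskStatusUpdateEvent reaches stream subscribers, instead of
        # writing to storage directly and leaving them hanging.
        await worker.update_task(task_operation['params']['id'], state='failed')
```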
```python
# Parse activated extensions from the A2A-Extensions header
extensions_header = request.headers.get('a2a-extensions', '')
activated_extensions: list[str] = (
    [uri.strip() for uri in extensions_header.split(',') if uri.strip()] if extensions_header else []
)
# Stash on the request state so workers / handlers can inspect them
request.state.activated_extensions = activated_extensions
```
🚩 activated_extensions parsed but never consumed by downstream code
At fasta2a/applications.py:144-150, the A2A-Extensions header is parsed and stored on request.state.activated_extensions with a comment saying "so workers / handlers can inspect them." However, this state is never passed to TaskManager, Broker, or Worker — those components receive only TaskSendParams which doesn't include activated extensions information. The request.state is scoped to the HTTP request handler and is not accessible from workers. This appears to be scaffolding for future use rather than a bug, but it means extensions have no functional effect currently.
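One hypothetical way to make the activated extensions actually reach workers would be to copy them into the params handed to the broker. The helper name and the `metadata` key below are assumptions for illustration; neither exists in the current `TaskSendParams`.

```python
def attach_extensions(broker_params: dict, activated: list[str]) -> dict:
    """Return a copy of broker params carrying the activated extension URIs.

    Hypothetical sketch: stores the URIs under a 'metadata' key so a worker
    receiving the params could inspect them.
    """
    params = dict(broker_params)
    if activated:
        metadata = dict(params.get('metadata') or {})
        metadata['activated_extensions'] = activated
        params['metadata'] = metadata
    return params
```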
Pull request overview
Adds A2A extensions support and introduces an SSE-based message/stream implementation, including broker pub/sub and worker-driven stream event publishing.
Changes:
- Extend agent-card capabilities to advertise supported `extensions` and a default `streaming` capability.
- Implement `message/stream` end-to-end: TaskManager streaming generator, broker stream pub/sub, worker event publishing, and SSE HTTP response.
- Add/lock new dependencies (`sse-starlette`, plus dev deps) and introduce streaming-focused test coverage.
Reviewed changes
Copilot reviewed 10 out of 12 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `fasta2a/schema.py` | Adds `extensions` to agent capabilities and defines the `StreamEvent` union/type adapter. |
| `fasta2a/applications.py` | Adds extension header parsing, streaming toggle, and SSE `message/stream` endpoint. |
| `fasta2a/task_manager.py` | Implements `stream_message()` as an async generator yielding stream events. |
| `fasta2a/broker.py` | Adds streaming event send/subscribe APIs and an in-memory pub/sub implementation. |
| `fasta2a/worker.py` | Adds `Worker.update_task()` helper to persist updates and publish stream events. |
| `fasta2a/__init__.py` | Exports `AgentExtension` and `StreamEvent`. |
| `fasta2a/storage.py` | Import formatting only (no behavioral change). |
| `tests/test_streaming.py` | New test suite covering broker streaming, worker event publishing, TaskManager streaming, and the SSE endpoint. |
| `tests/test_applications.py` | Updates the agent-card snapshot to expect streaming enabled. |
| `pyproject.toml` | Adds runtime `sse-starlette` and dev deps used by streaming/tests. |
| `uv.lock` | Locks newly added dependencies/markers. |
| `.gitignore` | Ignores coverage artifacts (`.coverage*`). |
```python
# Start task execution in background
asyncio.create_task(self.broker.run_task(broker_params))
```
The background asyncio.create_task(self.broker.run_task(...)) isn't tracked or awaited. If it raises, you'll get "Task exception was never retrieved" and the SSE generator may keep streaming indefinitely. Consider capturing the task and handling exceptions (e.g., add a done callback that logs and/or publishes a failed final status) and cancel it if the client disconnects.
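A minimal sketch of the tracking pattern the comment suggests. The helper name and the `on_error` hook (which could log and/or publish a failed final status) are assumptions; only the done-callback mechanics are standard asyncio.

```python
import asyncio


def start_tracked(coro, on_error):
    """Start a background task, keeping a reference and surfacing failures.

    on_error is called with the exception if the task fails; cancellation
    is ignored. Keeping the returned task also lets callers cancel it on
    client disconnect.
    """
    task = asyncio.create_task(coro)

    def _done(t: asyncio.Task) -> None:
        if t.cancelled():
            return
        exc = t.exception()  # retrieves the exception, silencing the warning
        if exc is not None:
            on_error(exc)

    task.add_done_callback(_done)
    return task
```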
```python
if a2a_request['method'] == 'message/send':
    jsonrpc_response = await self.task_manager.send_message(a2a_request)
elif a2a_request['method'] == 'message/stream':
    stream_request = stream_message_request_ta.validate_json(data)

    async def sse_generator():
        request_id = stream_request.get('id')
        async for event in self.task_manager.stream_message(stream_request):
            jsonrpc_response = StreamMessageResponse(
                jsonrpc='2.0',
                id=request_id,
                result=event,
            )
            yield stream_message_response_ta.dump_json(jsonrpc_response, by_alias=True).decode()

    return EventSourceResponse(sse_generator())
```
FastA2A advertises streaming capability via AgentCapabilities.streaming, but _agent_run_endpoint will still accept and serve message/stream even when self.streaming is False. Add a guard that returns an appropriate JSON-RPC error (e.g., MethodNotFound/UnsupportedOperation) when streaming is disabled so behavior matches the advertised capabilities.
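A hedged sketch of such a guard. The `-32601` code is the standard JSON-RPC "method not found" error; the exact payload shape FastA2A would return, and the helper name, are assumptions.

```python
def reject_stream_if_disabled(method: str, streaming_enabled: bool, request_id):
    """Return a JSON-RPC error payload when message/stream is requested
    but streaming is disabled; return None when the request may proceed."""
    if method == 'message/stream' and not streaming_enabled:
        return {
            'jsonrpc': '2.0',
            'id': request_id,
            'error': {
                'code': -32601,  # JSON-RPC "method not found"
                'message': 'message/stream is not supported: streaming is disabled',
            },
        }
    return None
```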
```diff
     'capabilities': {
-        'streaming': False,
+        'streaming': True,
         'pushNotifications': False,
```
The PR metadata focuses on "extensions", but this change also flips the default advertised streaming capability to True (and adds a new message/stream SSE endpoint). If this broader behavior change is intentional, it should be reflected in the PR title/description (or split into a separate PR) to avoid surprising consumers.
```python
except Exception:
    await self.storage.update_task(task_operation['params']['id'], state='failed')
```
In the exception handler, the worker updates storage directly, which bypasses the new Worker.update_task() logic and therefore won't publish a final status-update event to stream subscribers on failures. Use Worker.update_task(..., state='failed') here (and consider capturing/logging the exception) so streaming clients reliably see task termination.
```python
async with self._subscriber_lock:
    subscribers = self._event_subscribers.get(task_id, [])
    if not subscribers:
        return

    # Send event to all subscribers, removing closed streams
    active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = []
    for stream in subscribers:
        try:
            await stream.send(event)
            active_subscribers.append(stream)
        except (anyio.ClosedResourceError, anyio.BrokenResourceError):
            # Subscriber disconnected, remove from list
            pass

    # Update subscriber list with only active ones
    if active_subscribers:
        self._event_subscribers[task_id] = active_subscribers
    elif task_id in self._event_subscribers:
        # No active subscribers left, clean up
```
send_stream_event() holds _subscriber_lock while awaiting stream.send(...). If a subscriber is slow (buffer full) this blocks all subscription changes and event sends across all tasks. Consider copying the current subscriber list under the lock, releasing it before awaiting sends, then reacquiring to prune/commit active subscribers.
Suggested change:

```diff
-async with self._subscriber_lock:
-    subscribers = self._event_subscribers.get(task_id, [])
-    if not subscribers:
-        return
-
-    # Send event to all subscribers, removing closed streams
-    active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = []
-    for stream in subscribers:
-        try:
-            await stream.send(event)
-            active_subscribers.append(stream)
-        except (anyio.ClosedResourceError, anyio.BrokenResourceError):
-            # Subscriber disconnected, remove from list
-            pass
-
-    # Update subscriber list with only active ones
-    if active_subscribers:
-        self._event_subscribers[task_id] = active_subscribers
-    elif task_id in self._event_subscribers:
-        # No active subscribers left, clean up
+# Take a snapshot of current subscribers under the lock.
+async with self._subscriber_lock:
+    subscribers = list(self._event_subscribers.get(task_id, []))
+if not subscribers:
+    return
+
+# Send event to all subscribers without holding the lock, tracking active ones.
+active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = []
+for stream in subscribers:
+    try:
+        await stream.send(event)
+        active_subscribers.append(stream)
+    except (anyio.ClosedResourceError, anyio.BrokenResourceError):
+        # Subscriber disconnected, remove from list on reconciliation.
+        pass
+
+# Reacquire the lock to reconcile the active subscribers with the current list.
+async with self._subscriber_lock:
+    current = self._event_subscribers.get(task_id)
+    if current is None:
+        # Task already cleaned up or no subscribers remain.
+        return
+    if not active_subscribers:
+        # No active subscribers left, clean up if still present.
+        if task_id in self._event_subscribers:
+            del self._event_subscribers[task_id]
+        return
+    # Preserve only streams that are still active and still present in the current list.
+    active_set = set(active_subscribers)
+    new_current = [stream for stream in current if stream in active_set]
+    if new_current:
+        self._event_subscribers[task_id] = new_current
+    elif task_id in self._event_subscribers:
+        # No active subscribers left, clean up.
```
```python
# Start task execution in background
asyncio.create_task(self.broker.run_task(broker_params))

# Stream events from broker
async for event in self.broker.subscribe_to_stream(task['id']):
```
stream_message() can miss early stream events: it yields the initial task, then schedules broker.run_task(), and only afterwards starts iterating subscribe_to_stream(). If the worker publishes status updates quickly, they can be sent before the subscription is registered and will be dropped. Restructure so the subscription is registered before starting task execution (and ideally before yielding the first event), or change the broker API so subscription registration happens eagerly when subscribe_to_stream() is called.
Suggested change:

```diff
-# Start task execution in background
-asyncio.create_task(self.broker.run_task(broker_params))
-
-# Stream events from broker
-async for event in self.broker.subscribe_to_stream(task['id']):
+# Register subscription to the broker's event stream before starting task execution
+stream = self.broker.subscribe_to_stream(task['id'])
+
+# Start task execution in background
+asyncio.create_task(self.broker.run_task(broker_params))
+
+# Stream events from broker
+async for event in stream:
```
```python
extensions: NotRequired[list[AgentExtension]]
"""A2A extensions supported by this agent.

Each extension is declared as an ``AgentExtension`` object with a
unique ``uri``, optional ``description``, ``required`` flag, and
``params`` configuration. Clients activate extensions by sending
the selected URIs in the ``A2A-Extensions`` HTTP header.
"""
```
🟡 Duplicate extensions field in AgentCapabilities TypedDict
The PR adds a second extensions: NotRequired[list[AgentExtension]] field at fasta2a/schema.py:93 without removing the pre-existing identical field at fasta2a/schema.py:90. In Python, the second class-body definition silently overwrites the first in __annotations__, so the TypedDict ends up with one extensions key. Pyright does not flag this (confirmed: 0 errors on this file). While the types are identical so there's no runtime breakage today, this is clearly an accidental duplication — the intent was to update the docstring, not add a second field. The original definition (lines 90–91) should be removed.
Suggested change (keep only the documented definition):

```python
extensions: NotRequired[list[AgentExtension]]
"""A2A extensions supported by this agent.

Each extension is declared as an ``AgentExtension`` object with a
unique ``uri``, optional ``description``, ``required`` flag, and
``params`` configuration. Clients activate extensions by sending
the selected URIs in the ``A2A-Extensions`` HTTP header.
"""
```
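The overwrite behavior described above can be demonstrated with a stand-in TypedDict (not the real `AgentCapabilities`): a second class-body annotation with the same name silently replaces the first in `__annotations__`, so the class ends up with a single key either way.

```python
from typing import TypedDict


class Caps(TypedDict, total=False):
    extensions: list[str]
    extensions: list[str]  # duplicate annotation: silently overwrites the first

print(list(Caps.__annotations__))  # ['extensions']
```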
```python
capabilities = AgentCapabilities(
    streaming=self.streaming, push_notifications=False, state_transition_history=False
)
```
🚩 state_transition_history=False is silently dropped during serialization
At fasta2a/applications.py:101, state_transition_history=False is passed to the AgentCapabilities TypedDict constructor, but this field is not defined in the TypedDict at fasta2a/schema.py:78-100. Pyright does flag this (confirmed via uv run pyright), so it's a CI-caught type error. However, the runtime impact is notable: pydantic's dump_json silently strips the unknown key, so stateTransitionHistory never appears in the serialized agent card JSON. Since the value is False, this is arguably equivalent to omitting it (default assumed false), but if someone changes this to True in the future, it would also be silently dropped. The fix would be to either add a state_transition_history field to AgentCapabilities TypedDict or remove it from the constructor call.
```python
elif a2a_request['method'] == 'message/stream':
    return StreamingResponse(
        self.task_manager.stream_message(a2a_request),
        media_type='text/event-stream',
    )
```
🚩 No guard against message/stream requests when streaming=False
The new streaming parameter on FastA2A controls whether the agent card advertises streaming capability (fasta2a/applications.py:49), but the _agent_run_endpoint at line 154 will still accept and process message/stream requests even when streaming=False. The agent card is just an informational hint to clients; a misbehaving client could still send message/stream and receive a streaming response. This may be intentional (capability advertisement vs. enforcement), but it's worth considering whether to return an error for message/stream when streaming is disabled.
Support extensions: https://a2a-protocol.org/latest/topics/extensions/