Skip to content

Add streaming support#43

Open
eleonorecharles wants to merge 13 commits intodatalayer:mainfrom
datalayer-externals:feature/sse-streaming-2
Open

Add streaming support#43
eleonorecharles wants to merge 13 commits intodatalayer:mainfrom
datalayer-externals:feature/sse-streaming-2

Conversation

@eleonorecharles
Copy link
Copy Markdown

@echarles
Copy link
Copy Markdown
Member

echarles commented Jan 9, 2026

CI is green - we are using this branch for various projects.

@echarles
Copy link
Copy Markdown
Member

echarles commented Jan 13, 2026

@amitgelber
Copy link
Copy Markdown

@echarles can this be merged? how can we push it? thank you

@echarles
Copy link
Copy Markdown
Member

@echarles can this be merged? how can we push it? thank you

I hope this PR can be reviewed soon.

Comment thread fasta2a/applications.py Outdated
Comment on lines +148 to +155
# Serialize event to ensure proper camelCase conversion
event_dict = stream_event_ta.dump_python(event, mode='json', by_alias=True)

# Wrap in JSON-RPC response
jsonrpc_response = {'jsonrpc': '2.0', 'id': request_id, 'result': event_dict}

# Convert to JSON string
yield json.dumps(jsonrpc_response)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to do this serialization and then deserialization? Seems a bit weird. I'll check.

But for sure we are not going to use json.dumps. There's a function in pydantic_core for it.

Copy link
Copy Markdown
Member

@echarles echarles Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In d16abca

The 3-step serialize pattern in the SSE generator:

  1. stream_event_ta.dump_python→ intermediate Python dict
  2. Wrap in a plain dict {'jsonrpc': '2.0', ...}
  3. json.dumps(...) → JSON string

is replaced by a single call . This skips the intermediate dict materialization and uses pydantic_core' JSON serializer instead of json.dumps. The StreamMessageResponse TypeAdapter handles camelCase aliasing for the entire envelope + nested event in one pass.

Comment thread fasta2a/applications.py Outdated
provider: AgentProvider | None = None,
skills: list[Skill] | None = None,
docs_url: str | None = '/docs',
streaming: bool = False,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want it to be False by default?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set to True in 23907cf

Comment thread fasta2a/applications.py Outdated
Comment thread fasta2a/applications.py Outdated
# Parse the streaming request
stream_request = stream_message_request_ta.validate_json(data)

# Create an async generator wrapper that formats events as JSON-RPC responses
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need for a comment explaining each line.

Suggested change
# Create an async generator wrapper that formats events as JSON-RPC responses

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 33ee9c5

Comment thread fasta2a/applications.py Outdated
if a2a_request['method'] == 'message/send':
jsonrpc_response = await self.task_manager.send_message(a2a_request)
elif a2a_request['method'] == 'message/stream':
# Parse the streaming request
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Parse the streaming request

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 33ee9c5

Comment thread fasta2a/broker.py Outdated
Comment on lines +33 to +34
raise NotImplementedError('send_run_task is not implemented yet.')
...
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you replace this?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted in 1709673

Comment thread fasta2a/broker.py Outdated
Comment on lines +38 to +39
raise NotImplementedError('send_cancel_task is not implemented yet.')
...
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted in 1709673

Comment thread fasta2a/storage.py Outdated
return self.contexts.get(context_id)


class StreamingStorageWrapper(Storage[ContextT]):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this instead of using Storage?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Storage is used in 2db81b8

Comment thread tests/test_streaming.py Outdated
Comment on lines +404 to +405
class TestFastA2AStreaming:
"""Tests for the FastA2A message/stream SSE endpoint."""
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No test classes please.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in ab97e62

Copy link
Copy Markdown

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 4 potential issues.

View 3 additional findings in Devin Review.

Open in Devin Review

Comment thread fasta2a/task_manager.py Outdated
Comment thread fasta2a/task_manager.py Outdated
Comment thread fasta2a/broker.py Outdated
Comment thread fasta2a/storage.py Outdated
task = await self._storage.update_task(task_id, state, new_artifacts, new_messages)

# Determine if this is a final state
final = state in ('completed', 'failed', 'canceled')
Copy link
Copy Markdown

@devin-ai-integration devin-ai-integration Bot Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 TaskState completeness: 'rejected', 'auth-required', 'unknown' and 'input-required' not treated as final

In fasta2a/worker.py:74, final is determined by state in ('completed', 'failed', 'canceled'). Looking at the TaskState type alias at fasta2a/schema.py:436-438, there are additional terminal-like states: 'rejected', 'auth-required', and 'unknown'. If a worker implementation ever sets the state to 'rejected', no final event would be sent to stream subscribers, causing them to hang. The current EchoWorker in tests only uses 'working', 'completed', and 'canceled', so this isn't triggered now, but could be a problem for future worker implementations.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 3 new potential issues.

View 6 additional findings in Devin Review.

Open in Devin Review

Comment thread fasta2a/storage.py Outdated
task = await self._storage.update_task(task_id, state, new_artifacts, new_messages)

# Determine if this is a final state
final = state in ('completed', 'failed', 'canceled')
Copy link
Copy Markdown

@devin-ai-integration devin-ai-integration Bot Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Worker.update_task missing 'rejected' from final states causes stream to never close

The Worker.update_task method at fasta2a/worker.py:88 defines final states as ('completed', 'failed', 'canceled'), but the TaskState type at fasta2a/schema.py:502 includes 'rejected' as a valid terminal state. The TaskManager.resubscribe_task at fasta2a/task_manager.py:218 correctly treats 'rejected' as terminal: terminal_states = {'completed', 'canceled', 'failed', 'rejected'}. If a worker calls self.update_task(task_id, 'rejected'), the final flag will be False, so the method will emit a non-final status update (lines 91-101) but will skip the final status emit and event_bus.close() call (lines 122-134). This means SSE subscribers will hang indefinitely waiting for more events that will never arrive.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment thread fasta2a/task_manager.py Outdated
Comment thread fasta2a/applications.py
Copy link
Copy Markdown

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 3 new potential issues.

View 5 additional findings in Devin Review.

Open in Devin Review

Comment thread fasta2a/storage.py
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 InMemoryStorage.load_task mutates the stored task's history in place

Pre-existing issue: InMemoryStorage.load_task at fasta2a/storage.py:85-86 does task['history'] = task['history'][-history_length:], which mutates the stored task dict in place, permanently truncating the history. This means subsequent calls to load_task without history_length will still see the truncated history. This is a pre-existing bug, not introduced by this PR, but relevant context since streaming relies on storage state.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment thread fasta2a/worker.py Outdated
Comment thread fasta2a/task_manager.py Outdated
@echarles
Copy link
Copy Markdown
Member

echarles commented Mar 7, 2026

@Kludex Thx for the review, I have addressed them and CI is green.

@echarles
Copy link
Copy Markdown
Member

@Kludex I just see a few PRs merged on the main branch related to streaming with an event_bus. Is the streaming feature now complete in the main branch, and should this PR be closed?

Copy link
Copy Markdown

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 2 new potential issues.

View 1 additional finding in Devin Review.

Open in Devin Review

Comment thread fasta2a/worker.py
Comment on lines +74 to +134
async def update_task(
self,
task_id: str,
state: TaskState,
new_artifacts: list[Artifact] | None = None,
new_messages: list[Message] | None = None,
) -> None:
"""Update a task's state in storage and publish streaming events to the broker.

This is the primary method workers should use to update task state. It handles
both persisting the update and notifying any stream subscribers.
"""
task = await self.storage.update_task(task_id, state, new_artifacts, new_messages)

final = state in ('completed', 'failed', 'canceled')

# For non-final updates, publish status first
if not final:
await self.broker.event_bus.emit(
task_id,
StreamResponse(
status_update=TaskStatusUpdateEvent(
task_id=task_id,
context_id=task['context_id'],
status=task['status'],
),
),
)

# Publish message events before final status so subscribers receive them
if new_messages:
for message in new_messages:
await self.broker.event_bus.emit(task_id, StreamResponse(message=message))

# Publish artifact events
if new_artifacts:
for artifact in new_artifacts:
await self.broker.event_bus.emit(
task_id,
StreamResponse(
artifact_update=TaskArtifactUpdateEvent(
task_id=task_id,
context_id=task['context_id'],
artifact=artifact,
),
),
)

# For final updates, publish status last (after messages and artifacts)
if final:
await self.broker.event_bus.emit(
task_id,
StreamResponse(
status_update=TaskStatusUpdateEvent(
task_id=task_id,
context_id=task['context_id'],
status=task['status'],
),
),
)
await self.broker.event_bus.close(task_id)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Test still uses direct storage/event_bus calls instead of new Worker.update_task helper

The test in tests/test_streaming.py:30-68 (EchoWorker) manually calls self.storage.update_task() and self.broker.event_bus.emit()/close() directly, rather than using the new Worker.update_task() helper method introduced in this PR. While not a bug, this means the new Worker.update_task method has no test coverage. If the intent is for workers to use the helper, the test should be updated to validate the helper's behavior (including the ordering of status/message/artifact events).

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment thread fasta2a/schema.py
Comment on lines +997 to +1000
StreamEvent = Union[Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent]
"""A streaming event that can be sent during message/stream requests."""

stream_event_ta: TypeAdapter[StreamEvent] = TypeAdapter(StreamEvent)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 StreamEvent type alias may not be useful without discriminator

The new StreamEvent = Union[Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent] at fasta2a/schema.py:997 and its TypeAdapter are exported publicly but never used internally. These TypedDicts share overlapping field names (e.g., task_id, context_id) which could make Pydantic's union discrimination unreliable without an explicit Discriminator. This may cause unexpected validation behavior when deserializing ambiguous payloads. Worth verifying the intended use case for this type.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

@echarles
Copy link
Copy Markdown
Member

@Kludex I have merged and resolved conflicts. Where do we go from here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Does FastA2AClient support subscribing to the SSE stream via tasks/sendSubscribe?

4 participants