Skip to content

Conversation

@artur-ciocanu
Copy link
Contributor

Description

Adding raw event subscription alongside CloudEvent subscription

Issue reference

We strive to have all PR being opened based on an issue, where the problem or feature have been discussed prior to implementation.

Please reference the issue this PR will close: #1616

Checklist

Please make sure you've completed the relevant tasks for this PR, out of the following list:

  • Code compiles correctly
  • Created/updated tests
  • Extended the documentation

Signed-off-by: Artur Ciocanu <artur.ciocanu@gmail.com>
@artur-ciocanu artur-ciocanu requested review from a team as code owners January 10, 2026 07:51
@artur-ciocanu
Copy link
Contributor Author

@dapr/approvers-java-sdk and @dapr/maintainers-java-sdk could you please take a look and approve. This has been requested by @alicejgibbons.

One thing I want to mention, I am not really fond of the new name subscribeToEventsData(...) I couldn't think of a better name.

An alternative would be to rename the old method to subscribeToCloudEvents(...) that uses CloudEvent and then have subscribeToEvents(...) return generic type.

@javier-aliaga
Copy link
Contributor

@dapr/approvers-java-sdk and @dapr/maintainers-java-sdk could you please take a look and approve. This has been requested by @alicejgibbons.

One thing I want to mention, I am not really fond of the new name subscribeToEventsData(...) I couldn't think of a better name.

An alternative would be to rename the old method to subscribeToCloudEvents(...) that uses CloudEvent and then have subscribeToEvents(...) return generic type.

Another possibility is naming the new one SubscribeToRawEvent or similar

@codecov
Copy link

codecov bot commented Jan 12, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 78.92%. Comparing base (d759c53) to head (b4601a8).
⚠️ Report is 249 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1617      +/-   ##
============================================
+ Coverage     76.91%   78.92%   +2.00%     
- Complexity     1592     2074     +482     
============================================
  Files           145      227      +82     
  Lines          4843     6258    +1415     
  Branches        562      684     +122     
============================================
+ Hits           3725     4939    +1214     
- Misses          821      970     +149     
- Partials        297      349      +52     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@Override
public <T> Flux<T> subscribeToEventsData(String pubsubName, String topic, TypeRef<T> type) {
return subscribeToEvents(pubsubName, topic, type)
.map(CloudEvent::getData);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artur-ciocanu does this line not force the type to deserialize into a CloudEvent?

@alicejgibbons
Copy link

Regardinging renaming, can we have multiple overloads to subscribeToEvents?

@artur-ciocanu
Copy link
Contributor Author

Regardinging renaming, can we have multiple overloads to subscribeToEvents?

@alicejgibbons unfortunately we can not have the same overloads that depends just on generic type.

@alicejgibbons as I was thinking more and more. Instead of coming up with different APIs, customers can leverage Flux via a simple map operator. It is trivial:

Flux<SomeEvent> stream = daprClient.subscribeToEvents(pubsubName, topic, type)
        .map(CloudEvent::getData);

It is an additional step, but it is not too bad.

Please let me know your thoughts.

CC: @javier-aliaga @cicoyle

// Publish messages
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("Raw message #%d for run %s", i, runId);
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still publishing a cloudEvent by default since you are not adding the raw payload header. Thus needs to be changed to something like the following:

PublishEventRequest publishEventRequest = new PublishEventRequest(pubsubName, topicName, eventType);
publishEventRequest.setContentType(APPLICATION_EVENT_TYPES_JSON);
headers.put("rawPayload", "true"); 
publishEventRequest.setMetadata(headers);
client.publishEvent(publishEventRequest).block();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Flux Streaming subscriptions with generic type

4 participants