Skip to content

Subscription handling cleanup#931

Merged
andsel merged 12 commits into
moquette-io:mainfrom
FraunhoferIOSB:subscription_rework
May 15, 2026
Merged

Subscription handling cleanup#931
andsel merged 12 commits into
moquette-io:mainfrom
FraunhoferIOSB:subscription_rework

Conversation

@hylkevds
Copy link
Copy Markdown
Collaborator

@hylkevds hylkevds commented Jan 31, 2026

While working on a subscription related feature I had to carefully review the way subscriptions are passed through Moquette, and did some massive cleanup and simplification.

Subscription information was being converted back and forth between several classes,
resulting in much unneeded object creation. But in the end it would always end up as Subscription.

  • This removes all the intermediate Objects, simplifying the code.
  • This includes the removal of the SharedSubscription class, since in the end, those are also turned into normal Subscriptions, making the extra class meaningless.
  • The logic contained many superfluous if/else checks for subscriptionIdentifiers caused by the wrapping and unwrapping of Subscription data.
  • All methods that would take several parameters with subscription information now simply accept a subscription.
  • Subscriptions are (already) registered on the Session, so when an unsubscribe is received, the subscription can be fetched from the Session. This allows the subscription-remove functions to be greatly simplified.
  • To simplify and speed up fetching the subscription from the Session, it is now stored in a Map instead of a Set.

These changes will greatly improve the maintainability of the code, and probably increase efficiency.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works
  • I have updated the Changelog if it's a feature or a fix that has to be reported

@hylkevds
Copy link
Copy Markdown
Collaborator Author

Note that this PR is on top of PR #919, since I discovered that problem at the start of this work.

@hylkevds hylkevds requested a review from andsel January 31, 2026 14:01
@andsel
Copy link
Copy Markdown
Collaborator

andsel commented Feb 6, 2026

Hi @hylkevds I'll check it, sorry but I'm pretty loaded and this PR is not so trivial.

@hylkevds
Copy link
Copy Markdown
Collaborator Author

hylkevds commented Feb 7, 2026

No rush, I know how life is. I had wanted to get this done half a year ago, but I also just didn't find the time for it.
You should also prioritize 919, not this one.
This PR is far from trivial, and it doesn't actually fix or add any functionality. It just makes the logic a lot easier to follow, and probably improves performance though I've not measured that.

@hylkevds hylkevds force-pushed the subscription_rework branch from 657050e to da9b7da Compare March 9, 2026 09:27
hylkevds and others added 2 commits March 9, 2026 16:26
Subscription information was being converted back and forth between several classes,
resulting in much unneeded object creation. But in the end it would always end up as Subscription.
- This removes all the intermediate Objects, simplifying the code.
- This includes the removal of the SharedSubscription class, since in the end, those are also turned into normal
  Subscriptions, making the extra class meaningless.
- The logic contained many superfluous if/else checks for subscriptionIdentifiers caused by the wrapping
  and unwrapping of Subscription data.
- All methods that would take several parameters with subscription information now simply accept a subscription.
Subscriptions are registered on the Session, so when an unsubscribe is
received, the subscription can be fetched from the Session. This allows
the subscription-remove functions to be greatly simplified.
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors Moquette’s subscription handling by removing intermediate subscription wrapper types (including SharedSubscription) and standardizing APIs to pass Subscription objects end-to-end, with corresponding updates across persistence, session tracking, and subscription matching.

Changes:

  • Removed SharedSubscription and migrated shared-subscription storage/matching to use Subscription directly.
  • Simplified subscribe/unsubscribe flows by storing subscriptions on Session in a map keyed by topic string and reusing the stored Subscription for removals.
  • Updated repositories (H2, in-memory), trie structures, and tests to the new Subscription-centric APIs.

Reviewed changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
broker/src/main/java/io/moquette/broker/PostOffice.java Builds shared/non-shared Subscription objects directly; unsubscribe now looks up stored subscriptions from Session.
broker/src/main/java/io/moquette/broker/Session.java Stores subscriptions in a Map keyed by topic string; adds lookup helpers for unsubscribe.
broker/src/main/java/io/moquette/broker/SessionRegistry.java Adjusts subscription removal calls to new signatures (but still needs shared vs non-shared handling fixes).
broker/src/main/java/io/moquette/broker/ISubscriptionsRepository.java Repository API now removes/adds using Subscription objects; shared listing now returns Subscription.
broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java Persists shared subscriptions as Subscription; updates MVStore (de)serialization logic accordingly.
broker/src/main/java/io/moquette/persistence/MemorySubscriptionsRepository.java In-memory shared-subscription storage now uses Subscription; removal now takes Subscription.
broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java Directory API now accepts Subscription; shared ops now use Subscription directly.
broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java Removes SubscriptionRequest/UnsubscribeRequest wrappers; trie now mutates using Subscription.
broker/src/main/java/io/moquette/broker/subscriptions/CNode.java Stores shared subscriptions as Subscription and returns them directly during match selection.
broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java Share name is now ShareName; adds helper to reconstruct $share/... topic string; renames option accessor.
broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java Adds EMPTY_SHARENAME, isEmpty(), and Comparable.
broker/src/main/java/io/moquette/broker/subscriptions/TNode.java Updates signatures to accept Subscription.
broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java Directory interface now accepts/removes Subscription objects directly.
broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java Updates debug dump to use new option accessor.
broker/src/main/java/io/moquette/interception/messages/InterceptSubscribeMessage.java Uses Subscription.getOption() accessor.
broker/src/main/java/io/moquette/broker/subscriptions/SharedSubscription.java Deleted (type removed).
broker/src/test/java/io/moquette/persistence/H2SubscriptionsRepositorySharedSubscriptionsTest.java Updates tests to use Subscription for shared subscriptions.
broker/src/test/java/io/moquette/broker/subscriptions/SubscriptionTestUtils.java Updates helpers to construct shared subscriptions with ShareName.
broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java Updates tests to call trie methods with Subscription directly.
broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java Updates tests to new directory signatures (add/remove take Subscription).
broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java Updates shared-subscription matching tests to use Subscription.
broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java Updates perf test to use Subscription directly.
ChangeLog.txt Adds changelog entry for the cleanup.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

public List<Subscription> getSubscriptions() {
return new ArrayList<>(subscriptions);
public Collection<Subscription> getSubscriptions() {
return subscriptions.values();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds I think it's right

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. The performance cost of making the copy are mostly irrelevant, since the method is only used when re-initialising a server after a restart, or when cleaning up a session.
I'll also change the method header back to returning a List.

Comment on lines +402 to 404
Optional<Subscription> invalidSharedSubscription = sharedSubscriptions.stream()
.filter(subData -> !SharedSubscriptionUtils.validateShareName(subData.getShareName().toString()))
.findFirst();
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the actual correct fix would be to make sure an empty share name can never happen. En empty share name is bound to cause other issues too.
We should track this as a separate issue, since it already exists in the current version and is not specific to the PR.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second look, ShareName#toString is indeed the wrong method. Should be ShareName#getShareName. That is a bit confusing indeed.
Fixed now.

Comment on lines +56 to 60
for (Subscription shared : subscriptionsRepository.listAllSharedSubscription()) {
LOG.debug("Re-subscribing shared {}", shared);
ctrie.addToTree(SubscriptionRequest.buildShared(shared.getShareName(), shared.topicFilter(),
shared.clientId(), MqttSubscriptionOption.onlyFromQos(shared.requestedQoS())));
// TODO: This must call all registered InterceptHandler.onSubscribe
ctrie.addToTree(shared);
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds I think this observation is right. I think that we can address in a follow-up PR, because requires also unit test to verify the error, and this PR is already big and maybe this change if out of context with the PR.

Comment thread broker/src/main/java/io/moquette/broker/SessionRegistry.java
Comment on lines 366 to 369
private void unsubscribe(Session session) {
for (Subscription existingSub : session.getSubscriptions()) {
subscriptionsDirectory.removeSubscription(existingSub.getTopicFilter(), session.getClientID());
subscriptionsDirectory.removeSubscription(existingSub);
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds Also here I think it's right. I recall that I created pretty big unit tests, but maybe this is more an integration one that's missed. WDYT ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unsubscribe(Session) method is only used when purging a session. So we'd need an integration tests that

  • Creates a shared subscription with a persistent session.
  • Reconnects with the same clientId, but requesting a clean session.

That test currently doesn't exist... And I'm pretty sure the problem already existed before as a result. I'll add the hasShareName check.

Copy link
Copy Markdown
Collaborator

@andsel andsel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @hylkevds thank's a TON for your monumental work here, the spirit of the original code was to make think smoother but at the end created a little bit of mess that you correctly fixed.

Now that we don't have anymore a specialized SharedSubscription but only the Subscription which can be shared or not, I think that in every method that is something like "sharedSubscription(Subscription)" we have to put a defensive check that effectively the provided Subscription parameter is of shared kind. This to avoid of potential leak of non shared Subscription in the shared Subscription paths. WDYT?

I left suggestion comments and also spotted that Copilot spotted maybe real problems, please could you check those?

I've create #934 as follow-up work on this, to avoid making the context of this PR too big. Feel free to add even more tasks to that issue, maybe all the other Copilot suggestions if you think that fixing in this PR is out-of-context or make it too big.

Great work 👏

for (Subscription subscription : this.subscriptionsRepository.listAllSubscriptions()) {
LOG.debug("Re-subscribing {}", subscription);
ctrie.addToTree(SubscriptionRequest.buildNonShared(subscription));
// TODO: This must call all registered InterceptHandler.onSubscribe
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds why should call the onSubscribe? onSubscribe callback is intended to be invoked when a client effectively send a SUBSCRIBE to a series of topic filter. This init method, just reconstructs the directory from the data provided by the subscription's repository (the storage) so aren't real SUBSCRIBE commands.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because an embedded server should also be brought back up to the state of all subscriptions. For instance, FROST-Server only sends messages on topics that actually have subscriptions. But to know a subscription exists, it registers an InterceptHandler so it knows about subscribe and unsubscribe events.

You could argue that the embedding server should save and restore its own state, but that just leads to the double storing of data, with the significant chance of these two states diverging due to race-conditions during shutdown. It's much more reliable to have Moquette holding the master storage, and the embedding server following that.

An alternative would be to have a separate method on the InterceptHandler for subscriptions restored from storage.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe Moquette could expose an API to query the subscription storage, so that FROST-Server queries Moquette's subscription storage. In that case I should ask your help to shape the possible queries, the most obvious is "list all subscriptions", but maybe you have other use cases. Then let the asynch listener mechanism to receive the updates when happens, instead of polling this API to aligns continuously its internal state.
WDYT?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, some dedicated methods to inspect the current topics would be even better. It would also allow for occasionally checking if the state of the embedding server still matches that of the broker.
I'll add it to our subscription TODO issue (#934) and create a wishlist for discussion.

Comment thread ChangeLog.txt Outdated
@@ -1,4 +1,5 @@
Version 0.19-SNAPSHOT
[Cleanup] Cleaned up subscription handling.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[Cleanup] Cleaned up subscription handling.
[cleanup] Cleaned up subscription handling. (#931)

*
* @return the original topic string.
*/
public String getOriginalTopicWithSharename() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public String getOriginalTopicWithSharename() {
public String getOriginalTopicFilterWithSharename() {


@Override
public int compareTo(ShareName o) {
return shareName.compareTo(o.shareName);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If shareName is null this is a NPE.
If we know that it can't ever be null we need to use a null check assertion in the constructor, or alternatively if null is same as EMPTY_SHARENAME replace null shareName with EMPTY_SHARENAME in the constructor and remove null checks around.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just noticed validateShareName already does the null and empty check. But I'll add a non-null check regardless, to ensure we can't accidentally add a null sharename in the future.

}

private void addSharedSubscriptionRequest(SubscriptionRequest shareSubRequest) {
private void addSharedSubscriptionRequest(Subscription shareSubRequest) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addSharedSubscriptionRequest -> addShared

Comment on lines +56 to 60
for (Subscription shared : subscriptionsRepository.listAllSharedSubscription()) {
LOG.debug("Re-subscribing shared {}", shared);
ctrie.addToTree(SubscriptionRequest.buildShared(shared.getShareName(), shared.topicFilter(),
shared.clientId(), MqttSubscriptionOption.onlyFromQos(shared.requestedQoS())));
// TODO: This must call all registered InterceptHandler.onSubscribe
ctrie.addToTree(shared);
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds I think this observation is right. I think that we can address in a follow-up PR, because requires also unit test to verify the error, and this PR is already big and maybe this change if out of context with the PR.

Optional<SharedSubscriptionData> invalidSharedSubscription = sharedSubscriptions.stream()
.filter(subData -> !SharedSubscriptionUtils.validateShareName(subData.name.toString()))
Optional<Subscription> invalidSharedSubscription = sharedSubscriptions.stream()
.filter(subData -> !SharedSubscriptionUtils.validateShareName(subData.getShareName().toString()))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subData now can became simply sub

public List<Subscription> getSubscriptions() {
return new ArrayList<>(subscriptions);
public Collection<Subscription> getSubscriptions() {
return subscriptions.values();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds I think it's right

Comment thread broker/src/main/java/io/moquette/broker/SessionRegistry.java
Comment on lines 366 to 369
private void unsubscribe(Session session) {
for (Subscription existingSub : session.getSubscriptions()) {
subscriptionsDirectory.removeSubscription(existingSub.getTopicFilter(), session.getClientID());
subscriptionsDirectory.removeSubscription(existingSub);
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds Also here I think it's right. I recall that I created pretty big unit tests, but maybe this is more an integration one that's missed. WDYT ?

@andsel
Copy link
Copy Markdown
Collaborator

andsel commented Apr 7, 2026

@hylkevds let me know when this PR is ready for another review :-)

@hylkevds
Copy link
Copy Markdown
Collaborator Author

I think I've addressed all comments, except for removing the TODO comment on lime 57 of CTrieSubscriptionDirectory.java

@hylkevds
Copy link
Copy Markdown
Collaborator Author

The TODO comments are now also gone

@andsel andsel self-requested a review May 13, 2026 11:37
Copy link
Copy Markdown
Collaborator

@andsel andsel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hylkevds maybe a nitpick, but please consider to add helper checker methods on subscription type, like assertion methods.

boolean notExistingSubscription = ctrie.addToTree(subRequest);
subscriptionsRepository.addNewSubscription(subRequest.subscription());
public boolean add(Subscription sub) {
if (sub.hasShareName()) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind to add checker methods like in andsel#2 ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, personally I dislike hiding the throwing of exceptions away like that, especially if the method name doesn't make it very obvious that its only goal is to throw an exception sometimes... It doesn't make the code more readable nor easier to follow.

Of course it's your project, so if you insist I'll change it :)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dinner and some Yoga later...
I see the point that those if ... then ... throw new x are very ugly, but a method should either encapsulate significant complexity, or be re-useable. How about this further development of the pattern:

public class Exceptions {
    public static final void illegalArgumentIf(boolean predicate, String message) {
        if (predicate) {
            throw new IllegalArgumentException(message);
        }
    }
}

That is re-usable, and calling

Exceptions.illegalArgumentIf(sub.hasShareName(), "Adding a shared subscription using the non-shared method.");

makes it very clear what the intention is.
It also open the option of making a version that accepts a lambda instead of a boolean, for more complex checks.

We can then also do the same for other exceptions. What do you think?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My initial consideration came from https://docs.oracle.com/javase/8/docs/api/java/util/Objects.html#requireNonNull-T- where in some methods, to assure a parameter is not null we use:

public void aMethod(Object some) {
  Object.requireNonNull("some parameter can't be null");
  ...
}

Your idea is not bad, is a good brick to build other "check-style" method. I think that if the method is more constrained provides implicitly more context.

What do you say of renaming checkIsShared with requireShared(subscription, message), it's very domain specific and carries all the meaning it needs.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, requireShared is better.
On that note, shouldn't it just be a method on the subscription itself? Then we don't need to pass the subscription to the method: subscription.requireShared(message).

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a nice suggestion, however the invariant is part of the client code, in this code CTrieSubscriptionDirectory and it's not proper business for the subscription itself. Just to limit the public API surface of the Subscription and limiting the scope.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to update your PR to this PR, or shall I add the changes as a separate commit to this PR, or do we do this completely separate?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can add the changes as a separate commit to this PR 🙏

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@andsel andsel self-requested a review May 15, 2026 07:41
Copy link
Copy Markdown
Collaborator

@andsel andsel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Thank you so so so much @hylkevds for your work on this! 👏

@andsel andsel merged commit e7ac96e into moquette-io:main May 15, 2026
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants