diff --git a/ChangeLog.txt b/ChangeLog.txt index 3bfab198b..fb27e1386 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,5 @@ Version 0.19-SNAPSHOT + [feature] Added topic rewriting, allowing the internally used topic to be different from the topic the client used. [Cleanup] Cleaned up subscription handling. (#931) [feature] Added metrics framework and Prometheus implementation. - Enable by setting `metrics_provider_class` to `MetricsProviderPrometheus`. diff --git a/broker/src/main/java/io/moquette/broker/Authorizator.java b/broker/src/main/java/io/moquette/broker/Authorizator.java index 6f237b1ea..b8925397e 100644 --- a/broker/src/main/java/io/moquette/broker/Authorizator.java +++ b/broker/src/main/java/io/moquette/broker/Authorizator.java @@ -79,11 +79,11 @@ private List verifyTopicsReadAccessWithTopicExtractor(Str final int messageId = messageId(msg); for (MqttTopicSubscription req : msg.payload().topicSubscriptions()) { - Topic topic = topicExtractor.apply(req.topicName()); + Topic topic = topicExtractor.apply(req.topicFilter()); final MqttQoS qos = getQoSCheckingAlsoPermissionsOnTopic(clientID, username, messageId, topic, req.qualityOfService()); MqttSubscriptionOption option = PostOffice.optionWithQos(qos, req.option()); - ackTopics.add(new MqttTopicSubscription(req.topicName(), option)); + ackTopics.add(new MqttTopicSubscription(req.topicFilter(), option)); } return ackTopics; } diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index 9cae0f35a..4e8f2b9b6 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -57,6 +57,8 @@ import java.util.stream.Collectors; import static io.moquette.broker.Utils.messageId; +import io.moquette.interception.TopicRewriter; +import io.moquette.interception.TopicRewriterUnity; import io.moquette.metrics.MetricsManager; import io.moquette.metrics.MetricsProvider; import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from; @@ -206,6 +208,7 @@ public RouteResult ifFailed(Runnable action) { private final ISessionsRepository sessionRepository; private SessionRegistry sessionRegistry; private BrokerInterceptor interceptor; + private TopicRewriter topicRewriter = new TopicRewriterUnity(); private final FailedPublishCollection failedPublishes = new FailedPublishCollection(); private final SessionEventLoopGroup sessionLoops; private final Clock clock; @@ -271,6 +274,10 @@ public Optional expireAt() { recreateRetainedExpires(retainedRepository); } + public void setTopicRewriter(TopicRewriter topicRewriter) { + this.topicRewriter = topicRewriter; + } + private void cleanRetainedExpired(ExpirableTopic expirable) { retainedRepository.cleanRetained(expirable.topic); } @@ -420,7 +427,9 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S .map(sub -> { final Topic topic = new Topic(sub.topicFilter()); MqttSubscriptionOption option = sub.option();//MqttSubscriptionOption.onlyFromQos(sub.qualityOfService()); - return new Subscription(clientID, topic, option, subscriptionIdOpt); + final Subscription subscription = new Subscription(clientID, topic, option, subscriptionIdOpt); + subscription.setTopicFilterInternal(topicRewriter.rewriteTopic(subscription)); + return subscription; }).collect(Collectors.toList()); final Set subscriptionToSendRetained = newSubscriptions.stream() @@ -444,12 +453,14 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S } } - private static Subscription buildSharedSubscriptionFrom(MqttTopicSubscription s, String clientID, Optional subscriptionIdOpt) { - return new Subscription( + private Subscription buildSharedSubscriptionFrom(MqttTopicSubscription s, String clientID, Optional subscriptionIdOpt) { + final Subscription subscription = new Subscription( clientID, Topic.asTopic(SharedSubscriptionUtils.extractFilterFromShared(s.topicFilter())), s.option(), new ShareName(SharedSubscriptionUtils.extractShareName(s.topicFilter())), subscriptionIdOpt); + subscription.setTopicFilterInternal(topicRewriter.rewriteTopic(subscription)); + return subscription; } private static boolean needToReceiveRetained(Utils.Couple addedAndSub) { @@ -519,7 +530,7 @@ private static Optional verifyAndExtractMessageIdentifie private void publishRetainedMessagesForSubscriptions(String clientID, Collection newSubscriptions) { Session targetSession = this.sessionRegistry.retrieve(clientID); for (Subscription subscription : newSubscriptions) { - final String topicFilter = subscription.getTopicFilter().toString(); + final String topicFilter = subscription.getTopicFilterInternal().toString(); final Collection retainedMsgs = retainedRepository.retainedOnTopic(topicFilter); if (retainedMsgs.isEmpty()) { @@ -929,7 +940,16 @@ private void publishToSession(ByteBuf payload, Topic topic, Subscription sub, Mq boolean isSessionPresent = targetSession != null; if (isSessionPresent) { LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}", - sub.getClientId(), sub.getTopicFilter(), qos); + sub.getClientId(), sub.getTopicFilterInternal(), qos); + + if (sub.getTopicFilterClient().hasWildcard() && sub.isTopicRewritten()) { + // Topic contains a wildcard AND is rewritten. The interceptor that did the rewriting + // must tell us what topic the client expects. + topic = topicRewriter.rewriteTopicInverse(sub.getTopicFilterClient(), topic); + } else { + // Non-Wildcard or non-rewritten topic, we can use the client version. + topic = sub.getTopicFilterClient(); + } metricsProvider.addMessage(SessionEventLoop.getThreadQueueId(), qos.value()); Collection existingProperties = msg.variableHeader().properties().listAll(); @@ -941,7 +961,7 @@ private void publishToSession(ByteBuf payload, Topic topic, Subscription sub, Mq // If we are, the subscriber disconnected after the subscriptions tree selected that session as a // destination. LOG.debug("PUBLISH to not yet present session. CId: {}, topicFilter: {}, qos: {}", sub.getClientId(), - sub.getTopicFilter(), qos); + sub.getTopicFilterInternal(), qos); } } diff --git a/broker/src/main/java/io/moquette/broker/Server.java b/broker/src/main/java/io/moquette/broker/Server.java index 86d9a6179..858083932 100644 --- a/broker/src/main/java/io/moquette/broker/Server.java +++ b/broker/src/main/java/io/moquette/broker/Server.java @@ -68,6 +68,7 @@ import java.util.concurrent.ScheduledExecutorService; import static io.moquette.broker.Session.INFINITE_EXPIRY; +import io.moquette.interception.TopicRewriter; import static io.moquette.metrics.MetricsUtils.getInterceptorIds; import io.moquette.metrics.MetricsManager; import io.moquette.metrics.MetricsProvider; @@ -81,6 +82,7 @@ public class Server { private NewNettyAcceptor acceptor; private volatile boolean initialized; private PostOffice dispatcher; + private TopicRewriter topicRewriter; private BrokerInterceptor interceptor; private H2Builder h2Builder; private SessionRegistry sessions; @@ -266,6 +268,9 @@ public void startServer(IConfig config, List handler final MqttQoS serverGrantedQoS = parseMaxGrantedQoS(config); dispatcher = new PostOffice(subscriptions, retainedRepository, sessions, sessionsRepository, interceptor, authorizator, loopsGroup, clock, serverGrantedQoS, metricsProvider); + if (topicRewriter != null) { + dispatcher.setTopicRewriter(topicRewriter); + } final BrokerConfiguration brokerConfig = new BrokerConfiguration(config); MQTTConnectionFactory connectionFactory = new MQTTConnectionFactory(brokerConfig, authenticator, sessions, dispatcher); @@ -284,6 +289,13 @@ public void startServer(IConfig config, List handler initialized = true; } + public void setTopicRewriter(TopicRewriter topicRewriter) { + this.topicRewriter = topicRewriter; + if (dispatcher != null) { + dispatcher.setTopicRewriter(topicRewriter); + } + } + private static MqttQoS parseMaxGrantedQoS(IConfig config) { final String qosValue = config.getProperty(IConfig.MAX_SERVER_GRANTED_QOS_PROPERTY_NAME, "2"); try { diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index 076b604a5..253f5561c 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -316,8 +316,10 @@ private SessionCreationResult reopenExistingSession(MqttConnectMessage msg, Stri // publish new session final Session newSession = createNewSession(msg, clientId); Session previous = pool.put(clientId, newSession); + metricsProvider.addOpenSession(); if (previous != null) { LOG.error("We're re-opening a session for clientId {} and we purged the old session, but there is still a session in the pool! this is a bug!", clientId); + metricsProvider.removeOpenSession(); } LOG.trace("case 2, oldSession with same CId {} disconnected", clientId); @@ -353,7 +355,7 @@ private SessionCreationResult reopenExistingSession(MqttConnectMessage msg, Stri private void reactivateSubscriptions(Session session, String username) { //verify if subscription still satisfy read ACL permissions for (Subscription existingSub : session.getSubscriptions()) { - final boolean topicReadable = authorizator.canRead(existingSub.getTopicFilter(), username, + final boolean topicReadable = authorizator.canRead(existingSub.getTopicFilterInternal(), username, session.getClientID()); if (!topicReadable) { if (existingSub.hasShareName()) { @@ -361,7 +363,7 @@ private void reactivateSubscriptions(Session session, String username) { } else { subscriptionsDirectory.removeSubscription(existingSub); } - session.removeSubscription(existingSub.getTopicFilter()); + session.removeSubscription(existingSub.getTopicFilterClient()); } // TODO // subscriptionsDirectory.reactivate(existingSub.getTopicFilter(), session.getClientID()); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java index 6278c9b0a..f5c1a6830 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java @@ -125,7 +125,7 @@ private List recursiveMatch(Topic topicName, INode inode, int dept public boolean addToTree(Subscription sub) { Action res; do { - res = insert(sub.getTopicFilter(), this.root, sub); + res = insert(sub.getTopicFilterInternal(), this.root, sub); } while (res == Action.REPEAT); return res == Action.OK_NEW; } @@ -193,7 +193,7 @@ private INode createLeafNodes(Token token, Subscription sub) { public void removeFromTree(Subscription sub) { Action res; do { - res = remove(sub.getClientId(), sub.getTopicFilter(), this.root, NO_PARENT, sub); + res = remove(sub.getClientId(), sub.getTopicFilterInternal(), this.root, NO_PARENT, sub); } while (res == Action.REPEAT); } diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java b/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java index b1eab43a6..767507596 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java @@ -38,7 +38,7 @@ private String prettySubscriptions(CNode node) { int counter = 0; for (Subscription couple : node.subscriptions()) { subScriptionsStr - .append("{filter=").append(couple.topicFilter).append(", ") + .append("{filter=").append(couple.getTopicFilterInternal()).append(", ") .append("option=").append(couple.getOption()).append(", ") .append("client='").append(couple.clientId).append("'}"); counter++; diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java b/broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java index bc300a931..b614ce055 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java @@ -30,7 +30,8 @@ public final class Subscription implements Serializable, Comparable subscriptionId; @@ -60,7 +61,8 @@ public Subscription(String clientId, Topic topicFilter, MqttSubscriptionOption o public Subscription(String clientId, Topic topicFilter, MqttSubscriptionOption options, ShareName shareName, Optional subscriptionId) { this.clientId = clientId; - this.topicFilter = topicFilter; + this.topicFilterClient = topicFilter; + this.topicFilterInternal = topicFilter; this.shareName = shareName; this.subscriptionId = subscriptionId; this.option = options; @@ -70,8 +72,16 @@ public String getClientId() { return clientId; } - public Topic getTopicFilter() { - return topicFilter; + public Topic getTopicFilterInternal() { + return topicFilterInternal; + } + + public void setTopicFilterInternal(Topic topicFilterInternal) { + this.topicFilterInternal = topicFilterInternal; + } + + public Topic getTopicFilterClient() { + return topicFilterClient; } /** @@ -82,9 +92,18 @@ public Topic getTopicFilter() { */ public String getOriginalTopicFilterWithSharename() { if (shareName.isEmpty()) { - return topicFilter.toString(); + return topicFilterClient.toString(); } - return "$share/" + shareName.getShareName() + "/" + topicFilter.toString(); + return "$share/" + shareName.getShareName() + "/" + topicFilterClient.toString(); + } + + /** + * Check if the client topic was rewritten to a different internal topic. + * + * @return true if the client topic is different from the internal topic. + */ + public boolean isTopicRewritten() { + return topicFilterInternal.equals(topicFilterClient); } public boolean qosLessThan(Subscription sub) { @@ -106,17 +125,17 @@ public boolean equals(Object o) { Subscription that = (Subscription) o; return Objects.equals(clientId, that.clientId) && Objects.equals(shareName, that.shareName) && - Objects.equals(topicFilter, that.topicFilter); + Objects.equals(topicFilterInternal, that.topicFilterInternal); } @Override public int hashCode() { - return Objects.hash(clientId, shareName, topicFilter); + return Objects.hash(clientId, shareName, topicFilterInternal); } @Override public String toString() { - return String.format("[filter:%s, clientID: %s, options: %s - shareName: %s]", topicFilter, clientId, option, shareName); + return String.format("[filter:%s, clientID: %s, options: %s - shareName: %s]", topicFilterInternal, clientId, option, shareName); } @Override @@ -139,7 +158,7 @@ public int compareTo(Subscription o) { if (compare != 0) { return compare; } - return this.topicFilter.compareTo(o.topicFilter); + return this.topicFilterInternal.compareTo(o.topicFilterInternal); } public String clientAndShareName() { diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java b/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java index 96940e116..e851cd52a 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/Topic.java @@ -39,6 +39,8 @@ public class Topic implements Serializable, Comparable { private transient boolean valid; + private transient boolean hasWildcard; + /** * Factory method * @@ -110,10 +112,12 @@ private List parseTopic(String topic) throws ParseException { i); } res.add(Token.MULTI); + hasWildcard = true; } else if (s.contains("#")) { throw new ParseException("Bad format of topic, invalid subtopic name: " + s, i); } else if (s.equals("+")) { res.add(Token.SINGLE); + hasWildcard = true; } else if (s.contains("+")) { throw new ParseException("Bad format of topic, invalid subtopic name: " + s, i); } else { @@ -158,6 +162,13 @@ public boolean isValid() { return valid; } + public boolean hasWildcard() { + if (tokens == null) + getTokens(); + return hasWildcard; + } + + /** * Verify if the 2 topics matching respecting the rules of MQTT Appendix A * @@ -199,6 +210,9 @@ public String toString() { @Override public boolean equals(Object obj) { + if (this == obj) { + return true; + } if (obj == null) { return false; } diff --git a/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java b/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java index b01ab13ce..ad71a1d66 100644 --- a/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java +++ b/broker/src/main/java/io/moquette/interception/BrokerInterceptor.java @@ -23,7 +23,6 @@ import io.moquette.broker.subscriptions.Subscription; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; -import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; @@ -145,7 +144,7 @@ public void notifyTopicPublished(final MqttPublishMessage msg, final String clie public void notifyTopicSubscribed(final Subscription sub, final String username) { for (final InterceptHandler handler : this.handlers.get(InterceptSubscribeMessage.class)) { LOG.debug("Notifying MQTT SUBSCRIBE message to interceptor. CId={}, topicFilter={}, interceptorId={}", - sub.getClientId(), sub.getTopicFilter(), handler.getID()); + sub.getClientId(), sub.getTopicFilterInternal(), handler.getID()); executor.execute(() -> handler.onSubscribe(new InterceptSubscribeMessage(sub, username))); } } diff --git a/broker/src/main/java/io/moquette/interception/TopicRewriter.java b/broker/src/main/java/io/moquette/interception/TopicRewriter.java new file mode 100644 index 000000000..825460517 --- /dev/null +++ b/broker/src/main/java/io/moquette/interception/TopicRewriter.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2012-2025 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.moquette.interception; + +import io.moquette.broker.subscriptions.Subscription; +import io.moquette.broker.subscriptions.Topic; + +/** + * A topic re-writer can change the topics of subscriptions and publishes before + * they are handled internally. + */ +public interface TopicRewriter { + + /** + * Rewrite the topic for the given subscription. + * + * @param subscription The subscription to rewrite the Topic for. + * @return the rewritten topic. + */ + public Topic rewriteTopic(Subscription subscription); + + /** + * Reverse the rewrite for the given publish Topic, so that it matches the + * clientTopic. This is needed when a subscription that this topic matched + * (the clientTopic) has a wild-card and is rewritten. + * + * @param clientTopic The topic that the client originally subscribed to. + * This will have a wild-card in it. + * @param publishedTopic The topic that received a publish. + * @return The topic that the client would expect the publish to be on. + */ + public Topic rewriteTopicInverse(Topic clientTopic, Topic publishedTopic); +} diff --git a/broker/src/main/java/io/moquette/interception/TopicRewriterUnity.java b/broker/src/main/java/io/moquette/interception/TopicRewriterUnity.java new file mode 100644 index 000000000..ee47b9d01 --- /dev/null +++ b/broker/src/main/java/io/moquette/interception/TopicRewriterUnity.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2012-2025 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.moquette.interception; + +import io.moquette.broker.subscriptions.Subscription; +import io.moquette.broker.subscriptions.Topic; + +/** + * A TopicRewriter that does not rewrite. + */ +public class TopicRewriterUnity implements TopicRewriter { + + @Override + public Topic rewriteTopic(Subscription subscription) { + return subscription.getTopicFilterClient(); + } + + @Override + public Topic rewriteTopicInverse(Topic clientTopic, Topic publishTopic) { + return publishTopic; + } + +} diff --git a/broker/src/main/java/io/moquette/interception/messages/InterceptSubscribeMessage.java b/broker/src/main/java/io/moquette/interception/messages/InterceptSubscribeMessage.java index 87cf4b289..f189c5816 100644 --- a/broker/src/main/java/io/moquette/interception/messages/InterceptSubscribeMessage.java +++ b/broker/src/main/java/io/moquette/interception/messages/InterceptSubscribeMessage.java @@ -42,8 +42,12 @@ public MqttSubscriptionOption getOption() { return subscription.getOption(); } - public String getTopicFilter() { - return subscription.getTopicFilter().toString(); + public String getTopicFilterClient() { + return subscription.getTopicFilterClient().toString(); + } + + public String getTopicFilterInternal() { + return subscription.getTopicFilterInternal().toString(); } public String getUsername() { diff --git a/broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java b/broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java index beafddab6..ed8f69218 100644 --- a/broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java +++ b/broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java @@ -68,13 +68,13 @@ public Set listAllSubscriptions() { @Override public void addNewSubscription(Subscription subscription) { - subscriptions.put(subscription.getTopicFilter() + "-" + subscription.getClientId(), subscription); + subscriptions.put(subscription.getTopicFilterInternal() + "-" + subscription.getClientId(), subscription); } @Override public void removeSubscription(Subscription subscription) { final String clientID = subscription.getClientId(); - final Topic topicFilter = subscription.getTopicFilter(); + final Topic topicFilter = subscription.getTopicFilterInternal(); subscriptions.remove(topicFilter + "-" + clientID); } @@ -97,7 +97,7 @@ private void wipeAllSharedSubscripptions(String clientId, String sharedSubsMapNa public void removeSharedSubscription(Subscription sub) { final String clientId = sub.getClientId(); final ShareName share = sub.getShareName(); - final Topic topicFilter = sub.getTopicFilter(); + final Topic topicFilter = sub.getTopicFilterInternal(); final String sharedSubsMapName = sharedSubscriptions.get(clientId); if (sharedSubsMapName == null) { LOG.info("Removing a non existing shared subscription for client: {}", clientId); @@ -141,7 +141,7 @@ public void addNewSharedSubscription(Subscription sub) { } else { qosPart = new SubscriptionOptionAndId(sub.getOption()); } - storeNewSharedSubscription(sub.getClientId(), sub.getShareName(), sub.getTopicFilter(), qosPart); + storeNewSharedSubscription(sub.getClientId(), sub.getShareName(), sub.getTopicFilterInternal(), qosPart); } private void storeNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, SubscriptionOptionAndId value) { @@ -302,7 +302,7 @@ private static final class SubscriptionValueType extends BasicDataType, Subscription> subsMap = sharedSubscriptions.get(clientId); if (subsMap == null) { LOG.info("Removing a non existing shared subscription for client: {}", clientId); @@ -82,7 +82,7 @@ public void addNewSharedSubscription(Subscription sub) { private void storeNewSharedSubscription(Subscription sharedSub) { Map, Subscription> subsMap = sharedSubscriptions.computeIfAbsent(sharedSub.getClientId(), unused -> new HashMap<>()); - subsMap.put(Utils.Couple.of(sharedSub.getShareName(), sharedSub.getTopicFilter()), sharedSub); + subsMap.put(Utils.Couple.of(sharedSub.getShareName(), sharedSub.getTopicFilterInternal()), sharedSub); } @Override diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java index 61b9b43b1..ab9290619 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java @@ -191,11 +191,11 @@ private void assertNotMatch(String topicFilter, String topicName) { public void testOverlappingSubscriptions() { Subscription genericSub = new Subscription("Sensor1", asTopic("a/+"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); this.sessionsRepository.addNewSubscription(genericSub); - sut.add(new Subscription(genericSub.clientId, genericSub.topicFilter, genericSub.getOption())); + sut.add(new Subscription(genericSub.clientId, genericSub.topicFilterClient, genericSub.getOption())); Subscription specificSub = new Subscription("Sensor1", asTopic("a/b"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); this.sessionsRepository.addNewSubscription(specificSub); - sut.add(new Subscription(specificSub.clientId, specificSub.topicFilter, specificSub.getOption())); + sut.add(new Subscription(specificSub.clientId, specificSub.topicFilterClient, specificSub.getOption())); //Exercise final List matchingForSpecific = sut.matchQosSharpening(asTopic("a/b")); diff --git a/broker/src/test/java/io/moquette/integration/ServerIntegrationTopicRewriteTest.java b/broker/src/test/java/io/moquette/integration/ServerIntegrationTopicRewriteTest.java new file mode 100644 index 000000000..d7ef4bc40 --- /dev/null +++ b/broker/src/test/java/io/moquette/integration/ServerIntegrationTopicRewriteTest.java @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2012-2025 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.moquette.integration; + +import io.moquette.broker.Server; +import io.moquette.broker.config.IConfig; +import io.moquette.broker.config.MemoryConfig; +import io.moquette.broker.subscriptions.Subscription; +import io.moquette.broker.subscriptions.Topic; +import io.moquette.interception.TopicRewriter; +import org.awaitility.Awaitility; +import org.awaitility.Durations; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Properties; + +import static java.nio.charset.StandardCharsets.UTF_8; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.awaitility.core.ConditionTimeoutException; +import org.junit.jupiter.api.Assertions; +import static org.junit.jupiter.api.Assertions.*; + +public class ServerIntegrationTopicRewriteTest { + + private static final Logger LOG = LoggerFactory.getLogger(ServerIntegrationPahoTest.class); + + private Server server; + private IMqttClient client; + private IMqttClient publisher; + private MessageCollector messagesCollector; + private IConfig config; + + @TempDir + Path tempFolder; + private String dbPath; + + protected void startServer(String dbPath) throws IOException { + server = new Server(); + final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath); + config = new MemoryConfig(configProps); + server.setTopicRewriter(new TopicRewriter() { + private final Pattern patternForward = Pattern.compile("^sensor/([^/]+)"); + private final Pattern patternInverse = Pattern.compile("^sensors/([^/]+)"); + + @Override + public Topic rewriteTopic(Subscription subscription) { + Topic clientTopic = subscription.getTopicFilterClient(); + Matcher matcher = patternForward.matcher(clientTopic.toString()); + if (matcher.matches()) { + String internalTopic = "sensors/" + matcher.group(1); + LOG.info("Rewritten {} to {}", clientTopic, internalTopic); + return Topic.asTopic(internalTopic); + } + return clientTopic; + } + + @Override + public Topic rewriteTopicInverse(Topic clientTopic, Topic publishedTopicInternal) { + Matcher matcher = patternInverse.matcher((CharSequence) publishedTopicInternal); + if (matcher.matches()) { + String publishedTopicClient = "sensor/" + matcher.group(1); + LOG.info("Inverse-Rewritten {} to {}", publishedTopicInternal, publishedTopicClient); + return Topic.asTopic(publishedTopicClient); + } + return publishedTopicInternal; + } + }); + server.startServer(config); + } + + @BeforeAll + public static void beforeTests() { + Awaitility.setDefaultTimeout(Durations.ONE_SECOND); + } + + @BeforeEach + public void setUp() throws Exception { + dbPath = IntegrationUtils.tempH2Path(tempFolder); + startServer(dbPath); + + MqttClientPersistence dataStore = new MqttDefaultFilePersistence(IntegrationUtils.newFolder(tempFolder, "client").getAbsolutePath()); + MqttClientPersistence pubDataStore = new MqttDefaultFilePersistence(IntegrationUtils.newFolder(tempFolder, "publisher").getAbsolutePath()); + + client = new MqttClient("tcp://localhost:1883", "TestClient", dataStore); + messagesCollector = new MessageCollector(); + client.setCallback(messagesCollector); + + publisher = new MqttClient("tcp://localhost:1883", "Publisher", pubDataStore); + } + + @AfterEach + public void tearDown() throws Exception { + IntegrationUtils.disconnectClient(client); + IntegrationUtils.disconnectClient(publisher); + + stopServer(); + } + + private void stopServer() { + server.stopServer(); + } + + /** + * On the server the sensor sub-tree was moved to sensors, but legacy clients should still receive messages on the old topic. + * Subscriber B subscribing to sensor/sensor1 should get messages sent to sensors/sensor1, as if they were sent to sensor/sensor1. + * Subscriber C subscribing to sensor/+ should get messages sent to sensors/+, as if they were sent to sensor/+. + */ + @Test + public void checkSubscribersGetCorrectTopicNotifications() throws Exception { + LOG.info("*** checkSubscribersGetCorrectTopicNotifications ***"); + + MqttClientPersistence dsSubscriberA = new MqttDefaultFilePersistence(IntegrationUtils.newFolder(tempFolder, "subscriberA").getAbsolutePath()); + MqttClient subscriberA = new MqttClient("tcp://localhost:1883", "SubscriberA", dsSubscriberA); + MessageCollector cbSubscriberA = new MessageCollector(); + subscriberA.setCallback(cbSubscriberA); + subscriberA.connect(); + subscriberA.subscribe("sensors/sensor1", 1); + + MqttClientPersistence dsSubscriberB = new MqttDefaultFilePersistence(IntegrationUtils.newFolder(tempFolder, "subscriberB").getAbsolutePath()); + MqttClient subscriberB = new MqttClient("tcp://localhost:1883", "SubscriberB", dsSubscriberB); + MessageCollector cbSubscriberB = new MessageCollector(); + subscriberB.setCallback(cbSubscriberB); + subscriberB.connect(); + subscriberB.subscribe("sensor/sensor1", 2); + + MqttClientPersistence dsSubscriberC = new MqttDefaultFilePersistence(IntegrationUtils.newFolder(tempFolder, "subscriberC").getAbsolutePath()); + MqttClient subscriberC = new MqttClient("tcp://localhost:1883", "SubscriberC", dsSubscriberC); + MessageCollector cbSubscriberC = new MessageCollector(); + subscriberC.setCallback(cbSubscriberC); + subscriberC.connect(); + subscriberC.subscribe("sensor/+", 2); + + client.connect(); + final String messageText1 = "Hello world MQTT!!"; + client.publish("sensors/sensor1", messageText1.getBytes(UTF_8), 2, false); + + { + Awaitility.await().until(cbSubscriberA::isMessageReceived); + MqttMessage message = cbSubscriberA.retrieveMessage(); + assertNotNull(message, "MUST be a received message"); + assertEquals(messageText1, new String(message.getPayload(), UTF_8)); + assertEquals(1, message.getQos()); + } + { + Awaitility.await().until(cbSubscriberB::isMessageReceived); + MqttMessage message = cbSubscriberB.retrieveMessage(); + assertNotNull(message, "MUST be a received message"); + assertEquals(messageText1, new String(message.getPayload(), UTF_8)); + assertEquals(2, message.getQos()); + } + { + Awaitility.await().until(cbSubscriberC::isMessageReceived); + MqttMessage messageOnC = cbSubscriberC.retrieveMessage(); + assertNotNull(messageOnC, "MUST be a received message"); + assertEquals(messageText1, new String(messageOnC.getPayload(), UTF_8)); + assertEquals(2, messageOnC.getQos()); + } + + subscriberB.unsubscribe("sensor/sensor1"); + subscriberC.unsubscribe("sensor/+"); + + final String messageText2 = "Hello world again"; + client.publish("sensors/sensor1", messageText2.getBytes(UTF_8), 2, false); + + { + Awaitility.await().until(cbSubscriberA::isMessageReceived); + MqttMessage message = cbSubscriberA.retrieveMessage(); + assertEquals(messageText2, new String(message.getPayload(), UTF_8)); + assertEquals(1, message.getQos()); + subscriberA.disconnect(); + } + { + Thread.sleep(Durations.ONE_SECOND.toMillis()); + assertFalse(cbSubscriberB.isMessageReceived(), "MUST NOT receive a message"); + subscriberB.disconnect(); + + assertFalse(cbSubscriberC.isMessageReceived(), "MUST NOT receive a message"); + subscriberC.disconnect(); + } + } + +}