Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.19-SNAPSHOT
[feature] Added topic rewriting, allowing the internally used topic to be different from the topic the client used.
[Cleanup] Cleaned up subscription handling. (#931)
[feature] Added metrics framework and Prometheus implementation.
- Enable by setting `metrics_provider_class` to `MetricsProviderPrometheus`.
Expand Down
4 changes: 2 additions & 2 deletions broker/src/main/java/io/moquette/broker/Authorizator.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ private List<MqttTopicSubscription> verifyTopicsReadAccessWithTopicExtractor(Str

final int messageId = messageId(msg);
for (MqttTopicSubscription req : msg.payload().topicSubscriptions()) {
Topic topic = topicExtractor.apply(req.topicName());
Topic topic = topicExtractor.apply(req.topicFilter());
final MqttQoS qos = getQoSCheckingAlsoPermissionsOnTopic(clientID, username, messageId, topic,
req.qualityOfService());
MqttSubscriptionOption option = PostOffice.optionWithQos(qos, req.option());
ackTopics.add(new MqttTopicSubscription(req.topicName(), option));
ackTopics.add(new MqttTopicSubscription(req.topicFilter(), option));
}
return ackTopics;
}
Expand Down
32 changes: 26 additions & 6 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import java.util.stream.Collectors;

import static io.moquette.broker.Utils.messageId;
import io.moquette.interception.TopicRewriter;
import io.moquette.interception.TopicRewriterUnity;
import io.moquette.metrics.MetricsManager;
import io.moquette.metrics.MetricsProvider;
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
Expand Down Expand Up @@ -206,6 +208,7 @@ public RouteResult ifFailed(Runnable action) {
private final ISessionsRepository sessionRepository;
private SessionRegistry sessionRegistry;
private BrokerInterceptor interceptor;
private TopicRewriter topicRewriter = new TopicRewriterUnity();
private final FailedPublishCollection failedPublishes = new FailedPublishCollection();
private final SessionEventLoopGroup sessionLoops;
private final Clock clock;
Expand Down Expand Up @@ -271,6 +274,10 @@ public Optional<Instant> expireAt() {
recreateRetainedExpires(retainedRepository);
}

public void setTopicRewriter(TopicRewriter topicRewriter) {
this.topicRewriter = topicRewriter;
}

private void cleanRetainedExpired(ExpirableTopic expirable) {
retainedRepository.cleanRetained(expirable.topic);
}
Expand Down Expand Up @@ -420,7 +427,9 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
.map(sub -> {
final Topic topic = new Topic(sub.topicFilter());
MqttSubscriptionOption option = sub.option();//MqttSubscriptionOption.onlyFromQos(sub.qualityOfService());
return new Subscription(clientID, topic, option, subscriptionIdOpt);
final Subscription subscription = new Subscription(clientID, topic, option, subscriptionIdOpt);
subscription.setTopicFilterInternal(topicRewriter.rewriteTopic(subscription));
return subscription;
}).collect(Collectors.toList());

final Set<Subscription> subscriptionToSendRetained = newSubscriptions.stream()
Expand All @@ -444,12 +453,14 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
}
}

private static Subscription buildSharedSubscriptionFrom(MqttTopicSubscription s, String clientID, Optional<SubscriptionIdentifier> subscriptionIdOpt) {
return new Subscription(
private Subscription buildSharedSubscriptionFrom(MqttTopicSubscription s, String clientID, Optional<SubscriptionIdentifier> subscriptionIdOpt) {
final Subscription subscription = new Subscription(
clientID,
Topic.asTopic(SharedSubscriptionUtils.extractFilterFromShared(s.topicFilter())),
s.option(),
new ShareName(SharedSubscriptionUtils.extractShareName(s.topicFilter())), subscriptionIdOpt);
subscription.setTopicFilterInternal(topicRewriter.rewriteTopic(subscription));
return subscription;
}

private static boolean needToReceiveRetained(Utils.Couple<Boolean, Subscription> addedAndSub) {
Expand Down Expand Up @@ -519,7 +530,7 @@ private static Optional<SubscriptionIdentifier> verifyAndExtractMessageIdentifie
private void publishRetainedMessagesForSubscriptions(String clientID, Collection<Subscription> newSubscriptions) {
Session targetSession = this.sessionRegistry.retrieve(clientID);
for (Subscription subscription : newSubscriptions) {
final String topicFilter = subscription.getTopicFilter().toString();
final String topicFilter = subscription.getTopicFilterInternal().toString();
final Collection<RetainedMessage> retainedMsgs = retainedRepository.retainedOnTopic(topicFilter);

if (retainedMsgs.isEmpty()) {
Expand Down Expand Up @@ -929,7 +940,16 @@ private void publishToSession(ByteBuf payload, Topic topic, Subscription sub, Mq
boolean isSessionPresent = targetSession != null;
if (isSessionPresent) {
LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}",
sub.getClientId(), sub.getTopicFilter(), qos);
sub.getClientId(), sub.getTopicFilterInternal(), qos);

if (sub.getTopicFilterClient().hasWildcard() && sub.isTopicRewritten()) {
// Topic contains a wildcard AND is rewritten. The interceptor that did the rewriting
// must tell us what topic the client expects.
topic = topicRewriter.rewriteTopicInverse(sub.getTopicFilterClient(), topic);
} else {
// Non-Wildcard or non-rewritten topic, we can use the client version.
topic = sub.getTopicFilterClient();
}

metricsProvider.addMessage(SessionEventLoop.getThreadQueueId(), qos.value());
Collection<? extends MqttProperties.MqttProperty> existingProperties = msg.variableHeader().properties().listAll();
Expand All @@ -941,7 +961,7 @@ private void publishToSession(ByteBuf payload, Topic topic, Subscription sub, Mq
// If we are, the subscriber disconnected after the subscriptions tree selected that session as a
// destination.
LOG.debug("PUBLISH to not yet present session. CId: {}, topicFilter: {}, qos: {}", sub.getClientId(),
sub.getTopicFilter(), qos);
sub.getTopicFilterInternal(), qos);
}
}

Expand Down
12 changes: 12 additions & 0 deletions broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.concurrent.ScheduledExecutorService;

import static io.moquette.broker.Session.INFINITE_EXPIRY;
import io.moquette.interception.TopicRewriter;
import static io.moquette.metrics.MetricsUtils.getInterceptorIds;
import io.moquette.metrics.MetricsManager;
import io.moquette.metrics.MetricsProvider;
Expand All @@ -81,6 +82,7 @@ public class Server {
private NewNettyAcceptor acceptor;
private volatile boolean initialized;
private PostOffice dispatcher;
private TopicRewriter topicRewriter;
private BrokerInterceptor interceptor;
private H2Builder h2Builder;
private SessionRegistry sessions;
Expand Down Expand Up @@ -266,6 +268,9 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
final MqttQoS serverGrantedQoS = parseMaxGrantedQoS(config);
dispatcher = new PostOffice(subscriptions, retainedRepository, sessions, sessionsRepository, interceptor,
authorizator, loopsGroup, clock, serverGrantedQoS, metricsProvider);
if (topicRewriter != null) {
dispatcher.setTopicRewriter(topicRewriter);
}
final BrokerConfiguration brokerConfig = new BrokerConfiguration(config);
MQTTConnectionFactory connectionFactory = new MQTTConnectionFactory(brokerConfig, authenticator, sessions,
dispatcher);
Expand All @@ -284,6 +289,13 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
initialized = true;
}

public void setTopicRewriter(TopicRewriter topicRewriter) {
this.topicRewriter = topicRewriter;
if (dispatcher != null) {
dispatcher.setTopicRewriter(topicRewriter);
}
}

private static MqttQoS parseMaxGrantedQoS(IConfig config) {
final String qosValue = config.getProperty(IConfig.MAX_SERVER_GRANTED_QOS_PROPERTY_NAME, "2");
try {
Expand Down
6 changes: 4 additions & 2 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,10 @@ private SessionCreationResult reopenExistingSession(MqttConnectMessage msg, Stri
// publish new session
final Session newSession = createNewSession(msg, clientId);
Session previous = pool.put(clientId, newSession);
metricsProvider.addOpenSession();
if (previous != null) {
LOG.error("We're re-opening a session for clientId {} and we purged the old session, but there is still a session in the pool! this is a bug!", clientId);
metricsProvider.removeOpenSession();
}

LOG.trace("case 2, oldSession with same CId {} disconnected", clientId);
Expand Down Expand Up @@ -353,15 +355,15 @@ private SessionCreationResult reopenExistingSession(MqttConnectMessage msg, Stri
private void reactivateSubscriptions(Session session, String username) {
//verify if subscription still satisfy read ACL permissions
for (Subscription existingSub : session.getSubscriptions()) {
final boolean topicReadable = authorizator.canRead(existingSub.getTopicFilter(), username,
final boolean topicReadable = authorizator.canRead(existingSub.getTopicFilterInternal(), username,
session.getClientID());
if (!topicReadable) {
if (existingSub.hasShareName()) {
subscriptionsDirectory.removeSharedSubscription(existingSub);
} else {
subscriptionsDirectory.removeSubscription(existingSub);
}
session.removeSubscription(existingSub.getTopicFilter());
session.removeSubscription(existingSub.getTopicFilterClient());
}
// TODO
// subscriptionsDirectory.reactivate(existingSub.getTopicFilter(), session.getClientID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private List<Subscription> recursiveMatch(Topic topicName, INode inode, int dept
public boolean addToTree(Subscription sub) {
Action res;
do {
res = insert(sub.getTopicFilter(), this.root, sub);
res = insert(sub.getTopicFilterInternal(), this.root, sub);
} while (res == Action.REPEAT);
return res == Action.OK_NEW;
}
Expand Down Expand Up @@ -193,7 +193,7 @@ private INode createLeafNodes(Token token, Subscription sub) {
public void removeFromTree(Subscription sub) {
Action res;
do {
res = remove(sub.getClientId(), sub.getTopicFilter(), this.root, NO_PARENT, sub);
res = remove(sub.getClientId(), sub.getTopicFilterInternal(), this.root, NO_PARENT, sub);
} while (res == Action.REPEAT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ private String prettySubscriptions(CNode node) {
int counter = 0;
for (Subscription couple : node.subscriptions()) {
subScriptionsStr
.append("{filter=").append(couple.topicFilter).append(", ")
.append("{filter=").append(couple.getTopicFilterInternal()).append(", ")
.append("option=").append(couple.getOption()).append(", ")
.append("client='").append(couple.clientId).append("'}");
counter++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public final class Subscription implements Serializable, Comparable<Subscription
private static final long serialVersionUID = -3383457629635732794L;
private final MqttSubscriptionOption option;
final String clientId;
final Topic topicFilter;
final Topic topicFilterClient;
private Topic topicFilterInternal;
final ShareName shareName;

private final Optional<SubscriptionIdentifier> subscriptionId;
Expand Down Expand Up @@ -60,7 +61,8 @@ public Subscription(String clientId, Topic topicFilter, MqttSubscriptionOption o
public Subscription(String clientId, Topic topicFilter, MqttSubscriptionOption options, ShareName shareName,
Optional<SubscriptionIdentifier> subscriptionId) {
this.clientId = clientId;
this.topicFilter = topicFilter;
this.topicFilterClient = topicFilter;
this.topicFilterInternal = topicFilter;
this.shareName = shareName;
this.subscriptionId = subscriptionId;
this.option = options;
Expand All @@ -70,8 +72,16 @@ public String getClientId() {
return clientId;
}

public Topic getTopicFilter() {
return topicFilter;
public Topic getTopicFilterInternal() {
return topicFilterInternal;
}

public void setTopicFilterInternal(Topic topicFilterInternal) {
this.topicFilterInternal = topicFilterInternal;
}

public Topic getTopicFilterClient() {
return topicFilterClient;
}

/**
Expand All @@ -82,9 +92,18 @@ public Topic getTopicFilter() {
*/
public String getOriginalTopicFilterWithSharename() {
if (shareName.isEmpty()) {
return topicFilter.toString();
return topicFilterClient.toString();
}
return "$share/" + shareName.getShareName() + "/" + topicFilter.toString();
return "$share/" + shareName.getShareName() + "/" + topicFilterClient.toString();
}

/**
* Check if the client topic was rewritten to a different internal topic.
*
* @return true if the client topic is different from the internal topic.
*/
public boolean isTopicRewritten() {
return topicFilterInternal.equals(topicFilterClient);
}

public boolean qosLessThan(Subscription sub) {
Expand All @@ -106,17 +125,17 @@ public boolean equals(Object o) {
Subscription that = (Subscription) o;
return Objects.equals(clientId, that.clientId) &&
Objects.equals(shareName, that.shareName) &&
Objects.equals(topicFilter, that.topicFilter);
Objects.equals(topicFilterInternal, that.topicFilterInternal);
}

@Override
public int hashCode() {
return Objects.hash(clientId, shareName, topicFilter);
return Objects.hash(clientId, shareName, topicFilterInternal);
}

@Override
public String toString() {
return String.format("[filter:%s, clientID: %s, options: %s - shareName: %s]", topicFilter, clientId, option, shareName);
return String.format("[filter:%s, clientID: %s, options: %s - shareName: %s]", topicFilterInternal, clientId, option, shareName);
}

@Override
Expand All @@ -139,7 +158,7 @@ public int compareTo(Subscription o) {
if (compare != 0) {
return compare;
}
return this.topicFilter.compareTo(o.topicFilter);
return this.topicFilterInternal.compareTo(o.topicFilterInternal);
}

public String clientAndShareName() {
Expand Down
14 changes: 14 additions & 0 deletions broker/src/main/java/io/moquette/broker/subscriptions/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class Topic implements Serializable, Comparable<Topic> {

private transient boolean valid;

private transient boolean hasWildcard;

/**
* Factory method
*
Expand Down Expand Up @@ -110,10 +112,12 @@ private List<Token> parseTopic(String topic) throws ParseException {
i);
}
res.add(Token.MULTI);
hasWildcard = true;
} else if (s.contains("#")) {
throw new ParseException("Bad format of topic, invalid subtopic name: " + s, i);
} else if (s.equals("+")) {
res.add(Token.SINGLE);
hasWildcard = true;
} else if (s.contains("+")) {
throw new ParseException("Bad format of topic, invalid subtopic name: " + s, i);
} else {
Expand Down Expand Up @@ -158,6 +162,13 @@ public boolean isValid() {
return valid;
}

public boolean hasWildcard() {
if (tokens == null)
getTokens();
return hasWildcard;
}


/**
* Verify if the 2 topics matching respecting the rules of MQTT Appendix A
*
Expand Down Expand Up @@ -199,6 +210,9 @@ public String toString() {

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.moquette.broker.subscriptions.Subscription;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
Expand Down Expand Up @@ -145,7 +144,7 @@ public void notifyTopicPublished(final MqttPublishMessage msg, final String clie
public void notifyTopicSubscribed(final Subscription sub, final String username) {
for (final InterceptHandler handler : this.handlers.get(InterceptSubscribeMessage.class)) {
LOG.debug("Notifying MQTT SUBSCRIBE message to interceptor. CId={}, topicFilter={}, interceptorId={}",
sub.getClientId(), sub.getTopicFilter(), handler.getID());
sub.getClientId(), sub.getTopicFilterInternal(), handler.getID());
executor.execute(() -> handler.onSubscribe(new InterceptSubscribeMessage(sub, username)));
}
}
Expand Down
Loading
Loading