Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deploy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.pinterest.memq</groupId>
<artifactId>memq-parent</artifactId>
<version>1.0.1</version>
<version>1.0.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>memq-deploy</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions memq-client-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.pinterest.memq</groupId>
<artifactId>memq-parent</artifactId>
<version>1.0.1</version>
<version>1.0.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>memq-client-all</artifactId>
Expand Down Expand Up @@ -48,7 +48,7 @@
<dependency>
<groupId>com.pinterest.memq</groupId>
<artifactId>memq-client</artifactId>
<version>1.0.1</version>
<version>1.0.2-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
Expand Down
2 changes: 1 addition & 1 deletion memq-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.pinterest.memq</groupId>
<artifactId>memq-parent</artifactId>
<version>1.0.1</version>
<version>1.0.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>memq-client</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion memq-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.pinterest.memq</groupId>
<artifactId>memq-parent</artifactId>
<version>1.0.1</version>
<version>1.0.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>memq-commons</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion memq-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.pinterest.memq</groupId>
<artifactId>memq-parent</artifactId>
<version>1.0.1</version>
<version>1.0.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>memq-examples</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion memq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.pinterest.memq</groupId>
<artifactId>memq-parent</artifactId>
<version>1.0.1</version>
<version>1.0.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>memq</artifactId>
Expand Down
60 changes: 55 additions & 5 deletions memq/src/main/java/com/pinterest/memq/core/MemqManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class MemqManager implements Managed {
private static final Gson gson = new Gson();
private Map<String, TopicProcessor> processorMap = new ConcurrentHashMap<>();
private Map<String, TopicAssignment> topicMap = new ConcurrentHashMap<>();
private Map<String, Long> topicLastAccessMs = new ConcurrentHashMap<>();
private MemqConfig configuration;
private ScheduledExecutorService timerService;
private ScheduledExecutorService cleanupService;
Expand Down Expand Up @@ -89,18 +90,24 @@ public void init() throws Exception {
if (file.exists()) {
byte[] bytes = Files.readAllBytes(file.toPath());
TopicAssignment[] topics = gson.fromJson(new String(bytes), TopicAssignment[].class);
topicMap = new ConcurrentHashMap<>();
for (TopicAssignment topicConfig : topics) {
topicMap.put(topicConfig.getTopic(), topicConfig);
if (topics != null) {
topicMap = new ConcurrentHashMap<>();
for (TopicAssignment topicConfig : topics) {
topicMap.put(topicConfig.getTopic(), topicConfig);
}
}
}
if (configuration.getTopicConfig() != null) {
for (TopicConfig topicConfig : configuration.getTopicConfig()) {
topicMap.put(topicConfig.getTopic(), new TopicAssignment(topicConfig, -1));
}
}
for (Entry<String, TopicAssignment> entry : topicMap.entrySet()) {
createTopicProcessor(entry.getValue());
boolean assignmentsEnabled = configuration.getClusteringConfig() == null
|| configuration.getClusteringConfig().isEnableAssignments();
if (assignmentsEnabled) {
for (Entry<String, TopicAssignment> entry : topicMap.entrySet()) {
createTopicProcessor(entry.getValue());
}
}
}

Expand Down Expand Up @@ -148,6 +155,7 @@ public void createTopicProcessor(TopicAssignment topicConfig) throws BadRequestE

processorMap.put(topicConfig.getTopic(), tp);
topicMap.put(topicConfig.getTopic(), topicConfig);
topicLastAccessMs.put(topicConfig.getTopic(), System.currentTimeMillis());
logger.info("Configured and started TopicProcessor for:" + topicConfig.getTopic());
}

Expand Down Expand Up @@ -183,6 +191,7 @@ public Future<?> deleteTopicProcessor(String topic) {
}
processorMap.remove(topic);
topicMap.remove(topic);
topicLastAccessMs.remove(topic);
});
}

Expand Down Expand Up @@ -211,6 +220,47 @@ public Map<String, MetricRegistry> getRegistry() {
return metricsRegistryMap;
}

public TopicAssignment getTopicAssignment(String topic) {
return topicMap.get(topic);
}

public TopicProcessor getOrCreateTopicProcessor(String topic) throws Exception {
TopicProcessor topicProcessor = processorMap.get(topic);
if (topicProcessor != null) {
return topicProcessor;
}
TopicAssignment assignment = topicMap.get(topic);
if (assignment == null) {
throw new NotFoundException("Topic not found:" + topic);
}
createTopicProcessor(assignment);
return processorMap.get(topic);
}

public void touchTopic(String topic) {
topicLastAccessMs.put(topic, System.currentTimeMillis());
}

public void startIdleTopicCleanup(long maxIdleMs) {
if (maxIdleMs <= 0) {
return;
}
cleanupService.scheduleAtFixedRate(() -> {
long now = System.currentTimeMillis();
for (Entry<String, Long> entry : topicLastAccessMs.entrySet()) {
String topic = entry.getKey();
Long lastAccess = entry.getValue();
if (lastAccess == null) {
continue;
}
if (now - lastAccess > maxIdleMs && processorMap.containsKey(topic)) {
logger.info("Deleting idle TopicProcessor for topic:" + topic);
deleteTopicProcessor(topic);
}
}
}, maxIdleMs, maxIdleMs, TimeUnit.MILLISECONDS);
}

@Override
public void start() throws Exception {
logger.info("Memq manager started");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.pinterest.memq.core.clustering;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -125,18 +126,22 @@ public void takeLeadership(CuratorFramework client) throws Exception {
}
});

if (clusteringConfig.isEnableBalancer()) {
Balancer balancer = new Balancer(config, this, client, leaderSelector);
Thread thBalancer = new Thread(balancer);
thBalancer.setName("BalancerThread");
thBalancer.setDaemon(true);
thBalancer.start();
}
if (clusteringConfig.isEnableAssignments()) {
if (clusteringConfig.isEnableBalancer()) {
Balancer balancer = new Balancer(config, this, client, leaderSelector);
Thread thBalancer = new Thread(balancer);
thBalancer.setName("BalancerThread");
thBalancer.setDaemon(true);
thBalancer.start();
}

Thread th = new Thread(new MetadataPoller(client, topicMetadataMap));
th.setName("MetadataPollerThread");
th.setDaemon(true);
th.start();
Thread th = new Thread(new MetadataPoller(client, topicMetadataMap));
th.setName("MetadataPollerThread");
th.setDaemon(true);
th.start();
} else {
mgr.startIdleTopicCleanup(clusteringConfig.getMaxIdleMs());
}

if (clusteringConfig.isEnableLeaderSelector()) {
leaderSelector.autoRequeue();
Expand Down Expand Up @@ -171,7 +176,7 @@ private void initializeZNodesAndWatchers(CuratorFramework client) throws Excepti
client.create().withMode(CreateMode.EPHEMERAL).forPath(brokerZnodePath,
GSON.toJson(broker).getBytes());

if (clusteringConfig.isEnableLocalAssigner()) {
if (clusteringConfig.isEnableAssignments() && clusteringConfig.isEnableLocalAssigner()) {
Thread th = new Thread(new TopicAssignmentWatcher(mgr, brokerZnodePath, broker, client));
th.setDaemon(true);
th.setName("TopicAssignmentWatcher");
Expand All @@ -183,6 +188,31 @@ public Map<String, TopicMetadata> getTopicMetadataMap() {
return topicMetadataMap;
}

public TopicConfig getTopicConfig(String topic) throws Exception {
if (client == null) {
return null;
}
String path = ZNODE_TOPICS_BASE + topic;
if (client.checkExists().forPath(path) == null) {
return null;
}
String topicConfig = new String(client.getData().forPath(path));
return GSON.fromJson(topicConfig, TopicConfig.class);
}

public Set<Broker> getAllBrokers() throws Exception {
Set<Broker> brokers = new HashSet<>();
if (client == null) {
return brokers;
}
for (String id : client.getChildren().forPath(ZNODE_BROKERS)) {
String brokerInfo = new String(client.getData().forPath(ZNODE_BROKERS_BASE + id));
Broker broker = GSON.fromJson(brokerInfo, Broker.class);
brokers.add(broker);
}
return brokers;
}

public static List<String> convertTopicAssignmentsSetToList(Set<TopicAssignment> topicAssignments) {
return topicAssignments.stream().map(TopicConfig::getTopic).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class ClusteringConfig {
private boolean enableLocalAssigner = true;
private boolean addBootstrapTopics = true;
private boolean enableExpiration = true;
private boolean enableAssignments = true;
private long maxIdleMs = 5 * 60 * 1000; // 5 minutes

public boolean isAddBootstrapTopics() {
return addBootstrapTopics;
Expand Down Expand Up @@ -80,4 +82,20 @@ public boolean isEnableExpiration() {
public void setEnableExpiration(boolean enableExpiration) {
this.enableExpiration = enableExpiration;
}

public boolean isEnableAssignments() {
return enableAssignments;
}

public void setEnableAssignments(boolean enableAssignments) {
this.enableAssignments = enableAssignments;
}

public long getMaxIdleMs() {
return maxIdleMs;
}

public void setMaxIdleMs(long maxIdleMs) {
this.maxIdleMs = maxIdleMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class NettyServerConfig {
private int brokerInputTrafficShapingMetricsReportIntervalSec = 60; // 1 minute by default
// SSL
private SSLConfig sslConfig;
// Fair queueing configuration
private QueueingConfig queueingConfig = null;

public int getBrokerInputTrafficShapingMetricsReportIntervalSec() {
return brokerInputTrafficShapingMetricsReportIntervalSec;
Expand Down Expand Up @@ -94,4 +96,12 @@ public void setEnableEpoll(boolean enableEpoll) {
this.enableEpoll = enableEpoll;
}

public QueueingConfig getQueueingConfig() {
return queueingConfig;
}

public void setQueueingConfig(QueueingConfig queueingConfig) {
this.queueingConfig = queueingConfig;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Copyright 2022 Pinterest, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pinterest.memq.core.config;

import com.pinterest.memq.core.rpc.queue.DeficitRoundRobinStrategy;

import java.util.HashMap;
import java.util.Map;

/**
* Configuration for fair-queueing mechanism in packet processing.
*/
public class QueueingConfig {

private boolean enabled = false;

/**
* The fully qualified class name of the QueueingStrategy implementation to use.
* Default is DeficitRoundRobinStrategy.
*/
private String strategyClass = DeficitRoundRobinStrategy.class.getName();

/**
* Number of threads in the dequeue thread pool.
* Each thread processes a subset of the topic queues.
*/
private int dequeueThreadPoolSize = 4;

/**
* Maximum bytes of pending requests per topic queue.
* If a queue exceeds this limit, requests will be rejected.
* Default is 100MB per topic.
*/
private long maxQueueBytesPerTopic = 100 * 1024 * 1024; // 100MB default

/**
* Strategy-specific configuration options.
* Keys and values depend on the strategy implementation.
* For example, DeficitRoundRobinStrategy uses "quantum" key.
*/
private Map<String, Object> strategyConfig = new HashMap<>();

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public String getStrategyClass() {
return strategyClass;
}

public void setStrategyClass(String strategyClass) {
this.strategyClass = strategyClass;
}

public int getDequeueThreadPoolSize() {
return dequeueThreadPoolSize;
}

public void setDequeueThreadPoolSize(int dequeueThreadPoolSize) {
this.dequeueThreadPoolSize = dequeueThreadPoolSize;
}

public long getMaxQueueBytesPerTopic() {
return maxQueueBytesPerTopic;
}

public void setMaxQueueBytesPerTopic(long maxQueueBytesPerTopic) {
this.maxQueueBytesPerTopic = maxQueueBytesPerTopic;
}

public Map<String, Object> getStrategyConfig() {
return strategyConfig;
}

public void setStrategyConfig(Map<String, Object> strategyConfig) {
this.strategyConfig = strategyConfig;
}
}
Loading