Skip to content

Commit f56999a

Browse files
author
xiangying
committed
add test
1 parent 90965ce commit f56999a

2 files changed

Lines changed: 69 additions & 28 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,21 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21-
import static org.testng.Assert.assertEquals;
22-
import static org.testng.Assert.assertFalse;
23-
import static org.testng.Assert.assertNotNull;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.doAnswer;
23+
import static org.mockito.Mockito.spy;
24+
import static org.testng.Assert.*;
2425

2526
import java.util.concurrent.CompletableFuture;
2627
import java.util.concurrent.CountDownLatch;
2728
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
2831
import lombok.Cleanup;
2932
import lombok.extern.slf4j.Slf4j;
3033
import org.apache.pulsar.broker.BrokerTestUtil;
3134
import org.apache.pulsar.broker.service.ServerCnx;
32-
import org.apache.pulsar.client.api.BatcherBuilder;
33-
import org.apache.pulsar.client.api.Consumer;
34-
import org.apache.pulsar.client.api.Message;
35-
import org.apache.pulsar.client.api.MessageId;
36-
import org.apache.pulsar.client.api.Producer;
37-
import org.apache.pulsar.client.api.ProducerConsumerBase;
38-
import org.apache.pulsar.client.api.SubscriptionType;
35+
import org.apache.pulsar.client.api.*;
3936
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
4037
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
4138
import org.awaitility.Awaitility;
@@ -230,4 +227,44 @@ public void testRetentionPolicyByProducingMessages() throws Exception {
230227
assertEquals(internalStats.ledgers.size(), 1);
231228
});
232229
}
230+
231+
232+
@Test
233+
public void testProducerCompressionMinMsgBodySize() throws PulsarClientException {
234+
byte[] msg1022 = new byte[1022];
235+
byte[] msg1025 = new byte[1025];
236+
AtomicBoolean isCompressed = new AtomicBoolean(false);
237+
final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
238+
@Cleanup
239+
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
240+
.topic(topicName)
241+
.create();
242+
producer = spy(producer);
243+
doAnswer(invocation -> {
244+
isCompressed.set(false);
245+
return null;
246+
}).when(producer).applyCompression(any());
247+
248+
producer.conf.setCompressMinMsgBodySize(1024);
249+
producer.conf.setCompressionType(CompressionType.LZ4);
250+
// disable batch
251+
producer.conf.setBatchingEnabled(false);
252+
isCompressed.set(false);
253+
producer.newMessage().value(msg1022).send();
254+
assertFalse(isCompressed.get());
255+
256+
isCompressed.set(false);
257+
producer.newMessage().value(msg1025).send();
258+
assertTrue(isCompressed.get());
259+
260+
// enable batch
261+
producer.conf.setBatchingEnabled(true);
262+
isCompressed.set(false);
263+
producer.newMessage().value(msg1022).send();
264+
assertFalse(isCompressed.get());
265+
266+
isCompressed.set(false);
267+
producer.newMessage().value(msg1025).send();
268+
assertTrue(isCompressed.get());
269+
}
233270
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,8 @@ CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transa
477477
* @param payload
478478
* @return a new payload
479479
*/
480-
protected ByteBuf applyCompression(ByteBuf payload) {
480+
@VisibleForTesting
481+
public ByteBuf applyCompression(ByteBuf payload) {
481482
ByteBuf compressedPayload = compressor.encode(payload);
482483
payload.release();
483484
return compressedPayload;
@@ -504,23 +505,26 @@ public void sendAsync(Message<?> message, SendCallback callback) {
504505
boolean compressed = false;
505506
// Batch will be compressed when closed
506507
// If a message has a delayed delivery time, we'll always send it individually
507-
if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime())
508-
&& payload.readableBytes() > conf.getCompressMinMsgBodySize())) {
509-
compressedPayload = applyCompression(payload);
510-
compressed = true;
511-
512-
// validate msg-size (For batching this will be check at the batch completion size)
513-
int compressedSize = compressedPayload.readableBytes();
514-
if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
515-
compressedPayload.release();
516-
String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
517-
PulsarClientException.InvalidMessageException invalidMessageException =
518-
new PulsarClientException.InvalidMessageException(
519-
format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds"
520-
+ " %d bytes",
521-
producerName, topic, compressedStr, compressedSize, getMaxMessageSize()));
522-
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
523-
return;
508+
if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()))) {
509+
if (payload.readableBytes() < conf.getCompressMinMsgBodySize()) {
510+
511+
} else {
512+
compressedPayload = applyCompression(payload);
513+
compressed = true;
514+
515+
// validate msg-size (For batching this will be check at the batch completion size)
516+
int compressedSize = compressedPayload.readableBytes();
517+
if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
518+
compressedPayload.release();
519+
String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
520+
PulsarClientException.InvalidMessageException invalidMessageException =
521+
new PulsarClientException.InvalidMessageException(
522+
format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds"
523+
+ " %d bytes",
524+
producerName, topic, compressedStr, compressedSize, getMaxMessageSize()));
525+
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
526+
return;
527+
}
524528
}
525529
}
526530

0 commit comments

Comments
 (0)