Skip to content

Commit f05d8ae

Browse files
committed
Fix TLS stability issues with V2 protocol that caused data corruption
- add the TLS handler after the FlushConsolidationHandler - This makes TLS connections from Pulsar Broker to Bookkeeper stable when bookkeeperUseV2WireProtocol=true is used - Fix test TestTLS for V2 - Fix inconsistency in client configuration in BookKeeperClusterTestCase
1 parent 2970aef commit f05d8ae

5 files changed

Lines changed: 14 additions & 14 deletions

File tree

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
class BookieNettyServer {
9393

9494
private static final Logger LOG = LoggerFactory.getLogger(BookieNettyServer.class);
95+
public static final String CONSOLIDATION_HANDLER_NAME = "consolidation";
9596

9697
final int maxFrameSize;
9798
final ServerConfiguration conf;
@@ -344,7 +345,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
344345
new BookieSideConnectionPeerContextHandler();
345346
ChannelPipeline pipeline = ch.pipeline();
346347

347-
pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true));
348+
pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true));
348349

349350
pipeline.addLast("bytebufList", ByteBufList.ENCODER);
350351

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -580,9 +580,10 @@ private void processStartTLSRequestV3(final BookkeeperProtocol.Request r,
580580
response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
581581
writeAndFlush(c, response.build());
582582
} else {
583+
LOG.info("Starting TLS handshake with client on channel {}", c);
583584
// there is no need to execute in a different thread as this operation is light
584585
SslHandler sslHandler = shFactory.newTLSHandler();
585-
c.pipeline().addFirst("tls", sslHandler);
586+
c.pipeline().addAfter(BookieNettyServer.CONSOLIDATION_HANDLER_NAME, "tls", sslHandler);
586587

587588
response.setStatus(BookkeeperProtocol.StatusCode.EOK);
588589
BookkeeperProtocol.StartTLSResponse.Builder builder = BookkeeperProtocol.StartTLSResponse.newBuilder();

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
175175
BKException.Code.WriteOnReadOnlyBookieException));
176176
private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.
177177
private static final AtomicLong txnIdGenerator = new AtomicLong(0);
178+
static final String CONSOLIDATION_HANDLER_NAME = "consolidation";
178179

179180
final BookieId bookieId;
180181
final BookieAddressResolver bookieAddressResolver;
@@ -595,7 +596,7 @@ protected ChannelFuture connect() {
595596
@Override
596597
protected void initChannel(Channel ch) throws Exception {
597598
ChannelPipeline pipeline = ch.pipeline();
598-
pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true));
599+
pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true));
599600
pipeline.addLast("bytebufList", ByteBufList.ENCODER);
600601
pipeline.addLast("lengthbasedframedecoder",
601602
new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
@@ -1573,8 +1574,10 @@ void initTLSHandshake() {
15731574
} else {
15741575
throw new RuntimeException("Unexpected socket address type");
15751576
}
1577+
LOG.info("Starting TLS handshake with {}:{}", address.getHostString(), address.getPort());
15761578
SslHandler handler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
1577-
channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler);
1579+
channel.pipeline()
1580+
.addAfter(CONSOLIDATION_HANDLER_NAME, parentObj.shFactory.getHandlerName(), handler);
15781581
handler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
15791582
@Override
15801583
public void operationComplete(Future<Channel> future) throws Exception {

bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception {
318318
}
319319

320320
protected ClientConfiguration newClientConfiguration() {
321-
return new ClientConfiguration(baseConf);
321+
return new ClientConfiguration(baseClientConf);
322322
}
323323

324324
protected ServerConfiguration newServerConfiguration(int port, File journalDir, File[] ledgerDirs) {

bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -350,11 +350,6 @@ public void testConnectToTLSClusterTLSClient() throws Exception {
350350
*/
351351
@Test
352352
public void testConnectToLocalTLSClusterTLSClient() throws Exception {
353-
// skip test
354-
if (useV2Protocol) {
355-
return;
356-
}
357-
358353
restartBookies(c -> {
359354
c.setDisableServerSocketBind(true);
360355
c.setEnableLocalTransport(true);
@@ -622,10 +617,6 @@ public void testBookieAuthPluginRequireClientTLSAuthentication() throws Exceptio
622617
*/
623618
@Test
624619
public void testBookieAuthPluginRequireClientTLSAuthenticationLocal() throws Exception {
625-
if (useV2Protocol) {
626-
return;
627-
}
628-
629620
restartBookies(c -> {
630621
c.setBookieAuthProviderFactoryClass(
631622
AllowOnlyClientsWithX509Certificates.class.getName());
@@ -756,6 +747,10 @@ public void testBookieAuthPluginDenyAccessToClientWithoutTLS() throws Exception
756747
testClient(clientConf, numBookies);
757748
fail("Shouldn't be able to connect");
758749
} catch (BKException.BKUnauthorizedAccessException authFailed) {
750+
} catch (BKException.BKNotEnoughBookiesException notEnoughBookiesException) {
751+
if (!useV2Protocol) {
752+
fail("Unexpected exception occurred.");
753+
}
759754
}
760755

761756
assertFalse(secureBookieSideChannel);

0 commit comments

Comments
 (0)