Skip to content

Commit 77b09c2

Browse files
committed
Extract helper class
1 parent 501627f commit 77b09c2

2 files changed

Lines changed: 76 additions & 33 deletions

File tree

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.apache.bookkeeper.proto;
20+
21+
import com.google.protobuf.ByteString;
22+
import com.google.protobuf.UnsafeByteOperations;
23+
import io.netty.buffer.ByteBuf;
24+
import java.nio.ByteBuffer;
25+
import org.apache.bookkeeper.util.ByteBufList;
26+
27+
public class ByteStringUtil {
28+
29+
/**
30+
* Wrap the internal buffers of a ByteBufList into a single ByteString.
31+
* The lifecycle of the wrapped ByteString is tied to the ByteBufList.
32+
*
33+
* @param bufList ByteBufList to wrap
34+
* @return ByteString wrapping the internal buffers of the ByteBufList
35+
*/
36+
public static ByteString byteBufListToByteString(ByteBufList bufList) {
37+
ByteString aggregated = null;
38+
for (int i = 0; i < bufList.size(); i++) {
39+
aggregated = byteBufToByteString(aggregated, bufList.getBuffer(i));
40+
}
41+
return aggregated != null ? aggregated : ByteString.EMPTY;
42+
}
43+
44+
/**
45+
* Wrap the internal buffers of a ByteBuf into a single ByteString.
46+
* The lifecycle of the wrapped ByteString is tied to the ByteBuf.
47+
*
48+
* @param byteBuf ByteBuf to wrap
49+
* @return ByteString wrapping the internal buffers of the ByteBuf
50+
*/
51+
public static ByteString byteBufToByteString(ByteBuf byteBuf) {
52+
return byteBufToByteString(null, byteBuf);
53+
}
54+
55+
// internal method to aggregate a ByteBuf into a single aggregated ByteString
56+
private static ByteString byteBufToByteString(ByteString aggregated, ByteBuf byteBuf) {
57+
if (byteBuf.nioBufferCount() > 1) {
58+
for (ByteBuffer nioBuffer : byteBuf.nioBuffers()) {
59+
ByteString piece = UnsafeByteOperations.unsafeWrap(nioBuffer);
60+
aggregated = (aggregated == null) ? piece : aggregated.concat(piece);
61+
}
62+
} else {
63+
ByteString piece;
64+
if (byteBuf.hasArray()) {
65+
piece = UnsafeByteOperations.unsafeWrap(byteBuf.array(), byteBuf.arrayOffset(),
66+
byteBuf.readableBytes());
67+
} else {
68+
piece = UnsafeByteOperations.unsafeWrap(byteBuf.nioBuffer());
69+
}
70+
aggregated = (aggregated == null) ? piece : aggregated.concat(piece);
71+
}
72+
return aggregated;
73+
}
74+
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import java.net.InetSocketAddress;
7070
import java.net.SocketAddress;
7171
import java.net.UnknownHostException;
72-
import java.nio.ByteBuffer;
7372
import java.security.cert.Certificate;
7473
import java.util.ArrayDeque;
7574
import java.util.ArrayList;
@@ -701,7 +700,7 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB
701700
.setVersion(ProtocolVersion.VERSION_THREE)
702701
.setOperation(OperationType.WRITE_LAC)
703702
.setTxnId(txnId);
704-
ByteString body = byteBufListToByteString(toSend);
703+
ByteString body = ByteStringUtil.byteBufListToByteString(toSend);
705704
toSend.retain();
706705
Runnable cleanupActionFailedBeforeWrite = toSend::release;
707706
Runnable cleanupActionAfterWrite = cleanupActionFailedBeforeWrite;
@@ -808,7 +807,7 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
808807
}
809808

810809
ByteBufList bufToSend = (ByteBufList) toSend;
811-
ByteString body = byteBufListToByteString(bufToSend);
810+
ByteString body = ByteStringUtil.byteBufListToByteString(bufToSend);
812811
bufToSend.retain();
813812
cleanupActionFailedBeforeWrite = bufToSend::release;
814813
cleanupActionAfterWrite = cleanupActionFailedBeforeWrite;
@@ -841,36 +840,6 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
841840
cleanupActionAfterWrite);
842841
}
843842

844-
/**
845-
* Wrap the internal buffers of a ByteBufList into a single ByteString.
846-
* The lifecycle of the wrapped ByteString is tied to the ByteBufList.
847-
*
848-
* @param bufList ByteBufList to wrap
849-
* @return ByteString wrapping the internal buffers of the ByteBufList
850-
*/
851-
private static ByteString byteBufListToByteString(ByteBufList bufList) {
852-
ByteString aggregated = null;
853-
for (int i = 0; i < bufList.size(); i++) {
854-
ByteBuf buffer = bufList.getBuffer(i);
855-
ByteString piece;
856-
if (buffer.nioBufferCount() > 1) {
857-
for (ByteBuffer nioBuffer : buffer.nioBuffers()) {
858-
piece = UnsafeByteOperations.unsafeWrap(nioBuffer);
859-
aggregated = (aggregated == null) ? piece : aggregated.concat(piece);
860-
}
861-
} else {
862-
if (buffer.hasArray()) {
863-
piece = UnsafeByteOperations.unsafeWrap(buffer.array(), buffer.arrayOffset(),
864-
buffer.readableBytes());
865-
} else {
866-
piece = UnsafeByteOperations.unsafeWrap(buffer.nioBuffer());
867-
}
868-
aggregated = (aggregated == null) ? piece : aggregated.concat(piece);
869-
}
870-
}
871-
return aggregated != null ? aggregated : ByteString.EMPTY;
872-
}
873-
874843
public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
875844
Object request = null;
876845
CompletionKey completionKey = null;

0 commit comments

Comments
 (0)