Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions bookkeeper-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@
<groupId>com.carrotsearch</groupId>
<artifactId>hppc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<!-- testing dependencies -->
<dependency>
<groupId>org.apache.bookkeeper</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -228,7 +229,7 @@ static class ClientSideHandler extends ChannelDuplexHandler {
final ClientAuthProvider.Factory authProviderFactory;
ClientAuthProvider authProvider;
final AtomicLong transactionIdGenerator;
final Queue<Object> waitingForAuth = new ConcurrentLinkedQueue<>();
final Queue<Pair<Object, ChannelPromise>> waitingForAuth = new ConcurrentLinkedQueue<>();
final ClientConnectionPeer connectionPeer;

private final boolean isUsingV2Protocol;
Expand Down Expand Up @@ -349,7 +350,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
super.write(ctx, msg, promise);
super.flush(ctx);
} else {
waitingForAuth.add(msg);
waitingForAuth.add(Pair.of(msg, promise));
}
} else if (msg instanceof BookieProtocol.Request) {
// let auth messages through, queue the rest
Expand All @@ -358,10 +359,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
super.write(ctx, msg, promise);
super.flush(ctx);
} else {
waitingForAuth.add(msg);
waitingForAuth.add(Pair.of(msg, promise));
}
} else if (msg instanceof ByteBuf || msg instanceof ByteBufList) {
waitingForAuth.add(msg);
waitingForAuth.add(Pair.of(msg, promise));
} else {
LOG.info("[{}] dropping write of message {}", ctx.channel(), msg);
}
Expand Down Expand Up @@ -427,10 +428,10 @@ public void operationComplete(int rc, Void v) {
if (rc == BKException.Code.OK) {
synchronized (this) {
authenticated = true;
Object msg = waitingForAuth.poll();
while (msg != null) {
ctx.writeAndFlush(msg);
msg = waitingForAuth.poll();
Pair<Object, ChannelPromise> pair = waitingForAuth.poll();
while (pair != null && pair.getKey() != null) {
ctx.writeAndFlush(pair.getKey(), pair.getValue());
pair = waitingForAuth.poll();
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,14 @@ private static byte[] readMasterKey(ByteBuf packet) {

return masterKey;
}

/**
 * Append a serialized add request to the given buffer list.
 *
 * <p>Requests that are neither a {@code ByteBuf} nor a {@code ByteBufList}
 * are silently ignored.
 *
 * @param request the serialized request to append
 * @param buf the destination buffer list
 */
public static void serializeAddRequests(Object request, ByteBufList buf) {
    if (request instanceof ByteBufList) {
        buf.add((ByteBufList) request);
    } else if (request instanceof ByteBuf) {
        buf.add((ByteBuf) request);
    }
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;

import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
import com.carrotsearch.hppc.procedures.ObjectProcedure;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -173,6 +176,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
BKException.Code.DuplicateEntryIdException,
BKException.Code.WriteOnReadOnlyBookieException));
private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.

private static final AtomicLong txnIdGenerator = new AtomicLong(0);

final BookieId bookieId;
Expand Down Expand Up @@ -349,6 +353,9 @@ enum ConnectionState {
private final SecurityHandlerFactory shFactory;
private volatile boolean isWritable = true;
private long lastBookieUnavailableLogTimestamp = 0;
private ByteBufList pendingSendRequests = null;
private final ObjectSet<CompletionKey> pendingSendKeys = new ObjectHashSet<>();
private volatile boolean needFlush = true;

public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
BookieId addr, BookieAddressResolver bookieAddressResolver) throws SecurityException {
Expand Down Expand Up @@ -1154,26 +1161,96 @@ private void writeAndFlush(final Channel channel,
}

try {
final long startTime = MathUtils.nowInNano();

ChannelPromise promise = channel.newPromise().addListener(future -> {
if (future.isSuccess()) {
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
CompletionValue completion = completionObjects.get(key);
if (completion != null) {
completion.setOutstanding();
}
} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
if (request instanceof ByteBuf || request instanceof ByteBufList) {
if (checkFlushPendingRequests(request)) {
flushPendingRequests();
}
});
channel.writeAndFlush(request, promise);
prepareSendRequests(request, key);
if (needFlush) {
flushPendingRequests();
}
} else {
final long startTime = MathUtils.nowInNano();

// promise complete trigger flush pending request.
ChannelPromise promise = channel.newPromise().addListener(future -> {
if (future.isSuccess()) {
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
CompletionValue completion = completionObjects.get(key);
if (completion != null) {
completion.setOutstanding();
}
} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
});
channel.writeAndFlush(request, promise);
}
} catch (Throwable e) {
LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
errorOut(key);
}
}

/**
 * Buffer an add request (and its completion key) into the pending batch.
 *
 * <p>The first buffered request lazily creates the batch; subsequent
 * requests are appended to the existing batch via the protocol encoder
 * helper.
 *
 * @param request the serialized add request, either a {@code ByteBuf} or a {@code ByteBufList}
 * @param key completion key marked outstanding once the batch is flushed
 */
public synchronized void prepareSendRequests(Object request, CompletionKey key) {
    if (pendingSendRequests != null) {
        // A batch is already open: append to it.
        BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
    } else if (request instanceof ByteBuf) {
        pendingSendRequests = ByteBufList.get((ByteBuf) request);
    } else if (request instanceof ByteBufList) {
        pendingSendRequests = ByteBufList.get((ByteBufList) request);
    }
    pendingSendKeys.add(key);
}

/**
 * Decide whether the pending batch must be flushed before {@code request}
 * can be appended to it.
 *
 * @param request the next request (a {@code ByteBuf} or {@code ByteBufList}) about to be buffered
 * @return true if appending the request would push the batch past {@code maxFrameSize}
 */
public synchronized boolean checkFlushPendingRequests(Object request) {
    if (pendingSendRequests == null) {
        // No batch is open yet, so nothing needs flushing.
        return false;
    }

    final int incomingBytes;
    if (request instanceof ByteBuf) {
        incomingBytes = ((ByteBuf) request).readableBytes();
    } else {
        incomingBytes = ((ByteBufList) request).readableBytes();
    }
    return pendingSendRequests.readableBytes() + incomingBytes > maxFrameSize;
}

/**
 * Flush the currently batched add requests (if any) to the channel as a
 * single write, then reset the batch.
 *
 * <p>If nothing is batched, this just records (via {@code needFlush}) that
 * the next prepared batch should be flushed immediately. If the channel is
 * null or inactive, the batch is released and every pending request is
 * errored out with {@code TooManyRequestsException}.
 */
public synchronized void flushPendingRequests() {
    if (pendingSendRequests == null) {
        needFlush = true;
        return;
    }

    if (channel != null && channel.isActive()) {
        final long startTime = MathUtils.nowInNano();
        // Snapshot the keys now: pendingSendKeys is cleared below, before
        // the asynchronous write listener runs.
        ObjectSet<CompletionKey> keys = new ObjectHashSet<>(pendingSendKeys);
        // Create the promise only on this path. The previous code called
        // channel.newPromise() before the null check, so a null channel
        // threw NPE here instead of erroring the pending requests out.
        ChannelPromise promise = channel.newPromise().addListener(future -> {
            if (future.isSuccess()) {
                nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
                keys.forEach((ObjectProcedure<? super CompletionKey>) k -> {
                    CompletionValue completion = completionObjects.get(k);
                    if (completion != null) {
                        completion.setOutstanding();
                    }
                });
            } else {
                nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
            }
            // Flush whatever accumulated while this write was in flight.
            flushPendingRequests();
        });
        channel.writeAndFlush(pendingSendRequests, promise);
    } else {
        // Channel unusable: drop the batch and fail all pending requests.
        pendingSendRequests.release();
        pendingSendKeys.forEach((ObjectProcedure<? super CompletionKey>) key ->
            errorOut(key, BKException.Code.TooManyRequestsException));
    }
    pendingSendRequests = null;
    pendingSendKeys.clear();
    needFlush = false;
}

void errorOut(final CompletionKey key) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing completion key: {}", key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,19 +267,19 @@ public void operationComplete(int rc, Set<LedgerFragment> fragments) {
if (bookies.isEmpty()) {
// no missing fragments
callback.processResult(BKException.Code.OK, null, null);
return;
} else {
publishSuspectedLedgersAsync(bookies.stream().map(BookieId::toString).collect(Collectors.toList()),
Sets.newHashSet(lh.getId())
).whenComplete((result, cause) -> {
if (null != cause) {
LOG.error("Auditor exception publishing suspected ledger {} with lost bookies {}",
lh.getId(), bookies, cause);
callback.processResult(BKException.Code.ReplicationException, null, null);
} else {
callback.processResult(BKException.Code.OK, null, null);
}
});
}
publishSuspectedLedgersAsync(bookies.stream().map(BookieId::toString).collect(Collectors.toList()),
Sets.newHashSet(lh.getId())
).whenComplete((result, cause) -> {
if (null != cause) {
LOG.error("Auditor exception publishing suspected ledger {} with lost bookies {}",
lh.getId(), bookies, cause);
callback.processResult(BKException.Code.ReplicationException, null, null);
} else {
callback.processResult(BKException.Code.OK, null, null);
}
});
} else {
callback.processResult(rc, null, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,15 @@ public static ByteBufList get(ByteBuf b1) {
return buf;
}

/**
 * Get a new {@link ByteBufList} from the pool, seeded with the buffers of
 * an existing list. The individual buffers are shared, not copied.
 *
 * @param b1 the list whose buffers are added to the new instance
 * @return a pooled {@link ByteBufList} holding the same buffers as {@code b1}
 */
public static ByteBufList get(ByteBufList b1) {
    ByteBufList buf = get();
    final int count = b1.buffers.size();
    for (int idx = 0; idx < count; idx++) {
        buf.add(b1.buffers.get(idx));
    }
    return buf;
}


/**
* Get a new {@link ByteBufList} instance from the pool that is the clone of an already existing instance.
*/
Expand Down Expand Up @@ -149,6 +158,10 @@ public void add(ByteBuf buf) {
}
}

/**
 * Append every buffer of {@code b1} to this list.
 *
 * <p>NOTE(review): this bulk-append writes straight into the backing list
 * and bypasses whatever per-buffer handling {@code add(ByteBuf)} performs
 * (that method's body is conditional) — confirm that skipping it is
 * intended for callers of this overload.
 */
public void add(ByteBufList b1) {
    buffers.addAll(b1.buffers);
}

/**
* Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
*/
Expand Down Expand Up @@ -276,6 +289,13 @@ public static ByteBuf coalesce(ByteBufList list) {
return res;
}

/**
 * Copy the readable bytes of every buffer in this list into {@code buf}.
 * The reader indexes of the source buffers are left untouched.
 *
 * @param buf destination buffer; its writer index is advanced
 */
public void writeTo(ByteBuf buf) {
    for (int idx = 0, n = buffers.size(); idx < n; idx++) {
        ByteBuf src = buffers.get(idx);
        // The (srcIndex, length) overload does not move src's reader index.
        buf.writeBytes(src, src.readerIndex(), src.readableBytes());
    }
}

@Override
public ByteBufList retain() {
super.retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ public SyncObj() {
@Override
@Before
public void setUp() throws Exception {
baseConf.setJournalWriteData(writeJournal);
baseClientConf.setUseV2WireProtocol(useV2);
super.setUp();
rng = new Random(0); // Initialize the Random
// Number Generator
Expand All @@ -136,14 +138,12 @@ public BookieWriteLedgerTest() {
String ledgerManagerFactory = "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory";
// set ledger manager
baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
baseConf.setJournalWriteData(writeJournal);
/*
* 'testLedgerCreateAdvWithLedgerIdInLoop2' testcase relies on skipListSizeLimit,
* so setting it to some small value for making that testcase lite.
*/
baseConf.setSkipListSizeLimit(4 * 1024 * 1024);
baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
baseClientConf.setUseV2WireProtocol(useV2);
}

/**
Expand Down Expand Up @@ -1549,4 +1549,29 @@ public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedg
}

}

/**
 * Write 10k entries asynchronously on a 1-1-1 ledger and read them back,
 * verifying that every add completed successfully.
 */
@Test
public void testReadWriteEntry() throws Exception {
    lh = bkc.createLedgerAdv(1, 1, 1, digestType, ledgerPassword);
    numEntriesToWrite = 10000;
    List<byte[]> entries = new ArrayList<>();
    CountDownLatch latch = new CountDownLatch(numEntriesToWrite);
    // Record the first non-OK result instead of asserting inside the
    // callback: an AssertionError thrown on a BookKeeper callback thread
    // would not fail the test, and skipping countDown() on failure would
    // make latch.await() hang forever.
    final java.util.concurrent.atomic.AtomicInteger firstError =
            new java.util.concurrent.atomic.AtomicInteger(0);
    for (int i = 0; i < numEntriesToWrite; ++i) {
        ByteBuffer entry = ByteBuffer.allocate(4);
        entry.putInt(rng.nextInt(maxInt));
        entry.position(0);
        entries.add(entry.array());
        lh.asyncAddEntry(i, entry.array(), new AddCallback() {
            @Override
            public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
                if (rc != 0) {
                    firstError.compareAndSet(0, rc);
                }
                latch.countDown();
            }
        }, null);
    }
    latch.await();
    assertEquals("all asyncAddEntry calls should complete with rc=OK",
            0, firstError.get());
    readEntries(lh, entries);
    lh.close();
}
}
11 changes: 0 additions & 11 deletions circe-checksum/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco-maven-plugin.version}</version>
<configuration>
<excludes>
<!-- this class is generated -->
<exclude>com/scurrilous/circe/checksum/NarSystem*</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Loading