Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ public class GrpcLogAppender extends LogAppenderBase {
private enum BatchLogKey implements BatchLogger.Key {
RESET_CLIENT,
INCONSISTENCY_REPLY,
APPEND_LOG_RESPONSE_HANDLER_ON_ERROR
APPEND_LOG_RESPONSE_HANDLER_ON_ERROR,
INSTALL_SNAPSHOT_NOTIFY,
INSTALL_SNAPSHOT_REPLY,
INSTALL_SNAPSHOT_IN_PROGRESS,
SNAPSHOT_UNAVAILABLE
}

public static final int INSTALL_SNAPSHOT_NOTIFICATION_INDEX = 0;
Expand Down Expand Up @@ -234,7 +238,7 @@ private void resetClient(AppendEntriesRequest request, Event event) {
}
getFollower().computeNextIndex(getNextIndexForError(nextIndex));
} catch (IOException ie) {
LOG.warn(this + ": Failed to getClient for " + getFollowerId(), ie);
LOG.warn("{}: Failed to resetClient for {}", this, getFollowerId(), ie);
}
}

Expand Down Expand Up @@ -497,8 +501,8 @@ public void onNext(AppendEntriesReplyProto reply) {
try {
onNextImpl(request, reply);
} catch(Exception t) {
LOG.error("Failed onNext request=" + request
+ ", reply=" + ServerStringUtils.toAppendEntriesReplyString(reply), t);
LOG.error("Failed onNext(reply), request={}, reply={}",
request, ServerStringUtils.toAppendEntriesReplyString(reply), t);
}
}

Expand Down Expand Up @@ -573,8 +577,8 @@ private void updateNextIndex(long replyNextIndex) {
}

private class InstallSnapshotResponseHandler implements StreamObserver<InstallSnapshotReplyProto> {
private final String name = getFollower().getName() + "-" + JavaUtils.getClassSimpleName(getClass());
private final Queue<Integer> pending;
private final String name;
private final Queue<Integer> pending = new LinkedList<>();
private final CompletableFuture<Void> done = new CompletableFuture<>();
private final boolean isNotificationOnly;

Expand All @@ -583,8 +587,8 @@ private class InstallSnapshotResponseHandler implements StreamObserver<InstallSn
}

InstallSnapshotResponseHandler(boolean notifyOnly) {
pending = new LinkedList<>();
this.isNotificationOnly = notifyOnly;
this.name = getFollower().getName() + "-InstallSnapshot" + (isNotificationOnly ? "Notification" : "");
}

void addPending(InstallSnapshotRequestProto request) {
Expand Down Expand Up @@ -626,8 +630,8 @@ void onFollowerCatchup(long followerSnapshotIndex) {
final long leaderStartIndex = getRaftLog().getStartIndex();
final long followerNextIndex = followerSnapshotIndex + 1;
if (followerNextIndex >= leaderStartIndex) {
LOG.info("{}: Follower can catch up leader after install the snapshot, as leader's start index is {}",
this, followerNextIndex);
LOG.info("{}: follower nextIndex = {} >= leader startIndex = {}",
this, followerNextIndex, leaderStartIndex);
notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS, followerSnapshotIndex);
}
}
Expand Down Expand Up @@ -659,10 +663,10 @@ boolean hasAllResponse() {

@Override
public void onNext(InstallSnapshotReplyProto reply) {
if (LOG.isInfoEnabled()) {
LOG.info("{}: received {} reply {}", this, replyState.isFirstReplyReceived()? "a" : "the first",
ServerStringUtils.toInstallSnapshotReplyString(reply));
}
BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_REPLY, name,
suffix -> LOG.info("{}: received {} reply {} {}", this,
replyState.isFirstReplyReceived() ? "a" : "the first",
ServerStringUtils.toInstallSnapshotReplyString(reply), suffix));

// update the last rpc time
getFollower().updateLastRpcResponseTime();
Expand All @@ -671,12 +675,13 @@ public void onNext(InstallSnapshotReplyProto reply) {
final long followerSnapshotIndex;
switch (reply.getResult()) {
case SUCCESS:
LOG.info("{}: Completed InstallSnapshot. Reply: {}", this, reply);
LOG.info("{}: Completed", this);
getFollower().setAttemptedToInstallSnapshot();
removePending(reply);
break;
case IN_PROGRESS:
LOG.info("{}: InstallSnapshot in progress.", this);
BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_IN_PROGRESS, name,
suffix -> LOG.info("{}: in progress, {}", this, suffix));
removePending(reply);
break;
case ALREADY_INSTALLED:
Expand All @@ -692,7 +697,7 @@ public void onNext(InstallSnapshotReplyProto reply) {
onFollowerTerm(reply.getTerm());
break;
case CONF_MISMATCH:
LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
LOG.error("{}: CONF_MISMATCH ({}): Leader {} has it set to {} but follower {} has it set to {}",
this, RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
getServer().getId(), installSnapshotEnabled, getFollowerId(), !installSnapshotEnabled);
break;
Expand All @@ -707,17 +712,19 @@ public void onNext(InstallSnapshotReplyProto reply) {
removePending(reply);
break;
case SNAPSHOT_UNAVAILABLE:
LOG.info("{}: Follower could not install snapshot as it is not available.", this);
BatchLogger.print(BatchLogKey.SNAPSHOT_UNAVAILABLE, name,
suffix -> LOG.info("{}: Follower failed since the snapshot is unavailable {}", this, suffix));
getFollower().setAttemptedToInstallSnapshot();
notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, RaftLog.INVALID_LOG_INDEX);
removePending(reply);
break;
case UNRECOGNIZED:
LOG.error("Unrecognized the reply result {}: Leader is {}, follower is {}",
reply.getResult(), getServer().getId(), getFollowerId());
LOG.error("{}: Reply result {}, {}",
name, reply.getResult(), ServerStringUtils.toInstallSnapshotReplyString(reply));
break;
case SNAPSHOT_EXPIRED:
LOG.warn("{}: Follower could not install snapshot as it is expired.", this);
LOG.warn("{}: Follower failed since the request expired, {}",
name, ServerStringUtils.toInstallSnapshotReplyString(reply));
default:
break;
}
Expand Down Expand Up @@ -796,8 +803,9 @@ private void installSnapshot(SnapshotInfo snapshot) {
* @param firstAvailable the first available log's index on the Leader
*/
private void notifyInstallSnapshot(TermIndex firstAvailable) {
LOG.info("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={}",
this, firstAvailable, getFollower().getNextIndex());
BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_NOTIFY, getFollower().getName(),
suffix -> LOG.info("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={} {}",
this, firstAvailable, getFollower().getNextIndex(), suffix));

final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(true);
StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
Expand Down
Loading