From 584a897e2bc358d4790c734f7941285244f9e3f6 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 31 Oct 2025 10:21:14 -0700 Subject: [PATCH] RATIS-2348. GrpcLogAppender may print a lot of messages in an error condition. --- .../ratis/grpc/server/GrpcLogAppender.java | 52 +++++++++++-------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 1622c5df9f..9ce45d1abb 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -73,7 +73,11 @@ public class GrpcLogAppender extends LogAppenderBase { private enum BatchLogKey implements BatchLogger.Key { RESET_CLIENT, INCONSISTENCY_REPLY, - APPEND_LOG_RESPONSE_HANDLER_ON_ERROR + APPEND_LOG_RESPONSE_HANDLER_ON_ERROR, + INSTALL_SNAPSHOT_NOTIFY, + INSTALL_SNAPSHOT_REPLY, + INSTALL_SNAPSHOT_IN_PROGRESS, + SNAPSHOT_UNAVAILABLE } public static final int INSTALL_SNAPSHOT_NOTIFICATION_INDEX = 0; @@ -234,7 +238,7 @@ private void resetClient(AppendEntriesRequest request, Event event) { } getFollower().computeNextIndex(getNextIndexForError(nextIndex)); } catch (IOException ie) { - LOG.warn(this + ": Failed to getClient for " + getFollowerId(), ie); + LOG.warn("{}: Failed to resetClient for {}", this, getFollowerId(), ie); } } @@ -497,8 +501,8 @@ public void onNext(AppendEntriesReplyProto reply) { try { onNextImpl(request, reply); } catch(Exception t) { - LOG.error("Failed onNext request=" + request - + ", reply=" + ServerStringUtils.toAppendEntriesReplyString(reply), t); + LOG.error("Failed onNext(reply), request={}, reply={}", + request, ServerStringUtils.toAppendEntriesReplyString(reply), t); } } @@ -573,8 +577,8 @@ private void updateNextIndex(long replyNextIndex) { } private class InstallSnapshotResponseHandler implements StreamObserver { - private final String name = getFollower().getName() + "-" + JavaUtils.getClassSimpleName(getClass()); - private final Queue pending; + private final String name; + private final Queue pending = new LinkedList<>(); private final CompletableFuture done = new CompletableFuture<>(); private final boolean isNotificationOnly; @@ -583,8 +587,8 @@ private class InstallSnapshotResponseHandler implements StreamObserver(); this.isNotificationOnly = notifyOnly; + this.name = getFollower().getName() + "-InstallSnapshot" + (isNotificationOnly ? "Notification" : ""); } void addPending(InstallSnapshotRequestProto request) { @@ -626,8 +630,8 @@ void onFollowerCatchup(long followerSnapshotIndex) { final long leaderStartIndex = getRaftLog().getStartIndex(); final long followerNextIndex = followerSnapshotIndex + 1; if (followerNextIndex >= leaderStartIndex) { - LOG.info("{}: Follower can catch up leader after install the snapshot, as leader's start index is {}", - this, followerNextIndex); + LOG.info("{}: follower nextIndex = {} >= leader startIndex = {}", + this, followerNextIndex, leaderStartIndex); notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS, followerSnapshotIndex); } } @@ -659,10 +663,10 @@ boolean hasAllResponse() { @Override public void onNext(InstallSnapshotReplyProto reply) { - if (LOG.isInfoEnabled()) { - LOG.info("{}: received {} reply {}", this, replyState.isFirstReplyReceived()? "a" : "the first", - ServerStringUtils.toInstallSnapshotReplyString(reply)); - } + BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_REPLY, name, + suffix -> LOG.info("{}: received {} reply {} {}", this, + replyState.isFirstReplyReceived() ? "a" : "the first", + ServerStringUtils.toInstallSnapshotReplyString(reply), suffix)); // update the last rpc time getFollower().updateLastRpcResponseTime(); @@ -671,12 +675,13 @@ public void onNext(InstallSnapshotReplyProto reply) { final long followerSnapshotIndex; switch (reply.getResult()) { case SUCCESS: - LOG.info("{}: Completed InstallSnapshot. Reply: {}", this, reply); + LOG.info("{}: Completed", this); getFollower().setAttemptedToInstallSnapshot(); removePending(reply); break; case IN_PROGRESS: - LOG.info("{}: InstallSnapshot in progress.", this); + BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_IN_PROGRESS, name, + suffix -> LOG.info("{}: in progress, {}", this, suffix)); removePending(reply); break; case ALREADY_INSTALLED: @@ -692,7 +697,7 @@ public void onNext(InstallSnapshotReplyProto reply) { onFollowerTerm(reply.getTerm()); break; case CONF_MISMATCH: - LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}", + LOG.error("{}: CONF_MISMATCH ({}): Leader {} has it set to {} but follower {} has it set to {}", this, RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY, getServer().getId(), installSnapshotEnabled, getFollowerId(), !installSnapshotEnabled); break; @@ -707,17 +712,19 @@ public void onNext(InstallSnapshotReplyProto reply) { removePending(reply); break; case SNAPSHOT_UNAVAILABLE: - LOG.info("{}: Follower could not install snapshot as it is not available.", this); + BatchLogger.print(BatchLogKey.SNAPSHOT_UNAVAILABLE, name, + suffix -> LOG.info("{}: Follower failed since the snapshot is unavailable {}", this, suffix)); getFollower().setAttemptedToInstallSnapshot(); notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, RaftLog.INVALID_LOG_INDEX); removePending(reply); break; case UNRECOGNIZED: - LOG.error("Unrecognized the reply result {}: Leader is {}, follower is {}", - reply.getResult(), getServer().getId(), getFollowerId()); + LOG.error("{}: Reply result {}, {}", + name, reply.getResult(), ServerStringUtils.toInstallSnapshotReplyString(reply)); break; case SNAPSHOT_EXPIRED: - LOG.warn("{}: Follower could not install snapshot as it is expired.", this); + LOG.warn("{}: Follower failed since the request expired, {}", + name, ServerStringUtils.toInstallSnapshotReplyString(reply)); default: break; } @@ -796,8 +803,9 @@ private void installSnapshot(SnapshotInfo snapshot) { * @param firstAvailable the first available log's index on the Leader */ private void notifyInstallSnapshot(TermIndex firstAvailable) { - LOG.info("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={}", - this, firstAvailable, getFollower().getNextIndex()); + BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_NOTIFY, getFollower().getName(), + suffix -> LOG.info("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={} {}", + this, firstAvailable, getFollower().getNextIndex(), suffix)); final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(true); StreamObserver snapshotRequestObserver = null;