Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.metrics.RatisMetrics;
import org.apache.ratis.metrics.Timekeeper;
import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.statemachine.StateMachine;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/**
Expand All @@ -39,6 +42,11 @@ public final class StateMachineMetrics extends RatisMetrics {
public static final String STATEMACHINE_APPLY_COMPLETED_GAUGE = "applyCompletedIndex";
public static final String STATEMACHINE_TAKE_SNAPSHOT_TIMER = "takeSnapshot";

/** Time taken for the State Machine applyLog operation to complete execution. */
public static final String STATEMACHINE_APPLY_LOG_EXECUTION_TIME = "%sApplyLogExecutionTime";

private final Map<LogEntryBodyCase, Timekeeper> applyLogTimers = new ConcurrentHashMap<>();

public static StateMachineMetrics getStateMachineMetrics(
RaftServerImpl server, RaftLogIndex appliedIndex,
StateMachine stateMachine) {
Expand Down Expand Up @@ -72,4 +80,12 @@ public Timekeeper getTakeSnapshotTimer() {
return takeSnapshotTimer;
}

private Timekeeper newApplyLogExecutionTimer(LogEntryBodyCase logType) {
return getRegistry().timer(String.format(STATEMACHINE_APPLY_LOG_EXECUTION_TIME,
logType.name().toLowerCase()));
}

public Timekeeper getApplyLogExecutionTimer(LogEntryBodyCase logType) {
return applyLogTimers.computeIfAbsent(logType, this::newApplyLogExecutionTimer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ private CompletableFuture<Void> applyLog(CompletableFuture<Void> applyLogFutures
} else {
LOG.debug("{}: applying nextIndex={}", this, nextIndex);
}

final Timekeeper.Context applyLogTimerContext = stateMachineMetrics.get().getApplyLogExecutionTimer(
next.getLogEntryBodyCase()).time();
final CompletableFuture<Message> f = server.applyLogToStateMachine(next);
final long incremented = appliedIndex.incrementAndGet(debugIndexChange);
Preconditions.assertTrue(incremented == nextIndex);
Expand All @@ -259,10 +260,12 @@ private CompletableFuture<Void> applyLog(CompletableFuture<Void> applyLogFutures
LOG.error("Exception while {}: applying txn index={}, nextLog={}", this, nextIndex,
LogProtoUtils.toLogEntryString(next), ex);
return null;
});
})
.whenComplete((m, e) -> applyLogTimerContext.stop());
applyLogFutures = applyLogFutures.thenCombine(exceptionHandledFuture, (v, message) -> null);
f.thenAccept(m -> notifyAppliedIndex(incremented));
} else {
applyLogTimerContext.stop();
notifyAppliedIndex(incremented);
}
} else {
Expand Down
Loading