From 074e91386a96cdf3c7229dee6f1751b152e9d9c7 Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Fri, 28 Nov 2025 00:36:01 +0300 Subject: [PATCH 1/3] Refactor some of ASM and slot-stats functions --- src/cluster.c | 85 ++++++++++++++++++++++++++++++++++++++ src/cluster.h | 2 + src/cluster_legacy.c | 98 +++++++++----------------------------------- src/server.c | 13 ++++-- 4 files changed, 116 insertions(+), 82 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 561ab8bdd80..a999af7dc5c 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1089,6 +1089,10 @@ void clusterCommand(client *c) { addReplyBulkCString(c,ni); sdsfree(ni); } + } else if (!strcasecmp(c->argv[1]->ptr, "migration")) { + clusterMigrationCommand(c); + } else if (!strcasecmp(c->argv[1]->ptr,"syncslots") && c->argc >= 3) { + clusterSyncSlotsCommand(c); } else if(!clusterCommandSpecial(c)) { addReplySubcommandSyntaxError(c); return; @@ -2127,3 +2131,84 @@ void resetClusterStats(void) { clusterSlotStatResetAll(); } + +/* This function is called at server startup in order to initialize cluster data + * structures that are shared between the different cluster implementations. */ +void clusterCommonInit(void) { + server.cluster_slot_stats = zmalloc(CLUSTER_SLOTS*sizeof(clusterSlotStat)); + resetClusterStats(); + asmInit(); +} + +/* This function is called after the node startup in order to check if there + * are any slots that we have keys for, but are not assigned to us. If so, + * we delete the keys. */ +void clusterDeleteKeysInUnownedSlots(void) { + if (clusterNodeIsSlave(getMyClusterNode())) return; + + /* Check that all the slots we have keys for are assigned to us. Otherwise, + * delete the keys. */ + for (int i = 0; i < CLUSTER_SLOTS; i++) { + /* Skip if: no keys in the slot, it's our slot, or we are importing it. */ + if (!countKeysInSlot(i) || + clusterIsMySlot(i) || + getImportingSlotSource(i)) + { + continue; + } + + serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is " + "assigned to another node. " + "Deleting keys in the slot.", i); + /* With atomic slot migration, it is safe to drop keys from slots + * that are not owned. This will not result in data loss under the + * legacy slot migration approach either, since the importing state + * has already been persisted in node.conf. */ + clusterDelKeysInSlot(i, 0); + } +} + + +/* This function is called after the node startup in order to verify that data + * loaded from disk is in agreement with the cluster configuration: + * + * 1) If we find keys about hash slots we have no responsibility for, the + * following happens: + * A) If no other node is in charge according to the current cluster + * configuration, we add these slots to our node. + * B) If according to our config other nodes are already in charge for + * this slots, we set the slots as IMPORTING from our point of view + * in order to justify we have those slots, and in order to make + * redis-cli aware of the issue, so that it can try to fix it. + * 2) If we find data in a DB different than DB0 we return C_ERR to + * signal the caller it should quit the server with an error message + * or take other actions. + * + * The function always returns C_OK even if it will try to correct + * the error described in "1". However if data is found in DB different + * from DB0, C_ERR is returned. + * + * The function also uses the logging facility in order to warn the user + * about desynchronizations between the data we have in memory and the + * cluster configuration. */ +int verifyClusterConfigWithData(void) { + /* Return ASAP if a module disabled cluster redirections. In that case + * every master can store keys about every possible hash slot. */ + if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) + return C_OK; + + /* If this node is a slave, don't perform the check at all as we + * completely depend on the replication stream. */ + if (clusterNodeIsSlave(getMyClusterNode())) return C_OK; + + /* Make sure we only have keys in DB0. */ + for (int i = 1; i < server.dbnum; i++) { + if (kvstoreSize(server.db[i].keys)) return C_ERR; + } + + /* Take over slots that we have keys for, but are assigned to no one. */ + clusterClaimUnassignedSlots(); + /* Delete keys in unowned slots */ + clusterDeleteKeysInUnownedSlots(); + return C_OK; +} diff --git a/src/cluster.h b/src/cluster.h index 830dae87b7c..c5e8e8d95c9 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -86,8 +86,10 @@ static inline unsigned int keyHashSlot(const char *key, int keylen) { /* functions requiring mechanism specific implementations */ void clusterInit(void); void clusterInitLast(void); +void clusterCommonInit(void); void clusterCron(void); void clusterBeforeSleep(void); +void clusterClaimUnassignedSlots(void); int verifyClusterConfigWithData(void); int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 8255b80672d..5f8619a67d8 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1036,10 +1036,8 @@ void clusterInit(void) { clusterUpdateMyselfIp(); clusterUpdateMyselfHostname(); clusterUpdateMyselfHumanNodename(); - resetClusterStats(); getRandomHexChars(server.cluster->internal_secret, CLUSTER_INTERNALSECRETLEN); - asmInit(); } void clusterInitLast(void) { @@ -4934,9 +4932,6 @@ void clusterCron(void) { if (update_state || server.cluster->state == CLUSTER_FAIL) clusterUpdateState(); - - /* Atomic slot migration cron */ - asmCron(); } /* This function is called before the event handler returns to sleep for @@ -4978,8 +4973,6 @@ void clusterBeforeSleep(void) { /* Broadcast a PONG to all the nodes. */ if (flags & CLUSTER_TODO_BROADCAST_PONG) clusterBroadcastPong(CLUSTER_BROADCAST_ALL); - - asmBeforeSleep(); } void clusterDoBeforeSleep(int flags) { @@ -5252,80 +5245,31 @@ void clusterUpdateState(void) { } } -/* This function is called after the node startup in order to verify that data - * loaded from disk is in agreement with the cluster configuration: - * - * 1) If we find keys about hash slots we have no responsibility for, the - * following happens: - * A) If no other node is in charge according to the current cluster - * configuration, we add these slots to our node. - * B) If according to our config other nodes are already in charge for - * this slots, we set the slots as IMPORTING from our point of view - * in order to justify we have those slots, and in order to make - * redis-cli aware of the issue, so that it can try to fix it. - * 2) If we find data in a DB different than DB0 we return C_ERR to - * signal the caller it should quit the server with an error message - * or take other actions. - * - * The function always returns C_OK even if it will try to correct - * the error described in "1". However if data is found in DB different - * from DB0, C_ERR is returned. - * - * The function also uses the logging facility in order to warn the user - * about desynchronizations between the data we have in memory and the - * cluster configuration. */ -int verifyClusterConfigWithData(void) { - int j; - int update_config = 0; - - /* Return ASAP if a module disabled cluster redirections. In that case - * every master can store keys about every possible hash slot. */ - if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) - return C_OK; - - /* If this node is a slave, don't perform the check at all as we - * completely depend on the replication stream. */ - if (nodeIsSlave(myself)) return C_OK; - - /* Make sure we only have keys in DB0. */ - for (j = 1; j < server.dbnum; j++) { - if (kvstoreSize(server.db[j].keys)) return C_ERR; - } +/* This function is called after the node startup in order to check if there + * are any slots that we have keys for, but are assigned to no one. If so, + * we take ownership of them. */ +void clusterClaimUnassignedSlots(void) { + if (nodeIsSlave(myself)) return; - /* Check that all the slots we see populated memory have a corresponding - * entry in the cluster table. Otherwise fix the table. */ - for (j = 0; j < CLUSTER_SLOTS; j++) { - if (!countKeysInSlot(j)) continue; /* No keys in this slot. */ - /* Check if we are assigned to this slot or if we are importing it. - * In both cases check the next slot as the configuration makes - * sense. */ - if (server.cluster->slots[j] == myself || - server.cluster->importing_slots_from[j] != NULL) continue; + int update_config = 0; + for (int i = 0; i < CLUSTER_SLOTS; i++) { + /* Skip if: no keys, already has an owner, or we are importing it. */ + if (!countKeysInSlot(i) || + server.cluster->slots[i] != NULL || + server.cluster->importing_slots_from[i] != NULL) + { + continue; + } /* If we are here data and cluster config don't agree, and we have - * slot 'j' populated even if we are not importing it, nor we are - * assigned to this slot. Fix this condition. */ - + * slot 'i' populated even if we are not importing it, nor anyone else + * is assigned to it. Fix this condition by taking ownership. */ update_config++; - /* Case A: slot is unassigned. Take responsibility for it. */ - if (server.cluster->slots[j] == NULL) { - serverLog(LL_NOTICE, "I have keys for unassigned slot %d. " - "Taking responsibility for it.",j); - clusterAddSlot(myself,j); - } else { - serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is " - "assigned to another node. " - "Deleting keys in the slot.", j); - - /* With atomic slot migration, it is safe to drop keys from slots - * that are not owned. This will not result in data loss under the - * legacy slot migration approach either, since the importing state - * has already been persisted in node.conf. */ - clusterDelKeysInSlot(j, 0); - } + serverLog(LL_NOTICE, "I have keys for unassigned slot %d. " + "Taking responsibility for it.", i); + clusterAddSlot(myself, i); } if (update_config) clusterSaveConfigOrDie(1); - return C_OK; } /* Remove all the shard channel related information not owned by the current shard. */ @@ -6481,10 +6425,6 @@ int clusterCommandSpecial(client *c) { } else if (!strcasecmp(c->argv[1]->ptr,"links") && c->argc == 2) { /* CLUSTER LINKS */ addReplyClusterLinksDescription(c); - } else if (!strcasecmp(c->argv[1]->ptr, "migration")) { - clusterMigrationCommand(c); - } else if (!strcasecmp(c->argv[1]->ptr,"syncslots") && c->argc >= 3) { - clusterSyncSlotsCommand(c); } else { return 0; } diff --git a/src/server.c b/src/server.c index b56c6d3fb6a..d77a9f49c2e 100644 --- a/src/server.c +++ b/src/server.c @@ -1654,7 +1654,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Run the Redis Cluster cron. */ run_with_period(100) { - if (server.cluster_enabled) clusterCron(); + if (server.cluster_enabled) { + clusterCron(); + asmCron(); + } } /* Run the Sentinel timer if we are in sentinel mode. */ @@ -1835,7 +1838,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * may change the state of Redis Cluster (from ok to fail or vice versa), * so it's a good idea to call it before serving the unblocked clients * later in this function, must be done before blockedBeforeSleep. */ - if (server.cluster_enabled) clusterBeforeSleep(); + if (server.cluster_enabled) { + clusterBeforeSleep(); + asmBeforeSleep(); + } /* Handle blocked clients. * must be done before flushAppendOnlyFile, in case of appendfsync=always, @@ -7730,7 +7736,8 @@ int main(int argc, char **argv) { redisAsciiArt(); checkTcpBacklogSettings(); if (server.cluster_enabled) { - server.cluster_slot_stats = zmalloc(CLUSTER_SLOTS*sizeof(clusterSlotStat)); + /* clusterCommonInit() initializes slot-stats required by clusterInit() */ + clusterCommonInit(); clusterInit(); } if (!server.sentinel_mode) { From 56470f2ba46afbe566dac92ceb890a0719b34d88 Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Fri, 28 Nov 2025 00:38:15 +0300 Subject: [PATCH 2/3] better diff --- src/cluster_legacy.c | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 5f8619a67d8..277ce4819e7 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -5245,6 +5245,17 @@ void clusterUpdateState(void) { } } +/* Remove all the shard channel related information not owned by the current shard. */ +static inline void removeAllNotOwnedShardChannelSubscriptions(void) { + if (!kvstoreSize(server.pubsubshard_channels)) return; + clusterNode *currmaster = clusterNodeIsMaster(myself) ? myself : myself->slaveof; + for (int j = 0; j < CLUSTER_SLOTS; j++) { + if (server.cluster->slots[j] != currmaster) { + removeChannelsInSlot(j); + } + } +} + /* This function is called after the node startup in order to check if there * are any slots that we have keys for, but are assigned to no one. If so, * we take ownership of them. */ @@ -5272,17 +5283,6 @@ void clusterClaimUnassignedSlots(void) { if (update_config) clusterSaveConfigOrDie(1); } -/* Remove all the shard channel related information not owned by the current shard. */ -static inline void removeAllNotOwnedShardChannelSubscriptions(void) { - if (!kvstoreSize(server.pubsubshard_channels)) return; - clusterNode *currmaster = clusterNodeIsMaster(myself) ? myself : myself->slaveof; - for (int j = 0; j < CLUSTER_SLOTS; j++) { - if (server.cluster->slots[j] != currmaster) { - removeChannelsInSlot(j); - } - } -} - /* ----------------------------------------------------------------------------- * SLAVE nodes handling * -------------------------------------------------------------------------- */ From 865827b49d7b7125377d812a06d3d0a0ce4ff860 Mon Sep 17 00:00:00 2001 From: tomerqodo Date: Sun, 25 Jan 2026 12:12:01 +0200 Subject: [PATCH 3/3] update pr --- src/cluster.c | 6 +++--- src/server.c | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index a999af7dc5c..91b27fdb88a 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2135,7 +2135,7 @@ void resetClusterStats(void) { /* This function is called at server startup in order to initialize cluster data * structures that are shared between the different cluster implementations. */ void clusterCommonInit(void) { - server.cluster_slot_stats = zmalloc(CLUSTER_SLOTS*sizeof(clusterSlotStat)); + server.cluster_slot_stats = malloc(CLUSTER_SLOTS*sizeof(clusterSlotStat)); resetClusterStats(); asmInit(); } @@ -2206,9 +2206,9 @@ int verifyClusterConfigWithData(void) { if (kvstoreSize(server.db[i].keys)) return C_ERR; } - /* Take over slots that we have keys for, but are assigned to no one. */ - clusterClaimUnassignedSlots(); /* Delete keys in unowned slots */ clusterDeleteKeysInUnownedSlots(); + /* Take over slots that we have keys for, but are assigned to no one. */ + clusterClaimUnassignedSlots(); return C_OK; } diff --git a/src/server.c b/src/server.c index d77a9f49c2e..b93a5d14fdb 100644 --- a/src/server.c +++ b/src/server.c @@ -1655,8 +1655,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Run the Redis Cluster cron. */ run_with_period(100) { if (server.cluster_enabled) { - clusterCron(); asmCron(); + clusterCron(); } }