Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,10 @@ void clusterCommand(client *c) {
addReplyBulkCString(c,ni);
sdsfree(ni);
}
} else if (!strcasecmp(c->argv[1]->ptr, "migration")) {
clusterMigrationCommand(c);
} else if (!strcasecmp(c->argv[1]->ptr,"syncslots") && c->argc >= 3) {
clusterSyncSlotsCommand(c);
} else if(!clusterCommandSpecial(c)) {
addReplySubcommandSyntaxError(c);
return;
Expand Down Expand Up @@ -2127,3 +2131,84 @@ void resetClusterStats(void) {

clusterSlotStatResetAll();
}

/* This function is called at server startup in order to initialize cluster data
 * structures that are shared between the different cluster implementations.
 *
 * It allocates the per-slot statistics array (CLUSTER_SLOTS entries), resets
 * the cluster statistics and initializes the atomic slot migration (ASM)
 * subsystem.
 *
 * The array is allocated with zmalloc(), the allocator the original call site
 * in server.c used for this same buffer, rather than with the raw libc
 * malloc(). */
void clusterCommonInit(void) {
    server.cluster_slot_stats = zmalloc(CLUSTER_SLOTS*sizeof(clusterSlotStat));
    /* The freshly allocated array is uninitialized: reset it before use. */
    resetClusterStats();
    asmInit();
}

/* Startup-time cleanup: walk every hash slot and drop the keys stored in
 * slots that this node neither owns nor is currently importing.
 *
 * Replicas return immediately without touching any slot. */
void clusterDeleteKeysInUnownedSlots(void) {
    if (clusterNodeIsSlave(getMyClusterNode())) return;

    for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
        /* Nothing stored in this slot: nothing to reconcile. */
        if (!countKeysInSlot(slot)) continue;
        /* The slot is ours: the keys are legitimately here. */
        if (clusterIsMySlot(slot)) continue;
        /* An import is in progress for this slot: keep the keys. */
        if (getImportingSlotSource(slot)) continue;

        serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is "
                             "assigned to another node. "
                             "Deleting keys in the slot.", slot);

        /* Dropping keys of unowned slots is safe with atomic slot migration.
         * It does not lose data with the legacy slot migration either,
         * because in that case the importing state was already persisted
         * in node.conf and the slot is skipped above. */
        clusterDelKeysInSlot(slot, 0);
    }
}


/* This function is called after the node startup in order to verify that data
 * loaded from disk is in agreement with the cluster configuration:
 *
 * 1) If we find keys in hash slots that are neither assigned to us nor being
 *    imported by us, the following happens:
 *    A) If no other node is in charge according to the current cluster
 *       configuration, we add these slots to our node
 *       (see clusterClaimUnassignedSlots()).
 *    B) If according to our config another node is already in charge of
 *       these slots, we delete the keys stored in them
 *       (see clusterDeleteKeysInUnownedSlots()).
 * 2) If we find data in a DB different than DB0 we return C_ERR to
 *    signal the caller it should quit the server with an error message
 *    or take other actions.
 *
 * The function returns C_OK even when it had to correct the condition
 * described in "1". C_ERR is returned only if data is found in a DB
 * different from DB0.
 *
 * The helpers called here use the logging facility in order to inform the
 * user about desynchronizations between the data we have in memory and the
 * cluster configuration. */
int verifyClusterConfigWithData(void) {
    /* Return ASAP if a module disabled cluster redirections. In that case
     * every master can store keys about every possible hash slot. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return C_OK;

    /* If this node is a slave, don't perform the check at all as we
     * completely depend on the replication stream. */
    if (clusterNodeIsSlave(getMyClusterNode())) return C_OK;

    /* Make sure we only have keys in DB0. */
    for (int i = 1; i < server.dbnum; i++) {
        if (kvstoreSize(server.db[i].keys)) return C_ERR;
    }

    /* Order matters: claim the unassigned slots we hold keys for BEFORE
     * deleting keys in unowned slots. The deletion pass only skips slots
     * that are empty, ours, or being imported, so running it first would
     * wipe the keys of unassigned slots and leave nothing to claim. */
    clusterClaimUnassignedSlots();
    /* Delete keys in the remaining slots that we do not own. */
    clusterDeleteKeysInUnownedSlots();
    return C_OK;
}
2 changes: 2 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ static inline unsigned int keyHashSlot(const char *key, int keylen) {
/* functions requiring mechanism specific implementations */
void clusterInit(void);
void clusterInitLast(void);
/* Called at server startup to initialize the cluster data structures that are
 * shared between the different cluster implementations. */
void clusterCommonInit(void);
void clusterCron(void);
void clusterBeforeSleep(void);
/* Called after node startup: takes ownership of the slots we hold keys for
 * but that are assigned to no node. */
void clusterClaimUnassignedSlots(void);
/* Called after node startup to reconcile the loaded data with the cluster
 * configuration. Returns C_ERR if keys are found in a DB other than DB0,
 * C_OK otherwise. */
int verifyClusterConfigWithData(void);

int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len);
Expand Down
114 changes: 27 additions & 87 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1036,10 +1036,8 @@ void clusterInit(void) {
clusterUpdateMyselfIp();
clusterUpdateMyselfHostname();
clusterUpdateMyselfHumanNodename();
resetClusterStats();

getRandomHexChars(server.cluster->internal_secret, CLUSTER_INTERNALSECRETLEN);
asmInit();
}

void clusterInitLast(void) {
Expand Down Expand Up @@ -4934,9 +4932,6 @@ void clusterCron(void) {

if (update_state || server.cluster->state == CLUSTER_FAIL)
clusterUpdateState();

/* Atomic slot migration cron */
asmCron();
}

/* This function is called before the event handler returns to sleep for
Expand Down Expand Up @@ -4978,8 +4973,6 @@ void clusterBeforeSleep(void) {
/* Broadcast a PONG to all the nodes. */
if (flags & CLUSTER_TODO_BROADCAST_PONG)
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

asmBeforeSleep();
}

void clusterDoBeforeSleep(int flags) {
Expand Down Expand Up @@ -5252,82 +5245,6 @@ void clusterUpdateState(void) {
}
}

/* Called once the node has started, to reconcile the dataset loaded from disk
 * with the cluster configuration:
 *
 * 1) For every populated hash slot that is neither assigned to this node nor
 *    being imported by it:
 *    A) If the slot has no owner in the current configuration, this node
 *       takes ownership of it.
 *    B) If the slot is assigned to another node, the keys stored in the
 *       slot are deleted.
 *    When at least one slot was fixed this way the cluster configuration
 *    is saved.
 * 2) If any DB other than DB0 holds keys, C_ERR is returned so the caller
 *    can quit the server with an error message or take other actions.
 *
 * C_OK is returned in every other case, including when the conditions
 * described in "1" had to be corrected. Each correction is logged. */
int verifyClusterConfigWithData(void) {
    /* Modules may disable cluster redirections: then every master is
     * allowed to hold keys of any hash slot and there is nothing to check. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return C_OK;

    /* Replicas depend entirely on the replication stream: skip the check. */
    if (nodeIsSlave(myself)) return C_OK;

    /* Only DB0 may contain keys. */
    for (int db = 1; db < server.dbnum; db++) {
        if (kvstoreSize(server.db[db].keys)) return C_ERR;
    }

    /* Every populated slot must either be ours or be under import.
     * Anything else is a mismatch between data and configuration. */
    int fixed_slots = 0;
    for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
        if (!countKeysInSlot(slot)) continue; /* Empty slot. */
        if (server.cluster->slots[slot] == myself) continue;
        if (server.cluster->importing_slots_from[slot] != NULL) continue;

        fixed_slots++;
        if (server.cluster->slots[slot] != NULL) {
            /* Case B: another node owns the slot, drop our keys. */
            serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is "
                                 "assigned to another node. "
                                 "Deleting keys in the slot.", slot);

            /* Dropping keys of unowned slots is safe with atomic slot
             * migration. It does not lose data with the legacy slot
             * migration either, because the importing state was already
             * persisted in node.conf. */
            clusterDelKeysInSlot(slot, 0);
        } else {
            /* Case A: nobody owns the slot, take responsibility for it. */
            serverLog(LL_NOTICE, "I have keys for unassigned slot %d. "
                                 "Taking responsibility for it.",slot);
            clusterAddSlot(myself, slot);
        }
    }

    if (fixed_slots) clusterSaveConfigOrDie(1);
    return C_OK;
}

/* Remove all the shard channel related information not owned by the current shard. */
static inline void removeAllNotOwnedShardChannelSubscriptions(void) {
if (!kvstoreSize(server.pubsubshard_channels)) return;
Expand All @@ -5339,6 +5256,33 @@ static inline void removeAllNotOwnedShardChannelSubscriptions(void) {
}
}

/* Startup-time fixup: take ownership of every hash slot that holds keys on
 * this node, has no owner in the cluster configuration and is not being
 * imported. When at least one slot is claimed the configuration is saved.
 *
 * Replicas return immediately without claiming anything. */
void clusterClaimUnassignedSlots(void) {
    if (nodeIsSlave(myself)) return;

    int claimed = 0;
    for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
        /* Nothing stored in this slot. */
        if (!countKeysInSlot(slot)) continue;
        /* Somebody (possibly us) already owns the slot. */
        if (server.cluster->slots[slot] != NULL) continue;
        /* An import is in progress for this slot. */
        if (server.cluster->importing_slots_from[slot] != NULL) continue;

        /* Data and cluster config disagree: the slot is populated here, yet
         * it is unowned and not under import. Resolve it by assigning the
         * slot to ourselves. */
        serverLog(LL_NOTICE, "I have keys for unassigned slot %d. "
                             "Taking responsibility for it.", slot);
        clusterAddSlot(myself, slot);
        claimed++;
    }

    if (claimed > 0) clusterSaveConfigOrDie(1);
}

/* -----------------------------------------------------------------------------
* SLAVE nodes handling
* -------------------------------------------------------------------------- */
Expand Down Expand Up @@ -6481,10 +6425,6 @@ int clusterCommandSpecial(client *c) {
} else if (!strcasecmp(c->argv[1]->ptr,"links") && c->argc == 2) {
/* CLUSTER LINKS */
addReplyClusterLinksDescription(c);
} else if (!strcasecmp(c->argv[1]->ptr, "migration")) {
clusterMigrationCommand(c);
} else if (!strcasecmp(c->argv[1]->ptr,"syncslots") && c->argc >= 3) {
clusterSyncSlotsCommand(c);
} else {
return 0;
}
Expand Down
13 changes: 10 additions & 3 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1654,7 +1654,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

/* Run the Redis Cluster cron. */
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
if (server.cluster_enabled) {
asmCron();
clusterCron();
}
}

/* Run the Sentinel timer if we are in sentinel mode. */
Expand Down Expand Up @@ -1835,7 +1838,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* may change the state of Redis Cluster (from ok to fail or vice versa),
* so it's a good idea to call it before serving the unblocked clients
* later in this function, must be done before blockedBeforeSleep. */
if (server.cluster_enabled) clusterBeforeSleep();
if (server.cluster_enabled) {
clusterBeforeSleep();
asmBeforeSleep();
}

/* Handle blocked clients.
* must be done before flushAppendOnlyFile, in case of appendfsync=always,
Expand Down Expand Up @@ -7730,7 +7736,8 @@ int main(int argc, char **argv) {
redisAsciiArt();
checkTcpBacklogSettings();
if (server.cluster_enabled) {
server.cluster_slot_stats = zmalloc(CLUSTER_SLOTS*sizeof(clusterSlotStat));
/* clusterCommonInit() initializes slot-stats required by clusterInit() */
clusterCommonInit();
clusterInit();
}
if (!server.sentinel_mode) {
Expand Down
Loading