From deabeea3bec790f301048994b43765e882ae8282 Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Sun, 21 Dec 2025 23:42:30 +0300 Subject: [PATCH 1/8] Fix adjacent slot range behavior in ASM operations --- src/cluster.c | 48 +++++++++++---- src/cluster.h | 3 +- src/cluster_asm.c | 63 +++++++++++++------- tests/unit/cluster/atomic-slot-migration.tcl | 36 +++++++++++ 4 files changed, 118 insertions(+), 32 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index cd11cbb355c..7aaaaec53a1 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1731,8 +1731,9 @@ void replySlotsFlushAndFree(client *c, slotRangeArray *slots) { slotRangeArrayFree(slots); } -/* Checks that slot ranges are well-formed and non-overlapping. */ -int validateSlotRanges(slotRangeArray *slots, sds *err) { +/* Normalizes (sorts and merges adjacent ranges), checks that slot ranges are + * well-formed and non-overlapping. */ +int slotRangeArrayNormalizeAndValidate(slotRangeArray *slots, sds *err) { unsigned char used_slots[CLUSTER_SLOTS] = {0}; if (slots->num_ranges <= 0 || slots->num_ranges >= CLUSTER_SLOTS) { @@ -1740,6 +1741,9 @@ int validateSlotRanges(slotRangeArray *slots, sds *err) { return C_ERR; } + /* Sort and merge adjacent slot ranges. */ + slotRangeArraySortAndMerge(slots); + for (int i = 0; i < slots->num_ranges; i++) { if (slots->ranges[i].start >= CLUSTER_SLOTS || slots->ranges[i].end >= CLUSTER_SLOTS) @@ -1826,7 +1830,7 @@ slotRangeArray *slotRangeArrayFromString(sds data) { /* Validate all ranges */ sds err_msg = NULL; - if (validateSlotRanges(slots, &err_msg) != C_OK) { + if (slotRangeArrayNormalizeAndValidate(slots, &err_msg) != C_OK) { if (err_msg) sdsfree(err_msg); goto err; } @@ -1847,13 +1851,32 @@ static int compareSlotRange(const void *a, const void *b) { return 0; } +/* Sort slot ranges by start slot and merge adjacent ranges. + * Adjacent means: prev.end + 1 == next.start. + * e.g. 1000-2000 2001-3000 0-100 => 0-100 1000-3000 + * + * Note: Overlapping ranges are not merged.*/ +void slotRangeArraySortAndMerge(slotRangeArray *slots) { + if (!slots || slots->num_ranges <= 1) return; + + qsort(slots->ranges, slots->num_ranges, sizeof(slotRange), compareSlotRange); + + int idx = 0; + for (int i = 1; i < slots->num_ranges; i++) { + if (slots->ranges[idx].end + 1 == slots->ranges[i].start) + slots->ranges[idx].end = slots->ranges[i].end; + else + slots->ranges[++idx] = slots->ranges[i]; + } + slots->num_ranges = idx + 1; +} + /* Compare two slot range arrays, return 1 if equal, 0 otherwise */ int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2) { - if (slots1->num_ranges != slots2->num_ranges) return 0; + slotRangeArraySortAndMerge(slots1); + slotRangeArraySortAndMerge(slots2); - /* Sort slot ranges first */ - qsort(slots1->ranges, slots1->num_ranges, sizeof(slotRange), compareSlotRange); - qsort(slots2->ranges, slots2->num_ranges, sizeof(slotRange), compareSlotRange); + if (slots1->num_ranges != slots2->num_ranges) return 0; for (int i = 0; i < slots1->num_ranges; i++) { if (slots1->ranges[i].start != slots2->ranges[i].start || @@ -1959,13 +1982,16 @@ void slotRangeArrayIteratorFree(slotRangeArrayIter *it) { zfree(it); } -/* Parse slot ranges from the command arguments. Returns NULL on error. */ +/* Parse slot range pairs from argv starting at `pos`. + * `argc` is the argument count, `pos` is the first slot argument index. + * Returns a slotRangeArray or NULL on error. */ slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos) { int start, end, count; slotRangeArray *slots; - serverAssert(pos <= argc); - serverAssert((argc - pos) % 2 == 0); + /* Ensure there is at least one or more (start,end) slot range pairs. */ + if (argc < 0 || pos < 0 || pos >= argc || (argc - pos) < 2 || ((argc - pos) % 2) != 0) + return NULL; count = (argc - pos) / 2; slots = slotRangeArrayCreate(count); @@ -1983,7 +2009,7 @@ slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos) { } sds err = NULL; - if (validateSlotRanges(slots, &err) != C_OK) { + if (slotRangeArrayNormalizeAndValidate(slots, &err) != C_OK) { addReplyErrorSds(c, err); slotRangeArrayFree(slots); return NULL; diff --git a/src/cluster.h b/src/cluster.h index 2f15c7a64d8..7daf09323de 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -184,6 +184,7 @@ slotRangeArray *slotRangeArrayDup(slotRangeArray *slots); void slotRangeArraySet(slotRangeArray *slots, int idx, int start, int end); sds slotRangeArrayToString(slotRangeArray *slots); slotRangeArray *slotRangeArrayFromString(sds data); +void slotRangeArraySortAndMerge(slotRangeArray *slots); int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2); slotRangeArray *slotRangeArrayAppend(slotRangeArray *slots, int slot); int slotRangeArrayContains(slotRangeArray *slots, unsigned int slot); @@ -193,7 +194,7 @@ slotRangeArrayIter *slotRangeArrayGetIterator(slotRangeArray *slots); int slotRangeArrayNext(slotRangeArrayIter *it); int slotRangeArrayGetCurrentSlot(slotRangeArrayIter *it); void slotRangeArrayIteratorFree(slotRangeArrayIter *it); -int validateSlotRanges(slotRangeArray *slots, sds *err); +int slotRangeArrayNormalizeAndValidate(slotRangeArray *slots, sds *err); slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos); unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command); diff --git a/src/cluster_asm.c b/src/cluster_asm.c index 0c7640ef365..65265e7a142 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -627,7 +627,6 @@ static asmTask *lookupAsmTaskBySlotRange(slotRange *req) { /* Validates the given slot ranges for a migration task: * - Ensures the current node is a master. * - Verifies all slots are in a STABLE state. - * - Checks that slot ranges are well-formed and non-overlapping. * - Confirms all slots belong to a single source node. * - Confirms no ongoing import task that overlaps with the slot ranges. * @@ -804,11 +803,11 @@ asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *er * initiated for them. */ source = validateImportSlotRanges(slots, err, NULL); if (!source) - return NULL; + goto err; if (source == getMyClusterNode()) { *err = sdsnew("this node is already the owner of the slot range"); - return NULL; + goto err; } /* Only support a single task at a time now. */ @@ -820,7 +819,7 @@ asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *er asmTaskCancel(current, "new import requested"); } else { *err = sdsnew("another ASM task is already in progress"); - return NULL; + goto err; } } /* There should be no task in progress. */ @@ -828,7 +827,7 @@ asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *er /* Create a slot migration task */ asmTask *task = asmTaskCreate(task_id); - task->slots = slotRangeArrayDup(slots); + task->slots = slots; task->state = ASM_NONE; task->operation = ASM_IMPORT; task->source_node = source; @@ -842,6 +841,10 @@ asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *er sdsfree(slots_str); return task; + +err: + slotRangeArrayFree(slots); + return NULL; } /* CLUSTER MIGRATION IMPORT @@ -860,7 +863,6 @@ static void clusterMigrationCommandImport(client *c) { sds err = NULL; asmTask *task = asmCreateImportTask(NULL, slots, &err); - slotRangeArrayFree(slots); if (!task) { addReplyErrorSds(c, err); return; @@ -1006,6 +1008,19 @@ void clusterMigrationCommand(client *c) { } } +/* Return the number of keys in the specified slot ranges. */ +unsigned long long asmCountKeysInSlots(slotRangeArray *slots) { + if (!slots) return 0; + + unsigned long long key_count = 0; + for (int i = 0; i < slots->num_ranges; i++) { + for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { + key_count += kvstoreDictSize(server.db[0].keys, j); + } + } + return key_count; +} + /* Log a human-readable message for ASM task lifecycle events. */ void asmLogTaskEvent(asmTask *task, int event) { sds str = slotRangeArrayToString(task->slots); @@ -1021,10 +1036,12 @@ void asmLogTaskEvent(asmTask *task, int event) { serverLog(LL_NOTICE, "Import task %s is ready to takeover slots: %s", task->id, str); break; case ASM_EVENT_IMPORT_COMPLETED: - serverLog(LL_NOTICE, "Import task %s completed for slots: %s", task->id, str); + serverLog(LL_NOTICE, "Import task %s completed for slots: %s (imported %llu keys)", + task->id, str, asmCountKeysInSlots(task->slots)); break; case ASM_EVENT_MIGRATE_STARTED: - serverLog(LL_NOTICE, "Migrate task %s started for slots: %s", task->id, str); + serverLog(LL_NOTICE, "Migrate task %s started for slots: %s (keys at start: %llu)", + task->id, str, asmCountKeysInSlots(task->slots)); break; case ASM_EVENT_MIGRATE_FAILED: serverLog(LL_NOTICE, "Migrate task %s failed for slots: %s", task->id, str); @@ -1033,7 +1050,8 @@ void asmLogTaskEvent(asmTask *task, int event) { serverLog(LL_NOTICE, "Migrate task %s preparing to handoff for slots: %s", task->id, str); break; case ASM_EVENT_MIGRATE_COMPLETED: - serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s", task->id, str); + serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s (migrated %llu keys)", + task->id, str, asmCountKeysInSlots(task->slots)); break; default: break; @@ -2847,24 +2865,32 @@ int clusterAsmProcess(const char *task_id, int event, void *arg, char **err) { if (err) *err = NULL; switch (event) { - case ASM_EVENT_IMPORT_START: - ret = asmCreateImportTask(task_id, arg, &errsds) ? C_OK : C_ERR; + case ASM_EVENT_IMPORT_START: { + /* Validate the slot ranges. */ + slotRangeArray *slots = slotRangeArrayDup(arg); + if (slotRangeArrayNormalizeAndValidate(slots, &errsds) != C_OK) { + slotRangeArrayFree(slots); + ret = C_ERR; + break; + } + ret = asmCreateImportTask(task_id, slots, &errsds) ? C_OK : C_ERR; break; - case ASM_EVENT_CANCEL: + } case ASM_EVENT_CANCEL: { num_cancelled = clusterAsmCancel(task_id, "user request"); - if (arg) *((int *)arg) = num_cancelled; + if (arg) *((int *) arg) = num_cancelled; ret = C_OK; break; - case ASM_EVENT_HANDOFF: + } case ASM_EVENT_HANDOFF: { ret = clusterAsmHandoff(task_id, &errsds); break; - case ASM_EVENT_DONE: + } case ASM_EVENT_DONE: { ret = clusterAsmDone(task_id, &errsds); break; - default: + } default: { ret = C_ERR; errsds = sdscatprintf(sdsempty(), "Unknown operation: %d", event); break; + } } if (ret != C_OK && errsds && err) { @@ -3273,10 +3299,7 @@ void asmActiveTrimStart(void) { asmManager->active_trim_current_job_trimmed = 0; /* Count the number of keys to trim */ - for (int i = 0; i < slots->num_ranges; i++) { - for (int slot = slots->ranges[i].start; slot <= slots->ranges[i].end; slot++) - asmManager->active_trim_current_job_keys += kvstoreDictSize(server.db[0].keys, slot); - } + asmManager->active_trim_current_job_keys += asmCountKeysInSlots(slots); RedisModuleClusterSlotMigrationTrimInfoV1 fsi = { REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION, diff --git a/tests/unit/cluster/atomic-slot-migration.tcl b/tests/unit/cluster/atomic-slot-migration.tcl index ef66d49038c..ca3c05d35c2 100644 --- a/tests/unit/cluster/atomic-slot-migration.tcl +++ b/tests/unit/cluster/atomic-slot-migration.tcl @@ -511,6 +511,42 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-node-timeout wait_for_asm_done } + test "Test IMPORT with unsorted and adjacent ranges" { + # Redis should sort and merge adjacent ranges + # Adjacent means: prev.end + 1 == next.start + # e.g. 7000-7001 7002-7003 7004-7005 => 7000-7005 + + # Test with adjacent ranges + set task_id [R 0 CLUSTER MIGRATION IMPORT 7000 7001 7002 7100] + wait_for_asm_done + # verify migration is successfully completed on both nodes + assert_equal "completed" [migration_status 0 $task_id state] + assert_equal "completed" [migration_status 1 $task_id state] + # verify slot ranges are merged correctly + assert_equal "7000-7100" [migration_status 0 $task_id slots] + assert_equal "7000-7100" [migration_status 1 $task_id slots] + + # Test with unsorted and adjacent ranges + set task_id [R 1 CLUSTER MIGRATION IMPORT 7050 7051 7010 7049 7000 7005] + wait_for_asm_done + # verify migration is successfully completed on both nodes + assert_equal "completed" [migration_status 0 $task_id state] + assert_equal "completed" [migration_status 1 $task_id state] + # verify slot ranges are merged correctly + assert_equal "7000-7005 7010-7051" [migration_status 0 $task_id slots] + assert_equal "7000-7005 7010-7051" [migration_status 1 $task_id slots] + + # Another test with unsorted and adjacent ranges + set task_id [R 1 CLUSTER MIGRATION IMPORT 7007 7007 7008 7009 7006 7006] + wait_for_asm_done + # verify migration is successfully completed on both nodes + assert_equal "completed" [migration_status 0 $task_id state] + assert_equal "completed" [migration_status 1 $task_id state] + # verify slot ranges are merged correctly + assert_equal "7006-7009" [migration_status 0 $task_id slots] + assert_equal "7006-7009" [migration_status 1 $task_id slots] + } + test "Simple slot migration with write load" { # Perform slot migration while traffic is on and verify data consistency. # Trimming is disabled on source nodes so, we can compare the dbs after From 0bb16475860475e45852cc1b0ef25fa904628c9e Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Mon, 22 Dec 2025 00:05:27 +0300 Subject: [PATCH 2/8] minor --- src/cluster_asm.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster_asm.c b/src/cluster_asm.c index 65265e7a142..9a8eb06e485 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -2877,7 +2877,7 @@ int clusterAsmProcess(const char *task_id, int event, void *arg, char **err) { break; } case ASM_EVENT_CANCEL: { num_cancelled = clusterAsmCancel(task_id, "user request"); - if (arg) *((int *) arg) = num_cancelled; + if (arg) *((int *)arg) = num_cancelled; ret = C_OK; break; } case ASM_EVENT_HANDOFF: { From 98943f5379fe2f9278408476a5dad93a79c24f90 Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Mon, 22 Dec 2025 00:07:56 +0300 Subject: [PATCH 3/8] minor --- src/cluster.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 7aaaaec53a1..f84cbda2123 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1989,7 +1989,7 @@ slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos) { int start, end, count; slotRangeArray *slots; - /* Ensure there is at least one or more (start,end) slot range pairs. */ + /* Ensure there is at least one (start,end) slot range pairs. */ if (argc < 0 || pos < 0 || pos >= argc || (argc - pos) < 2 || ((argc - pos) % 2) != 0) return NULL; From a6db33932db1cac720940b5ec59f477ec1b83dfb Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Mon, 22 Dec 2025 09:10:55 +0300 Subject: [PATCH 4/8] minor --- src/cluster_asm.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/cluster_asm.c b/src/cluster_asm.c index 9a8eb06e485..cfed105ef27 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -2875,15 +2875,18 @@ int clusterAsmProcess(const char *task_id, int event, void *arg, char **err) { } ret = asmCreateImportTask(task_id, slots, &errsds) ? C_OK : C_ERR; break; - } case ASM_EVENT_CANCEL: { + } + case ASM_EVENT_CANCEL: { num_cancelled = clusterAsmCancel(task_id, "user request"); if (arg) *((int *)arg) = num_cancelled; ret = C_OK; break; - } case ASM_EVENT_HANDOFF: { + } + case ASM_EVENT_HANDOFF: { ret = clusterAsmHandoff(task_id, &errsds); break; - } case ASM_EVENT_DONE: { + } + case ASM_EVENT_DONE: { ret = clusterAsmDone(task_id, &errsds); break; } default: { From 312901858e86be262232591dd5dbeaa05c4bc162 Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Mon, 22 Dec 2025 17:16:09 +0300 Subject: [PATCH 5/8] Update src/cluster_asm.c Co-authored-by: Yuan Wang --- src/cluster_asm.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cluster_asm.c b/src/cluster_asm.c index cfed105ef27..0638890df06 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -2889,7 +2889,8 @@ int clusterAsmProcess(const char *task_id, int event, void *arg, char **err) { case ASM_EVENT_DONE: { ret = clusterAsmDone(task_id, &errsds); break; - } default: { + } + default: { ret = C_ERR; errsds = sdscatprintf(sdsempty(), "Unknown operation: %d", event); break; From 55e933194d3d896641fb79a5013de88d12a228f7 Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Tue, 23 Dec 2025 09:11:37 +0300 Subject: [PATCH 6/8] minor --- src/cluster.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cluster.c b/src/cluster.c index f84cbda2123..efe7148d8fa 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1793,6 +1793,7 @@ void slotRangeArraySet(slotRangeArray *slots, int idx, int start, int end) { /* Create a slot range string in the format of: "1000-2000 3000-4000 ..." */ sds slotRangeArrayToString(slotRangeArray *slots) { sds s = sdsempty(); + if (slots == NULL || slots->num_ranges == 0) return s; for (int i = 0; i < slots->num_ranges; i++) { slotRange *sr = &slots->ranges[i]; From f32efc83602d2b1bc59aa8be0fbb0a166f26cfa4 Mon Sep 17 00:00:00 2001 From: Ozan Tezcan Date: Tue, 23 Dec 2025 10:28:39 +0300 Subject: [PATCH 7/8] minor --- src/cluster.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index efe7148d8fa..c2cb0a6dfcc 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1991,8 +1991,10 @@ slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos) { slotRangeArray *slots; /* Ensure there is at least one (start,end) slot range pairs. */ - if (argc < 0 || pos < 0 || pos >= argc || (argc - pos) < 2 || ((argc - pos) % 2) != 0) + if (argc < 0 || pos < 0 || pos >= argc || (argc - pos) < 2 || ((argc - pos) % 2) != 0) { + addReplyErrorArity(c); return NULL; + } count = (argc - pos) / 2; slots = slotRangeArrayCreate(count); From 0443e9f9b6614d1cfd9cd6a1e1c222b93d91a154 Mon Sep 17 00:00:00 2001 From: tomerqodo Date: Sun, 25 Jan 2026 12:11:43 +0200 Subject: [PATCH 8/8] update pr --- src/cluster.c | 6 +++--- src/cluster_asm.c | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index c2cb0a6dfcc..eaf2c6aed13 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1874,11 +1874,11 @@ void slotRangeArraySortAndMerge(slotRangeArray *slots) { /* Compare two slot range arrays, return 1 if equal, 0 otherwise */ int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2) { + if (slots1->num_ranges != slots2->num_ranges) return 0; + slotRangeArraySortAndMerge(slots1); slotRangeArraySortAndMerge(slots2); - if (slots1->num_ranges != slots2->num_ranges) return 0; - for (int i = 0; i < slots1->num_ranges; i++) { if (slots1->ranges[i].start != slots2->ranges[i].start || slots1->ranges[i].end != slots2->ranges[i].end) { @@ -2013,7 +2013,7 @@ slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos) { sds err = NULL; if (slotRangeArrayNormalizeAndValidate(slots, &err) != C_OK) { - addReplyErrorSds(c, err); + sdsfree(err); slotRangeArrayFree(slots); return NULL; } diff --git a/src/cluster_asm.c b/src/cluster_asm.c index 0638890df06..1d14eef019e 100644 --- a/src/cluster_asm.c +++ b/src/cluster_asm.c @@ -1013,6 +1013,7 @@ unsigned long long asmCountKeysInSlots(slotRangeArray *slots) { if (!slots) return 0; unsigned long long key_count = 0; + int total_ranges = slots->num_ranges; for (int i = 0; i < slots->num_ranges; i++) { for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { key_count += kvstoreDictSize(server.db[0].keys, j); @@ -3303,7 +3304,7 @@ void asmActiveTrimStart(void) { asmManager->active_trim_current_job_trimmed = 0; /* Count the number of keys to trim */ - asmManager->active_trim_current_job_keys += asmCountKeysInSlots(slots); + asmManager->active_trim_current_job_keys = asmCountKeysInSlots(slots); RedisModuleClusterSlotMigrationTrimInfoV1 fsi = { REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION,