From 24a3fc0c1973f9a3f98013ac561f2e75fb0b2793 Mon Sep 17 00:00:00 2001 From: mchain0 Date: Fri, 10 Apr 2026 14:35:30 +0200 Subject: [PATCH 1/5] cre-3442 cre-3443 --- .../tests/smoke/cre/v2_sharding_test.go | 74 ++++++++++++++++++- 1 file changed, 70 insertions(+), 4 deletions(-) diff --git a/system-tests/tests/smoke/cre/v2_sharding_test.go b/system-tests/tests/smoke/cre/v2_sharding_test.go index a0c91b3c3c4..153b96bf7a4 100644 --- a/system-tests/tests/smoke/cre/v2_sharding_test.go +++ b/system-tests/tests/smoke/cre/v2_sharding_test.go @@ -309,6 +309,20 @@ func validateShardingScaleScenario(t *testing.T, testEnv *ttypes.TestEnvironment Interface("distribution", shardCounts). Msg("Real workflows distributed across 2 shards after scaling") + logger.Info().Msg("Step 8b: Wait for mapping version to stabilize") + waitForMappingVersionStable(t, shardOrchClient, workflowIDs) + + cronPeriod := 30 * time.Second + logger.Info().Dur("cronPeriod", cronPeriod).Msg("Step 8c: Waiting one cron period for straggler executions to drain") + time.Sleep(cronPeriod) + + // Re-snapshot mappings after the barrier so we use the post-stable state. + resp, err = shardOrchClient.GetWorkflowShardMapping(ctx, &ringpb.GetWorkflowShardMappingRequest{ + WorkflowIds: workflowIDs, + }) + require.NoError(t, err) + require.NotNil(t, resp) + logger.Info().Msg("Step 9: Verify all workflows execute on their assigned shards via ChIP test sink") workflowToShardIndex := resp.Mappings nodeP2PIDToShardIndex := buildNodeP2PIDToShardIndex(t, testEnv) @@ -555,9 +569,42 @@ func waitForWorkflowsDistributed(t *testing.T, client ringpb.ShardOrchestratorSe for _, shardID := range resp.Mappings { shardsSeen[shardID] = true } - framework.L.Info().Int("shardsUsed", len(shardsSeen)).Int("minShards", minShards).Msg("Waiting for distribution") - return len(shardsSeen) >= minShards - }, 2*time.Minute, 5*time.Second, "Workflows not distributed across %d shards within timeout", minShards) + steady := resp.GetRoutingSteady() + framework.L.Info(). + Int("shardsUsed", len(shardsSeen)). + Int("minShards", minShards). + Bool("routingSteady", steady). + Msg("Waiting for distribution + steady") + return len(shardsSeen) >= minShards && steady + }, 2*time.Minute, 5*time.Second, "Workflows not distributed across %d shards (with RoutingSteady) within timeout", minShards) +} + +func waitForMappingVersionStable(t *testing.T, client ringpb.ShardOrchestratorServiceClient, workflowIDs []string) { + t.Helper() + var lastVersion uint64 + stableCount := 0 + require.Eventually(t, func() bool { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + resp, err := client.GetWorkflowShardMapping(ctx, &ringpb.GetWorkflowShardMappingRequest{ + WorkflowIds: workflowIDs, + }) + if err != nil { + stableCount = 0 + return false + } + if resp.MappingVersion == lastVersion && lastVersion != 0 { + stableCount++ + } else { + stableCount = 1 + } + lastVersion = resp.MappingVersion + framework.L.Info(). + Uint64("mappingVersion", resp.MappingVersion). + Int("stableCount", stableCount). + Msg("Waiting for mapping version to stabilize") + return stableCount >= 2 + }, 2*time.Minute, 5*time.Second, "Mapping version did not stabilize within timeout") } func buildNodeP2PIDToShardIndex(t *testing.T, testEnv *ttypes.TestEnvironment) map[string]uint32 { @@ -606,6 +653,7 @@ func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerol executedWorkflows := make(map[string]struct{}) seenNodes := make(map[string]struct{}) seenShardIndices := make(map[uint32]struct{}) + mismatchCounts := make(map[string]int) timeoutCh := time.After(timeout) for { @@ -616,7 +664,13 @@ func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerol logger.Warn(). Int("executed", len(executedWorkflows)). Int("expected", len(expectedWorkflows)). + Interface("mismatchCounts", mismatchCounts). Msg("Timeout waiting for all workflows to execute") + for wfID, count := range mismatchCounts { + if _, confirmed := executedWorkflows[wfID]; !confirmed { + t.Errorf("Workflow %s saw %d log(s) on wrong shard but never on expected shard %d", wfID, count, workflowToShardIndex[wfID]) + } + } return executedWorkflows case userLogs := <-userLogsCh: if userLogs.M == nil { @@ -640,7 +694,18 @@ func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerol actualShardIndex, knownNode := nodeP2PIDToShardIndex[normalizedP2PID] require.True(t, knownNode, "Workflow %s executed on unknown node %s", wfID, userLogs.M.P2PID) expectedShardIndex := workflowToShardIndex[wfID] - require.Equal(t, expectedShardIndex, actualShardIndex, "Workflow %s executed on shard index %d but expected %d (node %s)", wfID, actualShardIndex, expectedShardIndex, userLogs.M.P2PID) + + if actualShardIndex != expectedShardIndex { + mismatchCounts[wfID]++ + logger.Warn(). + Str("workflowID", wfID). + Str("p2pID", normalizedP2PID). + Uint32("actualShard", actualShardIndex). + Uint32("expectedShard", expectedShardIndex). + Int("mismatchCount", mismatchCounts[wfID]). + Msg("Stale log from wrong shard, waiting for correct shard execution") + continue + } executedWorkflows[wfID] = struct{}{} seenNodes[normalizedP2PID] = struct{}{} @@ -660,6 +725,7 @@ func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerol logger.Info(). Int("uniqueNodes", len(seenNodes)). Int("uniqueShardIndices", len(seenShardIndices)). + Interface("mismatchCounts", mismatchCounts). Msg("All workflows executed on correct shards") return executedWorkflows } From 61bcb82cf582ff72670e98bfeaf8a0f2e00c772e Mon Sep 17 00:00:00 2001 From: mchain0 Date: Fri, 10 Apr 2026 15:56:04 +0200 Subject: [PATCH 2/5] cre-3443: unnecessary wait removal --- .../tests/smoke/cre/v2_sharding_test.go | 33 +------------------ 1 file changed, 1 insertion(+), 32 deletions(-) diff --git a/system-tests/tests/smoke/cre/v2_sharding_test.go b/system-tests/tests/smoke/cre/v2_sharding_test.go index 153b96bf7a4..9bf3883a6e2 100644 --- a/system-tests/tests/smoke/cre/v2_sharding_test.go +++ b/system-tests/tests/smoke/cre/v2_sharding_test.go @@ -309,11 +309,8 @@ func validateShardingScaleScenario(t *testing.T, testEnv *ttypes.TestEnvironment Interface("distribution", shardCounts). Msg("Real workflows distributed across 2 shards after scaling") - logger.Info().Msg("Step 8b: Wait for mapping version to stabilize") - waitForMappingVersionStable(t, shardOrchClient, workflowIDs) - cronPeriod := 30 * time.Second - logger.Info().Dur("cronPeriod", cronPeriod).Msg("Step 8c: Waiting one cron period for straggler executions to drain") + logger.Info().Dur("cronPeriod", cronPeriod).Msg("Step 8b: Waiting one cron period for straggler executions to drain") time.Sleep(cronPeriod) // Re-snapshot mappings after the barrier so we use the post-stable state. @@ -579,34 +576,6 @@ func waitForWorkflowsDistributed(t *testing.T, client ringpb.ShardOrchestratorSe }, 2*time.Minute, 5*time.Second, "Workflows not distributed across %d shards (with RoutingSteady) within timeout", minShards) } -func waitForMappingVersionStable(t *testing.T, client ringpb.ShardOrchestratorServiceClient, workflowIDs []string) { - t.Helper() - var lastVersion uint64 - stableCount := 0 - require.Eventually(t, func() bool { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - resp, err := client.GetWorkflowShardMapping(ctx, &ringpb.GetWorkflowShardMappingRequest{ - WorkflowIds: workflowIDs, - }) - if err != nil { - stableCount = 0 - return false - } - if resp.MappingVersion == lastVersion && lastVersion != 0 { - stableCount++ - } else { - stableCount = 1 - } - lastVersion = resp.MappingVersion - framework.L.Info(). - Uint64("mappingVersion", resp.MappingVersion). - Int("stableCount", stableCount). - Msg("Waiting for mapping version to stabilize") - return stableCount >= 2 - }, 2*time.Minute, 5*time.Second, "Mapping version did not stabilize within timeout") -} - func buildNodeP2PIDToShardIndex(t *testing.T, testEnv *ttypes.TestEnvironment) map[string]uint32 { t.Helper() shardDONs := testEnv.Dons.DonsWithFlag(cre.ShardDON) From 278f1d4118eb13d76cdf2d873c33237f9e5588ad Mon Sep 17 00:00:00 2001 From: mchain0 Date: Mon, 13 Apr 2026 09:27:21 +0200 Subject: [PATCH 3/5] cre-3443: sleep-less check --- .../tests/smoke/cre/v2_sharding_test.go | 41 +++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/system-tests/tests/smoke/cre/v2_sharding_test.go b/system-tests/tests/smoke/cre/v2_sharding_test.go index 9bf3883a6e2..edd90e5754b 100644 --- a/system-tests/tests/smoke/cre/v2_sharding_test.go +++ b/system-tests/tests/smoke/cre/v2_sharding_test.go @@ -309,9 +309,8 @@ func validateShardingScaleScenario(t *testing.T, testEnv *ttypes.TestEnvironment Interface("distribution", shardCounts). Msg("Real workflows distributed across 2 shards after scaling") - cronPeriod := 30 * time.Second - logger.Info().Dur("cronPeriod", cronPeriod).Msg("Step 8b: Waiting one cron period for straggler executions to drain") - time.Sleep(cronPeriod) + logger.Info().Msg("Step 8b: Waiting for mapping version to stabilize before verifying shard assignments") + waitForMappingVersionStable(t, shardOrchClient, workflowIDs, 15*time.Second, 90*time.Second) // Re-snapshot mappings after the barrier so we use the post-stable state. resp, err = shardOrchClient.GetWorkflowShardMapping(ctx, &ringpb.GetWorkflowShardMappingRequest{ @@ -611,6 +610,42 @@ func waitForAllWorkflowsOnShard(t *testing.T, client ringpb.ShardOrchestratorSer }, 2*time.Minute, 5*time.Second, "Workflows not remapped to shard %d within timeout", expectedShard) } +func waitForMappingVersionStable(t *testing.T, client ringpb.ShardOrchestratorServiceClient, workflowIDs []string, stableDuration, timeout time.Duration) { + t.Helper() + logger := framework.L + + var lastVersion uint64 + lastChangeAt := time.Now() + + require.Eventually(t, func() bool { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + resp, err := client.GetWorkflowShardMapping(ctx, &ringpb.GetWorkflowShardMappingRequest{ + WorkflowIds: workflowIDs, + }) + if err != nil { + logger.Warn().Err(err).Msg("Failed to get mapping version during stability check") + return false + } + if resp.MappingVersion != lastVersion { + logger.Info(). + Uint64("previousVersion", lastVersion). + Uint64("currentVersion", resp.MappingVersion). + Msg("Mapping version changed, resetting stability timer") + lastVersion = resp.MappingVersion + lastChangeAt = time.Now() + return false + } + stableFor := time.Since(lastChangeAt) + logger.Info(). + Uint64("version", lastVersion). + Dur("stableFor", stableFor). + Dur("target", stableDuration). + Msg("Mapping version stability check") + return stableFor >= stableDuration + }, timeout, 2*time.Second, "Mapping version did not stabilize within %s (stableDuration=%s)", timeout, stableDuration) +} + func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerolog.Logger, userLogsCh <-chan *workflowevents.UserLogs, workflowIDs []string, workflowToShardIndex map[string]uint32, nodeP2PIDToShardIndex map[string]uint32, timeout time.Duration) map[string]struct{} { t.Helper() From 8c675c5ea6de0a27f41bec5dd4edd8a808dc2a3a Mon Sep 17 00:00:00 2001 From: mchain0 Date: Mon, 13 Apr 2026 10:17:03 +0200 Subject: [PATCH 4/5] cre-3443: minor fix for wait helper --- .../tests/smoke/cre/v2_sharding_test.go | 33 +++++++++++++------ 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/system-tests/tests/smoke/cre/v2_sharding_test.go b/system-tests/tests/smoke/cre/v2_sharding_test.go index edd90e5754b..6f822074251 100644 --- a/system-tests/tests/smoke/cre/v2_sharding_test.go +++ b/system-tests/tests/smoke/cre/v2_sharding_test.go @@ -610,11 +610,15 @@ func waitForAllWorkflowsOnShard(t *testing.T, client ringpb.ShardOrchestratorSer }, 2*time.Minute, 5*time.Second, "Workflows not remapped to shard %d within timeout", expectedShard) } +// waitForMappingVersionStable polls GetWorkflowShardMapping and returns once +// the actual workflow-to-shard assignments have not changed for stableDuration. +// MappingVersion itself increments every OCR round regardless of content changes, +// so we compare the mapping content instead. func waitForMappingVersionStable(t *testing.T, client ringpb.ShardOrchestratorServiceClient, workflowIDs []string, stableDuration, timeout time.Duration) { t.Helper() logger := framework.L - var lastVersion uint64 + lastMappings := map[string]uint32{} lastChangeAt := time.Now() require.Eventually(t, func() bool { @@ -624,26 +628,35 @@ func waitForMappingVersionStable(t *testing.T, client ringpb.ShardOrchestratorSe WorkflowIds: workflowIDs, }) if err != nil { - logger.Warn().Err(err).Msg("Failed to get mapping version during stability check") + logger.Warn().Err(err).Msg("Failed to get mapping during stability check") return false } - if resp.MappingVersion != lastVersion { + changed := len(resp.Mappings) != len(lastMappings) + if !changed { + for wfID, shard := range resp.Mappings { + if lastMappings[wfID] != shard { + changed = true + break + } + } + } + if changed { logger.Info(). - Uint64("previousVersion", lastVersion). - Uint64("currentVersion", resp.MappingVersion). - Msg("Mapping version changed, resetting stability timer") - lastVersion = resp.MappingVersion + Uint64("mappingVersion", resp.MappingVersion). + Interface("mappings", resp.Mappings). + Msg("Mapping content changed, resetting stability timer") + lastMappings = resp.Mappings lastChangeAt = time.Now() return false } stableFor := time.Since(lastChangeAt) logger.Info(). - Uint64("version", lastVersion). + Uint64("mappingVersion", resp.MappingVersion). Dur("stableFor", stableFor). Dur("target", stableDuration). - Msg("Mapping version stability check") + Msg("Mapping content stability check") return stableFor >= stableDuration - }, timeout, 2*time.Second, "Mapping version did not stabilize within %s (stableDuration=%s)", timeout, stableDuration) + }, timeout, 2*time.Second, "Mapping content did not stabilize within %s (stableDuration=%s)", timeout, stableDuration) } func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerolog.Logger, userLogsCh <-chan *workflowevents.UserLogs, workflowIDs []string, workflowToShardIndex map[string]uint32, nodeP2PIDToShardIndex map[string]uint32, timeout time.Duration) map[string]struct{} { From 47fb7ee170bdf6e653c53c12562c7b1079f5b223 Mon Sep 17 00:00:00 2001 From: mchain0 Date: Mon, 13 Apr 2026 10:20:20 +0200 Subject: [PATCH 5/5] cre-3443: minor cleanup --- system-tests/tests/smoke/cre/v2_sharding_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/system-tests/tests/smoke/cre/v2_sharding_test.go b/system-tests/tests/smoke/cre/v2_sharding_test.go index 6f822074251..0ab68542bfe 100644 --- a/system-tests/tests/smoke/cre/v2_sharding_test.go +++ b/system-tests/tests/smoke/cre/v2_sharding_test.go @@ -610,10 +610,6 @@ func waitForAllWorkflowsOnShard(t *testing.T, client ringpb.ShardOrchestratorSer }, 2*time.Minute, 5*time.Second, "Workflows not remapped to shard %d within timeout", expectedShard) } -// waitForMappingVersionStable polls GetWorkflowShardMapping and returns once -// the actual workflow-to-shard assignments have not changed for stableDuration. -// MappingVersion itself increments every OCR round regardless of content changes, -// so we compare the mapping content instead. func waitForMappingVersionStable(t *testing.T, client ringpb.ShardOrchestratorServiceClient, workflowIDs []string, stableDuration, timeout time.Duration) { t.Helper() logger := framework.L