diff --git a/system-tests/tests/smoke/cre/v2_sharding_test.go b/system-tests/tests/smoke/cre/v2_sharding_test.go index a0c91b3c3c4..0ab68542bfe 100644 --- a/system-tests/tests/smoke/cre/v2_sharding_test.go +++ b/system-tests/tests/smoke/cre/v2_sharding_test.go @@ -309,6 +309,16 @@ func validateShardingScaleScenario(t *testing.T, testEnv *ttypes.TestEnvironment Interface("distribution", shardCounts). Msg("Real workflows distributed across 2 shards after scaling") + logger.Info().Msg("Step 8b: Waiting for mapping version to stabilize before verifying shard assignments") + waitForMappingVersionStable(t, shardOrchClient, workflowIDs, 15*time.Second, 90*time.Second) + + // Re-snapshot mappings after the barrier so we use the post-stable state. + resp, err = shardOrchClient.GetWorkflowShardMapping(ctx, &ringpb.GetWorkflowShardMappingRequest{ + WorkflowIds: workflowIDs, + }) + require.NoError(t, err) + require.NotNil(t, resp) + logger.Info().Msg("Step 9: Verify all workflows execute on their assigned shards via ChIP test sink") workflowToShardIndex := resp.Mappings nodeP2PIDToShardIndex := buildNodeP2PIDToShardIndex(t, testEnv) @@ -555,9 +565,14 @@ func waitForWorkflowsDistributed(t *testing.T, client ringpb.ShardOrchestratorSe for _, shardID := range resp.Mappings { shardsSeen[shardID] = true } - framework.L.Info().Int("shardsUsed", len(shardsSeen)).Int("minShards", minShards).Msg("Waiting for distribution") - return len(shardsSeen) >= minShards - }, 2*time.Minute, 5*time.Second, "Workflows not distributed across %d shards within timeout", minShards) + steady := resp.GetRoutingSteady() + framework.L.Info(). + Int("shardsUsed", len(shardsSeen)). + Int("minShards", minShards). + Bool("routingSteady", steady). + Msg("Waiting for distribution + steady") + return len(shardsSeen) >= minShards && steady + }, 2*time.Minute, 5*time.Second, "Workflows not distributed across %d shards (with RoutingSteady) within timeout", minShards) } func buildNodeP2PIDToShardIndex(t *testing.T, testEnv *ttypes.TestEnvironment) map[string]uint32 { @@ -595,6 +610,51 @@ func waitForAllWorkflowsOnShard(t *testing.T, client ringpb.ShardOrchestratorSer }, 2*time.Minute, 5*time.Second, "Workflows not remapped to shard %d within timeout", expectedShard) } +func waitForMappingVersionStable(t *testing.T, client ringpb.ShardOrchestratorServiceClient, workflowIDs []string, stableDuration, timeout time.Duration) { + t.Helper() + logger := framework.L + + lastMappings := map[string]uint32{} + lastChangeAt := time.Now() + + require.Eventually(t, func() bool { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + resp, err := client.GetWorkflowShardMapping(ctx, &ringpb.GetWorkflowShardMappingRequest{ + WorkflowIds: workflowIDs, + }) + if err != nil { + logger.Warn().Err(err).Msg("Failed to get mapping during stability check") + return false + } + changed := len(resp.Mappings) != len(lastMappings) + if !changed { + for wfID, shard := range resp.Mappings { + if lastMappings[wfID] != shard { + changed = true + break + } + } + } + if changed { + logger.Info(). + Uint64("mappingVersion", resp.MappingVersion). + Interface("mappings", resp.Mappings). + Msg("Mapping content changed, resetting stability timer") + lastMappings = resp.Mappings + lastChangeAt = time.Now() + return false + } + stableFor := time.Since(lastChangeAt) + logger.Info(). + Uint64("mappingVersion", resp.MappingVersion). + Dur("stableFor", stableFor). + Dur("target", stableDuration). + Msg("Mapping content stability check") + return stableFor >= stableDuration + }, timeout, 2*time.Second, "Mapping content did not stabilize within %s (stableDuration=%s)", timeout, stableDuration) +} + func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerolog.Logger, userLogsCh <-chan *workflowevents.UserLogs, workflowIDs []string, workflowToShardIndex map[string]uint32, nodeP2PIDToShardIndex map[string]uint32, timeout time.Duration) map[string]struct{} { t.Helper() @@ -606,6 +666,7 @@ func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerol executedWorkflows := make(map[string]struct{}) seenNodes := make(map[string]struct{}) seenShardIndices := make(map[uint32]struct{}) + mismatchCounts := make(map[string]int) timeoutCh := time.After(timeout) for { @@ -616,7 +677,13 @@ func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerol logger.Warn(). Int("executed", len(executedWorkflows)). Int("expected", len(expectedWorkflows)). + Interface("mismatchCounts", mismatchCounts). Msg("Timeout waiting for all workflows to execute") + for wfID, count := range mismatchCounts { + if _, confirmed := executedWorkflows[wfID]; !confirmed { + t.Errorf("Workflow %s saw %d log(s) on wrong shard but never on expected shard %d", wfID, count, workflowToShardIndex[wfID]) + } + } return executedWorkflows case userLogs := <-userLogsCh: if userLogs.M == nil { @@ -640,7 +707,18 @@ func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerol actualShardIndex, knownNode := nodeP2PIDToShardIndex[normalizedP2PID] require.True(t, knownNode, "Workflow %s executed on unknown node %s", wfID, userLogs.M.P2PID) expectedShardIndex := workflowToShardIndex[wfID] - require.Equal(t, expectedShardIndex, actualShardIndex, "Workflow %s executed on shard index %d but expected %d (node %s)", wfID, actualShardIndex, expectedShardIndex, userLogs.M.P2PID) + + if actualShardIndex != expectedShardIndex { + mismatchCounts[wfID]++ + logger.Warn(). + Str("workflowID", wfID). + Str("p2pID", normalizedP2PID). + Uint32("actualShard", actualShardIndex). + Uint32("expectedShard", expectedShardIndex). + Int("mismatchCount", mismatchCounts[wfID]). + Msg("Stale log from wrong shard, waiting for correct shard execution") + continue + } executedWorkflows[wfID] = struct{}{} seenNodes[normalizedP2PID] = struct{}{} @@ -660,6 +738,7 @@ func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerol logger.Info(). Int("uniqueNodes", len(seenNodes)). Int("uniqueShardIndices", len(seenShardIndices)). + Interface("mismatchCounts", mismatchCounts). Msg("All workflows executed on correct shards") return executedWorkflows }