Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 39 additions & 4 deletions system-tests/tests/smoke/cre/v2_sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,17 @@ func validateShardingScaleScenario(t *testing.T, testEnv *ttypes.TestEnvironment
Interface("distribution", shardCounts).
Msg("Real workflows distributed across 2 shards after scaling")

cronPeriod := 30 * time.Second
logger.Info().Dur("cronPeriod", cronPeriod).Msg("Step 8b: Waiting one cron period for straggler executions to drain")
time.Sleep(cronPeriod)

// Re-snapshot mappings after the barrier so we use the post-stable state.
resp, err = shardOrchClient.GetWorkflowShardMapping(ctx, &ringpb.GetWorkflowShardMappingRequest{
WorkflowIds: workflowIDs,
})
require.NoError(t, err)
require.NotNil(t, resp)

logger.Info().Msg("Step 9: Verify all workflows execute on their assigned shards via ChIP test sink")
workflowToShardIndex := resp.Mappings
nodeP2PIDToShardIndex := buildNodeP2PIDToShardIndex(t, testEnv)
Expand Down Expand Up @@ -555,9 +566,14 @@ func waitForWorkflowsDistributed(t *testing.T, client ringpb.ShardOrchestratorSe
for _, shardID := range resp.Mappings {
shardsSeen[shardID] = true
}
framework.L.Info().Int("shardsUsed", len(shardsSeen)).Int("minShards", minShards).Msg("Waiting for distribution")
return len(shardsSeen) >= minShards
}, 2*time.Minute, 5*time.Second, "Workflows not distributed across %d shards within timeout", minShards)
steady := resp.GetRoutingSteady()
framework.L.Info().
Int("shardsUsed", len(shardsSeen)).
Int("minShards", minShards).
Bool("routingSteady", steady).
Msg("Waiting for distribution + steady")
return len(shardsSeen) >= minShards && steady
}, 2*time.Minute, 5*time.Second, "Workflows not distributed across %d shards (with RoutingSteady) within timeout", minShards)
}

func buildNodeP2PIDToShardIndex(t *testing.T, testEnv *ttypes.TestEnvironment) map[string]uint32 {
Expand Down Expand Up @@ -606,6 +622,7 @@ func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerol
executedWorkflows := make(map[string]struct{})
seenNodes := make(map[string]struct{})
seenShardIndices := make(map[uint32]struct{})
mismatchCounts := make(map[string]int)

timeoutCh := time.After(timeout)
for {
Expand All @@ -616,7 +633,13 @@ func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerol
logger.Warn().
Int("executed", len(executedWorkflows)).
Int("expected", len(expectedWorkflows)).
Interface("mismatchCounts", mismatchCounts).
Msg("Timeout waiting for all workflows to execute")
for wfID, count := range mismatchCounts {
if _, confirmed := executedWorkflows[wfID]; !confirmed {
t.Errorf("Workflow %s saw %d log(s) on wrong shard but never on expected shard %d", wfID, count, workflowToShardIndex[wfID])
}
}
return executedWorkflows
case userLogs := <-userLogsCh:
if userLogs.M == nil {
Expand All @@ -640,7 +663,18 @@ func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerol
actualShardIndex, knownNode := nodeP2PIDToShardIndex[normalizedP2PID]
require.True(t, knownNode, "Workflow %s executed on unknown node %s", wfID, userLogs.M.P2PID)
expectedShardIndex := workflowToShardIndex[wfID]
require.Equal(t, expectedShardIndex, actualShardIndex, "Workflow %s executed on shard index %d but expected %d (node %s)", wfID, actualShardIndex, expectedShardIndex, userLogs.M.P2PID)

if actualShardIndex != expectedShardIndex {
mismatchCounts[wfID]++
logger.Warn().
Str("workflowID", wfID).
Str("p2pID", normalizedP2PID).
Uint32("actualShard", actualShardIndex).
Uint32("expectedShard", expectedShardIndex).
Int("mismatchCount", mismatchCounts[wfID]).
Msg("Stale log from wrong shard, waiting for correct shard execution")
continue
}

executedWorkflows[wfID] = struct{}{}
seenNodes[normalizedP2PID] = struct{}{}
Expand All @@ -660,6 +694,7 @@ func waitForAllWorkflowsExecuted(ctx context.Context, t *testing.T, logger zerol
logger.Info().
Int("uniqueNodes", len(seenNodes)).
Int("uniqueShardIndices", len(seenShardIndices)).
Interface("mismatchCounts", mismatchCounts).
Msg("All workflows executed on correct shards")
return executedWorkflows
}
Expand Down
Loading