diff --git a/sonic_platform_base/sonic_xcvr/api/public/cmis.py b/sonic_platform_base/sonic_xcvr/api/public/cmis.py index 9e0784a45..a4e923b42 100644 --- a/sonic_platform_base/sonic_xcvr/api/public/cmis.py +++ b/sonic_platform_base/sonic_xcvr/api/public/cmis.py @@ -1183,6 +1183,72 @@ def get_active_apsel_hostlane(self): active_apsel_code = self.xcvr_eeprom.read(consts.ACTIVE_APSEL_CODE) return defaultdict(lambda: 'N/A') if not active_apsel_code else active_apsel_code + def _get_active_appsel_descriptor(self): + ''' + Returns the application-advertisement descriptor for the currently + active AppSel on host lane 1. + + Raises RuntimeError if the active AppSel cannot be determined -- + callers must not silently fall back to the default-application + descriptor, since that defeats the purpose of resolving the + runtime-configured application. + ''' + if self.is_flat_memory(): + raise RuntimeError( + 'Cannot determine active AppSel descriptor: ' + 'module reports flat memory (page 11h unavailable)') + + lane1_field = "%s%d" % (consts.ACTIVE_APSEL_HOSTLANE, 1) + active_apsel = self.xcvr_eeprom.read(lane1_field) + if not active_apsel: + raise RuntimeError( + 'Cannot determine active AppSel descriptor: ' + 'read of %s returned %r' % (lane1_field, active_apsel)) + + appl_advt = self.get_application_advertisement() + descriptor = appl_advt.get(active_apsel) + if not descriptor: + raise RuntimeError( + 'Cannot determine active AppSel descriptor: ' + 'AppSel %d has no entry in application advertisement %r' + % (active_apsel, appl_advt)) + + return active_apsel, descriptor + + def get_active_appsel_host_lane_count(self): + ''' + Returns the host lane count for the currently active application. + + Reads the applied AppSel code from the DataPath Configuration + (Page 11h) and looks up the corresponding application descriptor. + Raises RuntimeError if the active AppSel cannot be determined or + the descriptor lacks a host_lane_count + ''' + active_apsel, descriptor = self._get_active_appsel_descriptor() + if 'host_lane_count' not in descriptor: + raise RuntimeError( + 'Cannot determine active AppSel host lane count: ' + 'AppSel %d descriptor %r has no host_lane_count' + % (active_apsel, descriptor)) + return descriptor['host_lane_count'] + + def get_active_appsel_media_lane_count(self): + ''' + Returns the media lane count for the currently active application. + + Reads the applied AppSel code from the DataPath Configuration + (Page 11h) and looks up the corresponding application descriptor. + Raises RuntimeError if the active AppSel cannot be determined or + the descriptor lacks a media_lane_count + ''' + active_apsel, descriptor = self._get_active_appsel_descriptor() + if 'media_lane_count' not in descriptor: + raise RuntimeError( + 'Cannot determine active AppSel media lane count: ' + 'AppSel %d descriptor %r has no media_lane_count' + % (active_apsel, descriptor)) + return descriptor['media_lane_count'] + def get_tx_config_power(self): ''' This function returns the configured TX output power. Unit in dBm @@ -1196,7 +1262,7 @@ def get_media_output_loopback(self): result = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK) if result is None: return None - return result == 1 + return bool(result) def get_media_input_loopback(self): ''' @@ -1205,7 +1271,7 @@ def get_media_input_loopback(self): result = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK) if result is None: return None - return result == 1 + return bool(result) def get_host_output_loopback(self): ''' @@ -1496,7 +1562,8 @@ def set_host_input_loopback(self, lane_mask, enable): logger.error('Host input loopback is not supported') return False - if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != 0xff: + all_lanes_mask = (1 << self.get_active_appsel_host_lane_count()) - 1 + if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != all_lanes_mask: logger.error('Per-lane host input loopback is not supported, lane_mask:%#x', lane_mask) return False @@ -1537,7 +1604,8 @@ def set_host_output_loopback(self, lane_mask, enable): logger.error('Host output loopback is not supported') return False - if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != 0xff: + all_lanes_mask = (1 << self.get_active_appsel_host_lane_count()) - 1 + if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != all_lanes_mask: logger.error('Per-lane host output loopback is not supported, lane_mask:%#x', lane_mask) return False @@ -1578,7 +1646,8 @@ def set_media_input_loopback(self, lane_mask, enable): logger.error('Media input loopback is not supported') return False - if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != 0xff: + all_lanes_mask = (1 << self.get_active_appsel_media_lane_count()) - 1 + if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != all_lanes_mask: logger.error('Per-lane media input loopback is not supported, lane_mask:%#x', lane_mask) return False @@ -1619,7 +1688,8 @@ def set_media_output_loopback(self, lane_mask, enable): logger.error('Media output loopback is not supported') return False - if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != 0xff: + all_lanes_mask = (1 << self.get_active_appsel_media_lane_count()) - 1 + if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != all_lanes_mask: logger.error('Per-lane media output loopback is not supported, lane_mask:%#x', lane_mask) return False @@ -1663,11 +1733,13 @@ def set_loopback_mode(self, loopback_mode, lane_mask = 0xff, enable = False): } if loopback_mode == 'none': + host_all_lanes_mask = (1 << self.get_active_appsel_host_lane_count()) - 1 + media_all_lanes_mask = (1 << self.get_active_appsel_media_lane_count()) - 1 return all([ - self.set_host_input_loopback(0xff, False), - self.set_host_output_loopback(0xff, False), - self.set_media_input_loopback(0xff, False), - self.set_media_output_loopback(0xff, False) + self.set_host_input_loopback(host_all_lanes_mask, False), + self.set_host_output_loopback(host_all_lanes_mask, False), + self.set_media_input_loopback(media_all_lanes_mask, False), + self.set_media_output_loopback(media_all_lanes_mask, False) ]) func = loopback_functions.get(loopback_mode) diff --git a/tests/sonic_xcvr/test_cmis.py b/tests/sonic_xcvr/test_cmis.py index c00848bb9..02cb11afc 100755 --- a/tests/sonic_xcvr/test_cmis.py +++ b/tests/sonic_xcvr/test_cmis.py @@ -1174,6 +1174,107 @@ def test_get_active_apsel_hostlane(self, mock_response, expected): result = self.api.get_active_apsel_hostlane() assert result == expected + @pytest.mark.parametrize( + "is_flat_memory, active_apsel, appl_advt, expected", + [ + # AppSel 2 is the configured 4-lane application. + (False, 2, + {1: {'host_lane_count': 1}, 2: {'host_lane_count': 4}}, + 4), + # AppSel 1 is the configured 1-lane application. + (False, 1, + {1: {'host_lane_count': 1}, 2: {'host_lane_count': 4}}, + 1), + ] + ) + def test_get_active_appsel_host_lane_count( + self, is_flat_memory, active_apsel, appl_advt, expected + ): + # patch.object as a context manager so mocks don't leak onto self.api + # (which is a class-shared instance) and break later tests like + # test_get_application_advertisement. + self.api.xcvr_eeprom.read = MagicMock(return_value=active_apsel) + with patch.object(self.api, 'is_flat_memory', return_value=is_flat_memory), \ + patch.object(self.api, 'get_application_advertisement', return_value=appl_advt): + assert self.api.get_active_appsel_host_lane_count() == expected + + @pytest.mark.parametrize( + "is_flat_memory, active_apsel, appl_advt", + [ + # Flat memory: page 11h is unreadable. + (True, None, {}), + # AppSel read returns None. + (False, None, + {1: {'host_lane_count': 1}, 2: {'host_lane_count': 4}}), + # AppSel read returns 0 (no app active). + (False, 0, + {1: {'host_lane_count': 1}, 2: {'host_lane_count': 4}}), + # AppSel code has no matching descriptor. + (False, 5, + {1: {'host_lane_count': 1}, 2: {'host_lane_count': 4}}), + # Descriptor present but missing host_lane_count. + (False, 2, + {1: {'host_lane_count': 1}, 2: {}}), + ] + ) + def test_get_active_appsel_host_lane_count_raises( + self, is_flat_memory, active_apsel, appl_advt + ): + self.api.xcvr_eeprom.read = MagicMock(return_value=active_apsel) + with patch.object(self.api, 'is_flat_memory', return_value=is_flat_memory), \ + patch.object(self.api, 'get_application_advertisement', return_value=appl_advt): + with pytest.raises(RuntimeError): + self.api.get_active_appsel_host_lane_count() + + @pytest.mark.parametrize( + "is_flat_memory, active_apsel, appl_advt, expected", + [ + # AppSel 2 is the configured 4-lane media application. + (False, 2, + {1: {'media_lane_count': 1}, 2: {'media_lane_count': 4}}, + 4), + # AppSel 1 is the configured 1-lane media application. + (False, 1, + {1: {'media_lane_count': 1}, 2: {'media_lane_count': 4}}, + 1), + ] + ) + def test_get_active_appsel_media_lane_count( + self, is_flat_memory, active_apsel, appl_advt, expected + ): + self.api.xcvr_eeprom.read = MagicMock(return_value=active_apsel) + with patch.object(self.api, 'is_flat_memory', return_value=is_flat_memory), \ + patch.object(self.api, 'get_application_advertisement', return_value=appl_advt): + assert self.api.get_active_appsel_media_lane_count() == expected + + @pytest.mark.parametrize( + "is_flat_memory, active_apsel, appl_advt", + [ + # Flat memory: page 11h is unreadable. + (True, None, {}), + # AppSel read returns None. + (False, None, + {1: {'media_lane_count': 1}, 2: {'media_lane_count': 4}}), + # AppSel read returns 0 (no app active). + (False, 0, + {1: {'media_lane_count': 1}, 2: {'media_lane_count': 4}}), + # AppSel code has no matching descriptor. + (False, 5, + {1: {'media_lane_count': 1}, 2: {'media_lane_count': 4}}), + # Descriptor present but missing media_lane_count. + (False, 2, + {1: {'media_lane_count': 1}, 2: {}}), + ] + ) + def test_get_active_appsel_media_lane_count_raises( + self, is_flat_memory, active_apsel, appl_advt + ): + self.api.xcvr_eeprom.read = MagicMock(return_value=active_apsel) + with patch.object(self.api, 'is_flat_memory', return_value=is_flat_memory), \ + patch.object(self.api, 'get_application_advertisement', return_value=appl_advt): + with pytest.raises(RuntimeError): + self.api.get_active_appsel_media_lane_count() + @pytest.mark.parametrize("mock_response, expected", [ (-10, -10) ]) @@ -1395,37 +1496,44 @@ def test_get_loopback_capability(self, mock_response, expected): result = self.api.get_loopback_capability() assert result == expected - @pytest.mark.parametrize("input_param, mock_response, expected",[ - ([0xf, True], None, False), + @pytest.mark.parametrize("input_param, mock_response, host_lane_count, expected",[ + ([0xf, True], None, 8, False), ([0xf, True], { 'host_side_input_loopback_supported': False, 'simultaneous_host_media_loopback_supported': True, 'per_lane_host_loopback_supported': True, - }, False), + }, 8, False), ([0xf, True], { 'host_side_input_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_host_loopback_supported': False, - }, False), + }, 8, False), ([0xf, True], { 'host_side_input_loopback_supported': True, 'simultaneous_host_media_loopback_supported': False, 'per_lane_host_loopback_supported': True, - }, False), + }, 8, False), ([0xf, True], { 'host_side_input_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_host_loopback_supported': True, - }, True), + }, 8, True), ([0xf, False], { 'host_side_input_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_host_loopback_supported': True, - }, True), + }, 8, True), + # 4-lane module: 0x0f is the all-lanes mask, so per_lane=False must succeed. + ([0x0f, True], { + 'host_side_input_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': False, + }, 4, True), ]) - def test_set_host_input_loopback(self, input_param, mock_response, expected): + def test_set_host_input_loopback(self, input_param, mock_response, host_lane_count, expected): self.api.get_loopback_capability = MagicMock() self.api.get_loopback_capability.return_value = mock_response + self.api.get_active_appsel_host_lane_count = MagicMock(return_value=host_lane_count) self.api.xcvr_eeprom.read = MagicMock() self.api.xcvr_eeprom.read.side_effect = [0x0f,0x0f] self.api.xcvr_eeprom.write = MagicMock() @@ -1433,37 +1541,44 @@ def test_set_host_input_loopback(self, input_param, mock_response, expected): result = self.api.set_host_input_loopback(input_param[0], input_param[1]) assert result == expected - @pytest.mark.parametrize("input_param, mock_response, expected",[ - ([0xf, True], None, False), + @pytest.mark.parametrize("input_param, mock_response, host_lane_count, expected",[ + ([0xf, True], None, 8, False), ([0xf, True], { 'host_side_output_loopback_supported': False, 'simultaneous_host_media_loopback_supported': True, 'per_lane_host_loopback_supported': True, - }, False), + }, 8, False), ([0xf, True], { 'host_side_output_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_host_loopback_supported': False, - }, False), + }, 8, False), ([0xf, True], { 'host_side_output_loopback_supported': True, 'simultaneous_host_media_loopback_supported': False, 'per_lane_host_loopback_supported': True, - }, False), + }, 8, False), ([0xf, True], { 'host_side_output_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_host_loopback_supported': True, - }, True), + }, 8, True), ([0xf, False], { 'host_side_output_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_host_loopback_supported': True, - }, True), + }, 8, True), + # 4-lane module: 0x0f is the all-lanes mask, so per_lane=False must succeed. + ([0x0f, True], { + 'host_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_host_loopback_supported': False, + }, 4, True), ]) - def test_set_host_output_loopback(self, input_param, mock_response, expected): + def test_set_host_output_loopback(self, input_param, mock_response, host_lane_count, expected): self.api.get_loopback_capability = MagicMock() self.api.get_loopback_capability.return_value = mock_response + self.api.get_active_appsel_host_lane_count = MagicMock(return_value=host_lane_count) self.api.xcvr_eeprom.read = MagicMock() self.api.xcvr_eeprom.read.side_effect = [0x0f,0x0f] self.api.xcvr_eeprom.write = MagicMock() @@ -1471,37 +1586,44 @@ def test_set_host_output_loopback(self, input_param, mock_response, expected): result = self.api.set_host_output_loopback(input_param[0], input_param[1]) assert result == expected - @pytest.mark.parametrize("input_param, mock_response, expected",[ - ([0xf, True], None, False), + @pytest.mark.parametrize("input_param, mock_response, media_lane_count, expected",[ + ([0xf, True], None, 8, False), ([0xf, True], { 'media_side_input_loopback_supported': False, 'simultaneous_host_media_loopback_supported': True, 'per_lane_media_loopback_supported': True, - }, False), + }, 8, False), ([0xf, True], { 'media_side_input_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_media_loopback_supported': False, - }, False), + }, 8, False), ([0xf, True], { 'media_side_input_loopback_supported': True, 'simultaneous_host_media_loopback_supported': False, 'per_lane_media_loopback_supported': True, - }, False), + }, 8, False), ([0xf, True], { 'media_side_input_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_media_loopback_supported': True, - }, True), + }, 8, True), ([0xf, False], { 'media_side_input_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_media_loopback_supported': True, - }, True), + }, 8, True), + # 4-lane module: 0x0f is the all-lanes mask, so per_lane=False must succeed. + ([0x0f, True], { + 'media_side_input_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': False, + }, 4, True), ]) - def test_set_media_input_loopback(self, input_param, mock_response, expected): + def test_set_media_input_loopback(self, input_param, mock_response, media_lane_count, expected): self.api.get_loopback_capability = MagicMock() self.api.get_loopback_capability.return_value = mock_response + self.api.get_active_appsel_media_lane_count = MagicMock(return_value=media_lane_count) self.api.xcvr_eeprom.read = MagicMock() self.api.xcvr_eeprom.read.side_effect = [0x0f,0x0f] self.api.xcvr_eeprom.write = MagicMock() @@ -1509,37 +1631,44 @@ def test_set_media_input_loopback(self, input_param, mock_response, expected): result = self.api.set_media_input_loopback(input_param[0], input_param[1]) assert result == expected - @pytest.mark.parametrize("input_param, mock_response, expected",[ - ([0xf, True], None, False), + @pytest.mark.parametrize("input_param, mock_response, media_lane_count, expected",[ + ([0xf, True], None, 8, False), ([0xf, True], { 'media_side_output_loopback_supported': False, 'simultaneous_host_media_loopback_supported': True, 'per_lane_media_loopback_supported': True, - }, False), + }, 8, False), ([0xf, True], { 'media_side_output_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_media_loopback_supported': False, - }, False), + }, 8, False), ([0xf, True], { 'media_side_output_loopback_supported': True, 'simultaneous_host_media_loopback_supported': False, 'per_lane_media_loopback_supported': True, - }, False), + }, 8, False), ([0xf, True], { 'media_side_output_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_media_loopback_supported': True, - }, True), + }, 8, True), ([0xf, False], { 'media_side_output_loopback_supported': True, 'simultaneous_host_media_loopback_supported': True, 'per_lane_media_loopback_supported': True, - }, True), + }, 8, True), + # 4-lane module: 0x0f is the all-lanes mask, so per_lane=False must succeed. + ([0x0f, True], { + 'media_side_output_loopback_supported': True, + 'simultaneous_host_media_loopback_supported': True, + 'per_lane_media_loopback_supported': False, + }, 4, True), ]) - def test_set_media_output_loopback(self, input_param, mock_response, expected): + def test_set_media_output_loopback(self, input_param, mock_response, media_lane_count, expected): self.api.get_loopback_capability = MagicMock() self.api.get_loopback_capability.return_value = mock_response + self.api.get_active_appsel_media_lane_count = MagicMock(return_value=media_lane_count) self.api.xcvr_eeprom.read = MagicMock() self.api.xcvr_eeprom.read.side_effect = [0x0f,0x0f] self.api.xcvr_eeprom.write = MagicMock() @@ -1569,6 +1698,8 @@ def test_set_loopback_mode(self, input_param, mock_response, expected): self.api.set_media_input_loopback.return_value = mock_response self.api.set_media_output_loopback = MagicMock() self.api.set_media_output_loopback.return_value = mock_response + self.api.get_active_appsel_host_lane_count = MagicMock(return_value=8) + self.api.get_active_appsel_media_lane_count = MagicMock(return_value=8) result = self.api.set_loopback_mode(input_param[0], input_param[1]) assert result == expected