Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 82 additions & 10 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,72 @@ def get_active_apsel_hostlane(self):
active_apsel_code = self.xcvr_eeprom.read(consts.ACTIVE_APSEL_CODE)
return defaultdict(lambda: 'N/A') if not active_apsel_code else active_apsel_code

def _get_active_appsel_descriptor(self):
'''
Returns the application-advertisement descriptor for the currently
active AppSel on host lane 1.

Raises RuntimeError if the active AppSel cannot be determined --
callers must not silently fall back to the default-application
descriptor, since that defeats the purpose of resolving the
runtime-configured application.
'''
if self.is_flat_memory():
raise RuntimeError(
'Cannot determine active AppSel descriptor: '
'module reports flat memory (page 11h unavailable)')

lane1_field = "%s%d" % (consts.ACTIVE_APSEL_HOSTLANE, 1)
active_apsel = self.xcvr_eeprom.read(lane1_field)
if not active_apsel:
raise RuntimeError(
'Cannot determine active AppSel descriptor: '
'read of %s returned %r' % (lane1_field, active_apsel))

appl_advt = self.get_application_advertisement()
descriptor = appl_advt.get(active_apsel)
if not descriptor:
raise RuntimeError(
'Cannot determine active AppSel descriptor: '
'AppSel %d has no entry in application advertisement %r'
% (active_apsel, appl_advt))

return active_apsel, descriptor

def get_active_appsel_host_lane_count(self):
'''
Returns the host lane count for the currently active application.

Reads the applied AppSel code from the DataPath Configuration
(Page 11h) and looks up the corresponding application descriptor.
Raises RuntimeError if the active AppSel cannot be determined or
the descriptor lacks a host_lane_count
'''
active_apsel, descriptor = self._get_active_appsel_descriptor()
if 'host_lane_count' not in descriptor:
raise RuntimeError(
'Cannot determine active AppSel host lane count: '
'AppSel %d descriptor %r has no host_lane_count'
% (active_apsel, descriptor))
return descriptor['host_lane_count']

def get_active_appsel_media_lane_count(self):
'''
Returns the media lane count for the currently active application.

Reads the applied AppSel code from the DataPath Configuration
(Page 11h) and looks up the corresponding application descriptor.
Raises RuntimeError if the active AppSel cannot be determined or
the descriptor lacks a media_lane_count
'''
active_apsel, descriptor = self._get_active_appsel_descriptor()
if 'media_lane_count' not in descriptor:
raise RuntimeError(
'Cannot determine active AppSel media lane count: '
'AppSel %d descriptor %r has no media_lane_count'
% (active_apsel, descriptor))
return descriptor['media_lane_count']

def get_tx_config_power(self):
'''
This function returns the configured TX output power. Unit in dBm
Expand All @@ -1196,7 +1262,7 @@ def get_media_output_loopback(self):
result = self.xcvr_eeprom.read(consts.MEDIA_OUTPUT_LOOPBACK)
if result is None:
return None
return result == 1
return bool(result)

def get_media_input_loopback(self):
'''
Expand All @@ -1205,7 +1271,7 @@ def get_media_input_loopback(self):
result = self.xcvr_eeprom.read(consts.MEDIA_INPUT_LOOPBACK)
if result is None:
return None
return result == 1
return bool(result)

def get_host_output_loopback(self):
'''
Expand Down Expand Up @@ -1496,7 +1562,8 @@ def set_host_input_loopback(self, lane_mask, enable):
logger.error('Host input loopback is not supported')
return False

if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != 0xff:
all_lanes_mask = (1 << self.get_active_appsel_host_lane_count()) - 1
if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != all_lanes_mask:
logger.error('Per-lane host input loopback is not supported, lane_mask:%#x', lane_mask)
return False

Expand Down Expand Up @@ -1537,7 +1604,8 @@ def set_host_output_loopback(self, lane_mask, enable):
logger.error('Host output loopback is not supported')
return False

if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != 0xff:
all_lanes_mask = (1 << self.get_active_appsel_host_lane_count()) - 1
if loopback_capability['per_lane_host_loopback_supported'] is False and lane_mask != all_lanes_mask:
logger.error('Per-lane host output loopback is not supported, lane_mask:%#x', lane_mask)
return False

Expand Down Expand Up @@ -1578,7 +1646,8 @@ def set_media_input_loopback(self, lane_mask, enable):
logger.error('Media input loopback is not supported')
return False

if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != 0xff:
all_lanes_mask = (1 << self.get_active_appsel_media_lane_count()) - 1
if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != all_lanes_mask:
logger.error('Per-lane media input loopback is not supported, lane_mask:%#x', lane_mask)
return False

Expand Down Expand Up @@ -1619,7 +1688,8 @@ def set_media_output_loopback(self, lane_mask, enable):
logger.error('Media output loopback is not supported')
return False

if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != 0xff:
all_lanes_mask = (1 << self.get_active_appsel_media_lane_count()) - 1
if loopback_capability['per_lane_media_loopback_supported'] is False and lane_mask != all_lanes_mask:
logger.error('Per-lane media output loopback is not supported, lane_mask:%#x', lane_mask)
return False

Expand Down Expand Up @@ -1663,11 +1733,13 @@ def set_loopback_mode(self, loopback_mode, lane_mask = 0xff, enable = False):
}

if loopback_mode == 'none':
host_all_lanes_mask = (1 << self.get_active_appsel_host_lane_count()) - 1
media_all_lanes_mask = (1 << self.get_active_appsel_media_lane_count()) - 1
return all([
self.set_host_input_loopback(0xff, False),
self.set_host_output_loopback(0xff, False),
self.set_media_input_loopback(0xff, False),
self.set_media_output_loopback(0xff, False)
self.set_host_input_loopback(host_all_lanes_mask, False),
self.set_host_output_loopback(host_all_lanes_mask, False),
self.set_media_input_loopback(media_all_lanes_mask, False),
self.set_media_output_loopback(media_all_lanes_mask, False)
])

func = loopback_functions.get(loopback_mode)
Expand Down
Loading
Loading