diff --git a/setup.py b/setup.py index ad28151cc..09b895cb1 100644 --- a/setup.py +++ b/setup.py @@ -43,14 +43,18 @@ 'sonic_platform_base.sonic_xcvr', 'sonic_platform_base.sonic_xcvr.fields', 'sonic_platform_base.sonic_xcvr.fields.public', + 'sonic_platform_base.sonic_xcvr.fields.bailly', 'sonic_platform_base.sonic_xcvr.mem_maps', 'sonic_platform_base.sonic_xcvr.mem_maps.public', + 'sonic_platform_base.sonic_xcvr.mem_maps.bailly', 'sonic_platform_base.sonic_xcvr.mem_maps.public.cmis', 'sonic_platform_base.sonic_xcvr.mem_maps.public.cmis.pages', 'sonic_platform_base.sonic_xcvr.api', 'sonic_platform_base.sonic_xcvr.api.public', + 'sonic_platform_base.sonic_xcvr.api.bailly', 'sonic_platform_base.sonic_xcvr.codes', 'sonic_platform_base.sonic_xcvr.codes.public', + 'sonic_platform_base.sonic_xcvr.codes.bailly', 'sonic_platform_base.sonic_xcvr.utils', 'sonic_platform_base.sonic_xcvr.api.credo', 'sonic_platform_base.sonic_xcvr.mem_maps.credo', diff --git a/sonic_platform_base/sonic_xcvr/api/bailly/__init__.py b/sonic_platform_base/sonic_xcvr/api/bailly/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/sonic_platform_base/sonic_xcvr/api/bailly/bailly_api.py b/sonic_platform_base/sonic_xcvr/api/bailly/bailly_api.py new file mode 100644 index 000000000..7c1a2d450 --- /dev/null +++ b/sonic_platform_base/sonic_xcvr/api/bailly/bailly_api.py @@ -0,0 +1,105 @@ +""" + bailly_api.py + + Implementation of Micas Bailly CPO specific in addition to the CMIS specification. +""" +from ..public.cmis import CmisApi +from ...fields.bailly import bailly_consts + +class BaillyApi(CmisApi): + def __init__(self, xcvr_eeprom): + super(BaillyApi, self).__init__(xcvr_eeprom) + + def get_dpinit_pending(self): + ''' + Bailly not supported, return fake value. + ''' + dpinit_pending_dict = {} + for lane in range(self.NUM_CHANNELS): + key = "DPInitPending{}".format(lane + 1) + dpinit_pending_dict[key] = True + return dpinit_pending_dict + + def get_active_apsel_hostlane(self): + ''' + Bailly not supported Deinit, if it is deinit return fake value. + ''' + has_zero = False + current_map = {} + for lane in range(self.NUM_CHANNELS): + lane_key = 'ActiveAppSelLane{}'.format(lane + 1) + app_lane = self.get_application(lane) + current_map[lane_key] = app_lane + if app_lane == 0: + has_zero = True + + if has_zero: + return current_map + else: + normal = super().get_active_apsel_hostlane() + return normal + def _format_revision(self, revision): + if revision is None: + return None + return "{}.{}".format((revision >> 4) & 0xf, revision & 0xf) + + def get_transceiver_info(self): + info = super().get_transceiver_info() + if info is None: + return None + + els_info = self.get_els_info() + cpo_info = els_info.get("cpo_info") + vendor_info = els_info.get("vendor_info") + laser_power_mode = els_info.get("laser_power_mode") + if cpo_info is None and vendor_info is None and laser_power_mode is None: + return info + + if cpo_info is not None: + info.update({ + "els_identifier": cpo_info.get(bailly_consts.CPO_IDENTIFIER), + "els_revision": self._format_revision(cpo_info.get(bailly_consts.CPO_REVISION)), + "els_laser_grid_and_count": cpo_info.get(bailly_consts.LASER_GRID_AND_COUNT), + "els_laser_wavelength_grid": cpo_info.get(bailly_consts.LASER_WAVELENGTH_GRID), + "els_laser_count": cpo_info.get(bailly_consts.LASER_COUNT), + }) + + if vendor_info is not None: + info.update({ + "els_vendor_name": self._strip_str( + vendor_info.get(bailly_consts.VENDOR_NAME_ASCII_FIELD) + ), + "els_vendor_oui": vendor_info.get(bailly_consts.VENDOR_OUI_HEX_FIELD), + "els_vendor_pn": self._strip_str( + vendor_info.get(bailly_consts.VENDOR_PART_NUMBER_ASCII_FIELD) + ), + "els_vendor_rev": self._strip_str( + vendor_info.get(bailly_consts.VENDOR_REVISION_ASCII_FIELD) + ), + "els_vendor_sn": self._strip_str( + vendor_info.get(bailly_consts.VENDOR_SERIAL_NUMBER_ASCII_FIELD) + ), + "els_date_code": self._strip_str( + vendor_info.get(bailly_consts.DATE_CODE_FIELD) + ), + "els_max_power": vendor_info.get(bailly_consts.MAX_POWER_CONSUMPTION_FIELD), + }) + + if laser_power_mode is not None: + info.update({ + "els_laser_power_mode_control": laser_power_mode.get( + bailly_consts.LASER_POWER_MODE_CONTROL_BITS_FIELD + ), + }) + + return info + + def get_els_vendor_info(self): + return self.xcvr_eeprom.read(bailly_consts.CPO_VENDOR_INFO_FIELD) + + def get_els_info(self): + return { + "cpo_info": self.xcvr_eeprom.read(bailly_consts.CPO_INFO_FIELD), + "vendor_info": self.get_els_vendor_info(), + "laser_power_mode": self.xcvr_eeprom.read(bailly_consts.LASER_POWER_MODE_CONTROL_FIELD), + } diff --git a/sonic_platform_base/sonic_xcvr/bailly_optoe_base.py b/sonic_platform_base/sonic_xcvr/bailly_optoe_base.py new file mode 100644 index 000000000..0576717ea --- /dev/null +++ b/sonic_platform_base/sonic_xcvr/bailly_optoe_base.py @@ -0,0 +1,176 @@ +""" + cpo_optoe_base.py + + Platform-independent class with which to interact with a cpo module + in SONiC +""" +import abc +import os +import json +from .sfp_optoe_base import SfpOptoeBase +from sonic_py_common.device_info import get_platform, get_path_to_platform_dir + +CPO_JSON_FILE = "cpo.json" +def get_cpo_json_data(): + """ + Retrieve the data from cpo.json file + + Returns: + A dictionary containing the key/value pairs as found in the cpo.json file + """ + platform = get_platform() + if not platform: + return None + + platform_path = get_path_to_platform_dir() + if not platform_path: + return None + + platform_json = os.path.join(platform_path, CPO_JSON_FILE) + if not os.path.isfile(platform_json): + return None + + try: + with open(platform_json, 'r') as f: + cpo_data = json.loads(f.read()) + return cpo_data + except (json.JSONDecodeError, IOError, TypeError, ValueError): + return None + +class CpoOptoeBase(SfpOptoeBase): + def __init__(self): + super().__init__() + self._port_id = -1 + self._oe_bank_id = -1 + self._oe_id = -1 + self._els_id = -1 + self._els_bank_id = -1 + + def get_oe_eeprom_path(self): + oes_cfg = self.get_oes_config() or {} + cpo_bus = oes_cfg.get("oe_cmis_path") + return (cpo_bus + "eeprom") if cpo_bus else None + + def get_eeprom_path(self): + return self.get_oe_eeprom_path() + + def get_oes_config(self): + key = f"oe{self._oe_id}" + cpo_data = get_cpo_json_data() or {} + return (cpo_data.get("oes") or {}).get(key) + + def get_elss_config(self): + key = f"els{self._els_id}" + cpo_data = get_cpo_json_data() or {} + return (cpo_data.get("elss") or {}).get(key) + + def read_eeprom(self, offset, num_bytes): + sys_path = self.get_eeprom_path() + if not sys_path: + return None + try: + with open(sys_path, mode='rb', buffering=0) as f: + f.seek(offset) + return bytearray(f.read(num_bytes)) + except (OSError, IOError, TypeError): + return None + + def write_eeprom(self, offset, num_bytes, write_buffer): + sys_path = self.get_eeprom_path() + if not sys_path: + return False + try: + with open(sys_path, mode='r+b', buffering=0) as f: + f.seek(offset) + f.write(write_buffer[0:num_bytes]) + except (OSError, IOError, TypeError): + return False + return True + + def get_els_presence(self): + try: + elss_cfg = self.get_elss_config() or {} + els_presence = elss_cfg.get("els_presence") or {} + + els_presence_file = els_presence.get("presence_file") + presence_offset = int(els_presence.get("presence_offset", "0"), 16) + presence_len = int(els_presence.get("presence_len", 0)) + presence_bit = int(els_presence.get("presence_bit", 0)) + presence_value = int(els_presence.get("presence_value", 1)) + + if not els_presence_file or presence_len <= 0: + return False + + with open(els_presence_file, mode='rb', buffering=0) as f: + f.seek(presence_offset) + raw = bytearray(f.read(presence_len)) + int_value = int.from_bytes(raw, byteorder='little') + return ((int_value >> presence_bit) & 1) == presence_value + except (OSError, IOError, TypeError, ValueError, AttributeError): + return False + + def get_presence(self): + return self.get_els_presence() + + def get_els_base_page(self): + elss_cfg = self.get_elss_config() or {} + return int(elss_cfg.get("base_page", 0)) + + def get_oe_bank_id(self): + return self._oe_bank_id + def get_oe_id(self): + return self._oe_id + def get_els_bank_id(self): + return self._els_bank_id + def get_els_id(self): + return self._els_id + + @abc.abstractmethod + def check_fiber_dirty(self): + """ + Check whether the fiber is dirty. True:check ok; False: check failed + """ + raise NotImplementedError + + @abc.abstractmethod + def check_calibration(self): + """ + Check whether the calibration such as oe power sufficient. True:check ok; False: check failed + """ + raise NotImplementedError + + @abc.abstractmethod + def is_els_power_sufficient(self): + """ + Check whether els power sufficient. + """ + raise NotImplementedError + + @abc.abstractmethod + def is_calibration_checked(self): + """ + Check whether the calibration such as oe power sufficient detection​ has been completed. + """ + raise NotImplementedError + + @abc.abstractmethod + def is_fiber_checked(self): + """ + Check whether the fiber detection​ has been completed. + """ + raise NotImplementedError + + @abc.abstractmethod + def is_els_tx_on(self): + """ + Check the ELS TX​ status to see if it is emitting light normally. + """ + raise NotImplementedError + + @abc.abstractmethod + def is_els_tx_enabled(self): + """ + Check whether the ELS TX enable​ has been set. + """ + raise NotImplementedError + diff --git a/sonic_platform_base/sonic_xcvr/codes/bailly/__init__.py b/sonic_platform_base/sonic_xcvr/codes/bailly/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/sonic_platform_base/sonic_xcvr/codes/bailly/bailly_codes.py b/sonic_platform_base/sonic_xcvr/codes/bailly/bailly_codes.py new file mode 100644 index 000000000..353cb4150 --- /dev/null +++ b/sonic_platform_base/sonic_xcvr/codes/bailly/bailly_codes.py @@ -0,0 +1,66 @@ +from ..public.cmis import CmisCodes + +class BaillyCodes(CmisCodes): + # Vendor specific implementation to be added here + XCVR_IDENTIFIERS = { + **CmisCodes.XCVR_IDENTIFIERS, + 128: 'CPO Bailly', + } + + XCVR_IDENTIFIER_ABBRV = { + **CmisCodes.XCVR_IDENTIFIER_ABBRV, + 128: 'QSFP-DD', + } + + HOST_ELECTRICAL_INTERFACE = { + **CmisCodes.HOST_ELECTRICAL_INTERFACE, + 253: 'Bailly-Reserved-1', + 254: 'Bailly-Reserved-2', + } + + SM_MEDIA_INTERFACE = { + **CmisCodes.SM_MEDIA_INTERFACE, + 193: 'Bailly-800G-2xFR4', + 253: 'Bailly-Reserved-LC-1', + 254: 'Bailly-Reserved-LC-2', + } + + LASER_WAVELENGTH_GRID = { + 0: "CWDM4", + 1: "DR4", + } + + LASER_COUNT = { + code: code + 1 for code in range(16) + } + + POWER_MODE = { + 0: "High power mode", + 1: "Low power mode", + } + + INTERRUPT_STATUS = { + 0: "Interrupt event occurred", + 1: "Interrupt event cleared", + } + + LASER_DISABLE_CONTROL = { + 0: "Enable", + 1: "Disable", + } + + LASER_ACTIVE_STATUS = { + 0: "Inactive", + 1: "Active", + } + + LASER_POWER_MODE_ENABLE = { + 0: "Disable", + 1: "Enable", + } + + MAX_BANKS_SUPPORTED = { + **CmisCodes.MAX_BANKS_SUPPORTED, + 3: 8, + } + diff --git a/sonic_platform_base/sonic_xcvr/fields/bailly/__init__.py b/sonic_platform_base/sonic_xcvr/fields/bailly/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/sonic_platform_base/sonic_xcvr/fields/bailly/bailly_consts.py b/sonic_platform_base/sonic_xcvr/fields/bailly/bailly_consts.py new file mode 100644 index 000000000..a85c94371 --- /dev/null +++ b/sonic_platform_base/sonic_xcvr/fields/bailly/bailly_consts.py @@ -0,0 +1,124 @@ +from ...fields.consts import * # noqa: F401,F403 + +DIAG_BANK_SIZE_SUPPORT_ADVT_FIELD = "BankSizeSupport" + +# Bailly-specific Page B0 fields +CPO_INFO_FIELD = "CpoInfo" +CPO_IDENTIFIER = "CpoIdentifier" +CPO_REVISION = "CpoRevision" + +LASER_CONTROL_FIELD = "LaserControl" +LASER_GRID_AND_COUNT = "LaserGridAndCount" +LASER_WAVELENGTH_GRID = "LaserWavelengthGrid" +LASER_COUNT = "LaserCount" +MODULE_LOW_POWER_CONTROL = "ModuleLowPowerControl" +LASER_DISABLE_CONTROL_FIELD = "LaserDisableControl" +LASER_DISABLE_CONTROL_7_0 = "LaserDisableControl7_0" +LASER_DISABLE_CONTROL_15_8 = "LaserDisableControl15_8" + +LASER_STATUS_FIELD = "LaserStatus" +MODULE_STATE_AND_INTERRUPT = "ModuleStateAndInterrupt" +MODULE_LOW_POWER_STATE = "ModuleLowPowerState" +INTL_INTERRUPT_STATUS = "IntLInterruptStatus" +LASER_ACTIVE_STATUS_FIELD = "LaserActiveStatus" +LASER_ACTIVE_STATUS_7_0 = "LaserActiveStatus7_0" +LASER_ACTIVE_STATUS_15_8 = "LaserActiveStatus15_8" + +MODULE_ALARMS_FIELD = "ModuleAlarms" +VCC_3V3_TEMPERATURE_WARNING_ALARM = "Vcc3V3TemperatureWarningAlarm" +TEMP_HIGH_ALARM_FLAG = "TempHighAlarmFlag" +TEMP_LOW_ALARM_FLAG = "TempLowAlarmFlag" +TEMP_HIGH_WARN_FLAG = "TempHighWarnFlag" +TEMP_LOW_WARN_FLAG = "TempLowWarnFlag" +VOLTAGE_HIGH_ALARM_FLAG = "VoltageHighAlarmFlag" +VOLTAGE_LOW_ALARM_FLAG = "VoltageLowAlarmFlag" +VOLTAGE_HIGH_WARN_FLAG = "VoltageHighWarnFlag" +VOLTAGE_LOW_WARN_FLAG = "VoltageLowWarnFlag" +LASER_BIAS_WARNING_FIELD = "LaserBiasWarning" +LASER_BIAS_WARNING_7_0 = "LaserBiasWarning7_0" +LASER_BIAS_WARNING_15_8 = "LaserBiasWarning15_8" +LASER_BIAS_ALARM_FIELD = "LaserBiasAlarm" +LASER_BIAS_ALARM_7_0 = "LaserBiasAlarm7_0" +LASER_BIAS_ALARM_15_8 = "LaserBiasAlarm15_8" + +CPO_MODULE_MONITORS_FIELD = "CpoModuleMonitors" +MODULE_TEMPERATURE_MONITOR = "ModuleTemperatureMonitor" +MODULE_SUPPLY_VOLTAGE_MONITOR = "ModuleSupplyVoltageMonitor" +LASER_CURRENT_MONITOR_FIELD = "LaserCurrentMonitor" +LASER_VOLTAGE_MONITOR_FIELD = "LaserVoltageMonitor" +LASER_OPTICAL_POWER_MONITOR_FIELD = "LaserOpticalPowerMonitor" +TEC_CURRENT_MONITOR = "TecCurrentMonitor" + +CPO_VENDOR_INFO_FIELD = "CpoVendorInfo" +VENDOR_NAME_ASCII_FIELD = "VendorNameAscii" +VENDOR_OUI_HEX_FIELD = "VendorOUIHex" +VENDOR_PART_NUMBER_ASCII_FIELD = "VendorPartNumberAscii" +VENDOR_REVISION_ASCII_FIELD = "VendorRevisionAscii" +VENDOR_SERIAL_NUMBER_ASCII_FIELD = "VendorSerialNumberAscii" +DATE_CODE_YY_FIELD = "DateCodeYY" +DATE_CODE_MM_FIELD = "DateCodeMM" +DATE_CODE_DD_FIELD = "DateCodeDD" +DATE_CODE_LOT_FIELD = "DateCodeLot" +DATE_CODE_FIELD = "DateCode" +MAX_POWER_CONSUMPTION_FIELD = "MaximumPowerConsumption" +CHECKSUM_FIELD = "Checksum" + +LASER_POWER_MODE_CONTROL_FIELD = "LaserPowerModeControl" +LASER_POWER_MODE_CONTROL_CHANNELS_FIELD = "LaserPowerModeControlChannels" +LASER_POWER_MODE_CONTROL_BITS_FIELD = "LaserPowerModeControlBits" +CHANNEL_LASER_POWER_MODE_ENABLE = "Channel{}LaserPowerModeEnable" +THRESHOLD_VALUES_FIELD = "ThresholdValues" +TEMP_HIGH_ALARM_MSB_FIELD = "TempHighAlarmMSB" +TEMP_HIGH_ALARM_LSB_FIELD = "TempHighAlarmLSB" +TEMP_LOW_ALARM_MSB_FIELD = "TempLowAlarmMSB" +TEMP_LOW_ALARM_LSB_FIELD = "TempLowAlarmLSB" +TEMP_HIGH_WARNING_MSB_FIELD = "TempHighWarningMSB" +TEMP_HIGH_WARNING_LSB_FIELD = "TempHighWarningLSB" +TEMP_LOW_WARNING_MSB_FIELD = "TempLowWarningMSB" +TEMP_LOW_WARNING_LSB_FIELD = "TempLowWarningLSB" +VCC_HIGH_ALARM_MSB_FIELD = "VccHighAlarmMSB" +VCC_HIGH_ALARM_LSB_FIELD = "VccHighAlarmLSB" +VCC_LOW_ALARM_MSB_FIELD = "VccLowAlarmMSB" +VCC_LOW_ALARM_LSB_FIELD = "VccLowAlarmLSB" +VCC_HIGH_WARNING_MSB_FIELD = "VccHighWarningMSB" +VCC_HIGH_WARNING_LSB_FIELD = "VccHighWarningLSB" +VCC_LOW_WARNING_MSB_FIELD = "VccLowWarningMSB" +VCC_LOW_WARNING_LSB_FIELD = "VccLowWarningLSB" +TX_POWER_HIGH_ALARM_MSB_FIELD = "TxPowerHighAlarmMSB" +TX_POWER_HIGH_ALARM_LSB_FIELD = "TxPowerHighAlarmLSB" +TX_POWER_LOW_ALARM_MSB_FIELD = "TxPowerLowAlarmMSB" +TX_POWER_LOW_ALARM_LSB_FIELD = "TxPowerLowAlarmLSB" +TX_POWER_HIGH_WARNING_MSB_FIELD = "TxPowerHighWarningMSB" +TX_POWER_HIGH_WARNING_LSB_FIELD = "TxPowerHighWarningLSB" +TX_POWER_LOW_WARNING_MSB_FIELD = "TxPowerLowWarningMSB" +TX_POWER_LOW_WARNING_LSB_FIELD = "TxPowerLowWarningLSB" +TX_BIAS_HIGH_ALARM_MSB_FIELD = "TxBiasHighAlarmMSB" +TX_BIAS_HIGH_ALARM_LSB_FIELD = "TxBiasHighAlarmLSB" +TX_BIAS_HIGH_WARNING_MSB_FIELD = "TxBiasHighWarningMSB" +TX_BIAS_HIGH_WARNING_LSB_FIELD = "TxBiasHighWarningLSB" +RESERVED_BYTES_FIELD = "ReservedBytes" +RESERVED_BYTE_FIELD = "Reserved{}" + +BIT0_FIELD = "Bit0" +BIT1_FIELD = "Bit1" +BIT4_FIELD = "Bit4" +BITS0_3_FIELD = "Bits0_3" +BIT_FIELD = "Bit{}" +LASER_DISABLE_CONTROL = "Laser{}DisableControl" +LASER_ACTIVE_STATUS = "Laser{}ActiveStatus" +LASER_BIAS_WARNING = "Laser{}BiasWarning" +LASER_BIAS_ALARM = "Laser{}BiasAlarm" +MODULE_TEMPERATURE_MONITOR_MSB = "ModuleTemperatureMonitorMSB" +MODULE_TEMPERATURE_MONITOR_LSB = "ModuleTemperatureMonitorLSB" +MODULE_SUPPLY_VOLTAGE_MONITOR_MSB = "ModuleSupplyVoltageMonitorMSB" +MODULE_SUPPLY_VOLTAGE_MONITOR_LSB = "ModuleSupplyVoltageMonitorLSB" +LASER_CURRENT_MONITOR = "Laser{}CurrentMonitor" +LASER_CURRENT_MONITOR_MSB = "Laser{}CurrentMonitorMSB" +LASER_CURRENT_MONITOR_LSB = "Laser{}CurrentMonitorLSB" +LASER_VOLTAGE_MONITOR = "Laser{}VoltageMonitor" +LASER_OPTICAL_POWER_MONITOR = "Laser{}OpticalPowerMonitor" +LASER_OPTICAL_POWER_MONITOR_MSB = "Laser{}OpticalPowerMonitorMSB" +LASER_OPTICAL_POWER_MONITOR_LSB = "Laser{}OpticalPowerMonitorLSB" +TEC_CURRENT_MONITOR_MSB = "TecCurrentMonitorMSB" +TEC_CURRENT_MONITOR_LSB = "TecCurrentMonitorLSB" + diff --git a/sonic_platform_base/sonic_xcvr/mem_maps/bailly/__init__.py b/sonic_platform_base/sonic_xcvr/mem_maps/bailly/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/sonic_platform_base/sonic_xcvr/mem_maps/bailly/bailly_mem_map.py b/sonic_platform_base/sonic_xcvr/mem_maps/bailly/bailly_mem_map.py new file mode 100644 index 000000000..f1fd09f7a --- /dev/null +++ b/sonic_platform_base/sonic_xcvr/mem_maps/bailly/bailly_mem_map.py @@ -0,0 +1,239 @@ +""" + bailly_mem_map.py + + Implementation of XcvrMemMap for Bailly extending CMIS Rev 5.0 +""" + +from ..public.cmis import CmisMemMap +from ..public.cmis.pages.page import CmisPage +from ...fields.xcvr_field import ( + CodeRegField, + DateField, + HexRegField, + NumberRegField, + RegBitField, + RegBitsField, + RegGroupField, + StringRegField, +) +from ...fields.consts import * +from ...fields.bailly import bailly_consts + +class _BaillyElsControlPage(CmisPage): + """Bailly-specific fields on CMIS Page 0xB0h""" + def __init__(self, codes, page=0xB0, bank=0): + super().__init__(codes, page=page, bank=bank) + + self.fields[bailly_consts.CPO_INFO_FIELD] = [ + RegGroupField(bailly_consts.CPO_INFO_FIELD, + CodeRegField(bailly_consts.CPO_IDENTIFIER, self.getaddr(128), codes.XCVR_IDENTIFIERS), + NumberRegField(bailly_consts.CPO_REVISION, self.getaddr(129), format="B", size=1), + NumberRegField(bailly_consts.LASER_GRID_AND_COUNT, self.getaddr(130), size=1, format="B"), + CodeRegField(bailly_consts.LASER_WAVELENGTH_GRID, self.getaddr(130), + codes.LASER_WAVELENGTH_GRID, + RegBitField(bailly_consts.BIT4_FIELD, bitpos=4, ro=True) + ), + CodeRegField(bailly_consts.LASER_COUNT, self.getaddr(130), + codes.LASER_COUNT, + RegBitsField(bailly_consts.BITS0_3_FIELD, bitpos=0, size=4, ro=True) + ), + ) + ] + + self.fields[bailly_consts.LASER_CONTROL_FIELD] = [ + RegGroupField(bailly_consts.LASER_CONTROL_FIELD, + NumberRegField(bailly_consts.MODULE_LOW_POWER_CONTROL, self.getaddr(132), + RegBitField(bailly_consts.BIT0_FIELD, bitpos=0, ro=False), size=1, ro=False + ), + NumberRegField(bailly_consts.LASER_DISABLE_CONTROL_7_0, self.getaddr(133), format="B", size=1, ro=False), + NumberRegField(bailly_consts.LASER_DISABLE_CONTROL_15_8, self.getaddr(134), format="B", size=1, ro=False), + RegGroupField(bailly_consts.LASER_DISABLE_CONTROL_FIELD, + *(NumberRegField(bailly_consts.LASER_DISABLE_CONTROL.format(laser), + self.getaddr(133 + laser // 8), + RegBitField(bailly_consts.BIT_FIELD.format(laser % 8), bitpos=laser % 8, ro=False), + size=1, ro=False) + for laser in range(0, 16)) + ), + ) + ] + + self.fields[bailly_consts.LASER_STATUS_FIELD] = [ + RegGroupField(bailly_consts.LASER_STATUS_FIELD, + NumberRegField(bailly_consts.MODULE_STATE_AND_INTERRUPT, self.getaddr(131), size=1, format="B"), + CodeRegField(bailly_consts.MODULE_LOW_POWER_STATE, self.getaddr(131), + codes.POWER_MODE, + RegBitField(bailly_consts.BIT1_FIELD, bitpos=1) + ), + CodeRegField(bailly_consts.INTL_INTERRUPT_STATUS, self.getaddr(131), + codes.INTERRUPT_STATUS, + RegBitField(bailly_consts.BIT0_FIELD, bitpos=0) + ), + NumberRegField(bailly_consts.LASER_ACTIVE_STATUS_7_0, self.getaddr(135), format="B", size=1), + NumberRegField(bailly_consts.LASER_ACTIVE_STATUS_15_8, self.getaddr(136), format="B", size=1), + RegGroupField(bailly_consts.LASER_ACTIVE_STATUS_FIELD, + *(CodeRegField(bailly_consts.LASER_ACTIVE_STATUS.format(laser), + self.getaddr(135 + laser // 8), + codes.LASER_ACTIVE_STATUS, + RegBitField(bailly_consts.BIT_FIELD.format(laser % 8), bitpos=laser % 8), + size=1) + for laser in range(0, 16)) + ), + ) + ] + + self.fields[bailly_consts.MODULE_ALARMS_FIELD] = [ + RegGroupField(bailly_consts.MODULE_ALARMS_FIELD, + NumberRegField(bailly_consts.VCC_3V3_TEMPERATURE_WARNING_ALARM, self.getaddr(137), format="B", size=1), + RegBitField(bailly_consts.TEMP_HIGH_ALARM_FLAG, offset=self.getaddr(137), bitpos=0), + RegBitField(bailly_consts.TEMP_LOW_ALARM_FLAG, offset=self.getaddr(137), bitpos=1), + RegBitField(bailly_consts.TEMP_HIGH_WARN_FLAG, offset=self.getaddr(137), bitpos=2), + RegBitField(bailly_consts.TEMP_LOW_WARN_FLAG, offset=self.getaddr(137), bitpos=3), + RegBitField(bailly_consts.VOLTAGE_HIGH_ALARM_FLAG, offset=self.getaddr(137), bitpos=4), + RegBitField(bailly_consts.VOLTAGE_LOW_ALARM_FLAG, offset=self.getaddr(137), bitpos=5), + RegBitField(bailly_consts.VOLTAGE_HIGH_WARN_FLAG, offset=self.getaddr(137), bitpos=6), + RegBitField(bailly_consts.VOLTAGE_LOW_WARN_FLAG, offset=self.getaddr(137), bitpos=7), + NumberRegField(bailly_consts.LASER_BIAS_WARNING_7_0, self.getaddr(138), format="B", size=1), + NumberRegField(bailly_consts.LASER_BIAS_WARNING_15_8, self.getaddr(139), format="B", size=1), + RegGroupField(bailly_consts.LASER_BIAS_WARNING_FIELD, + *(NumberRegField(bailly_consts.LASER_BIAS_WARNING.format(laser), + self.getaddr(138 + laser // 8), + RegBitField(bailly_consts.BIT_FIELD.format(laser % 8), bitpos=laser % 8), + size=1) + for laser in range(0, 16)) + ), + NumberRegField(bailly_consts.LASER_BIAS_ALARM_7_0, self.getaddr(140), format="B", size=1), + NumberRegField(bailly_consts.LASER_BIAS_ALARM_15_8, self.getaddr(141), format="B", size=1), + RegGroupField(bailly_consts.LASER_BIAS_ALARM_FIELD, + *(NumberRegField(bailly_consts.LASER_BIAS_ALARM.format(laser), + self.getaddr(140 + laser // 8), + RegBitField(bailly_consts.BIT_FIELD.format(laser % 8), bitpos=laser % 8), + size=1) + for laser in range(0, 16)) + ), + ) + ] + + self.fields[bailly_consts.CPO_MODULE_MONITORS_FIELD] = [ + RegGroupField(bailly_consts.CPO_MODULE_MONITORS_FIELD, + NumberRegField(bailly_consts.MODULE_TEMPERATURE_MONITOR_MSB, self.getaddr(150), format="B", size=1), + NumberRegField(bailly_consts.MODULE_TEMPERATURE_MONITOR_LSB, self.getaddr(151), format="B", size=1), + NumberRegField(bailly_consts.MODULE_TEMPERATURE_MONITOR, self.getaddr(150), size=2, format=">h", scale=256.0), + NumberRegField(bailly_consts.MODULE_SUPPLY_VOLTAGE_MONITOR_MSB, self.getaddr(152), format="B", size=1), + NumberRegField(bailly_consts.MODULE_SUPPLY_VOLTAGE_MONITOR_LSB, self.getaddr(153), format="B", size=1), + NumberRegField(bailly_consts.MODULE_SUPPLY_VOLTAGE_MONITOR, self.getaddr(152), size=2, format=">H", scale=10000.0), + RegGroupField(bailly_consts.LASER_CURRENT_MONITOR_FIELD, + *(RegGroupField(bailly_consts.LASER_CURRENT_MONITOR.format(laser), + NumberRegField(bailly_consts.LASER_CURRENT_MONITOR_MSB.format(laser), self.getaddr(154 + 2 * laser), format="B", size=1), + NumberRegField(bailly_consts.LASER_CURRENT_MONITOR_LSB.format(laser), self.getaddr(155 + 2 * laser), format="B", size=1), + NumberRegField(bailly_consts.LASER_CURRENT_MONITOR.format(laser), self.getaddr(154 + 2 * laser), size=2, format=">H", scale=10.0), + ) for laser in range(0, 16)) + ), + RegGroupField(bailly_consts.LASER_VOLTAGE_MONITOR_FIELD, + *(NumberRegField(bailly_consts.LASER_VOLTAGE_MONITOR.format(laser), + self.getaddr(186 + laser), size=1, format="B", scale=100.0) + for laser in range(0, 16)) + ), + RegGroupField(bailly_consts.LASER_OPTICAL_POWER_MONITOR_FIELD, + *(RegGroupField(bailly_consts.LASER_OPTICAL_POWER_MONITOR.format(laser), + NumberRegField(bailly_consts.LASER_OPTICAL_POWER_MONITOR_MSB.format(laser), self.getaddr(203 + 2 * laser), format="B", size=1), + NumberRegField(bailly_consts.LASER_OPTICAL_POWER_MONITOR_LSB.format(laser), self.getaddr(204 + 2 * laser), format="B", size=1), + NumberRegField(bailly_consts.LASER_OPTICAL_POWER_MONITOR.format(laser), self.getaddr(203 + 2 * laser), size=2, format=">H", scale=100.0), + ) for laser in range(0, 16)) + ), + NumberRegField(bailly_consts.TEC_CURRENT_MONITOR_MSB, self.getaddr(250), format="B", size=1), + NumberRegField(bailly_consts.TEC_CURRENT_MONITOR_LSB, self.getaddr(251), format="B", size=1), + NumberRegField(bailly_consts.TEC_CURRENT_MONITOR, self.getaddr(250), size=2, format=">h", scale=327.67), + ) + ] + +class _BaillyElsInfoPage(CmisPage): + """Bailly-specific fields on CMIS Page 0xB1h""" + def __init__(self, codes, page=0xB1, bank=0): + super().__init__(codes, page=page, bank=bank) + + self.fields[bailly_consts.CPO_VENDOR_INFO_FIELD] = [ + RegGroupField(bailly_consts.CPO_VENDOR_INFO_FIELD, + StringRegField(bailly_consts.VENDOR_NAME_ASCII_FIELD, self.getaddr(129), size=16), + HexRegField(bailly_consts.VENDOR_OUI_HEX_FIELD, self.getaddr(145), size=3), + StringRegField(bailly_consts.VENDOR_PART_NUMBER_ASCII_FIELD, self.getaddr(148), size=16), + StringRegField(bailly_consts.VENDOR_REVISION_ASCII_FIELD, self.getaddr(164), size=2), + StringRegField(bailly_consts.VENDOR_SERIAL_NUMBER_ASCII_FIELD, self.getaddr(166), size=16), + StringRegField(bailly_consts.DATE_CODE_YY_FIELD, self.getaddr(182), size=2), + StringRegField(bailly_consts.DATE_CODE_MM_FIELD, self.getaddr(184), size=2), + StringRegField(bailly_consts.DATE_CODE_DD_FIELD, self.getaddr(186), size=2), + StringRegField(bailly_consts.DATE_CODE_LOT_FIELD, self.getaddr(188), size=2), + DateField(bailly_consts.DATE_CODE_FIELD, self.getaddr(182), size=8), + StringRegField(bailly_consts.CLEI_CODE_FIELD, self.getaddr(190), size=10), + NumberRegField(bailly_consts.MAX_POWER_CONSUMPTION_FIELD, self.getaddr(200), format="B", size=1, scale=4.0), + NumberRegField(bailly_consts.CHECKSUM_FIELD, self.getaddr(251), format="B", size=1), + ) + ] + +class _BaillyElsThresholdPage(CmisPage): + """Bailly-specific fields on CMIS Page 0xB2h""" + def __init__(self, codes, page=0xB2, bank=0): + super().__init__(codes, page=page, bank=bank) + + self.fields[bailly_consts.LASER_POWER_MODE_CONTROL_FIELD] = [ + RegGroupField(bailly_consts.LASER_POWER_MODE_CONTROL_FIELD, + NumberRegField(bailly_consts.LASER_POWER_MODE_CONTROL_BITS_FIELD, self.getaddr(128), format="B", size=1, ro=False, + *(RegBitField(bailly_consts.CHANNEL_LASER_POWER_MODE_ENABLE.format(channel), bitpos=channel, ro=False) + for channel in range(0, 8)) + ), + RegGroupField(bailly_consts.LASER_POWER_MODE_CONTROL_CHANNELS_FIELD, + *(NumberRegField(bailly_consts.CHANNEL_LASER_POWER_MODE_ENABLE.format(channel), + self.getaddr(128), + RegBitField(bailly_consts.BIT_FIELD.format(channel), bitpos=channel, ro=False), + size=1, ro=False) + for channel in range(0, 8)) + ), + RegGroupField(bailly_consts.THRESHOLD_VALUES_FIELD, + NumberRegField(bailly_consts.TEMP_HIGH_ALARM_MSB_FIELD, self.getaddr(162), format="B", size=1), + NumberRegField(bailly_consts.TEMP_HIGH_ALARM_LSB_FIELD, self.getaddr(163), format="B", size=1), + NumberRegField(bailly_consts.TEMP_LOW_ALARM_MSB_FIELD, self.getaddr(164), format="B", size=1), + NumberRegField(bailly_consts.TEMP_LOW_ALARM_LSB_FIELD, self.getaddr(165), format="B", size=1), + NumberRegField(bailly_consts.TEMP_HIGH_WARNING_MSB_FIELD, self.getaddr(166), format="B", size=1), + NumberRegField(bailly_consts.TEMP_HIGH_WARNING_LSB_FIELD, self.getaddr(167), format="B", size=1), + NumberRegField(bailly_consts.TEMP_LOW_WARNING_MSB_FIELD, self.getaddr(168), format="B", size=1), + NumberRegField(bailly_consts.TEMP_LOW_WARNING_LSB_FIELD, self.getaddr(169), format="B", size=1), + NumberRegField(bailly_consts.VCC_HIGH_ALARM_MSB_FIELD, self.getaddr(170), format="B", size=1), + NumberRegField(bailly_consts.VCC_HIGH_ALARM_LSB_FIELD, self.getaddr(171), format="B", size=1), + NumberRegField(bailly_consts.VCC_LOW_ALARM_MSB_FIELD, self.getaddr(172), format="B", size=1), + NumberRegField(bailly_consts.VCC_LOW_ALARM_LSB_FIELD, self.getaddr(173), format="B", size=1), + NumberRegField(bailly_consts.VCC_HIGH_WARNING_MSB_FIELD, self.getaddr(174), format="B", size=1), + NumberRegField(bailly_consts.VCC_HIGH_WARNING_LSB_FIELD, self.getaddr(175), format="B", size=1), + NumberRegField(bailly_consts.VCC_LOW_WARNING_MSB_FIELD, self.getaddr(176), format="B", size=1), + NumberRegField(bailly_consts.VCC_LOW_WARNING_LSB_FIELD, self.getaddr(177), format="B", size=1), + NumberRegField(bailly_consts.TX_POWER_HIGH_ALARM_MSB_FIELD, self.getaddr(178), format="B", size=1), + NumberRegField(bailly_consts.TX_POWER_HIGH_ALARM_LSB_FIELD, self.getaddr(179), format="B", size=1), + NumberRegField(bailly_consts.TX_POWER_LOW_ALARM_MSB_FIELD, self.getaddr(180), format="B", size=1), + NumberRegField(bailly_consts.TX_POWER_LOW_ALARM_LSB_FIELD, self.getaddr(181), format="B", size=1), + NumberRegField(bailly_consts.TX_POWER_HIGH_WARNING_MSB_FIELD, self.getaddr(182), format="B", size=1), + NumberRegField(bailly_consts.TX_POWER_HIGH_WARNING_LSB_FIELD, self.getaddr(183), format="B", size=1), + NumberRegField(bailly_consts.TX_POWER_LOW_WARNING_MSB_FIELD, self.getaddr(184), format="B", size=1), + NumberRegField(bailly_consts.TX_POWER_LOW_WARNING_LSB_FIELD, self.getaddr(185), format="B", size=1), + NumberRegField(bailly_consts.TX_BIAS_HIGH_ALARM_MSB_FIELD, self.getaddr(186), format="B", size=1), + NumberRegField(bailly_consts.TX_BIAS_HIGH_ALARM_LSB_FIELD, self.getaddr(187), format="B", size=1), + NumberRegField(bailly_consts.TX_BIAS_HIGH_WARNING_MSB_FIELD, self.getaddr(188), format="B", size=1), + NumberRegField(bailly_consts.TX_BIAS_HIGH_WARNING_LSB_FIELD, self.getaddr(189), format="B", size=1), + ), + NumberRegField(bailly_consts.CHECKSUM_FIELD, self.getaddr(255), format="B", size=1), + ) + ] + +# -------------------------- Bailly Main Memory Map Class -------------------------- +class BaillyMemMap(CmisMemMap): + def __init__(self, codes, bank=0, base_page=0): + self._bank = bank + self._base_page = base_page + super().__init__(codes, bank) + + ELS_CONTROL_PAGE = 0xB0 + ELS_INFO_PAGE = 0xB1 + ELS_THRESHOLD_PAGE = 0xB2 + + self.add_pages( + _BaillyElsControlPage(codes, ELS_CONTROL_PAGE + base_page), + _BaillyElsInfoPage(codes, ELS_INFO_PAGE + base_page), + _BaillyElsThresholdPage(codes, ELS_THRESHOLD_PAGE + base_page), + ) diff --git a/tests/sonic_xcvr/test_bailly_cmis.py b/tests/sonic_xcvr/test_bailly_cmis.py new file mode 100644 index 000000000..a5f06b3fc --- /dev/null +++ b/tests/sonic_xcvr/test_bailly_cmis.py @@ -0,0 +1,85 @@ +import pytest +from unittest.mock import MagicMock, patch, Mock +from sonic_platform_base.sonic_xcvr.api.bailly.bailly_api import BaillyApi +from sonic_platform_base.sonic_xcvr.mem_maps.bailly.bailly_mem_map import BaillyMemMap +from sonic_platform_base.sonic_xcvr.codes.bailly.bailly_codes import BaillyCodes +from sonic_platform_base.sonic_xcvr.xcvr_eeprom import XcvrEeprom + +# Test BaillyCodes +class TestBaillyCodes: + def setup_method(self): + self.codes = BaillyCodes() + + def test_codes_inheritance(self): + assert isinstance(self.codes, object) + assert hasattr(self.codes, 'XCVR_IDENTIFIERS') + assert hasattr(self.codes, 'XCVR_IDENTIFIER_ABBRV') + + def test_bailly_specific_identifiers(self): + assert 128 in self.codes.XCVR_IDENTIFIERS + assert 128 in self.codes.XCVR_IDENTIFIER_ABBRV + + def test_wavelength_grid_definitions(self): + assert hasattr(self.codes, 'LASER_WAVELENGTH_GRID') + assert isinstance(self.codes.LASER_WAVELENGTH_GRID, dict) + + def test_laser_count_definitions(self): + assert hasattr(self.codes, 'LASER_COUNT') + assert isinstance(self.codes.LASER_COUNT, dict) + +# Test BaillyApi +NUM_CHANNELS = 8 +class TestBaillyApi: + def setup_method(self): + self.mock_eeprom = MagicMock(spec=XcvrEeprom) + self.api = BaillyApi(self.mock_eeprom) + self.api.NUM_CHANNELS = NUM_CHANNELS + + def test_get_dpinit_pending(self): + res = self.api.get_dpinit_pending() + assert isinstance(res, dict) + assert len(res) == NUM_CHANNELS + for i in range(NUM_CHANNELS): + assert f"DPInitPending{i+1}" in res + assert res[f"DPInitPending{i+1}"] is True + + def test__format_revision_none(self): + assert self.api._format_revision(None) is None + + def test__format_revision_values(self): + assert self.api._format_revision(0x12) == "1.2" + assert self.api._format_revision(0xF5) == "15.5" + assert self.api._format_revision(0x00) == "0.0" + + def test_get_active_apsel_hostlane_with_zero_returns_current_map(self): + app_values = [1, 2, 0, 3, 4, 5, 6, 7] + with patch.object(self.api, 'get_application', side_effect=app_values): + result = self.api.get_active_apsel_hostlane() + assert len(result) == NUM_CHANNELS + assert result['ActiveAppSelLane1'] == 1 + assert result['ActiveAppSelLane3'] == 0 + + def test_get_active_apsel_hostlane_no_zero_calls_parent(self): + app_values = [1, 1, 1, 1, 1, 1, 1, 1] + fake_parent = {"key": "value"} + + with patch.object(self.api, 'get_application', side_effect=app_values): + with patch.object(BaillyApi.__bases__[0], 'get_active_apsel_hostlane', return_value=fake_parent) as mock_super: + result = self.api.get_active_apsel_hostlane() + assert result == fake_parent + mock_super.assert_called_once() + + def test_get_transceiver_info(self): + with patch.object(self.api, 'get_els_info') as mock_els: + mock_els.return_value = {} + with patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_info') as mock_super: + mock_super.return_value = {} + self.api.get_transceiver_info() + mock_super.assert_called_once() + mock_els.assert_called_once() + + def test_get_els_info(self): + self.mock_eeprom.read.return_value = 0 + result = self.api.get_els_info() + assert isinstance(result, dict) + diff --git a/tests/sonic_xcvr/test_bailly_optoe_base.py b/tests/sonic_xcvr/test_bailly_optoe_base.py new file mode 100644 index 000000000..ea4832d7d --- /dev/null +++ b/tests/sonic_xcvr/test_bailly_optoe_base.py @@ -0,0 +1,279 @@ +import os +import json +import pytest +from unittest.mock import mock_open, patch, MagicMock +from sonic_platform_base.sonic_xcvr.bailly_optoe_base import ( + CpoOptoeBase, + get_cpo_json_data +) + +# Mock CPO JSON data for testing +MOCK_CPO_JSON = { + "oes": { + "oe0": { + "oe_cmis_path": "/sys/bus/i2c/devices/0-0050/" + } + }, + "elss": { + "els0": { + "index": "0", + "els_presence": { + "presence_file": "/sys/class/gpio/gpio123/value", + "presence_offset": "0x00", + "presence_len": 1, + "presence_bit": 0, + "presence_value": 1 + } + } + } +} + +@pytest.fixture +def cpo_instance(): + """Fixture to create a CpoOptoeBase instance with test attributes""" + instance = CpoOptoeBase() + instance._oe_id = 0 + instance._els_id = 0 + instance._port_id = 0 + return instance + +@pytest.fixture +def mock_platform_info(): + """Fixture to mock platform info retrieval""" + with patch("sonic_platform_base.sonic_xcvr.bailly_optoe_base.get_platform") as mock_get_platform: + with patch("sonic_platform_base.sonic_xcvr.bailly_optoe_base.get_path_to_platform_dir") as mock_get_path: + mock_get_platform.return_value = "test_platform" + mock_get_path.return_value = "/usr/share/sonic/device/test_platform" + yield + +class TestCpoOptoeBase: + def test_get_cpo_json_data_success(self, mock_platform_info): + """Test successful retrieval of CPO JSON data""" + mock_json_path = os.path.join("/usr/share/sonic/device/test_platform", "cpo.json") + + with patch("os.path.isfile", return_value=True): + with patch("builtins.open", mock_open(read_data=json.dumps(MOCK_CPO_JSON))): + result = get_cpo_json_data() + assert result == MOCK_CPO_JSON + open.assert_called_once_with(mock_json_path, 'r') + + def test_get_cpo_json_data_platform_none(self): + """Test get_cpo_json_data when platform is None""" + with patch("sonic_platform_base.sonic_xcvr.bailly_optoe_base.get_platform", return_value=None): + assert get_cpo_json_data() is None + + def test_get_cpo_json_data_path_none(self): + """Test get_cpo_json_data when platform path is None""" + with patch("sonic_platform_base.sonic_xcvr.bailly_optoe_base.get_platform", return_value="test_platform"): + with patch("sonic_platform_base.sonic_xcvr.bailly_optoe_base.get_path_to_platform_dir", return_value=None): + assert get_cpo_json_data() is None + + def test_get_cpo_json_data_file_not_exists(self, mock_platform_info): + """Test get_cpo_json_data when JSON file doesn't exist""" + with patch("os.path.isfile", return_value=False): + assert get_cpo_json_data() is None + + def test_get_cpo_json_data_invalid_json(self, mock_platform_info): + """Test get_cpo_json_data with invalid JSON content""" + with patch("os.path.isfile", return_value=True): + with patch("builtins.open", mock_open(read_data="invalid json")): + assert get_cpo_json_data() is None + + def test_get_oe_eeprom_path(self, cpo_instance, mock_platform_info): + """Test EEPROM path generation""" + with patch("sonic_platform_base.sonic_xcvr.bailly_optoe_base.get_cpo_json_data", return_value=MOCK_CPO_JSON): + assert cpo_instance.get_oe_eeprom_path() == "/sys/bus/i2c/devices/0-0050/eeprom" + + def test_get_oe_eeprom_path_none(self, cpo_instance, mock_platform_info): + """Test EEPROM path when config is missing""" + mock_empty_json = {"oes": {"oe0": {}}} + with patch("sonic_platform_base.sonic_xcvr.bailly_optoe_base.get_cpo_json_data", return_value=mock_empty_json): + assert cpo_instance.get_oe_eeprom_path() is None + + def test_read_eeprom_success(self, cpo_instance, mock_platform_info): + """Test successful EEPROM read operation""" + mock_data = b'\x01\x02\x03' + with patch.object(cpo_instance, "get_eeprom_path", return_value="/fake/eeprom"): + with patch("builtins.open", mock_open(read_data=mock_data)) as mock_file: + result = cpo_instance.read_eeprom(0, 3) + assert result == bytearray(mock_data) + mock_file.assert_called_once_with("/fake/eeprom", mode='rb', buffering=0) + mock_file.return_value.seek.assert_called_once_with(0) + mock_file.return_value.read.assert_called_once_with(3) + + def test_read_eeprom_error(self, cpo_instance, mock_platform_info): + """Test EEPROM read operation with IO error""" + with patch.object(cpo_instance, "get_eeprom_path", return_value="/fake/eeprom"): + with patch("builtins.open", side_effect=OSError("Read error")): + result = cpo_instance.read_eeprom(0, 3) + assert result is None + + def test_write_eeprom_success(self, cpo_instance, mock_platform_info): + """Test successful EEPROM write operation""" + write_buffer = bytearray([0x01, 0x02, 0x03]) + with patch.object(cpo_instance, "get_eeprom_path", return_value="/fake/eeprom"): + with patch("builtins.open", mock_open()) as mock_file: + result = cpo_instance.write_eeprom(0, 3, write_buffer) + assert result is True + mock_file.assert_called_once_with("/fake/eeprom", mode='r+b', buffering=0) + mock_file.return_value.seek.assert_called_once_with(0) + mock_file.return_value.write.assert_called_once_with(write_buffer[0:3]) + + def test_write_eeprom_error(self, cpo_instance, mock_platform_info): + """Test EEPROM write operation with IO error""" + write_buffer = bytearray([0x01, 0x02, 0x03]) + with patch.object(cpo_instance, "get_eeprom_path", return_value="/fake/eeprom"): + with patch("builtins.open", side_effect=OSError("Write error")): + result = cpo_instance.write_eeprom(0, 3, write_buffer) + assert result is False + + def test_get_els_presence_success(self, cpo_instance, mock_platform_info): + """Test ELS presence detection with valid data""" + with patch("sonic_platform_base.sonic_xcvr.bailly_optoe_base.get_cpo_json_data", return_value=MOCK_CPO_JSON): + with patch("builtins.open", mock_open(read_data=b'\x01')) as mock_file: + result = cpo_instance.get_els_presence() + assert result is True + mock_file.assert_called_once_with("/sys/class/gpio/gpio123/value", mode='rb', buffering=0) + mock_file.return_value.seek.assert_called_once_with(0) + + def test_get_els_presence_mismatch(self, cpo_instance, mock_platform_info): + """Test ELS presence detection with mismatched bit value""" + with patch("sonic_platform_base.sonic_xcvr.bailly_optoe_base.get_cpo_json_data", return_value=MOCK_CPO_JSON): + with patch("builtins.open", mock_open(read_data=b'\x00')) as mock_file: + result = cpo_instance.get_els_presence() + assert result is False + + def test_get_els_presence_error(self, cpo_instance, mock_platform_info): + """Test ELS presence detection with IO error""" + with patch("sonic_platform_base.sonic_xcvr.bailly_optoe_base.get_cpo_json_data", return_value=MOCK_CPO_JSON): + with patch("builtins.open", side_effect=OSError("Read error")): + result = cpo_instance.get_els_presence() + assert result is False + + def test_get_els_base_page_invalid_type(self, cpo_instance, mock_platform_info): + """Test get_els_base_page with non-int/non-string invalid type""" + with pytest.raises(TypeError): + mock_get_elss = lambda: {"base_page": [1, 2, 3]} + cpo_instance.get_elss_config = mock_get_elss + cpo_instance.get_els_base_page() + + def test_getter_methods(self, cpo_instance): + """Test all getter methods return correct values""" + cpo_instance._oe_bank_id = 1 + cpo_instance._oe_id = 2 + cpo_instance._els_bank_id = 3 + cpo_instance._els_id = 4 + + assert cpo_instance.get_oe_bank_id() == 1 + assert cpo_instance.get_oe_id() == 2 + assert cpo_instance.get_els_bank_id() == 3 + assert cpo_instance.get_els_id() == 4 + + def test_get_presence(self, cpo_instance, mock_platform_info): + """Test presence method delegates to els presence""" + with patch.object(cpo_instance, "get_els_presence", return_value=True): + assert cpo_instance.get_presence() is True + + with patch.object(cpo_instance, "get_els_presence", return_value=False): + assert cpo_instance.get_presence() is False + +class TestCpoOptoeBaseAbstractMethods: + """ + Unit tests for abstract methods in CpoOptoeBase class + Follows SONiC community testing standards + """ + def test_abstract_methods_raise_not_implemented(self): + """Test that unimplemented abstract methods raise NotImplementedError""" + # Create a minimal subclass that implements all abstract methods + class MinimalCpoImpl(CpoOptoeBase): + def check_fiber_dirty(self): + return super().check_fiber_dirty() + + def check_calibration(self): + return super().check_calibration() + + def is_els_power_sufficient(self): + return super().is_els_power_sufficient() + + def is_calibration_checked(self): + return super().is_calibration_checked() + + def is_fiber_checked(self): + return super().is_fiber_checked() + + def is_els_tx_on(self): + return super().is_els_tx_on() + + def is_els_tx_enabled(self): + return super().is_els_tx_enabled() + + # Instantiate the subclass + instance = MinimalCpoImpl() + instance._oe_id = 0 + instance._els_id = 0 + instance._port_id = 0 + + # Test each abstract method raises NotImplementedError + with pytest.raises(NotImplementedError): + instance.check_fiber_dirty() + + with pytest.raises(NotImplementedError): + instance.check_calibration() + + with pytest.raises(NotImplementedError): + instance.is_els_power_sufficient() + + with pytest.raises(NotImplementedError): + instance.is_calibration_checked() + + with pytest.raises(NotImplementedError): + instance.is_fiber_checked() + + with pytest.raises(NotImplementedError): + instance.is_els_tx_on() + + with pytest.raises(NotImplementedError): + instance.is_els_tx_enabled() + + def test_valid_subclass_implementation(self): + """Test that a subclass with all abstract methods implemented works correctly""" + class ValidCpoImpl(CpoOptoeBase): + def __init__(self): + super().__init__() + self._oe_id = 0 + self._els_id = 0 + self._port_id = 0 + + def check_fiber_dirty(self): + return True + + def check_calibration(self): + return True + + def is_els_power_sufficient(self): + return True + + def is_calibration_checked(self): + return True + + def is_fiber_checked(self): + return True + + def is_els_tx_on(self): + return True + + def is_els_tx_enabled(self): + return True + + # Should instantiate without error + instance = ValidCpoImpl() + assert isinstance(instance, CpoOptoeBase) + + # All methods should return implemented values + assert instance.check_fiber_dirty() is True + assert instance.check_calibration() is True + assert instance.is_els_power_sufficient() is True + assert instance.is_calibration_checked() is True + assert instance.is_fiber_checked() is True + assert instance.is_els_tx_on() is True + assert instance.is_els_tx_enabled() is True