diff --git a/setup.py b/setup.py index ff177bab5..477025872 100644 --- a/setup.py +++ b/setup.py @@ -58,6 +58,8 @@ 'sonic_platform_base.sonic_xcvr.codes.public', 'sonic_platform_base.sonic_xcvr.codes.bailly', 'sonic_platform_base.sonic_xcvr.utils', + 'sonic_platform_base.sonic_xcvr.api.arista', + 'sonic_platform_base.sonic_xcvr.mem_maps.arista', 'sonic_platform_base.sonic_xcvr.api.credo', 'sonic_platform_base.sonic_xcvr.mem_maps.credo', 'sonic_platform_base.sonic_xcvr.codes.credo', diff --git a/sonic_platform_base/sonic_xcvr/api/arista/__init__.py b/sonic_platform_base/sonic_xcvr/api/arista/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/sonic_platform_base/sonic_xcvr/api/arista/cmis_enhanced_lpo.py b/sonic_platform_base/sonic_xcvr/api/arista/cmis_enhanced_lpo.py new file mode 100644 index 000000000..755d5d1c5 --- /dev/null +++ b/sonic_platform_base/sonic_xcvr/api/arista/cmis_enhanced_lpo.py @@ -0,0 +1,152 @@ +""" + cmis_enhanced_lpo.py + + API for Arista Enhanced LPO modules. +""" + +from ..public.cmis import CmisApi +from ...fields import arista_lpo_consts as lpo + +# VMA: U8 raw count -> mV +_VMA_MV_PER_COUNT = 5 +# OMA: U16 raw count -> mW (0.1 uW per count = 0.0001 mW per count) +_OMA_MW_PER_COUNT = 0.0001 +# OER max: U8 raw count -> dB +_OER_DB_PER_COUNT = 0.1 +# VMA accuracy: U4 (bits 3-0) -> mV +_VMA_ACC_MV_PER_COUNT = 5 +# OMA accuracy: U4 (bits 3-0) -> dB +_OMA_ACC_DB_PER_COUNT = 0.2 + +_VMA_FLAG_FIELDS = [ + lpo.LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_FLAG, + lpo.LPO_TX_HOST_INPUT_VMA_LOW_ALARM_FLAG, + lpo.LPO_TX_HOST_INPUT_VMA_HIGH_WARNING_FLAG, + lpo.LPO_TX_HOST_INPUT_VMA_LOW_WARNING_FLAG, +] + +_OMA_FLAG_FIELDS = [ + lpo.LPO_RX_INPUT_OMA_HIGH_ALARM_FLAG, + lpo.LPO_RX_INPUT_OMA_LOW_ALARM_FLAG, + lpo.LPO_RX_INPUT_OMA_HIGH_WARNING_FLAG, + lpo.LPO_RX_INPUT_OMA_LOW_WARNING_FLAG, +] + +_VMA_TX_FIELDS = [ + lpo.LPO_HOST_INPUT_VMA_TX1, + lpo.LPO_HOST_INPUT_VMA_TX2, + lpo.LPO_HOST_INPUT_VMA_TX3, + lpo.LPO_HOST_INPUT_VMA_TX4, + lpo.LPO_HOST_INPUT_VMA_TX5, + lpo.LPO_HOST_INPUT_VMA_TX6, + lpo.LPO_HOST_INPUT_VMA_TX7, + lpo.LPO_HOST_INPUT_VMA_TX8, +] + +_OMA_RX_FIELDS = [ + lpo.LPO_INPUT_OMA_RX1, + lpo.LPO_INPUT_OMA_RX2, + lpo.LPO_INPUT_OMA_RX3, + lpo.LPO_INPUT_OMA_RX4, + lpo.LPO_INPUT_OMA_RX5, + lpo.LPO_INPUT_OMA_RX6, + lpo.LPO_INPUT_OMA_RX7, + lpo.LPO_INPUT_OMA_RX8, +] + + +def _scale(raw, factor, precision=3): + """Return raw * factor as a rounded float, or 'N/A' if raw is None.""" + if raw is None: + return 'N/A' + return float("{:.{}f}".format(raw * factor, precision)) + + +def _lower_nibble(raw): + """Mask to lower 4 bits (U4 field), propagating None.""" + return None if raw is None else raw & 0x0F + + +def _unpack_flags(field_name, raw): + """Unpack a bitmask byte into a dict of per-lane boolean keys (lane 1-8).""" + result = {} + for lane in range(1, 9): + key = "{}{}".format(field_name, lane) + result[key] = bool((raw >> (lane - 1)) & 1) if raw is not None else 'N/A' + return result + + +class CmisEnhancedLpoApi(CmisApi): + def get_transceiver_info(self): + xcvr_info = super().get_transceiver_info() + if xcvr_info is None: + return None + + tx_pol = self.xcvr_eeprom.read(lpo.LPO_TX_POLARITY_INVERTED) + rx_pol = self.xcvr_eeprom.read(lpo.LPO_RX_POLARITY_INVERTED) + + vma_acc_raw = self.xcvr_eeprom.read(lpo.LPO_TX_HOST_INPUT_VMA_MON_ACCURACY) + vma_acc = _scale(_lower_nibble(vma_acc_raw), _VMA_ACC_MV_PER_COUNT) + + oma_acc_raw = self.xcvr_eeprom.read(lpo.LPO_RX_INPUT_OMA_MON_ACCURACY) + oma_acc = _scale(_lower_nibble(oma_acc_raw), _OMA_ACC_DB_PER_COUNT) + + oer_raw = self.xcvr_eeprom.read(lpo.LPO_TX_OUTER_EXTINCTION_RATIO_MAX) + oer_max = _scale(oer_raw, _OER_DB_PER_COUNT) + + xcvr_info.update({ + lpo.LPO_TX_POLARITY_INVERTED: tx_pol if tx_pol is not None else 'N/A', + lpo.LPO_RX_POLARITY_INVERTED: rx_pol if rx_pol is not None else 'N/A', + lpo.LPO_TX_HOST_INPUT_VMA_MON_ACCURACY: vma_acc, + lpo.LPO_RX_INPUT_OMA_MON_ACCURACY: oma_acc, + lpo.LPO_TX_OUTER_EXTINCTION_RATIO_MAX: oer_max, + }) + return xcvr_info + + def get_transceiver_dom_real_value(self): + trans_dom = super().get_transceiver_dom_real_value() + if trans_dom is None: + return None + + for field in _VMA_TX_FIELDS: + trans_dom[field] = _scale(self.xcvr_eeprom.read(field), _VMA_MV_PER_COUNT) + + for field in _OMA_RX_FIELDS: + trans_dom[field] = _scale(self.xcvr_eeprom.read(field), _OMA_MW_PER_COUNT) + + return trans_dom + + def get_transceiver_threshold_info(self): + threshold_dict = super().get_transceiver_threshold_info() + if threshold_dict is None: + return None + + vma_threshold_fields = [ + (lpo.LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_THRESHOLD, _VMA_MV_PER_COUNT), + (lpo.LPO_TX_HOST_INPUT_VMA_LOW_ALARM_THRESHOLD, _VMA_MV_PER_COUNT), + (lpo.LPO_TX_HOST_INPUT_VMA_HIGH_WARNING_THRESHOLD, _VMA_MV_PER_COUNT), + (lpo.LPO_TX_HOST_INPUT_VMA_LOW_WARNING_THRESHOLD, _VMA_MV_PER_COUNT), + ] + oma_threshold_fields = [ + (lpo.LPO_RX_INPUT_OMA_HIGH_ALARM_THRESHOLD, _OMA_MW_PER_COUNT), + (lpo.LPO_RX_INPUT_OMA_LOW_ALARM_THRESHOLD, _OMA_MW_PER_COUNT), + (lpo.LPO_RX_INPUT_OMA_HIGH_WARNING_THRESHOLD, _OMA_MW_PER_COUNT), + (lpo.LPO_RX_INPUT_OMA_LOW_WARNING_THRESHOLD, _OMA_MW_PER_COUNT), + ] + + for field, factor in vma_threshold_fields + oma_threshold_fields: + raw = self.xcvr_eeprom.read(field) + threshold_dict[field] = _scale(raw, factor) + + return threshold_dict + + def get_transceiver_dom_flags(self): + dom_flag_dict = super().get_transceiver_dom_flags() + if dom_flag_dict is None: + return None + + for field in _VMA_FLAG_FIELDS + _OMA_FLAG_FIELDS: + raw = self.xcvr_eeprom.read(field) + dom_flag_dict.update(_unpack_flags(field, raw)) + + return dom_flag_dict diff --git a/sonic_platform_base/sonic_xcvr/fields/arista_lpo_consts.py b/sonic_platform_base/sonic_xcvr/fields/arista_lpo_consts.py new file mode 100644 index 000000000..8cc4f1dbc --- /dev/null +++ b/sonic_platform_base/sonic_xcvr/fields/arista_lpo_consts.py @@ -0,0 +1,79 @@ +# Enhanced LPO (Arista) - field groups +LPO_EEPROM_FIELD = "LpoEepromField" +LPO_INFO_FIELD = "LpoInfoField" +LPO_VMA_THRESHOLDS_FIELD = "LpoVmaThresholdsField" +LPO_OMA_THRESHOLDS_FIELD = "LpoOmaThresholdsField" +LPO_VMA_FLAGS_FIELD = "LpoVmaFlagsField" +LPO_VMA_DOM_FIELD = "LpoVmaDomField" +LPO_VMA_MASKS_FIELD = "LpoVmaMasksField" +LPO_OMA_FLAGS_FIELD = "LpoOmaFlagsField" +LPO_OMA_DOM_FIELD = "LpoOmaDomField" +LPO_OMA_MASKS_FIELD = "LpoOmaMasksField" + +# Enhanced LPO - page 01h advertisement +LPO_EEPROM_COMPLIANCE = "LPOEepromCompliance" +LPO_ENHANCED_SPEC_VERSION = "LPOEnhancedSpecVersion" + +# Enhanced LPO - page C1h info / capability +LPO_CAPABILITY = "LPOCapability" +LPO_TX_OUTER_EXTINCTION_RATIO_MAX = "LPOTxOuterExtinctionRatioMax" +LPO_TX_POLARITY_INVERTED = "LPOTxPolarityInverted" +LPO_RX_POLARITY_INVERTED = "LPORxPolarityInverted" +LPO_TX_HOST_INPUT_VMA_MON_ACCURACY = "LPOTxHostInputVMAMonAccuracySupported" +LPO_RX_INPUT_OMA_MON_ACCURACY = "LPORxInputOMAMonAccuracySupported" + +# Enhanced LPO - page C1h VMA thresholds +LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_THRESHOLD = "LPOTxHostInputVMAHighAlarmThreshold" +LPO_TX_HOST_INPUT_VMA_LOW_ALARM_THRESHOLD = "LPOTxHostInputVMALowAlarmThreshold" +LPO_TX_HOST_INPUT_VMA_HIGH_WARNING_THRESHOLD = "LPOTxHostInputVMAHighWarningThreshold" +LPO_TX_HOST_INPUT_VMA_LOW_WARNING_THRESHOLD = "LPOTxHostInputVMALowWarningThreshold" + +# Enhanced LPO - page C1h OMA thresholds +LPO_RX_INPUT_OMA_HIGH_ALARM_THRESHOLD = "LPORxInputOMAHighAlarmThreshold" +LPO_RX_INPUT_OMA_LOW_ALARM_THRESHOLD = "LPORxInputOMALowAlarmThreshold" +LPO_RX_INPUT_OMA_HIGH_WARNING_THRESHOLD = "LPORxInputOMAHighWarningThreshold" +LPO_RX_INPUT_OMA_LOW_WARNING_THRESHOLD = "LPORxInputOMALowWarningThreshold" + +# Enhanced LPO - page C2h VMA flags +LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_FLAG = "LPOTxHostInputVMAHighAlarmFlag" +LPO_TX_HOST_INPUT_VMA_LOW_ALARM_FLAG = "LPOTxHostInputVMALowAlarmFlag" +LPO_TX_HOST_INPUT_VMA_HIGH_WARNING_FLAG = "LPOTxHostInputVMAHighWarningFlag" +LPO_TX_HOST_INPUT_VMA_LOW_WARNING_FLAG = "LPOTxHostInputVMALowWarningFlag" + +# Enhanced LPO - page C2h per-lane VMA measurements +LPO_HOST_INPUT_VMA_TX1 = "LPOHostInputVMATx1" +LPO_HOST_INPUT_VMA_TX2 = "LPOHostInputVMATx2" +LPO_HOST_INPUT_VMA_TX3 = "LPOHostInputVMATx3" +LPO_HOST_INPUT_VMA_TX4 = "LPOHostInputVMATx4" +LPO_HOST_INPUT_VMA_TX5 = "LPOHostInputVMATx5" +LPO_HOST_INPUT_VMA_TX6 = "LPOHostInputVMATx6" +LPO_HOST_INPUT_VMA_TX7 = "LPOHostInputVMATx7" +LPO_HOST_INPUT_VMA_TX8 = "LPOHostInputVMATx8" + +# Enhanced LPO - page C2h VMA masks +LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_MASK = "LPOTxHostInputVMAHighAlarmMask" +LPO_TX_HOST_INPUT_VMA_LOW_ALARM_MASK = "LPOTxHostInputVMALowAlarmMask" +LPO_TX_HOST_INPUT_VMA_HIGH_WARNING_MASK = "LPOTxHostInputVMAHighWarningMask" +LPO_TX_HOST_INPUT_VMA_LOW_WARNING_MASK = "LPOTxHostInputVMALowWarningMask" + +# Enhanced LPO - page C2h OMA flags +LPO_RX_INPUT_OMA_HIGH_ALARM_FLAG = "LPORxInputOMAHighAlarmFlag" +LPO_RX_INPUT_OMA_LOW_ALARM_FLAG = "LPORxInputOMALowAlarmFlag" +LPO_RX_INPUT_OMA_HIGH_WARNING_FLAG = "LPORxInputOMAHighWarningFlag" +LPO_RX_INPUT_OMA_LOW_WARNING_FLAG = "LPORxInputOMALowWarningFlag" + +# Enhanced LPO - page C2h per-lane OMA measurements +LPO_INPUT_OMA_RX1 = "LPOInputOMARx1" +LPO_INPUT_OMA_RX2 = "LPOInputOMARx2" +LPO_INPUT_OMA_RX3 = "LPOInputOMARx3" +LPO_INPUT_OMA_RX4 = "LPOInputOMARx4" +LPO_INPUT_OMA_RX5 = "LPOInputOMARx5" +LPO_INPUT_OMA_RX6 = "LPOInputOMARx6" +LPO_INPUT_OMA_RX7 = "LPOInputOMARx7" +LPO_INPUT_OMA_RX8 = "LPOInputOMARx8" + +# Enhanced LPO - page C2h OMA masks +LPO_RX_INPUT_OMA_HIGH_ALARM_MASK = "LPORxInputOMAHighAlarmMask" +LPO_RX_INPUT_OMA_LOW_ALARM_MASK = "LPORxInputOMALowAlarmMask" +LPO_RX_INPUT_OMA_HIGH_WARNING_MASK = "LPORxInputOMAHighWarningMask" +LPO_RX_INPUT_OMA_LOW_WARNING_MASK = "LPORxInputOMALowWarningMask" diff --git a/sonic_platform_base/sonic_xcvr/mem_maps/arista/__init__.py b/sonic_platform_base/sonic_xcvr/mem_maps/arista/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/sonic_platform_base/sonic_xcvr/mem_maps/arista/cmis_enhanced_lpo.py b/sonic_platform_base/sonic_xcvr/mem_maps/arista/cmis_enhanced_lpo.py new file mode 100644 index 000000000..883381803 --- /dev/null +++ b/sonic_platform_base/sonic_xcvr/mem_maps/arista/cmis_enhanced_lpo.py @@ -0,0 +1,133 @@ +""" + cmis_enhanced_lpo.py + + Memory map for Arista Enhanced LPO modules +""" + +from ..public.cmis import CmisMemMap +from ..public.cmis.pages.page import CmisPage +from ...fields.xcvr_field import NumberRegField +from ...fields import arista_lpo_consts as lpo + + +class _CmisEnhancedLpoPage01(CmisPage): + """Page 01h extension: LPO EEPROM compliance advertisement (bytes 195-196).""" + + def __init__(self, codes): + super().__init__(codes, page=0x01, bank=0) + self.fields[lpo.LPO_EEPROM_FIELD] = [ + NumberRegField(lpo.LPO_EEPROM_COMPLIANCE, self.getaddr(195), format="B", size=1), + NumberRegField(lpo.LPO_ENHANCED_SPEC_VERSION, self.getaddr(196), format="B", size=1), + ] + + +class _CmisEnhancedLpoPageC1(CmisPage): + """Page C1h: capability advertisement, polarity, accuracy, OER max, thresholds.""" + + def __init__(self, codes): + super().__init__(codes, page=0xC1, bank=0) + + # Capability advertisement byte (C1h:128) + self.fields[lpo.LPO_INFO_FIELD] = [ + NumberRegField(lpo.LPO_CAPABILITY, self.getaddr(128), format="B", size=1), + # C1h:129 - Tx outer extinction ratio max (U8, 0.1 dB/count) + NumberRegField(lpo.LPO_TX_OUTER_EXTINCTION_RATIO_MAX, self.getaddr(129), format="B", size=1), + # C1h:133 - Tx polarity inversion bitmask (1 bit per lane) + NumberRegField(lpo.LPO_TX_POLARITY_INVERTED, self.getaddr(133), format="B", size=1), + # C1h:134 - Rx polarity inversion bitmask (1 bit per lane) + NumberRegField(lpo.LPO_RX_POLARITY_INVERTED, self.getaddr(134), format="B", size=1), + # C1h:135 - Tx host input VMA monitoring accuracy (bits 3-0, U4, 5 mV/count) + NumberRegField(lpo.LPO_TX_HOST_INPUT_VMA_MON_ACCURACY, self.getaddr(135), format="B", size=1), + # C1h:140 - Rx input OMA monitoring accuracy (bits 3-0, U4, 0.2 dB/count) + NumberRegField(lpo.LPO_RX_INPUT_OMA_MON_ACCURACY, self.getaddr(140), format="B", size=1), + ] + + # VMA thresholds (U8, 5 mV/count) - C1h:136-139 + self.fields[lpo.LPO_VMA_THRESHOLDS_FIELD] = [ + NumberRegField(lpo.LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_THRESHOLD, self.getaddr(136), format="B", size=1), + NumberRegField(lpo.LPO_TX_HOST_INPUT_VMA_LOW_ALARM_THRESHOLD, self.getaddr(137), format="B", size=1), + NumberRegField(lpo.LPO_TX_HOST_INPUT_VMA_HIGH_WARNING_THRESHOLD, self.getaddr(138), format="B", size=1), + NumberRegField(lpo.LPO_TX_HOST_INPUT_VMA_LOW_WARNING_THRESHOLD, self.getaddr(139), format="B", size=1), + ] + + # OMA thresholds (U16 big-endian, 0.1 uW/count) - C1h:141-148 + self.fields[lpo.LPO_OMA_THRESHOLDS_FIELD] = [ + NumberRegField(lpo.LPO_RX_INPUT_OMA_HIGH_ALARM_THRESHOLD, self.getaddr(141), format=">H", size=2), + NumberRegField(lpo.LPO_RX_INPUT_OMA_LOW_ALARM_THRESHOLD, self.getaddr(143), format=">H", size=2), + NumberRegField(lpo.LPO_RX_INPUT_OMA_HIGH_WARNING_THRESHOLD, self.getaddr(145), format=">H", size=2), + NumberRegField(lpo.LPO_RX_INPUT_OMA_LOW_WARNING_THRESHOLD, self.getaddr(147), format=">H", size=2), + ] + + +class _CmisEnhancedLpoPageC2(CmisPage): + """Page C2h: VMA/OMA flags, per-lane measurements, masks.""" + + def __init__(self, codes): + super().__init__(codes, page=0xC2, bank=0) + + # VMA flags (latched bitmasks, 1 bit per lane) - C2h:141-144 + self.fields[lpo.LPO_VMA_FLAGS_FIELD] = [ + NumberRegField(lpo.LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_FLAG, self.getaddr(141), format="B", size=1), + NumberRegField(lpo.LPO_TX_HOST_INPUT_VMA_LOW_ALARM_FLAG, self.getaddr(142), format="B", size=1), + NumberRegField(lpo.LPO_TX_HOST_INPUT_VMA_HIGH_WARNING_FLAG, self.getaddr(143), format="B", size=1), + NumberRegField(lpo.LPO_TX_HOST_INPUT_VMA_LOW_WARNING_FLAG, self.getaddr(144), format="B", size=1), + ] + + # Per-lane Tx host input VMA measurements (U8, 5 mV/count) - C2h:145-152 + self.fields[lpo.LPO_VMA_DOM_FIELD] = [ + NumberRegField(lpo.LPO_HOST_INPUT_VMA_TX1, self.getaddr(145), format="B", size=1), + NumberRegField(lpo.LPO_HOST_INPUT_VMA_TX2, self.getaddr(146), format="B", size=1), + NumberRegField(lpo.LPO_HOST_INPUT_VMA_TX3, self.getaddr(147), format="B", size=1), + NumberRegField(lpo.LPO_HOST_INPUT_VMA_TX4, self.getaddr(148), format="B", size=1), + NumberRegField(lpo.LPO_HOST_INPUT_VMA_TX5, self.getaddr(149), format="B", size=1), + NumberRegField(lpo.LPO_HOST_INPUT_VMA_TX6, self.getaddr(150), format="B", size=1), + NumberRegField(lpo.LPO_HOST_INPUT_VMA_TX7, self.getaddr(151), format="B", size=1), + NumberRegField(lpo.LPO_HOST_INPUT_VMA_TX8, self.getaddr(152), format="B", size=1), + ] + + # VMA masks (bitmasks) - C2h:153-156 + self.fields[lpo.LPO_VMA_MASKS_FIELD] = [ + NumberRegField(lpo.LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_MASK, self.getaddr(153), format="B", size=1), + NumberRegField(lpo.LPO_TX_HOST_INPUT_VMA_LOW_ALARM_MASK, self.getaddr(154), format="B", size=1), + NumberRegField(lpo.LPO_TX_HOST_INPUT_VMA_HIGH_WARNING_MASK, self.getaddr(155), format="B", size=1), + NumberRegField(lpo.LPO_TX_HOST_INPUT_VMA_LOW_WARNING_MASK, self.getaddr(156), format="B", size=1), + ] + + # OMA flags (latched bitmasks, 1 bit per lane) - C2h:157-160 + self.fields[lpo.LPO_OMA_FLAGS_FIELD] = [ + NumberRegField(lpo.LPO_RX_INPUT_OMA_HIGH_ALARM_FLAG, self.getaddr(157), format="B", size=1), + NumberRegField(lpo.LPO_RX_INPUT_OMA_LOW_ALARM_FLAG, self.getaddr(158), format="B", size=1), + NumberRegField(lpo.LPO_RX_INPUT_OMA_HIGH_WARNING_FLAG, self.getaddr(159), format="B", size=1), + NumberRegField(lpo.LPO_RX_INPUT_OMA_LOW_WARNING_FLAG, self.getaddr(160), format="B", size=1), + ] + + # Per-lane Rx optical input OMA measurements (U16 big-endian, 0.1 uW/count) - C2h:161-176 + self.fields[lpo.LPO_OMA_DOM_FIELD] = [ + NumberRegField(lpo.LPO_INPUT_OMA_RX1, self.getaddr(161), format=">H", size=2), + NumberRegField(lpo.LPO_INPUT_OMA_RX2, self.getaddr(163), format=">H", size=2), + NumberRegField(lpo.LPO_INPUT_OMA_RX3, self.getaddr(165), format=">H", size=2), + NumberRegField(lpo.LPO_INPUT_OMA_RX4, self.getaddr(167), format=">H", size=2), + NumberRegField(lpo.LPO_INPUT_OMA_RX5, self.getaddr(169), format=">H", size=2), + NumberRegField(lpo.LPO_INPUT_OMA_RX6, self.getaddr(171), format=">H", size=2), + NumberRegField(lpo.LPO_INPUT_OMA_RX7, self.getaddr(173), format=">H", size=2), + NumberRegField(lpo.LPO_INPUT_OMA_RX8, self.getaddr(175), format=">H", size=2), + ] + + # OMA masks (bitmasks) - C2h:177-180 + self.fields[lpo.LPO_OMA_MASKS_FIELD] = [ + NumberRegField(lpo.LPO_RX_INPUT_OMA_HIGH_ALARM_MASK, self.getaddr(177), format="B", size=1), + NumberRegField(lpo.LPO_RX_INPUT_OMA_LOW_ALARM_MASK, self.getaddr(178), format="B", size=1), + NumberRegField(lpo.LPO_RX_INPUT_OMA_HIGH_WARNING_MASK, self.getaddr(179), format="B", size=1), + NumberRegField(lpo.LPO_RX_INPUT_OMA_LOW_WARNING_MASK, self.getaddr(180), format="B", size=1), + ] + + +class CmisEnhancedLpoMemMap(CmisMemMap): + def __init__(self, codes, bank=0): + super().__init__(codes, bank=bank) + # LPO-specific pages are always bank=0 per the Enhanced LPO spec. + self.add_pages( + _CmisEnhancedLpoPage01(codes), + _CmisEnhancedLpoPageC1(codes), + _CmisEnhancedLpoPageC2(codes), + ) diff --git a/sonic_platform_base/sonic_xcvr/xcvr_api_factory.py b/sonic_platform_base/sonic_xcvr/xcvr_api_factory.py index 93e799c7b..a2043dd10 100644 --- a/sonic_platform_base/sonic_xcvr/xcvr_api_factory.py +++ b/sonic_platform_base/sonic_xcvr/xcvr_api_factory.py @@ -17,6 +17,9 @@ from .api.credo.aec_800g import CredoAec800gApi from .mem_maps.credo.aec_800g import CredoAec800gMemMap +from .api.arista.cmis_enhanced_lpo import CmisEnhancedLpoApi +from .mem_maps.arista.cmis_enhanced_lpo import CmisEnhancedLpoMemMap + from .api.innolight.fr_800g import CmisFr800gApi from .api.hisense.aoc_2x100g import CmisAocSingleBankApi @@ -42,6 +45,7 @@ VENDOR_PART_NUM_LENGTH = 16 CREDO_800G_AEC_VENDOR_PN_LIST = ["CAC81X321M2MC1MS", "CAC815321M2MC1MS", "CAC82X321M2MC1MS"] +ARISTA_ENHANCED_LPO_PN_LIST = ["LPO-800G-2DR4"] INL_800G_VENDOR_PN_LIST = ["T-DL8CNT-NCI", "T-DH8CNT-NCI", "T-DH8CNT-N00", "T-DP4CNH-NCI", "T-DP8CNT-NNO", "T-DP8CNH-NNO", "T-DC8CNT-NNO", "T-DP8CNL-NNO", "T-OL8CNT-N00", "T-OH8CNH-N00", "T-OH8CNH-NNO", "T-OL8CNT-NNO"] @@ -94,6 +98,9 @@ def _create_cmis_api(self, bank=0): elif vendor_name == 'Hisense' and vendor_pn is not None and re.match(HISENSE_2X100G_VENDOR_PN, vendor_pn): xcvr_eeprom = XcvrEeprom(self.reader, self.writer, CmisMemMap(CmisCodes, bank=bank)) api = CmisAocSingleBankApi(xcvr_eeprom, init_cdb_fw_handler=True) + elif vendor_pn in ARISTA_ENHANCED_LPO_PN_LIST: + xcvr_eeprom = XcvrEeprom(self.reader, self.writer, CmisEnhancedLpoMemMap(CmisCodes, bank=bank)) + api = CmisEnhancedLpoApi(xcvr_eeprom, init_cdb_fw_handler=True) else: xcvr_eeprom = XcvrEeprom(self.reader, self.writer, CmisMemMap(CmisCodes, bank=bank)) api = CmisApi(xcvr_eeprom, init_cdb_fw_handler=True) diff --git a/tests/sonic_xcvr/test_cmis_enhanced_lpo.py b/tests/sonic_xcvr/test_cmis_enhanced_lpo.py new file mode 100644 index 000000000..eabe15da5 --- /dev/null +++ b/tests/sonic_xcvr/test_cmis_enhanced_lpo.py @@ -0,0 +1,341 @@ +from unittest.mock import patch, MagicMock +import pytest + +from sonic_platform_base.sonic_xcvr.api.arista.cmis_enhanced_lpo import CmisEnhancedLpoApi +from sonic_platform_base.sonic_xcvr.codes.public.cmis import CmisCodes +from sonic_platform_base.sonic_xcvr.mem_maps.arista.cmis_enhanced_lpo import CmisEnhancedLpoMemMap +from sonic_platform_base.sonic_xcvr.xcvr_eeprom import XcvrEeprom +from sonic_platform_base.sonic_xcvr.fields import arista_lpo_consts as lpo +from sonic_platform_base.sonic_xcvr.xcvr_api_factory import XcvrApiFactory + + +class TestCmisEnhancedLpoApi: + def setup_method(self): + self.mem_map = CmisEnhancedLpoMemMap(CmisCodes) + reader = MagicMock(return_value=None) + writer = MagicMock() + self.eeprom = XcvrEeprom(reader, writer, self.mem_map) + self.api = CmisEnhancedLpoApi(self.eeprom) + + # ------------------------------------------------------------------ + # get_transceiver_info + # ------------------------------------------------------------------ + + @pytest.mark.parametrize("lpo_reads, expected_lpo", [ + ( + # All fields readable + { + lpo.LPO_TX_POLARITY_INVERTED: 0b10000001, + lpo.LPO_RX_POLARITY_INVERTED: 0b01000010, + # bits 3-0 = 0x03 -> 3 * 5 = 15 mV + lpo.LPO_TX_HOST_INPUT_VMA_MON_ACCURACY: 0b11110011, + # bits 3-0 = 0x05 -> 5 * 0.2 = 1.0 dB + lpo.LPO_RX_INPUT_OMA_MON_ACCURACY: 0b11110101, + lpo.LPO_TX_OUTER_EXTINCTION_RATIO_MAX: 50, # 50 * 0.1 = 5.0 dB + }, + { + lpo.LPO_TX_POLARITY_INVERTED: 0b10000001, + lpo.LPO_RX_POLARITY_INVERTED: 0b01000010, + lpo.LPO_TX_HOST_INPUT_VMA_MON_ACCURACY: 15.0, + lpo.LPO_RX_INPUT_OMA_MON_ACCURACY: 1.0, + lpo.LPO_TX_OUTER_EXTINCTION_RATIO_MAX: 5.0, + }, + ), + ( + # Failed reads -> N/A + { + lpo.LPO_TX_POLARITY_INVERTED: None, + lpo.LPO_RX_POLARITY_INVERTED: None, + lpo.LPO_TX_HOST_INPUT_VMA_MON_ACCURACY: None, + lpo.LPO_RX_INPUT_OMA_MON_ACCURACY: None, + lpo.LPO_TX_OUTER_EXTINCTION_RATIO_MAX: None, + }, + { + lpo.LPO_TX_POLARITY_INVERTED: 'N/A', + lpo.LPO_RX_POLARITY_INVERTED: 'N/A', + lpo.LPO_TX_HOST_INPUT_VMA_MON_ACCURACY: 'N/A', + lpo.LPO_RX_INPUT_OMA_MON_ACCURACY: 'N/A', + lpo.LPO_TX_OUTER_EXTINCTION_RATIO_MAX: 'N/A', + }, + ), + ( + # Zero raw values -> zero scaled values + { + lpo.LPO_TX_POLARITY_INVERTED: 0x00, + lpo.LPO_RX_POLARITY_INVERTED: 0x00, + lpo.LPO_TX_HOST_INPUT_VMA_MON_ACCURACY: 0x00, + lpo.LPO_RX_INPUT_OMA_MON_ACCURACY: 0x00, + lpo.LPO_TX_OUTER_EXTINCTION_RATIO_MAX: 0, + }, + { + lpo.LPO_TX_POLARITY_INVERTED: 0x00, + lpo.LPO_RX_POLARITY_INVERTED: 0x00, + lpo.LPO_TX_HOST_INPUT_VMA_MON_ACCURACY: 0.0, + lpo.LPO_RX_INPUT_OMA_MON_ACCURACY: 0.0, + lpo.LPO_TX_OUTER_EXTINCTION_RATIO_MAX: 0.0, + }, + ), + ]) + @patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_info', + return_value={'type': 'QSFP-DD', 'model': 'LPO-800G-2DR4'}) + def test_get_transceiver_info(self, mock_super, lpo_reads, expected_lpo): + self.api.xcvr_eeprom.read = MagicMock(side_effect=lambda f: lpo_reads.get(f)) + result = self.api.get_transceiver_info() + for key, expected_val in expected_lpo.items(): + assert result[key] == expected_val + + @patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_info', + return_value=None) + def test_get_transceiver_info_super_returns_none(self, _): + assert self.api.get_transceiver_info() is None + + # ------------------------------------------------------------------ + # get_transceiver_dom_real_value + # ------------------------------------------------------------------ + + @pytest.mark.parametrize("vma_raws, oma_raws, expected_vma, expected_oma", [ + ( + [10, 20, 30, 40, 50, 60, 70, 80], # VMA raw counts + [1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000], # OMA raw counts + [50.0, 100.0, 150.0, 200.0, 250.0, 300.0, 350.0, 400.0], # mV + [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8], # mW + ), + ( + [None] * 8, + [None] * 8, + ['N/A'] * 8, + ['N/A'] * 8, + ), + ( + [255] * 8, # max U8 -> 255 * 5 = 1275 mV + [65535] * 8, # max U16 -> 65535 * 0.0001 = 6.5535 mW, rounds to 6.554 at precision=3 + [1275.0] * 8, + [6.554] * 8, + ), + ( + # Mixed: alternate None and non-None per lane + [10, None, 30, None, 50, None, 70, None], + [1000, None, 3000, None, 5000, None, 7000, None], + [50.0, 'N/A', 150.0, 'N/A', 250.0, 'N/A', 350.0, 'N/A'], + [0.1, 'N/A', 0.3, 'N/A', 0.5, 'N/A', 0.7, 'N/A'], + ), + ]) + @patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_dom_real_value', + return_value={'temperature': 25.0, 'voltage': 3.3}) + def test_get_transceiver_dom_real_value(self, mock_super, + vma_raws, oma_raws, + expected_vma, expected_oma): + vma_fields = [ + lpo.LPO_HOST_INPUT_VMA_TX1, lpo.LPO_HOST_INPUT_VMA_TX2, + lpo.LPO_HOST_INPUT_VMA_TX3, lpo.LPO_HOST_INPUT_VMA_TX4, + lpo.LPO_HOST_INPUT_VMA_TX5, lpo.LPO_HOST_INPUT_VMA_TX6, + lpo.LPO_HOST_INPUT_VMA_TX7, lpo.LPO_HOST_INPUT_VMA_TX8, + ] + oma_fields = [ + lpo.LPO_INPUT_OMA_RX1, lpo.LPO_INPUT_OMA_RX2, + lpo.LPO_INPUT_OMA_RX3, lpo.LPO_INPUT_OMA_RX4, + lpo.LPO_INPUT_OMA_RX5, lpo.LPO_INPUT_OMA_RX6, + lpo.LPO_INPUT_OMA_RX7, lpo.LPO_INPUT_OMA_RX8, + ] + read_map = {f: v for f, v in zip(vma_fields + oma_fields, vma_raws + oma_raws)} + self.api.xcvr_eeprom.read = MagicMock(side_effect=lambda f: read_map.get(f)) + + result = self.api.get_transceiver_dom_real_value() + for field, expected in zip(vma_fields, expected_vma): + assert result[field] == expected + for field, expected in zip(oma_fields, expected_oma): + assert result[field] == expected + + @patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_dom_real_value', + return_value=None) + def test_get_transceiver_dom_real_value_super_returns_none(self, _): + assert self.api.get_transceiver_dom_real_value() is None + + # ------------------------------------------------------------------ + # get_transceiver_threshold_info + # ------------------------------------------------------------------ + + @pytest.mark.parametrize("vma_raws, oma_raws, expected_vma_mv, expected_oma_mw", [ + ( + [200, 50, 180, 70], # high alarm, low alarm, high warn, low warn (raw counts) + [10000, 2000, 8000, 3000], + [1000.0, 250.0, 900.0, 350.0], # mV + [1.0, 0.2, 0.8, 0.3], # mW + ), + ( + [None, None, None, None], + [None, None, None, None], + ['N/A', 'N/A', 'N/A', 'N/A'], + ['N/A', 'N/A', 'N/A', 'N/A'], + ), + ]) + @patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_threshold_info', + return_value={'temphighalarm': 80.0}) + def test_get_transceiver_threshold_info(self, mock_super, + vma_raws, oma_raws, + expected_vma_mv, expected_oma_mw): + vma_threshold_fields = [ + lpo.LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_THRESHOLD, + lpo.LPO_TX_HOST_INPUT_VMA_LOW_ALARM_THRESHOLD, + lpo.LPO_TX_HOST_INPUT_VMA_HIGH_WARNING_THRESHOLD, + lpo.LPO_TX_HOST_INPUT_VMA_LOW_WARNING_THRESHOLD, + ] + oma_threshold_fields = [ + lpo.LPO_RX_INPUT_OMA_HIGH_ALARM_THRESHOLD, + lpo.LPO_RX_INPUT_OMA_LOW_ALARM_THRESHOLD, + lpo.LPO_RX_INPUT_OMA_HIGH_WARNING_THRESHOLD, + lpo.LPO_RX_INPUT_OMA_LOW_WARNING_THRESHOLD, + ] + read_map = {f: v for f, v in zip( + vma_threshold_fields + oma_threshold_fields, + vma_raws + oma_raws, + )} + self.api.xcvr_eeprom.read = MagicMock(side_effect=lambda f: read_map.get(f)) + + result = self.api.get_transceiver_threshold_info() + for field, expected in zip(vma_threshold_fields, expected_vma_mv): + assert result[field] == expected + for field, expected in zip(oma_threshold_fields, expected_oma_mw): + assert result[field] == expected + + @patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_threshold_info', + return_value=None) + def test_get_transceiver_threshold_info_super_returns_none(self, _): + assert self.api.get_transceiver_threshold_info() is None + + # ------------------------------------------------------------------ + # get_transceiver_dom_flags (per-lane flag unpacking) + # ------------------------------------------------------------------ + + @pytest.mark.parametrize("bitmask, expected_lanes", [ + (0xFF, {i: True for i in range(1, 9)}), # all lanes set + (0x00, {i: False for i in range(1, 9)}), # no lanes set + (0x01, {1: True, 2: False, 3: False, 4: False, + 5: False, 6: False, 7: False, 8: False}), # lane 1 only + (0x80, {1: False, 2: False, 3: False, 4: False, + 5: False, 6: False, 7: False, 8: True}), # lane 8 only + (0x81, {1: True, 2: False, 3: False, 4: False, + 5: False, 6: False, 7: False, 8: True}), # lanes 1 and 8 + ]) + @patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_dom_flags', + return_value={}) + def test_get_transceiver_dom_flags_unpack(self, mock_super, bitmask, expected_lanes): + self.api.xcvr_eeprom.read = MagicMock(return_value=bitmask) + result = self.api.get_transceiver_dom_flags() + + flag_field_names = [ + lpo.LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_FLAG, + lpo.LPO_TX_HOST_INPUT_VMA_LOW_ALARM_FLAG, + lpo.LPO_TX_HOST_INPUT_VMA_HIGH_WARNING_FLAG, + lpo.LPO_TX_HOST_INPUT_VMA_LOW_WARNING_FLAG, + lpo.LPO_RX_INPUT_OMA_HIGH_ALARM_FLAG, + lpo.LPO_RX_INPUT_OMA_LOW_ALARM_FLAG, + lpo.LPO_RX_INPUT_OMA_HIGH_WARNING_FLAG, + lpo.LPO_RX_INPUT_OMA_LOW_WARNING_FLAG, + ] + for field_name in flag_field_names: + for lane, expected_val in expected_lanes.items(): + key = "{}{}".format(field_name, lane) + assert result[key] == expected_val, \ + "bitmask=0x{:02x} field={} lane={}: got {}, want {}".format( + bitmask, field_name, lane, result[key], expected_val) + + @patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_dom_flags', + return_value={}) + def test_get_transceiver_dom_flags_none_read(self, mock_super): + self.api.xcvr_eeprom.read = MagicMock(return_value=None) + result = self.api.get_transceiver_dom_flags() + all_flag_fields = [ + lpo.LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_FLAG, + lpo.LPO_TX_HOST_INPUT_VMA_LOW_ALARM_FLAG, + lpo.LPO_TX_HOST_INPUT_VMA_HIGH_WARNING_FLAG, + lpo.LPO_TX_HOST_INPUT_VMA_LOW_WARNING_FLAG, + lpo.LPO_RX_INPUT_OMA_HIGH_ALARM_FLAG, + lpo.LPO_RX_INPUT_OMA_LOW_ALARM_FLAG, + lpo.LPO_RX_INPUT_OMA_HIGH_WARNING_FLAG, + lpo.LPO_RX_INPUT_OMA_LOW_WARNING_FLAG, + ] + for field in all_flag_fields: + for lane in range(1, 9): + assert result["{}{}".format(field, lane)] == 'N/A' + + @patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_dom_flags', + return_value=None) + def test_get_transceiver_dom_flags_super_returns_none(self, _): + assert self.api.get_transceiver_dom_flags() is None + + # ------------------------------------------------------------------ + # mem_map: verify field offsets are registered correctly + # ------------------------------------------------------------------ + + @pytest.mark.parametrize("field_name, expected_offset", [ + # Page 01h advertisement bytes (page=1, bank=0): offset = 1*128 + byte + (lpo.LPO_EEPROM_COMPLIANCE, 1 * 128 + 195), + (lpo.LPO_ENHANCED_SPEC_VERSION, 1 * 128 + 196), + # Page C1h (page=0xC1=193, bank=0): offset = 193*128 + byte + (lpo.LPO_CAPABILITY, 193 * 128 + 128), + (lpo.LPO_TX_OUTER_EXTINCTION_RATIO_MAX, 193 * 128 + 129), + (lpo.LPO_TX_POLARITY_INVERTED, 193 * 128 + 133), + (lpo.LPO_RX_POLARITY_INVERTED, 193 * 128 + 134), + (lpo.LPO_TX_HOST_INPUT_VMA_MON_ACCURACY, 193 * 128 + 135), + (lpo.LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_THRESHOLD, 193 * 128 + 136), + (lpo.LPO_TX_HOST_INPUT_VMA_LOW_ALARM_THRESHOLD, 193 * 128 + 137), + (lpo.LPO_TX_HOST_INPUT_VMA_HIGH_WARNING_THRESHOLD, 193 * 128 + 138), + (lpo.LPO_TX_HOST_INPUT_VMA_LOW_WARNING_THRESHOLD, 193 * 128 + 139), + (lpo.LPO_RX_INPUT_OMA_MON_ACCURACY, 193 * 128 + 140), + (lpo.LPO_RX_INPUT_OMA_HIGH_ALARM_THRESHOLD, 193 * 128 + 141), + (lpo.LPO_RX_INPUT_OMA_LOW_ALARM_THRESHOLD, 193 * 128 + 143), + (lpo.LPO_RX_INPUT_OMA_HIGH_WARNING_THRESHOLD, 193 * 128 + 145), + (lpo.LPO_RX_INPUT_OMA_LOW_WARNING_THRESHOLD, 193 * 128 + 147), + # Page C2h (page=0xC2=194, bank=0): offset = 194*128 + byte + (lpo.LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_FLAG, 194 * 128 + 141), + (lpo.LPO_TX_HOST_INPUT_VMA_LOW_ALARM_FLAG, 194 * 128 + 142), + (lpo.LPO_TX_HOST_INPUT_VMA_HIGH_WARNING_FLAG, 194 * 128 + 143), + (lpo.LPO_TX_HOST_INPUT_VMA_LOW_WARNING_FLAG, 194 * 128 + 144), + (lpo.LPO_HOST_INPUT_VMA_TX1, 194 * 128 + 145), + (lpo.LPO_HOST_INPUT_VMA_TX8, 194 * 128 + 152), + (lpo.LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_MASK, 194 * 128 + 153), + (lpo.LPO_TX_HOST_INPUT_VMA_LOW_WARNING_MASK, 194 * 128 + 156), + (lpo.LPO_RX_INPUT_OMA_HIGH_ALARM_FLAG, 194 * 128 + 157), + (lpo.LPO_RX_INPUT_OMA_LOW_WARNING_FLAG, 194 * 128 + 160), + (lpo.LPO_INPUT_OMA_RX1, 194 * 128 + 161), + (lpo.LPO_INPUT_OMA_RX8, 194 * 128 + 175), + (lpo.LPO_RX_INPUT_OMA_HIGH_ALARM_MASK, 194 * 128 + 177), + (lpo.LPO_RX_INPUT_OMA_LOW_WARNING_MASK, 194 * 128 + 180), + ]) + def test_field_offsets(self, field_name, expected_offset): + field = self.mem_map.get_field(field_name) + assert field.get_offset() == expected_offset, \ + "field {} offset: got {}, want {}".format(field_name, field.get_offset(), expected_offset) + + @pytest.mark.parametrize("field_name, expected_size", [ + # U8 fields + (lpo.LPO_EEPROM_COMPLIANCE, 1), + (lpo.LPO_TX_POLARITY_INVERTED, 1), + (lpo.LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_THRESHOLD, 1), + (lpo.LPO_TX_HOST_INPUT_VMA_HIGH_ALARM_FLAG, 1), + (lpo.LPO_HOST_INPUT_VMA_TX1, 1), + # U16 fields + (lpo.LPO_RX_INPUT_OMA_HIGH_ALARM_THRESHOLD, 2), + (lpo.LPO_INPUT_OMA_RX1, 2), + (lpo.LPO_INPUT_OMA_RX8, 2), + ]) + def test_field_sizes(self, field_name, expected_size): + field = self.mem_map.get_field(field_name) + assert field.get_size() == expected_size + + # ------------------------------------------------------------------ + # factory dispatch + # ------------------------------------------------------------------ + + @patch('sonic_platform_base.sonic_xcvr.xcvr_api_factory.XcvrApiFactory._get_vendor_name', + MagicMock(return_value='Arista')) + @patch('sonic_platform_base.sonic_xcvr.xcvr_api_factory.XcvrApiFactory._get_vendor_part_num', + MagicMock(return_value='LPO-800G-2DR4')) + def test_factory_creates_arista_lpo_api(self): + def mock_reader(start, length): + return bytes([0x18]) + + factory = XcvrApiFactory(mock_reader, MagicMock()) + api = factory.create_xcvr_api() + assert isinstance(api, CmisEnhancedLpoApi)