From efac71508981ca0d19cab97596b4c4e37b74e987 Mon Sep 17 00:00:00 2001 From: tshalvi Date: Mon, 8 Sep 2025 17:08:37 +0300 Subject: [PATCH] Add lock for each eeprom access allowing only one thread to read/write from the eeprom at a time --- .../mlnx-platform-api/sonic_platform/sfp.py | 157 ++++++++++++------ 1 file changed, 102 insertions(+), 55 deletions(-) diff --git a/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py b/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py index 307eae7164a..ad4a21aba52 100644 --- a/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py +++ b/platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py @@ -268,6 +268,15 @@ # Global logger class instance logger = Logger() +# --------- Global RLock + simple lock tracing ---------- +_GLOBAL_EEPROM_LOCK = threading.RLock() +_LOCK_TAG = "[EEPROM_LOCK]" + +def _thread_label(): + t = threading.current_thread() + return f"{t.name}:{t.ident}" +# ------------------------------------------------------ + class NvidiaSFPCommon(SfpOptoeBase): sfp_index_to_logical_port_dict = {} @@ -443,7 +452,14 @@ def get_presence(self): presence_sysfs = f'/sys/module/sx_core/asic0/module{self.sdk_index}/hw_present' if self.is_sw_control() else f'/sys/module/sx_core/asic0/module{self.sdk_index}/present' if utils.read_int_from_file(presence_sysfs) != 1: return False - eeprom_raw = self._read_eeprom(0, 1, log_on_error=False) + # Probe a byte under the lock so we don't race with other I/O + logger.log_notice(f'{_LOCK_TAG} wait acquire op=presence_probe sfp={self.sdk_index} offset=0 size=1 thread={_thread_label()}') + with _GLOBAL_EEPROM_LOCK: + logger.log_notice(f'{_LOCK_TAG} acquired op=presence_probe sfp={self.sdk_index} offset=0 size=1 thread={_thread_label()}') + try: + eeprom_raw = self._read_eeprom(0, 1, log_on_error=False) + finally: + logger.log_notice(f'{_LOCK_TAG} released op=presence_probe sfp={self.sdk_index} offset=0 size=1 thread={_thread_label()}') return eeprom_raw is not None @classmethod @@ -451,7 +467,18 @@ def wait_sfp_eeprom_ready(cls, sfp_list, wait_time): not_ready_list = sfp_list while wait_time > 0: - not_ready_list = [s for s in not_ready_list if s.state == STATE_FW_CONTROL and s._read_eeprom(0, 2,False) is None] + tmp = [] + for s in not_ready_list: + if s.state == STATE_FW_CONTROL: + logger.log_notice(f'{_LOCK_TAG} wait acquire op=probe sfp={s.sdk_index} offset=0 size=2 thread={_thread_label()}') + with _GLOBAL_EEPROM_LOCK: + logger.log_notice(f'{_LOCK_TAG} acquired op=probe sfp={s.sdk_index} offset=0 size=2 thread={_thread_label()}') + try: + if s._read_eeprom(0, 2, False) is None: + tmp.append(s) + finally: + logger.log_notice(f'{_LOCK_TAG} released op=probe sfp={s.sdk_index} offset=0 size=2 thread={_thread_label()}') + not_ready_list = tmp if not_ready_list: time.sleep(0.1) wait_time -= 0.1 @@ -469,7 +496,13 @@ def read_eeprom(self, offset, num_bytes): bytearray, if raw sequence of bytes are read correctly from the offset of size num_bytes None, if the read_eeprom fails """ - return self._read_eeprom(offset, num_bytes) + logger.log_notice(f'{_LOCK_TAG} wait acquire op=read sfp={self.sdk_index} offset={offset} size={num_bytes} thread={_thread_label()}') + with _GLOBAL_EEPROM_LOCK: + logger.log_notice(f'{_LOCK_TAG} acquired op=read sfp={self.sdk_index} offset={offset} size={num_bytes} thread={_thread_label()}') + try: + return self._read_eeprom(offset, num_bytes) + finally: + logger.log_notice(f'{_LOCK_TAG} released op=read sfp={self.sdk_index} offset={offset} size={num_bytes} thread={_thread_label()}') def _read_eeprom(self, offset, num_bytes, log_on_error=True): """Read eeprom specfic bytes beginning from a random offset with size as num_bytes @@ -534,38 +567,44 @@ def write_eeprom(self, offset, num_bytes, write_buffer): logger.log_error("Error mismatch between buffer length and number of bytes to be written") return False - while num_bytes > 0: - page_num, page, page_offset = self._get_page_and_page_offset(offset) - if not page: - return False - + logger.log_notice(f'{_LOCK_TAG} wait acquire op=write sfp={self.sdk_index} offset={offset} size={num_bytes} thread={_thread_label()}') + with _GLOBAL_EEPROM_LOCK: + logger.log_notice(f'{_LOCK_TAG} acquired op=write sfp={self.sdk_index} offset={offset} size={num_bytes} thread={_thread_label()}') try: - if self._is_write_protected(page_num, page_offset, num_bytes): - # write limited eeprom is not supported - raise IOError('write limited bytes') - with open(page, mode='r+b', buffering=0) as f: - f.seek(page_offset) - ret = f.write(write_buffer[0:num_bytes]) - written_buffer = write_buffer[0:ret] - if ret != num_bytes: - page_size = f.seek(0, os.SEEK_END) - if page_offset + ret == page_size: - # Move to next page - write_buffer = write_buffer[ret:num_bytes] - offset += ret - else: - raise IOError(f'write return code = {ret}') - num_bytes -= ret - if ctypes.get_errno() != 0: - raise IOError(f'errno = {os.strerror(ctypes.get_errno())}') - logger.log_debug(f'write EEPROM sfp={self.sdk_index}, page={page}, page_offset={page_offset}, '\ - f'size={ret}, left={num_bytes}, data={written_buffer}') - except (OSError, IOError) as e: - data = ''.join('{:02x}'.format(x) for x in write_buffer) - logger.log_error(f'Failed to write EEPROM data sfp={self.sdk_index} EEPROM page={page}, page_offset={page_offset}, size={num_bytes}, '\ - f'offset={offset}, data = {data}, error = {e}') - return False - return True + while num_bytes > 0: + page_num, page, page_offset = self._get_page_and_page_offset(offset) + if not page: + return False + + try: + if self._is_write_protected(page_num, page_offset, num_bytes): + # write limited eeprom is not supported + raise IOError('write limited bytes') + with open(page, mode='r+b', buffering=0) as f: + f.seek(page_offset) + ret = f.write(write_buffer[0:num_bytes]) + written_buffer = write_buffer[0:ret] + if ret != num_bytes: + page_size = f.seek(0, os.SEEK_END) + if page_offset + ret == page_size: + # Move to next page + write_buffer = write_buffer[ret:num_bytes] + offset += ret + else: + raise IOError(f'write return code = {ret}') + num_bytes -= ret + if ctypes.get_errno() != 0: + raise IOError(f'errno = {os.strerror(ctypes.get_errno())}') + logger.log_debug(f'write EEPROM sfp={self.sdk_index}, page={page}, page_offset={page_offset}, '\ + f'size={ret}, left={num_bytes}, data={written_buffer}') + except (OSError, IOError) as e: + data = ''.join('{:02x}'.format(x) for x in write_buffer) + logger.log_error(f'Failed to write EEPROM data sfp={self.sdk_index} EEPROM page={page}, page_offset={page_offset}, size={num_bytes}, '\ + f'offset={offset}, data = {data}, error = {e}') + return False + return True + finally: + logger.log_notice(f'{_LOCK_TAG} released op=write sfp={self.sdk_index} offset={offset} size={num_bytes} thread={_thread_label()}') def get_lpmode(self): """ @@ -747,27 +786,34 @@ def _get_sfp_type_str(self, eeprom_path): """ if self._sfp_type_str is None: page = os.path.join(eeprom_path, SFP_PAGE0_PATH) - try: - with open(page, mode='rb', buffering=0) as f: - id_byte_raw = bytearray(f.read(1)) - id = id_byte_raw[0] - if id == 0x18 or id == 0x19 or id == 0x1e: - self._sfp_type_str = SFP_TYPE_CMIS - elif id == 0x11 or id == 0x0D: - # in sonic-platform-common, 0x0D is treated as sff8436, - # but it shared the same implementation on Nvidia platforms, - # so, we treat it as sff8636 here. - self._sfp_type_str = SFP_TYPE_SFF8636 - elif id == 0x03: - self._sfp_type_str = SFP_TYPE_SFF8472 - else: - logger.log_error(f'Unsupported sfp type {id}') - except (OSError, IOError) as e: - # SFP_EEPROM_NOT_AVAILABLE usually indicates SFP is not present, no need - # print such error information to log - if SFP_EEPROM_NOT_AVAILABLE not in str(e): - logger.log_error(f'Failed to get SFP type, index={self.sdk_index}, error={e}') - return None + # Protect this direct byte read so it doesn't interleave with other EEPROM I/O + logger.log_notice(f'{_LOCK_TAG} wait acquire op=peek_id sfp={self.sdk_index} offset=0 size=1 thread={_thread_label()}') + with _GLOBAL_EEPROM_LOCK: + logger.log_notice(f'{_LOCK_TAG} acquired op=peek_id sfp={self.sdk_index} offset=0 size=1 thread={_thread_label()}') + try: + try: + with open(page, mode='rb', buffering=0) as f: + id_byte_raw = bytearray(f.read(1)) + id = id_byte_raw[0] + if id == 0x18 or id == 0x19 or id == 0x1e: + self._sfp_type_str = SFP_TYPE_CMIS + elif id == 0x11 or id == 0x0D: + # in sonic-platform-common, 0x0D is treated as sff8436, + # but it shared the same implementation on Nvidia platforms, + # so, we treat it as sff8636 here. + self._sfp_type_str = SFP_TYPE_SFF8636 + elif id == 0x03: + self._sfp_type_str = SFP_TYPE_SFF8472 + else: + logger.log_error(f'Unsupported sfp type {id}') + except (OSError, IOError) as e: + # SFP_EEPROM_NOT_AVAILABLE usually indicates SFP is not present, no need + # print such error information to log + if SFP_EEPROM_NOT_AVAILABLE not in str(e): + logger.log_error(f'Failed to get SFP type, index={self.sdk_index}, error={e}') + return None + finally: + logger.log_notice(f'{_LOCK_TAG} released op=peek_id sfp={self.sdk_index} offset=0 size=1 thread={_thread_label()}') return self._sfp_type_str def _is_write_protected(self, page, page_offset, num_bytes): @@ -1811,3 +1857,4 @@ def get_module_status(self): """ status = super().get_module_status() return SFP_STATUS_REMOVED if status == SFP_STATUS_UNKNOWN else status +