From badb5299208f8a81d552549e0beb00501df685e5 Mon Sep 17 00:00:00 2001 From: Raghav Verma Date: Fri, 13 Mar 2026 13:55:55 -0700 Subject: [PATCH] No public description PiperOrigin-RevId: 883325139 --- lib/constants.py | 1 + saferpickle.py | 160 +++++++++++++++++++++-------------------------- 2 files changed, 72 insertions(+), 89 deletions(-) diff --git a/lib/constants.py b/lib/constants.py index fc3ac23..114a4cf 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -115,6 +115,7 @@ "load_module", "loads", "lzma.open", + "malicious", "marshal", "msvcrt", "open", diff --git a/saferpickle.py b/saferpickle.py index 05ff012..f6f321d 100644 --- a/saferpickle.py +++ b/saferpickle.py @@ -144,7 +144,7 @@ def _custom_chunked_genops( pickle_file: BinaryIO, chunk_range: Tuple[int, int], ) -> Iterator[tuple[pickletools.OpcodeInfo, Any | None]]: - """Generates string-declaring opcodes and arguments from a chunk of pickle data. + """Generates string-declaring opcodes and arguments from a chunk. This function reads a specific byte range (chunk) of the pickle bytecode and yields opcodes that are known to declare strings, along with their @@ -232,7 +232,7 @@ def _process_chunk_for_generate_ops( chunk_range: Tuple[int, int], is_shared_memory: bool = False, ) -> Set[str]: - """Helper function for generate_ops to process a chunk of pickle data.""" + """Helper function to process a chunk of pickle data.""" chunked_operands = set() try: if is_shared_memory: @@ -266,7 +266,7 @@ def generate_ops_from_file( shm_name: Optional[str] = None, pickle_length: Optional[int] = None, ) -> Set[str]: - """Returns opcodes that declare strings from a pickle file path or shared memory. + """Returns opcodes that declare strings from a path or shared memory. Args: pickle_file_path: The path to the pickle file. @@ -344,7 +344,7 @@ def generate_ops_from_file( def generate_ops(pickle_bytes: bytes) -> Set[str]: - """Returns opcodes that declare strings from a pickle file. + """Returns string-declaring opcodes. Args: pickle_bytes: The pickle bytecode to yield opcode information for. @@ -393,6 +393,17 @@ def get_class_instantiations(pickle_bytes: bytes) -> tuple[io.StringIO, bool]: unsafe_modules=constants.UNSAFE_STRINGS, ) factory.default.unpickler = unpickler + + # Monkey-patch load_build so that we don't miss + # BUILD instructions due to differing Pickle implementations. + original_load_build = unpickler.load_build + + def fixed_load_build(*unused_args): + return original_load_build() + + unpickler.load_build = fixed_load_build + unpickler.dispatch[pickle.BUILD[0]] = unpickler.load_build + unpickler.load() # These errors are expected and should not be raised. @@ -410,9 +421,11 @@ def get_class_instantiations(pickle_bytes: bytes) -> tuple[io.StringIO, bool]: ): pass - is_build_instr_blocked = getattr( - unpickler, "has_blocked_unsafe_build_instr", False - ) + is_build_instr_blocked = False + if unpickler: + is_build_instr_blocked = getattr( + unpickler, "has_blocked_unsafe_build_instr", False + ) return picklemagic_output, is_build_instr_blocked @@ -420,7 +433,7 @@ def categorize_strings( filtered_output: Set[str] | io.StringIO, use_picklemagic: bool = False, ) -> ScanResults: - """Counts the relevant strings from the filtered output and categorizes them. + """Counts strings from filtered output and categorizes them. Args: filtered_output: The series of statements filtered by string declarations. @@ -591,7 +604,7 @@ def categorize_strings( def strict_security_scan(pickle_bytes: bytes) -> bool: - """Strict security scan to detect malicious content in pickle files. + """Strict security scan for malicious content in pickle files. Args: pickle_bytes: Pickle bytecode to scan. @@ -669,7 +682,7 @@ def is_unsafe( def picklemagic_scan( pickle_bytes: bytes, ) -> ScanResults: - """Picklemagic scan to detect malicious content in pickle files. + """Picklemagic scan for malicious content in pickle files. Args: pickle_bytes: Pickle bytecode to scan. @@ -696,7 +709,7 @@ def genops_scan( pickle_file_path: Optional[str] = None, shm_name: Optional[str] = None, ) -> ScanResults: - """Genops scan to detect malicious content in pickle files. + """Genops scan for malicious content in pickle files. Args: pickle_bytes: Pickle bytecode to scan. @@ -1075,15 +1088,6 @@ def _scan_and_load( if not loader_mod: loader_mod = pickle_copy - if strict_check and allow_unsafe: - error_string_illegal_combination = ( - "Strict scanning and allow_unsafe cannot be used together." - ) - if report_only: - logging.error(error_string_illegal_combination) - return - raise IllegalArgumentCombinationError(error_string_illegal_combination) - if is_load: load_func = loader_mod.load load_args = (pickle_file,) @@ -1091,82 +1095,60 @@ def _scan_and_load( load_func = loader_mod.loads load_args = (data_bytes,) - if allow_unsafe: + if strict_check and allow_unsafe: + error_string_illegal_combination = ( + "Strict scanning and allow_unsafe cannot be used together." + ) + if report_only: + logging.error(error_string_illegal_combination) + else: + raise IllegalArgumentCombinationError(error_string_illegal_combination) + elif allow_unsafe: if report_only: logging.info("Loading pickle file with allow_unsafe set to True.") - try: - return load_func(*load_args, *args, **kwargs) - except AttributeError as exc: - logging.info("Could not load an absent class: %s", exc) - return - - if strict_check: + elif strict_check: if strict_security_scan(data_bytes): error_string_strict_check = "Pickle file failed strict security check." if report_only: logging.error(error_string_strict_check) - return - raise StrictCheckError(error_string_strict_check) - try: - return load_func(*load_args, *args, **kwargs) - except (AttributeError, pickle.UnpicklingError) as exc: - if "persistent load" in str(exc): - logging.info("Persistent load error: %s", exc) - return - elif "Can't get attribute" in str(exc): - logging.exception( - "Could not load an absent class: %s", exc, exc_info=True - ) - raise UnsafePickleDetectedError( - constants.ERROR_STRING.substitute( - classification=Classification.SUSPICIOUS.value - ) - ) from exc - elif "underflow" in str(exc): - raise - logging.exception("Unknown error: %s", exc, exc_info=True) - return - - # If we get here, we are not in strict check or allow_unsafe mode. - # We perform non-strict scanning as usual with force_scan if needed. - scan_scores = security_scan(data_bytes, force_scan=force_scan) - number_of_unsafe_results = scan_scores["unsafe"] - number_of_suspicious_results = scan_scores["suspicious"] - number_of_unknown_results = scan_scores["unknown"] - - if number_of_suspicious_results == 0 and number_of_unsafe_results == 0: - if report_only: - logging.info("Loading safe pickle file") - if number_of_unknown_results > 0: - logging.warning( - "SaferPickle: File contains %d unknown items that were ignored.", - number_of_unknown_results, - ) - try: - return load_func(*load_args, *args, **kwargs) - except (AttributeError, pickle.UnpicklingError) as exc: - if "persistent load" in str(exc): - logging.info("Persistent load error: %s", exc) - return - elif "Can't get attribute" in str(exc): - logging.exception( - "Could not load an absent class: %s", exc, exc_info=True - ) - raise UnsafePickleDetectedError( - constants.ERROR_STRING.substitute( - classification=Classification.SUSPICIOUS.value - ) - ) from exc - elif "underflow" in str(exc): - raise - logging.exception("Unknown error: %s", exc, exc_info=True) - return - - elif number_of_unsafe_results > number_of_suspicious_results: - _report_or_raise(Classification.UNSAFE, report_only, log_info) - return + else: + raise StrictCheckError(error_string_strict_check) else: - _report_or_raise(Classification.SUSPICIOUS, report_only, log_info) + # Default scanning routines + scan_scores = security_scan(data_bytes, force_scan=force_scan) + number_of_unsafe_results = scan_scores["unsafe"] + number_of_suspicious_results = scan_scores["suspicious"] + number_of_unknown_results = scan_scores["unknown"] + + if number_of_suspicious_results == 0 and number_of_unsafe_results == 0: + if report_only: + logging.info("Loading safe pickle file") + if number_of_unknown_results > 0: + logging.warning( + "SaferPickle: File contains %d unknown items that were ignored.", + number_of_unknown_results, + ) + elif number_of_unsafe_results > number_of_suspicious_results: + _report_or_raise(Classification.UNSAFE, report_only, log_info) + else: + _report_or_raise(Classification.SUSPICIOUS, report_only, log_info) + + # Load the pickle if report_only is True and no exceptions were raised earlier + try: + return load_func(*load_args, *args, **kwargs) + except (AttributeError, pickle.UnpicklingError, ModuleNotFoundError) as exc: + if "persistent load" in str(exc): + logging.info("Persistent load error: %s", exc) + elif "Can't get attribute" in str(exc): + logging.exception( + "Could not load an absent class: %s", exc, exc_info=True + ) + elif "underflow" in str(exc): + logging.exception("Unpickling underflow error: %s", exc, exc_info=True) + elif "No module named" in str(exc): + logging.exception("Module was not found: %s", exc, exc_info=True) + else: + logging.exception("Unknown error during load: %s", exc, exc_info=True) return