From 553b7cf4ab1569c961b77fc4393c54d106e2281c Mon Sep 17 00:00:00 2001 From: Raghav Verma Date: Fri, 27 Feb 2026 19:38:59 +0000 Subject: [PATCH] No public description FUTURE_COPYBARA_INTEGRATE_REVIEW=https://github.com/google/saferpickle/pull/8 from PrathameshWalunj:add-unsafe-modules 165be490ce17c9c0b057b0d400e27703dd352cb5 PiperOrigin-RevId: 876350504 --- cli.py | 8 +- lib/constants.py | 11 ++- lib/utils.py | 2 +- saferpickle.py | 186 ++++++++++++++++++++++------------------------- 4 files changed, 100 insertions(+), 107 deletions(-) diff --git a/cli.py b/cli.py index 45dad55..f486e40 100644 --- a/cli.py +++ b/cli.py @@ -64,7 +64,7 @@ def security_scan_with_justifications( # Call the individual scan functions from SaferPickle to sets of results. # Picklemagic Scan - picklemagic_results = safer_pickle.picklemagic_scan(pickle_bytes) + picklemagic_results = saferpickle.picklemagic_scan(pickle_bytes) safe_results.update(picklemagic_results.safe_results) unsafe_results.update(picklemagic_results.unsafe_results) @@ -72,7 +72,7 @@ def security_scan_with_justifications( unknown_results.update(picklemagic_results.unknown_results) # Genops Scan - genops_results = safer_pickle.genops_scan( + genops_results = saferpickle.genops_scan( pickle_bytes, pickle_file_path=file_path ) safe_results.update(genops_results.safe_results) @@ -97,7 +97,7 @@ def security_scan_with_justifications( num_unsafe, num_suspicious, _, # The unknown_score is not used for classification, only reporting - ) = safer_pickle.score_results( + ) = saferpickle.score_results( final_safe_results, final_unsafe_results, final_suspicious_results, @@ -105,7 +105,7 @@ def security_scan_with_justifications( ) # Check for safety and return the results with justifications. - if safer_pickle.is_unsafe(num_safe, num_unsafe, num_suspicious): + if saferpickle.is_unsafe(num_safe, num_unsafe, num_suspicious): if num_unsafe > num_suspicious: classification = "unsafe" all_results = [] diff --git a/lib/constants.py b/lib/constants.py index c43f8db..fc3ac23 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -85,6 +85,8 @@ "cProfile", "cloudpickle.load", "cloudpickle.loads", + "code.interact", + "code.InteractiveConsole", "code.InteractiveInterpreter", "codecs.decode", "codeop.compile_command", @@ -101,6 +103,7 @@ "eval", "exec", "execfile", + "fileinput", "get_type_hints", "gzip", "hashlib", @@ -134,7 +137,9 @@ "read", "requests", "runpy", - "safer_pickle_hook", + "safer_pickle", + "saferpickle", + "shutil", "socket", "ssl", "stdin", @@ -184,6 +189,7 @@ "reconstruct", "scipy", "set", + "shutil.disk_usage", "sklearn", "spacy", "str", @@ -313,3 +319,6 @@ # Cap the number of workers at half the available CPU cores to avoid excessive # overhead and resource contention. MAX_NUM_CHUNKS = (os.cpu_count() or 1) // 2 + +# High score to indicate definite unsafety due to a detected zip slip. +HIGH_SEVERITY_ZIPSLIP = 1337 diff --git a/lib/utils.py b/lib/utils.py index c8ff86f..02343b1 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Utility functions for safer_pickle.""" +"""Utility functions for saferpickle.""" import ast import bz2 diff --git a/saferpickle.py b/saferpickle.py index 1c9f5eb..42d2086 100644 --- a/saferpickle.py +++ b/saferpickle.py @@ -247,10 +247,15 @@ def _process_chunk_for_generate_ops( chunked_operands.add(str(operand)) else: with open(pickle_data_source, "rb") as f: - for _, operand in _custom_chunked_genops(f, chunk_range): - if operand is None: - continue - chunked_operands.add(str(operand)) + f.seek(chunk_range[0]) + chunk_data = f.read(chunk_range[1] - chunk_range[0]) + with io.BytesIO(chunk_data) as memory_f: + for _, operand in _custom_chunked_genops( + memory_f, (0, len(chunk_data)) + ): + if operand is None: + continue + chunked_operands.add(str(operand)) except StopIteration: pass return chunked_operands @@ -565,10 +570,8 @@ def categorize_strings( elif classification == Classification.SUSPICIOUS: new_suspicious_results.add(result) elif classification == Classification.UNKNOWN: - new_unknown_results.add(result) - else: # Fallback: Check against original categories if - # classify_class_name returns None. + # classify_class_name returns UNKNOWN. if result in unsafe_results: new_unsafe_results.add(result) elif result in suspicious_results: @@ -626,41 +629,27 @@ def strict_security_scan(pickle_bytes: bytes) -> bool: def is_unsafe( - number_of_safe_results: int, - number_of_unsafe_results: int, - number_of_suspicious_results: int, + safe_score: int, + unsafe_score: int, + suspicious_score: int, ) -> bool: """Conditional check for safeness. Args: - number_of_safe_results: Number of safe results from the security scan. - number_of_unsafe_results: Number of unsafe results from the security scan. - number_of_suspicious_results: Number of suspicious results from the security - scan. + safe_score: Safe score from the security scan. + unsafe_score: Unsafe score from the security scan. + suspicious_score: Suspicious score from the security scan. Returns: True if the pickle file is dangerous, False otherwise. """ - if number_of_unsafe_results == 0 and number_of_suspicious_results == 0: + if unsafe_score == 0 and suspicious_score == 0: return False - # We halve the weight of suspicious results to lower false positives - # caused by greedy matches of unknown method-like strings (Ex. "google.com") - if ( - number_of_suspicious_results + number_of_unsafe_results - >= number_of_safe_results - ): - return True - - sum_of_unsafe_and_suspicious_results = ( - number_of_unsafe_results + 0.5 * number_of_suspicious_results - ) - - unsafe = (sum_of_unsafe_and_suspicious_results > number_of_safe_results) or ( - number_of_safe_results == 0 and sum_of_unsafe_and_suspicious_results >= 1 - ) - - return unsafe + # 0.5 was chosen as the weight for suspicious results + # to guard against unknown results filtering into suspicious results. + sum_of_unsafe_and_suspicious_scores = unsafe_score + 0.5 * suspicious_score + return sum_of_unsafe_and_suspicious_scores >= safe_score def picklemagic_scan( @@ -741,8 +730,8 @@ def score_results( number_of_unknown_results = len(unknown_results) safe_score = math.log(number_of_safe_results + 1) * 2 - unsafe_score = math.log(number_of_unsafe_results + 1) * 4 - suspicious_score = math.log(number_of_suspicious_results + 1) * 3 + unsafe_score = math.log(number_of_unsafe_results + 1) * 10 + suspicious_score = math.log(number_of_suspicious_results + 1) * 5 unknown_score = math.log(number_of_unknown_results + 1) * 1 return ( @@ -785,10 +774,10 @@ def apply_approach( logging.info(" Unknown results: %s\n", results.unknown_results) ( - number_of_safe_results, - number_of_unsafe_results, - number_of_suspicious_results, - number_of_unknown_results, + safe_score, + unsafe_score, + suspicious_score, + unknown_score, ) = score_results( results.safe_results, results.unsafe_results, @@ -796,14 +785,14 @@ def apply_approach( results.unknown_results, ) scores = { - "unsafe": number_of_unsafe_results, - "suspicious": number_of_suspicious_results, - "unknown": number_of_unknown_results, + "unsafe": unsafe_score, + "suspicious": suspicious_score, + "unknown": unknown_score, } if results.is_denylisted or is_unsafe( - number_of_safe_results, - number_of_unsafe_results, - number_of_suspicious_results, + safe_score, + unsafe_score, + suspicious_score, ): return scores @@ -867,9 +856,12 @@ def _extract_and_scan_archive( for name in zf.namelist(): if ".." in name or name.startswith("/"): # Zip slip detection - all_scores["unsafe"] += 1337 # High score for zip slip logging.warning("Zip slip detected: %s", name) - continue + return { + "unsafe": constants.HIGH_SEVERITY_ZIPSLIP, + "suspicious": 0, + "unknown": 0, + } # Return early with zf.open(name) as f: content = f.read() @@ -894,15 +886,22 @@ def _extract_and_scan_archive( elif archive_type == "tar": for name, content in utils.extract_tar_contents(data): if ".." in name or name.startswith("/"): - all_scores["unsafe"] += 1337 logging.warning("Tar slip detected: %s", name) - continue + return { + "unsafe": constants.HIGH_SEVERITY_ZIPSLIP, + "suspicious": 0, + "unknown": 0, + } # Return early scores = security_scan(content, recursion_depth=recursion_depth + 1) _merge_scores(all_scores, scores) else: logging.warning("Unsupported archive type: %s", archive_type) - return {"unsafe": 0, "suspicious": 0, "unknown": 1337} + return { + "unsafe": 0, + "suspicious": 0, + "unknown": constants.HIGH_SEVERITY_ZIPSLIP, + } except MaxRecursionDepthExceededError: raise @@ -996,7 +995,8 @@ def _security_scan_internal( os.remove(pickle_file_path) -_thread_local_storage_for_hooking = threading.local() +_ORIG_METHODS_BEFORE_HOOKING = {} +_HOOKING_LOCK = threading.Lock() def _report_or_raise( @@ -1047,9 +1047,9 @@ def _scan_and_load( raise TypeError("pickle_file_or_bytes must be IOBase when is_load=True") pickle_file = pickle_file_or_bytes data_bytes = pickle_file.read() - if hasattr(pickle_file, "seek"): + try: pickle_file.seek(0) - else: + except (OSError, AttributeError, io.UnsupportedOperation): pickle_file = io.BytesIO(data_bytes) else: if not isinstance(pickle_file_or_bytes, bytes): @@ -1163,10 +1163,6 @@ def hook_pickle( ) -> None: """This implements the hooking of pickle-like libraries.""" config.set_config_path(config_path) - if not hasattr( - _thread_local_storage_for_hooking, "orig_methods_before_hooking" - ): - _thread_local_storage_for_hooking.orig_methods_before_hooking = {} def custom_loads( pickle_bytes: bytes, @@ -1295,32 +1291,27 @@ def custom_load( logging.debug("Failed to import %s", hookable_mod) continue - if ( - hookable_mod - not in _thread_local_storage_for_hooking.orig_methods_before_hooking - ): - _thread_local_storage_for_hooking.orig_methods_before_hooking[ - hookable_mod - ] = {} - - methods_to_patch = { - "load": functools.partial(custom_load, hooked_mod_name=hookable_mod), - "_load": functools.partial(custom_load, hooked_mod_name=hookable_mod), - "loads": functools.partial(custom_loads, hooked_mod_name=hookable_mod), - "_loads": functools.partial(custom_loads, hooked_mod_name=hookable_mod), - } - for method_name, custom_func in methods_to_patch.items(): - if hasattr(module, method_name): - if ( - method_name - not in _thread_local_storage_for_hooking.orig_methods_before_hooking[ - hookable_mod - ] - ): - _thread_local_storage_for_hooking.orig_methods_before_hooking[ - hookable_mod - ][method_name] = getattr(module, method_name) - setattr(module, method_name, custom_func) + with _HOOKING_LOCK: + if hookable_mod not in _ORIG_METHODS_BEFORE_HOOKING: + _ORIG_METHODS_BEFORE_HOOKING[hookable_mod] = {} + + methods_to_patch = { + "load": functools.partial(custom_load, hooked_mod_name=hookable_mod), + "_load": functools.partial(custom_load, hooked_mod_name=hookable_mod), + "loads": functools.partial( + custom_loads, hooked_mod_name=hookable_mod + ), + "_loads": functools.partial( + custom_loads, hooked_mod_name=hookable_mod + ), + } + for method_name, custom_func in methods_to_patch.items(): + if hasattr(module, method_name): + if method_name not in _ORIG_METHODS_BEFORE_HOOKING[hookable_mod]: + _ORIG_METHODS_BEFORE_HOOKING[hookable_mod][method_name] = getattr( + module, method_name + ) + setattr(module, method_name, custom_func) @contextlib.contextmanager @@ -1348,25 +1339,18 @@ def hook_pickle_libs( def unhook_pickle() -> None: """Unhooks the pickle-like libraries.""" - if not hasattr( - _thread_local_storage_for_hooking, "orig_methods_before_hooking" - ): - return - for ( - module_name, - methods, - ) in _thread_local_storage_for_hooking.orig_methods_before_hooking.items(): - try: - module = importlib.import_module(module_name) - for method_name, original_method in methods.items(): - if hasattr(module, method_name): - setattr(module, method_name, original_method) - except (ImportError, ModuleNotFoundError): - logging.debug("Failed to import %s for unhooking", module_name) - continue - # Empty stored methods to avoid re-unhooking on a second unhook call - # items() would be empty after clearing the dictionary - _thread_local_storage_for_hooking.orig_methods_before_hooking.clear() + with _HOOKING_LOCK: + for module_name, methods in _ORIG_METHODS_BEFORE_HOOKING.items(): + try: + module = importlib.import_module(module_name) + for method_name, original_method in methods.items(): + if hasattr(module, method_name): + setattr(module, method_name, original_method) + except (ImportError, ModuleNotFoundError): + logging.debug("Failed to import %s for unhooking", module_name) + continue + # Empty stored methods to avoid re-unhooking on a second unhook call + _ORIG_METHODS_BEFORE_HOOKING.clear() # To avoid creating __pycache__ files