diff --git a/lib/constants.py b/lib/constants.py index 79b3802..fc3ac23 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -319,3 +319,6 @@ # Cap the number of workers at half the available CPU cores to avoid excessive # overhead and resource contention. MAX_NUM_CHUNKS = (os.cpu_count() or 1) // 2 + +# High score to indicate definite unsafety due to a detected zip slip. +HIGH_SEVERITY_ZIPSLIP = 1337 diff --git a/saferpickle.py b/saferpickle.py index 8a2bde2..42d2086 100644 --- a/saferpickle.py +++ b/saferpickle.py @@ -629,41 +629,27 @@ def strict_security_scan(pickle_bytes: bytes) -> bool: def is_unsafe( - number_of_safe_results: int, - number_of_unsafe_results: int, - number_of_suspicious_results: int, + safe_score: int, + unsafe_score: int, + suspicious_score: int, ) -> bool: """Conditional check for safeness. Args: - number_of_safe_results: Number of safe results from the security scan. - number_of_unsafe_results: Number of unsafe results from the security scan. - number_of_suspicious_results: Number of suspicious results from the security - scan. + safe_score: Safe score from the security scan. + unsafe_score: Unsafe score from the security scan. + suspicious_score: Suspicious score from the security scan. Returns: True if the pickle file is dangerous, False otherwise. """ - if number_of_unsafe_results == 0 and number_of_suspicious_results == 0: + if unsafe_score == 0 and suspicious_score == 0: return False - # We halve the weight of suspicious results to lower false positives - # caused by greedy matches of unknown method-like strings (Ex. "google.com") - if ( - number_of_suspicious_results + number_of_unsafe_results - >= number_of_safe_results - ): - return True - - sum_of_unsafe_and_suspicious_results = ( - number_of_unsafe_results + 0.5 * number_of_suspicious_results - ) - - unsafe = (sum_of_unsafe_and_suspicious_results > number_of_safe_results) or ( - number_of_safe_results == 0 and sum_of_unsafe_and_suspicious_results >= 1 - ) - - return unsafe + # 0.5 was chosen as the weight for suspicious results + # to guard against unknown results filtering into suspicious results. + sum_of_unsafe_and_suspicious_scores = unsafe_score + 0.5 * suspicious_score + return sum_of_unsafe_and_suspicious_scores >= safe_score def picklemagic_scan( @@ -744,8 +730,8 @@ def score_results( number_of_unknown_results = len(unknown_results) safe_score = math.log(number_of_safe_results + 1) * 2 - unsafe_score = math.log(number_of_unsafe_results + 1) * 4 - suspicious_score = math.log(number_of_suspicious_results + 1) * 3 + unsafe_score = math.log(number_of_unsafe_results + 1) * 10 + suspicious_score = math.log(number_of_suspicious_results + 1) * 5 unknown_score = math.log(number_of_unknown_results + 1) * 1 return ( @@ -788,10 +774,10 @@ def apply_approach( logging.info(" Unknown results: %s\n", results.unknown_results) ( - number_of_safe_results, - number_of_unsafe_results, - number_of_suspicious_results, - number_of_unknown_results, + safe_score, + unsafe_score, + suspicious_score, + unknown_score, ) = score_results( results.safe_results, results.unsafe_results, @@ -799,14 +785,14 @@ def apply_approach( results.unknown_results, ) scores = { - "unsafe": number_of_unsafe_results, - "suspicious": number_of_suspicious_results, - "unknown": number_of_unknown_results, + "unsafe": unsafe_score, + "suspicious": suspicious_score, + "unknown": unknown_score, } if results.is_denylisted or is_unsafe( - number_of_safe_results, - number_of_unsafe_results, - number_of_suspicious_results, + safe_score, + unsafe_score, + suspicious_score, ): return scores @@ -872,7 +858,7 @@ def _extract_and_scan_archive( # Zip slip detection logging.warning("Zip slip detected: %s", name) return { - "unsafe": 1337, + "unsafe": constants.HIGH_SEVERITY_ZIPSLIP, "suspicious": 0, "unknown": 0, } # Return early @@ -901,13 +887,21 @@ def _extract_and_scan_archive( for name, content in utils.extract_tar_contents(data): if ".." in name or name.startswith("/"): logging.warning("Tar slip detected: %s", name) - return {"unsafe": 1337, "suspicious": 0, "unknown": 0} # Return early + return { + "unsafe": constants.HIGH_SEVERITY_ZIPSLIP, + "suspicious": 0, + "unknown": 0, + } # Return early scores = security_scan(content, recursion_depth=recursion_depth + 1) _merge_scores(all_scores, scores) else: logging.warning("Unsupported archive type: %s", archive_type) - return {"unsafe": 0, "suspicious": 0, "unknown": 1337} + return { + "unsafe": 0, + "suspicious": 0, + "unknown": constants.HIGH_SEVERITY_ZIPSLIP, + } except MaxRecursionDepthExceededError: raise