Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,6 @@
# Cap the number of workers at half the available CPU cores to avoid excessive
# overhead and resource contention.
MAX_NUM_CHUNKS = (os.cpu_count() or 1) // 2

# High sentinel score marking a file as definitely unsafe when an archive
# path-traversal (zip/tar slip) is detected.
HIGH_SEVERITY_ZIPSLIP = 1337
74 changes: 34 additions & 40 deletions saferpickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,41 +629,27 @@ def strict_security_scan(pickle_bytes: bytes) -> bool:


def is_unsafe(
number_of_safe_results: int,
number_of_unsafe_results: int,
number_of_suspicious_results: int,
safe_score: int,
unsafe_score: int,
suspicious_score: int,
) -> bool:
"""Conditional check for safeness.

Args:
number_of_safe_results: Number of safe results from the security scan.
number_of_unsafe_results: Number of unsafe results from the security scan.
number_of_suspicious_results: Number of suspicious results from the security
scan.
safe_score: Safe score from the security scan.
unsafe_score: Unsafe score from the security scan.
suspicious_score: Suspicious score from the security scan.

Returns:
True if the pickle file is dangerous, False otherwise.
"""
if number_of_unsafe_results == 0 and number_of_suspicious_results == 0:
if unsafe_score == 0 and suspicious_score == 0:
return False

# We halve the weight of suspicious results to lower false positives
# caused by greedy matches of unknown method-like strings (e.g., "google.com")
if (
number_of_suspicious_results + number_of_unsafe_results
>= number_of_safe_results
):
return True

sum_of_unsafe_and_suspicious_results = (
number_of_unsafe_results + 0.5 * number_of_suspicious_results
)

unsafe = (sum_of_unsafe_and_suspicious_results > number_of_safe_results) or (
number_of_safe_results == 0 and sum_of_unsafe_and_suspicious_results >= 1
)

return unsafe
# Suspicious results are weighted at 0.5 to reduce false positives,
# since unknown or ambiguous matches can be misclassified as suspicious.
sum_of_unsafe_and_suspicious_scores = unsafe_score + 0.5 * suspicious_score
return sum_of_unsafe_and_suspicious_scores >= safe_score


def picklemagic_scan(
Expand Down Expand Up @@ -744,8 +730,8 @@ def score_results(
number_of_unknown_results = len(unknown_results)

safe_score = math.log(number_of_safe_results + 1) * 2
unsafe_score = math.log(number_of_unsafe_results + 1) * 4
suspicious_score = math.log(number_of_suspicious_results + 1) * 3
unsafe_score = math.log(number_of_unsafe_results + 1) * 10
suspicious_score = math.log(number_of_suspicious_results + 1) * 5
unknown_score = math.log(number_of_unknown_results + 1) * 1

return (
Expand Down Expand Up @@ -788,25 +774,25 @@ def apply_approach(
logging.info(" Unknown results: %s\n", results.unknown_results)

(
number_of_safe_results,
number_of_unsafe_results,
number_of_suspicious_results,
number_of_unknown_results,
safe_score,
unsafe_score,
suspicious_score,
unknown_score,
) = score_results(
results.safe_results,
results.unsafe_results,
results.suspicious_results,
results.unknown_results,
)
scores = {
"unsafe": number_of_unsafe_results,
"suspicious": number_of_suspicious_results,
"unknown": number_of_unknown_results,
"unsafe": unsafe_score,
"suspicious": suspicious_score,
"unknown": unknown_score,
}
if results.is_denylisted or is_unsafe(
number_of_safe_results,
number_of_unsafe_results,
number_of_suspicious_results,
safe_score,
unsafe_score,
suspicious_score,
):
return scores

Expand Down Expand Up @@ -872,7 +858,7 @@ def _extract_and_scan_archive(
# Zip slip detection
logging.warning("Zip slip detected: %s", name)
return {
"unsafe": 1337,
"unsafe": constants.HIGH_SEVERITY_ZIPSLIP,
"suspicious": 0,
"unknown": 0,
} # Return early
Expand Down Expand Up @@ -901,13 +887,21 @@ def _extract_and_scan_archive(
for name, content in utils.extract_tar_contents(data):
if ".." in name or name.startswith("/"):
logging.warning("Tar slip detected: %s", name)
return {"unsafe": 1337, "suspicious": 0, "unknown": 0} # Return early
return {
"unsafe": constants.HIGH_SEVERITY_ZIPSLIP,
"suspicious": 0,
"unknown": 0,
} # Return early
scores = security_scan(content, recursion_depth=recursion_depth + 1)
_merge_scores(all_scores, scores)

else:
logging.warning("Unsupported archive type: %s", archive_type)
return {"unsafe": 0, "suspicious": 0, "unknown": 1337}
return {
"unsafe": 0,
"suspicious": 0,
"unknown": constants.HIGH_SEVERITY_ZIPSLIP,
}

except MaxRecursionDepthExceededError:
raise
Expand Down