Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
"load_module",
"loads",
"lzma.open",
"malicious",
"marshal",
"msvcrt",
"open",
Expand Down
160 changes: 71 additions & 89 deletions saferpickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def _custom_chunked_genops(
pickle_file: BinaryIO,
chunk_range: Tuple[int, int],
) -> Iterator[tuple[pickletools.OpcodeInfo, Any | None]]:
"""Generates string-declaring opcodes and arguments from a chunk of pickle data.
"""Generates string-declaring opcodes and arguments from a chunk.

This function reads a specific byte range (chunk) of the pickle bytecode
and yields opcodes that are known to declare strings, along with their
Expand Down Expand Up @@ -232,7 +232,7 @@ def _process_chunk_for_generate_ops(
chunk_range: Tuple[int, int],
is_shared_memory: bool = False,
) -> Set[str]:
"""Helper function for generate_ops to process a chunk of pickle data."""
"""Helper function to process a chunk of pickle data."""
chunked_operands = set()
try:
if is_shared_memory:
Expand Down Expand Up @@ -266,7 +266,7 @@ def generate_ops_from_file(
shm_name: Optional[str] = None,
pickle_length: Optional[int] = None,
) -> Set[str]:
"""Returns opcodes that declare strings from a pickle file path or shared memory.
"""Returns opcodes that declare strings from a path or shared memory.

Args:
pickle_file_path: The path to the pickle file.
Expand Down Expand Up @@ -344,7 +344,7 @@ def generate_ops_from_file(


def generate_ops(pickle_bytes: bytes) -> Set[str]:
"""Returns opcodes that declare strings from a pickle file.
"""Returns string-declaring opcodes.

Args:
pickle_bytes: The pickle bytecode to yield opcode information for.
Expand Down Expand Up @@ -393,6 +393,17 @@ def get_class_instantiations(pickle_bytes: bytes) -> tuple[io.StringIO, bool]:
unsafe_modules=constants.UNSAFE_STRINGS,
)
factory.default.unpickler = unpickler

# Monkey-patch load_build so that we don't miss
# BUILD instructions due to differing Pickle implementations.
original_load_build = unpickler.load_build

def fixed_load_build(*unused_args):
return original_load_build()

unpickler.load_build = fixed_load_build
unpickler.dispatch[pickle.BUILD[0]] = unpickler.load_build

unpickler.load()

# These errors are expected and should not be raised.
Expand All @@ -410,17 +421,19 @@ def get_class_instantiations(pickle_bytes: bytes) -> tuple[io.StringIO, bool]:
):
pass

is_build_instr_blocked = getattr(
unpickler, "has_blocked_unsafe_build_instr", False
)
is_build_instr_blocked = False
if unpickler:
is_build_instr_blocked = getattr(
unpickler, "has_blocked_unsafe_build_instr", False
)
return picklemagic_output, is_build_instr_blocked


def categorize_strings(
filtered_output: Set[str] | io.StringIO,
use_picklemagic: bool = False,
) -> ScanResults:
"""Counts the relevant strings from the filtered output and categorizes them.
"""Counts strings from filtered output and categorizes them.

Args:
filtered_output: The series of statements filtered by string declarations.
Expand Down Expand Up @@ -591,7 +604,7 @@ def categorize_strings(


def strict_security_scan(pickle_bytes: bytes) -> bool:
"""Strict security scan to detect malicious content in pickle files.
"""Strict security scan for malicious content in pickle files.

Args:
pickle_bytes: Pickle bytecode to scan.
Expand Down Expand Up @@ -669,7 +682,7 @@ def is_unsafe(
def picklemagic_scan(
pickle_bytes: bytes,
) -> ScanResults:
"""Picklemagic scan to detect malicious content in pickle files.
"""Picklemagic scan for malicious content in pickle files.

Args:
pickle_bytes: Pickle bytecode to scan.
Expand All @@ -696,7 +709,7 @@ def genops_scan(
pickle_file_path: Optional[str] = None,
shm_name: Optional[str] = None,
) -> ScanResults:
"""Genops scan to detect malicious content in pickle files.
"""Genops scan for malicious content in pickle files.

Args:
pickle_bytes: Pickle bytecode to scan.
Expand Down Expand Up @@ -1075,98 +1088,67 @@ def _scan_and_load(
if not loader_mod:
loader_mod = pickle_copy

if strict_check and allow_unsafe:
error_string_illegal_combination = (
"Strict scanning and allow_unsafe cannot be used together."
)
if report_only:
logging.error(error_string_illegal_combination)
return
raise IllegalArgumentCombinationError(error_string_illegal_combination)

if is_load:
load_func = loader_mod.load
load_args = (pickle_file,)
else:
load_func = loader_mod.loads
load_args = (data_bytes,)

if allow_unsafe:
if strict_check and allow_unsafe:
error_string_illegal_combination = (
"Strict scanning and allow_unsafe cannot be used together."
)
if report_only:
logging.error(error_string_illegal_combination)
else:
raise IllegalArgumentCombinationError(error_string_illegal_combination)
elif allow_unsafe:
if report_only:
logging.info("Loading pickle file with allow_unsafe set to True.")
try:
return load_func(*load_args, *args, **kwargs)
except AttributeError as exc:
logging.info("Could not load an absent class: %s", exc)
return

if strict_check:
elif strict_check:
if strict_security_scan(data_bytes):
error_string_strict_check = "Pickle file failed strict security check."
if report_only:
logging.error(error_string_strict_check)
return
raise StrictCheckError(error_string_strict_check)
try:
return load_func(*load_args, *args, **kwargs)
except (AttributeError, pickle.UnpicklingError) as exc:
if "persistent load" in str(exc):
logging.info("Persistent load error: %s", exc)
return
elif "Can't get attribute" in str(exc):
logging.exception(
"Could not load an absent class: %s", exc, exc_info=True
)
raise UnsafePickleDetectedError(
constants.ERROR_STRING.substitute(
classification=Classification.SUSPICIOUS.value
)
) from exc
elif "underflow" in str(exc):
raise
logging.exception("Unknown error: %s", exc, exc_info=True)
return

# If we get here, we are not in strict check or allow_unsafe mode.
# We perform non-strict scanning as usual with force_scan if needed.
scan_scores = security_scan(data_bytes, force_scan=force_scan)
number_of_unsafe_results = scan_scores["unsafe"]
number_of_suspicious_results = scan_scores["suspicious"]
number_of_unknown_results = scan_scores["unknown"]

if number_of_suspicious_results == 0 and number_of_unsafe_results == 0:
if report_only:
logging.info("Loading safe pickle file")
if number_of_unknown_results > 0:
logging.warning(
"SaferPickle: File contains %d unknown items that were ignored.",
number_of_unknown_results,
)
try:
return load_func(*load_args, *args, **kwargs)
except (AttributeError, pickle.UnpicklingError) as exc:
if "persistent load" in str(exc):
logging.info("Persistent load error: %s", exc)
return
elif "Can't get attribute" in str(exc):
logging.exception(
"Could not load an absent class: %s", exc, exc_info=True
)
raise UnsafePickleDetectedError(
constants.ERROR_STRING.substitute(
classification=Classification.SUSPICIOUS.value
)
) from exc
elif "underflow" in str(exc):
raise
logging.exception("Unknown error: %s", exc, exc_info=True)
return

elif number_of_unsafe_results > number_of_suspicious_results:
_report_or_raise(Classification.UNSAFE, report_only, log_info)
return
else:
raise StrictCheckError(error_string_strict_check)
else:
_report_or_raise(Classification.SUSPICIOUS, report_only, log_info)
# Default scanning routines
scan_scores = security_scan(data_bytes, force_scan=force_scan)
number_of_unsafe_results = scan_scores["unsafe"]
number_of_suspicious_results = scan_scores["suspicious"]
number_of_unknown_results = scan_scores["unknown"]

if number_of_suspicious_results == 0 and number_of_unsafe_results == 0:
if report_only:
logging.info("Loading safe pickle file")
if number_of_unknown_results > 0:
logging.warning(
"SaferPickle: File contains %d unknown items that were ignored.",
number_of_unknown_results,
)
elif number_of_unsafe_results > number_of_suspicious_results:
_report_or_raise(Classification.UNSAFE, report_only, log_info)
else:
_report_or_raise(Classification.SUSPICIOUS, report_only, log_info)

# Load the pickle if report_only is True and no exceptions were raised earlier
try:
return load_func(*load_args, *args, **kwargs)
except (AttributeError, pickle.UnpicklingError, ModuleNotFoundError) as exc:
if "persistent load" in str(exc):
logging.info("Persistent load error: %s", exc)
elif "Can't get attribute" in str(exc):
logging.exception(
"Could not load an absent class: %s", exc, exc_info=True
)
elif "underflow" in str(exc):
logging.exception("Unpickling underflow error: %s", exc, exc_info=True)
elif "No module named" in str(exc):
logging.exception("Module was not found: %s", exc, exc_info=True)
else:
logging.exception("Unknown error during load: %s", exc, exc_info=True)
return


Expand Down