diff --git a/quark/cli.py b/quark/cli.py index 729b625a..257f6291 100644 --- a/quark/cli.py +++ b/quark/cli.py @@ -149,6 +149,14 @@ required=False, default=1, ) +@click.option( + "--auto-fix-checksum", + help="Automatically repair damaged DEX checksum/signature before analyzing (androguard only)." + + "When not provided, Quark will prompt in interactive TTY and skip in non-interactive runs.", + is_flag=True, + default=False, + show_default=True, +) def entry_point( summary, detail, @@ -166,6 +174,7 @@ def entry_point( comparison, core_library, num_of_process, + auto_fix_checksum, ): """Quark is an Obfuscation-Neglect Android Malware Scoring System""" # Load rules @@ -227,9 +236,9 @@ def entry_point( malware_confidences = {} for apk_ in apk: data = ( - ParallelQuark(apk_, core_library, num_of_process) + ParallelQuark(apk_, core_library, num_of_process, auto_fix_checksum) if num_of_process > 1 - else Quark(apk_, core_library) + else Quark(apk_, core_library, auto_fix_checksum) ) all_labels = {} # dictionary containing @@ -284,9 +293,9 @@ def entry_point( # Load APK data = ( - ParallelQuark(apk[0], core_library, num_of_process) + ParallelQuark(apk[0], core_library, num_of_process, auto_fix_checksum) if num_of_process > 1 - else Quark(apk[0], core_library) + else Quark(apk[0], core_library, auto_fix_checksum) ) if label: diff --git a/quark/core/apkinfo.py b/quark/core/apkinfo.py index a764836b..9051533a 100644 --- a/quark/core/apkinfo.py +++ b/quark/core/apkinfo.py @@ -2,9 +2,15 @@ # This file is part of Quark-Engine - https://github.com/quark-engine/quark-engine # See the file 'LICENSE' for copying permission. +import click import functools +import hashlib +import io import logging import re +import sys +import zipfile +import zlib from collections import defaultdict from os import PathLike from typing import Dict, List, Optional, Set, Union @@ -22,22 +28,168 @@ class AndroguardImp(BaseApkinfo): """Information about apk based on androguard analysis""" - def __init__(self, apk_filepath: Union[str, PathLike]): - super().__init__(apk_filepath, "androguard") + def __init__(self, apk_filepath: Union[str, PathLike], auto_fix_checksum=False): + super().__init__(apk_filepath, "androguard", auto_fix_checksum=auto_fix_checksum) if self.ret_type == "APK": # Suppress Androguard warnings about AndroidManifest, # as we don't use Androguard’s AndroidManifest parsing results. logging.getLogger("androguard.axml").disabled = True logging.getLogger("androguard.apk").disabled = True - # return the APK, list of DalvikVMFormat, and Analysis objects - self.apk, self.dalvikvmformat, self.analysis = AnalyzeAPK(self.data, raw=True) + try: + # return the APK, list of DalvikVMFormat, and Analysis objects + self.apk, self.dalvikvmformat, self.analysis = AnalyzeAPK(self.data, raw=True) + except Exception as e: + # Check if the exception looks like a checksum error + if self._looks_like_checksum_error(e): + # If auto_fix_checksum is not enabled, ask the user for confirmation + if not self.auto_fix_checksum: + if sys.stdin.isatty() and sys.stdout.isatty(): + # If the environment is interactive, ask the user for confirmation + agree = click.confirm( + "\n⚠ Detected damaged DEX checksum/signature." + "\n Fixing will modify DEX headers and invalidate APK signatures (hashes will change)." + "\n Proceed to repair and continue the analysis?", + default=False, + show_default=True, + ) + + if not agree: + # If the user does not agree, cancel the repair and keep the original file + click.echo("\n✖ Repair canceled by user. Keeping the original file.\n", err=True) + + raise e + else: + # If the environment is non-interactive, print a message and abort the analysis + click.echo( + "ℹ Detected damaged DEX checksum/signature but --auto-fix-checksum was not provided " + "and the environment is non-interactive. Skipping repair and aborting.\n" + " Hint: rerun with --auto-fix-checksum to repair automatically.", + err=True, + ) + raise e + + + # Repack the APK with fixed DEX headers + fixed_bytes = self._repack_apk_with_fixed_dex_headers_from_bytes(self.data) + + # Check if the APK was actually repacked + if fixed_bytes: + # Analyze the repacked APK + self.apk, self.dalvikvmformat, self.analysis = AnalyzeAPK(fixed_bytes, raw=True) + else: + # If the APK was not actually repacked, raise the original exception + raise e + else: + # If the exception does not look like a checksum error, raise the original exception + raise e elif self.ret_type == "DEX": - # return the sha256hash, DalvikVMFormat, and Analysis objects - _, _, self.analysis = get_default_session().addDEX(self.apk_filename, self.data) + try: + # return the sha256hash, DalvikVMFormat, and Analysis objects + _, _, self.analysis = get_default_session().addDEX(self.apk_filename, self.data) + except Exception as e: + # Check if the exception looks like a checksum error + if self._looks_like_checksum_error(e): + # If auto_fix_checksum is not enabled, ask the user for confirmation + if not self.auto_fix_checksum: + if sys.stdin.isatty() and sys.stdout.isatty(): + # If the environment is interactive, ask the user for confirmation + agree = click.confirm( + "\n⚠ Detected damaged DEX checksum/signature." + "\n Fixing will modify DEX headers and invalidate APK signatures (hashes will change)." + "\n Proceed to repair and continue the analysis?", + default=False, + show_default=True, + ) + + if not agree: + # If the user does not agree, cancel the repair and keep the original file + click.echo("✖ Repair canceled by user. Keeping the original file.", err=True) + + raise e + else: + # If the environment is non-interactive, print a message and abort the analysis + click.echo( + "ℹ Detected damaged DEX checksum/signature but --auto-fix-checksum was not provided " + "and the environment is non-interactive. Skipping repair and aborting.\n" + " Hint: rerun with --auto-fix-checksum to repair automatically.", + err=True, + ) + raise e + + # Fix the header of the DEX file + fixed = self._fix_single_dex_header(self.data) + + # Check if the DEX file was actually fixed + if fixed != self.data: + # Analyze the fixed DEX file + _, _, self.analysis = get_default_session().addDEX(self.apk_filename, fixed) + else: + # If the DEX file was not actually fixed, raise the original exception + raise e + else: + # If the exception does not look like a checksum error, raise the original exception + raise e else: raise ValueError("Unsupported File type.") + def _repack_apk_with_fixed_dex_headers_from_bytes(self, apk_bytes: bytes) -> Optional[bytes]: + """Repack the APK with fixed DEX headers from bytes. + + Open the APK in memory, fix the headers of all .dex files, and repack them back into bytes. + Only return the new APK if there were actually changes; otherwise return None. + + """ + try: + # Open the APK in memory + in_io = io.BytesIO(apk_bytes) + + # Open the APK as a zip file + with zipfile.ZipFile(in_io, "r") as zf: + # Create a new zip file to store the fixed DEX files + updated = False + out_io = io.BytesIO() + + # Open the new zip file to store the fixed DEX files + with zipfile.ZipFile(out_io, "w", compression=zipfile.ZIP_DEFLATED) as out_zf: + # Iterate over all files in the APK + for info in zf.infolist(): + # Read the data of the file + data = zf.read(info.filename) + + # Check if the file is a DEX file + if info.filename.lower().endswith(".dex"): + # Fix the header of the DEX file + fixed = self._fix_single_dex_header(data) + + # Check if the DEX file was actually fixed + if fixed != data: + updated = True + data = fixed + + # keep basic zip entry metadata + zi = zipfile.ZipInfo(info.filename, date_time=info.date_time) + zi.compress_type = zipfile.ZIP_DEFLATED + zi.external_attr = info.external_attr + zi.create_system = info.create_system + + # Write the fixed DEX file to the new zip file + out_zf.writestr(zi, data) + + # Return the new APK if there were actually changes; otherwise return None + return out_io.getvalue() if updated else None + except Exception as e: + # If there was an error, return None + return None + + + def _looks_like_checksum_error(self,e: Exception) -> bool: + """Check if the exception looks like a checksum error.""" + s = str(e).lower() + + keywords = ("checksum", "adler32", "wrong adler32") + return any(kw in s for kw in keywords) + @property def android_apis(self) -> Set[MethodObject]: apis = set() @@ -295,3 +447,32 @@ def _convert_to_method_object( descriptor=str(method_analysis.descriptor), cache=method_analysis, ) + + @staticmethod + def _fix_single_dex_header(dex: bytes) -> bytes: + """Fix the header of a single DEX file. + + Layout: + - 0x08..0x0B: Adler32 + - 0x0C..0x1F: SHA-1 + """ + + # Check if the DEX file is valid + if len(dex) < 0x20 or not dex.startswith(b"dex\n"): + # Invalid DEX file + # Return the original DEX file without any changes + return dex + + # SHA-1 Signature (20 bytes) + sha1 = hashlib.sha1(dex[0x20:]).digest() + # Adler32 Checksum (4 bytes) little-endian + ad = zlib.adler32(dex[0x0C:]) & 0xFFFFFFFF + + # Update the header + out = bytearray(dex) + # Update the SHA-1 Signature + out[0x0C:0x20] = sha1 + # Update the Adler32 Checksum + out[0x08:0x0C] = ad.to_bytes(4, "little") + + return bytes(out) diff --git a/quark/core/interface/baseapkinfo.py b/quark/core/interface/baseapkinfo.py index a2ec80c5..249c895c 100644 --- a/quark/core/interface/baseapkinfo.py +++ b/quark/core/interface/baseapkinfo.py @@ -37,7 +37,8 @@ def __init__( self, apk_filepath: str | PathLike, core_library: str = "None", - tmp_dir: str | PathLike | None = None + tmp_dir: str | PathLike | None = None, + auto_fix_checksum=False ): self.file = open(apk_filepath, "rb") self.data = SeekableMMap(self.file.fileno(), 0, access=mmap.ACCESS_COPY) @@ -53,6 +54,7 @@ def __init__( self.apk_filename = os.path.basename(apk_filepath) self.apk_filepath = apk_filepath self.core_library = core_library + self.auto_fix_checksum = auto_fix_checksum def __repr__(self) -> str: diff --git a/quark/core/parallelquark.py b/quark/core/parallelquark.py index cf6b663b..c76b2e78 100644 --- a/quark/core/parallelquark.py +++ b/quark/core/parallelquark.py @@ -13,9 +13,9 @@ class ParallelQuark(Quark): @staticmethod - def _worker_initializer(apk, core_library): + def _worker_initializer(apk, core_library, auto_fix_checksum): global _quark - _quark = Quark(apk, core_library) + _quark = Quark(apk, core_library, auto_fix_checksum) @staticmethod def _worker_analysis(rule_obj): @@ -112,11 +112,11 @@ def _apply_analysis_result(self, rule_obj): ] ) - def __init__(self, apk, core_library, num_of_process=1): + def __init__(self, apk, core_library, num_of_process=1, auto_fix_checksum=False): self._result_map = {} self._pool = Pool( min(num_of_process, cpu_count() - 1), self._worker_initializer, - (apk, core_library) + (apk, core_library, auto_fix_checksum) ) super().__init__(apk, core_library) diff --git a/quark/core/quark.py b/quark/core/quark.py index 3b222c94..b964d3d6 100644 --- a/quark/core/quark.py +++ b/quark/core/quark.py @@ -46,11 +46,13 @@ class Quark: """Quark module is used to check quark's five-stage theory""" - def __init__(self, apk, core_library="androguard"): + def __init__(self, apk, core_library="androguard", auto_fix_checksum=False): """ :param apk: the filename of the apk. """ + self.auto_fix_checksum = auto_fix_checksum + core_library = core_library.lower() if core_library == "shuriken": self.apkinfo = ShurikenImp(apk) @@ -59,7 +61,7 @@ def __init__(self, apk, core_library="androguard"): elif core_library == "radare2": self.apkinfo = R2Imp(apk) elif core_library == "androguard": - self.apkinfo = AndroguardImp(apk) + self.apkinfo = AndroguardImp(apk, self.auto_fix_checksum) else: raise ValueError( f"Unsupported core library for Quark: {core_library}"