From a69b0e7e3b9b3fe837c91218c87792d3bec837bf Mon Sep 17 00:00:00 2001 From: James Chiang Date: Tue, 24 Feb 2026 11:30:40 +0800 Subject: [PATCH 1/3] Add auto-repair bad DEX checksums. --- quark/core/apkinfo.py | 132 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 128 insertions(+), 4 deletions(-) diff --git a/quark/core/apkinfo.py b/quark/core/apkinfo.py index a764836b..72527480 100644 --- a/quark/core/apkinfo.py +++ b/quark/core/apkinfo.py @@ -3,8 +3,12 @@ # See the file 'LICENSE' for copying permission. import functools +import hashlib +import io import logging import re +import zipfile +import zlib from collections import defaultdict from os import PathLike from typing import Dict, List, Optional, Set, Union @@ -30,14 +34,105 @@ def __init__(self, apk_filepath: Union[str, PathLike]): # as we don't use Androguard’s AndroidManifest parsing results. logging.getLogger("androguard.axml").disabled = True logging.getLogger("androguard.apk").disabled = True - # return the APK, list of DalvikVMFormat, and Analysis objects - self.apk, self.dalvikvmformat, self.analysis = AnalyzeAPK(self.data, raw=True) + try: + # return the APK, list of DalvikVMFormat, and Analysis objects + self.apk, self.dalvikvmformat, self.analysis = AnalyzeAPK(self.data, raw=True) + except Exception as e: + # Check if the exception looks like a checksum error + if self._looks_like_checksum_error(e): + # Repack the APK with fixed DEX headers + fixed_bytes = self._repack_apk_with_fixed_dex_headers_from_bytes(self.data) + + # Check if the APK was actually repacked + if fixed_bytes: + # Analyze the repacked APK + self.apk, self.dalvikvmformat, self.analysis = AnalyzeAPK(fixed_bytes, raw=True) + else: + # If the APK was not actually repacked, raise the original exception + raise e + else: + # If the exception does not look like a checksum error, raise the original exception + raise e elif self.ret_type == "DEX": - # return the sha256hash, DalvikVMFormat, and Analysis objects - _, _, self.analysis = get_default_session().addDEX(self.apk_filename, self.data) + try: + # return the sha256hash, DalvikVMFormat, and Analysis objects + _, _, self.analysis = get_default_session().addDEX(self.apk_filename, self.data) + except Exception as e: + # Check if the exception looks like a checksum error + if self._looks_like_checksum_error(e): + # Fix the header of the DEX file + fixed = self._fix_single_dex_header(self.data) + + # Check if the DEX file was actually fixed + if fixed != self.data: + # Analyze the fixed DEX file + _, _, self.analysis = get_default_session().addDEX(self.apk_filename, fixed) + else: + # If the DEX file was not actually fixed, raise the original exception + raise e + else: + # If the exception does not look like a checksum error, raise the original exception + raise e else: raise ValueError("Unsupported File type.") + def _repack_apk_with_fixed_dex_headers_from_bytes(self, apk_bytes: bytes) -> Optional[bytes]: + """Repack the APK with fixed DEX headers from bytes. + + Open the APK in memory, fix the headers of all .dex files, and repack them back into bytes. + Only return the new APK if there were actually changes; otherwise return None. + + """ + try: + # Open the APK in memory + in_io = io.BytesIO(apk_bytes) + + # Open the APK as a zip file + with zipfile.ZipFile(in_io, "r") as zf: + # Create a new zip file to store the fixed DEX files + updated = False + out_io = io.BytesIO() + + # Open the new zip file to store the fixed DEX files + with zipfile.ZipFile(out_io, "w", compression=zipfile.ZIP_DEFLATED) as out_zf: + # Iterate over all files in the APK + for info in zf.infolist(): + # Read the data of the file + data = zf.read(info.filename) + + # Check if the file is a DEX file + if info.filename.lower().endswith(".dex"): + # Fix the header of the DEX file + fixed = self._fix_single_dex_header(data) + + # Check if the DEX file was actually fixed + if fixed != data: + updated = True + data = fixed + + # keep basic zip entry metadata + zi = zipfile.ZipInfo(info.filename, date_time=info.date_time) + zi.compress_type = zipfile.ZIP_DEFLATED + zi.external_attr = info.external_attr + zi.create_system = info.create_system + + # Write the fixed DEX file to the new zip file + out_zf.writestr(zi, data) + + # Return the new APK if there were actually changes; otherwise return None + return out_io.getvalue() if updated else None + except Exception as e: + # If there was an error, return None + return None + + + def _looks_like_checksum_error(self,e: Exception) -> bool: + """Check if the exception looks like a checksum error.""" + s = str(e).lower() + + keywords = ("checksum", "adler32") + return any(kw in s for kw in keywords) + @property def android_apis(self) -> Set[MethodObject]: apis = set() @@ -295,3 +390,32 @@ def _convert_to_method_object( descriptor=str(method_analysis.descriptor), cache=method_analysis, ) + + @staticmethod + def _fix_single_dex_header(dex: bytes) -> bytes: + """Fix the header of a single DEX file. + + Layout: + - 0x08..0x0B: Adler32 + - 0x0C..0x1F: SHA-1 + """ + + # Check if the DEX file is valid + if len(dex) < 0x20 or not dex.startswith(b"dex\n"): + # Invalid DEX file + # Return the original DEX file without any changes + return dex + + # SHA-1 Signature (20 bytes) + sha1 = hashlib.sha1(dex[0x20:]).digest() + # Adler32 Checksum (4 bytes) little-endian + ad = zlib.adler32(dex[0x0C:]) & 0xFFFFFFFF + + # Update the header + out = bytearray(dex) + # Update the SHA-1 Signature + out[0x0C:0x20] = sha1 + # Update the Adler32 Checksum + out[0x08:0x0C] = ad.to_bytes(4, "little") + + return bytes(out) From aaa5bf2fd935308cb88bbc650f0650443f5e2af0 Mon Sep 17 00:00:00 2001 From: James Chiang Date: Tue, 24 Feb 2026 16:21:31 +0800 Subject: [PATCH 2/3] Add auto_fix_checksum option to control automatic checksum fixing (androguard only) Allow users to configure whether bad checksum should be automatically fixed instead of raising an error. This option is currently supported for androguard only. --- quark/cli.py | 16 ++++++++++++---- quark/core/apkinfo.py | 12 ++++++++++-- quark/core/interface/baseapkinfo.py | 4 +++- quark/core/parallelquark.py | 8 ++++---- quark/core/quark.py | 6 ++++-- 5 files changed, 33 insertions(+), 13 deletions(-) diff --git a/quark/cli.py b/quark/cli.py index 729b625a..e6f8dab5 100644 --- a/quark/cli.py +++ b/quark/cli.py @@ -149,6 +149,13 @@ required=False, default=1, ) +@click.option( + "--auto-fix-checksum", + help="Automatically repair damaged DEX checksum/signature before analyzing (androguard only).", + is_flag=True, + default=False, + show_default=True, +) def entry_point( summary, detail, @@ -166,6 +173,7 @@ def entry_point( comparison, core_library, num_of_process, + auto_fix_checksum, ): """Quark is an Obfuscation-Neglect Android Malware Scoring System""" # Load rules @@ -227,9 +235,9 @@ def entry_point( malware_confidences = {} for apk_ in apk: data = ( - ParallelQuark(apk_, core_library, num_of_process) + ParallelQuark(apk_, core_library, num_of_process, auto_fix_checksum) if num_of_process > 1 - else Quark(apk_, core_library) + else Quark(apk_, core_library, auto_fix_checksum) ) all_labels = {} # dictionary containing @@ -284,9 +292,9 @@ def entry_point( # Load APK data = ( - ParallelQuark(apk[0], core_library, num_of_process) + ParallelQuark(apk[0], core_library, num_of_process, auto_fix_checksum) if num_of_process > 1 - else Quark(apk[0], core_library) + else Quark(apk[0], core_library, auto_fix_checksum) ) if label: diff --git a/quark/core/apkinfo.py b/quark/core/apkinfo.py index 72527480..15ba3049 100644 --- a/quark/core/apkinfo.py +++ b/quark/core/apkinfo.py @@ -26,8 +26,8 @@ class AndroguardImp(BaseApkinfo): """Information about apk based on androguard analysis""" - def __init__(self, apk_filepath: Union[str, PathLike]): - super().__init__(apk_filepath, "androguard") + def __init__(self, apk_filepath: Union[str, PathLike], auto_fix_checksum=False): + super().__init__(apk_filepath, "androguard", auto_fix_checksum=auto_fix_checksum) if self.ret_type == "APK": # Suppress Androguard warnings about AndroidManifest, @@ -38,6 +38,10 @@ def __init__(self, apk_filepath: Union[str, PathLike]): # return the APK, list of DalvikVMFormat, and Analysis objects self.apk, self.dalvikvmformat, self.analysis = AnalyzeAPK(self.data, raw=True) except Exception as e: + # If auto_fix_checksum is not enabled, raise the original exception + if not self.auto_fix_checksum: + raise e + # Check if the exception looks like a checksum error if self._looks_like_checksum_error(e): # Repack the APK with fixed DEX headers @@ -58,6 +62,10 @@ def __init__(self, apk_filepath: Union[str, PathLike]): # return the sha256hash, DalvikVMFormat, and Analysis objects _, _, self.analysis = get_default_session().addDEX(self.apk_filename, self.data) except Exception as e: + # If auto_fix_checksum is not enabled, raise the original exception + if not self.auto_fix_checksum: + raise e + # Check if the exception looks like a checksum error if self._looks_like_checksum_error(e): # Fix the header of the DEX file diff --git a/quark/core/interface/baseapkinfo.py b/quark/core/interface/baseapkinfo.py index a2ec80c5..249c895c 100644 --- a/quark/core/interface/baseapkinfo.py +++ b/quark/core/interface/baseapkinfo.py @@ -37,7 +37,8 @@ def __init__( self, apk_filepath: str | PathLike, core_library: str = "None", - tmp_dir: str | PathLike | None = None + tmp_dir: str | PathLike | None = None, + auto_fix_checksum=False ): self.file = open(apk_filepath, "rb") self.data = SeekableMMap(self.file.fileno(), 0, access=mmap.ACCESS_COPY) @@ -53,6 +54,7 @@ def __init__( self.apk_filename = os.path.basename(apk_filepath) self.apk_filepath = apk_filepath self.core_library = core_library + self.auto_fix_checksum = auto_fix_checksum def __repr__(self) -> str: diff --git a/quark/core/parallelquark.py b/quark/core/parallelquark.py index cf6b663b..c76b2e78 100644 --- a/quark/core/parallelquark.py +++ b/quark/core/parallelquark.py @@ -13,9 +13,9 @@ class ParallelQuark(Quark): @staticmethod - def _worker_initializer(apk, core_library): + def _worker_initializer(apk, core_library, auto_fix_checksum): global _quark - _quark = Quark(apk, core_library) + _quark = Quark(apk, core_library, auto_fix_checksum) @staticmethod def _worker_analysis(rule_obj): @@ -112,11 +112,11 @@ def _apply_analysis_result(self, rule_obj): ] ) - def __init__(self, apk, core_library, num_of_process=1): + def __init__(self, apk, core_library, num_of_process=1, auto_fix_checksum=False): self._result_map = {} self._pool = Pool( min(num_of_process, cpu_count() - 1), self._worker_initializer, - (apk, core_library) + (apk, core_library, auto_fix_checksum) ) super().__init__(apk, core_library) diff --git a/quark/core/quark.py b/quark/core/quark.py index 3b222c94..b964d3d6 100644 --- a/quark/core/quark.py +++ b/quark/core/quark.py @@ -46,11 +46,13 @@ class Quark: """Quark module is used to check quark's five-stage theory""" - def __init__(self, apk, core_library="androguard"): + def __init__(self, apk, core_library="androguard", auto_fix_checksum=False): """ :param apk: the filename of the apk. """ + self.auto_fix_checksum = auto_fix_checksum + core_library = core_library.lower() if core_library == "shuriken": self.apkinfo = ShurikenImp(apk) @@ -59,7 +61,7 @@ def __init__(self, apk, core_library="androguard"): elif core_library == "radare2": self.apkinfo = R2Imp(apk) elif core_library == "androguard": - self.apkinfo = AndroguardImp(apk) + self.apkinfo = AndroguardImp(apk, self.auto_fix_checksum) else: raise ValueError( f"Unsupported core library for Quark: {core_library}" From 6b541db32141e295992a127222ce61a0f160029e Mon Sep 17 00:00:00 2001 From: James Chiang Date: Wed, 25 Feb 2026 17:11:54 +0800 Subject: [PATCH 3/3] Add interactive confirmation before repairing damaged DEX checksum unless --auto-fix-checksum is provided --- quark/cli.py | 3 +- quark/core/apkinfo.py | 67 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 60 insertions(+), 10 deletions(-) diff --git a/quark/cli.py b/quark/cli.py index e6f8dab5..257f6291 100644 --- a/quark/cli.py +++ b/quark/cli.py @@ -151,7 +151,8 @@ ) @click.option( "--auto-fix-checksum", - help="Automatically repair damaged DEX checksum/signature before analyzing (androguard only).", + help="Automatically repair damaged DEX checksum/signature before analyzing (androguard only)." + + "When not provided, Quark will prompt in interactive TTY and skip in non-interactive runs.", is_flag=True, default=False, show_default=True, diff --git a/quark/core/apkinfo.py b/quark/core/apkinfo.py index 15ba3049..9051533a 100644 --- a/quark/core/apkinfo.py +++ b/quark/core/apkinfo.py @@ -2,11 +2,13 @@ # This file is part of Quark-Engine - https://github.com/quark-engine/quark-engine # See the file 'LICENSE' for copying permission. +import click import functools import hashlib import io import logging import re +import sys import zipfile import zlib from collections import defaultdict @@ -38,12 +40,36 @@ def __init__(self, apk_filepath: Union[str, PathLike], auto_fix_checksum=False): # return the APK, list of DalvikVMFormat, and Analysis objects self.apk, self.dalvikvmformat, self.analysis = AnalyzeAPK(self.data, raw=True) except Exception as e: - # If auto_fix_checksum is not enabled, raise the original exception - if not self.auto_fix_checksum: - raise e - # Check if the exception looks like a checksum error if self._looks_like_checksum_error(e): + # If auto_fix_checksum is not enabled, ask the user for confirmation + if not self.auto_fix_checksum: + if sys.stdin.isatty() and sys.stdout.isatty(): + # If the environment is interactive, ask the user for confirmation + agree = click.confirm( + "\n⚠ Detected damaged DEX checksum/signature." + "\n Fixing will modify DEX headers and invalidate APK signatures (hashes will change)." + "\n Proceed to repair and continue the analysis?", + default=False, + show_default=True, + ) + + if not agree: + # If the user does not agree, cancel the repair and keep the original file + click.echo("\n✖ Repair canceled by user. Keeping the original file.\n", err=True) + + raise e + else: + # If the environment is non-interactive, print a message and abort the analysis + click.echo( + "ℹ Detected damaged DEX checksum/signature but --auto-fix-checksum was not provided " + "and the environment is non-interactive. Skipping repair and aborting.\n" + " Hint: rerun with --auto-fix-checksum to repair automatically.", + err=True, + ) + raise e + + # Repack the APK with fixed DEX headers fixed_bytes = self._repack_apk_with_fixed_dex_headers_from_bytes(self.data) @@ -62,12 +88,35 @@ def __init__(self, apk_filepath: Union[str, PathLike], auto_fix_checksum=False): # return the sha256hash, DalvikVMFormat, and Analysis objects _, _, self.analysis = get_default_session().addDEX(self.apk_filename, self.data) except Exception as e: - # If auto_fix_checksum is not enabled, raise the original exception - if not self.auto_fix_checksum: - raise e - # Check if the exception looks like a checksum error if self._looks_like_checksum_error(e): + # If auto_fix_checksum is not enabled, ask the user for confirmation + if not self.auto_fix_checksum: + if sys.stdin.isatty() and sys.stdout.isatty(): + # If the environment is interactive, ask the user for confirmation + agree = click.confirm( + "\n⚠ Detected damaged DEX checksum/signature." + "\n Fixing will modify DEX headers and invalidate APK signatures (hashes will change)." + "\n Proceed to repair and continue the analysis?", + default=False, + show_default=True, + ) + + if not agree: + # If the user does not agree, cancel the repair and keep the original file + click.echo("✖ Repair canceled by user. Keeping the original file.", err=True) + + raise e + else: + # If the environment is non-interactive, print a message and abort the analysis + click.echo( + "ℹ Detected damaged DEX checksum/signature but --auto-fix-checksum was not provided " + "and the environment is non-interactive. Skipping repair and aborting.\n" + " Hint: rerun with --auto-fix-checksum to repair automatically.", + err=True, + ) + raise e + # Fix the header of the DEX file fixed = self._fix_single_dex_header(self.data) @@ -138,7 +187,7 @@ def _looks_like_checksum_error(self,e: Exception) -> bool: """Check if the exception looks like a checksum error.""" s = str(e).lower() - keywords = ("checksum", "adler32") + keywords = ("checksum", "adler32", "wrong adler32") return any(kw in s for kw in keywords) @property