diff --git a/quark/core/apkinfo.py b/quark/core/apkinfo.py index b18660a2..5a619683 100644 --- a/quark/core/apkinfo.py +++ b/quark/core/apkinfo.py @@ -3,6 +3,7 @@ # See the file 'LICENSE' for copying permission. import functools +import logging import re from collections import defaultdict from os import PathLike @@ -10,7 +11,7 @@ from androguard.core.analysis.analysis import MethodAnalysis from androguard.core.bytecodes.dvm_types import Operand -from androguard.misc import AnalyzeAPK, AnalyzeDex +from androguard.misc import AnalyzeAPK, get_default_session from quark.core.interface.baseapkinfo import BaseApkinfo from quark.core.struct.bytecodeobject import BytecodeObject @@ -27,12 +28,15 @@ def __init__(self, apk_filepath: Union[str, PathLike]): super().__init__(apk_filepath, "androguard") if self.ret_type == "APK": + # Suppress Androguard warnings about AndroidManifest, + # as we don't use Androguard’s AndroidManifest parsing results. + logging.getLogger("androguard.axml").disabled = True + logging.getLogger("androguard.apk").disabled = True # return the APK, list of DalvikVMFormat, and Analysis objects - self.apk, self.dalvikvmformat, self.analysis = AnalyzeAPK(apk_filepath) + self.apk, self.dalvikvmformat, self.analysis = AnalyzeAPK(self.data, raw=True) elif self.ret_type == "DEX": # return the sha256hash, DalvikVMFormat, and Analysis objects - _, _, self.analysis = AnalyzeDex(apk_filepath) - self._manifest = None + _, _, self.analysis = get_default_session().addDEX(self.apk_filename, self.data) else: raise ValueError("Unsupported File type.") diff --git a/quark/core/apkpatcher.py b/quark/core/apkpatcher.py new file mode 100644 index 00000000..2a0b8c0e --- /dev/null +++ b/quark/core/apkpatcher.py @@ -0,0 +1,333 @@ +from __future__ import annotations + +from contextlib import suppress +import mmap +import struct +import zlib +from typing import Iterator, Tuple +import logging +from quark import config +from quark.utils.logger import defaultHandler + +log = logging.getLogger(__name__) +log.setLevel(logging.DEBUG) +log.addHandler(defaultHandler) +log.disabled = not config.DEBUG + +EOCD_SIGNATURE = b"PK\x05\x06" +CDH_SIGNATURE = b"PK\x01\x02" +LFH_SIGNATURE = b"PK\x03\x04" + +# A set of all compression methods defined in the ZIP file format spec. +# See https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT for details. +VALID_COMPRESSION_METHODS = set(range(0, 21)) | set(range(93, 100)) + + +class SeekableMMap(mmap.mmap): + """ + A mmap.mmap subclass that adds the seekable method required by + zipfile.ZipFile in Python 3.12 or earlier. + """ + + def seekable(self) -> bool: + """ + Return whether the file supports seeking. Always return True. + See https://docs.python.org/3/library/mmap.html#mmap.mmap.seekable. + """ + return True + + +class ApkPatcher: + """ + A utility class to handle anti-analysis techniques in Android APK files. + """ + + @staticmethod + def patch(raw_data: mmap.mmap) -> bool: + """ + Finds and patches known anti-analysis techniques in an APK. + + This function perform patches in place and suppresses any errors to + prevent crashes that would interrupt the analysis. + + :param raw_data: A memory-mapped file object of the APK. + :return: True if any part of the APK was patched; False otherwise. + """ + try: + eocd_offset = ApkPatcher._find_eocd(raw_data) + cdh_count, cdh_start_offset = ApkPatcher._parse_eocd( + raw_data, eocd_offset + ) + compression_patched = ApkPatcher._patch_invalid_compression_method( + raw_data, cdh_count, cdh_start_offset + ) + manifest_patched = ApkPatcher._patch_manifest_signature( + raw_data, cdh_count, cdh_start_offset + ) + return compression_patched or manifest_patched + + except BaseException as e: + log.exception(e) + return False + + @staticmethod + def _find_eocd(raw_data: mmap.mmap) -> int: + """ + Finds the End of Central Directory (EOCD) record in the APK data. + + :param raw_data: A memory-mapped file object of the APK. + :raises ValueError: If the EOCD signature cannot be found. + """ + eocd_offset = raw_data.rfind(EOCD_SIGNATURE) + if eocd_offset == -1: + raise ValueError("EOCD signature not found in the file.") + return eocd_offset + + @staticmethod + def _parse_eocd(raw_data: mmap.mmap, eocd_offset: int) -> Tuple[int, int]: + """ + Parses the EOCD to find the Central Directory offset and entry count. + + :param raw_data: A memory-mapped file object of the APK. + :param eocd_offset: The offset of the EOCD record. + :return: A tuple containing the total number of CDH entries and the + starting offset of the first CDH entry. + """ + cdh_count_offset = eocd_offset + 10 + cdh_start_offset_offset = eocd_offset + 16 + + (cdh_count,) = struct.unpack_from(" Iterator[tuple[int, bool]]: + """ + Iterates over the Central Directory Headers (CDH) and yields offsets. + + :param raw_data: A memory-mapped file object of the APK. + :param cdh_count: The total number of CDH entries. + :param cdh_start_offset: The starting offset of the first CDH entry. + :return: An iterator that yields the offset of each CDH. + """ + current_offset = cdh_start_offset + for _ in range(cdh_count): + actual_signature = raw_data[ + current_offset : current_offset + len(CDH_SIGNATURE) + ] + is_valid_signature = actual_signature == CDH_SIGNATURE + yield current_offset, is_valid_signature + + filename_len_offset = current_offset + 28 + extra_field_len_offset = current_offset + 30 + comment_len_offset = current_offset + 32 + + (filename_len,) = struct.unpack_from( + " bool: + """ + Finds and patches entries with invalid compression methods. + + + This function checks the compression method in all Central Directory + Headers (CDHs). If an invalid compression method is found, it patches + the method to 0 in both the CDH and the corresponding Local File Header + (LFH). It also updates the compressed size to match the uncompressed + size. + + :param raw_data: A memory-mapped file object of the APK. + :param cdh_count: The total number of CDH entries. + :param cdh_start_offset: The starting offset of the first CDH entry. + :return: True if any compression method was patched, False otherwise. + """ + isPatched = False + + for current_offset, is_valid_signature in ApkPatcher._iter_cdh( + raw_data, cdh_count, cdh_start_offset + ): + if not is_valid_signature: + log.warning( + f"Found invalid CDH signature at offset {current_offset}." + " Try parsing it anyway." + ) + + compression_method_offset = current_offset + 10 + lfh_offset_offset = current_offset + 42 + + compression_method, *_ = struct.unpack_from( + " bool: + """ + Finds and patches the signature of an uncompressed AndroidManifest.xml. + + If the manifest file is found and its compression method is STORED (0), + this method checks if the first byte of its data is 0x03. If not, it + patches the byte and updates the CRC-32 checksum in the LFH and CDH. + + :param raw_data: A memory-mapped file object of the APK. + :param cdh_count: The total number of CDH entries. + :param cdh_start_offset: The starting offset of the first CDH entry. + :return: True if the manifest signature was patched, False otherwise. + """ + is_patched = False + + expected_file_name = "AndroidManifest.xml".encode( + "utf-8", errors="ignore" + ) + expected_file_name_len = len(expected_file_name) + + for current_offset, is_valid_signature in ApkPatcher._iter_cdh( + raw_data, cdh_count, cdh_start_offset + ): + if not is_valid_signature: + log.warning( + f"Found invalid CDH signature at offset {current_offset}." + " Try parsing it anyway." + ) + + # Check filename + filename_offset = current_offset + 46 + actual_file_name = raw_data[ + filename_offset : filename_offset + expected_file_name_len + ] + + if actual_file_name != expected_file_name: + continue + + # Check compression method (0 = STORED) + compression_method_offset = current_offset + 10 + (compression_method,) = struct.unpack_from( + " str: return f"" - @staticmethod + def __del__(self): + if hasattr(self, "data"): + self.data.close() + if hasattr(self, "file"): + self.file.close() + def __extractAndroidManifest( - apk_filepath: str | PathLike, - tmp_dir: str | PathLike = None - ) -> str: + self, data: SeekableMMap, tmp_dir: str | PathLike | None + ) -> str | None: tmp_dir = tempfile.mkdtemp() if tmp_dir is None else tmp_dir - with zipfile.ZipFile(apk_filepath) as apk: - apk.extract("AndroidManifest.xml", path=tmp_dir) - return os.path.join( - tmp_dir, "AndroidManifest.xml" - ) + + with zipfile.ZipFile(data, "r") as apk: # type: ignore + if ANDROID_MANIFEST_FILE_NAME not in apk.namelist(): + print_warning("APK does not contain AndroidManifest.xml.") + return None + + apk.extract(ANDROID_MANIFEST_FILE_NAME, path=tmp_dir) + return os.path.join(tmp_dir, ANDROID_MANIFEST_FILE_NAME) @property def filename(self) -> str: @@ -91,7 +115,7 @@ def permissions(self) -> List[str]: :return: a list of all permissions """ - if self.ret_type != "APK": + if self.ret_type != "APK" or self._manifest is None: return [] with AxmlReader(self._manifest) as axml: @@ -114,7 +138,7 @@ def application(self) -> XMLElement | None: :return: an application element """ - if self.ret_type != "APK": + if self.ret_type != "APK" or self._manifest is None: return None with AxmlReader(self._manifest) as axml: @@ -129,7 +153,7 @@ def activities(self) -> List[XMLElement] | None: :return: a list of all activities """ - if self.ret_type != "APK": + if self.ret_type != "APK" or self._manifest is None: return None with AxmlReader(self._manifest) as axml: @@ -144,7 +168,7 @@ def receivers(self) -> List[XMLElement] | None: :return: a list of all receivers """ - if self.ret_type != "APK": + if self.ret_type != "APK" or self._manifest is None: return None with AxmlReader(self._manifest) as axml: @@ -158,7 +182,7 @@ def providers(self) -> List[XMLElement] | None: :return: python list containing provider elements """ - if self.ret_type != "APK": + if self.ret_type != "APK" or self._manifest is None: return None with AxmlReader(self._manifest) as axml: @@ -284,7 +308,9 @@ def get_subclasses(self, class_name) -> Set[str]: pass @staticmethod - def _check_file_signature(raw: bytes) -> Optional[str]: + def _check_file_signature( + raw: mmap.mmap, + ) -> Literal["DEX", "APK", "AXML", None]: if raw[0:3] == b"dex": return "DEX" elif raw[0:2] == b"PK": diff --git a/quark/core/r2apkinfo.py b/quark/core/r2apkinfo.py index 1dd2e482..a999a158 100644 --- a/quark/core/r2apkinfo.py +++ b/quark/core/r2apkinfo.py @@ -54,12 +54,16 @@ def __init__( elif self.ret_type == "APK": self._tmp_dir = tempfile.mkdtemp() if tmp_dir is None else tmp_dir - # Extract AndroidManifest.xml - with zipfile.ZipFile(self.apk_filepath) as apk: - apk.extract("AndroidManifest.xml", path=self._tmp_dir) - - self._manifest = os.path.join( - self._tmp_dir, "AndroidManifest.xml") + if self.isPatched: + # The APK has been patched to mitigate anti-analysis + # techniques. Therefore, Radare2 must parse the patched data + # instead of the original APK. + self.apk_filepath = os.path.join(self._tmp_dir, "patched.apk") + with open(self.apk_filepath, "wb") as patchedApk: + patchedApk.write(self.data) + + self.data.close() + self.file.close() else: raise ValueError("Unsupported File type.") diff --git a/quark/core/rzapkinfo.py b/quark/core/rzapkinfo.py index 7749c392..d69223e7 100644 --- a/quark/core/rzapkinfo.py +++ b/quark/core/rzapkinfo.py @@ -55,11 +55,16 @@ def __init__( elif self.ret_type == "APK": self._tmp_dir = tempfile.mkdtemp() if tmp_dir is None else tmp_dir - with zipfile.ZipFile(self.apk_filepath) as apk: - apk.extract("AndroidManifest.xml", path=self._tmp_dir) - - self._manifest = os.path.join( - self._tmp_dir, "AndroidManifest.xml") + if self.isPatched: + # The APK has been patched to mitigate anti-analysis + # techniques. Therefore, Rizin must parse the patched data + # instead of the original APK. + self.apk_filepath = os.path.join(self._tmp_dir, "patched.apk") + with open(self.apk_filepath, "wb") as patchedApk: + patchedApk.write(self.data) + + self.data.close() + self.file.close() else: raise ValueError("Unsupported File type.") diff --git a/quark/evaluator/pyeval.py b/quark/evaluator/pyeval.py index b4638d6f..6c6bc320 100644 --- a/quark/evaluator/pyeval.py +++ b/quark/evaluator/pyeval.py @@ -7,24 +7,16 @@ # http://pallergabor.uw.hu/androidblog/dalvik_opcodes.html import logging -from datetime import datetime - from quark import config from quark.core.struct.registerobject import RegisterObject from quark.core.struct.tableobject import TableObject +from quark.utils.logger import defaultHandler MAX_REG_COUNT = 40 log = logging.getLogger(__name__) log.setLevel(logging.DEBUG) -if config.DEBUG: - TIMESTAMPS = datetime.now().strftime("%Y-%m-%d") - LOG_FILENAME = f"{TIMESTAMPS}.quark.log" - handler = logging.FileHandler(LOG_FILENAME, mode="w") - format_str = "%(asctime)s %(levelname)s [%(lineno)d]: %(message)s" - handler.setFormatter(logging.Formatter(format_str)) - log.addHandler(handler) -else: - log.disabled = True +log.addHandler(defaultHandler) +log.disabled = not config.DEBUG def logger(func): diff --git a/quark/utils/logger.py b/quark/utils/logger.py new file mode 100644 index 00000000..dbc867ce --- /dev/null +++ b/quark/utils/logger.py @@ -0,0 +1,10 @@ +from datetime import datetime +from logging import FileHandler, Formatter + +TIMESTAMPS = datetime.now().strftime("%Y-%m-%d") +LOG_FILE_NAME = f"{TIMESTAMPS}.quark.log" +LOG_FORMAT = "%(asctime)s %(levelname)s %(name)s [%(lineno)d]: %(message)s" + +defaultFormatter = Formatter(LOG_FORMAT) +defaultHandler = FileHandler(LOG_FILE_NAME, mode="w") +defaultHandler.setFormatter(defaultFormatter) diff --git a/tests/conftest.py b/tests/conftest.py index 5857935c..ff8c3110 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -42,8 +42,20 @@ "https://github.com/quark-engine/apk-samples" "/raw/master/vulnerable-samples/Vuldroid.apk" ), - "fileName": "Vuldroid.apk" - } + "fileName": "Vuldroid.apk", + }, + { + "sourceUrl": ( + "https://github.com/quark-engine/apk-samples" + "/raw/master/malware-samples/" + "3d52b5728af55c37d5bd74c3f9b7e9ea6b007a9ec202a648ce3dc7e37ff49b29" + ".apk" + ), + "fileName": ( + "3d52b5728af55c37d5bd74c3f9b7e9ea6b007a9ec202a648ce3dc7e37ff49b29" + ".apk" + ), + }, ] @@ -82,6 +94,12 @@ def SAMPLE_PATH_Ahmyth(tmp_path_factory: pytest.TempPathFactory) -> str: def SAMPLE_PATH_pivaa(tmp_path_factory: pytest.TempPathFactory) -> str: return downloadSample(tmp_path_factory, SAMPLES[3]) + @pytest.fixture(scope="session") def SAMPLE_PATH_Vuldroid(tmp_path_factory: pytest.TempPathFactory) -> str: - return downloadSample(tmp_path_factory, SAMPLES[4]) \ No newline at end of file + return downloadSample(tmp_path_factory, SAMPLES[4]) + + +@pytest.fixture(scope="session") +def SAMPLE_PATH_3d52b(tmp_path_factory: pytest.TempPathFactory) -> str: + return downloadSample(tmp_path_factory, SAMPLES[5]) diff --git a/tests/core/test_apkpatcher.py b/tests/core/test_apkpatcher.py new file mode 100644 index 00000000..476d23ad --- /dev/null +++ b/tests/core/test_apkpatcher.py @@ -0,0 +1,66 @@ +import mmap +import zipfile + +import pytest + +from quark.core.apkpatcher import ( + VALID_COMPRESSION_METHODS, + ApkPatcher, + SeekableMMap, +) + + +@pytest.fixture(scope="session") +def apkContent(SAMPLE_PATH_3d52b): + with open(SAMPLE_PATH_3d52b, "rb") as fp, SeekableMMap( + fp.fileno(), 0, access=mmap.ACCESS_COPY + ) as mm: + yield mm + + +class TestApkPatcher: + def test_patch(self, apkContent: SeekableMMap): + """ + Tests that ApkPatcher.patch correctly fixes invalid compression methods, + updates sizes, and patches AndroidManifest.xml signature. + """ + # The return values of patch indicates if any modification was made. + # Assuming SAMPLE_PATH_3d52b requires patching for both. + # If the sample APK doesn't have issues, this assertion might need adjustment + # (e.g., to be more specific about *which* part was patched). + # For now, we assert that *some* patching occurred. + assert ApkPatcher.patch(apkContent) is True + + # Verify all compression methods and sizes are valid. + manifest_found = False + with zipfile.ZipFile(apkContent, "r") as patched_zf: # type: ignore + for info in patched_zf.infolist(): + # Compression method and size checks + assert info.compress_type in VALID_COMPRESSION_METHODS, ( + f"File '{info.filename}' has invalid compression " + f"type {info.compress_type} after patching." + ) + + if info.compress_type == 0: # Only check STORED for size match + assert info.compress_size == info.file_size, ( + f"File '{info.filename}' has type STORED but " + f"mismatched sizes (compress:{info.compress_size}, " + f"file:{info.file_size})." + ) + + # Manifest signature check + if info.filename == "AndroidManifest.xml": + manifest_found = True + # Read AndroidManifest.xml content from the patched ZIP + manifest_content = patched_zf.read(info.filename) + assert ( + len(manifest_content) > 0 + ), "AndroidManifest.xml content is empty." + assert manifest_content[0] == 0x03, ( + "First byte of AndroidManifest.xml" + " is not 0x03 after patching." + ) + + assert ( + manifest_found + ), "AndroidManifest.xml not found in the patched APK."