Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions quark/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@
required=False,
default=1,
)
@click.option(
"--auto-fix-checksum",
help="Automatically repair damaged DEX checksum/signature before analyzing (androguard only)." +
"When not provided, Quark will prompt in interactive TTY and skip in non-interactive runs.",
is_flag=True,
default=False,
show_default=True,
)
def entry_point(
summary,
detail,
Expand All @@ -166,6 +174,7 @@ def entry_point(
comparison,
core_library,
num_of_process,
auto_fix_checksum,
):
"""Quark is an Obfuscation-Neglect Android Malware Scoring System"""
# Load rules
Expand Down Expand Up @@ -227,9 +236,9 @@ def entry_point(
malware_confidences = {}
for apk_ in apk:
data = (
ParallelQuark(apk_, core_library, num_of_process)
ParallelQuark(apk_, core_library, num_of_process, auto_fix_checksum)
if num_of_process > 1
else Quark(apk_, core_library)
else Quark(apk_, core_library, auto_fix_checksum)
)
all_labels = {}
# dictionary containing
Expand Down Expand Up @@ -284,9 +293,9 @@ def entry_point(

# Load APK
data = (
ParallelQuark(apk[0], core_library, num_of_process)
ParallelQuark(apk[0], core_library, num_of_process, auto_fix_checksum)
if num_of_process > 1
else Quark(apk[0], core_library)
else Quark(apk[0], core_library, auto_fix_checksum)
)

if label:
Expand Down
193 changes: 187 additions & 6 deletions quark/core/apkinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@
# This file is part of Quark-Engine - https://github.com/quark-engine/quark-engine
# See the file 'LICENSE' for copying permission.

import click
import functools
import hashlib
import io
import logging
import re
import sys
import zipfile
import zlib
from collections import defaultdict
from os import PathLike
from typing import Dict, List, Optional, Set, Union
Expand All @@ -22,22 +28,168 @@
class AndroguardImp(BaseApkinfo):
"""Information about apk based on androguard analysis"""

def __init__(self, apk_filepath: Union[str, PathLike]):
super().__init__(apk_filepath, "androguard")
def __init__(self, apk_filepath: Union[str, PathLike], auto_fix_checksum=False):
super().__init__(apk_filepath, "androguard", auto_fix_checksum=auto_fix_checksum)

if self.ret_type == "APK":
# Suppress Androguard warnings about AndroidManifest,
# as we don't use Androguard’s AndroidManifest parsing results.
logging.getLogger("androguard.axml").disabled = True
logging.getLogger("androguard.apk").disabled = True
# return the APK, list of DalvikVMFormat, and Analysis objects
self.apk, self.dalvikvmformat, self.analysis = AnalyzeAPK(self.data, raw=True)
try:
# return the APK, list of DalvikVMFormat, and Analysis objects
self.apk, self.dalvikvmformat, self.analysis = AnalyzeAPK(self.data, raw=True)
except Exception as e:
# Check if the exception looks like a checksum error
if self._looks_like_checksum_error(e):
# If auto_fix_checksum is not enabled, ask the user for confirmation
if not self.auto_fix_checksum:
if sys.stdin.isatty() and sys.stdout.isatty():
# If the environment is interactive, ask the user for confirmation
agree = click.confirm(
"\n⚠ Detected damaged DEX checksum/signature."
"\n Fixing will modify DEX headers and invalidate APK signatures (hashes will change)."
"\n Proceed to repair and continue the analysis?",
default=False,
show_default=True,
)

if not agree:
# If the user does not agree, cancel the repair and keep the original file
click.echo("\n✖ Repair canceled by user. Keeping the original file.\n", err=True)

raise e
else:
# If the environment is non-interactive, print a message and abort the analysis
click.echo(
"ℹ Detected damaged DEX checksum/signature but --auto-fix-checksum was not provided "
"and the environment is non-interactive. Skipping repair and aborting.\n"
" Hint: rerun with --auto-fix-checksum to repair automatically.",
err=True,
)
raise e


# Repack the APK with fixed DEX headers
fixed_bytes = self._repack_apk_with_fixed_dex_headers_from_bytes(self.data)

# Check if the APK was actually repacked
if fixed_bytes:
# Analyze the repacked APK
self.apk, self.dalvikvmformat, self.analysis = AnalyzeAPK(fixed_bytes, raw=True)
else:
# If the APK was not actually repacked, raise the original exception
raise e
else:
# If the exception does not look like a checksum error, raise the original exception
raise e
elif self.ret_type == "DEX":
# return the sha256hash, DalvikVMFormat, and Analysis objects
_, _, self.analysis = get_default_session().addDEX(self.apk_filename, self.data)
try:
# return the sha256hash, DalvikVMFormat, and Analysis objects
_, _, self.analysis = get_default_session().addDEX(self.apk_filename, self.data)
except Exception as e:
# Check if the exception looks like a checksum error
if self._looks_like_checksum_error(e):
# If auto_fix_checksum is not enabled, ask the user for confirmation
if not self.auto_fix_checksum:
if sys.stdin.isatty() and sys.stdout.isatty():
# If the environment is interactive, ask the user for confirmation
agree = click.confirm(
"\n⚠ Detected damaged DEX checksum/signature."
"\n Fixing will modify DEX headers and invalidate APK signatures (hashes will change)."
"\n Proceed to repair and continue the analysis?",
default=False,
show_default=True,
)

if not agree:
# If the user does not agree, cancel the repair and keep the original file
click.echo("✖ Repair canceled by user. Keeping the original file.", err=True)

raise e
else:
# If the environment is non-interactive, print a message and abort the analysis
click.echo(
"ℹ Detected damaged DEX checksum/signature but --auto-fix-checksum was not provided "
"and the environment is non-interactive. Skipping repair and aborting.\n"
" Hint: rerun with --auto-fix-checksum to repair automatically.",
err=True,
)
raise e

# Fix the header of the DEX file
fixed = self._fix_single_dex_header(self.data)

# Check if the DEX file was actually fixed
if fixed != self.data:
# Analyze the fixed DEX file
_, _, self.analysis = get_default_session().addDEX(self.apk_filename, fixed)
else:
# If the DEX file was not actually fixed, raise the original exception
raise e
else:
# If the exception does not look like a checksum error, raise the original exception
raise e
else:
raise ValueError("Unsupported File type.")

def _repack_apk_with_fixed_dex_headers_from_bytes(self, apk_bytes: bytes) -> Optional[bytes]:
"""Repack the APK with fixed DEX headers from bytes.

Open the APK in memory, fix the headers of all .dex files, and repack them back into bytes.
Only return the new APK if there were actually changes; otherwise return None.

"""
try:
# Open the APK in memory
in_io = io.BytesIO(apk_bytes)

# Open the APK as a zip file
with zipfile.ZipFile(in_io, "r") as zf:
# Create a new zip file to store the fixed DEX files
updated = False
out_io = io.BytesIO()

# Open the new zip file to store the fixed DEX files
with zipfile.ZipFile(out_io, "w", compression=zipfile.ZIP_DEFLATED) as out_zf:
# Iterate over all files in the APK
for info in zf.infolist():
# Read the data of the file
data = zf.read(info.filename)

# Check if the file is a DEX file
if info.filename.lower().endswith(".dex"):
# Fix the header of the DEX file
fixed = self._fix_single_dex_header(data)

# Check if the DEX file was actually fixed
if fixed != data:
updated = True
data = fixed

# keep basic zip entry metadata
zi = zipfile.ZipInfo(info.filename, date_time=info.date_time)
zi.compress_type = zipfile.ZIP_DEFLATED
zi.external_attr = info.external_attr
zi.create_system = info.create_system

# Write the fixed DEX file to the new zip file
out_zf.writestr(zi, data)

# Return the new APK if there were actually changes; otherwise return None
return out_io.getvalue() if updated else None
except Exception as e:
# If there was an error, return None
return None


def _looks_like_checksum_error(self,e: Exception) -> bool:
"""Check if the exception looks like a checksum error."""
s = str(e).lower()

keywords = ("checksum", "adler32", "wrong adler32")
return any(kw in s for kw in keywords)

@property
def android_apis(self) -> Set[MethodObject]:
apis = set()
Expand Down Expand Up @@ -295,3 +447,32 @@ def _convert_to_method_object(
descriptor=str(method_analysis.descriptor),
cache=method_analysis,
)

@staticmethod
def _fix_single_dex_header(dex: bytes) -> bytes:
"""Fix the header of a single DEX file.

Layout:
- 0x08..0x0B: Adler32
- 0x0C..0x1F: SHA-1
"""

# Check if the DEX file is valid
if len(dex) < 0x20 or not dex.startswith(b"dex\n"):
# Invalid DEX file
# Return the original DEX file without any changes
return dex

# SHA-1 Signature (20 bytes)
sha1 = hashlib.sha1(dex[0x20:]).digest()
# Adler32 Checksum (4 bytes) little-endian
ad = zlib.adler32(dex[0x0C:]) & 0xFFFFFFFF

# Update the header
out = bytearray(dex)
# Update the SHA-1 Signature
out[0x0C:0x20] = sha1
# Update the Adler32 Checksum
out[0x08:0x0C] = ad.to_bytes(4, "little")

return bytes(out)
4 changes: 3 additions & 1 deletion quark/core/interface/baseapkinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def __init__(
self,
apk_filepath: str | PathLike,
core_library: str = "None",
tmp_dir: str | PathLike | None = None
tmp_dir: str | PathLike | None = None,
auto_fix_checksum=False
):
self.file = open(apk_filepath, "rb")
self.data = SeekableMMap(self.file.fileno(), 0, access=mmap.ACCESS_COPY)
Expand All @@ -53,6 +54,7 @@ def __init__(
self.apk_filename = os.path.basename(apk_filepath)
self.apk_filepath = apk_filepath
self.core_library = core_library
self.auto_fix_checksum = auto_fix_checksum


def __repr__(self) -> str:
Expand Down
8 changes: 4 additions & 4 deletions quark/core/parallelquark.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

class ParallelQuark(Quark):
@staticmethod
def _worker_initializer(apk, core_library):
def _worker_initializer(apk, core_library, auto_fix_checksum):
global _quark
_quark = Quark(apk, core_library)
_quark = Quark(apk, core_library, auto_fix_checksum)

@staticmethod
def _worker_analysis(rule_obj):
Expand Down Expand Up @@ -112,11 +112,11 @@ def _apply_analysis_result(self, rule_obj):
]
)

def __init__(self, apk, core_library, num_of_process=1):
def __init__(self, apk, core_library, num_of_process=1, auto_fix_checksum=False):
self._result_map = {}
self._pool = Pool(
min(num_of_process, cpu_count() - 1), self._worker_initializer,
(apk, core_library)
(apk, core_library, auto_fix_checksum)
)

super().__init__(apk, core_library)
Expand Down
6 changes: 4 additions & 2 deletions quark/core/quark.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@
class Quark:
"""Quark module is used to check quark's five-stage theory"""

def __init__(self, apk, core_library="androguard"):
def __init__(self, apk, core_library="androguard", auto_fix_checksum=False):
"""

:param apk: the filename of the apk.
"""
self.auto_fix_checksum = auto_fix_checksum

core_library = core_library.lower()
if core_library == "shuriken":
self.apkinfo = ShurikenImp(apk)
Expand All @@ -59,7 +61,7 @@ def __init__(self, apk, core_library="androguard"):
elif core_library == "radare2":
self.apkinfo = R2Imp(apk)
elif core_library == "androguard":
self.apkinfo = AndroguardImp(apk)
self.apkinfo = AndroguardImp(apk, self.auto_fix_checksum)
else:
raise ValueError(
f"Unsupported core library for Quark: {core_library}"
Expand Down