Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 93 additions & 47 deletions dissect/target/plugins/os/unix/_os.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

import re
import shlex
import uuid
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -446,6 +446,54 @@ def _get_architecture(self, os: str = "unix", path: Path | str = "/bin/ls") -> s
return f"{arch}_32-{os}" if bits == 1 and arch[-2:] != "32" else f"{arch}-{os}"


def parse_fstab_entry(entry: str) -> tuple[str, str, str, str, bool, int]:
"""Parse a single fstab entry according to the man page fstab(5).

According to the man page, the structure of a fstab entry is::

<file system> <mount point> <type> <options> <dump> <pass>
"""
entry = entry.strip()
if not entry or entry.startswith("#"):
raise ValueError("Empty or commented line")

# Fields are separated by tabs or spaces.
parts = shlex.split(entry)

# <file system> <mount point> <type> <options> <dump> <pass>
if len(parts) < 2:
raise ValueError(f"Invalid fstab entry, not enough fields: {entry}")

if len(parts) > 6:
raise ValueError(f"Invalid fstab entry, too many fields: {entry}")

# Pad with defaults
parts.extend([""] * (6 - len(parts)))
fs_spec, mount_point, fs_type, options, dump, pass_num = parts

if not fs_type:
fs_type = "auto"

if not options:
options = "defaults"

if dump == "1":
is_dump = True
elif not dump or dump == "0":
is_dump = False
else:
raise ValueError(f"Invalid dump: {dump}")

if not pass_num:
pass_num = 0
elif pass_num.isnumeric():
pass_num = int(pass_num)
else:
raise ValueError(f"Invalid pass num: {pass_num}")

return fs_spec, mount_point, fs_type, options, is_dump, pass_num


def parse_fstab(
fstab: TargetPath,
log: logging.Logger = log,
Expand All @@ -465,53 +513,51 @@ def parse_fstab(
if not fstab.exists():
return

for entry in fstab.open("rt"):
entry = entry.strip()
if entry.startswith("#"):
continue

entry_parts = re.split(r"\s+", entry)

if len(entry_parts) != 6:
continue

dev, mount_point, fs_type, options, _, _ = entry_parts

if fs_type in SKIP_FS_TYPES:
log.warning("Skipped FS type: %s, %s, %s", fs_type, dev, mount_point)
continue

dev_id = None
volume_name = None
if dev.startswith(("/dev/mapper", "/dev/gpt")):
volume_name = dev.rsplit("/")[-1]
elif dev.startswith("/dev/disk/by-uuid"):
dev_id = dev.rsplit("/")[-1]
elif dev.startswith("/dev/") and dev.count("/") == 3:
# When composing a vg-lv name, LVM2 replaces hyphens with double hyphens in the vg and lv names
# Emulate that here when combining the vg and lv names
volume_name = "-".join(part.replace("-", "--") for part in dev.rsplit("/")[-2:])
elif dev.startswith("UUID="):
dev_id = dev.split("=")[1]
elif dev.startswith("LABEL="):
volume_name = dev.split("=")[1]
elif fs_type == "nfs":
# Put the nfs server address in dev_id and the root path in volume_name
dev_id, sep, volume_name = dev.partition(":")
if sep != ":":
log.warning("Invalid NFS mount: %s", dev)
with fstab.open("rt") as fstab_file:
for line in fstab_file:
try:
entry = parse_fstab_entry(line)
except ValueError as e:
log.warning("Failed to parse fstab entry: %s", e)
continue
else:
log.warning("Unsupported mount device: %s %s", dev, mount_point)
continue

if mount_point == "/":
continue
dev, mount_point, fs_type, options, _, _ = entry

if dev_id:
try:
dev_id = uuid.UUID(dev_id)
except ValueError:
pass
if fs_type in SKIP_FS_TYPES:
log.warning("Skipped FS type: %s, %s, %s", fs_type, dev, mount_point)
continue

dev_id = None
volume_name = None
if dev.startswith(("/dev/mapper", "/dev/gpt")):
volume_name = dev.rsplit("/")[-1]
elif dev.startswith("/dev/disk/by-uuid"):
dev_id = dev.rsplit("/")[-1]
elif dev.startswith("/dev/") and dev.count("/") == 3:
# When composing a vg-lv name, LVM2 replaces hyphens with double hyphens in the vg and lv names
# Emulate that here when combining the vg and lv names
volume_name = "-".join(part.replace("-", "--") for part in dev.rsplit("/")[-2:])
elif dev.startswith("UUID="):
dev_id = dev.split("=")[1].strip('"')
elif dev.startswith("LABEL="):
volume_name = dev.split("=")[1].strip('"')
elif fs_type == "nfs":
# Put the nfs server address in dev_id and the root path in volume_name
dev_id, sep, volume_name = dev.partition(":")
if sep != ":":
log.warning("Invalid NFS mount: %s", dev)
continue
else:
log.warning("Unsupported mount device: %s %s", dev, mount_point)
continue

if mount_point == "/":
continue

if dev_id:
try:
dev_id = uuid.UUID(dev_id)
except ValueError:
pass

yield dev_id, volume_name, mount_point, fs_type, options
yield dev_id, volume_name, mount_point, fs_type, options
68 changes: 68 additions & 0 deletions dissect/target/plugins/os/unix/etc/fstab.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.plugin import Plugin, export
from dissect.target.plugins.os.unix._os import parse_fstab_entry

if TYPE_CHECKING:
from collections.abc import Iterator

FstabRecord = TargetRecordDescriptor(
"linux/etc/fstab",
[
("string", "device_path"),
("string", "mount_path"),
("string", "fs_type"),
("string[]", "options"),
("boolean", "is_dump"),
("varint", "pass_num"),
],
)


class FstabPlugin(Plugin):
"""Linux fstab file plugin."""

__namespace__ = "etc"

def check_compatible(self) -> None:
if not self.target.fs.exists("/etc/fstab"):
raise UnsupportedPluginError("No fstab file found on target")

@export(record=FstabRecord)
def fstab(self) -> Iterator[FstabRecord]:
"""Return the mount entries from ``/etc/fstab``.

Yields ``FstabRecord`` with the following fields:

. code-block:: text

device_path (string): The device path.
mount_path (string): The mount path.
fs_type (string): The filesystem type.
options (string[]): The mount options.
is_dump (boolean): The dump frequency flag.
pass_num (varint): The pass number.
"""
fstab_path = self.target.fs.path("/etc/fstab")
with fstab_path.open("rt") as fstab_file:
for line in fstab_file:
try:
entry = parse_fstab_entry(line)
except ValueError as e:
self.target.log.warning("Failed to parse fstab entry: %s", e)
continue

fs_spec, mount_point, fs_type, options, is_dump, pass_num = entry
yield FstabRecord(
device_path=fs_spec,
mount_path=mount_point,
fs_type=fs_type,
options=options.split(","),
is_dump=is_dump,
pass_num=pass_num,
_target=self.target,
)
56 changes: 56 additions & 0 deletions dissect/target/plugins/os/unix/linux/mounts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.plugin import Plugin, export
from dissect.target.plugins.os.unix.etc.fstab import FstabRecord

if TYPE_CHECKING:
from collections.abc import Iterator

MountRecord = TargetRecordDescriptor(
"linux/proc/mounts",
[
("varint", "pid"),
*FstabRecord.target_fields,
],
)


class MountsPlugin(Plugin):
"""Linux volatile proc mounts plugin."""

def check_compatible(self) -> None:
if not self.target.has_function("proc"):
raise UnsupportedPluginError("proc filesystem not available")

@export(record=MountRecord)
def mounts(self) -> Iterator[MountRecord]:
"""Return the mount points for all processes.

Yields ``MountRecord`` with the following fields:

. code-block:: text

pid (varint): The process id (pid) of the process.
device_path (string): The device path.
mount_path (string): The mount path.
fs_type (string): The filesystem type.
options (string[]): The mount options.
is_dump (boolean): The dump frequency flag.
pass_num (varint): The pass number.
"""
for process in self.target.proc.processes():
for mount_entry in process.mounts():
yield MountRecord(
pid=process.pid,
device_path=mount_entry.fs_spec,
mount_path=mount_entry.mount_path,
fs_type=mount_entry.fs_type,
options=mount_entry.options,
is_dump=mount_entry.is_dump,
pass_num=mount_entry.pass_num,
_target=self.target,
)
37 changes: 34 additions & 3 deletions dissect/target/plugins/os/unix/linux/proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
from dissect.target.filesystem import fsutil
from dissect.target.helpers.utils import StrEnum
from dissect.target.plugin import Plugin, internal
from dissect.target.plugins.os.unix._os import parse_fstab_entry

if TYPE_CHECKING:
from collections.abc import Iterator
from datetime import datetime
from pathlib import Path

from typing_extensions import Self

from dissect.target.helpers.fsutil import TargetPath
from dissect.target.target import Target


Expand Down Expand Up @@ -155,6 +156,16 @@ class Environ:
contents: str


@dataclass
class FstabEntry:
fs_spec: str
mount_path: str
fs_type: str
options: list[str]
is_dump: bool
pass_num: int


class ProcessStateEnum(StrEnum):
R = "Running" # Running
I = "Idle" # Idle # noqa: E741
Expand Down Expand Up @@ -501,7 +512,7 @@ def _boottime(self) -> int | None:
return int(line.split()[1])
return None

def get(self, path: str) -> Path:
def get(self, path: str) -> TargetPath:
"""Returns a TargetPath relative to this process."""
return self.entry.joinpath(path)

Expand Down Expand Up @@ -608,6 +619,26 @@ def cmdline(self) -> str:

return line

def mounts(self) -> Iterator[FstabEntry]:
"""Yields the content of the mount file associated with the process."""
mount_path = self.get("mounts")

if not (mount_path.exists() and mount_path.is_file()):
return

with mount_path.open("rt") as mount_file:
for line in mount_file:
try:
entry = parse_fstab_entry(line)
except ValueError as e:
self.target.log.warning("Failed to parse fstab entry: %s", e)
continue

fs_spec, mount_point, fs_type, options, is_dump, pass_num = entry
options = options.split(",")

yield FstabEntry(fs_spec, mount_point, fs_type, options, is_dump, pass_num)

def stat(self) -> fsutil.stat_result:
"""Return a stat entry of the process."""
return self.entry.stat()
Expand Down Expand Up @@ -646,7 +677,7 @@ def inode_map(self) -> dict[int, list[ProcProcess]]:
return map

@internal
def iter_proc(self) -> Iterator[Path]:
def iter_proc(self) -> Iterator[TargetPath]:
"""Yields ``/proc/[pid]`` filesystems entries for every process id (pid) found in procfs."""
yield from self.target.fs.path("/proc").glob("[0-9]*")

Expand Down
3 changes: 3 additions & 0 deletions tests/_data/plugins/os/unix/etc/fstab
Git LFS file not shown
Loading