From d24698489d8c0cdc8706ec5d72f57c253d40d157 Mon Sep 17 00:00:00 2001 From: Marc Schuh Date: Sat, 17 Jan 2026 12:18:43 +0100 Subject: [PATCH 1/2] Adding KVMSnapshotManager first version --- src/opsbox/KVMSnapshotManager/__init__.py | 1 + .../kvm_snapshot_manager.py | 407 ++++++++++++++++++ 2 files changed, 408 insertions(+) create mode 100644 src/opsbox/KVMSnapshotManager/__init__.py create mode 100644 src/opsbox/KVMSnapshotManager/kvm_snapshot_manager.py diff --git a/src/opsbox/KVMSnapshotManager/__init__.py b/src/opsbox/KVMSnapshotManager/__init__.py new file mode 100644 index 0000000..32b73a8 --- /dev/null +++ b/src/opsbox/KVMSnapshotManager/__init__.py @@ -0,0 +1 @@ +"""KVM snapshot manager package.""" diff --git a/src/opsbox/KVMSnapshotManager/kvm_snapshot_manager.py b/src/opsbox/KVMSnapshotManager/kvm_snapshot_manager.py new file mode 100644 index 0000000..fe9add7 --- /dev/null +++ b/src/opsbox/KVMSnapshotManager/kvm_snapshot_manager.py @@ -0,0 +1,407 @@ +"""KVM snapshot management helpers.""" + +import argparse +import contextlib +import getpass +import hashlib +import logging +import subprocess +import tempfile +from enum import Enum +from pathlib import Path + +from opsbox.locking.lock_manager import LockManager +from opsbox.logging.logger_setup import LoggingConfig, configure_logging + + +class DomblklistConfig(Enum): + """Configuration values for parsing domblklist output.""" + + HEADER_LINES = ("header_lines", 2) + MAX_DATA_LINES = ("max_data_lines", 2) + PARTS_EXPECTED = ("parts_expected", 2) + + @property + def number(self) -> int: + """Return the numeric configuration value.""" + return self.value[1] + + +class LockConfig(Enum): + """Configuration values for snapshot manager locking.""" + + DIR_NAME = "opsbox-kvm-locks" + + +def lock_path_for_base_image(base_image_path: Path) -> Path: + """Build a lock file path for the given base image.""" + resolved_path = base_image_path.expanduser().resolve() + digest = hashlib.sha256(str(resolved_path).encode("utf-8")).hexdigest() + lock_dir = Path(tempfile.gettempdir()) / LockConfig.DIR_NAME.value + lock_dir.mkdir(parents=True, exist_ok=True) + filename = f"opsbox-kvm-{resolved_path.name or 'base'}-{digest}.lock" + return lock_dir / filename + + +class DomblklistTooManyLinesError(Exception): + """Raised when domblklist returns too many data lines.""" + + def __init__(self, max_lines: int, actual_lines: int) -> None: + """Initialize the error with domblklist line counts.""" + error_msg = ( + "domblklist output has too many data lines " + f"(max {max_lines}, got {actual_lines})." + ) + super().__init__(error_msg) + + +class KVMStillRunningError(Exception): + """Raised when a KVM domain is not shut off.""" + + def __init__(self, domain: str, state: str) -> None: + """Initialize the error with the domain and its state.""" + error_msg = f"Domain '{domain}' is in state '{state}'. It must be shut off." + super().__init__(error_msg) + + +class VirshError(Exception): + """Raised when a virsh command fails.""" + + def __init__(self, command: list[str], stderr: str | None) -> None: + """Initialize the error with command details and stderr output.""" + stderr_text = stderr.strip() if stderr else "no error output" + command_text = " ".join(command) + error_msg = f"Error running '{command_text}': {stderr_text}" + super().__init__(error_msg) + + +class NoHeadSnapshotFoundError(Exception): + """Raised when domblklist has no data lines.""" + + def __init__(self) -> None: + """Initialize the error for empty domblklist output.""" + super().__init__("No data lines in domblklist output.") + + +class HeadSnapshotParseError(Exception): + """Raised when domblklist data cannot be parsed.""" + + def __init__(self, line: str) -> None: + """Initialize the error with the unparseable domblklist line.""" + error_msg = f"Could not parse head snapshot path from line: {line}" + super().__init__(error_msg) + + +class NoBaseImageFoundError(Exception): + """Raised when the configured base image is missing.""" + + def __init__(self, base_image_path: Path) -> None: + """Initialize the error with the missing base image path.""" + error_msg = f"Base image file not found: {base_image_path}" + super().__init__(error_msg) + + +class SnapshotNotFoundError(Exception): + """Raised when a snapshot file in the chain is missing.""" + + def __init__(self, snapshot_path: Path) -> None: + """Initialize the error with the missing snapshot path.""" + error_msg = f"Snapshot file not found in chain: {snapshot_path}" + super().__init__(error_msg) + + +class BackingFileNotFoundError(Exception): + """Raised when a backing file cannot be found.""" + + def __init__(self, snapshot_path: Path) -> None: + """Initialize the error with the snapshot missing a backing file.""" + error_msg = f"No backing file found for snapshot: {snapshot_path}" + super().__init__(error_msg) + + +class BaseImageError(Exception): + """Raised when the base image does not match the snapshot chain.""" + + def __init__(self, snapshot_path: Path, base_image_path: Path) -> None: + """Initialize the error with the snapshot and base image paths.""" + error_msg = ( + f"Snapshot {snapshot_path} does not point to base image {base_image_path}." + ) + super().__init__(error_msg) + + +class KVMSnapshotManager: + """Manage KVM snapshots by inspecting domblklist output.""" + + def __init__( + self, + log_handler: logging.Logger, + domain: str, + base_image: str, + ) -> None: + """Initialize the snapshot manager.""" + self.domain = domain + self.log_handler = log_handler + self.base_image_path = Path(base_image) + self.sudo_password: str | None = None + if not self.base_image_path.is_file(): + raise NoBaseImageFoundError(self.base_image_path) + + self.log_handler.info(f"Using base image: {self.base_image_path}") + + def _run_with_sudo(self, command: list[str]) -> None: + """Run a command with sudo using a prompted password.""" + if self.sudo_password is None: + self.sudo_password = getpass.getpass(prompt="Enter sudo password: ") + sudo_password = self.sudo_password + if sudo_password is None: + error_msg = "Failed to read sudo password." + raise RuntimeError(error_msg) + + subprocess.run( # noqa: S603 + ["sudo", "-S", *command], # noqa: S607 + input=sudo_password + "\n", # Provide password via stdin + text=True, + check=True, + ) + + def _run_command(self, command: list[str]) -> list[str]: + """Run a virsh command for the configured domain and return output lines.""" + try: + result = subprocess.run( # noqa: S603 + command, + capture_output=True, + text=True, + check=True, + ) + except subprocess.CalledProcessError as e: + raise VirshError(command, e.stderr) from e + + return result.stdout.strip().split("\n") + + def check_domain_off(self) -> None: + """Ensure the domain is in the shut off state.""" + self.log_handler.info(f"Checking if domain '{self.domain}' is shut off...") + + lines = self._run_command(["virsh", "domstate", self.domain]) + # domstate usually returns something like "running", "shut off", etc. + domain_state = lines[0].lower() if lines else "" + + if domain_state != "shut off": + raise KVMStillRunningError(self.domain, domain_state) + + self.log_handler.info(f"Domain '{self.domain}' is properly shut off.") + + def _run_domblklist(self) -> list[str]: + """Run 'virsh domblklist' for the configured domain.""" + return self._run_command(["virsh", "domblklist", self.domain]) + + def extract_head_snapshot_path(self) -> Path: + """Extract the head snapshot path from domblklist output.""" + self.log_handler.info("Extracting head snapshot path from domblklist output...") + lines = self._run_domblklist() + + # Skip the first two lines (header and separator): + data_lines = lines[DomblklistConfig.HEADER_LINES.number :] + + if len(data_lines) > DomblklistConfig.MAX_DATA_LINES.number: + raise DomblklistTooManyLinesError( + DomblklistConfig.MAX_DATA_LINES.number, + len(data_lines), + ) + + if not data_lines: + raise NoHeadSnapshotFoundError + + parts = data_lines[0].split(None, 1) + if len(parts) == DomblklistConfig.PARTS_EXPECTED.number: + self.log_handler.info(f"Found head snapshot path: {parts[1]}") + return Path(parts[1]) + + raise HeadSnapshotParseError(data_lines[0]) + + def _extract_backing_file(self, path_to_snapshot: Path) -> Path: + """Extract the backing file path for a given snapshot.""" + if not path_to_snapshot.is_file(): + raise SnapshotNotFoundError(path_to_snapshot) + proc = subprocess.run( # noqa: S603 + ["qemu-img", "info", str(path_to_snapshot)], # noqa: S607 + capture_output=True, + text=True, + check=True, + ) + output = proc.stdout.splitlines() + + # Extract "backing file" line if present + backing_file = None + for line in output: + if line.strip().startswith("backing file:"): + # Format typically: "backing file: /path/to/backing.qcow2" + # Split on colon once, then strip extra whitespace + backing_file = Path(line.split(":", 1)[1].strip()) + self.log_handler.info(f"Found backing file: {backing_file}") + return backing_file + raise BackingFileNotFoundError(path_to_snapshot) + + def traverse_snapshot_chain(self, head_snapshot: Path) -> list[Path]: + """Traverse the snapshot chain starting at the head snapshot.""" + chain_info = [] + current = head_snapshot + + while True: + if not current.is_file(): + raise SnapshotNotFoundError(current) + + try: + next_snapshot = self._extract_backing_file(current) + except BackingFileNotFoundError: + if current == self.base_image_path: + self.log_handler.info(f"Reached base image: {current}") + break + raise # Re-raise the exception + + chain_info.append(current) + current = next_snapshot + + return chain_info + + def commit_and_rebase_for_last_image( + self, + snapshot_chain: list[Path], + dryrun: bool = True, + ) -> None: + """Commit and rebase the last snapshot in the chain.""" + last_snapshot = snapshot_chain[-1] + # Ensure the last snapshot points to the base image. + if self._extract_backing_file(last_snapshot) != self.base_image_path: + raise BaseImageError(last_snapshot, self.base_image_path) + + # 1) Commit command for the last snapshot. + commit_cmd = [ + "qemu-img", + "commit", + "-f", + "qcow2", + "-p", + "-t", + "none", + str(last_snapshot), + ] + if dryrun: + self.log_handler.info(f"[DRY RUN] Would run: {' '.join(commit_cmd)}") + else: + self._run_with_sudo(commit_cmd) + + # 2) Rebase the second-to-last snapshot to the base image. + second_last_snapshot = snapshot_chain[-2] + rebase_cmd = [ + "qemu-img", + "rebase", + "-f", + "qcow2", + "-F", + "qcow2", + "-b", + str(self.base_image_path), + str(second_last_snapshot), + ] + if dryrun: + self.log_handler.info(f"[DRY RUN] Would run: {' '.join(rebase_cmd)}") + else: + self._run_with_sudo(rebase_cmd) + + # 3) Remove the actual file for the now-committed snapshot. + if dryrun: + self.log_handler.info(f"[DRY RUN] Would remove file: {last_snapshot}") + else: + last_snapshot.unlink(missing_ok=True) + + # 4) Try to remove the snapshot metadata from libvirt, ignoring errors. + # We'll take the snapshot name from the file stem (strip '.qcow2'). + snapshot_name = last_snapshot.stem + virsh_delete_cmd = [ + "virsh", + "snapshot-delete", + self.domain, + snapshot_name, + "--metadata", + ] + if dryrun: + self.log_handler.info(f"[DRY RUN] Would run: {' '.join(virsh_delete_cmd)}") + else: + with contextlib.suppress(subprocess.CalledProcessError): + subprocess.run(virsh_delete_cmd, check=True) # noqa: S603 + + # 5) Pop the last snapshot from the chain in memory. + snapshot_chain.pop() + + +def main() -> None: + """Parse command line arguments and run checks.""" + parser = argparse.ArgumentParser( + description="Manage KVM external snapshots by checking domblklist output.", + ) + parser.add_argument( + "--domain", + help="The name or ID of the KVM domain to check.", + type=str, + required=True, + ) + parser.add_argument( + "--base-image", + help="The path to the base image of all snapshots.", + type=str, + required=True, + ) + parser.add_argument( + "--dry-run", + help="If given, commands will be logged instead of executed.", + action="store_true", + default=False, + ) + parser.add_argument( + "--remove-count", + help="Number of snapshots from the end of the chain to remove.", + type=int, + required=True, + ) + + args = parser.parse_args() + + log_handler = configure_logging(LoggingConfig(log_name="kvm_snapshot_manager")) + base_image_path = Path(args.base_image).expanduser().resolve() + lock_path = lock_path_for_base_image(base_image_path) + + with LockManager( + lock_file=lock_path, + logger=log_handler, + script_name="kvm_snapshot_manager", + ): + manager = KVMSnapshotManager( + log_handler=log_handler, + domain=args.domain, + base_image=str(base_image_path), + ) + + manager.check_domain_off() + + head_snapshot_path = manager.extract_head_snapshot_path() + snapshot_chain = manager.traverse_snapshot_chain(head_snapshot_path) + # Validate that at least (remove_count + 5) snapshots exist + if len(snapshot_chain) < (args.remove_count + 5): + error_msg = ( + f"Cannot remove {args.remove_count} snapshots; " + "at least 5 must remain in the chain." + ) + raise ValueError(error_msg) + + # Remove the specified number of snapshots + for _ in range(args.remove_count): + manager.commit_and_rebase_for_last_image( + snapshot_chain, + dryrun=args.dry_run, + ) + + +if __name__ == "__main__": + main() From 89f7579b492245c54d0412d0b91d006f8e37b783 Mon Sep 17 00:00:00 2001 From: Marc Schuh Date: Sat, 17 Jan 2026 12:49:01 +0100 Subject: [PATCH 2/2] Improving KVM Snapshot Manager and adding tests --- .github/workflows/release.yml | 5 +- pyproject.toml | 2 +- scripts/build_kvm_snapshot_manager.sh | 55 ++++ .../kvm_snapshot_manager.py | 33 +- tests/kvm_snapshot_manager/__init__.py | 1 + .../test_kvm_snapshot_manager.py | 309 ++++++++++++++++++ 6 files changed, 393 insertions(+), 12 deletions(-) create mode 100644 scripts/build_kvm_snapshot_manager.sh create mode 100644 tests/kvm_snapshot_manager/__init__.py create mode 100644 tests/kvm_snapshot_manager/test_kvm_snapshot_manager.py diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index c9a3859..33debd5 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -68,12 +68,13 @@ jobs: ./scripts/build_restic_backup.sh ./scripts/build_rsync_manager.sh ./scripts/build_health_monitor.sh + ./scripts/build_kvm_snapshot_manager.sh - name: Verify all builds run: | echo "Checking built executables..." ls -lh dist/ - for exe in notifier check_mails db_backup encrypted_mail restic_backup rsync_manager health_monitor; do + for exe in notifier check_mails db_backup encrypted_mail restic_backup rsync_manager health_monitor kvm_snapshot_manager; do if [ ! -f "dist/$exe" ]; then echo "❌ Missing executable: dist/$exe" exit 1 @@ -118,6 +119,7 @@ jobs: - `restic_backup` - Restic backup utility - `rsync_manager` - Rsync management utility - `health_monitor` - Health monitoring utility + - `kvm_snapshot_manager` - KVM snapshot management utility ### Installation Download the executable you need and make it executable: @@ -132,5 +134,6 @@ jobs: dist/restic_backup dist/rsync_manager dist/health_monitor + dist/kvm_snapshot_manager draft: false prerelease: false diff --git a/pyproject.toml b/pyproject.toml index cfb7edb..b38a3dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "opsbox" -version = "0.2.1" +version = "0.3.1" description = "A comprehensive Python library for server operations including backup scripts, encrypted mail functionality, and utility tools" readme = "README.md" license = {text = "MIT"} diff --git a/scripts/build_kvm_snapshot_manager.sh b/scripts/build_kvm_snapshot_manager.sh new file mode 100644 index 0000000..0338681 --- /dev/null +++ b/scripts/build_kvm_snapshot_manager.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +# Build script for kvm_snapshot_manager executable using PyInstaller + +set -e + +echo "Building kvm_snapshot_manager executable..." + +# Check if we're in the right directory +if [ ! -f "pyproject.toml" ]; then + echo "Error: Please run this script from the project root directory" + exit 1 +fi + +# Clean previous builds +echo "Cleaning previous builds..." +rm -rf build/ dist/kvm_snapshot_manager __pycache__/ *.spec + +# Build the executable +echo "Building executable with PyInstaller..." +uv run pyinstaller --onefile \ + --name kvm_snapshot_manager \ + --hidden-import opsbox.logging \ + --hidden-import opsbox.logging.logger_setup \ + --hidden-import opsbox.logging.LoggingConfig \ + --hidden-import opsbox.exceptions \ + --hidden-import opsbox.encrypted_mail \ + --hidden-import opsbox.locking \ + --hidden-import opsbox.locking.lock_manager \ + --hidden-import envelope \ + --exclude-module opsbox.backup \ + --exclude-module opsbox.db_snapshot \ + --exclude-module opsbox.rsync \ + --exclude-module opsbox.utils \ + --strip \ + --upx-dir=/usr/bin \ + src/opsbox/KVMSnapshotManager/kvm_snapshot_manager.py + +# Check if build was successful +if [ -f "dist/kvm_snapshot_manager" ]; then + echo "✅ Build successful!" + echo "📦 Executable created: dist/kvm_snapshot_manager" + echo "📏 File size: $(du -h dist/kvm_snapshot_manager | cut -f1)" + + # Make executable + chmod +x dist/kvm_snapshot_manager + + echo "" + echo "🚀 You can now run: ./dist/kvm_snapshot_manager --help" + echo "📋 Example usage: ./dist/kvm_snapshot_manager --domain --base-image /path/to/base.qcow2 --remove-count 1" + rm -rf build/ *.spec +else + echo "❌ Build failed!" + exit 1 +fi diff --git a/src/opsbox/KVMSnapshotManager/kvm_snapshot_manager.py b/src/opsbox/KVMSnapshotManager/kvm_snapshot_manager.py index fe9add7..43f46e9 100644 --- a/src/opsbox/KVMSnapshotManager/kvm_snapshot_manager.py +++ b/src/opsbox/KVMSnapshotManager/kvm_snapshot_manager.py @@ -33,6 +33,17 @@ class LockConfig(Enum): DIR_NAME = "opsbox-kvm-locks" +class CliConfig(Enum): + """Configuration values for command-line validation.""" + + MIN_REMAINING = ("min_remaining", 5) + + @property + def number(self) -> int: + """Return the numeric configuration value.""" + return self.value[1] + + def lock_path_for_base_image(base_image_path: Path) -> Path: """Build a lock file path for the given base image.""" resolved_path = base_image_path.expanduser().resolve() @@ -153,14 +164,13 @@ def _run_with_sudo(self, command: list[str]) -> None: """Run a command with sudo using a prompted password.""" if self.sudo_password is None: self.sudo_password = getpass.getpass(prompt="Enter sudo password: ") - sudo_password = self.sudo_password - if sudo_password is None: + if self.sudo_password is None: error_msg = "Failed to read sudo password." raise RuntimeError(error_msg) subprocess.run( # noqa: S603 ["sudo", "-S", *command], # noqa: S607 - input=sudo_password + "\n", # Provide password via stdin + input=self.sudo_password + "\n", # Provide password via stdin text=True, check=True, ) @@ -360,13 +370,16 @@ def main() -> None: default=False, ) parser.add_argument( - "--remove-count", - help="Number of snapshots from the end of the chain to remove.", + "--remaining", + help="Number of snapshots to remain after removal (min 5).", type=int, required=True, ) args = parser.parse_args() + if args.remaining < CliConfig.MIN_REMAINING.number: + error_msg = f"--remaining must be at least {CliConfig.MIN_REMAINING.number}." + raise ValueError(error_msg) log_handler = configure_logging(LoggingConfig(log_name="kvm_snapshot_manager")) base_image_path = Path(args.base_image).expanduser().resolve() @@ -387,16 +400,16 @@ def main() -> None: head_snapshot_path = manager.extract_head_snapshot_path() snapshot_chain = manager.traverse_snapshot_chain(head_snapshot_path) - # Validate that at least (remove_count + 5) snapshots exist - if len(snapshot_chain) < (args.remove_count + 5): + remove_count = len(snapshot_chain) - args.remaining + if remove_count < 0: error_msg = ( - f"Cannot remove {args.remove_count} snapshots; " - "at least 5 must remain in the chain." + f"Cannot keep {args.remaining} snapshots; " + f"only {len(snapshot_chain)} available." ) raise ValueError(error_msg) # Remove the specified number of snapshots - for _ in range(args.remove_count): + for _ in range(remove_count): manager.commit_and_rebase_for_last_image( snapshot_chain, dryrun=args.dry_run, diff --git a/tests/kvm_snapshot_manager/__init__.py b/tests/kvm_snapshot_manager/__init__.py new file mode 100644 index 0000000..5fd4e0f --- /dev/null +++ b/tests/kvm_snapshot_manager/__init__.py @@ -0,0 +1 @@ +"""Tests for the KVM snapshot manager.""" diff --git a/tests/kvm_snapshot_manager/test_kvm_snapshot_manager.py b/tests/kvm_snapshot_manager/test_kvm_snapshot_manager.py new file mode 100644 index 0000000..d314b61 --- /dev/null +++ b/tests/kvm_snapshot_manager/test_kvm_snapshot_manager.py @@ -0,0 +1,309 @@ +"""Business logic tests for the KVM snapshot manager.""" + +from __future__ import annotations + +import contextlib +import logging +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from collections.abc import Iterable + from pathlib import Path + from typing import Self + +import pytest + +from opsbox.KVMSnapshotManager.kvm_snapshot_manager import ( + BackingFileNotFoundError, + DomblklistTooManyLinesError, + HeadSnapshotParseError, + KVMSnapshotManager, + KVMStillRunningError, + NoHeadSnapshotFoundError, + SnapshotNotFoundError, + lock_path_for_base_image, + main, +) + + +class StubCommandManager(KVMSnapshotManager): + """Test helper to provide fixed command output.""" + + def __init__( + self, + log_handler: logging.Logger, + domain: str, + base_image: str, + command_output: list[str], + ) -> None: + """Initialize with fixed command output.""" + self._command_output = command_output + super().__init__(log_handler=log_handler, domain=domain, base_image=base_image) + + def _run_command(self, command: list[str]) -> list[str]: + return self._command_output + + +class StubDomblklistManager(KVMSnapshotManager): + """Test helper to provide domblklist output.""" + + def __init__( + self, + log_handler: logging.Logger, + domain: str, + base_image: str, + domblklist_output: list[str], + ) -> None: + """Initialize with fixed domblklist output.""" + self._domblklist_output = domblklist_output + super().__init__(log_handler=log_handler, domain=domain, base_image=base_image) + + def _run_domblklist(self) -> list[str]: + return self._domblklist_output + + +class StubBackingFileManager(KVMSnapshotManager): + """Test helper to provide backing file mappings.""" + + def __init__( + self, + log_handler: logging.Logger, + domain: str, + base_image: str, + backing_map: dict[Path, Path], + ) -> None: + """Initialize with backing file mappings.""" + self._backing_map = backing_map + super().__init__(log_handler=log_handler, domain=domain, base_image=base_image) + + def _extract_backing_file(self, path_to_snapshot: Path) -> Path: + if path_to_snapshot == self.base_image_path: + raise BackingFileNotFoundError(path_to_snapshot) + if path_to_snapshot in self._backing_map: + return self._backing_map[path_to_snapshot] + raise BackingFileNotFoundError(path_to_snapshot) + + +class DummyLockManager: + """No-op lock manager for main() tests.""" + + def __init__(self, *_: object, **__: object) -> None: + """Initialize a no-op context manager.""" + self._context = contextlib.nullcontext() + + def __enter__(self) -> Self: + """Enter the lock manager context.""" + self._context.__enter__() + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + """Exit the lock manager context.""" + self._context.__exit__(exc_type, exc_val, exc_tb) + + +def _make_base_image(tmp_path: Path) -> Path: + base_image = tmp_path / "base.qcow2" + base_image.touch() + return base_image + + +def _domblklist_lines(data_lines: Iterable[str]) -> list[str]: + return ["Target Source", "------ ------", *data_lines] + + +def test_lock_path_is_stable_for_same_base_image(tmp_path: Path) -> None: + """Test lock path is stable and scoped to temp lock directory.""" + base_image = tmp_path / "base.qcow2" + first = lock_path_for_base_image(base_image) + second = lock_path_for_base_image(base_image) + + assert first == second + assert first.parent.name == "opsbox-kvm-locks" + assert first.name.startswith("opsbox-kvm-base.qcow2-") + + +def test_check_domain_off_raises_when_running(tmp_path: Path) -> None: + """Test running domains raise KVMStillRunningError.""" + base_image = _make_base_image(tmp_path) + manager = StubCommandManager( + log_handler=logging.getLogger("test"), + domain="test-domain", + base_image=str(base_image), + command_output=["running"], + ) + + with pytest.raises(KVMStillRunningError): + manager.check_domain_off() + + +def test_check_domain_off_allows_shut_off(tmp_path: Path) -> None: + """Test shut off domains pass validation.""" + base_image = _make_base_image(tmp_path) + manager = StubCommandManager( + log_handler=logging.getLogger("test"), + domain="test-domain", + base_image=str(base_image), + command_output=["shut off"], + ) + + manager.check_domain_off() + + +def test_extract_head_snapshot_path_returns_path(tmp_path: Path) -> None: + """Test head snapshot path is extracted from domblklist output.""" + base_image = _make_base_image(tmp_path) + head_snapshot = tmp_path / "head.qcow2" + manager = StubDomblklistManager( + log_handler=logging.getLogger("test"), + domain="test-domain", + base_image=str(base_image), + domblklist_output=_domblklist_lines([f"vda {head_snapshot}"]), + ) + + assert manager.extract_head_snapshot_path() == head_snapshot + + +def test_extract_head_snapshot_path_raises_on_too_many_lines(tmp_path: Path) -> None: + """Test too many domblklist data lines raise an error.""" + base_image = _make_base_image(tmp_path) + manager = StubDomblklistManager( + log_handler=logging.getLogger("test"), + domain="test-domain", + base_image=str(base_image), + domblklist_output=_domblklist_lines(["vda a", "vdb b", "vdc c"]), + ) + + with pytest.raises(DomblklistTooManyLinesError): + manager.extract_head_snapshot_path() + + +def test_extract_head_snapshot_path_raises_when_no_data(tmp_path: Path) -> None: + """Test empty domblklist data raises an error.""" + base_image = _make_base_image(tmp_path) + manager = StubDomblklistManager( + log_handler=logging.getLogger("test"), + domain="test-domain", + base_image=str(base_image), + domblklist_output=_domblklist_lines([]), + ) + + with pytest.raises(NoHeadSnapshotFoundError): + manager.extract_head_snapshot_path() + + +def test_extract_head_snapshot_path_raises_when_unparseable(tmp_path: Path) -> None: + """Test unparseable domblklist data raises an error.""" + base_image = _make_base_image(tmp_path) + manager = StubDomblklistManager( + log_handler=logging.getLogger("test"), + domain="test-domain", + base_image=str(base_image), + domblklist_output=_domblklist_lines(["vda"]), + ) + + with pytest.raises(HeadSnapshotParseError): + manager.extract_head_snapshot_path() + + +def test_traverse_snapshot_chain_returns_paths_until_base(tmp_path: Path) -> None: + """Test traversal returns snapshots until base image is reached.""" + base_image = _make_base_image(tmp_path) + snapshot_a = tmp_path / "snap_a.qcow2" + snapshot_b = tmp_path / "snap_b.qcow2" + snapshot_a.touch() + snapshot_b.touch() + backing_map = {snapshot_a: snapshot_b, snapshot_b: base_image} + + manager = StubBackingFileManager( + log_handler=logging.getLogger("test"), + domain="test-domain", + base_image=str(base_image), + backing_map=backing_map, + ) + + assert manager.traverse_snapshot_chain(snapshot_a) == [snapshot_a, snapshot_b] + + +def test_traverse_snapshot_chain_raises_when_head_missing(tmp_path: Path) -> None: + """Test missing head snapshot raises SnapshotNotFoundError.""" + base_image = _make_base_image(tmp_path) + missing_head = tmp_path / "missing.qcow2" + manager = StubBackingFileManager( + log_handler=logging.getLogger("test"), + domain="test-domain", + base_image=str(base_image), + backing_map={}, + ) + + with pytest.raises(SnapshotNotFoundError): + manager.traverse_snapshot_chain(missing_head) + + +def test_main_rejects_remaining_below_minimum(monkeypatch: pytest.MonkeyPatch) -> None: + """Test CLI rejects remaining values below minimum.""" + monkeypatch.setattr( + "sys.argv", + [ + "kvm_snapshot_manager", + "--domain", + "test-domain", + "--base-image", + "/tmp/base.qcow2", + "--remaining", + "4", + ], + ) + + with pytest.raises(ValueError, match=r"--remaining must be at least 5\."): + main() + + +def test_main_rejects_remaining_greater_than_available( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + """Test CLI rejects remaining value above available snapshots.""" + base_image = _make_base_image(tmp_path) + available_snapshot = tmp_path / "snapshot.qcow2" + available_snapshot.touch() + + class StubManager: + def __init__(self, *_: object, **__: object) -> None: + pass + + def check_domain_off(self) -> None: + return None + + def extract_head_snapshot_path(self) -> Path: + return available_snapshot + + def traverse_snapshot_chain(self, head_snapshot: Path) -> list[Path]: + return [available_snapshot] + + monkeypatch.setattr( + "sys.argv", + [ + "kvm_snapshot_manager", + "--domain", + "test-domain", + "--base-image", + str(base_image), + "--remaining", + "6", + ], + ) + monkeypatch.setattr( + "opsbox.KVMSnapshotManager.kvm_snapshot_manager.KVMSnapshotManager", + StubManager, + ) + monkeypatch.setattr( + "opsbox.KVMSnapshotManager.kvm_snapshot_manager.LockManager", + DummyLockManager, + ) + monkeypatch.setattr( + "opsbox.KVMSnapshotManager.kvm_snapshot_manager.configure_logging", + lambda *_args, **_kwargs: logging.getLogger("test"), + ) + + with pytest.raises(ValueError, match=r"only 1 available\."): + main()