Skip to content
5 changes: 3 additions & 2 deletions xtest/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"fixtures.obligations",
"fixtures.keys",
"fixtures.audit",
"fixtures.encryption",
]


Expand Down Expand Up @@ -203,7 +204,7 @@ def defaulted_list_opt[T](


# Core fixtures
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def pt_file(tmp_dir: Path, size: str) -> Path:
"""Generate a plaintext test file.

Expand All @@ -222,7 +223,7 @@ def pt_file(tmp_dir: Path, size: str) -> Path:
return pt_file


@pytest.fixture(scope="package")
@pytest.fixture(scope="session")
def tmp_dir(request: pytest.FixtureRequest) -> Path:
"""Create worker-specific temporary directory for test files.

Expand Down
10 changes: 5 additions & 5 deletions xtest/fixtures/assertions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import assertions


@pytest.fixture(scope="package")
@pytest.fixture(scope="session")
def hs256_key() -> str:
"""Generate a random HS256 (HMAC-SHA256) signing key."""
return base64.b64encode(secrets.token_bytes(32)).decode("ascii")


@pytest.fixture(scope="package")
@pytest.fixture(scope="session")
def rs256_keys() -> tuple[str, str]:
"""Generate an RS256 (RSA-SHA256) key pair.

Expand Down Expand Up @@ -88,7 +88,7 @@ def write_assertion_verification_keys_to_file(
return as_file


@pytest.fixture(scope="package")
@pytest.fixture(scope="session")
def assertion_file_no_keys(tmp_dir: Path) -> Path:
"""Assertion file with a single handling assertion (no signing key)."""
assertion_list = [
Expand All @@ -109,7 +109,7 @@ def assertion_file_no_keys(tmp_dir: Path) -> Path:
)


@pytest.fixture(scope="package")
@pytest.fixture(scope="session")
def assertion_file_rs_and_hs_keys(
tmp_dir: Path, hs256_key: str, rs256_keys: tuple[str, str]
) -> Path:
Expand Down Expand Up @@ -152,7 +152,7 @@ def assertion_file_rs_and_hs_keys(
)


@pytest.fixture(scope="package")
@pytest.fixture(scope="session")
def assertion_verification_file_rs_and_hs_keys(
tmp_dir: Path, hs256_key: str, rs256_keys: tuple[str, str]
) -> Path:
Expand Down
97 changes: 97 additions & 0 deletions xtest/fixtures/encryption.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""Session-scoped factory fixture for memoized TDF encryption.

The cache key is derived from encryption input parameters; on-disk filenames also embed
the requesting test's name and a short hash for debuggability.
"""

import hashlib
from pathlib import Path

import pytest

import tdfs


class EncryptFactory:
"""Memoized TDF encryption factory bound to the current test.

Call to encrypt (results are cached by input parameters). Use rt_file() to
generate a test-unique decrypted output path derived from the ciphertext.
"""

def __init__(
self,
label: str,
pt_file: Path,
tmp_dir: Path,
cache: dict[tuple, Path],
) -> None:
self._label = label
self._pt_file = pt_file
self._tmp_dir = tmp_dir
self._cache = cache

def __call__(
self,
encrypt_sdk: tdfs.SDK,
*,
container: tdfs.container_type = "ztdf",
attr_values: list[str] | None = None,
target_mode: tdfs.container_version | None = None,
az: str = "",
mime_type: str = "text/plain",
Comment thread
dmihalcik-virtru marked this conversation as resolved.
) -> Path:
attr_key = tuple(attr_values) if attr_values is not None else None
key = (str(encrypt_sdk), container, target_mode, attr_key, az, mime_type)
cached = self._cache.get(key)
if cached is not None:
return cached
digest = hashlib.sha1(repr(key).encode()).hexdigest()[:8]
ct_file = (
self._tmp_dir / f"ct-{self._label}-{encrypt_sdk}-{container}-{digest}.tdf"
)
encrypt_sdk.encrypt(
self._pt_file,
ct_file,
mime_type=mime_type,
container=container,
attr_values=attr_values,
assert_value=az,
target_mode=target_mode,
)
assert ct_file.is_file()
self._cache[key] = ct_file
return ct_file

def rt_file(self, ct_file: Path, decrypt_sdk: tdfs.SDK, variant: str = "") -> Path:
"""Return a test-unique path for the decrypted output.

Embeds the current test label so tests that share a cached ciphertext
don't collide on their output files.
"""
variant_part = f"-{variant}" if variant else ""
return ct_file.with_name(
f"{ct_file.stem}-{decrypt_sdk}-{self._label}{variant_part}.untdf"
)


@pytest.fixture(scope="session")
def _encryption_cache() -> dict[tuple, Path]:
"""Session-wide cache mapping input-tuple keys to encrypted Paths."""
return {}


@pytest.fixture
def encrypted_tdf(
request: pytest.FixtureRequest,
pt_file: Path,
tmp_dir: Path,
_encryption_cache: dict[tuple, Path],
) -> EncryptFactory:
"""Return a memoized encrypt-to-TDF factory for the current test.

Two callers with identical inputs share a ciphertext; differing inputs
produce distinct ciphertexts. Use rt_file() to get a test-unique output path.
"""
label = request.node.originalname or request.node.name
return EncryptFactory(label, pt_file, tmp_dir, _encryption_cache)
Loading
Loading