From 046d2ba66fd61821957c4b03aed511d21fcf7455 Mon Sep 17 00:00:00 2001 From: Dave Mihalcik Date: Wed, 29 Apr 2026 15:55:33 -0400 Subject: [PATCH 01/10] chore(xtest): replace cipherTexts caches with session-scoped fixture Four test modules each maintained a module-level cipherTexts dict and inline if-cache-hit-else-encrypt blocks at every call site (40+ blocks plus do_encrypt_with). Replace with a single auto-keyed factory fixture so pytest's own scoping owns the cache lifetime and callers don't pick scenario strings. Also fixes a latent ordering bug where manifest-shape assertions in do_encrypt_with only fired on cache miss. Co-Authored-By: Claude Opus 4.7 (1M context) --- xtest/conftest.py | 5 +- xtest/fixtures/encryption.py | 70 +++++++ xtest/test_abac.py | 360 ++++++++++++----------------------- xtest/test_policytypes.py | 79 +++----- xtest/test_pqc.py | 43 ++--- xtest/test_tdfs.py | 194 +++++-------------- 6 files changed, 281 insertions(+), 470 deletions(-) create mode 100644 xtest/fixtures/encryption.py diff --git a/xtest/conftest.py b/xtest/conftest.py index 36f7ac0db..ce6a71317 100644 --- a/xtest/conftest.py +++ b/xtest/conftest.py @@ -35,6 +35,7 @@ "fixtures.obligations", "fixtures.keys", "fixtures.audit", + "fixtures.encryption", ] @@ -203,7 +204,7 @@ def defaulted_list_opt[T]( # Core fixtures -@pytest.fixture(scope="module") +@pytest.fixture(scope="session") def pt_file(tmp_dir: Path, size: str) -> Path: """Generate a plaintext test file. @@ -222,7 +223,7 @@ def pt_file(tmp_dir: Path, size: str) -> Path: return pt_file -@pytest.fixture(scope="package") +@pytest.fixture(scope="session") def tmp_dir(request: pytest.FixtureRequest) -> Path: """Create worker-specific temporary directory for test files. diff --git a/xtest/fixtures/encryption.py b/xtest/fixtures/encryption.py new file mode 100644 index 000000000..b5e70351e --- /dev/null +++ b/xtest/fixtures/encryption.py @@ -0,0 +1,70 @@ +"""Session-scoped factory fixture for memoized TDF encryption. + +Replaces the per-module ``cipherTexts: dict[str, Path]`` caches that several +test modules used to maintain by hand. The cache key is derived automatically +from the inputs that affect the encrypted output, so callers do not pick a +scenario string. +""" + +import hashlib +from collections.abc import Callable +from pathlib import Path + +import pytest + +import tdfs + +EncryptFactory = Callable[..., Path] + + +@pytest.fixture(scope="session") +def _encryption_cache() -> dict[tuple, Path]: + """Session-wide cache mapping input-tuple keys to encrypted Paths.""" + return {} + + +@pytest.fixture +def encrypted_tdf( + request: pytest.FixtureRequest, + pt_file: Path, + tmp_dir: Path, + _encryption_cache: dict[tuple, Path], +) -> EncryptFactory: + """Return a memoized encrypt-to-TDF function. + + Two callers that pass identical inputs share a ciphertext; differing + inputs produce distinct ciphertexts. The on-disk filename embeds the + requesting test's name plus a short hash for debuggability. + """ + label = request.node.originalname or request.node.name + + def _factory( + encrypt_sdk: tdfs.SDK, + *, + container: tdfs.container_type = "ztdf", + attr_values: list[str] | None = None, + target_mode: tdfs.container_version | None = None, + az: str = "", + mime_type: str = "text/plain", + ) -> Path: + attr_key = tuple(attr_values) if attr_values is not None else None + key = (str(encrypt_sdk), container, target_mode, attr_key, az, mime_type) + cached = _encryption_cache.get(key) + if cached is not None: + return cached + digest = hashlib.sha1(repr(key).encode()).hexdigest()[:8] + ct_file = tmp_dir / f"ct-{label}-{encrypt_sdk}-{container}-{digest}.tdf" + encrypt_sdk.encrypt( + pt_file, + ct_file, + mime_type=mime_type, + container=container, + attr_values=attr_values, + assert_value=az, + target_mode=target_mode, + ) + assert ct_file.is_file() + _encryption_cache[key] = ct_file + return ct_file + + return _factory diff --git a/xtest/test_abac.py b/xtest/test_abac.py index f32e7b35a..c41741af6 100644 --- a/xtest/test_abac.py +++ b/xtest/test_abac.py @@ -8,9 +8,9 @@ import tdfs from abac import Attribute, ObligationValue from audit_logs import AuditLogAsserter +from fixtures.encryption import EncryptFactory from test_policytypes import skip_rts_as_needed -cipherTexts: dict[str, Path] = {} rewrap_403_pattern = ( "tdf: rewrap request 403|403 for \\[https?://[^\\]]+\\]; rewrap permission denied" ) @@ -74,9 +74,8 @@ def test_key_mapping_multiple_mechanisms( pt_file: Path, kas_url_default: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): - global counter - tdfs.skip_if_unsupported(encrypt_sdk, "key_management") skip_dspx2457(encrypt_sdk) skip_dspx1153(encrypt_sdk, decrypt_sdk) @@ -87,26 +86,17 @@ def test_key_mapping_multiple_mechanisms( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"multimechanism-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - cipherTexts[sample_name] = ct_file - # Currently, we only support rsa:2048 and ec:secp256r1 - vals = [ - v - for v in attribute_with_different_kids.value_fqns - if v.endswith("/e1") or v.endswith("/r1") - ] - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=vals, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) + # Currently, we only support rsa:2048 and ec:secp256r1 + vals = [ + v + for v in attribute_with_different_kids.value_fqns + if v.endswith("/e1") or v.endswith("/r1") + ] + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=vals, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert {kao.kid for kao in manifest.encryptionInformation.keyAccess} == {"r1", "e1"} assert manifest.encryptionInformation.keyAccess[0].url == kas_url_default @@ -126,6 +116,7 @@ def test_key_mapping_extended_mechanisms( kas_url_km1: str, kas_url_km2: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Test encryption and decryption with extended cryptographic mechanisms. @@ -151,20 +142,11 @@ def test_key_mapping_extended_mechanisms( attr, key_ids = attribute_allof_with_extended_mechanisms - sample_name = f"extended-mechanisms-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - cipherTexts[sample_name] = ct_file - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=attr.value_fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=attr.value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 5 @@ -202,6 +184,7 @@ def test_key_mapping_extended_ec_mechanisms( pt_file: Path, kas_url_km2: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Test encryption and decryption with extended cryptographic mechanisms. @@ -227,20 +210,11 @@ def test_key_mapping_extended_ec_mechanisms( ec_vals = [v for v in attr.value_fqns if "ec-secp3" in v] assert len(ec_kids) == len(ec_vals), "Mismatch in EC key IDs and attribute values" - sample_name = f"extended-mechanisms-ec-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - cipherTexts[sample_name] = ct_file - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=ec_vals, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=ec_vals, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == len(ec_kids) @@ -272,6 +246,7 @@ def test_key_mapping_extended_rsa_mechanisms( pt_file: Path, kas_url_km1: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Test encryption and decryption with extended cryptographic mechanisms. @@ -296,20 +271,11 @@ def test_key_mapping_extended_rsa_mechanisms( "Mismatch in RSA key IDs and attribute values" ) - sample_name = f"extended-mechanisms-rsa-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=rsa_vals, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=rsa_vals, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == len(rsa_kids) @@ -342,9 +308,8 @@ def test_autoconfigure_one_attribute_standard( kas_url_alpha: str, in_focus: set[tdfs.SDK], audit_logs: AuditLogAsserter, + encrypted_tdf: EncryptFactory, ): - global counter - skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") @@ -353,20 +318,11 @@ def test_autoconfigure_one_attribute_standard( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-one-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=attribute_single_kas_grant.value_fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=attribute_single_kas_grant.value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 1 assert manifest.encryptionInformation.keyAccess[0].url == kas_url_alpha @@ -400,6 +356,7 @@ def test_autoconfigure_two_kas_or_standard( kas_url_beta: str, in_focus: set[tdfs.SDK], audit_logs: AuditLogAsserter, + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -409,23 +366,14 @@ def test_autoconfigure_two_kas_or_standard( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-two-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - attribute_two_kas_grant_or.value_fqns[0], - attribute_two_kas_grant_or.value_fqns[1], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + attribute_two_kas_grant_or.value_fqns[0], + attribute_two_kas_grant_or.value_fqns[1], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 assert ( @@ -462,6 +410,7 @@ def test_autoconfigure_double_kas_and( kas_url_beta: str, in_focus: set[tdfs.SDK], audit_logs: AuditLogAsserter, + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -471,23 +420,14 @@ def test_autoconfigure_double_kas_and( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-three-and-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - attribute_two_kas_grant_and.value_fqns[0], - attribute_two_kas_grant_and.value_fqns[1], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + attribute_two_kas_grant_and.value_fqns[0], + attribute_two_kas_grant_and.value_fqns[1], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 @@ -523,6 +463,7 @@ def test_autoconfigure_one_attribute_attr_grant( pt_file: Path, kas_url_gamma: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -532,22 +473,13 @@ def test_autoconfigure_one_attribute_attr_grant( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-one-attr-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - one_attribute_attr_kas_grant.value_fqns[0], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + one_attribute_attr_kas_grant.value_fqns[0], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 1 @@ -570,6 +502,7 @@ def test_autoconfigure_two_kas_or_attr_and_value_grant( kas_url_gamma: str, kas_url_alpha: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -579,23 +512,14 @@ def test_autoconfigure_two_kas_or_attr_and_value_grant( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-attr-val-or-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - attr_and_value_kas_grants_or.value_fqns[0], - attr_and_value_kas_grants_or.value_fqns[1], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + attr_and_value_kas_grants_or.value_fqns[0], + attr_and_value_kas_grants_or.value_fqns[1], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 @@ -624,6 +548,7 @@ def test_autoconfigure_two_kas_and_attr_and_value_grant( kas_url_gamma: str, kas_url_alpha: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -633,23 +558,14 @@ def test_autoconfigure_two_kas_and_attr_and_value_grant( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-attr-val-and-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - attr_and_value_kas_grants_and.value_fqns[0], - attr_and_value_kas_grants_and.value_fqns[1], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + attr_and_value_kas_grants_and.value_fqns[0], + attr_and_value_kas_grants_and.value_fqns[1], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 @@ -677,6 +593,7 @@ def test_autoconfigure_one_attribute_ns_grant( pt_file: Path, kas_url_delta: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -686,22 +603,13 @@ def test_autoconfigure_one_attribute_ns_grant( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-one-ns-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - one_attribute_ns_kas_grant.value_fqns[0], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + one_attribute_ns_kas_grant.value_fqns[0], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 1 @@ -724,6 +632,7 @@ def test_autoconfigure_two_kas_or_ns_and_value_grant( kas_url_delta: str, kas_url_alpha: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -733,23 +642,14 @@ def test_autoconfigure_two_kas_or_ns_and_value_grant( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-ns-val-or-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - ns_and_value_kas_grants_or.value_fqns[0], - ns_and_value_kas_grants_or.value_fqns[1], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + ns_and_value_kas_grants_or.value_fqns[0], + ns_and_value_kas_grants_or.value_fqns[1], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 @@ -778,6 +678,7 @@ def test_autoconfigure_two_kas_and_ns_and_value_grant( kas_url_delta: str, kas_url_alpha: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -787,23 +688,14 @@ def test_autoconfigure_two_kas_and_ns_and_value_grant( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-ns-val-and-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - ns_and_value_kas_grants_and.value_fqns[0], - ns_and_value_kas_grants_and.value_fqns[1], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + ns_and_value_kas_grants_and.value_fqns[0], + ns_and_value_kas_grants_and.value_fqns[1], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 @@ -1029,6 +921,7 @@ def test_autoconfigure_key_management_two_kas_two_keys( kas_url_km1: str, kas_url_km2: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Encrypts with an ALL_OF attribute that has two managed keys and decrypts successfully. @@ -1044,20 +937,11 @@ def test_autoconfigure_key_management_two_kas_two_keys( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"km-allof-two-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=attribute_allof_with_two_managed_keys[0].value_fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=attribute_allof_with_two_managed_keys[0].value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 @@ -1087,6 +971,7 @@ def test_encrypt_with_missing_value_uses_definition_key( pt_file: Path, kas_url_gamma: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Encrypts with a missing value FQN and verifies definition-level key mapping.""" if not in_focus & {encrypt_sdk}: @@ -1097,19 +982,10 @@ def test_encrypt_with_missing_value_uses_definition_key( missing_value_fqn, key_id = attribute_missing_value_key_mapping - sample_name = f"missing-value-def-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[missing_value_fqn], - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[missing_value_fqn], + ) manifest = tdfs.manifest(ct_file) policy = manifest.encryptionInformation.policy_object diff --git a/xtest/test_policytypes.py b/xtest/test_policytypes.py index a7bf62c82..ecdf58e5c 100644 --- a/xtest/test_policytypes.py +++ b/xtest/test_policytypes.py @@ -7,8 +7,7 @@ import tdfs from abac import Attribute - -cipherTexts: dict[str, Path] = {} +from fixtures.encryption import EncryptFactory def skip_rts_as_needed( @@ -45,6 +44,7 @@ def test_or_attributes_success( pt_file: Path, container: tdfs.container_type, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): skip_rts_as_needed(encrypt_sdk, decrypt_sdk, container, in_focus) @@ -63,24 +63,15 @@ def test_or_attributes_success( assert len(fqns) == len(vals_to_use) short_names = [v.value for v in vals_to_use] assert len(short_names) == len(vals_to_use) - sample_name = f"pt-or-{'-'.join(short_names)}-{encrypt_sdk}.{container}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}" - # Currently, we only support rsa:2048 and ec:secp256r1 - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container=container, - attr_values=fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - assert_expected_attrs(container, None, ct_file, fqns) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + container=container, + attr_values=fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) + assert_expected_attrs(container, None, ct_file, fqns) - rt_file = tmp_dir / f"{sample_name}.returned" + rt_file = tmp_dir / f"pt-or-{'-'.join(short_names)}-{encrypt_sdk}-{decrypt_sdk}.{container}.returned" decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) @@ -125,6 +116,7 @@ def test_and_attributes_success( pt_file: Path, container: tdfs.container_type, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """ Test AND attribute policy type. @@ -149,23 +141,15 @@ def test_and_attributes_success( assert len(fqns) == len(vals_to_use) short_names = [v.value for v in vals_to_use] assert len(short_names) == len(vals_to_use) - sample_name = f"pt-and-{'-'.join(short_names)}-{encrypt_sdk}.{container}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container=container, - attr_values=fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - assert_expected_attrs(container, None, ct_file, fqns) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + container=container, + attr_values=fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) + assert_expected_attrs(container, None, ct_file, fqns) - rt_file = tmp_dir / f"{sample_name}.returned" + rt_file = tmp_dir / f"pt-and-{'-'.join(short_names)}-{encrypt_sdk}-{decrypt_sdk}.{container}.returned" decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) @@ -179,6 +163,7 @@ def test_hierarchy_attributes_success( pt_file: Path, container: tdfs.container_type, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """ Test HIERARCHY attribute policy type. @@ -206,23 +191,15 @@ def test_hierarchy_attributes_success( assert len(fqns) == len(vals_to_use) short_names = [v.value for v in vals_to_use] assert len(short_names) == len(vals_to_use) - sample_name = f"pt-hierarchy-{'-'.join(short_names)}-{encrypt_sdk}.{container}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container=container, - attr_values=fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - assert_expected_attrs(container, None, ct_file, fqns) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + container=container, + attr_values=fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) + assert_expected_attrs(container, None, ct_file, fqns) - rt_file = tmp_dir / f"{sample_name}.returned" + rt_file = tmp_dir / f"pt-hierarchy-{'-'.join(short_names)}-{encrypt_sdk}-{decrypt_sdk}.{container}.returned" decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) diff --git a/xtest/test_pqc.py b/xtest/test_pqc.py index dc8536e1e..42a4000c0 100644 --- a/xtest/test_pqc.py +++ b/xtest/test_pqc.py @@ -12,10 +12,9 @@ import tdfs from abac import Attribute, KasKey +from fixtures.encryption import EncryptFactory from tdfs import KeyAccessObject -cipherTexts: dict[str, Path] = {} - # X-Wing KEM sizes per draft-connolly-cfrg-xwing-kem-10 XWING_ENCAPSULATION_KEY_SIZE = 1216 # public key, bytes XWING_CIPHERTEXT_SIZE = 1120 # KEM ciphertext (wrappedKey), bytes @@ -65,6 +64,7 @@ def test_xwing_roundtrip( pt_file: Path, kas_url_km1: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Encrypt and decrypt with an X-Wing managed key.""" if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -79,20 +79,11 @@ def test_xwing_roundtrip( attr, key_ids = attribute_with_xwing_key - sample_name = f"xwing-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - cipherTexts[sample_name] = ct_file - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=attr.value_fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=attr.value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 1 @@ -126,6 +117,7 @@ def test_xwing_with_ec_roundtrip( kas_url_km1: str, kas_url_km2: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Encrypt and decrypt with both X-Wing and EC keys (multi-mechanism).""" if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -140,20 +132,11 @@ def test_xwing_with_ec_roundtrip( attr, key_ids = attribute_with_xwing_and_ec_keys - sample_name = f"xwing-ec-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - cipherTexts[sample_name] = ct_file - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=attr.value_fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=attr.value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 diff --git a/xtest/test_tdfs.py b/xtest/test_tdfs.py index bfa896f1f..a264f60e5 100644 --- a/xtest/test_tdfs.py +++ b/xtest/test_tdfs.py @@ -11,80 +11,7 @@ import tdfs from abac import Attribute from audit_logs import AuditLogAsserter - -cipherTexts: dict[str, Path] = {} -counter = 0 - -#### HELPERS - - -def do_encrypt_with( - pt_file: Path, - encrypt_sdk: tdfs.SDK, - container: tdfs.container_type, - tmp_dir: Path, - az: str = "", - scenario: str = "", - target_mode: tdfs.container_version | None = None, - attr_values: list[str] | None = None, -) -> Path: - """ - Encrypt a file with the given SDK and container type, and return the path to the ciphertext file. - - Scenario is used to create a unique filename for the ciphertext file. - - If targetmode is set, asserts that the manifest is in the correct format for that target. - - If attr_values is provided, uses those attribute FQNs to ensure deterministic key selection. - This prevents test flakiness when base_key is configured on the platform. - """ - global counter - counter = (counter or 0) + 1 - c = counter - container_id = f"{encrypt_sdk}-{container}" - if scenario != "": - container_id += f"-{scenario}" - if container_id in cipherTexts: - return cipherTexts[container_id] - ct_file = tmp_dir / f"test-{encrypt_sdk}-{scenario}{c}.{container}" - - use_ecwrap = container == "ztdf-ecwrap" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container=container, - attr_values=attr_values, - assert_value=az, - target_mode=target_mode, - ) - - assert ct_file.is_file() - - if tdfs.simple_container(container) == "ztdf": - manifest = tdfs.manifest(ct_file) - assert manifest.payload.isEncrypted - assert len(manifest.encryptionInformation.keyAccess) == 1 - kao = manifest.encryptionInformation.keyAccess[0] - if use_ecwrap: - assert kao.type == "ec-wrapped" - assert kao.ephemeralPublicKey is not None - else: - assert kao.type == "wrapped" - assert kao.ephemeralPublicKey is None - if target_mode == "4.2.2": - looks_like_422(manifest) - elif target_mode == "4.3.0": - looks_like_430(manifest) - elif not encrypt_sdk.supports("hexless"): - looks_like_422(manifest) - else: - looks_like_430(manifest) - else: - assert False, f"Unknown container type: {container}" - cipherTexts[container_id] = ct_file - return ct_file - +from fixtures.encryption import EncryptFactory dspx1153Fails = [] @@ -107,6 +34,7 @@ def test_tdf_roundtrip( in_focus: set[tdfs.SDK], audit_logs: AuditLogAsserter, attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if container == "ztdf" and decrypt_sdk in dspx1153Fails: pytest.skip(f"DSPX-1153 SDK [{decrypt_sdk}] has a bug with payload tampering") @@ -133,15 +61,28 @@ def test_tdf_roundtrip( attr_values = ( None if container == "ztdf-ecwrap" else attribute_default_rsa.value_fqns ) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - container, - tmp_dir, + container=container, target_mode=target_mode, attr_values=attr_values, ) + manifest = tdfs.manifest(ct_file) + assert manifest.payload.isEncrypted + assert len(manifest.encryptionInformation.keyAccess) == 1 + kao = manifest.encryptionInformation.keyAccess[0] + if container == "ztdf-ecwrap": + assert kao.type == "ec-wrapped" + assert kao.ephemeralPublicKey is not None + else: + assert kao.type == "wrapped" + assert kao.ephemeralPublicKey is None + if target_mode == "4.2.2" or (target_mode is None and not encrypt_sdk.supports("hexless")): + looks_like_422(manifest) + else: + looks_like_430(manifest) + fname = ct_file.stem rt_file = tmp_dir / f"{fname}.untdf" @@ -174,6 +115,7 @@ def test_tdf_spec_target_422( tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): pfs = tdfs.get_platform_features() tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) @@ -186,12 +128,8 @@ def test_tdf_spec_target_422( f"Encrypt SDK {encrypt_sdk} doesn't support targeting container format 4.2.2" ) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, - scenario="target-422", target_mode="4.2.2", attr_values=attribute_default_rsa.value_fqns, ) @@ -279,14 +217,12 @@ def test_manifest_validity( tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk}: pytest.skip("Not in focus") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, attr_values=attribute_default_rsa.value_fqns, ) @@ -300,17 +236,14 @@ def test_manifest_validity_with_assertions( assertion_file_no_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk}: pytest.skip("Not in focus") if not encrypt_sdk.supports("assertions"): pytest.skip(f"{encrypt_sdk} sdk doesn't yet support assertions") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, - scenario="assertions", az=assertion_file_no_keys, attr_values=attribute_default_rsa.value_fqns, ) @@ -329,6 +262,7 @@ def test_tdf_assertions_unkeyed( assertion_file_no_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): pfs = tdfs.get_platform_features() if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -339,12 +273,8 @@ def test_tdf_assertions_unkeyed( pytest.skip(f"{encrypt_sdk} sdk doesn't yet support assertions") if not decrypt_sdk.supports("assertions"): pytest.skip(f"{decrypt_sdk} sdk doesn't yet support assertions") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, - scenario="assertions", az=assertion_file_no_keys, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, @@ -364,6 +294,7 @@ def test_tdf_assertions_with_keys( assertion_verification_file_rs_and_hs_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): pfs = tdfs.get_platform_features() if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -374,12 +305,8 @@ def test_tdf_assertions_with_keys( pytest.skip(f"{encrypt_sdk} sdk doesn't yet support assertions") if not decrypt_sdk.supports("assertion_verification"): pytest.skip(f"{decrypt_sdk} sdk doesn't yet support assertion_verification") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, - scenario="assertions-keys-roundtrip", az=assertion_file_rs_and_hs_keys, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, @@ -405,6 +332,7 @@ def test_tdf_assertions_422_format( assertion_verification_file_rs_and_hs_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") @@ -418,12 +346,8 @@ def test_tdf_assertions_422_format( pytest.skip(f"{encrypt_sdk} sdk doesn't yet support assertions") if not decrypt_sdk.supports("assertion_verification"): pytest.skip(f"{decrypt_sdk} sdk doesn't yet support assertion_verification") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, - scenario="assertions-422-keys-roundtrip", az=assertion_file_rs_and_hs_keys, target_mode="4.2.2", attr_values=attribute_default_rsa.value_fqns, @@ -603,17 +527,15 @@ def test_tdf_with_unbound_policy( in_focus: set[tdfs.SDK], audit_logs: AuditLogAsserter, attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ) -> None: if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") pfs = tdfs.get_platform_features() tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) @@ -643,17 +565,15 @@ def test_tdf_with_altered_policy_binding( in_focus: set[tdfs.SDK], audit_logs: AuditLogAsserter, attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ) -> None: if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") pfs = tdfs.get_platform_features() tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest( @@ -686,17 +606,15 @@ def test_tdf_with_altered_root_sig( tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") pfs = tdfs.get_platform_features() tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) @@ -717,17 +635,15 @@ def test_tdf_with_altered_seg_sig_wrong( tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") pfs = tdfs.get_platform_features() tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) @@ -753,17 +669,15 @@ def test_tdf_with_altered_enc_seg_size( tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") pfs = tdfs.get_platform_features() tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) @@ -790,6 +704,7 @@ def test_tdf_with_altered_assertion_statement( assertion_file_no_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") @@ -800,12 +715,8 @@ def test_tdf_with_altered_assertion_statement( pytest.skip(f"{encrypt_sdk} sdk doesn't yet support assertions") if not decrypt_sdk.supports("assertions"): pytest.skip(f"{decrypt_sdk} sdk doesn't yet support assertions") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, - scenario="assertions", az=assertion_file_no_keys, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, @@ -831,6 +742,7 @@ def test_tdf_with_altered_assertion_with_keys( assertion_verification_file_rs_and_hs_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") @@ -841,12 +753,8 @@ def test_tdf_with_altered_assertion_with_keys( pytest.skip(f"{encrypt_sdk} sdk doesn't yet support assertions") if not decrypt_sdk.supports("assertion_verification"): pytest.skip(f"{decrypt_sdk} sdk doesn't yet support assertion_verification") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, - scenario="assertions-keys-roundtrip-altered", az=assertion_file_rs_and_hs_keys, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, @@ -879,6 +787,7 @@ def test_tdf_altered_payload_end( tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ) -> None: if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") @@ -887,11 +796,8 @@ def test_tdf_altered_payload_end( pfs = tdfs.get_platform_features() tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) @@ -916,6 +822,7 @@ def test_tdf_with_malicious_kao( in_focus: set[tdfs.SDK], audit_logs: AuditLogAsserter, attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ) -> None: if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") @@ -924,11 +831,8 @@ def test_tdf_with_malicious_kao( tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) if not decrypt_sdk.supports("kasallowlist"): pytest.skip(f"{encrypt_sdk} sdk doesn't yet support an allowlist for kases") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest("malicious_kao", ct_file, malicious_kao) From c1dda701cbf2878ce491242ba86f55b9e7c422ff Mon Sep 17 00:00:00 2001 From: Dave Mihalcik Date: Wed, 29 Apr 2026 17:33:11 -0400 Subject: [PATCH 02/10] refactor(xtest): convert remaining secpmlkem tests to encrypted_tdf MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two tests added in 5958b3f (test_secpmlkem_3_roundtrip and test_secpmlkem_5_roundtrip) still used the removed cipherTexts dict. Move them onto the encrypted_tdf fixture introduced in 81fcefa. Side note: the second test had a copy-paste sample_name of "secpmlkem_3-..." which would have made it cache-collide with the first under the old keying. Auto-keying by attr_values fixes that incidentally — the two tests use different attribute fixtures. Co-Authored-By: Claude Opus 4.7 (1M context) --- xtest/test_pqc.py | 40 ++++++++++++---------------------------- 1 file changed, 12 insertions(+), 28 deletions(-) diff --git a/xtest/test_pqc.py b/xtest/test_pqc.py index 42a4000c0..46c9fce6e 100644 --- a/xtest/test_pqc.py +++ b/xtest/test_pqc.py @@ -182,6 +182,7 @@ def test_secpmlkem_3_roundtrip( pt_file: Path, kas_url_km1: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Encrypt and decrypt with an X-Wing managed key.""" if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -196,20 +197,11 @@ def test_secpmlkem_3_roundtrip( attr, key_ids = attribute_with_secpmlkem_3_key - sample_name = f"secpmlkem_3-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - cipherTexts[sample_name] = ct_file - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=attr.value_fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=attr.value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 1 @@ -249,6 +241,7 @@ def test_secpmlkem_5_roundtrip( pt_file: Path, kas_url_km1: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Encrypt and decrypt with an X-Wing managed key.""" if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -263,20 +256,11 @@ def test_secpmlkem_5_roundtrip( attr, key_ids = attribute_with_secpmlkem_5_key - sample_name = f"secpmlkem_3-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - cipherTexts[sample_name] = ct_file - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=attr.value_fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=attr.value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 1 From 1b1e9d0606aef690b472bbe0b29527a3c9ffd0ef Mon Sep 17 00:00:00 2001 From: Dave Mihalcik Date: Wed, 29 Apr 2026 17:39:59 -0400 Subject: [PATCH 03/10] fixup --- xtest/test_tdfs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/xtest/test_tdfs.py b/xtest/test_tdfs.py index a264f60e5..99131c305 100644 --- a/xtest/test_tdfs.py +++ b/xtest/test_tdfs.py @@ -352,6 +352,7 @@ def test_tdf_assertions_422_format( target_mode="4.2.2", attr_values=attribute_default_rsa.value_fqns, ) + looks_like_422(tdfs.manifest(ct_file)) fname = ct_file.stem rt_file = tmp_dir / f"{fname}.untdf" From b299d16c4f6b427fe2949cc656c4a69bc402af26 Mon Sep 17 00:00:00 2001 From: Dave Mihalcik Date: Thu, 30 Apr 2026 12:30:59 -0400 Subject: [PATCH 04/10] fixup remove overly verbose text --- xtest/fixtures/encryption.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/xtest/fixtures/encryption.py b/xtest/fixtures/encryption.py index b5e70351e..a603d7571 100644 --- a/xtest/fixtures/encryption.py +++ b/xtest/fixtures/encryption.py @@ -1,9 +1,6 @@ """Session-scoped factory fixture for memoized TDF encryption. -Replaces the per-module ``cipherTexts: dict[str, Path]`` caches that several -test modules used to maintain by hand. The cache key is derived automatically -from the inputs that affect the encrypted output, so callers do not pick a -scenario string. +The cache key and filenames are derived automatically from the test name and input. """ import hashlib From 1f985f536aa8702c9279001b823171162aec73c0 Mon Sep 17 00:00:00 2001 From: Dave Mihalcik Date: Thu, 30 Apr 2026 12:43:45 -0400 Subject: [PATCH 05/10] fixup ruff format --- xtest/test_policytypes.py | 15 ++++++++++++--- xtest/test_tdfs.py | 4 +++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/xtest/test_policytypes.py b/xtest/test_policytypes.py index ecdf58e5c..d01980035 100644 --- a/xtest/test_policytypes.py +++ b/xtest/test_policytypes.py @@ -71,7 +71,10 @@ def test_or_attributes_success( ) assert_expected_attrs(container, None, ct_file, fqns) - rt_file = tmp_dir / f"pt-or-{'-'.join(short_names)}-{encrypt_sdk}-{decrypt_sdk}.{container}.returned" + rt_file = ( + tmp_dir + / f"pt-or-{'-'.join(short_names)}-{encrypt_sdk}-{decrypt_sdk}.{container}.returned" + ) decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) @@ -149,7 +152,10 @@ def test_and_attributes_success( ) assert_expected_attrs(container, None, ct_file, fqns) - rt_file = tmp_dir / f"pt-and-{'-'.join(short_names)}-{encrypt_sdk}-{decrypt_sdk}.{container}.returned" + rt_file = ( + tmp_dir + / f"pt-and-{'-'.join(short_names)}-{encrypt_sdk}-{decrypt_sdk}.{container}.returned" + ) decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) @@ -199,7 +205,10 @@ def test_hierarchy_attributes_success( ) assert_expected_attrs(container, None, ct_file, fqns) - rt_file = tmp_dir / f"pt-hierarchy-{'-'.join(short_names)}-{encrypt_sdk}-{decrypt_sdk}.{container}.returned" + rt_file = ( + tmp_dir + / f"pt-hierarchy-{'-'.join(short_names)}-{encrypt_sdk}-{decrypt_sdk}.{container}.returned" + ) decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) diff --git a/xtest/test_tdfs.py b/xtest/test_tdfs.py index 99131c305..3543a870a 100644 --- a/xtest/test_tdfs.py +++ b/xtest/test_tdfs.py @@ -78,7 +78,9 @@ def test_tdf_roundtrip( else: assert kao.type == "wrapped" assert kao.ephemeralPublicKey is None - if target_mode == "4.2.2" or (target_mode is None and not encrypt_sdk.supports("hexless")): + if target_mode == "4.2.2" or ( + target_mode is None and not encrypt_sdk.supports("hexless") + ): looks_like_422(manifest) else: looks_like_430(manifest) From 4416611debb7bd35015320361e48e173e25cb5a1 Mon Sep 17 00:00:00 2001 From: Dave Mihalcik Date: Thu, 30 Apr 2026 13:06:25 -0400 Subject: [PATCH 06/10] refactor(xtest): derive rt_file paths from ct_file instead of manual fstrings Avoids hand-crafted unique names by constructing rt_file as ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf"), which is automatically unique across all encrypt params (already encoded in ct_file) and the decrypt sdk/version. Also removes the now-redundant short_names variable and tmp_dir fixture parameter from test_policytypes, and fixes a copy-paste bug where test_secpmlkem_5_roundtrip was writing to a secpmlkem_3-named file. Co-Authored-By: Claude Sonnet 4.6 --- xtest/test_abac.py | 28 ++++++++++++++-------------- xtest/test_policytypes.py | 24 +++--------------------- xtest/test_pqc.py | 8 ++++---- 3 files changed, 21 insertions(+), 39 deletions(-) diff --git a/xtest/test_abac.py b/xtest/test_abac.py index c41741af6..e17966bdd 100644 --- a/xtest/test_abac.py +++ b/xtest/test_abac.py @@ -102,7 +102,7 @@ def test_key_mapping_multiple_mechanisms( assert manifest.encryptionInformation.keyAccess[0].url == kas_url_default tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"multimechanism-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -171,7 +171,7 @@ def test_key_mapping_extended_mechanisms( tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") # Decrypt and verify - rt_file = tmp_dir / f"extended-mechanisms-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -233,7 +233,7 @@ def test_key_mapping_extended_ec_mechanisms( ) # Decrypt and verify - rt_file = tmp_dir / f"extended-mechanisms-ec-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -294,7 +294,7 @@ def test_key_mapping_extended_rsa_mechanisms( ) # Decrypt and verify - rt_file = tmp_dir / f"extended-mechanisms-rsa-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -334,7 +334,7 @@ def test_autoconfigure_one_attribute_standard( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"test-abac-one-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -391,7 +391,7 @@ def test_autoconfigure_two_kas_or_standard( # Mark timestamp before decrypt for audit log correlation mark = audit_logs.mark("before_decrypt") - rt_file = tmp_dir / f"test-abac-or-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -446,7 +446,7 @@ def test_autoconfigure_double_kas_and( # Mark timestamp before decrypt for audit log correlation mark = audit_logs.mark("before_decrypt") - rt_file = tmp_dir / f"test-abac-and-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -488,7 +488,7 @@ def test_autoconfigure_one_attribute_attr_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"test-abac-one-attr-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -534,7 +534,7 @@ def test_autoconfigure_two_kas_or_attr_and_value_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"test-abac-attr-val-or-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -580,7 +580,7 @@ def test_autoconfigure_two_kas_and_attr_and_value_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"test-abac-attr-val-and-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -618,7 +618,7 @@ def test_autoconfigure_one_attribute_ns_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"test-abac-one-ns-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -664,7 +664,7 @@ def test_autoconfigure_two_kas_or_ns_and_value_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"test-abac-ns-val-or-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -710,7 +710,7 @@ def test_autoconfigure_two_kas_and_ns_and_value_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"test-abac-ns-val-and-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -959,7 +959,7 @@ def test_autoconfigure_key_management_two_kas_two_keys( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"km-allof-two-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) diff --git a/xtest/test_policytypes.py b/xtest/test_policytypes.py index d01980035..30e922562 100644 --- a/xtest/test_policytypes.py +++ b/xtest/test_policytypes.py @@ -40,7 +40,6 @@ def test_or_attributes_success( attribute_with_or_type: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, container: tdfs.container_type, in_focus: set[tdfs.SDK], @@ -61,8 +60,6 @@ def test_or_attributes_success( assert len([v.fqn for v in vals_to_use if v.fqn is None]) == 0 fqns = [v.fqn for v in vals_to_use if v.fqn is not None] assert len(fqns) == len(vals_to_use) - short_names = [v.value for v in vals_to_use] - assert len(short_names) == len(vals_to_use) ct_file = encrypted_tdf( encrypt_sdk, container=container, @@ -71,10 +68,7 @@ def test_or_attributes_success( ) assert_expected_attrs(container, None, ct_file, fqns) - rt_file = ( - tmp_dir - / f"pt-or-{'-'.join(short_names)}-{encrypt_sdk}-{decrypt_sdk}.{container}.returned" - ) + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.returned") decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) @@ -115,7 +109,6 @@ def test_and_attributes_success( attribute_with_and_type: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, container: tdfs.container_type, in_focus: set[tdfs.SDK], @@ -142,8 +135,6 @@ def test_and_attributes_success( assert len([v.fqn for v in vals_to_use if v.fqn is None]) == 0 fqns = [v.fqn for v in vals_to_use if v.fqn is not None] assert len(fqns) == len(vals_to_use) - short_names = [v.value for v in vals_to_use] - assert len(short_names) == len(vals_to_use) ct_file = encrypted_tdf( encrypt_sdk, container=container, @@ -152,10 +143,7 @@ def test_and_attributes_success( ) assert_expected_attrs(container, None, ct_file, fqns) - rt_file = ( - tmp_dir - / f"pt-and-{'-'.join(short_names)}-{encrypt_sdk}-{decrypt_sdk}.{container}.returned" - ) + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.returned") decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) @@ -165,7 +153,6 @@ def test_hierarchy_attributes_success( attribute_with_hierarchy_type: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, container: tdfs.container_type, in_focus: set[tdfs.SDK], @@ -195,8 +182,6 @@ def test_hierarchy_attributes_success( assert len([v.fqn for v in vals_to_use if v.fqn is None]) == 0 fqns = [v.fqn for v in vals_to_use if v.fqn is not None] assert len(fqns) == len(vals_to_use) - short_names = [v.value for v in vals_to_use] - assert len(short_names) == len(vals_to_use) ct_file = encrypted_tdf( encrypt_sdk, container=container, @@ -205,10 +190,7 @@ def test_hierarchy_attributes_success( ) assert_expected_attrs(container, None, ct_file, fqns) - rt_file = ( - tmp_dir - / f"pt-hierarchy-{'-'.join(short_names)}-{encrypt_sdk}-{decrypt_sdk}.{container}.returned" - ) + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.returned") decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) diff --git a/xtest/test_pqc.py b/xtest/test_pqc.py index 46c9fce6e..eb456d7b2 100644 --- a/xtest/test_pqc.py +++ b/xtest/test_pqc.py @@ -102,7 +102,7 @@ def test_xwing_roundtrip( assert_xwing_kao_sizes(kao) assert_xwing_public_key_size(key_xwing) - rt_file = tmp_dir / f"xwing-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -168,7 +168,7 @@ def test_xwing_with_ec_roundtrip( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"xwing-ec-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -227,7 +227,7 @@ def test_secpmlkem_3_roundtrip( f"public key DER should be >= {XWING_ENCAPSULATION_KEY_SIZE} bytes, got {der_len}" ) - rt_file = tmp_dir / f"secpmlkem_3-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -286,6 +286,6 @@ def test_secpmlkem_5_roundtrip( f"public key DER should be >= {XWING_ENCAPSULATION_KEY_SIZE} bytes, got {der_len}" ) - rt_file = tmp_dir / f"secpmlkem_3-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) From 68714394da63421456d4a25a67bd668418d92a99 Mon Sep 17 00:00:00 2001 From: Dave Mihalcik Date: Thu, 30 Apr 2026 13:12:59 -0400 Subject: [PATCH 07/10] fixup remove unused tmp_dir fixture parameters Co-Authored-By: Claude Sonnet 4.6 --- xtest/test_abac.py | 15 --------------- xtest/test_pqc.py | 4 ---- xtest/test_tdfs.py | 2 -- 3 files changed, 21 deletions(-) diff --git a/xtest/test_abac.py b/xtest/test_abac.py index e17966bdd..1c12884ec 100644 --- a/xtest/test_abac.py +++ b/xtest/test_abac.py @@ -70,7 +70,6 @@ def test_key_mapping_multiple_mechanisms( attribute_with_different_kids: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_default: str, in_focus: set[tdfs.SDK], @@ -111,7 +110,6 @@ def test_key_mapping_extended_mechanisms( attribute_allof_with_extended_mechanisms: tuple[Attribute, list[str]], encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km1: str, kas_url_km2: str, @@ -180,7 +178,6 @@ def test_key_mapping_extended_ec_mechanisms( attribute_allof_with_extended_mechanisms: tuple[Attribute, list[str]], encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km2: str, in_focus: set[tdfs.SDK], @@ -242,7 +239,6 @@ def test_key_mapping_extended_rsa_mechanisms( attribute_allof_with_extended_mechanisms: tuple[Attribute, list[str]], encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km1: str, in_focus: set[tdfs.SDK], @@ -303,7 +299,6 @@ def test_autoconfigure_one_attribute_standard( attribute_single_kas_grant: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_alpha: str, in_focus: set[tdfs.SDK], @@ -350,7 +345,6 @@ def test_autoconfigure_two_kas_or_standard( attribute_two_kas_grant_or: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_alpha: str, kas_url_beta: str, @@ -404,7 +398,6 @@ def test_autoconfigure_double_kas_and( attribute_two_kas_grant_and: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_alpha: str, kas_url_beta: str, @@ -459,7 +452,6 @@ def test_autoconfigure_one_attribute_attr_grant( one_attribute_attr_kas_grant: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_gamma: str, in_focus: set[tdfs.SDK], @@ -497,7 +489,6 @@ def test_autoconfigure_two_kas_or_attr_and_value_grant( attr_and_value_kas_grants_or: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_gamma: str, kas_url_alpha: str, @@ -543,7 +534,6 @@ def test_autoconfigure_two_kas_and_attr_and_value_grant( attr_and_value_kas_grants_and: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_gamma: str, kas_url_alpha: str, @@ -589,7 +579,6 @@ def test_autoconfigure_one_attribute_ns_grant( one_attribute_ns_kas_grant: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_delta: str, in_focus: set[tdfs.SDK], @@ -627,7 +616,6 @@ def test_autoconfigure_two_kas_or_ns_and_value_grant( ns_and_value_kas_grants_or: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_delta: str, kas_url_alpha: str, @@ -673,7 +661,6 @@ def test_autoconfigure_two_kas_and_ns_and_value_grant( ns_and_value_kas_grants_and: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_delta: str, kas_url_alpha: str, @@ -916,7 +903,6 @@ def test_autoconfigure_key_management_two_kas_two_keys( attribute_allof_with_two_managed_keys: tuple[Attribute, list[str]], encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km1: str, kas_url_km2: str, @@ -967,7 +953,6 @@ def test_autoconfigure_key_management_two_kas_two_keys( def test_encrypt_with_missing_value_uses_definition_key( attribute_missing_value_key_mapping: tuple[str, str], encrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_gamma: str, in_focus: set[tdfs.SDK], diff --git a/xtest/test_pqc.py b/xtest/test_pqc.py index eb456d7b2..569577cf8 100644 --- a/xtest/test_pqc.py +++ b/xtest/test_pqc.py @@ -60,7 +60,6 @@ def test_xwing_roundtrip( key_xwing: KasKey, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km1: str, in_focus: set[tdfs.SDK], @@ -112,7 +111,6 @@ def test_xwing_with_ec_roundtrip( key_xwing: KasKey, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km1: str, kas_url_km2: str, @@ -178,7 +176,6 @@ def test_secpmlkem_3_roundtrip( key_secpmlkem_3: KasKey, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km1: str, in_focus: set[tdfs.SDK], @@ -237,7 +234,6 @@ def test_secpmlkem_5_roundtrip( key_secpmlkem_5: KasKey, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km1: str, in_focus: set[tdfs.SDK], diff --git a/xtest/test_tdfs.py b/xtest/test_tdfs.py index 3543a870a..5257edc37 100644 --- a/xtest/test_tdfs.py +++ b/xtest/test_tdfs.py @@ -216,7 +216,6 @@ def looks_like_430(manifest: tdfs.Manifest): def test_manifest_validity( encrypt_sdk: tdfs.SDK, pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, encrypted_tdf: EncryptFactory, @@ -234,7 +233,6 @@ def test_manifest_validity( def test_manifest_validity_with_assertions( encrypt_sdk: tdfs.SDK, pt_file: Path, - tmp_dir: Path, assertion_file_no_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, From dbd484b11761835d7a55360bc18d6eea6136611b Mon Sep 17 00:00:00 2001 From: Dave Mihalcik Date: Thu, 30 Apr 2026 13:26:50 -0400 Subject: [PATCH 08/10] fixup more --- xtest/test_legacy.py | 14 ++++----- xtest/test_tdfs.py | 69 ++++++++++---------------------------------- 2 files changed, 22 insertions(+), 61 deletions(-) diff --git a/xtest/test_legacy.py b/xtest/test_legacy.py index 4db0d81c3..4a221ff17 100644 --- a/xtest/test_legacy.py +++ b/xtest/test_legacy.py @@ -24,7 +24,7 @@ def test_decrypt_small( if not decrypt_sdk.supports("hexless"): pytest.skip("Decrypting hexless files is not supported") ct_file = get_golden_file("small-java-4.3.0-e0f8caf.tdf") - rt_file = tmp_dir / "small-java.untdf" + rt_file = tmp_dir / f"small-java-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container="ztdf") file_stats = os.stat(rt_file) assert file_stats.st_size == 5 * 2**10 @@ -44,7 +44,7 @@ def test_decrypt_big( if not decrypt_sdk.supports("hexless"): pytest.skip("Decrypting hexless files is not supported") ct_file = get_golden_file("big-java-4.3.0-e0f8caf.tdf") - rt_file = tmp_dir / "big-java.untdf" + rt_file = tmp_dir / f"big-java-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container="ztdf") file_stats = os.stat(rt_file) assert file_stats.st_size == 10 * 2**20 @@ -65,7 +65,7 @@ def test_decrypt_SDKv0_7_5( if not decrypt_sdk.supports("hexless"): pytest.skip("Decrypting hexless files is not supported") ct_file = get_golden_file("xstext-java-v0.7.5-94b161d53-DSP2.0.2_and_2.0.3.tdf") - rt_file = tmp_dir / "0.7.5-java.untdf" + rt_file = tmp_dir / f"0.7.5-java-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container="ztdf") file_stats = os.stat(rt_file) assert file_stats.st_size == 102 @@ -82,7 +82,7 @@ def test_decrypt_SDKv0_7_8( if not decrypt_sdk.supports("hexless"): pytest.skip("Decrypting hexless files is not supported") ct_file = get_golden_file("xstext-java-v0.7.8-7f487c2-DSP2.0.4.tdf") - rt_file = tmp_dir / "0.7.8-java.untdf" + rt_file = tmp_dir / f"0.7.8-java-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container="ztdf") file_stats = os.stat(rt_file) assert file_stats.st_size == 92 @@ -99,7 +99,7 @@ def test_decrypt_SDKv0_9_0( if not decrypt_sdk.supports("hexless"): pytest.skip("Decrypting hexless files is not supported") ct_file = get_golden_file("xstext-java-v0.9.0-2de6a49-DSP2.0.5.1_and_2.0.6.tdf") - rt_file = tmp_dir / "0.9.0-java.untdf" + rt_file = tmp_dir / f"0.9.0-java-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container="ztdf") file_stats = os.stat(rt_file) assert file_stats.st_size == 92 @@ -115,7 +115,7 @@ def test_decrypt_no_splitid( if not decrypt_sdk.supports("hexless"): pytest.skip("Decrypting hexless files is not supported") ct_file = get_golden_file("no-splitids-java.tdf") - rt_file = tmp_dir / "no-splitids-java.untdf" + rt_file = tmp_dir / f"no-splitids-java-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container="ztdf") file_stats = os.stat(rt_file) assert file_stats.st_size == 5 * 2**10 @@ -135,7 +135,7 @@ def test_decrypt_object_statement_value_json( if not decrypt_sdk.supports("assertion_verification"): pytest.skip("assertion_verification is not supported") ct_file = get_golden_file("with-json-object-assertions-java.tdf") - rt_file = tmp_dir / "with-json-object-assertions-java.untdf" + rt_file = tmp_dir / f"with-json-object-assertions-java-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container="ztdf", verify_assertions=False) with rt_file.open("rb") as f: assert f.read().decode("utf-8") == "text" diff --git a/xtest/test_tdfs.py b/xtest/test_tdfs.py index 5257edc37..c2d2530f7 100644 --- a/xtest/test_tdfs.py +++ b/xtest/test_tdfs.py @@ -29,7 +29,6 @@ def test_tdf_roundtrip( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, pt_file: Path, - tmp_dir: Path, container: tdfs.container_type, in_focus: set[tdfs.SDK], audit_logs: AuditLogAsserter, @@ -85,8 +84,7 @@ def test_tdf_roundtrip( else: looks_like_430(manifest) - fname = ct_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") # Mark timestamp before decrypt for audit log correlation mark = audit_logs.mark("before_decrypt") @@ -102,7 +100,7 @@ def test_tdf_roundtrip( and decrypt_sdk.supports("ecwrap") and "ecwrap" in pfs.features ): - ert_file = tmp_dir / f"{fname}-ecrewrap.untdf" + ert_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}-ecrewrap.untdf") ec_mark = audit_logs.mark("before_ecwrap_decrypt") decrypt_sdk.decrypt(ct_file, ert_file, container, ecwrap=True) assert filecmp.cmp(pt_file, ert_file) @@ -114,7 +112,6 @@ def test_tdf_spec_target_422( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, encrypted_tdf: EncryptFactory, @@ -136,8 +133,7 @@ def test_tdf_spec_target_422( attr_values=attribute_default_rsa.value_fqns, ) - fname = ct_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -258,7 +254,6 @@ def test_tdf_assertions_unkeyed( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, pt_file: Path, - tmp_dir: Path, assertion_file_no_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, @@ -279,8 +274,7 @@ def test_tdf_assertions_unkeyed( target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) - fname = ct_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -289,7 +283,6 @@ def test_tdf_assertions_with_keys( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, pt_file: Path, - tmp_dir: Path, assertion_file_rs_and_hs_keys: str, assertion_verification_file_rs_and_hs_keys: str, in_focus: set[tdfs.SDK], @@ -311,8 +304,7 @@ def test_tdf_assertions_with_keys( target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) - fname = ct_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt( ct_file, @@ -327,7 +319,6 @@ def test_tdf_assertions_422_format( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, pt_file: Path, - tmp_dir: Path, assertion_file_rs_and_hs_keys: str, assertion_verification_file_rs_and_hs_keys: str, in_focus: set[tdfs.SDK], @@ -354,8 +345,7 @@ def test_tdf_assertions_422_format( ) looks_like_422(tdfs.manifest(ct_file)) - fname = ct_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") decrypt_sdk.decrypt( ct_file, @@ -523,10 +513,7 @@ def assert_kas_request_error( def test_tdf_with_unbound_policy( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], - audit_logs: AuditLogAsserter, attribute_default_rsa: Attribute, encrypted_tdf: EncryptFactory, ) -> None: @@ -541,8 +528,7 @@ def test_tdf_with_unbound_policy( attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest("unbound_policy", ct_file, change_policy) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") # Mark timestamp before tampered decrypt for audit log correlation # mark = audit_logs.mark("before_tampered_decrypt") @@ -561,10 +547,7 @@ def test_tdf_with_unbound_policy( def test_tdf_with_altered_policy_binding( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], - audit_logs: AuditLogAsserter, attribute_default_rsa: Attribute, encrypted_tdf: EncryptFactory, ) -> None: @@ -580,8 +563,7 @@ def test_tdf_with_altered_policy_binding( b_file = tdfs.update_manifest( "altered_policy_binding", ct_file, change_policy_binding ) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") # Mark timestamp before tampered decrypt for audit log correlation # mark = audit_logs.mark("before_tampered_decrypt") @@ -603,8 +585,6 @@ def test_tdf_with_altered_policy_binding( def test_tdf_with_altered_root_sig( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, encrypted_tdf: EncryptFactory, @@ -620,8 +600,7 @@ def test_tdf_with_altered_root_sig( attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest("broken_root_sig", ct_file, change_root_signature) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) assert False, "decrypt succeeded unexpectedly" @@ -632,8 +611,6 @@ def test_tdf_with_altered_root_sig( def test_tdf_with_altered_seg_sig_wrong( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, encrypted_tdf: EncryptFactory, @@ -649,8 +626,7 @@ def test_tdf_with_altered_seg_sig_wrong( attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest("broken_seg_sig", ct_file, change_segment_hash) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") try: decrypt_sdk.decrypt( b_file, rt_file, "ztdf", expect_error=True, verify_assertions=False @@ -666,8 +642,6 @@ def test_tdf_with_altered_seg_sig_wrong( def test_tdf_with_altered_enc_seg_size( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, encrypted_tdf: EncryptFactory, @@ -685,8 +659,7 @@ def test_tdf_with_altered_enc_seg_size( b_file = tdfs.update_manifest( "broken_enc_seg_sig", ct_file, change_encrypted_segment_size ) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) assert False, "decrypt succeeded unexpectedly" @@ -700,8 +673,6 @@ def test_tdf_with_altered_enc_seg_size( def test_tdf_with_altered_assertion_statement( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, assertion_file_no_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, @@ -725,8 +696,7 @@ def test_tdf_with_altered_assertion_statement( b_file = tdfs.update_manifest( "altered_assertion_statement", ct_file, change_assertion_statement ) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) assert False, "decrypt succeeded unexpectedly" @@ -737,8 +707,6 @@ def test_tdf_with_altered_assertion_statement( def test_tdf_with_altered_assertion_with_keys( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, assertion_file_rs_and_hs_keys: str, assertion_verification_file_rs_and_hs_keys: str, in_focus: set[tdfs.SDK], @@ -763,8 +731,7 @@ def test_tdf_with_altered_assertion_with_keys( b_file = tdfs.update_manifest( "altered_assertion_statement", ct_file, change_assertion_statement ) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") try: decrypt_sdk.decrypt( b_file, @@ -784,8 +751,6 @@ def test_tdf_with_altered_assertion_with_keys( def test_tdf_altered_payload_end( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, encrypted_tdf: EncryptFactory, @@ -803,8 +768,7 @@ def test_tdf_altered_payload_end( attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_payload("altered_payload_end", ct_file, change_payload_end) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) assert False, "decrypt succeeded unexpectedly" @@ -818,8 +782,6 @@ def test_tdf_altered_payload_end( def test_tdf_with_malicious_kao( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], audit_logs: AuditLogAsserter, attribute_default_rsa: Attribute, @@ -837,8 +799,7 @@ def test_tdf_with_malicious_kao( attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest("malicious_kao", ct_file, malicious_kao) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") # Mark timestamp - note: this test may not generate a rewrap audit event # because the SDK should reject the malicious KAO before calling the KAS From 794e5d1d75e9a917e93b700b2a75c7c116470961 Mon Sep 17 00:00:00 2001 From: Dave Mihalcik Date: Thu, 30 Apr 2026 13:50:04 -0400 Subject: [PATCH 09/10] refactor(xtest): address PR review issues - Convert encrypted_tdf factory closure to EncryptFactory class with rt_file() method that includes the current test label, preventing rt_file path collisions between tests that share a cached ciphertext - Replace all ct_file.with_name()/b_file.with_name() rt_file patterns with encrypted_tdf.rt_file() across all test files; standardize test_policytypes.py from .returned to .untdf extension - Bump assertion fixtures from package to session scope to match the session-scoped tmp_dir they write into (latent path collision fix) - Fix stale module docstring in fixtures/encryption.py - Remove unused pt_file parameter from test_manifest_validity and test_manifest_validity_with_assertions - Delete dead commented-out audit_logs blocks in test_tdf_with_unbound_policy and test_tdf_with_altered_policy_binding Co-Authored-By: Claude Sonnet 4.6 --- xtest/fixtures/assertions.py | 10 ++--- xtest/fixtures/encryption.py | 84 ++++++++++++++++++++++++------------ xtest/test_abac.py | 28 ++++++------ xtest/test_policytypes.py | 6 +-- xtest/test_pqc.py | 8 ++-- xtest/test_tdfs.py | 46 +++++++------------- 6 files changed, 97 insertions(+), 85 deletions(-) diff --git a/xtest/fixtures/assertions.py b/xtest/fixtures/assertions.py index 06dd144f1..73628470a 100644 --- a/xtest/fixtures/assertions.py +++ b/xtest/fixtures/assertions.py @@ -19,13 +19,13 @@ import assertions -@pytest.fixture(scope="package") +@pytest.fixture(scope="session") def hs256_key() -> str: """Generate a random HS256 (HMAC-SHA256) signing key.""" return base64.b64encode(secrets.token_bytes(32)).decode("ascii") -@pytest.fixture(scope="package") +@pytest.fixture(scope="session") def rs256_keys() -> tuple[str, str]: """Generate an RS256 (RSA-SHA256) key pair. @@ -88,7 +88,7 @@ def write_assertion_verification_keys_to_file( return as_file -@pytest.fixture(scope="package") +@pytest.fixture(scope="session") def assertion_file_no_keys(tmp_dir: Path) -> Path: """Assertion file with a single handling assertion (no signing key).""" assertion_list = [ @@ -109,7 +109,7 @@ def assertion_file_no_keys(tmp_dir: Path) -> Path: ) -@pytest.fixture(scope="package") +@pytest.fixture(scope="session") def assertion_file_rs_and_hs_keys( tmp_dir: Path, hs256_key: str, rs256_keys: tuple[str, str] ) -> Path: @@ -152,7 +152,7 @@ def assertion_file_rs_and_hs_keys( ) -@pytest.fixture(scope="package") +@pytest.fixture(scope="session") def assertion_verification_file_rs_and_hs_keys( tmp_dir: Path, hs256_key: str, rs256_keys: tuple[str, str] ) -> Path: diff --git a/xtest/fixtures/encryption.py b/xtest/fixtures/encryption.py index a603d7571..4fdb981b1 100644 --- a/xtest/fixtures/encryption.py +++ b/xtest/fixtures/encryption.py @@ -1,41 +1,38 @@ """Session-scoped factory fixture for memoized TDF encryption. -The cache key and filenames are derived automatically from the test name and input. +The cache key is derived from encryption input parameters; on-disk filenames also embed +the requesting test's name and a short hash for debuggability. """ import hashlib -from collections.abc import Callable from pathlib import Path import pytest import tdfs -EncryptFactory = Callable[..., Path] +class EncryptFactory: + """Memoized TDF encryption factory bound to the current test. -@pytest.fixture(scope="session") -def _encryption_cache() -> dict[tuple, Path]: - """Session-wide cache mapping input-tuple keys to encrypted Paths.""" - return {} - - -@pytest.fixture -def encrypted_tdf( - request: pytest.FixtureRequest, - pt_file: Path, - tmp_dir: Path, - _encryption_cache: dict[tuple, Path], -) -> EncryptFactory: - """Return a memoized encrypt-to-TDF function. - - Two callers that pass identical inputs share a ciphertext; differing - inputs produce distinct ciphertexts. The on-disk filename embeds the - requesting test's name plus a short hash for debuggability. + Call to encrypt (results are cached by input parameters). Use rt_file() to + generate a test-unique decrypted output path derived from the ciphertext. """ - label = request.node.originalname or request.node.name - def _factory( + def __init__( + self, + label: str, + pt_file: Path, + tmp_dir: Path, + cache: dict[tuple, Path], + ) -> None: + self._label = label + self._pt_file = pt_file + self._tmp_dir = tmp_dir + self._cache = cache + + def __call__( + self, encrypt_sdk: tdfs.SDK, *, container: tdfs.container_type = "ztdf", @@ -46,13 +43,13 @@ def _factory( ) -> Path: attr_key = tuple(attr_values) if attr_values is not None else None key = (str(encrypt_sdk), container, target_mode, attr_key, az, mime_type) - cached = _encryption_cache.get(key) + cached = self._cache.get(key) if cached is not None: return cached digest = hashlib.sha1(repr(key).encode()).hexdigest()[:8] - ct_file = tmp_dir / f"ct-{label}-{encrypt_sdk}-{container}-{digest}.tdf" + ct_file = self._tmp_dir / f"ct-{self._label}-{encrypt_sdk}-{container}-{digest}.tdf" encrypt_sdk.encrypt( - pt_file, + self._pt_file, ct_file, mime_type=mime_type, container=container, @@ -61,7 +58,38 @@ def _factory( target_mode=target_mode, ) assert ct_file.is_file() - _encryption_cache[key] = ct_file + self._cache[key] = ct_file return ct_file - return _factory + def rt_file(self, ct_file: Path, decrypt_sdk: tdfs.SDK, variant: str = "") -> Path: + """Return a test-unique path for the decrypted output. + + Embeds the current test label so tests that share a cached ciphertext + don't collide on their output files. + """ + variant_part = f"-{variant}" if variant else "" + return ct_file.with_name( + f"{ct_file.stem}-{decrypt_sdk}-{self._label}{variant_part}.untdf" + ) + + +@pytest.fixture(scope="session") +def _encryption_cache() -> dict[tuple, Path]: + """Session-wide cache mapping input-tuple keys to encrypted Paths.""" + return {} + + +@pytest.fixture +def encrypted_tdf( + request: pytest.FixtureRequest, + pt_file: Path, + tmp_dir: Path, + _encryption_cache: dict[tuple, Path], +) -> EncryptFactory: + """Return a memoized encrypt-to-TDF factory for the current test. + + Two callers with identical inputs share a ciphertext; differing inputs + produce distinct ciphertexts. Use rt_file() to get a test-unique output path. + """ + label = request.node.originalname or request.node.name + return EncryptFactory(label, pt_file, tmp_dir, _encryption_cache) diff --git a/xtest/test_abac.py b/xtest/test_abac.py index 1c12884ec..08b663e54 100644 --- a/xtest/test_abac.py +++ b/xtest/test_abac.py @@ -101,7 +101,7 @@ def test_key_mapping_multiple_mechanisms( assert manifest.encryptionInformation.keyAccess[0].url == kas_url_default tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -169,7 +169,7 @@ def test_key_mapping_extended_mechanisms( tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") # Decrypt and verify - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -230,7 +230,7 @@ def test_key_mapping_extended_ec_mechanisms( ) # Decrypt and verify - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -290,7 +290,7 @@ def test_key_mapping_extended_rsa_mechanisms( ) # Decrypt and verify - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -329,7 +329,7 @@ def test_autoconfigure_one_attribute_standard( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -385,7 +385,7 @@ def test_autoconfigure_two_kas_or_standard( # Mark timestamp before decrypt for audit log correlation mark = audit_logs.mark("before_decrypt") - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -439,7 +439,7 @@ def test_autoconfigure_double_kas_and( # Mark timestamp before decrypt for audit log correlation mark = audit_logs.mark("before_decrypt") - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -480,7 +480,7 @@ def test_autoconfigure_one_attribute_attr_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -525,7 +525,7 @@ def test_autoconfigure_two_kas_or_attr_and_value_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -570,7 +570,7 @@ def test_autoconfigure_two_kas_and_attr_and_value_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -607,7 +607,7 @@ def test_autoconfigure_one_attribute_ns_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -652,7 +652,7 @@ def test_autoconfigure_two_kas_or_ns_and_value_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -697,7 +697,7 @@ def test_autoconfigure_two_kas_and_ns_and_value_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -945,7 +945,7 @@ def test_autoconfigure_key_management_two_kas_two_keys( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) diff --git a/xtest/test_policytypes.py b/xtest/test_policytypes.py index 30e922562..95495434b 100644 --- a/xtest/test_policytypes.py +++ b/xtest/test_policytypes.py @@ -68,7 +68,7 @@ def test_or_attributes_success( ) assert_expected_attrs(container, None, ct_file, fqns) - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.returned") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) @@ -143,7 +143,7 @@ def test_and_attributes_success( ) assert_expected_attrs(container, None, ct_file, fqns) - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.returned") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) @@ -190,7 +190,7 @@ def test_hierarchy_attributes_success( ) assert_expected_attrs(container, None, ct_file, fqns) - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.returned") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) diff --git a/xtest/test_pqc.py b/xtest/test_pqc.py index 569577cf8..1d226cc69 100644 --- a/xtest/test_pqc.py +++ b/xtest/test_pqc.py @@ -101,7 +101,7 @@ def test_xwing_roundtrip( assert_xwing_kao_sizes(kao) assert_xwing_public_key_size(key_xwing) - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -166,7 +166,7 @@ def test_xwing_with_ec_roundtrip( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -224,7 +224,7 @@ def test_secpmlkem_3_roundtrip( f"public key DER should be >= {XWING_ENCAPSULATION_KEY_SIZE} bytes, got {der_len}" ) - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -282,6 +282,6 @@ def test_secpmlkem_5_roundtrip( f"public key DER should be >= {XWING_ENCAPSULATION_KEY_SIZE} bytes, got {der_len}" ) - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) diff --git a/xtest/test_tdfs.py b/xtest/test_tdfs.py index c2d2530f7..a17fea066 100644 --- a/xtest/test_tdfs.py +++ b/xtest/test_tdfs.py @@ -84,7 +84,7 @@ def test_tdf_roundtrip( else: looks_like_430(manifest) - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) # Mark timestamp before decrypt for audit log correlation mark = audit_logs.mark("before_decrypt") @@ -100,7 +100,7 @@ def test_tdf_roundtrip( and decrypt_sdk.supports("ecwrap") and "ecwrap" in pfs.features ): - ert_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}-ecrewrap.untdf") + ert_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk, variant="ecrewrap") ec_mark = audit_logs.mark("before_ecwrap_decrypt") decrypt_sdk.decrypt(ct_file, ert_file, container, ecwrap=True) assert filecmp.cmp(pt_file, ert_file) @@ -133,7 +133,7 @@ def test_tdf_spec_target_422( attr_values=attribute_default_rsa.value_fqns, ) - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -211,7 +211,6 @@ def looks_like_430(manifest: tdfs.Manifest): def test_manifest_validity( encrypt_sdk: tdfs.SDK, - pt_file: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, encrypted_tdf: EncryptFactory, @@ -228,7 +227,6 @@ def test_manifest_validity( def test_manifest_validity_with_assertions( encrypt_sdk: tdfs.SDK, - pt_file: Path, assertion_file_no_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, @@ -274,7 +272,7 @@ def test_tdf_assertions_unkeyed( target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -304,7 +302,7 @@ def test_tdf_assertions_with_keys( target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt( ct_file, @@ -345,7 +343,7 @@ def test_tdf_assertions_422_format( ) looks_like_422(tdfs.manifest(ct_file)) - rt_file = ct_file.with_name(f"{ct_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt( ct_file, @@ -528,10 +526,7 @@ def test_tdf_with_unbound_policy( attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest("unbound_policy", ct_file, change_policy) - rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") - - # Mark timestamp before tampered decrypt for audit log correlation - # mark = audit_logs.mark("before_tampered_decrypt") + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) @@ -539,10 +534,6 @@ def test_tdf_with_unbound_policy( except subprocess.CalledProcessError as exc: assert_kas_request_error(exc, decrypt_sdk) - # Verify rewrap failure was logged (policy binding mismatch) - # FIXME: Audit logs are not present on failed bindings - # audit_logs.assert_rewrap_error(min_count=1, since_mark=mark) - def test_tdf_with_altered_policy_binding( encrypt_sdk: tdfs.SDK, @@ -563,10 +554,7 @@ def test_tdf_with_altered_policy_binding( b_file = tdfs.update_manifest( "altered_policy_binding", ct_file, change_policy_binding ) - rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") - - # Mark timestamp before tampered decrypt for audit log correlation - # mark = audit_logs.mark("before_tampered_decrypt") + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) @@ -574,10 +562,6 @@ def test_tdf_with_altered_policy_binding( except subprocess.CalledProcessError as exc: assert_kas_request_error(exc, decrypt_sdk) - # Verify rewrap failure was logged (policy binding mismatch) - # FIXME: Audit logs are not present on failed bindings - # audit_logs.assert_rewrap_error(min_count=1, since_mark=mark) - ## INTEGRITY TAMPER TESTS @@ -600,7 +584,7 @@ def test_tdf_with_altered_root_sig( attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest("broken_root_sig", ct_file, change_root_signature) - rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) assert False, "decrypt succeeded unexpectedly" @@ -626,7 +610,7 @@ def test_tdf_with_altered_seg_sig_wrong( attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest("broken_seg_sig", ct_file, change_segment_hash) - rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt( b_file, rt_file, "ztdf", expect_error=True, verify_assertions=False @@ -659,7 +643,7 @@ def test_tdf_with_altered_enc_seg_size( b_file = tdfs.update_manifest( "broken_enc_seg_sig", ct_file, change_encrypted_segment_size ) - rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) assert False, "decrypt succeeded unexpectedly" @@ -696,7 +680,7 @@ def test_tdf_with_altered_assertion_statement( b_file = tdfs.update_manifest( "altered_assertion_statement", ct_file, change_assertion_statement ) - rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) assert False, "decrypt succeeded unexpectedly" @@ -731,7 +715,7 @@ def test_tdf_with_altered_assertion_with_keys( b_file = tdfs.update_manifest( "altered_assertion_statement", ct_file, change_assertion_statement ) - rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt( b_file, @@ -768,7 +752,7 @@ def test_tdf_altered_payload_end( attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_payload("altered_payload_end", ct_file, change_payload_end) - rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) assert False, "decrypt succeeded unexpectedly" @@ -799,7 +783,7 @@ def test_tdf_with_malicious_kao( attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest("malicious_kao", ct_file, malicious_kao) - rt_file = ct_file.with_name(f"{b_file.stem}-{decrypt_sdk}.untdf") + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) # Mark timestamp - note: this test may not generate a rewrap audit event # because the SDK should reject the malicious KAO before calling the KAS From dd4ff16f7fdf8d3789c5e004f82d112165b05723 Mon Sep 17 00:00:00 2001 From: Dave Mihalcik Date: Thu, 30 Apr 2026 13:53:25 -0400 Subject: [PATCH 10/10] fixup ruff format --- xtest/fixtures/encryption.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/xtest/fixtures/encryption.py b/xtest/fixtures/encryption.py index 4fdb981b1..2d4d12984 100644 --- a/xtest/fixtures/encryption.py +++ b/xtest/fixtures/encryption.py @@ -47,7 +47,9 @@ def __call__( if cached is not None: return cached digest = hashlib.sha1(repr(key).encode()).hexdigest()[:8] - ct_file = self._tmp_dir / f"ct-{self._label}-{encrypt_sdk}-{container}-{digest}.tdf" + ct_file = ( + self._tmp_dir / f"ct-{self._label}-{encrypt_sdk}-{container}-{digest}.tdf" + ) encrypt_sdk.encrypt( self._pt_file, ct_file,