diff --git a/xtest/conftest.py b/xtest/conftest.py index 36f7ac0d..ce6a7131 100644 --- a/xtest/conftest.py +++ b/xtest/conftest.py @@ -35,6 +35,7 @@ "fixtures.obligations", "fixtures.keys", "fixtures.audit", + "fixtures.encryption", ] @@ -203,7 +204,7 @@ def defaulted_list_opt[T]( # Core fixtures -@pytest.fixture(scope="module") +@pytest.fixture(scope="session") def pt_file(tmp_dir: Path, size: str) -> Path: """Generate a plaintext test file. @@ -222,7 +223,7 @@ def pt_file(tmp_dir: Path, size: str) -> Path: return pt_file -@pytest.fixture(scope="package") +@pytest.fixture(scope="session") def tmp_dir(request: pytest.FixtureRequest) -> Path: """Create worker-specific temporary directory for test files. diff --git a/xtest/fixtures/assertions.py b/xtest/fixtures/assertions.py index 06dd144f..73628470 100644 --- a/xtest/fixtures/assertions.py +++ b/xtest/fixtures/assertions.py @@ -19,13 +19,13 @@ import assertions -@pytest.fixture(scope="package") +@pytest.fixture(scope="session") def hs256_key() -> str: """Generate a random HS256 (HMAC-SHA256) signing key.""" return base64.b64encode(secrets.token_bytes(32)).decode("ascii") -@pytest.fixture(scope="package") +@pytest.fixture(scope="session") def rs256_keys() -> tuple[str, str]: """Generate an RS256 (RSA-SHA256) key pair. @@ -88,7 +88,7 @@ def write_assertion_verification_keys_to_file( return as_file -@pytest.fixture(scope="package") +@pytest.fixture(scope="session") def assertion_file_no_keys(tmp_dir: Path) -> Path: """Assertion file with a single handling assertion (no signing key).""" assertion_list = [ @@ -109,7 +109,7 @@ def assertion_file_no_keys(tmp_dir: Path) -> Path: ) -@pytest.fixture(scope="package") +@pytest.fixture(scope="session") def assertion_file_rs_and_hs_keys( tmp_dir: Path, hs256_key: str, rs256_keys: tuple[str, str] ) -> Path: @@ -152,7 +152,7 @@ def assertion_file_rs_and_hs_keys( ) -@pytest.fixture(scope="package") +@pytest.fixture(scope="session") def assertion_verification_file_rs_and_hs_keys( tmp_dir: Path, hs256_key: str, rs256_keys: tuple[str, str] ) -> Path: diff --git a/xtest/fixtures/encryption.py b/xtest/fixtures/encryption.py new file mode 100644 index 00000000..2d4d1298 --- /dev/null +++ b/xtest/fixtures/encryption.py @@ -0,0 +1,97 @@ +"""Session-scoped factory fixture for memoized TDF encryption. + +The cache key is derived from encryption input parameters; on-disk filenames also embed +the requesting test's name and a short hash for debuggability. +""" + +import hashlib +from pathlib import Path + +import pytest + +import tdfs + + +class EncryptFactory: + """Memoized TDF encryption factory bound to the current test. + + Call to encrypt (results are cached by input parameters). Use rt_file() to + generate a test-unique decrypted output path derived from the ciphertext. + """ + + def __init__( + self, + label: str, + pt_file: Path, + tmp_dir: Path, + cache: dict[tuple, Path], + ) -> None: + self._label = label + self._pt_file = pt_file + self._tmp_dir = tmp_dir + self._cache = cache + + def __call__( + self, + encrypt_sdk: tdfs.SDK, + *, + container: tdfs.container_type = "ztdf", + attr_values: list[str] | None = None, + target_mode: tdfs.container_version | None = None, + az: str = "", + mime_type: str = "text/plain", + ) -> Path: + attr_key = tuple(attr_values) if attr_values is not None else None + key = (str(encrypt_sdk), container, target_mode, attr_key, az, mime_type) + cached = self._cache.get(key) + if cached is not None: + return cached + digest = hashlib.sha1(repr(key).encode()).hexdigest()[:8] + ct_file = ( + self._tmp_dir / f"ct-{self._label}-{encrypt_sdk}-{container}-{digest}.tdf" + ) + encrypt_sdk.encrypt( + self._pt_file, + ct_file, + mime_type=mime_type, + container=container, + attr_values=attr_values, + assert_value=az, + target_mode=target_mode, + ) + assert ct_file.is_file() + self._cache[key] = ct_file + return ct_file + + def rt_file(self, ct_file: Path, decrypt_sdk: tdfs.SDK, variant: str = "") -> Path: + """Return a test-unique path for the decrypted output. + + Embeds the current test label so tests that share a cached ciphertext + don't collide on their output files. + """ + variant_part = f"-{variant}" if variant else "" + return ct_file.with_name( + f"{ct_file.stem}-{decrypt_sdk}-{self._label}{variant_part}.untdf" + ) + + +@pytest.fixture(scope="session") +def _encryption_cache() -> dict[tuple, Path]: + """Session-wide cache mapping input-tuple keys to encrypted Paths.""" + return {} + + +@pytest.fixture +def encrypted_tdf( + request: pytest.FixtureRequest, + pt_file: Path, + tmp_dir: Path, + _encryption_cache: dict[tuple, Path], +) -> EncryptFactory: + """Return a memoized encrypt-to-TDF factory for the current test. + + Two callers with identical inputs share a ciphertext; differing inputs + produce distinct ciphertexts. Use rt_file() to get a test-unique output path. + """ + label = request.node.originalname or request.node.name + return EncryptFactory(label, pt_file, tmp_dir, _encryption_cache) diff --git a/xtest/test_abac.py b/xtest/test_abac.py index f32e7b35..08b663e5 100644 --- a/xtest/test_abac.py +++ b/xtest/test_abac.py @@ -8,9 +8,9 @@ import tdfs from abac import Attribute, ObligationValue from audit_logs import AuditLogAsserter +from fixtures.encryption import EncryptFactory from test_policytypes import skip_rts_as_needed -cipherTexts: dict[str, Path] = {} rewrap_403_pattern = ( "tdf: rewrap request 403|403 for \\[https?://[^\\]]+\\]; rewrap permission denied" ) @@ -70,13 +70,11 @@ def test_key_mapping_multiple_mechanisms( attribute_with_different_kids: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_default: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): - global counter - tdfs.skip_if_unsupported(encrypt_sdk, "key_management") skip_dspx2457(encrypt_sdk) skip_dspx1153(encrypt_sdk, decrypt_sdk) @@ -87,32 +85,23 @@ def test_key_mapping_multiple_mechanisms( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"multimechanism-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - cipherTexts[sample_name] = ct_file - # Currently, we only support rsa:2048 and ec:secp256r1 - vals = [ - v - for v in attribute_with_different_kids.value_fqns - if v.endswith("/e1") or v.endswith("/r1") - ] - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=vals, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) + # Currently, we only support rsa:2048 and ec:secp256r1 + vals = [ + v + for v in attribute_with_different_kids.value_fqns + if v.endswith("/e1") or v.endswith("/r1") + ] + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=vals, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert {kao.kid for kao in manifest.encryptionInformation.keyAccess} == {"r1", "e1"} assert manifest.encryptionInformation.keyAccess[0].url == kas_url_default tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"multimechanism-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -121,11 +110,11 @@ def test_key_mapping_extended_mechanisms( attribute_allof_with_extended_mechanisms: tuple[Attribute, list[str]], encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km1: str, kas_url_km2: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Test encryption and decryption with extended cryptographic mechanisms. @@ -151,20 +140,11 @@ def test_key_mapping_extended_mechanisms( attr, key_ids = attribute_allof_with_extended_mechanisms - sample_name = f"extended-mechanisms-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - cipherTexts[sample_name] = ct_file - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=attr.value_fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=attr.value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 5 @@ -189,7 +169,7 @@ def test_key_mapping_extended_mechanisms( tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") # Decrypt and verify - rt_file = tmp_dir / f"extended-mechanisms-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -198,10 +178,10 @@ def test_key_mapping_extended_ec_mechanisms( attribute_allof_with_extended_mechanisms: tuple[Attribute, list[str]], encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km2: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Test encryption and decryption with extended cryptographic mechanisms. @@ -227,20 +207,11 @@ def test_key_mapping_extended_ec_mechanisms( ec_vals = [v for v in attr.value_fqns if "ec-secp3" in v] assert len(ec_kids) == len(ec_vals), "Mismatch in EC key IDs and attribute values" - sample_name = f"extended-mechanisms-ec-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - cipherTexts[sample_name] = ct_file - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=ec_vals, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=ec_vals, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == len(ec_kids) @@ -259,7 +230,7 @@ def test_key_mapping_extended_ec_mechanisms( ) # Decrypt and verify - rt_file = tmp_dir / f"extended-mechanisms-ec-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -268,10 +239,10 @@ def test_key_mapping_extended_rsa_mechanisms( attribute_allof_with_extended_mechanisms: tuple[Attribute, list[str]], encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km1: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Test encryption and decryption with extended cryptographic mechanisms. @@ -296,20 +267,11 @@ def test_key_mapping_extended_rsa_mechanisms( "Mismatch in RSA key IDs and attribute values" ) - sample_name = f"extended-mechanisms-rsa-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=rsa_vals, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=rsa_vals, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == len(rsa_kids) @@ -328,7 +290,7 @@ def test_key_mapping_extended_rsa_mechanisms( ) # Decrypt and verify - rt_file = tmp_dir / f"extended-mechanisms-rsa-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -337,14 +299,12 @@ def test_autoconfigure_one_attribute_standard( attribute_single_kas_grant: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_alpha: str, in_focus: set[tdfs.SDK], audit_logs: AuditLogAsserter, + encrypted_tdf: EncryptFactory, ): - global counter - skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") @@ -353,20 +313,11 @@ def test_autoconfigure_one_attribute_standard( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-one-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=attribute_single_kas_grant.value_fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=attribute_single_kas_grant.value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 1 assert manifest.encryptionInformation.keyAccess[0].url == kas_url_alpha @@ -378,7 +329,7 @@ def test_autoconfigure_one_attribute_standard( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"test-abac-one-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -394,12 +345,12 @@ def test_autoconfigure_two_kas_or_standard( attribute_two_kas_grant_or: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_alpha: str, kas_url_beta: str, in_focus: set[tdfs.SDK], audit_logs: AuditLogAsserter, + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -409,23 +360,14 @@ def test_autoconfigure_two_kas_or_standard( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-two-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - attribute_two_kas_grant_or.value_fqns[0], - attribute_two_kas_grant_or.value_fqns[1], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + attribute_two_kas_grant_or.value_fqns[0], + attribute_two_kas_grant_or.value_fqns[1], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 assert ( @@ -443,7 +385,7 @@ def test_autoconfigure_two_kas_or_standard( # Mark timestamp before decrypt for audit log correlation mark = audit_logs.mark("before_decrypt") - rt_file = tmp_dir / f"test-abac-or-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -456,12 +398,12 @@ def test_autoconfigure_double_kas_and( attribute_two_kas_grant_and: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_alpha: str, kas_url_beta: str, in_focus: set[tdfs.SDK], audit_logs: AuditLogAsserter, + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -471,23 +413,14 @@ def test_autoconfigure_double_kas_and( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-three-and-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - attribute_two_kas_grant_and.value_fqns[0], - attribute_two_kas_grant_and.value_fqns[1], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + attribute_two_kas_grant_and.value_fqns[0], + attribute_two_kas_grant_and.value_fqns[1], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 @@ -506,7 +439,7 @@ def test_autoconfigure_double_kas_and( # Mark timestamp before decrypt for audit log correlation mark = audit_logs.mark("before_decrypt") - rt_file = tmp_dir / f"test-abac-and-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -519,10 +452,10 @@ def test_autoconfigure_one_attribute_attr_grant( one_attribute_attr_kas_grant: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_gamma: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -532,22 +465,13 @@ def test_autoconfigure_one_attribute_attr_grant( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-one-attr-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - one_attribute_attr_kas_grant.value_fqns[0], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + one_attribute_attr_kas_grant.value_fqns[0], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 1 @@ -556,7 +480,7 @@ def test_autoconfigure_one_attribute_attr_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"test-abac-one-attr-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -565,11 +489,11 @@ def test_autoconfigure_two_kas_or_attr_and_value_grant( attr_and_value_kas_grants_or: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_gamma: str, kas_url_alpha: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -579,23 +503,14 @@ def test_autoconfigure_two_kas_or_attr_and_value_grant( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-attr-val-or-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - attr_and_value_kas_grants_or.value_fqns[0], - attr_and_value_kas_grants_or.value_fqns[1], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + attr_and_value_kas_grants_or.value_fqns[0], + attr_and_value_kas_grants_or.value_fqns[1], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 @@ -610,7 +525,7 @@ def test_autoconfigure_two_kas_or_attr_and_value_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"test-abac-attr-val-or-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -619,11 +534,11 @@ def test_autoconfigure_two_kas_and_attr_and_value_grant( attr_and_value_kas_grants_and: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_gamma: str, kas_url_alpha: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -633,23 +548,14 @@ def test_autoconfigure_two_kas_and_attr_and_value_grant( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-attr-val-and-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - attr_and_value_kas_grants_and.value_fqns[0], - attr_and_value_kas_grants_and.value_fqns[1], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + attr_and_value_kas_grants_and.value_fqns[0], + attr_and_value_kas_grants_and.value_fqns[1], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 @@ -664,7 +570,7 @@ def test_autoconfigure_two_kas_and_attr_and_value_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"test-abac-attr-val-and-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -673,10 +579,10 @@ def test_autoconfigure_one_attribute_ns_grant( one_attribute_ns_kas_grant: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_delta: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -686,22 +592,13 @@ def test_autoconfigure_one_attribute_ns_grant( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-one-ns-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - one_attribute_ns_kas_grant.value_fqns[0], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + one_attribute_ns_kas_grant.value_fqns[0], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 1 @@ -710,7 +607,7 @@ def test_autoconfigure_one_attribute_ns_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"test-abac-one-ns-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -719,11 +616,11 @@ def test_autoconfigure_two_kas_or_ns_and_value_grant( ns_and_value_kas_grants_or: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_delta: str, kas_url_alpha: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -733,23 +630,14 @@ def test_autoconfigure_two_kas_or_ns_and_value_grant( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-ns-val-or-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - ns_and_value_kas_grants_or.value_fqns[0], - ns_and_value_kas_grants_or.value_fqns[1], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + ns_and_value_kas_grants_or.value_fqns[0], + ns_and_value_kas_grants_or.value_fqns[1], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 @@ -764,7 +652,7 @@ def test_autoconfigure_two_kas_or_ns_and_value_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"test-abac-ns-val-or-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -773,11 +661,11 @@ def test_autoconfigure_two_kas_and_ns_and_value_grant( ns_and_value_kas_grants_and: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_delta: str, kas_url_alpha: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): skip_dspx1153(encrypt_sdk, decrypt_sdk) if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -787,23 +675,14 @@ def test_autoconfigure_two_kas_and_ns_and_value_grant( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"test-abac-ns-val-and-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[ - ns_and_value_kas_grants_and.value_fqns[0], - ns_and_value_kas_grants_and.value_fqns[1], - ], - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[ + ns_and_value_kas_grants_and.value_fqns[0], + ns_and_value_kas_grants_and.value_fqns[1], + ], + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 @@ -818,7 +697,7 @@ def test_autoconfigure_two_kas_and_ns_and_value_grant( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"test-abac-ns-val-and-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -1024,11 +903,11 @@ def test_autoconfigure_key_management_two_kas_two_keys( attribute_allof_with_two_managed_keys: tuple[Attribute, list[str]], encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km1: str, kas_url_km2: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Encrypts with an ALL_OF attribute that has two managed keys and decrypts successfully. @@ -1044,20 +923,11 @@ def test_autoconfigure_key_management_two_kas_two_keys( tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - sample_name = f"km-allof-two-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=attribute_allof_with_two_managed_keys[0].value_fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=attribute_allof_with_two_managed_keys[0].value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 @@ -1075,7 +945,7 @@ def test_autoconfigure_key_management_two_kas_two_keys( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"km-allof-two-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -1083,10 +953,10 @@ def test_autoconfigure_key_management_two_kas_two_keys( def test_encrypt_with_missing_value_uses_definition_key( attribute_missing_value_key_mapping: tuple[str, str], encrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_gamma: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Encrypts with a missing value FQN and verifies definition-level key mapping.""" if not in_focus & {encrypt_sdk}: @@ -1097,19 +967,10 @@ def test_encrypt_with_missing_value_uses_definition_key( missing_value_fqn, key_id = attribute_missing_value_key_mapping - sample_name = f"missing-value-def-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=[missing_value_fqn], - ) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=[missing_value_fqn], + ) manifest = tdfs.manifest(ct_file) policy = manifest.encryptionInformation.policy_object diff --git a/xtest/test_legacy.py b/xtest/test_legacy.py index 4db0d81c..4a221ff1 100644 --- a/xtest/test_legacy.py +++ b/xtest/test_legacy.py @@ -24,7 +24,7 @@ def test_decrypt_small( if not decrypt_sdk.supports("hexless"): pytest.skip("Decrypting hexless files is not supported") ct_file = get_golden_file("small-java-4.3.0-e0f8caf.tdf") - rt_file = tmp_dir / "small-java.untdf" + rt_file = tmp_dir / f"small-java-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container="ztdf") file_stats = os.stat(rt_file) assert file_stats.st_size == 5 * 2**10 @@ -44,7 +44,7 @@ def test_decrypt_big( if not decrypt_sdk.supports("hexless"): pytest.skip("Decrypting hexless files is not supported") ct_file = get_golden_file("big-java-4.3.0-e0f8caf.tdf") - rt_file = tmp_dir / "big-java.untdf" + rt_file = tmp_dir / f"big-java-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container="ztdf") file_stats = os.stat(rt_file) assert file_stats.st_size == 10 * 2**20 @@ -65,7 +65,7 @@ def test_decrypt_SDKv0_7_5( if not decrypt_sdk.supports("hexless"): pytest.skip("Decrypting hexless files is not supported") ct_file = get_golden_file("xstext-java-v0.7.5-94b161d53-DSP2.0.2_and_2.0.3.tdf") - rt_file = tmp_dir / "0.7.5-java.untdf" + rt_file = tmp_dir / f"0.7.5-java-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container="ztdf") file_stats = os.stat(rt_file) assert file_stats.st_size == 102 @@ -82,7 +82,7 @@ def test_decrypt_SDKv0_7_8( if not decrypt_sdk.supports("hexless"): pytest.skip("Decrypting hexless files is not supported") ct_file = get_golden_file("xstext-java-v0.7.8-7f487c2-DSP2.0.4.tdf") - rt_file = tmp_dir / "0.7.8-java.untdf" + rt_file = tmp_dir / f"0.7.8-java-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container="ztdf") file_stats = os.stat(rt_file) assert file_stats.st_size == 92 @@ -99,7 +99,7 @@ def test_decrypt_SDKv0_9_0( if not decrypt_sdk.supports("hexless"): pytest.skip("Decrypting hexless files is not supported") ct_file = get_golden_file("xstext-java-v0.9.0-2de6a49-DSP2.0.5.1_and_2.0.6.tdf") - rt_file = tmp_dir / "0.9.0-java.untdf" + rt_file = tmp_dir / f"0.9.0-java-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container="ztdf") file_stats = os.stat(rt_file) assert file_stats.st_size == 92 @@ -115,7 +115,7 @@ def test_decrypt_no_splitid( if not decrypt_sdk.supports("hexless"): pytest.skip("Decrypting hexless files is not supported") ct_file = get_golden_file("no-splitids-java.tdf") - rt_file = tmp_dir / "no-splitids-java.untdf" + rt_file = tmp_dir / f"no-splitids-java-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container="ztdf") file_stats = os.stat(rt_file) assert file_stats.st_size == 5 * 2**10 @@ -135,7 +135,7 @@ def test_decrypt_object_statement_value_json( if not decrypt_sdk.supports("assertion_verification"): pytest.skip("assertion_verification is not supported") ct_file = get_golden_file("with-json-object-assertions-java.tdf") - rt_file = tmp_dir / "with-json-object-assertions-java.untdf" + rt_file = tmp_dir / f"with-json-object-assertions-java-{decrypt_sdk}.untdf" decrypt_sdk.decrypt(ct_file, rt_file, container="ztdf", verify_assertions=False) with rt_file.open("rb") as f: assert f.read().decode("utf-8") == "text" diff --git a/xtest/test_policytypes.py b/xtest/test_policytypes.py index a7bf62c8..95495434 100644 --- a/xtest/test_policytypes.py +++ b/xtest/test_policytypes.py @@ -7,8 +7,7 @@ import tdfs from abac import Attribute - -cipherTexts: dict[str, Path] = {} +from fixtures.encryption import EncryptFactory def skip_rts_as_needed( @@ -41,10 +40,10 @@ def test_or_attributes_success( attribute_with_or_type: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, container: tdfs.container_type, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): skip_rts_as_needed(encrypt_sdk, decrypt_sdk, container, in_focus) @@ -61,26 +60,15 @@ def test_or_attributes_success( assert len([v.fqn for v in vals_to_use if v.fqn is None]) == 0 fqns = [v.fqn for v in vals_to_use if v.fqn is not None] assert len(fqns) == len(vals_to_use) - short_names = [v.value for v in vals_to_use] - assert len(short_names) == len(vals_to_use) - sample_name = f"pt-or-{'-'.join(short_names)}-{encrypt_sdk}.{container}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}" - # Currently, we only support rsa:2048 and ec:secp256r1 - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container=container, - attr_values=fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - assert_expected_attrs(container, None, ct_file, fqns) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + container=container, + attr_values=fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) + assert_expected_attrs(container, None, ct_file, fqns) - rt_file = tmp_dir / f"{sample_name}.returned" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) @@ -121,10 +109,10 @@ def test_and_attributes_success( attribute_with_and_type: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, container: tdfs.container_type, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """ Test AND attribute policy type. @@ -147,25 +135,15 @@ def test_and_attributes_success( assert len([v.fqn for v in vals_to_use if v.fqn is None]) == 0 fqns = [v.fqn for v in vals_to_use if v.fqn is not None] assert len(fqns) == len(vals_to_use) - short_names = [v.value for v in vals_to_use] - assert len(short_names) == len(vals_to_use) - sample_name = f"pt-and-{'-'.join(short_names)}-{encrypt_sdk}.{container}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container=container, - attr_values=fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - assert_expected_attrs(container, None, ct_file, fqns) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + container=container, + attr_values=fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) + assert_expected_attrs(container, None, ct_file, fqns) - rt_file = tmp_dir / f"{sample_name}.returned" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) @@ -175,10 +153,10 @@ def test_hierarchy_attributes_success( attribute_with_hierarchy_type: Attribute, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, container: tdfs.container_type, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """ Test HIERARCHY attribute policy type. @@ -204,25 +182,15 @@ def test_hierarchy_attributes_success( assert len([v.fqn for v in vals_to_use if v.fqn is None]) == 0 fqns = [v.fqn for v in vals_to_use if v.fqn is not None] assert len(fqns) == len(vals_to_use) - short_names = [v.value for v in vals_to_use] - assert len(short_names) == len(vals_to_use) - sample_name = f"pt-hierarchy-{'-'.join(short_names)}-{encrypt_sdk}.{container}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container=container, - attr_values=fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) - assert_expected_attrs(container, None, ct_file, fqns) - cipherTexts[sample_name] = ct_file + ct_file = encrypted_tdf( + encrypt_sdk, + container=container, + attr_values=fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) + assert_expected_attrs(container, None, ct_file, fqns) - rt_file = tmp_dir / f"{sample_name}.returned" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_or_dont( decrypt_sdk, pt_file, container, expect_success, ct_file, rt_file ) diff --git a/xtest/test_pqc.py b/xtest/test_pqc.py index dc8536e1..1d226cc6 100644 --- a/xtest/test_pqc.py +++ b/xtest/test_pqc.py @@ -12,10 +12,9 @@ import tdfs from abac import Attribute, KasKey +from fixtures.encryption import EncryptFactory from tdfs import KeyAccessObject -cipherTexts: dict[str, Path] = {} - # X-Wing KEM sizes per draft-connolly-cfrg-xwing-kem-10 XWING_ENCAPSULATION_KEY_SIZE = 1216 # public key, bytes XWING_CIPHERTEXT_SIZE = 1120 # KEM ciphertext (wrappedKey), bytes @@ -61,10 +60,10 @@ def test_xwing_roundtrip( key_xwing: KasKey, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km1: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Encrypt and decrypt with an X-Wing managed key.""" if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -79,20 +78,11 @@ def test_xwing_roundtrip( attr, key_ids = attribute_with_xwing_key - sample_name = f"xwing-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - cipherTexts[sample_name] = ct_file - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=attr.value_fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=attr.value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 1 @@ -111,7 +101,7 @@ def test_xwing_roundtrip( assert_xwing_kao_sizes(kao) assert_xwing_public_key_size(key_xwing) - rt_file = tmp_dir / f"xwing-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -121,11 +111,11 @@ def test_xwing_with_ec_roundtrip( key_xwing: KasKey, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km1: str, kas_url_km2: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Encrypt and decrypt with both X-Wing and EC keys (multi-mechanism).""" if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -140,20 +130,11 @@ def test_xwing_with_ec_roundtrip( attr, key_ids = attribute_with_xwing_and_ec_keys - sample_name = f"xwing-ec-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - cipherTexts[sample_name] = ct_file - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=attr.value_fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=attr.value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 2 @@ -185,7 +166,7 @@ def test_xwing_with_ec_roundtrip( kao.type == "ec-wrapped" for kao in manifest.encryptionInformation.keyAccess ): tdfs.skip_if_unsupported(decrypt_sdk, "ecwrap") - rt_file = tmp_dir / f"xwing-ec-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -195,10 +176,10 @@ def test_secpmlkem_3_roundtrip( key_secpmlkem_3: KasKey, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km1: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Encrypt and decrypt with an X-Wing managed key.""" if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -213,20 +194,11 @@ def test_secpmlkem_3_roundtrip( attr, key_ids = attribute_with_secpmlkem_3_key - sample_name = f"secpmlkem_3-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - cipherTexts[sample_name] = ct_file - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=attr.value_fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=attr.value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 1 @@ -252,7 +224,7 @@ def test_secpmlkem_3_roundtrip( f"public key DER should be >= {XWING_ENCAPSULATION_KEY_SIZE} bytes, got {der_len}" ) - rt_file = tmp_dir / f"secpmlkem_3-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -262,10 +234,10 @@ def test_secpmlkem_5_roundtrip( key_secpmlkem_5: KasKey, encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - tmp_dir: Path, pt_file: Path, kas_url_km1: str, in_focus: set[tdfs.SDK], + encrypted_tdf: EncryptFactory, ): """Encrypt and decrypt with an X-Wing managed key.""" if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -280,20 +252,11 @@ def test_secpmlkem_5_roundtrip( attr, key_ids = attribute_with_secpmlkem_5_key - sample_name = f"secpmlkem_3-{encrypt_sdk}" - if sample_name in cipherTexts: - ct_file = cipherTexts[sample_name] - else: - ct_file = tmp_dir / f"{sample_name}.tdf" - cipherTexts[sample_name] = ct_file - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container="ztdf", - attr_values=attr.value_fqns, - target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), - ) + ct_file = encrypted_tdf( + encrypt_sdk, + attr_values=attr.value_fqns, + target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), + ) manifest = tdfs.manifest(ct_file) assert len(manifest.encryptionInformation.keyAccess) == 1 @@ -319,6 +282,6 @@ def test_secpmlkem_5_roundtrip( f"public key DER should be >= {XWING_ENCAPSULATION_KEY_SIZE} bytes, got {der_len}" ) - rt_file = tmp_dir / f"secpmlkem_3-{encrypt_sdk}-{decrypt_sdk}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) diff --git a/xtest/test_tdfs.py b/xtest/test_tdfs.py index bfa896f1..a17fea06 100644 --- a/xtest/test_tdfs.py +++ b/xtest/test_tdfs.py @@ -11,80 +11,7 @@ import tdfs from abac import Attribute from audit_logs import AuditLogAsserter - -cipherTexts: dict[str, Path] = {} -counter = 0 - -#### HELPERS - - -def do_encrypt_with( - pt_file: Path, - encrypt_sdk: tdfs.SDK, - container: tdfs.container_type, - tmp_dir: Path, - az: str = "", - scenario: str = "", - target_mode: tdfs.container_version | None = None, - attr_values: list[str] | None = None, -) -> Path: - """ - Encrypt a file with the given SDK and container type, and return the path to the ciphertext file. - - Scenario is used to create a unique filename for the ciphertext file. - - If targetmode is set, asserts that the manifest is in the correct format for that target. - - If attr_values is provided, uses those attribute FQNs to ensure deterministic key selection. - This prevents test flakiness when base_key is configured on the platform. - """ - global counter - counter = (counter or 0) + 1 - c = counter - container_id = f"{encrypt_sdk}-{container}" - if scenario != "": - container_id += f"-{scenario}" - if container_id in cipherTexts: - return cipherTexts[container_id] - ct_file = tmp_dir / f"test-{encrypt_sdk}-{scenario}{c}.{container}" - - use_ecwrap = container == "ztdf-ecwrap" - encrypt_sdk.encrypt( - pt_file, - ct_file, - mime_type="text/plain", - container=container, - attr_values=attr_values, - assert_value=az, - target_mode=target_mode, - ) - - assert ct_file.is_file() - - if tdfs.simple_container(container) == "ztdf": - manifest = tdfs.manifest(ct_file) - assert manifest.payload.isEncrypted - assert len(manifest.encryptionInformation.keyAccess) == 1 - kao = manifest.encryptionInformation.keyAccess[0] - if use_ecwrap: - assert kao.type == "ec-wrapped" - assert kao.ephemeralPublicKey is not None - else: - assert kao.type == "wrapped" - assert kao.ephemeralPublicKey is None - if target_mode == "4.2.2": - looks_like_422(manifest) - elif target_mode == "4.3.0": - looks_like_430(manifest) - elif not encrypt_sdk.supports("hexless"): - looks_like_422(manifest) - else: - looks_like_430(manifest) - else: - assert False, f"Unknown container type: {container}" - cipherTexts[container_id] = ct_file - return ct_file - +from fixtures.encryption import EncryptFactory dspx1153Fails = [] @@ -102,11 +29,11 @@ def test_tdf_roundtrip( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, pt_file: Path, - tmp_dir: Path, container: tdfs.container_type, in_focus: set[tdfs.SDK], audit_logs: AuditLogAsserter, attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if container == "ztdf" and decrypt_sdk in dspx1153Fails: pytest.skip(f"DSPX-1153 SDK [{decrypt_sdk}] has a bug with payload tampering") @@ -133,17 +60,31 @@ def test_tdf_roundtrip( attr_values = ( None if container == "ztdf-ecwrap" else attribute_default_rsa.value_fqns ) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - container, - tmp_dir, + container=container, target_mode=target_mode, attr_values=attr_values, ) - fname = ct_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + manifest = tdfs.manifest(ct_file) + assert manifest.payload.isEncrypted + assert len(manifest.encryptionInformation.keyAccess) == 1 + kao = manifest.encryptionInformation.keyAccess[0] + if container == "ztdf-ecwrap": + assert kao.type == "ec-wrapped" + assert kao.ephemeralPublicKey is not None + else: + assert kao.type == "wrapped" + assert kao.ephemeralPublicKey is None + if target_mode == "4.2.2" or ( + target_mode is None and not encrypt_sdk.supports("hexless") + ): + looks_like_422(manifest) + else: + looks_like_430(manifest) + + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) # Mark timestamp before decrypt for audit log correlation mark = audit_logs.mark("before_decrypt") @@ -159,7 +100,7 @@ def test_tdf_roundtrip( and decrypt_sdk.supports("ecwrap") and "ecwrap" in pfs.features ): - ert_file = tmp_dir / f"{fname}-ecrewrap.untdf" + ert_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk, variant="ecrewrap") ec_mark = audit_logs.mark("before_ecwrap_decrypt") decrypt_sdk.decrypt(ct_file, ert_file, container, ecwrap=True) assert filecmp.cmp(pt_file, ert_file) @@ -171,9 +112,9 @@ def test_tdf_spec_target_422( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): pfs = tdfs.get_platform_features() tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) @@ -186,18 +127,13 @@ def test_tdf_spec_target_422( f"Encrypt SDK {encrypt_sdk} doesn't support targeting container format 4.2.2" ) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, - scenario="target-422", target_mode="4.2.2", attr_values=attribute_default_rsa.value_fqns, ) - fname = ct_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -275,18 +211,14 @@ def looks_like_430(manifest: tdfs.Manifest): def test_manifest_validity( encrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk}: pytest.skip("Not in focus") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, attr_values=attribute_default_rsa.value_fqns, ) @@ -295,22 +227,17 @@ def test_manifest_validity( def test_manifest_validity_with_assertions( encrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, assertion_file_no_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk}: pytest.skip("Not in focus") if not encrypt_sdk.supports("assertions"): pytest.skip(f"{encrypt_sdk} sdk doesn't yet support assertions") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, - scenario="assertions", az=assertion_file_no_keys, attr_values=attribute_default_rsa.value_fqns, ) @@ -325,10 +252,10 @@ def test_tdf_assertions_unkeyed( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, pt_file: Path, - tmp_dir: Path, assertion_file_no_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): pfs = tdfs.get_platform_features() if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -339,18 +266,13 @@ def test_tdf_assertions_unkeyed( pytest.skip(f"{encrypt_sdk} sdk doesn't yet support assertions") if not decrypt_sdk.supports("assertions"): pytest.skip(f"{decrypt_sdk} sdk doesn't yet support assertions") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, - scenario="assertions", az=assertion_file_no_keys, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) - fname = ct_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt(ct_file, rt_file, "ztdf") assert filecmp.cmp(pt_file, rt_file) @@ -359,11 +281,11 @@ def test_tdf_assertions_with_keys( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, pt_file: Path, - tmp_dir: Path, assertion_file_rs_and_hs_keys: str, assertion_verification_file_rs_and_hs_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): pfs = tdfs.get_platform_features() if not in_focus & {encrypt_sdk, decrypt_sdk}: @@ -374,18 +296,13 @@ def test_tdf_assertions_with_keys( pytest.skip(f"{encrypt_sdk} sdk doesn't yet support assertions") if not decrypt_sdk.supports("assertion_verification"): pytest.skip(f"{decrypt_sdk} sdk doesn't yet support assertion_verification") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, - scenario="assertions-keys-roundtrip", az=assertion_file_rs_and_hs_keys, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) - fname = ct_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt( ct_file, @@ -400,11 +317,11 @@ def test_tdf_assertions_422_format( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, pt_file: Path, - tmp_dir: Path, assertion_file_rs_and_hs_keys: str, assertion_verification_file_rs_and_hs_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") @@ -418,19 +335,15 @@ def test_tdf_assertions_422_format( pytest.skip(f"{encrypt_sdk} sdk doesn't yet support assertions") if not decrypt_sdk.supports("assertion_verification"): pytest.skip(f"{decrypt_sdk} sdk doesn't yet support assertion_verification") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, - scenario="assertions-422-keys-roundtrip", az=assertion_file_rs_and_hs_keys, target_mode="4.2.2", attr_values=attribute_default_rsa.value_fqns, ) + looks_like_422(tdfs.manifest(ct_file)) - fname = ct_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = encrypted_tdf.rt_file(ct_file, decrypt_sdk) decrypt_sdk.decrypt( ct_file, @@ -598,31 +511,22 @@ def assert_kas_request_error( def test_tdf_with_unbound_policy( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], - audit_logs: AuditLogAsserter, attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ) -> None: if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") pfs = tdfs.get_platform_features() tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest("unbound_policy", ct_file, change_policy) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" - - # Mark timestamp before tampered decrypt for audit log correlation - # mark = audit_logs.mark("before_tampered_decrypt") + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) @@ -630,40 +534,27 @@ def test_tdf_with_unbound_policy( except subprocess.CalledProcessError as exc: assert_kas_request_error(exc, decrypt_sdk) - # Verify rewrap failure was logged (policy binding mismatch) - # FIXME: Audit logs are not present on failed bindings - # audit_logs.assert_rewrap_error(min_count=1, since_mark=mark) - def test_tdf_with_altered_policy_binding( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], - audit_logs: AuditLogAsserter, attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ) -> None: if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") pfs = tdfs.get_platform_features() tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest( "altered_policy_binding", ct_file, change_policy_binding ) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" - - # Mark timestamp before tampered decrypt for audit log correlation - # mark = audit_logs.mark("before_tampered_decrypt") + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) @@ -671,10 +562,6 @@ def test_tdf_with_altered_policy_binding( except subprocess.CalledProcessError as exc: assert_kas_request_error(exc, decrypt_sdk) - # Verify rewrap failure was logged (policy binding mismatch) - # FIXME: Audit logs are not present on failed bindings - # audit_logs.assert_rewrap_error(min_count=1, since_mark=mark) - ## INTEGRITY TAMPER TESTS @@ -682,27 +569,22 @@ def test_tdf_with_altered_policy_binding( def test_tdf_with_altered_root_sig( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") pfs = tdfs.get_platform_features() tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest("broken_root_sig", ct_file, change_root_signature) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) assert False, "decrypt succeeded unexpectedly" @@ -713,27 +595,22 @@ def test_tdf_with_altered_root_sig( def test_tdf_with_altered_seg_sig_wrong( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") pfs = tdfs.get_platform_features() tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest("broken_seg_sig", ct_file, change_segment_hash) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt( b_file, rt_file, "ztdf", expect_error=True, verify_assertions=False @@ -749,29 +626,24 @@ def test_tdf_with_altered_seg_sig_wrong( def test_tdf_with_altered_enc_seg_size( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") pfs = tdfs.get_platform_features() tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest( "broken_enc_seg_sig", ct_file, change_encrypted_segment_size ) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) assert False, "decrypt succeeded unexpectedly" @@ -785,11 +657,10 @@ def test_tdf_with_altered_enc_seg_size( def test_tdf_with_altered_assertion_statement( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, assertion_file_no_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") @@ -800,12 +671,8 @@ def test_tdf_with_altered_assertion_statement( pytest.skip(f"{encrypt_sdk} sdk doesn't yet support assertions") if not decrypt_sdk.supports("assertions"): pytest.skip(f"{decrypt_sdk} sdk doesn't yet support assertions") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, - scenario="assertions", az=assertion_file_no_keys, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, @@ -813,8 +680,7 @@ def test_tdf_with_altered_assertion_statement( b_file = tdfs.update_manifest( "altered_assertion_statement", ct_file, change_assertion_statement ) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) assert False, "decrypt succeeded unexpectedly" @@ -825,12 +691,11 @@ def test_tdf_with_altered_assertion_statement( def test_tdf_with_altered_assertion_with_keys( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, assertion_file_rs_and_hs_keys: str, assertion_verification_file_rs_and_hs_keys: str, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ): if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") @@ -841,12 +706,8 @@ def test_tdf_with_altered_assertion_with_keys( pytest.skip(f"{encrypt_sdk} sdk doesn't yet support assertions") if not decrypt_sdk.supports("assertion_verification"): pytest.skip(f"{decrypt_sdk} sdk doesn't yet support assertion_verification") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, - scenario="assertions-keys-roundtrip-altered", az=assertion_file_rs_and_hs_keys, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, @@ -854,8 +715,7 @@ def test_tdf_with_altered_assertion_with_keys( b_file = tdfs.update_manifest( "altered_assertion_statement", ct_file, change_assertion_statement ) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt( b_file, @@ -875,10 +735,9 @@ def test_tdf_with_altered_assertion_with_keys( def test_tdf_altered_payload_end( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ) -> None: if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") @@ -887,17 +746,13 @@ def test_tdf_altered_payload_end( pfs = tdfs.get_platform_features() tdfs.skip_connectrpc_skew(encrypt_sdk, decrypt_sdk, pfs) tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, target_mode=tdfs.select_target_version(encrypt_sdk, decrypt_sdk), attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_payload("altered_payload_end", ct_file, change_payload_end) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) try: decrypt_sdk.decrypt(b_file, rt_file, "ztdf", expect_error=True) assert False, "decrypt succeeded unexpectedly" @@ -911,11 +766,10 @@ def test_tdf_altered_payload_end( def test_tdf_with_malicious_kao( encrypt_sdk: tdfs.SDK, decrypt_sdk: tdfs.SDK, - pt_file: Path, - tmp_dir: Path, in_focus: set[tdfs.SDK], audit_logs: AuditLogAsserter, attribute_default_rsa: Attribute, + encrypted_tdf: EncryptFactory, ) -> None: if not in_focus & {encrypt_sdk, decrypt_sdk}: pytest.skip("Not in focus") @@ -924,16 +778,12 @@ def test_tdf_with_malicious_kao( tdfs.skip_hexless_skew(encrypt_sdk, decrypt_sdk) if not decrypt_sdk.supports("kasallowlist"): pytest.skip(f"{encrypt_sdk} sdk doesn't yet support an allowlist for kases") - ct_file = do_encrypt_with( - pt_file, + ct_file = encrypted_tdf( encrypt_sdk, - "ztdf", - tmp_dir, attr_values=attribute_default_rsa.value_fqns, ) b_file = tdfs.update_manifest("malicious_kao", ct_file, malicious_kao) - fname = b_file.stem - rt_file = tmp_dir / f"{fname}.untdf" + rt_file = encrypted_tdf.rt_file(b_file, decrypt_sdk) # Mark timestamp - note: this test may not generate a rewrap audit event # because the SDK should reject the malicious KAO before calling the KAS