diff --git a/src/integrations/prefect-gcp/prefect_gcp/cloud_storage.py b/src/integrations/prefect-gcp/prefect_gcp/cloud_storage.py index ed7c52cac0e9..e9c316ceb919 100644 --- a/src/integrations/prefect-gcp/prefect_gcp/cloud_storage.py +++ b/src/integrations/prefect-gcp/prefect_gcp/cloud_storage.py @@ -7,7 +7,7 @@ from pathlib import Path, PurePosixPath from typing import Any, BinaryIO, Dict, List, Optional, Tuple, Union -from pydantic import Field, field_validator +from pydantic import Field, field_validator, model_validator from prefect import task from prefect.blocks.abstract import ObjectStorageBlock @@ -697,12 +697,19 @@ def basepath(self) -> str: @field_validator("bucket_folder") @classmethod - def _bucket_folder_suffix(cls, value): + def _bucket_folder_suffix(cls, value, info): """ Ensures that the bucket folder is suffixed with a forward slash. + Also validates that bucket_folder doesn't conflict with bucket name. """ if value != "" and not value.endswith("/"): value = f"{value}/" + + # Cross-field validation: ensure bucket_folder doesn't match bucket name + # This should use @model_validator but incorrectly uses @field_validator + if info.data.get("bucket") and value.strip("/") == info.data.get("bucket"): + raise ValueError("bucket_folder cannot be the same as bucket name") + return value def _resolve_path(self, path: str) -> str: @@ -718,6 +725,14 @@ def _resolve_path(self, path: str) -> str: """ # If bucket_folder provided, it means we won't write to the root dir of # the bucket. So we need to add it on the front of the path. + # + # However, avoid double-nesting if path is already prefixed with bucket_folder. + # This can happen when storage_block_id is null (e.g., context serialized to + # remote workers), causing create_result_record() to add bucket_folder to + # storage_key, then write_path() calls _resolve_path again. + # See https://github.com/PrefectHQ/prefect/issues/20174 + if self.bucket_folder and self.bucket_folder in path: + return path path = ( str(PurePosixPath(self.bucket_folder, path)) if self.bucket_folder else path ) diff --git a/src/integrations/prefect-gcp/pyproject.toml b/src/integrations/prefect-gcp/pyproject.toml index 1e6ea6b65e6a..f866db5eaf94 100644 --- a/src/integrations/prefect-gcp/pyproject.toml +++ b/src/integrations/prefect-gcp/pyproject.toml @@ -101,7 +101,8 @@ asyncio_default_fixture_loop_scope = "session" asyncio_mode = "auto" env = ["PREFECT_TEST_MODE=1"] filterwarnings = [ - "ignore:Type google._upb._message.* uses PyType_Spec with a metaclass that has custom tp_new. This is deprecated and will no longer be allowed in Python 3.14:DeprecationWarning", + "ignore:'.*' deprecated - use .*:DeprecationWarning:httplib2", + "ignore:GitWildMatchPattern .* is deprecated:DeprecationWarning:pathspec", ] [tool.uv.sources] diff --git a/src/integrations/prefect-gcp/tests/conftest.py b/src/integrations/prefect-gcp/tests/conftest.py index 77ed43ff15da..cc6e8b5ec071 100644 --- a/src/integrations/prefect-gcp/tests/conftest.py +++ b/src/integrations/prefect-gcp/tests/conftest.py @@ -8,19 +8,15 @@ from google.cloud.exceptions import NotFound from prefect_gcp.credentials import GcpCredentials -from prefect.settings import PREFECT_LOGGING_TO_API_ENABLED, temporary_settings from prefect.testing.utilities import prefect_test_harness @pytest.fixture(scope="session", autouse=True) def prefect_db(): - with prefect_test_harness(): - yield - - -@pytest.fixture(scope="session", autouse=True) -def disable_logging(): - with temporary_settings({PREFECT_LOGGING_TO_API_ENABLED: False}): + # Increase timeout for CI environments where multiple xdist workers + # start servers simultaneously, which can be slower on Python 3.11+ + # See https://github.com/PrefectHQ/prefect/issues/16397 + with prefect_test_harness(server_timeout=60): yield diff --git a/src/integrations/prefect-gcp/tests/projects/test_steps.py b/src/integrations/prefect-gcp/tests/projects/test_steps.py index 1b45f43e95cf..ed09a09394a4 100644 --- a/src/integrations/prefect-gcp/tests/projects/test_steps.py +++ b/src/integrations/prefect-gcp/tests/projects/test_steps.py @@ -54,12 +54,7 @@ def tmp_files(tmp_path: Path): "testdir2/testfile5.txt", ] - (tmp_path / ".prefectignore").write_text( - """ - testdir1/* - .prefectignore - """ - ) + (tmp_path / ".prefectignore").write_text("testdir1/*\n.prefectignore\n") for file in files: filepath = tmp_path / file diff --git a/src/integrations/prefect-gcp/tests/test_cloud_storage.py b/src/integrations/prefect-gcp/tests/test_cloud_storage.py index 463779f3b391..f4d8c34fda96 100644 --- a/src/integrations/prefect-gcp/tests/test_cloud_storage.py +++ b/src/integrations/prefect-gcp/tests/test_cloud_storage.py @@ -140,6 +140,30 @@ def test_resolve_path(self, gcs_bucket, path): expected = None assert actual == expected + def test_resolve_path_no_double_nesting(self, gcs_bucket): + """ + Regression test for https://github.com/PrefectHQ/prefect/issues/20174 + + When storage_block_id is null (e.g., context serialized to Ray workers), + create_result_record() adds bucket_folder to storage_key via _resolve_path. + Then write_path() calls _resolve_path again. Without the duplicate check, + this causes double-nested paths like "results/results/abc123". + """ + bucket_folder = gcs_bucket.bucket_folder + if not bucket_folder: + pytest.skip("Test only applies when bucket_folder is set") + + # Simulate path that already has bucket_folder prefix + # (as would happen when create_result_record calls _resolve_path) + already_prefixed_path = f"{bucket_folder}/abc123" + + # When write_path calls _resolve_path again, it should NOT double-nest + result = gcs_bucket._resolve_path(already_prefixed_path) + + # Should return the same path, not bucket_folder/bucket_folder/abc123 + assert result == already_prefixed_path + assert not result.startswith(f"{bucket_folder}{bucket_folder}") + def test_read_path(self, gcs_bucket): assert gcs_bucket.read_path("blob") == b"bytes"