From 0534ab9e7c4571a1cca1c877a590eabfe314f627 Mon Sep 17 00:00:00 2001 From: actae0n <19864268+xpcmdshell@users.noreply.github.com> Date: Mon, 6 Apr 2026 02:38:38 -0700 Subject: [PATCH 1/2] Add workspace-scoped storage support Implement workspace_id-based scoping for FileStorage and RedisStorage. Scope artifacts, workflows, and vectors by workspace while preserving legacy behavior when workspace_id is omitted. Propagate workspace scope through bootstrap and executor wiring, and add coverage for isolation, sharing, defaults, validation, and bootstrap roundtrip behavior. --- src/py_code_mode/bootstrap.py | 12 +- .../execution/container/executor.py | 7 +- src/py_code_mode/execution/protocol.py | 2 + .../execution/subprocess/namespace.py | 7 +- src/py_code_mode/storage/backends.py | 79 ++- tests/test_bootstrap.py | 133 ++++- tests/test_workspace_scoped_storage.py | 530 ++++++++++++++++++ 7 files changed, 742 insertions(+), 28 deletions(-) create mode 100644 tests/test_workspace_scoped_storage.py diff --git a/src/py_code_mode/bootstrap.py b/src/py_code_mode/bootstrap.py index b122206..7016797 100644 --- a/src/py_code_mode/bootstrap.py +++ b/src/py_code_mode/bootstrap.py @@ -52,8 +52,10 @@ async def bootstrap_namespaces(config: dict[str, Any]) -> NamespaceBundle: Args: config: Dict with "type" key ("file" or "redis") and type-specific fields. - - For "file": {"type": "file", "base_path": str, "tools_path": str|None} + - For "file": {"type": "file", "base_path": str, "workspace_id": str|None, + "tools_path": str|None} - For "redis": {"type": "redis", "url": str, "prefix": str, + "workspace_id": str|None, "tools_path": str|None} - tools_path is optional; if provided, tools load from that directory @@ -128,7 +130,8 @@ async def _bootstrap_file_storage(config: dict[str, Any]) -> NamespaceBundle: from py_code_mode.storage import FileStorage base_path = Path(config["base_path"]) - storage = FileStorage(base_path) + workspace_id = config.get("workspace_id") + storage = FileStorage(base_path, workspace_id=workspace_id) tools_ns = await _load_tools_namespace(config.get("tools_path")) artifact_store = storage.get_artifact_store() @@ -159,15 +162,16 @@ async def _bootstrap_redis_storage(config: dict[str, Any]) -> NamespaceBundle: url = config["url"] prefix = config["prefix"] + workspace_id = config.get("workspace_id") # Connect to Redis - storage = RedisStorage(url=url, prefix=prefix) + storage = RedisStorage(url=url, prefix=prefix, workspace_id=workspace_id) tools_ns = await _load_tools_namespace(config.get("tools_path")) artifact_store = storage.get_artifact_store() # Create deps namespace - deps_store = RedisDepsStore(storage.client, prefix=f"{prefix}:deps") + deps_store = RedisDepsStore(storage.client, prefix=prefix) installer = PackageInstaller() deps_ns = DepsNamespace(deps_store, installer) diff --git a/src/py_code_mode/execution/container/executor.py b/src/py_code_mode/execution/container/executor.py index ec16903..6a7adf7 100644 --- a/src/py_code_mode/execution/container/executor.py +++ b/src/py_code_mode/execution/container/executor.py @@ -404,8 +404,9 @@ async def start( workflows_path = access.workflows_path artifacts_path = access.artifacts_path deps_path = None - if artifacts_path is not None: - deps_path = artifacts_path.parent / "deps" + deps_root = access.root_path or artifacts_path.parent + if deps_root is not None: + deps_path = deps_root / "deps" # Create directories on host before mounting # Workflows need to exist for volume mount if workflows_path: @@ -425,7 +426,7 @@ async def start( tools_prefix = None # Tools owned by executor workflows_prefix = access.workflows_prefix artifacts_prefix = access.artifacts_prefix - deps_prefix = None # Deps owned by executor + deps_prefix = access.root_prefix or access.workflows_prefix.rsplit(":", 1)[0] else: raise TypeError( f"Unexpected storage access type: {type(access).__name__}. " diff --git a/src/py_code_mode/execution/protocol.py b/src/py_code_mode/execution/protocol.py index 570987e..3b9e8db 100644 --- a/src/py_code_mode/execution/protocol.py +++ b/src/py_code_mode/execution/protocol.py @@ -33,6 +33,7 @@ class FileStorageAccess: workflows_path: Path | None artifacts_path: Path vectors_path: Path | None = None + root_path: Path | None = None @dataclass(frozen=True) @@ -48,6 +49,7 @@ class RedisStorageAccess: workflows_prefix: str artifacts_prefix: str vectors_prefix: str | None = None + root_prefix: str | None = None StorageAccess = FileStorageAccess | RedisStorageAccess diff --git a/src/py_code_mode/execution/subprocess/namespace.py b/src/py_code_mode/execution/subprocess/namespace.py index 80cddd7..f00a3d4 100644 --- a/src/py_code_mode/execution/subprocess/namespace.py +++ b/src/py_code_mode/execution/subprocess/namespace.py @@ -102,8 +102,8 @@ def _build_file_storage_setup_code( repr(str(storage_access.workflows_path)) if storage_access.workflows_path else "None" ) artifacts_path_str = repr(str(storage_access.artifacts_path)) - # Base path is parent of artifacts for deps store - base_path_str = repr(str(storage_access.artifacts_path.parent)) + base_path = storage_access.root_path or storage_access.artifacts_path.parent + base_path_str = repr(str(base_path)) allow_deps_str = "True" if allow_runtime_deps else "False" vectors_path_str = ( repr(str(storage_access.vectors_path)) if storage_access.vectors_path else "None" @@ -416,7 +416,8 @@ def _build_redis_storage_setup_code( vectors_prefix_str = ( repr(storage_access.vectors_prefix) if storage_access.vectors_prefix else "None" ) - deps_prefix_str = repr(f"{storage_access.workflows_prefix.rsplit(':', 1)[0]}:deps") + deps_prefix = storage_access.root_prefix or storage_access.workflows_prefix.rsplit(":", 1)[0] + deps_prefix_str = repr(deps_prefix) allow_deps_str = "True" if allow_runtime_deps else "False" return f'''# Auto-generated namespace setup for SubprocessExecutor (Redis) diff --git a/src/py_code_mode/storage/backends.py b/src/py_code_mode/storage/backends.py index 4377b4d..421e9c6 100644 --- a/src/py_code_mode/storage/backends.py +++ b/src/py_code_mode/storage/backends.py @@ -9,6 +9,7 @@ from __future__ import annotations import logging +import re from pathlib import Path from typing import TYPE_CHECKING, ClassVar, Protocol, runtime_checkable from urllib.parse import quote @@ -45,10 +46,22 @@ logger = logging.getLogger(__name__) +_WORKSPACE_ID_PATTERN = re.compile(r"^[A-Za-z0-9_-]{1,128}$") + if TYPE_CHECKING: from redis import Redis +def _validate_workspace_id(workspace_id: str) -> str: + """Validate a workspace identifier used in paths and Redis prefixes.""" + if not _WORKSPACE_ID_PATTERN.fullmatch(workspace_id): + raise ValueError( + "workspace_id must be 1-128 characters using only ASCII letters, digits, " + "underscores, or hyphens" + ) + return workspace_id + + @runtime_checkable class StorageBackend(Protocol): """Protocol for unified storage backend. @@ -88,14 +101,18 @@ class FileStorage: _UNINITIALIZED: ClassVar[object] = object() - def __init__(self, base_path: Path | str) -> None: + def __init__(self, base_path: Path | str, workspace_id: str | None = None) -> None: """Initialize file storage. Args: base_path: Base directory for storage. Will create workflows/, artifacts/ subdirs. + workspace_id: Optional workspace scope for workflows, artifacts, and vectors. """ self._base_path = Path(base_path) if isinstance(base_path, str) else base_path self._base_path.mkdir(parents=True, exist_ok=True) + self._workspace_id = ( + _validate_workspace_id(workspace_id) if workspace_id is not None else None + ) # Lazy-initialized stores (workflows and artifacts only) self._workflow_library: WorkflowLibrary | None = None @@ -107,21 +124,32 @@ def root(self) -> Path: """Get the root storage path.""" return self._base_path + @property + def workspace_id(self) -> str | None: + """Get the configured workspace scope.""" + return self._workspace_id + + def _get_storage_root(self) -> Path: + """Get the scoped root for workflows, artifacts, and vectors.""" + if self._workspace_id is None: + return self._base_path + return self._base_path / "workspaces" / self._workspace_id + def _get_workflows_path(self) -> Path: """Get the workflows directory path.""" - workflows_path = self._base_path / "workflows" + workflows_path = self._get_storage_root() / "workflows" workflows_path.mkdir(parents=True, exist_ok=True) return workflows_path def _get_artifacts_path(self) -> Path: """Get the artifacts directory path.""" - artifacts_path = self._base_path / "artifacts" + artifacts_path = self._get_storage_root() / "artifacts" artifacts_path.mkdir(parents=True, exist_ok=True) return artifacts_path def _get_vectors_path(self) -> Path: """Get the vectors directory path.""" - vectors_path = self._base_path / "vectors" + vectors_path = self._get_storage_root() / "vectors" vectors_path.mkdir(parents=True, exist_ok=True) return vectors_path @@ -153,13 +181,14 @@ def get_vector_store(self) -> VectorStore | None: def get_serializable_access(self) -> FileStorageAccess: """Return FileStorageAccess for cross-process communication.""" - base_path = self._base_path - vectors_path = base_path / "vectors" + storage_root = self._get_storage_root() + vectors_path = storage_root / "vectors" return FileStorageAccess( - workflows_path=base_path / "workflows", - artifacts_path=base_path / "artifacts", + workflows_path=storage_root / "workflows", + artifacts_path=storage_root / "artifacts", vectors_path=vectors_path if vectors_path.exists() else None, + root_path=self._base_path, ) def get_workflow_library(self) -> WorkflowLibrary: @@ -206,10 +235,13 @@ def to_bootstrap_config(self) -> dict[str, str]: This config can be passed to bootstrap_namespaces() to reconstruct the storage in a subprocess. """ - return { + config = { "type": "file", "base_path": str(self._base_path), } + if self._workspace_id is not None: + config["workspace_id"] = self._workspace_id + return config class RedisStorage: @@ -225,6 +257,7 @@ def __init__( url: str | None = None, redis: Redis | None = None, prefix: str = "py_code_mode", + workspace_id: str | None = None, ) -> None: """Initialize Redis storage. @@ -234,6 +267,7 @@ def __init__( redis: Redis client instance. Use for advanced configurations (custom connection pools, etc.). Mutually exclusive with url. prefix: Key prefix for all storage. Default: "py_code_mode" + workspace_id: Optional workspace scope for workflows, artifacts, and vectors. Raises: ValueError: If neither url nor redis is provided, or if both are. @@ -260,6 +294,12 @@ def __init__( raise ValueError("Redis client is required") self._prefix = prefix + self._workspace_id = ( + _validate_workspace_id(workspace_id) if workspace_id is not None else None + ) + self._storage_prefix = ( + f"{prefix}:ws:{self._workspace_id}" if self._workspace_id is not None else prefix + ) # Lazy-initialized stores (workflows and artifacts only) self._workflow_library: WorkflowLibrary | None = None @@ -271,6 +311,11 @@ def prefix(self) -> str: """Get the configured prefix.""" return self._prefix + @property + def workspace_id(self) -> str | None: + """Get the configured workspace scope.""" + return self._workspace_id + @property def client(self) -> Redis: """Get the Redis client.""" @@ -322,7 +367,7 @@ def get_vector_store(self) -> VectorStore | None: self._vector_store = RedisVectorStore( redis=self._redis, embedder=embedder, - prefix=f"{self._prefix}:vectors", + prefix=f"{self._storage_prefix}:vectors", ) except ImportError: self._vector_store = None @@ -343,7 +388,7 @@ def get_serializable_access(self) -> RedisStorageAccess: else: redis_url = self._reconstruct_redis_url() - prefix = self._prefix + prefix = self._storage_prefix # vectors_prefix is set when RedisVectorStore dependencies are available # (redis-py with RediSearch). We check module availability, not actual # vector store creation, to avoid side effects during serialization. @@ -355,12 +400,13 @@ def get_serializable_access(self) -> RedisStorageAccess: workflows_prefix=f"{prefix}:workflows", artifacts_prefix=f"{prefix}:artifacts", vectors_prefix=vectors_prefix, + root_prefix=self._prefix, ) def get_workflow_library(self) -> WorkflowLibrary: """Return WorkflowLibrary for in-process execution.""" if self._workflow_library is None: - raw_store = RedisWorkflowStore(self._redis, prefix=f"{self._prefix}:workflows") + raw_store = RedisWorkflowStore(self._redis, prefix=f"{self._storage_prefix}:workflows") vector_store = self.get_vector_store() try: self._workflow_library = create_workflow_library( @@ -385,13 +431,13 @@ def get_artifact_store(self) -> ArtifactStoreProtocol: """Return artifact store for in-process execution.""" if self._artifact_store is None: self._artifact_store = RedisArtifactStore( - self._redis, prefix=f"{self._prefix}:artifacts" + self._redis, prefix=f"{self._storage_prefix}:artifacts" ) return self._artifact_store def get_workflow_store(self) -> WorkflowStore: """Return the underlying WorkflowStore for direct access.""" - return RedisWorkflowStore(self._redis, prefix=f"{self._prefix}:workflows") + return RedisWorkflowStore(self._redis, prefix=f"{self._storage_prefix}:workflows") def to_bootstrap_config(self) -> dict[str, str]: """Serialize storage configuration for subprocess bootstrap. @@ -401,8 +447,11 @@ def to_bootstrap_config(self) -> dict[str, str]: This config can be passed to bootstrap_namespaces() to reconstruct the storage in a subprocess. """ - return { + config = { "type": "redis", "url": self._reconstruct_redis_url(), "prefix": self._prefix, } + if self._workspace_id is not None: + config["workspace_id"] = self._workspace_id + return config diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py index 4440f19..fb8ae40 100644 --- a/tests/test_bootstrap.py +++ b/tests/test_bootstrap.py @@ -4,9 +4,6 @@ 1. `to_bootstrap_config()` method on storage classes - serializes to dict 2. `bootstrap_namespaces(config)` function - reconstructs storage from config 3. Lazy connections - storage only connects when actually used - -TDD RED phase: These tests are written before implementation. -They will fail until the bootstrap module is implemented. """ from __future__ import annotations @@ -713,6 +710,67 @@ async def test_config_roundtrip(self, tmp_path: Path) -> None: assert workflow is not None assert workflow.name == "greet" + @pytest.mark.asyncio + async def test_config_roundtrip_preserves_workspace_scoping(self, tmp_path: Path) -> None: + """Scoped file storage roundtrip preserves scoped visibility.""" + from py_code_mode.bootstrap import bootstrap_namespaces + from py_code_mode.errors import ArtifactNotFoundError + from py_code_mode.storage import FileStorage + from py_code_mode.workflows import PythonWorkflow + + scoped_storage = FileStorage(tmp_path, workspace_id="client_a") + legacy_storage = FileStorage(tmp_path) + + scoped_storage.get_artifact_store().save( + "scoped.json", + {"scope": "client_a"}, + description="Scoped artifact", + ) + legacy_storage.get_artifact_store().save( + "legacy.json", + {"scope": "legacy"}, + description="Legacy artifact", + ) + + scoped_storage.get_workflow_store().save( + PythonWorkflow.from_source( + name="scoped_workflow", + source='async def run() -> str:\n return "scoped"', + description="Workflow visible only in client_a workspace", + ) + ) + legacy_storage.get_workflow_store().save( + PythonWorkflow.from_source( + name="legacy_workflow", + source='async def run() -> str:\n return "legacy"', + description="Workflow visible only in unscoped storage", + ) + ) + + config = scoped_storage.to_bootstrap_config() + bundle = await bootstrap_namespaces(config) + + assert bundle.artifacts.load("scoped.json") == {"scope": "client_a"} + with pytest.raises(ArtifactNotFoundError): + bundle.artifacts.load("legacy.json") + + assert bundle.workflows.library.get("scoped_workflow") is not None + assert bundle.workflows.library.get("legacy_workflow") is None + + @pytest.mark.asyncio + async def test_config_roundtrip_keeps_file_deps_unscoped(self, tmp_path: Path) -> None: + """Scoped file storage bootstrap keeps deps rooted at the unscoped base path.""" + from py_code_mode.bootstrap import bootstrap_namespaces + from py_code_mode.storage import FileStorage + + scoped_storage = FileStorage(tmp_path, workspace_id="client_a") + + bundle = await bootstrap_namespaces(scoped_storage.to_bootstrap_config()) + bundle.deps._store.add("requests>=2.0") + + assert (tmp_path / "deps" / "requirements.txt").read_text() == "requests>=2.0\n" + assert not (tmp_path / "workspaces" / "client_a" / "deps").exists() + # ============================================================================= # RedisStorage.to_bootstrap_config() Tests @@ -873,6 +931,75 @@ async def test_config_roundtrip(self, mock_redis: MockRedisClient) -> None: assert workflow is not None assert workflow.name == "greet" + @pytest.mark.asyncio + async def test_config_roundtrip_preserves_workspace_scoping( + self, mock_redis: MockRedisClient + ) -> None: + """Scoped Redis storage roundtrip preserves scoped visibility.""" + from py_code_mode.bootstrap import bootstrap_namespaces + from py_code_mode.errors import ArtifactNotFoundError + from py_code_mode.storage import RedisStorage + from py_code_mode.workflows import PythonWorkflow + + scoped_storage = RedisStorage(redis=mock_redis, prefix="app", workspace_id="client_a") + legacy_storage = RedisStorage(redis=mock_redis, prefix="app") + + scoped_storage.get_artifact_store().save( + "scoped.json", + {"scope": "client_a"}, + description="Scoped artifact", + ) + legacy_storage.get_artifact_store().save( + "legacy.json", + {"scope": "legacy"}, + description="Legacy artifact", + ) + + scoped_storage.get_workflow_store().save( + PythonWorkflow.from_source( + name="scoped_workflow", + source='async def run() -> str:\n return "scoped"', + description="Workflow visible only in client_a workspace", + ) + ) + legacy_storage.get_workflow_store().save( + PythonWorkflow.from_source( + name="legacy_workflow", + source='async def run() -> str:\n return "legacy"', + description="Workflow visible only in unscoped storage", + ) + ) + + config = scoped_storage.to_bootstrap_config() + + with patch("redis.Redis.from_url", return_value=mock_redis): + bundle = await bootstrap_namespaces(config) + + assert bundle.artifacts.load("scoped.json") == {"scope": "client_a"} + with pytest.raises(ArtifactNotFoundError): + bundle.artifacts.load("legacy.json") + + assert bundle.workflows.library.get("scoped_workflow") is not None + assert bundle.workflows.library.get("legacy_workflow") is None + + @pytest.mark.asyncio + async def test_config_roundtrip_keeps_redis_deps_unscoped( + self, mock_redis: MockRedisClient + ) -> None: + """Scoped Redis storage bootstrap keeps deps under the root Redis prefix.""" + from py_code_mode.bootstrap import bootstrap_namespaces + from py_code_mode.storage import RedisStorage + + scoped_storage = RedisStorage(redis=mock_redis, prefix="app", workspace_id="client_a") + + with patch("redis.Redis.from_url", return_value=mock_redis): + bundle = await bootstrap_namespaces(scoped_storage.to_bootstrap_config()) + + bundle.deps._store.add("requests>=2.0") + + assert "requests>=2.0" in mock_redis.smembers("app:deps") + assert mock_redis.smembers("app:ws:client_a:deps") == set() + # ============================================================================= # Lazy Connection Tests diff --git a/tests/test_workspace_scoped_storage.py b/tests/test_workspace_scoped_storage.py new file mode 100644 index 0000000..5edc731 --- /dev/null +++ b/tests/test_workspace_scoped_storage.py @@ -0,0 +1,530 @@ +"""Tests for workspace-scoped storage behavior.""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest + +from py_code_mode import Session +from py_code_mode.errors import ArtifactNotFoundError +from py_code_mode.storage import FileStorage, RedisStorage + +if TYPE_CHECKING: + from tests.conftest import MockRedisClient + + +SHARED_WORKFLOW_SOURCE = """async def run() -> str: + return "shared workflow" +""" + +SEARCHABLE_WORKFLOW_SOURCE = """async def run() -> str: + return "searchable workflow" +""" + + +class TestWorkspaceScopedFileStorageArtifacts: + """Session-facing artifact behavior with workspace-scoped FileStorage.""" + + @pytest.mark.asyncio + async def test_artifacts_are_isolated_between_workspaces(self, tmp_path: Path) -> None: + workspace_a = FileStorage(tmp_path, workspace_id="client_a") + workspace_b = FileStorage(tmp_path, workspace_id="client_b") + + async with Session(storage=workspace_a) as session_a: + await session_a.save_artifact( + name="campaign.json", + data={"workspace": "client_a"}, + description="Artifact scoped to client_a", + ) + + async with Session(storage=workspace_b) as session_b: + assert await session_b.list_artifacts() == [] + with pytest.raises(ArtifactNotFoundError): + await session_b.load_artifact("campaign.json") + + @pytest.mark.asyncio + async def test_artifacts_are_shared_by_separately_initialized_sessions_in_same_workspace( + self, tmp_path: Path + ) -> None: + writer_storage = FileStorage(tmp_path, workspace_id="shared_client") + reader_storage = FileStorage(tmp_path, workspace_id="shared_client") + + async with Session(storage=writer_storage) as writer: + await writer.save_artifact( + name="campaign.json", + data={"shared": True}, + description="Shared artifact", + ) + + async with Session(storage=reader_storage) as reader: + assert await reader.load_artifact("campaign.json") == {"shared": True} + artifacts = await reader.list_artifacts() + assert [artifact["name"] for artifact in artifacts] == ["campaign.json"] + + +class TestWorkspaceScopedFileStorageWorkflows: + """Session-facing workflow behavior with workspace-scoped FileStorage.""" + + @pytest.mark.asyncio + async def test_workflows_are_isolated_between_workspaces(self, tmp_path: Path) -> None: + workspace_a = FileStorage(tmp_path, workspace_id="client_a") + workspace_b = FileStorage(tmp_path, workspace_id="client_b") + + async with Session(storage=workspace_a) as session_a: + await session_a.add_workflow( + name="shared_campaign", + source=SHARED_WORKFLOW_SOURCE, + description="Workflow scoped to client_a", + ) + + async with Session(storage=workspace_b) as session_b: + assert await session_b.get_workflow("shared_campaign") is None + assert await session_b.list_workflows() == [] + + @pytest.mark.asyncio + async def test_workflows_are_shared_by_separately_initialized_sessions_in_same_workspace( + self, tmp_path: Path + ) -> None: + writer_storage = FileStorage(tmp_path, workspace_id="shared_client") + reader_storage = FileStorage(tmp_path, workspace_id="shared_client") + + async with Session(storage=writer_storage) as writer: + await writer.add_workflow( + name="shared_campaign", + source=SHARED_WORKFLOW_SOURCE, + description="Workflow visible within one workspace", + ) + + async with Session(storage=reader_storage) as reader: + workflow = await reader.get_workflow("shared_campaign") + assert workflow is not None + assert workflow["name"] == "shared_campaign" + + result = await reader.run("workflows.shared_campaign()") + assert result.is_ok + assert result.value == "shared workflow" + + @pytest.mark.asyncio + async def test_workflow_search_is_isolated_between_workspaces(self, tmp_path: Path) -> None: + workspace_a = FileStorage(tmp_path, workspace_id="client_a") + workspace_b = FileStorage(tmp_path, workspace_id="client_b") + + async with Session(storage=workspace_a) as session_a: + await session_a.add_workflow( + name="campaign_search", + source=SEARCHABLE_WORKFLOW_SOURCE, + description="Analyze campaign metrics and summarize ad performance", + ) + results = await session_a.search_workflows("campaign metrics ad performance") + assert [workflow["name"] for workflow in results] == ["campaign_search"] + + async with Session(storage=workspace_b) as session_b: + results = await session_b.search_workflows("campaign metrics ad performance") + assert results == [] + + +class TestWorkspaceScopedRedisStorageArtifacts: + """Session-facing artifact behavior with workspace-scoped RedisStorage.""" + + @pytest.mark.asyncio + async def test_artifacts_are_isolated_between_workspaces( + self, mock_redis: MockRedisClient + ) -> None: + workspace_a = RedisStorage(redis=mock_redis, prefix="app", workspace_id="client_a") + workspace_b = RedisStorage(redis=mock_redis, prefix="app", workspace_id="client_b") + + async with Session(storage=workspace_a) as session_a: + await session_a.save_artifact( + name="campaign.json", + data={"workspace": "client_a"}, + description="Artifact scoped to client_a", + ) + + async with Session(storage=workspace_b) as session_b: + assert await session_b.list_artifacts() == [] + with pytest.raises(ArtifactNotFoundError): + await session_b.load_artifact("campaign.json") + + @pytest.mark.asyncio + async def test_artifacts_are_shared_by_separately_initialized_sessions_in_same_workspace( + self, mock_redis: MockRedisClient + ) -> None: + writer_storage = RedisStorage( + redis=mock_redis, + prefix="app", + workspace_id="shared_client", + ) + reader_storage = RedisStorage( + redis=mock_redis, + prefix="app", + workspace_id="shared_client", + ) + + async with Session(storage=writer_storage) as writer: + await writer.save_artifact( + name="campaign.json", + data={"shared": True}, + description="Shared artifact", + ) + + async with Session(storage=reader_storage) as reader: + assert await reader.load_artifact("campaign.json") == {"shared": True} + artifacts = await reader.list_artifacts() + assert [artifact["name"] for artifact in artifacts] == ["campaign.json"] + + +class TestWorkspaceScopedRedisStorageWorkflows: + """Session-facing workflow behavior with workspace-scoped RedisStorage.""" + + @pytest.mark.asyncio + async def test_workflows_are_isolated_between_workspaces( + self, mock_redis: MockRedisClient + ) -> None: + workspace_a = RedisStorage(redis=mock_redis, prefix="app", workspace_id="client_a") + workspace_b = RedisStorage(redis=mock_redis, prefix="app", workspace_id="client_b") + + async with Session(storage=workspace_a) as session_a: + await session_a.add_workflow( + name="shared_campaign", + source=SHARED_WORKFLOW_SOURCE, + description="Workflow scoped to client_a", + ) + + async with Session(storage=workspace_b) as session_b: + assert await session_b.get_workflow("shared_campaign") is None + assert await session_b.list_workflows() == [] + + @pytest.mark.asyncio + async def test_workflows_are_shared_by_separately_initialized_sessions_in_same_workspace( + self, mock_redis: MockRedisClient + ) -> None: + writer_storage = RedisStorage( + redis=mock_redis, + prefix="app", + workspace_id="shared_client", + ) + reader_storage = RedisStorage( + redis=mock_redis, + prefix="app", + workspace_id="shared_client", + ) + + async with Session(storage=writer_storage) as writer: + await writer.add_workflow( + name="shared_campaign", + source=SHARED_WORKFLOW_SOURCE, + description="Workflow visible within one workspace", + ) + + async with Session(storage=reader_storage) as reader: + workflow = await reader.get_workflow("shared_campaign") + assert workflow is not None + assert workflow["name"] == "shared_campaign" + + result = await reader.run("workflows.shared_campaign()") + assert result.is_ok + assert result.value == "shared workflow" + + @pytest.mark.asyncio + async def test_workflow_search_is_isolated_between_workspaces( + self, mock_redis: MockRedisClient + ) -> None: + workspace_a = RedisStorage(redis=mock_redis, prefix="app", workspace_id="client_a") + workspace_b = RedisStorage(redis=mock_redis, prefix="app", workspace_id="client_b") + + async with Session(storage=workspace_a) as session_a: + await session_a.add_workflow( + name="campaign_search", + source=SEARCHABLE_WORKFLOW_SOURCE, + description="Analyze campaign metrics and summarize ad performance", + ) + results = await session_a.search_workflows("campaign metrics ad performance") + assert [workflow["name"] for workflow in results] == ["campaign_search"] + + async with Session(storage=workspace_b) as session_b: + results = await session_b.search_workflows("campaign metrics ad performance") + assert results == [] + + +class TestWorkspaceScopedStorageDefaults: + """Expected behavior when workspace_id is omitted.""" + + @pytest.mark.asyncio + async def test_omitting_workspace_id_preserves_current_unscoped_session_behavior( + self, tmp_path: Path + ) -> None: + writer_storage = FileStorage(tmp_path) + reader_storage = FileStorage(tmp_path, workspace_id=None) + + async with Session(storage=writer_storage) as writer: + await writer.save_artifact( + name="legacy.json", + data={"mode": "legacy"}, + description="Legacy unscoped artifact", + ) + + async with Session(storage=reader_storage) as reader: + assert await reader.load_artifact("legacy.json") == {"mode": "legacy"} + + @pytest.mark.asyncio + async def test_omitting_workspace_id_preserves_current_unscoped_redis_behavior( + self, mock_redis: MockRedisClient + ) -> None: + writer_storage = RedisStorage(redis=mock_redis, prefix="app") + reader_storage = RedisStorage(redis=mock_redis, prefix="app", workspace_id=None) + + async with Session(storage=writer_storage) as writer: + await writer.save_artifact( + name="legacy.json", + data={"mode": "legacy"}, + description="Legacy unscoped artifact", + ) + + async with Session(storage=reader_storage) as reader: + assert await reader.load_artifact("legacy.json") == {"mode": "legacy"} + + @pytest.mark.asyncio + async def test_omitting_workspace_id_preserves_current_unscoped_workflow_behavior( + self, tmp_path: Path + ) -> None: + writer_storage = FileStorage(tmp_path) + reader_storage = FileStorage(tmp_path, workspace_id=None) + + async with Session(storage=writer_storage) as writer: + await writer.add_workflow( + name="legacy_workflow", + source=SHARED_WORKFLOW_SOURCE, + description="Legacy unscoped workflow", + ) + + async with Session(storage=reader_storage) as reader: + workflow = await reader.get_workflow("legacy_workflow") + assert workflow is not None + result = await reader.run("workflows.legacy_workflow()") + assert result.is_ok + assert result.value == "shared workflow" + + @pytest.mark.asyncio + async def test_omitting_workspace_id_preserves_current_unscoped_redis_workflow_behavior( + self, mock_redis: MockRedisClient + ) -> None: + writer_storage = RedisStorage(redis=mock_redis, prefix="app") + reader_storage = RedisStorage(redis=mock_redis, prefix="app", workspace_id=None) + + async with Session(storage=writer_storage) as writer: + await writer.add_workflow( + name="legacy_workflow", + source=SHARED_WORKFLOW_SOURCE, + description="Legacy unscoped workflow", + ) + + async with Session(storage=reader_storage) as reader: + workflow = await reader.get_workflow("legacy_workflow") + assert workflow is not None + result = await reader.run("workflows.legacy_workflow()") + assert result.is_ok + assert result.value == "shared workflow" + + @pytest.mark.asyncio + async def test_scoped_file_storage_does_not_see_unscoped_artifacts( + self, tmp_path: Path + ) -> None: + legacy_storage = FileStorage(tmp_path) + scoped_storage = FileStorage(tmp_path, workspace_id="client_a") + + async with Session(storage=legacy_storage) as legacy: + await legacy.save_artifact( + name="legacy.json", + data={"mode": "legacy"}, + description="Legacy artifact", + ) + + async with Session(storage=scoped_storage) as scoped: + assert await scoped.list_artifacts() == [] + with pytest.raises(ArtifactNotFoundError): + await scoped.load_artifact("legacy.json") + + @pytest.mark.asyncio + async def test_unscoped_file_storage_does_not_see_scoped_artifacts( + self, tmp_path: Path + ) -> None: + legacy_storage = FileStorage(tmp_path) + scoped_storage = FileStorage(tmp_path, workspace_id="client_a") + + async with Session(storage=scoped_storage) as scoped: + await scoped.save_artifact( + name="workspace.json", + data={"mode": "scoped"}, + description="Scoped artifact", + ) + + async with Session(storage=legacy_storage) as legacy: + assert await legacy.list_artifacts() == [] + with pytest.raises(ArtifactNotFoundError): + await legacy.load_artifact("workspace.json") + + @pytest.mark.asyncio + async def test_scoped_file_storage_does_not_see_unscoped_workflows( + self, tmp_path: Path + ) -> None: + legacy_storage = FileStorage(tmp_path) + scoped_storage = FileStorage(tmp_path, workspace_id="client_a") + + async with Session(storage=legacy_storage) as legacy: + await legacy.add_workflow( + name="legacy_workflow", + source=SHARED_WORKFLOW_SOURCE, + description="Legacy unscoped workflow", + ) + + async with Session(storage=scoped_storage) as scoped: + assert await scoped.get_workflow("legacy_workflow") is None + assert await scoped.search_workflows("legacy unscoped workflow") == [] + + @pytest.mark.asyncio + async def test_scoped_redis_storage_does_not_see_unscoped_artifacts( + self, mock_redis: MockRedisClient + ) -> None: + legacy_storage = RedisStorage(redis=mock_redis, prefix="app") + scoped_storage = RedisStorage(redis=mock_redis, prefix="app", workspace_id="client_a") + + async with Session(storage=legacy_storage) as legacy: + await legacy.save_artifact( + name="legacy.json", + data={"mode": "legacy"}, + description="Legacy artifact", + ) + + async with Session(storage=scoped_storage) as scoped: + assert await scoped.list_artifacts() == [] + with pytest.raises(ArtifactNotFoundError): + await scoped.load_artifact("legacy.json") + + @pytest.mark.asyncio + async def test_unscoped_redis_storage_does_not_see_scoped_artifacts( + self, mock_redis: MockRedisClient + ) -> None: + legacy_storage = RedisStorage(redis=mock_redis, prefix="app") + scoped_storage = RedisStorage(redis=mock_redis, prefix="app", workspace_id="client_a") + + async with Session(storage=scoped_storage) as scoped: + await scoped.save_artifact( + name="workspace.json", + data={"mode": "scoped"}, + description="Scoped artifact", + ) + + async with Session(storage=legacy_storage) as legacy: + assert await legacy.list_artifacts() == [] + with pytest.raises(ArtifactNotFoundError): + await legacy.load_artifact("workspace.json") + + @pytest.mark.asyncio + async def test_scoped_redis_storage_does_not_see_unscoped_workflows( + self, mock_redis: MockRedisClient + ) -> None: + legacy_storage = RedisStorage(redis=mock_redis, prefix="app") + scoped_storage = RedisStorage(redis=mock_redis, prefix="app", workspace_id="client_a") + + async with Session(storage=legacy_storage) as legacy: + await legacy.add_workflow( + name="legacy_workflow", + source=SHARED_WORKFLOW_SOURCE, + description="Legacy unscoped workflow", + ) + + async with Session(storage=scoped_storage) as scoped: + assert await scoped.get_workflow("legacy_workflow") is None + assert await scoped.search_workflows("legacy unscoped workflow") == [] + + def test_file_storage_without_workspace_id_uses_legacy_layout(self, tmp_path: Path) -> None: + storage = FileStorage(tmp_path, workspace_id=None) + + access = storage.get_serializable_access() + + assert access.workflows_path == tmp_path / "workflows" + assert access.artifacts_path == tmp_path / "artifacts" + if access.vectors_path is not None: + assert access.vectors_path == tmp_path / "vectors" + + def test_file_storage_workspace_id_scopes_paths(self, tmp_path: Path) -> None: + storage = FileStorage(tmp_path, workspace_id="client_a") + + access = storage.get_serializable_access() + + assert access.workflows_path == tmp_path / "workspaces" / "client_a" / "workflows" + assert access.artifacts_path == tmp_path / "workspaces" / "client_a" / "artifacts" + if access.vectors_path is not None: + assert access.vectors_path == tmp_path / "workspaces" / "client_a" / "vectors" + + def test_redis_storage_workspace_id_scopes_prefixes(self, mock_redis: MockRedisClient) -> None: + storage = RedisStorage( + redis=mock_redis, + prefix="app", + workspace_id="client_a", + ) + + access = storage.get_serializable_access() + + assert access.workflows_prefix == "app:ws:client_a:workflows" + assert access.artifacts_prefix == "app:ws:client_a:artifacts" + if access.vectors_prefix is not None: + assert access.vectors_prefix == "app:ws:client_a:vectors" + + def test_redis_storage_without_workspace_id_preserves_current_prefixes( + self, mock_redis: MockRedisClient + ) -> None: + storage = RedisStorage( + redis=mock_redis, + prefix="app", + workspace_id=None, + ) + + access = storage.get_serializable_access() + + assert access.workflows_prefix == "app:workflows" + assert access.artifacts_prefix == "app:artifacts" + if access.vectors_prefix is not None: + assert access.vectors_prefix == "app:vectors" + + +class TestWorkspaceScopedBootstrapConfig: + """Bootstrap config preserves workspace scope without re-scoping deps roots.""" + + def test_file_storage_workspace_id_is_serialized_explicitly(self, tmp_path: Path) -> None: + storage = FileStorage(tmp_path, workspace_id="client_a") + + config = storage.to_bootstrap_config() + + assert config["base_path"] == str(tmp_path) + assert config["workspace_id"] == "client_a" + + def test_redis_storage_workspace_id_is_serialized_explicitly( + self, mock_redis: MockRedisClient + ) -> None: + storage = RedisStorage(redis=mock_redis, prefix="app", workspace_id="client_a") + + config = storage.to_bootstrap_config() + + assert config["prefix"] == "app" + assert config["workspace_id"] == "client_a" + + +class TestWorkspaceIdValidation: + """Validation behavior for workspace identifiers.""" + + @pytest.mark.parametrize("workspace_id", ["", ".", "..", "../escape", "bad/name", r"bad\\name"]) + def test_file_storage_rejects_invalid_workspace_ids( + self, tmp_path: Path, workspace_id: str + ) -> None: + with pytest.raises(ValueError): + FileStorage(tmp_path, workspace_id=workspace_id) + + @pytest.mark.parametrize("workspace_id", ["", ".", "..", "../escape", "bad/name", "bad:name"]) + def test_redis_storage_rejects_invalid_workspace_ids( + self, mock_redis: MockRedisClient, workspace_id: str + ) -> None: + with pytest.raises(ValueError): + RedisStorage(redis=mock_redis, prefix="app", workspace_id=workspace_id) From c54db6131d6f728b6d4b342dafe0f728e830a56f Mon Sep 17 00:00:00 2001 From: actae0n <19864268+xpcmdshell@users.noreply.github.com> Date: Mon, 6 Apr 2026 02:40:00 -0700 Subject: [PATCH 2/2] Fix Redis deps fallback in container server Use the unscoped tools prefix when REDIS_DEPS_PREFIX is omitted so deps remain rooted at the base Redis namespace instead of a workspace-scoped workflow prefix. Add coverage for fallback behavior and explicit REDIS_DEPS_PREFIX precedence. --- .../execution/container/server.py | 5 +- tests/container/test_server.py | 83 +++++++++++++++++++ 2 files changed, 85 insertions(+), 3 deletions(-) diff --git a/src/py_code_mode/execution/container/server.py b/src/py_code_mode/execution/container/server.py index 435b122..f33a0ff 100644 --- a/src/py_code_mode/execution/container/server.py +++ b/src/py_code_mode/execution/container/server.py @@ -408,9 +408,8 @@ async def initialize_server(config: SessionConfig) -> None: artifact_store = RedisArtifactStore(r, prefix=artifacts_prefix) # Deps from Redis - # Derive deps prefix from tools prefix namespace (e.g., "myapp:tools" -> "myapp:deps") - # If tools_prefix has no namespace separator, uses tools_prefix directly as base - deps_prefix = os.environ.get("REDIS_DEPS_PREFIX", f"{tools_prefix.rsplit(':', 1)[0]}:deps") + # Deps use the root Redis prefix. RedisDepsStore appends ":deps" internally. + deps_prefix = os.environ.get("REDIS_DEPS_PREFIX", tools_prefix.rsplit(":", 1)[0]) deps_store = RedisDepsStore(r, prefix=deps_prefix) deps_installer = PackageInstaller() logger.info(" Deps in Redis (%s): initialized", deps_prefix) diff --git a/tests/container/test_server.py b/tests/container/test_server.py index 1c7c929..0dc08d2 100644 --- a/tests/container/test_server.py +++ b/tests/container/test_server.py @@ -1,5 +1,8 @@ """Tests for session server.""" +import asyncio +from unittest.mock import MagicMock + import pytest from py_code_mode.execution.container.config import SessionConfig @@ -252,3 +255,83 @@ def test_session_creates_artifact_store(self, config_with_artifacts) -> None: # Session has a file artifact store pointing to artifacts_path assert isinstance(session.artifact_store, FileArtifactStore) assert "artifacts" in str(session.artifact_store._path) + + +class TestRedisDepsFallback: + """Tests for Redis deps initialization in the container server.""" + + def test_redis_deps_fallback_stays_unscoped_when_workflows_are_scoped( + self, monkeypatch, mock_redis + ) -> None: + """Deps fallback should use the root prefix, not the workspace-scoped workflows prefix.""" + from py_code_mode.execution.container import server as server_module + from py_code_mode.tools import ToolRegistry + + async def fake_registry_from_redis(_tool_store): + return ToolRegistry() + + monkeypatch.setenv("REDIS_URL", "redis://localhost:6379/0") + monkeypatch.setenv("REDIS_TOOLS_PREFIX", "app:tools") + monkeypatch.setenv("REDIS_WORKFLOWS_PREFIX", "app:ws:client_a:workflows") + monkeypatch.setenv("REDIS_ARTIFACTS_PREFIX", "app:ws:client_a:artifacts") + monkeypatch.delenv("REDIS_DEPS_PREFIX", raising=False) + + config = SessionConfig(auth_disabled=True) + + monkeypatch.setattr("redis.from_url", lambda _url: mock_redis) + monkeypatch.setattr( + "py_code_mode.storage.registry_from_redis", + fake_registry_from_redis, + ) + monkeypatch.setattr( + server_module, + "create_workflow_library", + lambda *, store: MagicMock( + refresh=lambda: None, list=lambda: [], search=lambda *_a, **_k: [] + ), + ) + + asyncio.run(server_module.initialize_server(config)) + + assert server_module._state.deps_store is not None + server_module._state.deps_store.add("requests") + + assert "requests" in mock_redis.smembers("app:deps") + assert mock_redis.smembers("app:ws:client_a:deps") == set() + + def test_explicit_redis_deps_prefix_takes_precedence(self, monkeypatch, mock_redis) -> None: + """Explicit REDIS_DEPS_PREFIX should override any fallback derivation.""" + from py_code_mode.execution.container import server as server_module + from py_code_mode.tools import ToolRegistry + + async def fake_registry_from_redis(_tool_store): + return ToolRegistry() + + monkeypatch.setenv("REDIS_URL", "redis://localhost:6379/0") + monkeypatch.setenv("REDIS_TOOLS_PREFIX", "app:tools") + monkeypatch.setenv("REDIS_WORKFLOWS_PREFIX", "app:ws:client_a:workflows") + monkeypatch.setenv("REDIS_ARTIFACTS_PREFIX", "app:ws:client_a:artifacts") + monkeypatch.setenv("REDIS_DEPS_PREFIX", "custom-root") + + config = SessionConfig(auth_disabled=True) + + monkeypatch.setattr("redis.from_url", lambda _url: mock_redis) + monkeypatch.setattr( + "py_code_mode.storage.registry_from_redis", + fake_registry_from_redis, + ) + monkeypatch.setattr( + server_module, + "create_workflow_library", + lambda *, store: MagicMock( + refresh=lambda: None, list=lambda: [], search=lambda *_a, **_k: [] + ), + ) + + asyncio.run(server_module.initialize_server(config)) + + assert server_module._state.deps_store is not None + server_module._state.deps_store.add("requests") + + assert "requests" in mock_redis.smembers("custom-root:deps") + assert mock_redis.smembers("app:deps") == set()