Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/py_code_mode/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ async def bootstrap_namespaces(config: dict[str, Any]) -> NamespaceBundle:

Args:
config: Dict with "type" key ("file" or "redis") and type-specific fields.
- For "file": {"type": "file", "base_path": str, "tools_path": str|None}
- For "file": {"type": "file", "base_path": str, "workspace_id": str|None,
"tools_path": str|None}
- For "redis": {"type": "redis", "url": str, "prefix": str,
"workspace_id": str|None,
"tools_path": str|None}
- tools_path is optional; if provided, tools load from that directory

Expand Down Expand Up @@ -128,7 +130,8 @@ async def _bootstrap_file_storage(config: dict[str, Any]) -> NamespaceBundle:
from py_code_mode.storage import FileStorage

base_path = Path(config["base_path"])
storage = FileStorage(base_path)
workspace_id = config.get("workspace_id")
storage = FileStorage(base_path, workspace_id=workspace_id)

tools_ns = await _load_tools_namespace(config.get("tools_path"))
artifact_store = storage.get_artifact_store()
Expand Down Expand Up @@ -159,15 +162,16 @@ async def _bootstrap_redis_storage(config: dict[str, Any]) -> NamespaceBundle:

url = config["url"]
prefix = config["prefix"]
workspace_id = config.get("workspace_id")

# Connect to Redis
storage = RedisStorage(url=url, prefix=prefix)
storage = RedisStorage(url=url, prefix=prefix, workspace_id=workspace_id)

tools_ns = await _load_tools_namespace(config.get("tools_path"))
artifact_store = storage.get_artifact_store()

# Create deps namespace
deps_store = RedisDepsStore(storage.client, prefix=f"{prefix}:deps")
deps_store = RedisDepsStore(storage.client, prefix=prefix)
installer = PackageInstaller()
deps_ns = DepsNamespace(deps_store, installer)

Expand Down
7 changes: 4 additions & 3 deletions src/py_code_mode/execution/container/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,9 @@ async def start(
workflows_path = access.workflows_path
artifacts_path = access.artifacts_path
deps_path = None
if artifacts_path is not None:
deps_path = artifacts_path.parent / "deps"
deps_root = access.root_path or artifacts_path.parent
if deps_root is not None:
deps_path = deps_root / "deps"
# Create directories on host before mounting
# Workflows need to exist for volume mount
if workflows_path:
Expand All @@ -425,7 +426,7 @@ async def start(
tools_prefix = None # Tools owned by executor
workflows_prefix = access.workflows_prefix
artifacts_prefix = access.artifacts_prefix
deps_prefix = None # Deps owned by executor
deps_prefix = access.root_prefix or access.workflows_prefix.rsplit(":", 1)[0]
else:
raise TypeError(
f"Unexpected storage access type: {type(access).__name__}. "
Expand Down
5 changes: 2 additions & 3 deletions src/py_code_mode/execution/container/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,8 @@ async def initialize_server(config: SessionConfig) -> None:
artifact_store = RedisArtifactStore(r, prefix=artifacts_prefix)

# Deps from Redis
# Derive deps prefix from tools prefix namespace (e.g., "myapp:tools" -> "myapp:deps")
# If tools_prefix has no namespace separator, uses tools_prefix directly as base
deps_prefix = os.environ.get("REDIS_DEPS_PREFIX", f"{tools_prefix.rsplit(':', 1)[0]}:deps")
# Deps use the root Redis prefix. RedisDepsStore appends ":deps" internally.
deps_prefix = os.environ.get("REDIS_DEPS_PREFIX", tools_prefix.rsplit(":", 1)[0])
deps_store = RedisDepsStore(r, prefix=deps_prefix)
deps_installer = PackageInstaller()
logger.info(" Deps in Redis (%s): initialized", deps_prefix)
Expand Down
2 changes: 2 additions & 0 deletions src/py_code_mode/execution/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class FileStorageAccess:
workflows_path: Path | None
artifacts_path: Path
vectors_path: Path | None = None
root_path: Path | None = None


@dataclass(frozen=True)
Expand All @@ -48,6 +49,7 @@ class RedisStorageAccess:
workflows_prefix: str
artifacts_prefix: str
vectors_prefix: str | None = None
root_prefix: str | None = None


StorageAccess = FileStorageAccess | RedisStorageAccess
Expand Down
7 changes: 4 additions & 3 deletions src/py_code_mode/execution/subprocess/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ def _build_file_storage_setup_code(
repr(str(storage_access.workflows_path)) if storage_access.workflows_path else "None"
)
artifacts_path_str = repr(str(storage_access.artifacts_path))
# Base path is parent of artifacts for deps store
base_path_str = repr(str(storage_access.artifacts_path.parent))
base_path = storage_access.root_path or storage_access.artifacts_path.parent
base_path_str = repr(str(base_path))
allow_deps_str = "True" if allow_runtime_deps else "False"
vectors_path_str = (
repr(str(storage_access.vectors_path)) if storage_access.vectors_path else "None"
Expand Down Expand Up @@ -416,7 +416,8 @@ def _build_redis_storage_setup_code(
vectors_prefix_str = (
repr(storage_access.vectors_prefix) if storage_access.vectors_prefix else "None"
)
deps_prefix_str = repr(f"{storage_access.workflows_prefix.rsplit(':', 1)[0]}:deps")
deps_prefix = storage_access.root_prefix or storage_access.workflows_prefix.rsplit(":", 1)[0]
deps_prefix_str = repr(deps_prefix)
allow_deps_str = "True" if allow_runtime_deps else "False"

return f'''# Auto-generated namespace setup for SubprocessExecutor (Redis)
Expand Down
79 changes: 64 additions & 15 deletions src/py_code_mode/storage/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from __future__ import annotations

import logging
import re
from pathlib import Path
from typing import TYPE_CHECKING, ClassVar, Protocol, runtime_checkable
from urllib.parse import quote
Expand Down Expand Up @@ -45,10 +46,22 @@

logger = logging.getLogger(__name__)

_WORKSPACE_ID_PATTERN = re.compile(r"^[A-Za-z0-9_-]{1,128}$")

if TYPE_CHECKING:
from redis import Redis


def _validate_workspace_id(workspace_id: str) -> str:
    """Return ``workspace_id`` unchanged once it is known to be well-formed.

    The identifier is used in paths and Redis prefixes, so it must fully
    match ``_WORKSPACE_ID_PATTERN``.

    Args:
        workspace_id: Candidate workspace identifier.

    Returns:
        The same ``workspace_id`` that was passed in.

    Raises:
        ValueError: If the identifier does not fully match the pattern.
    """
    # Guard clause: a successful full match is the only accepted outcome.
    if _WORKSPACE_ID_PATTERN.fullmatch(workspace_id) is not None:
        return workspace_id

    message = (
        "workspace_id must be 1-128 characters using only ASCII letters, digits, "
        "underscores, or hyphens"
    )
    raise ValueError(message)


@runtime_checkable
class StorageBackend(Protocol):
"""Protocol for unified storage backend.
Expand Down Expand Up @@ -88,14 +101,18 @@ class FileStorage:

_UNINITIALIZED: ClassVar[object] = object()

def __init__(self, base_path: Path | str) -> None:
def __init__(self, base_path: Path | str, workspace_id: str | None = None) -> None:
"""Initialize file storage.

Args:
base_path: Base directory for storage. Will create workflows/, artifacts/ subdirs.
workspace_id: Optional workspace scope for workflows, artifacts, and vectors.
"""
self._base_path = Path(base_path) if isinstance(base_path, str) else base_path
self._base_path.mkdir(parents=True, exist_ok=True)
self._workspace_id = (
_validate_workspace_id(workspace_id) if workspace_id is not None else None
)

# Lazy-initialized stores (workflows and artifacts only)
self._workflow_library: WorkflowLibrary | None = None
Expand All @@ -107,21 +124,32 @@ def root(self) -> Path:
"""Get the root storage path."""
return self._base_path

@property
def workspace_id(self) -> str | None:
    """Workspace identifier this storage is scoped to.

    Returns:
        The stored workspace id, or ``None`` when no workspace was configured.
    """
    scope = self._workspace_id
    return scope

def _get_storage_root(self) -> Path:
"""Get the scoped root for workflows, artifacts, and vectors."""
if self._workspace_id is None:
return self._base_path
return self._base_path / "workspaces" / self._workspace_id

def _get_workflows_path(self) -> Path:
"""Get the workflows directory path."""
workflows_path = self._base_path / "workflows"
workflows_path = self._get_storage_root() / "workflows"
workflows_path.mkdir(parents=True, exist_ok=True)
return workflows_path

def _get_artifacts_path(self) -> Path:
"""Get the artifacts directory path."""
artifacts_path = self._base_path / "artifacts"
artifacts_path = self._get_storage_root() / "artifacts"
artifacts_path.mkdir(parents=True, exist_ok=True)
return artifacts_path

def _get_vectors_path(self) -> Path:
"""Get the vectors directory path."""
vectors_path = self._base_path / "vectors"
vectors_path = self._get_storage_root() / "vectors"
vectors_path.mkdir(parents=True, exist_ok=True)
return vectors_path

Expand Down Expand Up @@ -153,13 +181,14 @@ def get_vector_store(self) -> VectorStore | None:

def get_serializable_access(self) -> FileStorageAccess:
"""Return FileStorageAccess for cross-process communication."""
base_path = self._base_path
vectors_path = base_path / "vectors"
storage_root = self._get_storage_root()
vectors_path = storage_root / "vectors"

return FileStorageAccess(
workflows_path=base_path / "workflows",
artifacts_path=base_path / "artifacts",
workflows_path=storage_root / "workflows",
artifacts_path=storage_root / "artifacts",
vectors_path=vectors_path if vectors_path.exists() else None,
root_path=self._base_path,
)

def get_workflow_library(self) -> WorkflowLibrary:
Expand Down Expand Up @@ -206,10 +235,13 @@ def to_bootstrap_config(self) -> dict[str, str]:
This config can be passed to bootstrap_namespaces() to reconstruct
the storage in a subprocess.
"""
return {
config = {
"type": "file",
"base_path": str(self._base_path),
}
if self._workspace_id is not None:
config["workspace_id"] = self._workspace_id
return config


class RedisStorage:
Expand All @@ -225,6 +257,7 @@ def __init__(
url: str | None = None,
redis: Redis | None = None,
prefix: str = "py_code_mode",
workspace_id: str | None = None,
) -> None:
"""Initialize Redis storage.

Expand All @@ -234,6 +267,7 @@ def __init__(
redis: Redis client instance. Use for advanced configurations
(custom connection pools, etc.). Mutually exclusive with url.
prefix: Key prefix for all storage. Default: "py_code_mode"
workspace_id: Optional workspace scope for workflows, artifacts, and vectors.

Raises:
ValueError: If neither url nor redis is provided, or if both are.
Expand All @@ -260,6 +294,12 @@ def __init__(
raise ValueError("Redis client is required")

self._prefix = prefix
self._workspace_id = (
_validate_workspace_id(workspace_id) if workspace_id is not None else None
)
self._storage_prefix = (
f"{prefix}:ws:{self._workspace_id}" if self._workspace_id is not None else prefix
)

# Lazy-initialized stores (workflows and artifacts only)
self._workflow_library: WorkflowLibrary | None = None
Expand All @@ -271,6 +311,11 @@ def prefix(self) -> str:
"""Get the configured prefix."""
return self._prefix

@property
def workspace_id(self) -> str | None:
    """Workspace identifier this storage is scoped to.

    Returns:
        The stored workspace id, or ``None`` when no workspace was configured.
    """
    scope = self._workspace_id
    return scope

@property
def client(self) -> Redis:
"""Get the Redis client."""
Expand Down Expand Up @@ -322,7 +367,7 @@ def get_vector_store(self) -> VectorStore | None:
self._vector_store = RedisVectorStore(
redis=self._redis,
embedder=embedder,
prefix=f"{self._prefix}:vectors",
prefix=f"{self._storage_prefix}:vectors",
)
except ImportError:
self._vector_store = None
Expand All @@ -343,7 +388,7 @@ def get_serializable_access(self) -> RedisStorageAccess:
else:
redis_url = self._reconstruct_redis_url()

prefix = self._prefix
prefix = self._storage_prefix
# vectors_prefix is set when RedisVectorStore dependencies are available
# (redis-py with RediSearch). We check module availability, not actual
# vector store creation, to avoid side effects during serialization.
Expand All @@ -355,12 +400,13 @@ def get_serializable_access(self) -> RedisStorageAccess:
workflows_prefix=f"{prefix}:workflows",
artifacts_prefix=f"{prefix}:artifacts",
vectors_prefix=vectors_prefix,
root_prefix=self._prefix,
)

def get_workflow_library(self) -> WorkflowLibrary:
"""Return WorkflowLibrary for in-process execution."""
if self._workflow_library is None:
raw_store = RedisWorkflowStore(self._redis, prefix=f"{self._prefix}:workflows")
raw_store = RedisWorkflowStore(self._redis, prefix=f"{self._storage_prefix}:workflows")
vector_store = self.get_vector_store()
try:
self._workflow_library = create_workflow_library(
Expand All @@ -385,13 +431,13 @@ def get_artifact_store(self) -> ArtifactStoreProtocol:
"""Return artifact store for in-process execution."""
if self._artifact_store is None:
self._artifact_store = RedisArtifactStore(
self._redis, prefix=f"{self._prefix}:artifacts"
self._redis, prefix=f"{self._storage_prefix}:artifacts"
)
return self._artifact_store

def get_workflow_store(self) -> WorkflowStore:
"""Return the underlying WorkflowStore for direct access."""
return RedisWorkflowStore(self._redis, prefix=f"{self._prefix}:workflows")
return RedisWorkflowStore(self._redis, prefix=f"{self._storage_prefix}:workflows")

def to_bootstrap_config(self) -> dict[str, str]:
"""Serialize storage configuration for subprocess bootstrap.
Expand All @@ -401,8 +447,11 @@ def to_bootstrap_config(self) -> dict[str, str]:
This config can be passed to bootstrap_namespaces() to reconstruct
the storage in a subprocess.
"""
return {
config = {
"type": "redis",
"url": self._reconstruct_redis_url(),
"prefix": self._prefix,
}
if self._workspace_id is not None:
config["workspace_id"] = self._workspace_id
return config
Loading