-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbootstrap.py
More file actions
178 lines (132 loc) · 6.13 KB
/
bootstrap.py
File metadata and controls
178 lines (132 loc) · 6.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""Bootstrap module for reconstructing storage and namespaces in subprocesses.
This module provides the core primitives for SubprocessExecutor to reconstruct
storage backends and their associated namespaces without knowing about specific
storage implementations. The pattern is:
1. Storage classes implement `to_bootstrap_config()` returning a JSON-serializable dict
2. This dict is passed to the subprocess along with executor config (tools_path, etc.)
3. `bootstrap_namespaces(config)` reconstructs the storage and creates namespaces
This keeps the executor decoupled from storage implementations.
Architecture note: Tools are owned by executors (via config.tools_path), not storage.
The bootstrap config can optionally include tools_path for the subprocess to load tools.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from py_code_mode.artifacts import ArtifactStoreProtocol
from py_code_mode.deps import DepsNamespace
from py_code_mode.execution.in_process.workflows_namespace import WorkflowsNamespace
from py_code_mode.tools import ToolsNamespace
@dataclass
class NamespaceBundle:
    """Group of namespaces produced by bootstrapping a storage backend.

    Bundles the four objects that code execution needs so they can be
    returned and passed around as a single value.

    Attributes:
        tools: ToolsNamespace used for tool access.
        workflows: WorkflowsNamespace used for workflow access.
        artifacts: ArtifactStoreProtocol implementation used for artifact storage.
        deps: DepsNamespace used for dependency management.
    """

    tools: ToolsNamespace
    workflows: WorkflowsNamespace
    artifacts: ArtifactStoreProtocol
    deps: DepsNamespace
async def bootstrap_namespaces(config: dict[str, Any]) -> NamespaceBundle:
    """Rebuild a storage backend and its namespaces from a serialized config.

    Looks at ``config["type"]`` and delegates to the matching backend-specific
    bootstrap coroutine. The function is a coroutine because tool registry
    initialization may need async operations (e.g., MCP server connections).

    Args:
        config: Dict with a "type" key ("file" or "redis") plus the fields
            that backend needs.
            - "file": {"type": "file", "base_path": str,
              "workspace_id": str|None, "tools_path": str|None}
            - "redis": {"type": "redis", "url": str, "prefix": str,
              "workspace_id": str|None, "tools_path": str|None}
            - tools_path is optional; when provided, tools are loaded from
              that directory.

    Returns:
        NamespaceBundle holding the tools, workflows, artifacts and deps
        namespaces built by the selected backend bootstrapper.

    Raises:
        ValueError: If config["type"] is missing or is neither "file" nor "redis".
        KeyError: If a field required by the selected backend is missing.
    """
    backend = config.get("type")

    if backend == "file":
        return await _bootstrap_file_storage(config)
    if backend == "redis":
        return await _bootstrap_redis_storage(config)

    # A missing "type" key reaches this point as None and is reported as unknown.
    raise ValueError(f"Unknown storage type: {backend!r}. Expected 'file' or 'redis'.")
async def _load_tools_namespace(tools_path_str: str | None) -> ToolsNamespace:
    """Create a ToolsNamespace, loading tools from a directory when one is given.

    Args:
        tools_path_str: Directory to load tools from. When it is ``None`` or
            an empty string, the namespace wraps a default-constructed
            ToolRegistry instead.

    Returns:
        ToolsNamespace wrapping the resulting registry.
    """
    # Imported at call time; the module-level import is TYPE_CHECKING-only.
    from py_code_mode.tools import ToolRegistry, ToolsNamespace

    if not tools_path_str:
        return ToolsNamespace(ToolRegistry())

    # Round-trip through Path before handing a plain string to the loader.
    tools_dir = str(Path(tools_path_str))
    loaded_registry = await ToolRegistry.from_dir(tools_dir)
    return ToolsNamespace(loaded_registry)
def _build_namespace_bundle(
    storage: Any,
    tools_ns: ToolsNamespace,
    deps_ns: DepsNamespace,
    artifact_store: ArtifactStoreProtocol,
) -> NamespaceBundle:
    """Assemble a NamespaceBundle from already-built namespaces.

    Creates the WorkflowsNamespace from the storage's workflow library and
    packages it together with the supplied tools, artifacts and deps objects.

    Args:
        storage: Storage object; only ``get_workflow_library()`` is called on it.
        tools_ns: Tools namespace to expose.
        deps_ns: Deps namespace to expose.
        artifact_store: Artifact store to expose.

    Returns:
        NamespaceBundle containing the four namespaces.
    """
    from py_code_mode.execution.in_process.workflows_namespace import WorkflowsNamespace

    # The workflows namespace receives this dict while it is still empty; it is
    # then filled in place (including with the workflows namespace itself), so
    # the same dict object must be mutated rather than replaced.
    shared_ns: dict[str, Any] = {}
    workflows_ns = WorkflowsNamespace(storage.get_workflow_library(), shared_ns)
    shared_ns.update(
        {
            "tools": tools_ns,
            "workflows": workflows_ns,
            "artifacts": artifact_store,
            "deps": deps_ns,
        }
    )

    return NamespaceBundle(
        tools=tools_ns,
        workflows=workflows_ns,
        artifacts=artifact_store,
        deps=deps_ns,
    )
async def _bootstrap_file_storage(config: dict[str, Any]) -> NamespaceBundle:
    """Build the namespace bundle for a FileStorage-backed config.

    Args:
        config: Dict with a required "base_path" key and optional
            "workspace_id" and "tools_path" keys.

    Returns:
        NamespaceBundle backed by file-based storage.

    Raises:
        KeyError: If "base_path" is missing from config.
    """
    # Deferred to call time to avoid circular imports.
    from py_code_mode.deps import DepsNamespace, FileDepsStore, PackageInstaller
    from py_code_mode.storage import FileStorage

    root = Path(config["base_path"])
    storage = FileStorage(root, workspace_id=config.get("workspace_id"))

    tools_ns = await _load_tools_namespace(config.get("tools_path"))
    artifact_store = storage.get_artifact_store()

    # The deps store is rooted at base_path itself; workspace_id is only
    # passed to FileStorage.
    deps_ns = DepsNamespace(FileDepsStore(root), PackageInstaller())

    return _build_namespace_bundle(storage, tools_ns, deps_ns, artifact_store)
async def _bootstrap_redis_storage(config: dict[str, Any]) -> NamespaceBundle:
    """Build the namespace bundle for a RedisStorage-backed config.

    Args:
        config: Dict with required "url" and "prefix" keys and optional
            "workspace_id" and "tools_path" keys.

    Returns:
        NamespaceBundle backed by Redis-based storage.

    Raises:
        KeyError: If "url" or "prefix" is missing from config.
    """
    # Deferred to call time to avoid circular imports.
    from py_code_mode.deps import DepsNamespace, PackageInstaller, RedisDepsStore
    from py_code_mode.storage import RedisStorage

    # Required fields are read first, "url" before "prefix".
    redis_url = config["url"]
    key_prefix = config["prefix"]

    storage = RedisStorage(
        url=redis_url,
        prefix=key_prefix,
        workspace_id=config.get("workspace_id"),
    )

    tools_ns = await _load_tools_namespace(config.get("tools_path"))
    artifact_store = storage.get_artifact_store()

    # The deps store reuses the storage's client and the same key prefix.
    deps_ns = DepsNamespace(
        RedisDepsStore(storage.client, prefix=key_prefix),
        PackageInstaller(),
    )

    return _build_namespace_bundle(storage, tools_ns, deps_ns, artifact_store)