Complete reference for the Session class - the primary interface for py-code-mode.
Session wraps a storage backend and executor, providing a unified API for code execution with tools, workflows, and artifacts.
from py_code_mode import Session
# Simplest: auto-discovers tools/, workflows/, artifacts/, requirements.txt
async with Session.from_base("./.code-mode") as session:
result = await session.run("tools.curl.get(url='https://api.github.com')")One-liner for local development. Auto-discovers resources from a workspace directory.
Session.from_base(
base: str | Path,
*,
timeout: float | None = 30.0,
extra_deps: tuple[str, ...] | None = None,
allow_runtime_deps: bool = True,
sync_deps_on_start: bool = False,
) -> Session| Parameter | Type | Description |
|---|---|---|
base |
str | Path |
Workspace directory path. |
timeout |
float | None |
Execution timeout in seconds. None = unlimited. |
extra_deps |
tuple[str, ...] |
Additional packages beyond requirements.txt. |
allow_runtime_deps |
bool |
Allow deps.add()/remove() at runtime. |
sync_deps_on_start |
bool |
Install configured deps when session starts. |
Auto-discovers:
{base}/tools/- Tool definitions (YAML files){base}/workflows/- Workflow files (Python){base}/artifacts/- Persistent data storage{base}/requirements.txt- Pre-configured dependencies
Example:
async with Session.from_base("./.code-mode") as session:
await session.run("tools.list()")Process isolation via subprocess with dedicated virtualenv. Same auto-discovery as from_base().
Session.subprocess(
base_path: str | Path,
*,
timeout: float | None = 60.0,
extra_deps: tuple[str, ...] | None = None,
allow_runtime_deps: bool = True,
sync_deps_on_start: bool = False,
python_version: str | None = None,
cache_venv: bool = True,
) -> SessionExample:
async with Session.subprocess("~/.code-mode") as session:
await session.run("import pandas; pandas.__version__")Fastest execution, no isolation. Same auto-discovery as from_base().
Session.inprocess(
base_path: str | Path,
*,
timeout: float | None = 30.0,
extra_deps: tuple[str, ...] | None = None,
allow_runtime_deps: bool = True,
sync_deps_on_start: bool = False,
) -> SessionExample:
async with Session.inprocess("~/.code-mode") as session:
await session.run("1 + 1")For full control, use the constructor directly with explicit storage and executor.
Session(
storage: StorageBackend,
executor: Executor | None = None,
sync_deps_on_start: bool = False,
)| Parameter | Type | Description |
|---|---|---|
storage |
StorageBackend |
Required. FileStorage or RedisStorage instance. |
executor |
Executor |
Optional. Defaults to InProcessExecutor. |
sync_deps_on_start |
bool |
If True, install pre-configured deps when session starts. |
Example:
from py_code_mode import Session, FileStorage, SubprocessExecutor, SubprocessConfig
storage = FileStorage(base_path=Path("./data"))
config = SubprocessConfig(tools_path=Path("./tools"))
executor = SubprocessExecutor(config=config)
async with Session(storage=storage, executor=executor) as session:
...Initialize the executor and prepare for code execution.
async def start(self) -> NoneCalled automatically when using async with. Only call manually if not using context manager.
Release session resources.
async def close(self) -> NoneCalled automatically when exiting async with. Only call manually if not using context manager.
Reset the execution environment, clearing user-defined variables while preserving namespaces.
async def reset(self) -> NoneExample:
async with Session(storage=storage, executor=executor) as session:
await session.run("x = 42")
await session.reset()
result = await session.run("x") # Error: x is not definedExecute Python code and return the result.
async def run(
self,
code: str,
timeout: float | None = None
) -> ExecutionResult| Parameter | Type | Description |
|---|---|---|
code |
str |
Python code to execute |
timeout |
float |
Optional timeout in seconds (overrides default) |
Returns: ExecutionResult with:
value- Return value of the last expressionstdout- Captured stdouterror- Error message if execution failedis_ok- True if no error
Example:
result = await session.run('''
import json
data = tools.curl.get(url="https://api.github.com/users/octocat")
json.loads(data)["public_repos"]
''')
if result.is_ok:
print(f"Repos: {result.value}")
else:
print(f"Error: {result.error}")Check if the session supports a specific capability.
def supports(self, capability: str) -> boolExample:
if session.supports("timeout"):
result = await session.run(code, timeout=30.0)Get all capabilities supported by the current executor.
def supported_capabilities(self) -> set[str]Example:
caps = session.supported_capabilities()
# {'timeout', 'process_isolation', 'reset', ...}Available capabilities:
| Capability | Description |
|---|---|
timeout |
Supports execution timeout |
process_isolation |
Code runs in separate process |
container_isolation |
Code runs in container |
network_isolation |
Can disable network access |
reset |
Supports environment reset |
deps_install |
Can install dependencies |
List all available tools.
async def list_tools(self) -> list[dict[str, Any]]Returns: List of tool info dicts with name, description, tags.
Example:
tools = await session.list_tools()
for tool in tools:
print(f"{tool['name']}: {tool['description']}")Search tools by keyword or semantic similarity.
async def search_tools(
self,
query: str,
limit: int = 10
) -> list[dict[str, Any]]Example:
http_tools = await session.search_tools("make HTTP requests")List all available workflows.
async def list_workflows(self) -> list[dict[str, Any]]Returns: List of workflow summaries (name, description, parameters - no source).
Search workflows by semantic similarity.
async def search_workflows(
self,
query: str,
limit: int = 5
) -> list[dict[str, Any]]Example:
workflows = await session.search_workflows("fetch GitHub repository data")Get a specific workflow by name, including source code.
async def get_workflow(self, name: str) -> dict[str, Any] | NoneReturns: Workflow dict with name, description, parameters, source, or None if not found.
Example:
workflow = await session.get_workflow("fetch_json")
if workflow:
print(workflow["source"])Create and persist a new workflow.
async def add_workflow(
self,
name: str,
source: str,
description: str
) -> dict[str, Any]Example:
await session.add_workflow(
name="fetch_json",
source='''async def run(url: str) -> dict:
import json
response = tools.curl.get(url=url)
return json.loads(response)
''',
description="Fetch and parse JSON from a URL"
)Remove a workflow by name.
async def remove_workflow(self, name: str) -> boolReturns: True if removed, False if not found.
List all stored artifacts.
async def list_artifacts(self) -> list[dict[str, Any]]Returns: List of artifact info with name, path, description, metadata, created_at.
Save data as an artifact.
async def save_artifact(
self,
name: str,
data: Any,
description: str = "",
metadata: dict[str, Any] | None = None
) -> dict[str, Any]Example:
await session.save_artifact(
name="analysis_results",
data={"repos": 42, "stars": 1000},
description="GitHub analysis results"
)Load artifact data by name.
async def load_artifact(self, name: str) -> AnyExample:
data = await session.load_artifact("analysis_results")Delete an artifact.
async def delete_artifact(self, name: str) -> NoneList configured dependencies.
async def list_deps(self) -> list[str]Add and install a dependency.
async def add_dep(self, package: str) -> dict[str, Any]Returns: Dict with installed, already_present, failed keys.
Example:
result = await session.add_dep("pandas>=2.0")
if result.get("installed"):
print("pandas installed")Remove a dependency.
async def remove_dep(self, package: str) -> dict[str, Any]Returns: Dict with removed, not_found, failed, removed_from_config keys.
Install all pre-configured dependencies.
async def sync_deps(self) -> dict[str, Any]Returns: Dict with installed, already_present, failed keys.
Example:
# Manually sync deps (alternative to sync_deps_on_start=True)
result = await session.sync_deps()
print(f"Installed: {result['installed']}")Access the underlying storage backend.
@property
def storage(self) -> StorageBackendExample:
# Access storage for advanced operations
workflow_library = session.storage.get_workflow_library()Session implements async context manager for automatic lifecycle management:
async with Session(storage=storage, executor=executor) as session:
# session.start() called automatically
result = await session.run(code)
# session.close() called automatically on exitThis is the recommended pattern. Manual lifecycle management:
session = Session(storage=storage, executor=executor)
await session.start()
try:
result = await session.run(code)
finally:
await session.close()