Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/blaxel/core/common/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@
BLAXEL_API_VERSION = "2026-04-16"


def _get_int_env(name: str, default: int) -> int:
value = os.environ.get(name)
if value is None:
return default
try:
return int(value)
except ValueError:
return default


def _get_os_arch() -> str:
"""Get OS and architecture information."""
try:
Expand Down Expand Up @@ -97,6 +107,16 @@ def api_version(self) -> str:
"""Get the API version sent in the Blaxel-Version header."""
return os.environ.get("BL_API_VERSION", BLAXEL_API_VERSION)

@property
def fs_part_retries(self) -> int:
"""Retry budget for idempotent filesystem upload PUTs."""
return _get_int_env("BL_FS_PART_RETRIES", 3)

@property
def sandbox_read_retries(self) -> int:
"""Retry budget for idempotent sandbox read/list operations."""
return _get_int_env("BL_SANDBOX_READ_RETRIES", 5)

@property
def headers(self) -> Dict[str, str]:
"""Get the headers for API requests."""
Expand Down
27 changes: 16 additions & 11 deletions src/blaxel/core/sandbox/default/drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
DriveUnmountResponse,
ErrorResponse,
)
from ..transient_retry import retry_on_transient_reset_async
from ..types import SandboxConfiguration
from .action import SandboxAction

Expand Down Expand Up @@ -100,15 +101,19 @@ async def list(self) -> List[DriveMountInfo]:
Returns:
List of DriveMountInfo for each mounted drive
"""
client = Client(
base_url=self.url,
headers={**settings.headers, **self.sandbox_config.headers},
)

async with client:
response = await get_drives_mount(client=client)
if response is None:
raise Exception("Failed to list drives")
if isinstance(response, ErrorResponse):
raise Exception(f"List drives failed: {response.error}")
return list(response.mounts) if response.mounts else []
async def list_once() -> List[DriveMountInfo]:
client = Client(
base_url=self.url,
headers={**settings.headers, **self.sandbox_config.headers},
)

async with client:
response = await get_drives_mount(client=client)
if response is None:
raise Exception("Failed to list drives")
if isinstance(response, ErrorResponse):
raise Exception(f"List drives failed: {response.error}")
return list(response.mounts) if response.mounts else []

return await retry_on_transient_reset_async(list_once)
185 changes: 104 additions & 81 deletions src/blaxel/core/sandbox/default/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from ...common.settings import settings
from ..client.models import Directory, FileRequest, SuccessResponse
from ..transient_retry import retry_on_transient_reset_async
from ..types import (
AsyncWatchHandle,
CopyResponse,
Expand Down Expand Up @@ -93,34 +94,35 @@ async def write_binary(
if len(content) > MULTIPART_THRESHOLD:
return await self._upload_with_multipart(path, content, "0644")

# Use regular upload for small files
# Wrap binary content in BytesIO to provide file-like interface
binary_file = io.BytesIO(content)

# Prepare multipart form data
files = {
"file": (
"binary-file.bin",
binary_file,
"application/octet-stream",
),
}
data = {"permissions": "0644", "path": path}

# Use the fixed get_client method
url = f"{self.url}/filesystem/{path}"
headers = {**settings.headers, **self.sandbox_config.headers}

client = self.get_client()
response = await client.put(url, files=files, data=data, headers=headers)
try:
content_bytes = await response.aread()
if not response.is_success:
error_text = content_bytes.decode("utf-8", errors="ignore")
raise Exception(f"Failed to write binary: {response.status_code} {error_text}")
return SuccessResponse.from_dict(json.loads(content_bytes))
finally:
await response.aclose()
async def put_once() -> SuccessResponse:
files = {
"file": (
"binary-file.bin",
io.BytesIO(content),
"application/octet-stream",
),
}
data = {"permissions": "0644", "path": path}
client = self.get_client()
response = await client.put(url, files=files, data=data, headers=headers)
try:
content_bytes = await response.aread()
if not response.is_success:
error_text = content_bytes.decode("utf-8", errors="ignore")
raise Exception(f"Failed to write binary: {response.status_code} {error_text}")
result = SuccessResponse.from_dict(json.loads(content_bytes))
assert result is not None
return result
finally:
await response.aclose()

return await retry_on_transient_reset_async(
put_once,
retries=settings.fs_part_retries,
)

async def write_tree(
self,
Expand Down Expand Up @@ -152,16 +154,19 @@ async def write_tree(
async def read(self, path: str) -> str:
path = self.format_path(path)

client = self.get_client()
response = await client.get(f"/filesystem/{path}")
try:
data = json.loads(await response.aread())
self.handle_response_error(response)
if "content" in data:
return data["content"]
raise Exception("Unsupported file type")
finally:
await response.aclose()
async def read_once() -> str:
client = self.get_client()
response = await client.get(f"/filesystem/{path}")
try:
data = json.loads(await response.aread())
self.handle_response_error(response)
if "content" in data:
return data["content"]
raise Exception("Unsupported file type")
finally:
await response.aclose()

return await retry_on_transient_reset_async(read_once)

async def read_binary(self, path: str) -> bytes:
"""Read binary content from a file.
Expand All @@ -181,14 +186,17 @@ async def read_binary(self, path: str) -> bytes:
"Accept": "application/octet-stream",
}

client = self.get_client()
response = await client.get(url, headers=headers)
try:
content = await response.aread()
self.handle_response_error(response)
return content
finally:
await response.aclose()
async def read_once() -> bytes:
client = self.get_client()
response = await client.get(url, headers=headers)
try:
content = await response.aread()
self.handle_response_error(response)
return content
finally:
await response.aclose()

return await retry_on_transient_reset_async(read_once)

async def download(self, src: str, destination_path: str, mode: int = 0o644) -> None:
"""Download a file from the sandbox to the local filesystem.
Expand Down Expand Up @@ -219,16 +227,21 @@ async def rm(self, path: str, recursive: bool = False) -> SuccessResponse:
async def ls(self, path: str) -> Directory:
path = self.format_path(path)

client = self.get_client()
response = await client.get(f"/filesystem/{path}")
try:
data = json.loads(await response.aread())
self.handle_response_error(response)
if not ("files" in data or "subdirectories" in data):
raise Exception('{"error": "Directory not found"}')
return Directory.from_dict(data)
finally:
await response.aclose()
async def ls_once() -> Directory:
client = self.get_client()
response = await client.get(f"/filesystem/{path}")
try:
data = json.loads(await response.aread())
self.handle_response_error(response)
if not ("files" in data or "subdirectories" in data):
raise Exception('{"error": "Directory not found"}')
result = Directory.from_dict(data)
assert result is not None
return result
finally:
await response.aclose()

return await retry_on_transient_reset_async(ls_once)

async def find(
self,
Expand Down Expand Up @@ -269,17 +282,20 @@ async def find(
url = f"{self.url}/filesystem-find/{path}"
headers = {**settings.headers, **self.sandbox_config.headers}

client = self.get_client()
response = await client.get(url, params=params, headers=headers)
try:
data = json.loads(await response.aread())
self.handle_response_error(response)
async def find_once():
client = self.get_client()
response = await client.get(url, params=params, headers=headers)
try:
data = json.loads(await response.aread())
self.handle_response_error(response)

from ..client.models.find_response import FindResponse
from ..client.models.find_response import FindResponse

return FindResponse.from_dict(data)
finally:
await response.aclose()
return FindResponse.from_dict(data)
finally:
await response.aclose()

return await retry_on_transient_reset_async(find_once)

async def grep(
self,
Expand Down Expand Up @@ -322,17 +338,20 @@ async def grep(
url = f"{self.url}/filesystem-content-search/{path}"
headers = {**settings.headers, **self.sandbox_config.headers}

client = self.get_client()
response = await client.get(url, params=params, headers=headers)
try:
data = json.loads(await response.aread())
self.handle_response_error(response)
async def grep_once():
client = self.get_client()
response = await client.get(url, params=params, headers=headers)
try:
data = json.loads(await response.aread())
self.handle_response_error(response)

from ..client.models.content_search_response import ContentSearchResponse
from ..client.models.content_search_response import ContentSearchResponse

return ContentSearchResponse.from_dict(data)
finally:
await response.aclose()
return ContentSearchResponse.from_dict(data)
finally:
await response.aclose()

return await retry_on_transient_reset_async(grep_once)

async def cp(self, source: str, destination: str, max_wait: int = 180000) -> CopyResponse:
"""Copy files or directories using the cp command.
Expand Down Expand Up @@ -492,17 +511,21 @@ async def _upload_part(self, upload_id: str, part_number: int, data: bytes) -> D
headers = {**settings.headers, **self.sandbox_config.headers}
params = {"partNumber": part_number}

# Prepare multipart form data with the file chunk
files = {"file": ("part", io.BytesIO(data), "application/octet-stream")}
async def put_once() -> Dict[str, Any]:
files = {"file": ("part", io.BytesIO(data), "application/octet-stream")}
client = self.get_client()
response = await client.put(url, files=files, params=params, headers=headers)
try:
self.handle_response_error(response)
result = json.loads(await response.aread())
return result
finally:
await response.aclose()

client = self.get_client()
response = await client.put(url, files=files, params=params, headers=headers)
try:
self.handle_response_error(response)
result = json.loads(await response.aread())
return result
finally:
await response.aclose()
return await retry_on_transient_reset_async(
put_once,
retries=settings.fs_part_retries,
)

async def _complete_multipart_upload(
self, upload_id: str, parts: List[Dict[str, Any]]
Expand Down
Loading
Loading