From 66ba01ae5fd81aed9b744de122411aeaa63be9cb Mon Sep 17 00:00:00 2001 From: mjoffre Date: Thu, 21 May 2026 10:08:52 +0000 Subject: [PATCH 1/2] fix: stream large file uploads from disk to avoid memory exhaustion When write_binary receives a file path for files > 5 MB, read chunks directly from disk using os.pread() instead of loading the entire file into memory with Path.read_bytes(). This prevents OOM for multi-GB files and ensures the multipart upload path is used without requiring the full file content in RAM. At most MAX_PARALLEL_UPLOADS * CHUNK_SIZE (15 MB) is held in memory at any time during upload. Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- src/blaxel/core/sandbox/default/filesystem.py | 67 ++++++++++++++++++- src/blaxel/core/sandbox/sync/filesystem.py | 58 ++++++++++++++++ 2 files changed, 123 insertions(+), 2 deletions(-) diff --git a/src/blaxel/core/sandbox/default/filesystem.py b/src/blaxel/core/sandbox/default/filesystem.py index c5e8a1d5..7d3293f6 100644 --- a/src/blaxel/core/sandbox/default/filesystem.py +++ b/src/blaxel/core/sandbox/default/filesystem.py @@ -2,6 +2,7 @@ import io import json import logging +import os from pathlib import Path from typing import Any, Callable, Dict, List, Union @@ -81,15 +82,19 @@ async def write_binary( """ path = self.format_path(path) - # If content is a string, treat it as a file path and read it + # If content is a string, treat it as a file path if isinstance(content, str): local_path = Path(content) + file_size = local_path.stat().st_size + # Stream from disk for large files to avoid loading into memory + if file_size > MULTIPART_THRESHOLD: + return await self._upload_file_with_multipart(path, local_path, "0644") content = local_path.read_bytes() # Convert bytearray to bytes if necessary elif isinstance(content, bytearray): content = bytes(content) - # Use multipart upload for large files + # Use multipart upload for large in-memory data if len(content) > MULTIPART_THRESHOLD: return await self._upload_with_multipart(path, content, "0644") @@ -535,6 +540,64 @@ async def _abort_multipart_upload(self, upload_id: str) -> None: finally: await response.aclose() + async def _upload_file_with_multipart( + self, path: str, local_path: Path, permissions: str = "0644" + ) -> SuccessResponse: + """Upload a local file using streaming multipart upload. + + Reads chunks directly from disk without loading the entire file into memory. + At most MAX_PARALLEL_UPLOADS * CHUNK_SIZE bytes are held in memory at once. + """ + file_size = local_path.stat().st_size + + init_response = await self._initiate_multipart_upload(path, permissions) + upload_id = init_response.get("uploadId") + + if not upload_id: + raise Exception("Failed to get upload ID from initiate response") + + try: + num_parts = (file_size + CHUNK_SIZE - 1) // CHUNK_SIZE + parts: List[Dict[str, Any]] = [] + + fd = os.open(str(local_path), os.O_RDONLY) + try: + for i in range(0, num_parts, MAX_PARALLEL_UPLOADS): + batch_tasks = [] + + for j in range(MAX_PARALLEL_UPLOADS): + if i + j >= num_parts: + break + + part_number = i + j + 1 + offset = (part_number - 1) * CHUNK_SIZE + read_size = min(CHUNK_SIZE, file_size - offset) + chunk = os.pread(fd, read_size, offset) + + batch_tasks.append(self._upload_part(upload_id, part_number, chunk)) + + batch_results = await asyncio.gather(*batch_tasks) + parts.extend( + [ + { + "partNumber": r.get("partNumber"), + "etag": r.get("etag"), + } + for r in batch_results + ] + ) + finally: + os.close(fd) + + parts.sort(key=lambda p: p.get("partNumber", 0)) + return await self._complete_multipart_upload(upload_id, parts) + except Exception as error: + try: + await self._abort_multipart_upload(upload_id) + except Exception as abort_error: + logger.warning(f"Failed to abort multipart upload: {abort_error}") + raise error + async def _upload_with_multipart( self, path: str, data: bytes, permissions: str = "0644" ) -> SuccessResponse: diff --git a/src/blaxel/core/sandbox/sync/filesystem.py b/src/blaxel/core/sandbox/sync/filesystem.py index b263057d..92b38d13 100644 --- a/src/blaxel/core/sandbox/sync/filesystem.py +++ b/src/blaxel/core/sandbox/sync/filesystem.py @@ -1,6 +1,7 @@ import io import json import logging +import os import threading from pathlib import Path from typing import Any, Callable, Dict, List, Union @@ -55,6 +56,10 @@ def write_binary(self, path: str, content: Union[bytes, bytearray, str]) -> Succ path = self.format_path(path) if isinstance(content, str): local_path = Path(content) + file_size = local_path.stat().st_size + # Stream from disk for large files to avoid loading into memory + if file_size > MULTIPART_THRESHOLD: + return self._upload_file_with_multipart(path, local_path, "0644") content = local_path.read_bytes() elif isinstance(content, bytearray): content = bytes(content) @@ -287,6 +292,59 @@ def _abort_multipart_upload(self, upload_id: str) -> None: if not response.is_success: logger.warning(f"Failed to abort multipart upload: {response.status_code}") + def _upload_file_with_multipart( + self, path: str, local_path: Path, permissions: str = "0644" + ) -> SuccessResponse: + """Upload a local file using streaming multipart upload. + + Reads chunks directly from disk without loading the entire file into memory. + """ + file_size = local_path.stat().st_size + + init_response = self._initiate_multipart_upload(path, permissions) + upload_id = init_response.get("uploadId") + if not upload_id: + raise Exception("Failed to get upload ID from initiate response") + + try: + num_parts = (file_size + CHUNK_SIZE - 1) // CHUNK_SIZE + parts: List[Dict[str, Any]] = [] + + fd = os.open(str(local_path), os.O_RDONLY) + try: + for i in range(0, num_parts, MAX_PARALLEL_UPLOADS): + threads = [] + results: Dict[int, Dict[str, Any]] = {} + + def make_upload(part_number: int, chunk: bytes): + results[part_number] = self._upload_part(upload_id, part_number, chunk) + + for j in range(MAX_PARALLEL_UPLOADS): + if i + j >= num_parts: + break + part_number = i + j + 1 + offset = (part_number - 1) * CHUNK_SIZE + read_size = min(CHUNK_SIZE, file_size - offset) + chunk = os.pread(fd, read_size, offset) + t = threading.Thread(target=make_upload, args=(part_number, chunk)) + threads.append(t) + t.start() + for t in threads: + t.join() + for part_number, r in results.items(): + parts.append({"partNumber": part_number, "etag": r.get("etag")}) + finally: + os.close(fd) + + parts.sort(key=lambda p: p.get("partNumber", 0)) + return self._complete_multipart_upload(upload_id, parts) + except Exception as error: + try: + self._abort_multipart_upload(upload_id) + except Exception as abort_error: + logger.warning(f"Failed to abort multipart upload: {abort_error}") + raise error + def _upload_with_multipart( self, path: str, data: bytes, permissions: str = "0644" ) -> SuccessResponse: From 2f7940741c269e07fa4338f7b1c7474d349327f9 Mon Sep 17 00:00:00 2001 From: mjoffre Date: Thu, 21 May 2026 10:18:12 +0000 Subject: [PATCH 2/2] fix: propagate thread exceptions in sync streaming multipart upload If _upload_part throws inside a thread, capture the exception and re-raise it after all threads join. This prevents _complete_multipart_upload from being called with incomplete parts on partial failure. Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- src/blaxel/core/sandbox/sync/filesystem.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/blaxel/core/sandbox/sync/filesystem.py b/src/blaxel/core/sandbox/sync/filesystem.py index 92b38d13..a92f25d2 100644 --- a/src/blaxel/core/sandbox/sync/filesystem.py +++ b/src/blaxel/core/sandbox/sync/filesystem.py @@ -315,9 +315,13 @@ def _upload_file_with_multipart( for i in range(0, num_parts, MAX_PARALLEL_UPLOADS): threads = [] results: Dict[int, Dict[str, Any]] = {} + exceptions: List[Exception] = [] def make_upload(part_number: int, chunk: bytes): - results[part_number] = self._upload_part(upload_id, part_number, chunk) + try: + results[part_number] = self._upload_part(upload_id, part_number, chunk) + except Exception as e: + exceptions.append(e) for j in range(MAX_PARALLEL_UPLOADS): if i + j >= num_parts: @@ -331,6 +335,8 @@ def make_upload(part_number: int, chunk: bytes): t.start() for t in threads: t.join() + if exceptions: + raise exceptions[0] for part_number, r in results.items(): parts.append({"partNumber": part_number, "etag": r.get("etag")}) finally: