Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 9 additions & 43 deletions rock/admin/entrypoints/sandbox_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import math
import re
import time
from typing import Annotated, Any
Expand All @@ -24,21 +23,21 @@
SandboxCommand,
SandboxCreateBashSessionRequest,
SandboxReadFileRequest,
SandboxRestartRequest,
SandboxStartRequest,
SandboxWriteFileRequest,
StartHeaders,
)
from rock.admin.proto.response import SandboxStartResponse
from rock.common.constants import (
CPU_OVERCOMMIT_ALLOWED_KEYS_KEY,
CPU_OVERCOMMIT_HEADROOM_KEY,
EXTRA_ACCELERATOR_TYPES_KEY,
GET_STATUS_SWITCH,
KATA_DIND_DISK_SIZE_KEY,
KATA_RUNTIME_SWITCH,
SANDBOX_DISK_LIMIT_ROOTFS_KEY,
SUPPORT_KATA_SWITCH,
)
from rock.common.cpu_overcommit import apply_cpu_overcommit
from rock.common.exception import handle_exceptions
from rock.common.validation import NonBlankStr
from rock.config import ImageRegistryMirror
Expand Down Expand Up @@ -275,43 +274,6 @@ async def _apply_accelerator_type_validation(config: DockerDeploymentConfig) ->
)


async def _apply_cpu_overcommit_default(config: DockerDeploymentConfig, rock_authorization: str | None) -> None:
    """Derive ``config.limit_cpus`` from ``config.cpus`` plus the Nacos headroom.

    Formula: ``limit_cpus = min(2 * cpus, cpus + headroom)``.

    ``config`` is mutated in place. The function leaves it untouched when any
    of the following holds:

    - ``config.limit_cpus`` is already set (an SDK-supplied value always wins).
    - No Nacos provider is configured on ``sandbox_manager.rock_config``.
    - The grayscale gate, driven by the Nacos list ``cpu_overcommit_allowed_keys``,
      is closed:
        * key absent from Nacos -> gate is open for every caller (full rollout).
        * key present as a list -> only ``rock_authorization`` values in the list pass.
        * key present but not a list (misconfigured) -> gate closed.
    - The headroom read from Nacos key ``cpu_overcommit_headroom`` (default 0)
      is missing, unparsable, non-finite, or ``<= 0``. In that case
      ``limit_cpus`` stays ``None`` (docker run gets no --cpus flag).

    Args:
        config: Deployment config whose ``limit_cpus`` may be filled in.
        rock_authorization: Caller key checked against the grayscale list.
    """
    # An explicit limit always takes precedence over the derived default.
    if config.limit_cpus is not None:
        return

    nacos = sandbox_manager.rock_config.nacos_provider
    if nacos is None:
        return

    # get_config() may return a falsy value; normalise it to an empty mapping.
    nacos_config = await nacos.get_config() or {}

    allowed_keys = nacos_config.get(CPU_OVERCOMMIT_ALLOWED_KEYS_KEY)
    if allowed_keys is not None:
        # A non-list value is treated as a misconfiguration and closes the gate.
        if not isinstance(allowed_keys, list) or rock_authorization not in allowed_keys:
            return

    raw = nacos_config.get(CPU_OVERCOMMIT_HEADROOM_KEY)
    try:
        headroom = float(raw) if raw is not None else 0.0
    except (TypeError, ValueError, OverflowError):
        # OverflowError: float() of an integer too large for a double. Treat it
        # like any other unparsable value instead of failing sandbox startup.
        headroom = 0.0

    # Reject NaN / inf so a fat-fingered Nacos edit can't break sandbox startup
    if not math.isfinite(headroom) or headroom <= 0:
        return

    config.limit_cpus = min(2 * config.cpus, config.cpus + headroom)


@sandbox_router.post("/start")
@handle_exceptions(error_message="start sandbox failed")
async def start(request: SandboxStartRequest) -> RockResponse[SandboxStartResponse]:
Expand All @@ -335,7 +297,9 @@ async def start_async(
await _apply_accelerator_type_validation(config)
await _apply_kata_runtime_switch(config)
await _apply_kata_disk_size(config)
await _apply_cpu_overcommit_default(config, headers.user_info.get("rock_authorization"))
await apply_cpu_overcommit(
config, sandbox_manager.rock_config.nacos_provider, headers.user_info.get("rock_authorization")
)
await _apply_disk_limits(config)
await _apply_image_registry_mirror(config)
sandbox_start_response = await sandbox_manager.start_async(
Expand Down Expand Up @@ -454,8 +418,10 @@ async def delete(sandbox_id: str = Body(..., embed=True)) -> RockResponse:

@sandbox_router.post("/restart")
@handle_exceptions(error_message="restart sandbox failed")
async def restart(sandbox_id: str = Body(..., embed=True)) -> RockResponse[SandboxStartResponse]:
result = await sandbox_manager.restart_async(sandbox_id)
async def restart(request: SandboxRestartRequest) -> RockResponse[SandboxStartResponse]:
result = await sandbox_manager.restart_async(
request.sandbox_id, cpus=request.cpus, memory=request.memory, limit_cpus=request.limit_cpus
)
return RockResponse(result=result)


Expand Down
7 changes: 7 additions & 0 deletions rock/admin/proto/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ class SandboxStartRequest(BaseModel):
"""GPU accelerator type (e.g. 'A100', 'V100'). If not specified, any available GPU will be used."""


class SandboxRestartRequest(BaseModel):
    # Request body for restarting a sandbox, optionally carrying resource overrides.
    # Only sandbox_id is required; every override defaults to None.

    # Identifier of the sandbox to restart (must satisfy NonBlankStr).
    sandbox_id: NonBlankStr
    # Optional cpus override; None when the caller does not send one.
    cpus: float | None = None
    # Optional memory override as a string (e.g. "8g"); None when not sent.
    memory: str | None = None
    # Optional limit_cpus override; None when the caller does not send one.
    limit_cpus: float | None = None


class SandboxCommand(Command):
timeout: float | None = 1200
"""The timeout for the command. None means no timeout."""
Expand Down
42 changes: 42 additions & 0 deletions rock/common/cpu_overcommit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import math

from rock.common.constants import CPU_OVERCOMMIT_ALLOWED_KEYS_KEY, CPU_OVERCOMMIT_HEADROOM_KEY
from rock.deployments.config import DockerDeploymentConfig


async def apply_cpu_overcommit(
    config: DockerDeploymentConfig, nacos_provider, rock_authorization: str | None = None
) -> None:
    """Derive ``config.limit_cpus`` from ``config.cpus`` plus the Nacos headroom.

    Formula: ``limit_cpus = min(2 * cpus, cpus + headroom)``.

    ``config`` is mutated in place. The function leaves it untouched when any
    of the following holds:

    - ``config.limit_cpus`` is already set (an explicitly supplied value always wins).
    - ``nacos_provider`` is ``None``.
    - The grayscale gate, driven by the Nacos list ``cpu_overcommit_allowed_keys``,
      is closed:
        * key absent from Nacos -> gate is open for every caller (full rollout).
        * key present as a list -> only ``rock_authorization`` values in the list pass.
        * key present but not a list (misconfigured) -> gate closed.
    - The headroom read from Nacos key ``cpu_overcommit_headroom`` (default 0)
      is missing, unparsable, non-finite, or ``<= 0``. In that case
      ``limit_cpus`` stays ``None`` (docker run gets no --cpus flag).

    Args:
        config: Deployment config whose ``limit_cpus`` may be filled in.
        nacos_provider: Object exposing an awaitable ``get_config()``, or ``None``.
        rock_authorization: Caller key checked against the grayscale list.
            With the default ``None``, a configured list only passes if it
            contains ``None``.
    """
    # NOTE(review): a reviewer asked for this logic to be encapsulated in a class.

    # An explicit limit always takes precedence over the derived default.
    if config.limit_cpus is not None:
        return

    if nacos_provider is None:
        return

    # get_config() may return a falsy value; normalise it to an empty mapping.
    nacos_config = await nacos_provider.get_config() or {}

    allowed_keys = nacos_config.get(CPU_OVERCOMMIT_ALLOWED_KEYS_KEY)
    if allowed_keys is not None:
        # A non-list value is treated as a misconfiguration and closes the gate.
        if not isinstance(allowed_keys, list) or rock_authorization not in allowed_keys:
            return

    raw = nacos_config.get(CPU_OVERCOMMIT_HEADROOM_KEY)
    try:
        headroom = float(raw) if raw is not None else 0.0
    except (TypeError, ValueError, OverflowError):
        # OverflowError: float() of an integer too large for a double. Treat it
        # like any other unparsable value instead of failing sandbox startup.
        headroom = 0.0

    # Reject NaN / inf so a fat-fingered Nacos edit can't break sandbox startup
    if not math.isfinite(headroom) or headroom <= 0:
        return

    config.limit_cpus = min(2 * config.cpus, config.cpus + headroom)
11 changes: 11 additions & 0 deletions rock/deployments/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,15 @@ def _docker_create(self, cmd: list[str]) -> None:
)
raise

def _docker_update_resources(self) -> None:
    """Apply cpu/memory resource updates to a stopped container via docker update.

    The flags are whatever ``self._cpus()`` and ``self._memory()`` produce.
    When both are empty the command is skipped, because ``docker update``
    refuses to run without at least one flag and would otherwise fail a
    restart that has nothing to change.

    Raises:
        RuntimeError: If ``docker update`` exits with a non-zero status.
    """
    resource_flags = self._cpus() + self._memory()
    if not resource_flags:
        logger.info(f"No cpu/memory flags for {self._container_name}; skipping docker update")
        return

    cmd = ["docker", "update"] + resource_flags + [self._container_name]
    logger.info(f"Updating container resources: {' '.join(cmd)}")
    # Argument list (no shell) so the container name and flags are never re-parsed.
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        logger.error(f"docker update failed: {result.stderr}")
        raise RuntimeError(f"docker update failed for {self._container_name}: {result.stderr}")

def _docker_start(self) -> subprocess.Popen:
"""Start a previously-created container with stdout/stderr attached."""
try:
Expand Down Expand Up @@ -706,6 +715,8 @@ async def restart(self):

logger.info(f"Restarting container {self._container_name} with docker start")

await loop.run_in_executor(executor, self._docker_update_resources)

# Reuse the same Popen-based attached start used by start(), so the
# restart path also produces a valid self._container_process. Without
# this, _stop() would skip its `if self._container_process is not None`
Expand Down
41 changes: 40 additions & 1 deletion rock/sandbox/sandbox_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from rock.admin.proto.request import SandboxWriteFileRequest as WriteFileRequest
from rock.admin.proto.response import SandboxStartResponse, SandboxStatusResponse
from rock.common.constants import DeleteReason, StopReason
from rock.common.cpu_overcommit import apply_cpu_overcommit
from rock.config import RockConfig, RuntimeConfig
from rock.deployments.config import DeploymentConfig, DockerDeploymentConfig
from rock.logger import init_logger
Expand Down Expand Up @@ -152,7 +153,13 @@ async def start_async(
)

@monitor_sandbox_operation()
async def restart_async(self, sandbox_id: str) -> SandboxStartResponse:
async def restart_async(
self,
sandbox_id: str,
cpus: float | None = None,
memory: str | None = None,
limit_cpus: float | None = None,
) -> SandboxStartResponse:
sm = await self._get_current_statemachine(sandbox_id)
if sm is None:
raise BadRequestRockError(f"Sandbox {sandbox_id} not found")
Expand All @@ -161,11 +168,43 @@ async def restart_async(self, sandbox_id: str) -> SandboxStartResponse:
if state != State.STOPPED:
raise BadRequestRockError(f"Sandbox {sandbox_id} cannot be restarted: current state is '{state.value}'")

info = sm.sandbox_info or {}
spec = info.get("spec") or {}
if spec:
restart_config = DockerDeploymentConfig(**spec)
else:
logger.warning(
f"sandbox {sandbox_id} has no spec snapshot; rebuilding config from flat fields with model defaults"
)
restart_config = DockerDeploymentConfig(
container_name=sandbox_id,
image=info.get("image") or DockerDeploymentConfig.model_fields["image"].default,
memory=info.get("memory") or DockerDeploymentConfig.model_fields["memory"].default,
cpus=float(info.get("cpus") or DockerDeploymentConfig.model_fields["cpus"].default),
)

if cpus is not None:
restart_config.cpus = cpus
if limit_cpus is not None:
restart_config.limit_cpus = limit_cpus
if memory is not None:
restart_config.memory = memory
if cpus is not None and limit_cpus is None:
if restart_config.limit_cpus is not None:
logger.warning(
f"restart {sandbox_id}: cpus changed to {cpus} but limit_cpus not provided, "
f"clearing previous limit_cpus={restart_config.limit_cpus}"
)
restart_config.limit_cpus = None
await apply_cpu_overcommit(restart_config, self.rock_config.nacos_provider)
self.validate_sandbox_spec(self.rock_config.runtime, restart_config)

await sm.send(
"restart",
sandbox_id=sandbox_id,
operator=self._operator,
meta_store=self._meta_store,
restart_config=restart_config,
)

info: SandboxInfo = sm.sandbox_info or {}
Expand Down
26 changes: 9 additions & 17 deletions rock/sandbox/sandbox_statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,29 +100,19 @@ async def on_alive(self, sandbox_id: str, meta_store, sandbox_info: SandboxInfo)
sandbox_info["start_time"] = get_iso8601_timestamp()
await meta_store.update(sandbox_id, sandbox_info)

async def on_restart(self, sandbox_id: str, operator, meta_store) -> None:
async def on_restart(
self,
sandbox_id: str,
operator,
meta_store,
restart_config: DockerDeploymentConfig,
) -> None:
info = self.sandbox_info or {}

host_ip = info.get("host_ip")
if not host_ip:
raise BadRequestRockError(f"Sandbox {sandbox_id} has no host_ip; cannot pin restart to original node")

# Prefer the spec snapshot (DockerDeploymentConfig.model_dump persisted to
# the DB at start time) so the new actor wraps the existing container with
# the exact same config.
spec = info.get("spec") or {}
if spec:
restart_config = DockerDeploymentConfig(**spec)
else:
logger.warning(
f"sandbox {sandbox_id} has no spec snapshot; rebuilding config from flat fields with model defaults"
)
restart_config = DockerDeploymentConfig(
container_name=sandbox_id,
image=info.get("image") or DockerDeploymentConfig.model_fields["image"].default,
memory=info.get("memory") or DockerDeploymentConfig.model_fields["memory"].default,
cpus=float(info.get("cpus") or DockerDeploymentConfig.model_fields["cpus"].default),
)
timeout_info = SandboxTimeoutHelper.make_timeout_info(restart_config.auto_clear_time)

logger.info(f"restart sandbox {sandbox_id} (pin host_ip={host_ip})")
Expand All @@ -135,6 +125,8 @@ async def on_restart(self, sandbox_id: str, operator, meta_store) -> None:
# fields (spec/status) won't pollute the alive key.
new_info = dict(info)
new_info["state"] = RockState.PENDING
new_info["cpus"] = restart_config.cpus
new_info["memory"] = restart_config.memory
new_info.pop("stop_time", None)
await meta_store.update(sandbox_id, new_info)
await meta_store.update_timeout(sandbox_id, timeout_info)
Expand Down
52 changes: 52 additions & 0 deletions tests/unit/admin/entrypoints/test_param_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,55 @@ async def test_gem_close_empty_sandbox_id(gem_app):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
_assert_failed(await client.post("/close", json={"sandbox_id": ""}))
_assert_failed(await client.post("/close", json={"sandbox_id": " "}))


# --- restart backward compatibility ---


@pytest.mark.asyncio
async def test_restart_old_sdk_only_sandbox_id(sandbox_app):
    """A legacy payload carrying only ``sandbox_id`` must still be accepted.

    All resource overrides are expected to reach the manager as ``None``.
    """
    app, mock_manager = sandbox_app
    stub_response = SandboxStartResponse(sandbox_id="sb-1", host_name="h", host_ip="1.2.3.4")
    mock_manager.restart_async = AsyncMock(return_value=stub_response)

    payload = {"sandbox_id": "sb-1"}
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        resp = await client.post("/restart", json=payload)

    assert resp.status_code == 200
    body = resp.json()
    assert body["status"] == "Success"
    mock_manager.restart_async.assert_called_once_with("sb-1", cpus=None, memory=None, limit_cpus=None)


@pytest.mark.asyncio
async def test_restart_new_sdk_with_resources(sandbox_app):
    """A payload with ``cpus`` and ``memory`` forwards both to the manager.

    The integer ``cpus`` is expected to arrive as a float and ``limit_cpus``
    as ``None``.
    """
    app, mock_manager = sandbox_app
    stub_response = SandboxStartResponse(sandbox_id="sb-1", host_name="h", host_ip="1.2.3.4")
    mock_manager.restart_async = AsyncMock(return_value=stub_response)

    payload = {"sandbox_id": "sb-1", "cpus": 4, "memory": "8g"}
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        resp = await client.post("/restart", json=payload)

    assert resp.status_code == 200
    body = resp.json()
    assert body["status"] == "Success"
    mock_manager.restart_async.assert_called_once_with("sb-1", cpus=4.0, memory="8g", limit_cpus=None)


@pytest.mark.asyncio
async def test_restart_with_limit_cpus(sandbox_app):
    """A payload with ``cpus`` and ``limit_cpus`` forwards both as floats.

    ``memory`` is omitted from the request and is expected to arrive as ``None``.
    """
    app, mock_manager = sandbox_app
    stub_response = SandboxStartResponse(sandbox_id="sb-1", host_name="h", host_ip="1.2.3.4")
    mock_manager.restart_async = AsyncMock(return_value=stub_response)

    payload = {"sandbox_id": "sb-1", "cpus": 2, "limit_cpus": 4}
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        resp = await client.post("/restart", json=payload)

    assert resp.status_code == 200
    body = resp.json()
    assert body["status"] == "Success"
    mock_manager.restart_async.assert_called_once_with("sb-1", cpus=2.0, memory=None, limit_cpus=4.0)


@pytest.mark.asyncio
async def test_restart_empty_sandbox_id(sandbox_app):
    """Empty and whitespace-only ``sandbox_id`` values are rejected by /restart.

    Covers the same two inputs as the /close validation test so a blank id
    cannot slip through the request model.
    """
    app, _ = sandbox_app
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        _assert_failed(await client.post("/restart", json={"sandbox_id": ""}))
        _assert_failed(await client.post("/restart", json={"sandbox_id": " "}))
Loading
Loading