diff --git a/rock/admin/entrypoints/sandbox_api.py b/rock/admin/entrypoints/sandbox_api.py index 34c1ac0af5..3f818387e5 100644 --- a/rock/admin/entrypoints/sandbox_api.py +++ b/rock/admin/entrypoints/sandbox_api.py @@ -50,6 +50,11 @@ logger = init_logger(__name__) + +def _log_duration(phase: str, stage: str, start: float): + duration = time.perf_counter() - start + logger.info(f"[{phase}] {stage} took {duration:.3f} s") + _MIRROR_PROBE_CACHE: dict[str, tuple[bool, float]] = {} _MIRROR_PROBE_TTL_SECONDS = 60.0 @@ -389,20 +394,48 @@ async def start_async( request: SandboxStartRequest, headers: Annotated[StartHeaders, Depends()], ) -> RockResponse[SandboxStartResponse]: + total_start = time.perf_counter() config = DockerDeploymentConfig.from_request(request) + + t0 = time.perf_counter() await _apply_accelerator_type_validation(config) + _log_duration("start_async_timing", "Apply accelerator type validation", t0) + + t0 = time.perf_counter() await _apply_kata_runtime_switch(config) await _apply_kata_disk_size(config) + _log_duration("start_async_timing", "Apply kata runtime switch + disk size", t0) + + t0 = time.perf_counter() await _apply_image_os_profile(config) + _log_duration("start_async_timing", "Apply image OS profile", t0) + + t0 = time.perf_counter() await _apply_timeout_defaults(config) await _apply_cpu_overcommit_default(config, headers.user_info.get("rock_authorization")) + _log_duration("start_async_timing", "Apply timeout defaults + CPU overcommit", t0) + + t0 = time.perf_counter() await _apply_disk_limits(config) + _log_duration("start_async_timing", "Apply disk limits", t0) + + t0 = time.perf_counter() await _apply_image_registry_mirror(config) + _log_duration("start_async_timing", "Apply image registry mirror", t0) + + t0 = time.perf_counter() sandbox_start_response = await sandbox_manager.start_async( config, user_info=headers.user_info, cluster_info=headers.cluster_info, ) + _log_duration("start_async_timing", "sandbox_manager.start_async", t0) + + total_duration = time.perf_counter() - total_start + logger.info( + f"[start_async_timing] [{sandbox_start_response.sandbox_id}] " + f"API start_async total took {total_duration:.3f} s" + ) return RockResponse(result=sandbox_start_response) @@ -427,15 +460,27 @@ async def get_sandbox_statistics(sandbox_id: NonBlankStr): @sandbox_router.get("/get_status") @handle_exceptions(error_message="get sandbox status failed") async def get_status(sandbox_id: NonBlankStr, include_all_states: bool = False): + total_start = time.perf_counter() + # TODO: do judgement inside operator - if ( + t0 = time.perf_counter() + use_v2 = ( sandbox_manager.rock_config.nacos_provider is not None and await sandbox_manager.rock_config.nacos_provider.get_switch_status(GET_STATUS_SWITCH) - ): - return RockResponse( - result=await sandbox_manager.get_status_v2(sandbox_id, include_all_states=include_all_states) - ) - return RockResponse(result=await sandbox_manager.get_status(sandbox_id, include_all_states=include_all_states)) + ) + _log_duration("get_status_timing", f"[{sandbox_id}] Nacos switch check", t0) + + t0 = time.perf_counter() + if use_v2: + result = await sandbox_manager.get_status_v2(sandbox_id, include_all_states=include_all_states) + else: + result = await sandbox_manager.get_status(sandbox_id, include_all_states=include_all_states) + _log_duration("get_status_timing", f"[{sandbox_id}] sandbox_manager.get_status", t0) + + total_duration = time.perf_counter() - total_start + logger.info(f"[get_status_timing] [{sandbox_id}] API get_status total took {total_duration:.3f} s") + + return RockResponse(result=result) @sandbox_router.post("/execute") diff --git a/rock/admin/metrics/decorator.py b/rock/admin/metrics/decorator.py index 39bdceb932..e8d175f794 100644 --- a/rock/admin/metrics/decorator.py +++ b/rock/admin/metrics/decorator.py @@ -157,8 +157,18 @@ def monitor_sandbox_operation( sandbox_id_position: int = None, sandbox_id_param: str = None, metric_prefix: str = "request", + skip_user_info: bool = False, ): - """Method decorator: Monitor specific methods""" + """Method decorator: Monitor specific methods + + Parameters + ---------- + skip_user_info: + If True, skip the Redis lookup for user_id/experiment_id/namespace. + Use this for operations like ``start_async`` where the sandbox does + not yet exist in Redis — the lookup would always return None and + only add latency under load. + """ def decorator(f): if asyncio.iscoroutinefunction(f): @@ -179,8 +189,11 @@ async def wrapper(self, *args, **kwargs): args, kwargs, extract_sandbox_id, sandbox_id_position, sandbox_id_param ) - meta_store = getattr(self, "_meta_store", None) - user_id, experiment_id, namespace = await _get_user_info(meta_store, sandbox_id) + if skip_user_info: + user_id, experiment_id, namespace = "default", "default", "default" + else: + meta_store = getattr(self, "_meta_store", None) + user_id, experiment_id, namespace = await _get_user_info(meta_store, sandbox_id) # Build attributes attributes = _build_attributes(op_name, sandbox_id, f, user_id, experiment_id, namespace) diff --git a/rock/admin/metrics/monitor.py b/rock/admin/metrics/monitor.py index 44c8d077bb..1824c34894 100644 --- a/rock/admin/metrics/monitor.py +++ b/rock/admin/metrics/monitor.py @@ -181,7 +181,7 @@ def export_with_logging(metrics_data, *args, **kwargs): t0 = time.perf_counter() result = original_export(metrics_data, *args, **kwargs) elapsed_ms = (time.perf_counter() - t0) * 1000.0 - logger.info( + logger.debug( "OTLP export metric_prefix=%s endpoint=%s data_points=%d duration_ms=%.1f result=%s", self.metric_prefix, endpoint, diff --git a/rock/sandbox/operator/ray.py b/rock/sandbox/operator/ray.py index 0f03f31d72..447ab99e4a 100644 --- a/rock/sandbox/operator/ray.py +++ b/rock/sandbox/operator/ray.py @@ -1,4 +1,5 @@ import json +import time import ray @@ -60,7 +61,9 @@ async def submit(self, config: DockerDeploymentConfig, user_info: dict = {}) -> async with self._ray_service.get_ray_rwlock().read_lock(): sandbox_id = config.container_name logger.info(f"[{sandbox_id}] start_async params:{json.dumps(config.model_dump(), indent=2)}") + t0 = time.perf_counter() sandbox_actor: SandboxActor = await self.create_actor(config) + logger.info(f"[startup_timing] [{sandbox_id}] Ray create_actor " f"took {time.perf_counter() - t0:.3f} s") sandbox_actor.set_metrics_endpoint.remote(self._runtime_config.metrics_endpoint) sandbox_actor.set_user_defined_tags.remote(self._runtime_config.user_defined_tags) sandbox_actor.start.remote() @@ -71,7 +74,11 @@ async def submit(self, config: DockerDeploymentConfig, user_info: dict = {}) -> sandbox_actor.set_user_id.remote(user_id) sandbox_actor.set_experiment_id.remote(experiment_id) sandbox_actor.set_namespace.remote(namespace) + t0 = time.perf_counter() sandbox_info: SandboxInfo = await self._ray_service.async_ray_get(sandbox_actor.sandbox_info.remote()) + logger.info( + f"[startup_timing] [{sandbox_id}] Ray sandbox_info.remote() " f"took {time.perf_counter() - t0:.3f} s" + ) sandbox_info["user_id"] = user_id sandbox_info["experiment_id"] = experiment_id sandbox_info["namespace"] = namespace @@ -82,34 +89,95 @@ async def submit(self, config: DockerDeploymentConfig, user_info: dict = {}) -> async def get_status(self, sandbox_id: str) -> SandboxInfo | None: if self.use_rocklet(): + t0 = time.perf_counter() sandbox_info: SandboxInfo = await build_sandbox_from_redis(self._redis_provider, sandbox_id) + logger.info( + f"[operator_get_status_timing] [{sandbox_id}] Redis build_sandbox_from_redis " + f"took {time.perf_counter() - t0:.3f} s" + ) if sandbox_info is None: return None + host_ip = sandbox_info.get("host_ip") + + t0 = time.perf_counter() remote_status = await self.get_remote_status(sandbox_id, host_ip) + logger.info( + f"[operator_get_status_timing] [{sandbox_id}] HTTP get_remote_status " + f"took {time.perf_counter() - t0:.3f} s" + ) + + t0 = time.perf_counter() is_alive = await self._check_alive_status(sandbox_id, host_ip, remote_status) + logger.info( + f"[operator_get_status_timing] [{sandbox_id}] HTTP check_alive_status " + f"took {time.perf_counter() - t0:.3f} s" + ) + # TODO: sink update state according to is_alive logic into SandboxInfo if is_alive: sandbox_info["state"] = State.RUNNING sandbox_info.update(remote_status.to_dict()) + return sandbox_info async with self._ray_service.get_ray_rwlock().read_lock(): + total_start = time.perf_counter() try: + t0 = time.perf_counter() actor: SandboxActor = await self._ray_service.async_ray_get_actor(self._get_actor_name(sandbox_id)) + logger.info( + f"[operator_get_status_timing] [{sandbox_id}] Ray get_actor " + f"took {time.perf_counter() - t0:.3f} s" + ) except (ValueError, Exception): logger.debug(f"Actor for sandbox {sandbox_id} not found, returning None") return None + + t0 = time.perf_counter() sandbox_info: SandboxInfo = await self._ray_service.async_ray_get(actor.sandbox_info.remote()) + logger.info( + f"[operator_get_status_timing] [{sandbox_id}] Ray sandbox_info.remote() " + f"took {time.perf_counter() - t0:.3f} s" + ) + + t0 = time.perf_counter() remote_status: ServiceStatus = await self._ray_service.async_ray_get(actor.get_status.remote()) + logger.info( + f"[operator_get_status_timing] [{sandbox_id}] Ray get_status.remote() " + f"took {time.perf_counter() - t0:.3f} s" + ) + sandbox_info["phases"] = {name: phase.to_dict() for name, phase in remote_status.phases.items()} sandbox_info["port_mapping"] = remote_status.get_port_mapping() + + t0 = time.perf_counter() alive = await self._ray_service.async_ray_get(actor.is_alive.remote()) + logger.info( + f"[operator_get_status_timing] [{sandbox_id}] Ray is_alive.remote() " + f"took {time.perf_counter() - t0:.3f} s" + ) + # TODO: sink update state according to is_alive logic into SandboxInfo if alive.is_alive: sandbox_info["state"] = State.RUNNING if not self._redis_provider: + logger.info( + f"[operator_get_status_timing] [{sandbox_id}] ray get_status total " + f"took {time.perf_counter() - total_start:.3f} s" + ) return sandbox_info + + t0 = time.perf_counter() redis_info = await self.get_sandbox_info_from_redis(sandbox_id) + logger.info( + f"[operator_get_status_timing] [{sandbox_id}] Redis get_sandbox_info " + f"took {time.perf_counter() - t0:.3f} s" + ) + + logger.info( + f"[operator_get_status_timing] [{sandbox_id}] ray get_status total " + f"took {time.perf_counter() - total_start:.3f} s" + ) if redis_info: redis_info.update(sandbox_info) return redis_info diff --git a/rock/sandbox/sandbox_manager.py b/rock/sandbox/sandbox_manager.py index 1c1ff3070a..b57db0a677 100644 --- a/rock/sandbox/sandbox_manager.py +++ b/rock/sandbox/sandbox_manager.py @@ -90,7 +90,11 @@ async def refresh_aes_key(self): async def _check_sandbox_exists_in_redis(self, config: DeploymentConfig): if isinstance(config, DockerDeploymentConfig) and config.container_name: sandbox_id = config.container_name - if await self._meta_store.exists(sandbox_id): + self._meta_store._redis.log_pool_detailed_status() + t0 = time.perf_counter() + exists = await self._meta_store.exists(sandbox_id) + logger.info(f"[startup_timing] [{config.image}] Redis exists check took {time.perf_counter() - t0:.3f} s") + if exists: raise BadRequestRockError(f"Sandbox {sandbox_id} already exists") def _setup_sandbox_actor_metadata(self, sandbox_actor: SandboxActor, user_info: UserInfo) -> None: @@ -116,10 +120,11 @@ async def _build_sandbox_info_metadata( sandbox_info["state"] = State.PENDING sandbox_info["create_time"] = get_iso8601_timestamp() - @monitor_sandbox_operation() + @monitor_sandbox_operation(skip_user_info=True) async def start_async( self, config: DeploymentConfig, user_info: UserInfo = {}, cluster_info: ClusterInfo = {} ) -> SandboxStartResponse: + total_start = time.perf_counter() await self._check_sandbox_exists_in_redis(config) self.validate_sandbox_spec(self.rock_config.runtime, config) with StageTimer("startup_timing", f"[{config.image}] Init config", logger): @@ -134,17 +139,20 @@ async def start_async( ) docker_deployment_config.cpus = self.rock_config.runtime.standard_spec.cpus docker_deployment_config.memory = self.rock_config.runtime.standard_spec.memory - with StageTimer("startup_timing", f"[{sandbox_id}] Operator submit", logger): + with StageTimer("startup_timing", f"[{sandbox_id}] Ray operator submit", logger): sandbox_info: SandboxInfo = await self._operator.submit(docker_deployment_config, user_info) - await self._build_sandbox_info_metadata(sandbox_info, user_info, cluster_info) - timeout_info = SandboxTimeoutHelper.make_timeout_info(docker_deployment_config.auto_clear_time) - with StageTimer("startup_timing", f"[{sandbox_id}] Meta store create", logger): + with StageTimer("startup_timing", f"[{sandbox_id}] Build sandbox metadata", logger): + await self._build_sandbox_info_metadata(sandbox_info, user_info, cluster_info) + timeout_info = SandboxTimeoutHelper.make_timeout_info(docker_deployment_config.auto_clear_time) + with StageTimer("startup_timing", f"[{sandbox_id}] Meta store create (Redis+DB)", logger): await self._meta_store.create( sandbox_id, sandbox_info, timeout_info=timeout_info, deployment_config=docker_deployment_config, ) + total_duration = time.perf_counter() - total_start + logger.info(f"[startup_timing] [{sandbox_id}] start_async total took {total_duration:.3f} s") return SandboxStartResponse( sandbox_id=sandbox_id, host_name=sandbox_info.get("host_name"), @@ -272,22 +280,27 @@ async def commit(self, sandbox_id, image_tag: str, username: str, password: str) @monitor_sandbox_operation() async def get_status(self, sandbox_id, include_all_states: bool = False) -> SandboxStatusResponse: - # get status from meta_store - sm = await self._get_current_statemachine(sandbox_id) + total_start = time.perf_counter() + + # get status from meta_store (Redis + DB fallback) + with StageTimer("get_status_timing", f"[{sandbox_id}] Meta store get (Redis+DB)", logger): + sm = await self._get_current_statemachine(sandbox_id) if sm is None: raise BadRequestRockError(f"Sandbox {sandbox_id} not found") - # update status from operator + # update status from operator (Ray) is_alive = False operator_sandbox_info: SandboxInfo | None = await self._operator.get_status(sandbox_id=sandbox_id) if operator_sandbox_info is not None: is_alive = operator_sandbox_info.get("state") == State.RUNNING if sm.current_state.value == State.PENDING and is_alive: - await sm.send( - "alive", sandbox_id=sandbox_id, meta_store=self._meta_store, sandbox_info=operator_sandbox_info - ) + with StageTimer("get_status_timing", f"[{sandbox_id}] Meta store alive update (Redis+DB)", logger): + await sm.send( + "alive", sandbox_id=sandbox_id, meta_store=self._meta_store, sandbox_info=operator_sandbox_info + ) if operator_sandbox_info.get("state") in (State.PENDING, State.RUNNING): - await self._refresh_timeout(sandbox_id) + with StageTimer("get_status_timing", f"[{sandbox_id}] Redis refresh timeout", logger): + await self._refresh_timeout(sandbox_id) # compat with legacy get_status behavior by default (include_all_states == False), # raise 'not found' if not on pending or running status. @@ -299,6 +312,9 @@ async def get_status(self, sandbox_id, include_all_states: bool = False) -> Sand else: sandbox_info = sm.sandbox_info + total_duration = time.perf_counter() - total_start + logger.info(f"[get_status_timing] [{sandbox_id}] get_status total took {total_duration:.3f} s") + return SandboxStatusResponse( sandbox_id=sandbox_id, status=sandbox_info.get("phases"), @@ -436,6 +452,4 @@ def validate_sandbox_spec(self, runtime_config: RuntimeConfig, deployment_config parse_size_to_bytes(deployment_config.disk_limit_rootfs) except ValueError as e: logger.warning(f"Invalid disk_limit_rootfs size: {deployment_config.disk_limit_rootfs}", exc_info=e) - raise BadRequestRockError( - f"Invalid disk_limit_rootfs size: {deployment_config.disk_limit_rootfs}" - ) + raise BadRequestRockError(f"Invalid disk_limit_rootfs size: {deployment_config.disk_limit_rootfs}") diff --git a/rock/sandbox/sandbox_meta_store.py b/rock/sandbox/sandbox_meta_store.py index 9b04f66a3f..bceb5db877 100644 --- a/rock/sandbox/sandbox_meta_store.py +++ b/rock/sandbox/sandbox_meta_store.py @@ -76,12 +76,21 @@ async def create( DB-only fields the caller may carry (e.g. ``spec`` / ``status`` from a prior DB-fallback read) cannot leak into the alive key. """ + import time + redis_payload = pick_sandbox_info_fields(sandbox_info) + t0 = time.perf_counter() await self._redis.json_set(alive_sandbox_key(sandbox_id), "$", redis_payload) if timeout_info is not None: await self._redis.json_set(timeout_sandbox_key(sandbox_id), "$", timeout_info) + logger.info( + f"[startup_timing] [{sandbox_id}] Meta store create Redis " f"took {time.perf_counter() - t0:.3f} s" + ) + self._redis.log_pool_detailed_status() + t0 = time.perf_counter() await self._db.create(sandbox_id, sandbox_info, deployment_config) + logger.info(f"[startup_timing] [{sandbox_id}] Meta store create DB " f"took {time.perf_counter() - t0:.3f} s") @monitor_metastore_operation async def update(self, sandbox_id: str, sandbox_info: SandboxInfo) -> None: @@ -140,7 +149,9 @@ async def get(self, sandbox_id: str, check_db: bool = False) -> SandboxInfo | No async def exists(self, sandbox_id: str) -> bool: """Return ``True`` when the Redis alive key exists for ``sandbox_id``.""" - return await self.get(sandbox_id) is not None + # Use EXISTS command instead of JSON.GET for better performance + key = alive_sandbox_key(sandbox_id) + return await self._redis.exists(key) @monitor_metastore_operation async def get_timeout(self, sandbox_id: str) -> dict[str, str] | None: diff --git a/rock/utils/__init__.py b/rock/utils/__init__.py index 04f48f8f0d..83590c63b3 100644 --- a/rock/utils/__init__.py +++ b/rock/utils/__init__.py @@ -3,6 +3,7 @@ from .concurrent_helper import ( AsyncAtomicInt, AsyncSafeDict, + EventLoopLagMonitor, RayUtil, StageTimer, Timer, @@ -70,6 +71,7 @@ "RayUtil", "AsyncSafeDict", "AsyncAtomicInt", + "EventLoopLagMonitor", "run_until_complete", "timeout", # Data utilities diff --git a/rock/utils/concurrent_helper.py b/rock/utils/concurrent_helper.py index 4115170937..2998a4ee34 100644 --- a/rock/utils/concurrent_helper.py +++ b/rock/utils/concurrent_helper.py @@ -97,6 +97,63 @@ def __exit__(self, exc_type, exc_val, exc_tb): return False +class EventLoopLagMonitor: + """Monitor asyncio event loop lag by measuring scheduling delays. + + Schedules a periodic callback via ``loop.call_later`` and measures the + gap between the expected fire time and the actual fire time. A large + gap means the event loop is saturated (blocked by CPU work or too many + ready callbacks). + + Usage:: + + monitor = EventLoopLagMonitor(interval=1.0, warn_threshold=0.5) + monitor.start() # returns the background Task + # ... application runs ... + monitor.stop() + """ + + def __init__(self, interval: float = 1.0, warn_threshold: float = 0.5, logger=None): + self._interval = interval + self._warn_threshold = warn_threshold + self._logger = logger + self._task: asyncio.Task | None = None + self._max_lag: float = 0.0 + self._lag_count: int = 0 + self._sample_count: int = 0 + + async def _monitor_loop(self): + loop = asyncio.get_running_loop() + while True: + expected = self._interval + t0 = loop.time() + await asyncio.sleep(self._interval) + actual = loop.time() - t0 + lag = actual - expected + self._sample_count += 1 + + if lag > self._max_lag: + self._max_lag = lag + + if lag > self._warn_threshold: + self._lag_count += 1 + if self._logger: + self._logger.warning( + f"[event_loop_lag] lag={lag:.3f}s (max={self._max_lag:.3f}s, " + f"samples={self._sample_count}, lag_count={self._lag_count})" + ) + + def start(self) -> asyncio.Task: + """Start the background monitor task. Returns the asyncio.Task.""" + self._task = asyncio.create_task(self._monitor_loop()) + return self._task + + def stop(self): + """Cancel the background monitor task.""" + if self._task and not self._task.done(): + self._task.cancel() + + class StageTimer: """Context manager that logs elapsed time for a named stage.""" diff --git a/rock/utils/providers/redis_provider.py b/rock/utils/providers/redis_provider.py index 07dd5b0557..5bf21be505 100644 --- a/rock/utils/providers/redis_provider.py +++ b/rock/utils/providers/redis_provider.py @@ -19,6 +19,7 @@ def __init__( self.port = port self.password = password self.client: redis.Redis | None = None + self._pool: redis.asyncio.ConnectionPool | None = None async def init_pool(self): """ @@ -26,15 +27,26 @@ async def init_pool(self): """ logger.info("Initializing Redis connection pool...") # Create asynchronous connection pool - pool = redis.asyncio.ConnectionPool( + # - health_check_interval=30: send PING before reusing idle connections + # to detect stale connections closed by intermediate network devices (SLB/VIPServer). + # Without this, a zombie connection causes TCP retransmission timeout (~30s) + # before the client realizes the connection is dead. + # - socket_connect_timeout=5: fail fast on new connection establishment. + # - socket_timeout=5: fail fast on command execution. + # - tcp_keepalive=True: OS-level keepalive to detect dead peers faster. + self._pool = redis.asyncio.ConnectionPool( host=self.host, port=self.port, password=self.password, - decode_responses=True, # Automatically decode Redis bytes responses to str - max_connections=500, # Set maximum connections according to your needs + decode_responses=True, + max_connections=2000, + health_check_interval=30, + socket_connect_timeout=5, + socket_timeout=5, + socket_keepalive=True, ) # Create Redis client based on connection pool - self.client = redis.asyncio.Redis.from_pool(pool) + self.client = redis.asyncio.Redis.from_pool(self._pool) await self.client.ping() logger.info("Redis connection pool initialized and connection successful.") @@ -54,6 +66,31 @@ def _ensure_client(self) -> redis.Redis: raise RuntimeError("RedisManager is not initialized. Please call 'init_pool()' first.") return self.client + async def exists(self, key: str) -> bool: + """Check if a key exists in Redis using the EXISTS command (O(1)).""" + import time + + client = self._ensure_client() + pool = self._pool + available = len(pool._available_connections) if pool else -1 + in_use = len(pool._in_use_connections) if pool else -1 + logger.info(f"[debug] exists: pool state available={available}, in_use={in_use}") + t0 = time.perf_counter() + result = await client.exists(key) + logger.info(f"[debug] exists: client.exists took {time.perf_counter() - t0:.3f} s") + return bool(result) + + def log_pool_detailed_status(self): + """Log detailed Redis connection pool status for debugging.""" + if self._pool: + available = len(self._pool._available_connections) + in_use = len(self._pool._in_use_connections) + max_conn = self._pool.max_connections + logger.info( + f"Redis pool detailed: available={available}, in_use={in_use}, " + f"max={max_conn}, total_created={available + in_use}" + ) + # --- RedisJSON functionality encapsulation --- @property @@ -73,8 +110,43 @@ async def json_set(self, key: str, path: str, obj: Any): :param obj: A Python object that can be serialized to JSON. """ logger.debug(f"JSON SET on key '{key}' at path '{path}'") + import time + + # Step 1: Ensure client + t0 = time.perf_counter() + client = self._ensure_client() + logger.info(f"[debug] json_set step1: ensure_client took {time.perf_counter() - t0:.3f} s") + + # Step 2: Check pool state + t1 = time.perf_counter() + pool = self._pool + available = len(pool._available_connections) if pool else -1 + in_use = len(pool._in_use_connections) if pool else -1 + logger.info( + f"[debug] json_set step2: pool state available={available}, in_use={in_use} (took {time.perf_counter() - t1:.3f} s)" + ) + + # Step 2.5: Check pool connection config + if pool: + conn_kwargs = pool.connection_kwargs + logger.info( + f"[debug] json_set step2.5: pool config socket_timeout={conn_kwargs.get('socket_timeout')}, " + f"socket_connect_timeout={conn_kwargs.get('socket_connect_timeout')}, " + f"health_check_interval={conn_kwargs.get('health_check_interval')}" + ) + + # Step 3: Get json_client + t2 = time.perf_counter() + json_client = client.json() + logger.info(f"[debug] json_set step3: get json_client took {time.perf_counter() - t2:.3f} s") + + # Step 4: Actual set operation try: - await self.json_client.set(key, path, obj) + t3 = time.perf_counter() + result = await json_client.set(key, path, obj) + elapsed = time.perf_counter() - t3 + logger.info(f"[debug] json_set step4: actual set took {elapsed:.3f} s") + return result except Exception as e: logger.error(f"Error on JSON SET for key '{key}': {e}", exc_info=True) raise @@ -113,8 +185,16 @@ async def json_get(self, key: str, path: str = "$") -> Any | None: """ logger.debug(f"JSON GET from key '{key}' at path '{path}'") try: + import time + + pool = self._pool + available = len(pool._available_connections) if pool else -1 + in_use = len(pool._in_use_connections) if pool else -1 + logger.info(f"[debug] json_get: pool state available={available}, in_use={in_use}") + t0 = time.perf_counter() # RedisJSON's GET can return a list even for a single match result = await self.json_client.get(key, path) + logger.info(f"[debug] json_get: actual get took {time.perf_counter() - t0:.3f} s") # Unwrap if it's a single-element list from a specific path query if isinstance(result, list) and len(result) == 1 and path != "$": return result[0] diff --git a/scripts/redis_stress_test.py b/scripts/redis_stress_test.py new file mode 100644 index 0000000000..8704605c50 --- /dev/null +++ b/scripts/redis_stress_test.py @@ -0,0 +1,274 @@ +"""Redis stress test script — uses RedisProvider from the codebase. + +Usage: + python redis_stress_test.py # use defaults + python redis_stress_test.py --port 6379 --password xxx --concurrency 50 +""" + +import argparse +import asyncio +import statistics +import sys +import time +from pathlib import Path + +# Add project root to path so we can import rock modules +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from rock.actions.sandbox.sandbox_info import pick_sandbox_info_fields # noqa: E402 +from rock.admin.core.redis_key import alive_sandbox_key, timeout_sandbox_key # noqa: E402 +from rock.utils.providers.redis_provider import RedisProvider # noqa: E402 + +# --- defaults from rock-inner-vpc-nt-a-pool.yml --- +DEFAULT_HOST = "r-uf6gof1cr5hs5yneyq.redis.rds.aliyuncs.com" +DEFAULT_PORT = 6379 +DEFAULT_PASSWORD = "Admin@gal" + + +async def run_stress_test( + host: str, + port: int, + password: str, + concurrency: int, + iterations: int, +): + print("=== Redis Stress Test ===") + print(f"Host: {host}:{port}") + print(f"Concurrency: {concurrency}, Iterations per worker: {iterations}") + print() + + # Initialize RedisProvider + provider = RedisProvider(host=host, port=port, password=password) + + # Verify connectivity + t0 = time.perf_counter() + await provider.init_pool() + print(f"✓ Connection pool initialized ({time.perf_counter() - t0:.3f}s)") + + t0 = time.perf_counter() + await provider.client.ping() + print(f"✓ PING OK ({time.perf_counter() - t0:.3f}s)") + + # Warm up: create a few connections (same as meta_store.create) + for i in range(min(5, concurrency)): + sandbox_id = f"stress:warmup:{i}" + sandbox_info = { + "sandbox_id": sandbox_id, + "state": "running", + "memory": "2g", + "cpus": 1.0, + } + redis_payload = pick_sandbox_info_fields(sandbox_info) + await provider.json_set(alive_sandbox_key(sandbox_id), "$", redis_payload) + timeout_info = {"auto_clear_time": "120", "expire_time": "2026-06-26T20:00:00+08:00"} + await provider.json_set(timeout_sandbox_key(sandbox_id), "$", timeout_info) + + print(f"\n--- Starting stress test ({concurrency} workers × {iterations} iters) ---\n") + + all_latencies: list[float] = [] + errors: list[str] = [] + total_start = time.perf_counter() + + async def worker(worker_id: int): + prefix = f"stress:worker{worker_id}" + latencies = [] + for i in range(iterations): + sandbox_id = f"{prefix}:item:{i % 10}" # reuse 10 keys per worker + alive_key = alive_sandbox_key(sandbox_id) + timeout_key = timeout_sandbox_key(sandbox_id) + + # --- EXISTS (same as _check_sandbox_exists_in_redis) --- + t = time.perf_counter() + try: + await provider.exists(alive_key) + elapsed = time.perf_counter() - t + latencies.append(elapsed) + except Exception as e: + errors.append(f"[w{worker_id}] EXISTS {alive_key}: {e}") + elapsed = time.perf_counter() - t + latencies.append(elapsed) + + # --- JSON.SET alive key (same as meta_store.create) --- + t = time.perf_counter() + try: + sandbox_info = { + "role": "admin", + "env": "inner-vpc-nt-a-pool", + "image": "rock-instances-registry-vpc.cn-shanghai.cr.aliyuncs.com/instance/harbor:8e50674fc2", + "image_os": "linux", + "port": None, + "docker_args": [], + "startup_timeout": 600.0, + "pull": "missing", + "python_standalone_dir": None, + "platform": None, + "remove_container": False, + "auto_clear_time_minutes": 120, + "memory": "2g", + "cpus": 1.0, + "limit_cpus": None, + "disk_limit_rootfs": None, + "container_name": sandbox_id, + "auto_delete_seconds": None, + "type": "docker", + "enable_auto_clear": False, + "use_kata_runtime": False, + "kata_disk_size": "50G", + "kata_disk_base_path": "/data/docker-disk", + "actor_resource": "xrl-sandbox", + "actor_resource_num": 2, + "registry_username": None, + "runtime_config": { + "enable_auto_clear": False, + "project_root": "/distribution/rock-admin/code/OpenSource", + "python_env_path": "/root/miniconda3", + "envhub_db_url": "sqlite:////root/.rock/rock_envs.db", + "operator_type": "ray", + "standard_spec": {"memory": "8g", "cpus": 2}, + "max_allowed_spec": {"memory": "64g", "cpus": 16}, + "use_standard_spec_only": False, + "metrics_endpoint": "https://sunfire-ingestion-external.alibaba-inc.com", + "user_defined_tags": {"service.name": "chatos"}, + "sandbox_disk_limit_rootfs": None, + "instance_registry_mirrors": [], + "image_os_profiles": {}, + }, + "num_gpus": None, + "accelerator_type": None, + "extended_params": {}, + "image_os_profile": None, + "sandbox_id": sandbox_id, + "state": "running", + "create_time": "2026-06-25T20:00:00+08:00", + } + redis_payload = pick_sandbox_info_fields(sandbox_info) + await provider.json_set(alive_key, "$", redis_payload) + elapsed = time.perf_counter() - t + latencies.append(elapsed) + except Exception as e: + errors.append(f"[w{worker_id}] JSON.SET {alive_key}: {e}") + elapsed = time.perf_counter() - t + latencies.append(elapsed) + + # --- JSON.SET timeout key (same as meta_store.create with timeout_info) --- + t = time.perf_counter() + try: + timeout_info = {"auto_clear_time": "120", "expire_time": "2026-06-26T20:00:00+08:00"} + await provider.json_set(timeout_key, "$", timeout_info) + elapsed = time.perf_counter() - t + latencies.append(elapsed) + except Exception as e: + errors.append(f"[w{worker_id}] JSON.SET {timeout_key}: {e}") + elapsed = time.perf_counter() - t + latencies.append(elapsed) + + # --- JSON.GET alive key (same as meta_store.get / build_sandbox_from_redis) --- + t = time.perf_counter() + try: + await provider.json_get(alive_key, "$") + elapsed = time.perf_counter() - t + latencies.append(elapsed) + except Exception as e: + errors.append(f"[w{worker_id}] JSON.GET {alive_key}: {e}") + elapsed = time.perf_counter() - t + latencies.append(elapsed) + + return latencies + + # Run all workers concurrently + tasks = [worker(i) for i in range(concurrency)] + results = await asyncio.gather(*tasks, return_exceptions=True) + + total_elapsed = time.perf_counter() - total_start + + # Collect results + for r in results: + if isinstance(r, Exception): + errors.append(f"Worker failed: {r}") + elif isinstance(r, list): + all_latencies.extend(r) + + # Report + print("=== Results ===") + print(f"Total time: {total_elapsed:.2f}s") + print(f"Total operations: {len(all_latencies)}") + print(f"Errors: {len(errors)}") + + if all_latencies: + sorted_lat = sorted(all_latencies) + print("\n--- Latency Distribution ---") + print(f" Min: {sorted_lat[0]:.4f}s") + print(f" P50: {sorted_lat[len(sorted_lat) // 2]:.4f}s") + print(f" P90: {sorted_lat[int(len(sorted_lat) * 0.9)]:.4f}s") + print(f" P95: {sorted_lat[int(len(sorted_lat) * 0.95)]:.4f}s") + print(f" P99: {sorted_lat[int(len(sorted_lat) * 0.99)]:.4f}s") + print(f" Max: {sorted_lat[-1]:.4f}s") + print(f" Mean: {statistics.mean(all_latencies):.4f}s") + print(f" Stdev: {statistics.stdev(all_latencies):.4f}s") + + # Flag slow operations (>1s) + slow = [l for l in all_latencies if l > 1.0] + if slow: + print(f"\n⚠ {len(slow)} operations exceeded 1s:") + for i, l in enumerate(sorted(slow, reverse=True)[:20]): + print(f" #{i+1}: {l:.3f}s") + + if errors: + print(f"\n--- Errors ({len(errors)}) ---") + for e in errors[:20]: + print(f" {e}") + if len(errors) > 20: + print(f" ... and {len(errors) - 20} more") + + # Pool status + provider.log_pool_detailed_status() + + # Cleanup (delete both alive and timeout keys) + for i in range(concurrency): + for j in range(10): + sandbox_id = f"stress:worker{i}:item:{j}" + try: + await provider.json_delete(alive_sandbox_key(sandbox_id)) + except Exception: + pass + try: + await provider.json_delete(timeout_sandbox_key(sandbox_id)) + except Exception: + pass + for i in range(min(5, concurrency)): + sandbox_id = f"stress:warmup:{i}" + try: + await provider.json_delete(alive_sandbox_key(sandbox_id)) + except Exception: + pass + try: + await provider.json_delete(timeout_sandbox_key(sandbox_id)) + except Exception: + pass + + await provider.close_pool() + print("\n✓ Cleanup done, pool closed.") + + +def main(): + parser = argparse.ArgumentParser(description="Redis stress test") + parser.add_argument("--host", default=DEFAULT_HOST, help="Redis host") + parser.add_argument("--port", type=int, default=DEFAULT_PORT, help="Redis port") + parser.add_argument("--password", default=DEFAULT_PASSWORD, help="Redis password") + parser.add_argument("--concurrency", type=int, default=20, help="Number of concurrent workers") + parser.add_argument("--iterations", type=int, default=50, help="Iterations per worker") + args = parser.parse_args() + + asyncio.run( + run_stress_test( + host=args.host, + port=args.port, + password=args.password, + concurrency=args.concurrency, + iterations=args.iterations, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/scripts/redis_timeout_diagnosis.py b/scripts/redis_timeout_diagnosis.py new file mode 100644 index 0000000000..b78219d1c7 --- /dev/null +++ b/scripts/redis_timeout_diagnosis.py @@ -0,0 +1,114 @@ +"""Redis timeout diagnosis script — traces every step of a Redis operation.""" + +import asyncio +import sys +import time +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +import redis.asyncio as aioredis # noqa: E402 + + +async def diagnose_timeout(): + """Diagnose where the 25s timeout is happening.""" + print("=== Redis Timeout Diagnosis ===\n") + + # Create connection with production config + pool = aioredis.ConnectionPool( + host="r-uf6gof1cr5hs5yneyq.redis.rds.aliyuncs.com", + port=6379, + password="Admin@gal", + decode_responses=True, + max_connections=10, + health_check_interval=30, + socket_connect_timeout=5, + socket_timeout=5, + ) + + client = aioredis.Redis(connection_pool=pool) + + # Step 1: Initial connection + t0 = time.perf_counter() + print(f"[{t0:.3f}] Step 1: Calling ping() to establish connection...") + await client.ping() + t1 = time.perf_counter() + print(f"[{t1:.3f}] ✓ Initial ping took {t1 - t0:.3f}s\n") + + # Step 2: Simple SET operation + t0 = time.perf_counter() + print(f"[{t0:.3f}] Step 2: Calling set('test_key', 'value')...") + await client.set("test_key", "value") + t1 = time.perf_counter() + print(f"[{t1:.3f}] ✓ SET took {t1 - t0:.3f}s\n") + + # Step 3: Simple GET operation + t0 = time.perf_counter() + print(f"[{t0:.3f}] Step 3: Calling get('test_key')...") + result = await client.get("test_key") + t1 = time.perf_counter() + print(f"[{t1:.3f}] ✓ GET took {t1 - t0:.3f}s, result={result}\n") + + # Step 4: Wait and check health_check_interval behavior + print(f"[{time.perf_counter():.3f}] Step 4: Waiting 35 seconds (health_check_interval=30)...") + for i in range(35): + await asyncio.sleep(1) + if (i + 1) % 5 == 0: + print(f" Waited {i + 1}s...") + print(f"[{time.perf_counter():.3f}] ✓ Wait complete\n") + + # Step 5: Operation after health check interval + t0 = time.perf_counter() + print(f"[{t0:.3f}] Step 5: Calling get('test_key') after 35s wait (should trigger health check)...") + try: + result = await client.get("test_key") + t1 = time.perf_counter() + print(f"[{t1:.3f}] ✓ GET after wait took {t1 - t0:.3f}s, result={result}\n") + except Exception as e: + t1 = time.perf_counter() + print(f"[{t1:.3f}] ✗ GET failed after {t1 - t0:.3f}s: {e}\n") + + # Step 6: Get connection directly and trace operations + t0 = time.perf_counter() + print(f"[{t0:.3f}] Step 6: Getting connection directly from pool...") + conn = await pool.get_connection("GET") + t1 = time.perf_counter() + print(f"[{t1:.3f}] ✓ get_connection took {t1 - t0:.3f}s") + print(f" Connection is_connected: {conn.is_connected}") + print( + f" Connection next_health_check: {conn.next_health_check if hasattr(conn, 'next_health_check') else 'N/A'}\n" + ) + + # Step 7: Send command with the connection + t0 = time.perf_counter() + print(f"[{t0:.3f}] Step 7: Sending GET command with existing connection...") + await conn.send_command("GET", "test_key") + t1 = time.perf_counter() + print(f"[{t1:.3f}] ✓ send_command took {t1 - t0:.3f}s\n") + + # Step 8: Read response + t0 = time.perf_counter() + print(f"[{t0:.3f}] Step 8: Reading response...") + response = await conn.read_response() + t1 = time.perf_counter() + print(f"[{t1:.3f}] ✓ read_response took {t1 - t0:.3f}s, result={response}\n") + + # Step 9: Release connection + await pool.release(conn) + print(f"[{time.perf_counter():.3f}] Step 9: Connection released\n") + + # Cleanup + await client.delete("test_key") + await client.aclose() + + print("=== Diagnosis Complete ===") + print("\nIf all operations complete quickly, the timeout issue is not in the Redis client itself.") + print("Check:") + print(" 1. Network latency between pods") + print(" 2. Redis server load (SLOWLOG, INFO)") + print(" 3. Event loop blocking (py-spy showed SSL context creation)") + print(" 4. Connection pool exhaustion under high concurrency") + + +if __name__ == "__main__": + asyncio.run(diagnose_timeout())