From 6572decb3a90292576c4ad7bf66a8792dce7efca Mon Sep 17 00:00:00 2001 From: Maciej Bala Date: Tue, 23 Jun 2026 13:55:56 +0200 Subject: [PATCH 1/2] Added policy action notebook Signed-off-by: Maciej Bala --- cookbooks/cosmos3/generator/action/README.md | 5 + .../action/run_policy_with_vllm.ipynb | 575 ++++++++++++++++++ 2 files changed, 580 insertions(+) create mode 100644 cookbooks/cosmos3/generator/action/run_policy_with_vllm.ipynb diff --git a/cookbooks/cosmos3/generator/action/README.md b/cookbooks/cosmos3/generator/action/README.md index 02f97ee7..e2871ead 100644 --- a/cookbooks/cosmos3/generator/action/README.md +++ b/cookbooks/cosmos3/generator/action/README.md @@ -116,6 +116,9 @@ generation (see [`run_fd_with_vllm.ipynb`](./run_fd_with_vllm.ipynb) and The notebooks build the full request body for AV, DROID, and UMI examples, including autoregressive chunked generation for the robotics examples. +Policy inference can either use async `POST /v1/videos` to retrieve a rollout +video plus top-level `action` metadata, or the OpenPI websocket endpoint for +action chunks consumed by robot clients. ### VLLM-Omni Notebook Walkthrough @@ -126,6 +129,8 @@ write outputs under `outputs/cosmos3_action_vllm/`: DROID, and UMI robotics examples. - [`run_id_with_vllm.ipynb`](./run_id_with_vllm.ipynb) — inverse dynamics, predicting ego-motion trajectories from input AV videos. +- [`run_policy_with_vllm.ipynb`](./run_policy_with_vllm.ipynb) — policy + inference for DROID through the async video API and OpenPI websocket endpoint. 
diff --git a/cookbooks/cosmos3/generator/action/run_policy_with_vllm.ipynb b/cookbooks/cosmos3/generator/action/run_policy_with_vllm.ipynb new file mode 100644 index 00000000..fd6456fc --- /dev/null +++ b/cookbooks/cosmos3/generator/action/run_policy_with_vllm.ipynb @@ -0,0 +1,575 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "license-header", + "metadata": {}, + "source": [ + "" + ] + }, + { + "cell_type": "markdown", + "id": "policy-title", + "metadata": {}, + "source": [ + "# Cosmos3 Nano Action: Policy with vLLM-Omni\n", + "\n", + "## Prerequisites\n", + "\n", + "Generator requires the Guardrail. Request access to the gated [nvidia/Cosmos-1.0-Guardrail](https://huggingface.co/nvidia/Cosmos-1.0-Guardrail) HF repository before running guarded examples. This notebook disables guardrails for the policy requests with `guardrails: false` / `--no-guardrails`.\n", + "\n", + "## Overview\n", + "\n", + "This notebook runs Cosmos3 Nano **action policy** inference through vLLM-Omni using the checked-in DROID LeRobot sample under `assets/droid_lerobot_example`.\n", + "\n", + "It covers two policy serving paths from the vLLM-Omni Cosmos3 recipe:\n", + "\n", + "- `POST /v1/videos`: first frame plus instruction -> rollout video plus top-level `action` metadata.\n", + "- `ws://.../v1/realtime/robot/openpi`: OpenPI-compatible observation -> action chunk, suitable for a simulated or real robot client.\n", + "\n", + "The OpenPI example sends one static sample observation. Real robot clients should replace the zero joint/gripper state with live state and send repeated observations in a control loop." + ] + }, + { + "cell_type": "markdown", + "id": "policy-server-md", + "metadata": {}, + "source": [ + "## Start vLLM-Omni Policy Server\n", + "\n", + "Start the server in a terminal from the `cosmos` repo root. 
The container listens on port `8000`; Docker publishes it to host port `8001`, so the notebook uses `http://localhost:8001`.\n", + "\n", + "The OpenPI RoboLab branch reuses reference Cosmos Framework action transforms. The command below assumes the framework checkout is mounted at `/workspace/cosmos-framework`; adjust `PYTHONPATH` if your checkout lives elsewhere.\n", + "\n", + "```bash\n", + "cat > /tmp/cosmos3_droid_openpi_stage_overrides.json <<'JSON'\n", + "{\n", + " \"0\": {\n", + " \"model_config\": {\n", + " \"policy_server_config\": {\n", + " \"image_resolution\": [540, 640],\n", + " \"n_external_cameras\": 2,\n", + " \"needs_wrist_camera\": true,\n", + " \"needs_stereo_camera\": false,\n", + " \"needs_session_id\": true,\n", + " \"action_space\": \"joint_position\"\n", + " }\n", + " }\n", + " }\n", + "}\n", + "JSON\n", + "\n", + "docker rm -f cosmos3-vllm-omni-policy-notebook 2>/dev/null || true\n", + "\n", + "docker run -d --name cosmos3-vllm-omni-policy-notebook --runtime nvidia --gpus '\"device=0\"' -e CUDA_DEVICE_ORDER=PCI_BUS_ID -e PYTHONPATH=/workspace/cosmos-framework -v \"/mnt/sdb/.cache/huggingface:/root/.cache/huggingface\" -v \"$PWD:/workspace\" -p 8001:8000 --ipc=host vllm/vllm-omni:cosmos3 vllm serve nvidia/Cosmos3-Nano-Policy-DROID --omni --model-class-name Cosmos3OmniDiffusersPipeline --allowed-local-media-path / --port 8000 --init-timeout 1800 --no-guardrails --stage-overrides \"$(cat /tmp/cosmos3_droid_openpi_stage_overrides.json)\"\n", + "\n", + "# Wait until this returns model metadata before running the inference cells.\n", + "curl http://localhost:8001/v1/models\n", + "```\n", + "\n", + "To inspect startup logs:\n", + "\n", + "```bash\n", + "docker logs -f cosmos3-vllm-omni-policy-notebook\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "policy-vars-code", + "metadata": {}, + "outputs": [], + "source": [ + "from pathlib import Path\n", + "import os\n", + "\n", + "\n", + "def find_repo_root(start: 
Path) -> Path:\n", + " for path in [start, *start.parents]:\n", + " if (path / \"README.md\").exists() and (path / \"cookbooks\").exists():\n", + " return path\n", + " return start\n", + "\n", + "\n", + "def http_to_ws_url(base_url: str) -> str:\n", + " if base_url.startswith(\"https://\"):\n", + " ws_base = \"wss://\" + base_url[len(\"https://\"):]\n", + " elif base_url.startswith(\"http://\"):\n", + " ws_base = \"ws://\" + base_url[len(\"http://\"):]\n", + " else:\n", + " ws_base = base_url\n", + " return ws_base.rstrip(\"/\") + \"/v1/realtime/robot/openpi\"\n", + "\n", + "\n", + "COSMOS_ROOT = find_repo_root(Path.cwd().resolve())\n", + "COSMOS3_OUTPUT_ROOT = Path(\n", + " os.environ.get(\"COSMOS3_VLLM_OUTPUT_ROOT\", COSMOS_ROOT / \"outputs\" / \"cosmos3_action_vllm\")\n", + ").resolve()\n", + "COSMOS3_INPUT_DIR = COSMOS3_OUTPUT_ROOT / \"inputs\"\n", + "COSMOS3_POLICY_OUTPUT_DIR = COSMOS3_OUTPUT_ROOT / \"action_policy_droid\"\n", + "COSMOS3_ACTION_ROOT = COSMOS_ROOT / \"cookbooks\" / \"cosmos3\" / \"generator\" / \"action\"\n", + "DROID_ASSET_ROOT = COSMOS3_ACTION_ROOT / \"assets\" / \"droid_lerobot_example\"\n", + "VLLM_BASE_URL = os.environ.get(\"COSMOS3_VLLM_BASE_URL\", \"http://localhost:8001\").rstrip(\"/\")\n", + "OPENPI_WS_URL = os.environ.get(\"COSMOS3_OPENPI_WS_URL\", http_to_ws_url(VLLM_BASE_URL))\n", + "VLLM_MODEL = os.environ.get(\"COSMOS3_VLLM_MODEL\", \"nvidia/Cosmos3-Nano-Policy-DROID\")\n", + "\n", + "COSMOS3_INPUT_DIR.mkdir(parents=True, exist_ok=True)\n", + "COSMOS3_POLICY_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)\n", + "\n", + "print(\"COSMOS_ROOT:\", COSMOS_ROOT)\n", + "print(\"DROID_ASSET_ROOT:\", DROID_ASSET_ROOT)\n", + "print(\"COSMOS3_INPUT_DIR:\", COSMOS3_INPUT_DIR)\n", + "print(\"COSMOS3_POLICY_OUTPUT_DIR:\", COSMOS3_POLICY_OUTPUT_DIR)\n", + "print(\"COSMOS3_VLLM_BASE_URL:\", VLLM_BASE_URL)\n", + "print(\"COSMOS3_OPENPI_WS_URL:\", OPENPI_WS_URL)\n", + "print(\"COSMOS3_VLLM_MODEL:\", VLLM_MODEL)" + ] + }, + { + "cell_type": 
"markdown", + "id": "policy-input-md", + "metadata": {}, + "source": [ + "## Prepare a DROID Policy Input\n", + "\n", + "This cell extracts the first frame from each checked-in DROID camera video, creates a 640x540 multiview conditioning image for the video API, and builds a matching OpenPI observation dictionary.\n", + "\n", + "The OpenPI observation uses real camera frames from the sample asset and zero joint/gripper state so the cell can run without a LeRobot/parquet dependency in the notebook kernel." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "policy-input-code", + "metadata": {}, + "outputs": [], + "source": [ + "import subprocess\n", + "\n", + "import numpy as np\n", + "from PIL import Image, ImageOps\n", + "from IPython.display import display\n", + "\n", + "try:\n", + " import imageio_ffmpeg\n", + "except ImportError as exc:\n", + " raise RuntimeError(\"Install imageio-ffmpeg in this notebook kernel: pip install imageio-ffmpeg\") from exc\n", + "\n", + "FFMPEG = imageio_ffmpeg.get_ffmpeg_exe()\n", + "CAMERA_VIDEO_PATHS = {\n", + " \"observation/wrist_image_left\": DROID_ASSET_ROOT / \"videos\" / \"observation.image.wrist_image_left\" / \"chunk-000\" / \"file-000.mp4\",\n", + " \"observation/exterior_image_1_left\": DROID_ASSET_ROOT / \"videos\" / \"observation.image.exterior_image_1_left\" / \"chunk-000\" / \"file-000.mp4\",\n", + " \"observation/exterior_image_2_left\": DROID_ASSET_ROOT / \"videos\" / \"observation.image.exterior_image_2_left\" / \"chunk-000\" / \"file-000.mp4\",\n", + "}\n", + "for key, video_path in CAMERA_VIDEO_PATHS.items():\n", + " assert video_path.exists(), f\"missing {key}: {video_path}\"\n", + "\n", + "\n", + "def extract_first_frame(video_path: Path, out_path: Path) -> Path:\n", + " if not out_path.exists() or out_path.stat().st_mtime < video_path.stat().st_mtime:\n", + " subprocess.run(\n", + " [FFMPEG, \"-y\", \"-loglevel\", \"error\", \"-i\", str(video_path), \"-frames:v\", \"1\", 
str(out_path)],\n", + " check=True,\n", + " )\n", + " return out_path\n", + "\n", + "\n", + "frame_paths = {\n", + " key: extract_first_frame(video_path, COSMOS3_INPUT_DIR / f\"policy_{key.split('/')[-1]}.png\")\n", + " for key, video_path in CAMERA_VIDEO_PATHS.items()\n", + "}\n", + "frames = {key: Image.open(path).convert(\"RGB\") for key, path in frame_paths.items()}\n", + "\n", + "# DROID policy uses a concatenated multi-view frame. Keep the OpenPI branch on\n", + "# raw per-camera frames; use this composed image only for the /v1/videos path.\n", + "target_w, target_h = 640, 540\n", + "top_h = target_h // 2\n", + "bottom_h = target_h - top_h\n", + "half_w = target_w // 2\n", + "wrist = ImageOps.fit(frames[\"observation/wrist_image_left\"], (target_w, top_h), method=Image.Resampling.BICUBIC)\n", + "left = ImageOps.fit(frames[\"observation/exterior_image_1_left\"], (half_w, bottom_h), method=Image.Resampling.BICUBIC)\n", + "right = ImageOps.fit(frames[\"observation/exterior_image_2_left\"], (half_w, bottom_h), method=Image.Resampling.BICUBIC)\n", + "policy_image = Image.new(\"RGB\", (target_w, target_h))\n", + "policy_image.paste(wrist, (0, 0))\n", + "policy_image.paste(left, (0, top_h))\n", + "policy_image.paste(right, (half_w, top_h))\n", + "policy_image_path = COSMOS3_INPUT_DIR / \"droid_policy_first_frame.png\"\n", + "policy_image.save(policy_image_path)\n", + "\n", + "policy_prompt = os.environ.get(\n", + " \"COSMOS3_POLICY_PROMPT\",\n", + " \"Pick up the object and place it in the target container.\",\n", + ")\n", + "openpi_observation = {\n", + " \"prompt\": policy_prompt,\n", + " \"session_id\": os.environ.get(\"COSMOS3_POLICY_SESSION_ID\", \"cosmos3-policy-notebook\"),\n", + " \"observation/wrist_image_left\": np.asarray(frames[\"observation/wrist_image_left\"], dtype=np.uint8),\n", + " \"observation/exterior_image_1_left\": np.asarray(frames[\"observation/exterior_image_1_left\"], dtype=np.uint8),\n", + " \"observation/exterior_image_2_left\": 
np.asarray(frames[\"observation/exterior_image_2_left\"], dtype=np.uint8),\n", + " \"observation/joint_position\": np.zeros(7, dtype=np.float32),\n", + " \"observation/gripper_position\": np.zeros(1, dtype=np.float32),\n", + "}\n", + "\n", + "print(\"policy prompt:\", policy_prompt)\n", + "print(\"video API conditioning image:\", policy_image_path, policy_image.size)\n", + "for key, value in openpi_observation.items():\n", + " if isinstance(value, np.ndarray):\n", + " print(key, value.shape, value.dtype)\n", + "\n", + "display(policy_image)" + ] + }, + { + "cell_type": "markdown", + "id": "policy-video-md", + "metadata": {}, + "source": [ + "## Run Policy Inference Through `/v1/videos`\n", + "\n", + "This path behaves like the forward/inverse vLLM action notebooks: it sends a multipart request to the OpenAI-compatible video API, polls the async job, writes the generated rollout video, and saves the predicted action from the response metadata." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "policy-video-code", + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "import time\n", + "from pathlib import Path\n", + "\n", + "try:\n", + " import requests\n", + "except ImportError as exc:\n", + " raise RuntimeError(\"Install requests in this notebook kernel: pip install requests\") from exc\n", + "\n", + "\n", + "def check_vllm_server(timeout_s: int = 600, interval_s: int = 10) -> None:\n", + " deadline = time.time() + timeout_s\n", + " last_error: Exception | None = None\n", + " while time.time() < deadline:\n", + " try:\n", + " response = requests.get(f\"{VLLM_BASE_URL}/v1/models\", timeout=10)\n", + " response.raise_for_status()\n", + " print(response.json())\n", + " return\n", + " except requests.RequestException as exc:\n", + " last_error = exc\n", + " print(f\"Waiting for vLLM server at {VLLM_BASE_URL}: {exc}\")\n", + " time.sleep(interval_s)\n", + " raise RuntimeError(\n", + " f\"vLLM server did not become ready at 
{VLLM_BASE_URL} within {timeout_s}s. \"\n", + " \"Check `docker logs -f cosmos3-vllm-omni-policy-notebook`.\"\n", + " ) from last_error\n", + "\n", + "\n", + "ACTION_VIDEO_RES_SIZE_INFO = {\n", + " \"480\": {\n", + " \"1,1\": (640, 640),\n", + " \"4,3\": (736, 544),\n", + " \"3,4\": (544, 736),\n", + " \"16,9\": (832, 480),\n", + " \"9,16\": (480, 832),\n", + " }\n", + "}\n", + "\n", + "\n", + "def closest_action_size(height: int, width: int, resolution: str = \"480\") -> tuple[int, int]:\n", + " input_ratio = height / width\n", + " candidates = ACTION_VIDEO_RES_SIZE_INFO[resolution].values()\n", + " return min(candidates, key=lambda size: abs(input_ratio - size[1] / size[0]))\n", + "\n", + "\n", + "def submit_policy_video() -> dict:\n", + " run_dir = COSMOS3_POLICY_OUTPUT_DIR / \"video_api\"\n", + " run_dir.mkdir(parents=True, exist_ok=True)\n", + "\n", + " input_width, input_height = Image.open(policy_image_path).size\n", + " target_width, target_height = closest_action_size(input_height, input_width)\n", + " extra_params = {\n", + " \"action_mode\": \"policy\",\n", + " \"domain_name\": \"droid_lerobot\",\n", + " \"raw_action_dim\": 10,\n", + " \"action_chunk_size\": 16,\n", + " \"image_size\": 480,\n", + " \"view_point\": \"concat_view\",\n", + " \"guardrails\": False,\n", + " }\n", + " form = {\n", + " \"model\": VLLM_MODEL,\n", + " \"prompt\": policy_prompt,\n", + " \"num_frames\": 17,\n", + " \"fps\": 15,\n", + " \"size\": f\"{target_width}x{target_height}\",\n", + " \"num_inference_steps\": 30,\n", + " \"guidance_scale\": 1.0,\n", + " \"flow_shift\": 5.0,\n", + " \"seed\": 0,\n", + " \"extra_params\": json.dumps(extra_params),\n", + " }\n", + "\n", + " with policy_image_path.open(\"rb\") as image_file:\n", + " response = requests.post(\n", + " f\"{VLLM_BASE_URL}/v1/videos\",\n", + " data={key: str(value) for key, value in form.items()},\n", + " files={\"input_reference\": (policy_image_path.name, image_file, \"image/png\")},\n", + " timeout=120,\n", + " 
)\n", + " if not response.ok:\n", + " (run_dir / \"error_response.txt\").write_text(response.text)\n", + " print(\"vLLM request failed:\", response.status_code)\n", + " print(response.text)\n", + " print(\"form:\", json.dumps(form, indent=2))\n", + " response.raise_for_status()\n", + "\n", + " initial = response.json()\n", + " (run_dir / \"response.json\").write_text(json.dumps(initial, indent=2))\n", + "\n", + " while True:\n", + " response = requests.get(f\"{VLLM_BASE_URL}/v1/videos/{initial['id']}\", timeout=30)\n", + " response.raise_for_status()\n", + " final = response.json()\n", + " (run_dir / \"final.json\").write_text(json.dumps(final, indent=2))\n", + " print(initial[\"id\"], final.get(\"status\"), f\"{final.get('progress', 0)}%\")\n", + " if final.get(\"status\") == \"completed\":\n", + " break\n", + " if final.get(\"status\") in {\"failed\", \"cancelled\"}:\n", + " raise RuntimeError(json.dumps(final, indent=2))\n", + " time.sleep(2)\n", + "\n", + " action = final.get(\"action\")\n", + " if not action or \"data\" not in action:\n", + " raise RuntimeError(f\"vLLM response did not include action data: {json.dumps(final, indent=2)}\")\n", + " (run_dir / \"action.json\").write_text(json.dumps(action, indent=2))\n", + " sample_outputs = {\"outputs\": [{\"content\": {\"action\": action[\"data\"]}}]}\n", + " (run_dir / \"sample_outputs.json\").write_text(json.dumps(sample_outputs, indent=2))\n", + "\n", + " content_response = requests.get(f\"{VLLM_BASE_URL}/v1/videos/{initial['id']}/content\", timeout=300)\n", + " content_response.raise_for_status()\n", + " video_path = run_dir / \"policy_rollout.mp4\"\n", + " if content_response.content:\n", + " video_path.write_bytes(content_response.content)\n", + " print(\"saved\", video_path)\n", + " else:\n", + " video_path = None\n", + " print(\"video content endpoint returned an empty body\")\n", + "\n", + " print(\"saved\", run_dir / \"action.json\")\n", + " print(\"action shape:\", action.get(\"shape\"), \"dtype:\", 
action.get(\"dtype\"), \"domain_id:\", action.get(\"domain_id\"))\n", + " return {\"initial\": initial, \"final\": final, \"run_dir\": run_dir, \"video_path\": video_path, \"action\": action}\n", + "\n", + "\n", + "check_vllm_server()\n", + "policy_video_result = submit_policy_video()" + ] + }, + { + "cell_type": "markdown", + "id": "policy-preview-md", + "metadata": {}, + "source": [ + "## Inspect Video API Outputs\n", + "\n", + "Preview the rollout video if the server returned one, and print the first few predicted action rows." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "policy-preview-code", + "metadata": {}, + "outputs": [], + "source": [ + "import subprocess\n", + "\n", + "import imageio_ffmpeg\n", + "from IPython.display import Video, display\n", + "\n", + "FFMPEG = imageio_ffmpeg.get_ffmpeg_exe()\n", + "\n", + "\n", + "def make_preview(src: Path, crf: int = 28) -> Path:\n", + " preview = src.with_name(f\"{src.stem}_preview.mp4\")\n", + " if not preview.exists() or preview.stat().st_mtime < src.stat().st_mtime:\n", + " subprocess.run(\n", + " [\n", + " FFMPEG,\n", + " \"-y\",\n", + " \"-loglevel\",\n", + " \"error\",\n", + " \"-i\",\n", + " str(src),\n", + " \"-c:v\",\n", + " \"libx264\",\n", + " \"-crf\",\n", + " str(crf),\n", + " \"-preset\",\n", + " \"veryfast\",\n", + " \"-an\",\n", + " \"-pix_fmt\",\n", + " \"yuv420p\",\n", + " str(preview),\n", + " ],\n", + " check=True,\n", + " )\n", + " return preview\n", + "\n", + "\n", + "action = policy_video_result[\"action\"]\n", + "action_array = np.asarray(action[\"data\"], dtype=np.float32)\n", + "print(\"action array:\", action_array.shape, action_array.dtype)\n", + "print(action_array[: min(5, len(action_array))])\n", + "\n", + "video_path = policy_video_result.get(\"video_path\")\n", + "if video_path is not None:\n", + " preview = make_preview(video_path)\n", + " print(f\"preview: {preview}\")\n", + " display(Video(str(preview), embed=True))" + ] + }, + { + "cell_type": 
"markdown", + "id": "policy-openpi-md", + "metadata": {}, + "source": [ + "## Run Policy Inference Through OpenPI WebSocket\n", + "\n", + "The OpenPI endpoint is for robot control clients. It sends model-specific metadata immediately after connection, then accepts msgpack-numpy observation dictionaries and returns action chunks.\n", + "\n", + "Install the small client dependencies in this notebook kernel if needed:\n", + "\n", + "```bash\n", + "pip install websockets openpi-client\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "policy-openpi-code", + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " import websockets.sync.client as websockets_client\n", + " from openpi_client import msgpack_numpy\n", + "except ImportError as exc:\n", + " raise RuntimeError(\"Install OpenPI client deps in this notebook kernel: pip install websockets openpi-client\") from exc\n", + "\n", + "\n", + "def to_jsonable(value):\n", + " if isinstance(value, np.ndarray):\n", + " return value.tolist()\n", + " if isinstance(value, dict):\n", + " return {str(key): to_jsonable(item) for key, item in value.items()}\n", + " if isinstance(value, (list, tuple)):\n", + " return [to_jsonable(item) for item in value]\n", + " if isinstance(value, np.generic):\n", + " return value.item()\n", + " return value\n", + "\n", + "\n", + "def summarize_actions(actions) -> None:\n", + " if isinstance(actions, dict):\n", + " for key, value in actions.items():\n", + " arr = np.asarray(value, dtype=np.float32)\n", + " print(key, arr.shape, arr.dtype)\n", + " print(arr.reshape(-1, arr.shape[-1])[: min(3, arr.reshape(-1, arr.shape[-1]).shape[0])])\n", + " else:\n", + " arr = np.asarray(actions, dtype=np.float32)\n", + " print(\"actions\", arr.shape, arr.dtype)\n", + " if arr.ndim >= 2:\n", + " print(arr.reshape(-1, arr.shape[-1])[: min(3, arr.reshape(-1, arr.shape[-1]).shape[0])])\n", + " else:\n", + " print(arr[: min(10, arr.shape[0])])\n", + "\n", + "\n", + "def 
run_openpi_policy(obs: dict) -> dict:\n", + " packer = msgpack_numpy.Packer()\n", + " ws = websockets_client.connect(\n", + " OPENPI_WS_URL,\n", + " compression=None,\n", + " max_size=None,\n", + " ping_interval=300,\n", + " ping_timeout=3600,\n", + " )\n", + " try:\n", + " metadata_payload = ws.recv()\n", + " if isinstance(metadata_payload, str):\n", + " raise RuntimeError(f\"Expected binary metadata payload, got text: {metadata_payload}\")\n", + " metadata = msgpack_numpy.unpackb(metadata_payload)\n", + " if not isinstance(metadata, dict):\n", + " raise RuntimeError(f\"Expected metadata dict, got {type(metadata)!r}\")\n", + " print(\"server metadata:\", metadata)\n", + "\n", + " payload = dict(obs)\n", + " payload[\"endpoint\"] = \"infer\"\n", + " ws.send(packer.pack(payload))\n", + " response_payload = ws.recv()\n", + " if isinstance(response_payload, str):\n", + " raise RuntimeError(f\"Inference failed: {response_payload}\")\n", + " actions = msgpack_numpy.unpackb(response_payload)\n", + " if isinstance(actions, dict) and actions.get(\"type\") == \"error\":\n", + " raise RuntimeError(f\"Inference failed: {actions.get('message', actions)}\")\n", + "\n", + " ws.send(packer.pack({\"endpoint\": \"reset\"}))\n", + " reset_payload = ws.recv()\n", + " reset_response = msgpack_numpy.unpackb(reset_payload) if not isinstance(reset_payload, str) else reset_payload\n", + " print(\"reset response:\", reset_response)\n", + " return {\"metadata\": metadata, \"actions\": actions, \"reset_response\": reset_response}\n", + " finally:\n", + " ws.close()\n", + "\n", + "\n", + "openpi_result = run_openpi_policy(openpi_observation)\n", + "summarize_actions(openpi_result[\"actions\"])\n", + "\n", + "openpi_output_dir = COSMOS3_POLICY_OUTPUT_DIR / \"openpi\"\n", + "openpi_output_dir.mkdir(parents=True, exist_ok=True)\n", + "(openpi_output_dir / \"result.json\").write_text(json.dumps(to_jsonable(openpi_result), indent=2))\n", + "print(\"saved\", openpi_output_dir / \"result.json\")" + 
] + }, + { + "cell_type": "markdown", + "id": "policy-robolab-md", + "metadata": {}, + "source": [ + "## Use with RoboLab\n", + "\n", + "For an interactive simulation, keep the same vLLM-Omni server running and point a RoboLab Cosmos3 policy client at `ws://localhost:8001/v1/realtime/robot/openpi`.\n", + "\n", + "```bash\n", + "python policies/cosmos3/run.py --task BananaInBowlTask\n", + "```\n", + "\n", + "Use `python policies/cosmos3/run.py --help` in the RoboLab repo to check how your checkout configures the policy endpoint." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 0231853e7afb5fbcbe4647004aa6abb0d7ae1e84 Mon Sep 17 00:00:00 2001 From: Maciej Bala Date: Tue, 23 Jun 2026 14:42:35 +0200 Subject: [PATCH 2/2] Removed websocket example Signed-off-by: Maciej Bala --- cookbooks/cosmos3/generator/action/README.md | 9 +- .../action/run_policy_with_vllm.ipynb | 197 ++---------------- 2 files changed, 22 insertions(+), 184 deletions(-) diff --git a/cookbooks/cosmos3/generator/action/README.md b/cookbooks/cosmos3/generator/action/README.md index e2871ead..02585bcd 100644 --- a/cookbooks/cosmos3/generator/action/README.md +++ b/cookbooks/cosmos3/generator/action/README.md @@ -115,10 +115,9 @@ generation (see [`run_fd_with_vllm.ipynb`](./run_fd_with_vllm.ipynb) and | `flow_shift` | `10.0` | The notebooks build the full request body for AV, DROID, and UMI examples, -including autoregressive chunked generation for the robotics examples. 
-Policy inference can either use async `POST /v1/videos` to retrieve a rollout -video plus top-level `action` metadata, or the OpenPI websocket endpoint for -action chunks consumed by robot clients. +including autoregressive chunked generation for the robotics examples. Policy +inference uses async `POST /v1/videos` to retrieve a rollout video plus +top-level `action` metadata. ### VLLM-Omni Notebook Walkthrough @@ -130,7 +129,7 @@ write outputs under `outputs/cosmos3_action_vllm/`: - [`run_id_with_vllm.ipynb`](./run_id_with_vllm.ipynb) — inverse dynamics, predicting ego-motion trajectories from input AV videos. - [`run_policy_with_vllm.ipynb`](./run_policy_with_vllm.ipynb) — policy - inference for DROID through the async video API and OpenPI websocket endpoint. + inference for DROID through the async video API. diff --git a/cookbooks/cosmos3/generator/action/run_policy_with_vllm.ipynb b/cookbooks/cosmos3/generator/action/run_policy_with_vllm.ipynb index fd6456fc..95a816d2 100644 --- a/cookbooks/cosmos3/generator/action/run_policy_with_vllm.ipynb +++ b/cookbooks/cosmos3/generator/action/run_policy_with_vllm.ipynb @@ -18,18 +18,13 @@ "\n", "## Prerequisites\n", "\n", - "Generator requires the Guardrail. Request access to the gated [nvidia/Cosmos-1.0-Guardrail](https://huggingface.co/nvidia/Cosmos-1.0-Guardrail) HF repository before running guarded examples. This notebook disables guardrails for the policy requests with `guardrails: false` / `--no-guardrails`.\n", + "Generator requires the Guardrail. Request access to the gated [nvidia/Cosmos-1.0-Guardrail](https://huggingface.co/nvidia/Cosmos-1.0-Guardrail) HF repository before running guarded examples. 
This notebook disables guardrails for the policy request with `guardrails: false` in `extra_params`.\n", "\n", "## Overview\n", "\n", "This notebook runs Cosmos3 Nano **action policy** inference through vLLM-Omni using the checked-in DROID LeRobot sample under `assets/droid_lerobot_example`.\n", "\n", - "It covers two policy serving paths from the vLLM-Omni Cosmos3 recipe:\n", - "\n", - "- `POST /v1/videos`: first frame plus instruction -> rollout video plus top-level `action` metadata.\n", - "- `ws://.../v1/realtime/robot/openpi`: OpenPI-compatible observation -> action chunk, suitable for a simulated or real robot client.\n", - "\n", - "The OpenPI example sends one static sample observation. Real robot clients should replace the zero joint/gripper state with live state and send repeated observations in a control loop." + "It sends `POST /v1/videos` requests with a first frame and instruction, then retrieves a rollout video plus top-level `action` metadata." ] }, { @@ -41,29 +36,23 @@ "\n", "Start the server in a terminal from the `cosmos` repo root. The container listens on port `8000`; Docker publishes it to host port `8001`, so the notebook uses `http://localhost:8001`.\n", "\n", - "The OpenPI RoboLab branch reuses reference Cosmos Framework action transforms. 
The command below assumes the framework checkout is mounted at `/workspace/cosmos-framework`; adjust `PYTHONPATH` if your checkout lives elsewhere.\n", - "\n", "```bash\n", - "cat > /tmp/cosmos3_droid_openpi_stage_overrides.json <<'JSON'\n", - "{\n", - " \"0\": {\n", - " \"model_config\": {\n", - " \"policy_server_config\": {\n", - " \"image_resolution\": [540, 640],\n", - " \"n_external_cameras\": 2,\n", - " \"needs_wrist_camera\": true,\n", - " \"needs_stereo_camera\": false,\n", - " \"needs_session_id\": true,\n", - " \"action_space\": \"joint_position\"\n", - " }\n", - " }\n", - " }\n", - "}\n", - "JSON\n", - "\n", "docker rm -f cosmos3-vllm-omni-policy-notebook 2>/dev/null || true\n", "\n", - "docker run -d --name cosmos3-vllm-omni-policy-notebook --runtime nvidia --gpus '\"device=0\"' -e CUDA_DEVICE_ORDER=PCI_BUS_ID -e PYTHONPATH=/workspace/cosmos-framework -v \"/mnt/sdb/.cache/huggingface:/root/.cache/huggingface\" -v \"$PWD:/workspace\" -p 8001:8000 --ipc=host vllm/vllm-omni:cosmos3 vllm serve nvidia/Cosmos3-Nano-Policy-DROID --omni --model-class-name Cosmos3OmniDiffusersPipeline --allowed-local-media-path / --port 8000 --init-timeout 1800 --no-guardrails --stage-overrides \"$(cat /tmp/cosmos3_droid_openpi_stage_overrides.json)\"\n", + "docker run -d --name cosmos3-vllm-omni-policy-notebook \\\n", + " --runtime nvidia --gpus '\"device=0\"' \\\n", + " -e CUDA_DEVICE_ORDER=PCI_BUS_ID \\\n", + " -e PYTHONPATH=/workspace/cosmos-framework \\\n", + " -v \"$HOME/.cache/huggingface:/root/.cache/huggingface\" \\\n", + " -v \"$PWD:/workspace\" \\\n", + " -p 8001:8000 --ipc=host \\\n", + " vllm/vllm-omni:cosmos3 \\\n", + " vllm serve nvidia/Cosmos3-Nano-Policy-DROID \\\n", + " --omni \\\n", + " --model-class-name Cosmos3OmniDiffusersPipeline \\\n", + " --allowed-local-media-path / \\\n", + " --port 8000 \\\n", + " --init-timeout 1800\n", "\n", "# Wait until this returns model metadata before running the inference cells.\n", "curl http://localhost:8001/v1/models\n", 
@@ -92,18 +81,6 @@ " if (path / \"README.md\").exists() and (path / \"cookbooks\").exists():\n", " return path\n", " return start\n", - "\n", - "\n", - "def http_to_ws_url(base_url: str) -> str:\n", - " if base_url.startswith(\"https://\"):\n", - " ws_base = \"wss://\" + base_url[len(\"https://\"):]\n", - " elif base_url.startswith(\"http://\"):\n", - " ws_base = \"ws://\" + base_url[len(\"http://\"):]\n", - " else:\n", - " ws_base = base_url\n", - " return ws_base.rstrip(\"/\") + \"/v1/realtime/robot/openpi\"\n", - "\n", - "\n", "COSMOS_ROOT = find_repo_root(Path.cwd().resolve())\n", "COSMOS3_OUTPUT_ROOT = Path(\n", " os.environ.get(\"COSMOS3_VLLM_OUTPUT_ROOT\", COSMOS_ROOT / \"outputs\" / \"cosmos3_action_vllm\")\n", @@ -113,7 +90,6 @@ "COSMOS3_ACTION_ROOT = COSMOS_ROOT / \"cookbooks\" / \"cosmos3\" / \"generator\" / \"action\"\n", "DROID_ASSET_ROOT = COSMOS3_ACTION_ROOT / \"assets\" / \"droid_lerobot_example\"\n", "VLLM_BASE_URL = os.environ.get(\"COSMOS3_VLLM_BASE_URL\", \"http://localhost:8001\").rstrip(\"/\")\n", - "OPENPI_WS_URL = os.environ.get(\"COSMOS3_OPENPI_WS_URL\", http_to_ws_url(VLLM_BASE_URL))\n", "VLLM_MODEL = os.environ.get(\"COSMOS3_VLLM_MODEL\", \"nvidia/Cosmos3-Nano-Policy-DROID\")\n", "\n", "COSMOS3_INPUT_DIR.mkdir(parents=True, exist_ok=True)\n", @@ -124,7 +100,6 @@ "print(\"COSMOS3_INPUT_DIR:\", COSMOS3_INPUT_DIR)\n", "print(\"COSMOS3_POLICY_OUTPUT_DIR:\", COSMOS3_POLICY_OUTPUT_DIR)\n", "print(\"COSMOS3_VLLM_BASE_URL:\", VLLM_BASE_URL)\n", - "print(\"COSMOS3_OPENPI_WS_URL:\", OPENPI_WS_URL)\n", "print(\"COSMOS3_VLLM_MODEL:\", VLLM_MODEL)" ] }, @@ -135,9 +110,7 @@ "source": [ "## Prepare a DROID Policy Input\n", "\n", - "This cell extracts the first frame from each checked-in DROID camera video, creates a 640x540 multiview conditioning image for the video API, and builds a matching OpenPI observation dictionary.\n", - "\n", - "The OpenPI observation uses real camera frames from the sample asset and zero joint/gripper state so the cell can run 
without a LeRobot/parquet dependency in the notebook kernel." + "This cell extracts the first frame from each checked-in DROID camera video and creates a 640x540 multiview conditioning image for the video API." ] }, { @@ -183,8 +156,7 @@ "}\n", "frames = {key: Image.open(path).convert(\"RGB\") for key, path in frame_paths.items()}\n", "\n", - "# DROID policy uses a concatenated multi-view frame. Keep the OpenPI branch on\n", - "# raw per-camera frames; use this composed image only for the /v1/videos path.\n", + "# DROID policy uses a concatenated multi-view frame for the /v1/videos path.\n", "target_w, target_h = 640, 540\n", "top_h = target_h // 2\n", "bottom_h = target_h - top_h\n", @@ -203,22 +175,8 @@ " \"COSMOS3_POLICY_PROMPT\",\n", " \"Pick up the object and place it in the target container.\",\n", ")\n", - "openpi_observation = {\n", - " \"prompt\": policy_prompt,\n", - " \"session_id\": os.environ.get(\"COSMOS3_POLICY_SESSION_ID\", \"cosmos3-policy-notebook\"),\n", - " \"observation/wrist_image_left\": np.asarray(frames[\"observation/wrist_image_left\"], dtype=np.uint8),\n", - " \"observation/exterior_image_1_left\": np.asarray(frames[\"observation/exterior_image_1_left\"], dtype=np.uint8),\n", - " \"observation/exterior_image_2_left\": np.asarray(frames[\"observation/exterior_image_2_left\"], dtype=np.uint8),\n", - " \"observation/joint_position\": np.zeros(7, dtype=np.float32),\n", - " \"observation/gripper_position\": np.zeros(1, dtype=np.float32),\n", - "}\n", - "\n", "print(\"policy prompt:\", policy_prompt)\n", "print(\"video API conditioning image:\", policy_image_path, policy_image.size)\n", - "for key, value in openpi_observation.items():\n", - " if isinstance(value, np.ndarray):\n", - " print(key, value.shape, value.dtype)\n", - "\n", "display(policy_image)" ] }, @@ -431,125 +389,6 @@ " print(f\"preview: {preview}\")\n", " display(Video(str(preview), embed=True))" ] - }, - { - "cell_type": "markdown", - "id": "policy-openpi-md", - "metadata": {}, 
- "source": [ - "## Run Policy Inference Through OpenPI WebSocket\n", - "\n", - "The OpenPI endpoint is for robot control clients. It sends model-specific metadata immediately after connection, then accepts msgpack-numpy observation dictionaries and returns action chunks.\n", - "\n", - "Install the small client dependencies in this notebook kernel if needed:\n", - "\n", - "```bash\n", - "pip install websockets openpi-client\n", - "```" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "policy-openpi-code", - "metadata": {}, - "outputs": [], - "source": [ - "try:\n", - " import websockets.sync.client as websockets_client\n", - " from openpi_client import msgpack_numpy\n", - "except ImportError as exc:\n", - " raise RuntimeError(\"Install OpenPI client deps in this notebook kernel: pip install websockets openpi-client\") from exc\n", - "\n", - "\n", - "def to_jsonable(value):\n", - " if isinstance(value, np.ndarray):\n", - " return value.tolist()\n", - " if isinstance(value, dict):\n", - " return {str(key): to_jsonable(item) for key, item in value.items()}\n", - " if isinstance(value, (list, tuple)):\n", - " return [to_jsonable(item) for item in value]\n", - " if isinstance(value, np.generic):\n", - " return value.item()\n", - " return value\n", - "\n", - "\n", - "def summarize_actions(actions) -> None:\n", - " if isinstance(actions, dict):\n", - " for key, value in actions.items():\n", - " arr = np.asarray(value, dtype=np.float32)\n", - " print(key, arr.shape, arr.dtype)\n", - " print(arr.reshape(-1, arr.shape[-1])[: min(3, arr.reshape(-1, arr.shape[-1]).shape[0])])\n", - " else:\n", - " arr = np.asarray(actions, dtype=np.float32)\n", - " print(\"actions\", arr.shape, arr.dtype)\n", - " if arr.ndim >= 2:\n", - " print(arr.reshape(-1, arr.shape[-1])[: min(3, arr.reshape(-1, arr.shape[-1]).shape[0])])\n", - " else:\n", - " print(arr[: min(10, arr.shape[0])])\n", - "\n", - "\n", - "def run_openpi_policy(obs: dict) -> dict:\n", - " packer = 
msgpack_numpy.Packer()\n", - " ws = websockets_client.connect(\n", - " OPENPI_WS_URL,\n", - " compression=None,\n", - " max_size=None,\n", - " ping_interval=300,\n", - " ping_timeout=3600,\n", - " )\n", - " try:\n", - " metadata_payload = ws.recv()\n", - " if isinstance(metadata_payload, str):\n", - " raise RuntimeError(f\"Expected binary metadata payload, got text: {metadata_payload}\")\n", - " metadata = msgpack_numpy.unpackb(metadata_payload)\n", - " if not isinstance(metadata, dict):\n", - " raise RuntimeError(f\"Expected metadata dict, got {type(metadata)!r}\")\n", - " print(\"server metadata:\", metadata)\n", - "\n", - " payload = dict(obs)\n", - " payload[\"endpoint\"] = \"infer\"\n", - " ws.send(packer.pack(payload))\n", - " response_payload = ws.recv()\n", - " if isinstance(response_payload, str):\n", - " raise RuntimeError(f\"Inference failed: {response_payload}\")\n", - " actions = msgpack_numpy.unpackb(response_payload)\n", - " if isinstance(actions, dict) and actions.get(\"type\") == \"error\":\n", - " raise RuntimeError(f\"Inference failed: {actions.get('message', actions)}\")\n", - "\n", - " ws.send(packer.pack({\"endpoint\": \"reset\"}))\n", - " reset_payload = ws.recv()\n", - " reset_response = msgpack_numpy.unpackb(reset_payload) if not isinstance(reset_payload, str) else reset_payload\n", - " print(\"reset response:\", reset_response)\n", - " return {\"metadata\": metadata, \"actions\": actions, \"reset_response\": reset_response}\n", - " finally:\n", - " ws.close()\n", - "\n", - "\n", - "openpi_result = run_openpi_policy(openpi_observation)\n", - "summarize_actions(openpi_result[\"actions\"])\n", - "\n", - "openpi_output_dir = COSMOS3_POLICY_OUTPUT_DIR / \"openpi\"\n", - "openpi_output_dir.mkdir(parents=True, exist_ok=True)\n", - "(openpi_output_dir / \"result.json\").write_text(json.dumps(to_jsonable(openpi_result), indent=2))\n", - "print(\"saved\", openpi_output_dir / \"result.json\")" - ] - }, - { - "cell_type": "markdown", - "id": 
"policy-robolab-md", - "metadata": {}, - "source": [ - "## Use with RoboLab\n", - "\n", - "For an interactive simulation, keep the same vLLM-Omni server running and point a RoboLab Cosmos3 policy client at `ws://localhost:8001/v1/realtime/robot/openpi`.\n", - "\n", - "```bash\n", - "python policies/cosmos3/run.py --task BananaInBowlTask\n", - "```\n", - "\n", - "Use `python policies/cosmos3/run.py --help` in the RoboLab repo to check how your checkout configures the policy endpoint." - ] } ], "metadata": {