From 669be03920e13e5ac7cc3ccca1d1c01d85d62e0e Mon Sep 17 00:00:00 2001 From: lwaekfjlk <1125027232@qq.com> Date: Wed, 18 Mar 2026 06:08:16 +0000 Subject: [PATCH 1/2] update --- .../verl/trainer/ppo/world_model_loss.py | 50 +++++++++ .../client/client_config/alfworld_param.yaml | 2 +- .../client/utils/http_training_client.py | 13 +++ opentinker/scheduler/job_scheduler.py | 33 +----- opentinker/server/http_training_server.py | 42 ++++++- run.sh | 106 ++++++++++++++++++ run_grpo.sh | 7 ++ run_grpo_wm.sh | 4 + 8 files changed, 225 insertions(+), 32 deletions(-) create mode 100644 opentinker/backend_patch/verl/trainer/ppo/world_model_loss.py create mode 100755 run.sh create mode 100755 run_grpo.sh create mode 100755 run_grpo_wm.sh diff --git a/opentinker/backend_patch/verl/trainer/ppo/world_model_loss.py b/opentinker/backend_patch/verl/trainer/ppo/world_model_loss.py new file mode 100644 index 00000000..62545e75 --- /dev/null +++ b/opentinker/backend_patch/verl/trainer/ppo/world_model_loss.py @@ -0,0 +1,50 @@ +# Copyright 2025 OpenTinker +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +World Model SFT Loss — trains the policy to also predict observation tokens. + +Joint loss = ppo_loss(action tokens) + wm_coeff * sft_loss(observation tokens) + +Implementation: + The WM SFT loss is computed inside dp_actor.update_policy() (verl modification). + It is gated by config.world_model_coeff > 0 AND observation_mask being present + in the batch. 
+ + observation_mask is computed in http_training_server.py before update_actor: + obs_mask = attention_mask[:, -resp_len:] & ~response_mask + + To enable, set in your config yaml: + actor_rollout_ref: + actor: + world_model_coeff: 0.1 +""" + +import torch + + +def compute_observation_mask(batch) -> torch.Tensor: + """Compute observation_mask from attention_mask and response_mask. + + observation tokens = real tokens in the response portion that are NOT + action (LLM-generated) tokens. + + Args: + batch: DataProto with batch["attention_mask"] and batch["response_mask"] + + Returns: + observation_mask: (batch_size, response_length) float tensor + """ + resp_len = batch.batch["response_mask"].shape[1] + attn_response = batch.batch["attention_mask"][:, -resp_len:] + return (attn_response.bool() & ~batch.batch["response_mask"].bool()).float() diff --git a/opentinker/client/client_config/alfworld_param.yaml b/opentinker/client/client_config/alfworld_param.yaml index 0f183186..eb641504 100644 --- a/opentinker/client/client_config/alfworld_param.yaml +++ b/opentinker/client/client_config/alfworld_param.yaml @@ -60,7 +60,7 @@ interaction: env_endpoint: http://${interaction.config.env_host}:${interaction.config.env_port} # If you run the ALFWorld env server in sharded mode (--shards N), # set env_shards=N. The client will route each instance_id to a stable shard. - env_shards: 32 + env_shards: 8 max_steps: 20 # ALFWorld episodes max steps max_total_steps: 20 # Max environment step calls (controls rollout turns) observation_template: "{observation}" diff --git a/opentinker/client/utils/http_training_client.py b/opentinker/client/utils/http_training_client.py index 08f65e9b..0d0aea99 100644 --- a/opentinker/client/utils/http_training_client.py +++ b/opentinker/client/utils/http_training_client.py @@ -625,6 +625,19 @@ def set_config(self, args: DictConfig, env=None): } ) + # Optional world model SFT coefficient for joint PPO + WM training. 
+ world_model_coeff = args.get("world_model_coeff", None) + if world_model_coeff is not None: + server_cfg = OmegaConf.merge( + server_cfg, + OmegaConf.create( + {"algorithm": {"world_model_coeff": float(world_model_coeff)}} + ), + ) + print( + f"[ServiceClient] Forwarding algorithm.world_model_coeff={world_model_coeff}" + ) + # Add multi_turn config if present in args if hasattr(args, "multi_turn") and args.multi_turn: multi_turn_cfg = OmegaConf.to_container(args.multi_turn, resolve=True) diff --git a/opentinker/scheduler/job_scheduler.py b/opentinker/scheduler/job_scheduler.py index 2a1824d4..021a7789 100755 --- a/opentinker/scheduler/job_scheduler.py +++ b/opentinker/scheduler/job_scheduler.py @@ -284,7 +284,7 @@ def check_gpu_available(gpu_id: int) -> bool: return True # Fail open # Thresholds for considering a GPU "idle" - MAX_MEMORY_MB = 10 # Allow up to 100 MB (some baseline CUDA overhead) + MAX_MEMORY_MB = 2000 # Allow up to 2 GB (root processes may use ~1 GB) MAX_UTILIZATION = 1000 # Allow up to 5% utilization if memory_used_mb > MAX_MEMORY_MB or utilization_percent > MAX_UTILIZATION: @@ -294,35 +294,8 @@ def check_gpu_available(gpu_id: int) -> bool: ) return False - # Check 2: Look for running processes on this GPU - pmon_result = subprocess.run( - ["nvidia-smi", "pmon", "-c", "1", "-s", "um"], - capture_output=True, - text=True, - timeout=5, - ) - - if pmon_result.returncode == 0: - # Parse pmon output to check for processes on this GPU - # Format: "# gpu pid type sm mem enc dec command" - # " 0 12345 C 50 500 0 0 python" - lines = pmon_result.stdout.strip().split("\n") - for line in lines: - if line.startswith("#") or not line.strip(): - continue - parts = line.split() - if len(parts) >= 2: - try: - gpu_idx = int(parts[0].strip()) - if gpu_idx == gpu_id and parts[1].strip() != "-": - # Found a process on this GPU - pid = parts[1].strip() - logger.warning( - f"GPU {gpu_id}: ⚠️ OCCUPIED - Process {pid} detected via pmon" - ) - return False - except 
(ValueError, IndexError): - continue + # pmon check disabled — root/system processes cause false positives + # when sharing GPUs across users. Memory threshold check above is sufficient. # All checks passed - GPU is idle logger.debug( diff --git a/opentinker/server/http_training_server.py b/opentinker/server/http_training_server.py index 8d67b106..d8d13e28 100755 --- a/opentinker/server/http_training_server.py +++ b/opentinker/server/http_training_server.py @@ -84,6 +84,7 @@ ) + # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -639,6 +640,7 @@ def __init__( # Server state self.is_initialized = False self.global_steps = 0 + self.wm_coeff = 0.0 # Generation config (can be overridden by client) self.generation_config = { @@ -677,6 +679,15 @@ def init_workers(self, total_steps: int) -> Dict[str, Any]: try: # optimizer needs parameter: total_steps self.trainer.post_init(total_steps) + + # Forward algorithm.world_model_coeff → actor config so dp_actor can read it + algo_wm_coeff = self.config.algorithm.get("world_model_coeff", 0.0) + if algo_wm_coeff > 0: + from omegaconf import open_dict + with open_dict(self.config): + self.config.actor_rollout_ref.actor.world_model_coeff = algo_wm_coeff + logger.info(f"Forwarded world_model_coeff={algo_wm_coeff} to actor config") + logger.info("Initializing workers...") # Check async rollout mode @@ -707,6 +718,11 @@ def init_workers(self, total_steps: int) -> Dict[str, Any]: if self.async_rollout_mode: self.async_rollout_manager = self.trainer.async_rollout_manager + # World model SFT loss coeff (actual loss computed in dp_actor via config.world_model_coeff) + self.wm_coeff = self.config.actor_rollout_ref.actor.get("world_model_coeff", 0.0) + if self.wm_coeff > 0: + logger.info(f"World model SFT loss enabled with coeff={self.wm_coeff}") + self.is_initialized = True logger.info("Workers initialized successfully") return {"status": "success", "message": "Workers initialized"} @@ 
-1170,6 +1186,14 @@ def train_step(self, batch: DataProto) -> Dict[str, Any]: ) metrics.update(critic_output_metrics) + # 10.5 Compute observation_mask for world model loss + if self.wm_coeff > 0: + resp_len = batch.batch["response_mask"].shape[1] + attn_response = batch.batch["attention_mask"][:, -resp_len:] + batch.batch["observation_mask"] = ( + attn_response.bool() & ~batch.batch["response_mask"].bool() + ).float() + # 11. Update actor (check critic warmup) if self.config.trainer.critic_warmup <= self.global_steps: with marked_timer("update_actor", timing_raw, color="red"): @@ -2104,8 +2128,16 @@ def run_fastapi_server(): ) ray.init( namespace=_server_cfg.ray.namespace, - num_gpus=_server_cfg.trainer.n_gpus_per_node, # Explicitly specify number of GPUs + num_gpus=_server_cfg.trainer.n_gpus_per_node, ignore_reinit_error=True, + runtime_env={ + "env_vars": { + "NCCL_CUMEM_ENABLE": "0", + "VLLM_DISABLE_SLEEP_MODE": "1", + "RAY_memory_usage_threshold": "0.99", + "VLLM_GPU_MEMORY_UTILIZATION": "0.15", + }, + }, ) else: # Connect to existing Ray cluster at specific address @@ -2114,6 +2146,14 @@ def run_fastapi_server(): address=_server_cfg.ray.address, namespace=_server_cfg.ray.namespace, ignore_reinit_error=True, + runtime_env={ + "env_vars": { + "NCCL_CUMEM_ENABLE": "0", + "VLLM_DISABLE_SLEEP_MODE": "1", + "RAY_memory_usage_threshold": "0.99", + "VLLM_GPU_MEMORY_UTILIZATION": "0.15", + }, + }, ) # Verify GPU availability diff --git a/run.sh b/run.sh new file mode 100755 index 00000000..514257a8 --- /dev/null +++ b/run.sh @@ -0,0 +1,106 @@ +#!/usr/bin/env bash +# Usage: ./run.sh [config] [gpus] [scheduler_port] [env_port] [steps] [model] [mode] [wm_coeff] +# Example: +# ./run.sh # all defaults +# ./run.sh alfworld_param 4,5 8782 8110 150 +# ./run.sh alfworld_param 4,5 8782 8110 150 Qwen/Qwen2.5-3B-Instruct grpo_wm 0.1 +# Modes: +# grpo : standard GRPO +# grpo_wm : GRPO + world model SFT loss (adds +world_model_coeff=wm_coeff) +set -euo pipefail + 
+CONFIG="${1:-alfworld_param}" +RAW_GPUS="${2:-4,6}" +SCHEDULER_PORT="${3:-8782}" +ENV_PORT="${4:-8120}" +STEPS="${5:-600}" +MODEL="${6:-Qwen/Qwen2.5-3B-Instruct}" +MODE="${7:-grpo}" +WM_COEFF="${8:-0.1}" + +# Normalize GPU list so Hydra always receives a clean list override. +# Accepted input forms: "4,6" or "[4,6]". +GPUS="${RAW_GPUS// /}" +GPUS="${GPUS#[}" +GPUS="${GPUS%]}" +if [[ -z "$GPUS" || ! "$GPUS" =~ ^[0-9]+(,[0-9]+)*$ ]]; then + echo "Invalid GPU list: '$RAW_GPUS'" + echo "Expected format: 4,6 (or [4,6])" + exit 1 +fi +NUM_GPUS=$(awk -F',' '{print NF}' <<< "$GPUS") +SCHEDULER_GPU_OVERRIDE="available_gpus=[${GPUS}]" + +# conda.sh may reference PS1; in non-interactive shells PS1 can be unset. +set +u +source ~/anaconda3/etc/profile.d/conda.sh +conda activate opentinker +set -u +cd "$(dirname "$0")" + +EXTRA_HYDRA_ARGS=() +MODE_TAG="grpo" +case "$MODE" in + grpo) + ;; + grpo_wm|grpo+wm|wm|wm_sft) + EXTRA_HYDRA_ARGS+=("+world_model_coeff=${WM_COEFF}") + MODE_TAG="grpo_wm_${WM_COEFF}" + ;; + *) + echo "Unsupported mode: $MODE" + echo "Supported modes: grpo | grpo_wm" + exit 1 + ;; +esac + +# vLLM / NCCL fixes (cumem allocator crash) +export VLLM_DISABLE_SLEEP_MODE=1 +export NCCL_CUMEM_ENABLE=0 +export VLLM_GPU_MEMORY_UTILIZATION=0.25 + +# Step 1: Scheduler +echo "=== Step 1: Scheduler (GPUs=[$GPUS], port=$SCHEDULER_PORT) ===" +if command -v lsof >/dev/null 2>&1 && lsof -iTCP:"${SCHEDULER_PORT}" -sTCP:LISTEN -t >/dev/null 2>&1; then + echo "Scheduler port ${SCHEDULER_PORT} is already in use." + echo "Stop old scheduler first, or choose another port." 
+ echo "Hint: lsof -iTCP:${SCHEDULER_PORT} -sTCP:LISTEN -P -n" + exit 1 +fi +echo "Scheduler override: ${SCHEDULER_GPU_OVERRIDE}" +ROLLOUT_TRACE_DIR=./traces TORCH_CUDA_ARCH_LIST="9.0" FLASHINFER_HOMOGENEOUS_MS=1 \ +nohup python opentinker/scheduler/launch_scheduler_kill.py \ + "${SCHEDULER_GPU_OVERRIDE}" gpus_per_job="${NUM_GPUS}" \ + port_range=null num_ports=200 scheduler_port="${SCHEDULER_PORT}" \ + > /tmp/scheduler_${SCHEDULER_PORT}.log 2>&1 & +echo "PID: $!" +sleep 12 +curl -sf http://0.0.0.0:${SCHEDULER_PORT}/ > /dev/null && echo "OK" || { echo "FAIL"; exit 1; } + +# Step 2: ALFWorld server +echo "=== Step 2: ALFWorld server (port=$ENV_PORT, shards=8) ===" +# Kill any stale shard processes on our port range +for ((p=ENV_PORT; p/dev/null || true +done +sleep 1 +nohup python opentinker/environment/alfworld/alfworld_server.py \ + --port "${ENV_PORT}" --shards 8 \ + > /tmp/alfworld_server_${ENV_PORT}.log 2>&1 & +echo "PID: $!" +sleep 30 +curl -sf http://0.0.0.0:${ENV_PORT}/health > /dev/null && echo "OK" || { echo "FAIL"; exit 1; } + +# Step 3: RL training +LOG="/tmp/${CONFIG}_${MODE_TAG}_p${SCHEDULER_PORT}.log" +echo "=== Step 3: Training (config=$CONFIG, mode=$MODE, gpus=$NUM_GPUS, steps=$STEPS) ===" +nohup python opentinker/client/alfworld_rl.py \ + --config-name "${CONFIG}" \ + tokenizer_path="${MODEL}" \ + num_gpus="${NUM_GPUS}" num_steps="${STEPS}" \ + scheduler_url="http://0.0.0.0:${SCHEDULER_PORT}" \ + interaction.config.env_port="${ENV_PORT}" \ + "${EXTRA_HYDRA_ARGS[@]}" \ + > "$LOG" 2>&1 & +echo "PID: $! | Log: $LOG" +echo "=== Done. 
tail -f $LOG ===" diff --git a/run_grpo.sh b/run_grpo.sh new file mode 100755 index 00000000..fde6d4f5 --- /dev/null +++ b/run_grpo.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +set -euo pipefail +cd "$(dirname "$0")" + +# GRPO baseline +# Usage: ./run_grpo.sh [gpus] [scheduler_port] [env_port] [steps] +exec ./run.sh alfworld_param "${1:-4,6}" "${2:-8782}" "${3:-8120}" "${4:-1000}" Qwen/Qwen2.5-3B-Instruct grpo diff --git a/run_grpo_wm.sh b/run_grpo_wm.sh new file mode 100755 index 00000000..a953977e --- /dev/null +++ b/run_grpo_wm.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash +# GRPO + World Model SFT loss +# Usage: ./run_grpo_wm.sh [gpus] [scheduler_port] [env_port] [steps] [wm_coeff] +exec ./run.sh alfworld_param "${1:-2,7}" "${2:-8782}" "${3:-8120}" "${4:-1000}" Qwen/Qwen2.5-3B-Instruct grpo_wm "${5:-0.1}" From b08412eb55414cf7b7c3b3097f73115069b91327 Mon Sep 17 00:00:00 2001 From: lwaekfjlk <1125027232@qq.com> Date: Sat, 21 Mar 2026 09:25:09 +0000 Subject: [PATCH 2/2] update with v3 wm weight ratio --- .../client/utils/http_training_client.py | 19 +- .../environment/alfworld/alfworld_game.py | 1 + opentinker/server/generic_agent_loop.py | 83 ++++++- opentinker/server/http_training_server.py | 90 ++++++- programs.md | 220 ++++++++++++++++++ run.sh | 12 +- run_grpo.sh | 2 +- run_grpo_wm.sh | 2 +- 8 files changed, 411 insertions(+), 18 deletions(-) create mode 100644 programs.md diff --git a/opentinker/client/utils/http_training_client.py b/opentinker/client/utils/http_training_client.py index 0d0aea99..d6f45ba3 100644 --- a/opentinker/client/utils/http_training_client.py +++ b/opentinker/client/utils/http_training_client.py @@ -638,6 +638,19 @@ def set_config(self, args: DictConfig, env=None): f"[ServiceClient] Forwarding algorithm.world_model_coeff={world_model_coeff}" ) + # Optional WM loss sparsification ratio + wm_loss_top_ratio = args.get("wm_loss_top_ratio", None) + if wm_loss_top_ratio is not None: + server_cfg = OmegaConf.merge( + server_cfg, + OmegaConf.create( + 
{"algorithm": {"wm_loss_top_ratio": float(wm_loss_top_ratio)}} + ), + ) + print( + f"[ServiceClient] Forwarding algorithm.wm_loss_top_ratio={wm_loss_top_ratio}" + ) + # Add multi_turn config if present in args if hasattr(args, "multi_turn") and args.multi_turn: multi_turn_cfg = OmegaConf.to_container(args.multi_turn, resolve=True) @@ -657,7 +670,11 @@ def set_config(self, args: DictConfig, env=None): server_cfg = OmegaConf.merge( server_cfg, OmegaConf.create( - {"actor_rollout_ref": {"rollout": {"agent": {"num_workers": agent_num_workers}}}} + { + "actor_rollout_ref": { + "rollout": {"agent": {"num_workers": agent_num_workers}} + } + } ), ) print( diff --git a/opentinker/environment/alfworld/alfworld_game.py b/opentinker/environment/alfworld/alfworld_game.py index 3d0ae7cb..320c99ea 100644 --- a/opentinker/environment/alfworld/alfworld_game.py +++ b/opentinker/environment/alfworld/alfworld_game.py @@ -522,6 +522,7 @@ def step(self, action: str) -> StepResult: # Note: Don't include "step" here as gym_environment_interaction.py # already passes it explicitly to observation_template.format() "raw_reward": float(reward), + "raw_obs": obs, # raw env feedback (before _format_observation) "action_taken": parsed_action, "task": self._task_desc, "won": won_flag, diff --git a/opentinker/server/generic_agent_loop.py b/opentinker/server/generic_agent_loop.py index bc56696d..2831e579 100755 --- a/opentinker/server/generic_agent_loop.py +++ b/opentinker/server/generic_agent_loop.py @@ -115,6 +115,9 @@ def __init__( self.prompt_ids: list[int] = [] self.response_ids: list[int] = [] self.response_mask: list[int] = [] + self.observation_mask: list[ + int + ] = [] # 1 for pure env feedback tokens, 0 otherwise self.response_logprobs: list[float] = [] # Turn tracking @@ -224,7 +227,9 @@ def init_class(cls, config, tokenizer, processor, **kwargs): # Create per-job subdirectory to isolate traces from different client tasks job_id = os.environ.get("ROLLOUT_TRACE_JOB_ID", None) if job_id: 
- cls._trace_output_dir = str(Path(cls._trace_output_dir) / f"job_{job_id}") + cls._trace_output_dir = str( + Path(cls._trace_output_dir) / f"job_{job_id}" + ) cls._save_traces = True cls._process_id = os.getpid() # Store process ID for unique trace naming Path(cls._trace_output_dir).mkdir(parents=True, exist_ok=True) @@ -465,6 +470,9 @@ async def run(self, sampling_params: dict[str, Any], **kwargs) -> AgentLoopOutpu # Ensure env_info exists for all samples (even if empty) for consistent DataProto.concat output.extra_fields["env_info"] = agent_data.extra_fields.get("env_info", []) output.extra_fields["turn_scores"] = agent_data.turn_scores + output.extra_fields["observation_mask"] = agent_data.observation_mask[ + : self.response_length + ] # Add any other extra fields (except the ones we already set) for key, value in agent_data.extra_fields.items(): if key not in output.extra_fields: @@ -581,6 +589,9 @@ async def _handle_generating_state( agent_data.response_mask += [1] * len( agent_data.response_ids ) # mask=1 for LLM tokens + agent_data.observation_mask += [0] * len( + agent_data.response_ids + ) # observation_mask=0 for action tokens if response_log_probs: agent_data.response_logprobs += response_log_probs @@ -684,6 +695,14 @@ async def _handle_interacting_state( agent_data.prompt_ids += response_ids agent_data.response_mask += [0] * len(response_ids) + # Build observation_mask: 1 only for pure env feedback tokens + # (excludes chat template, "=== Current State ===", "=== Available Actions ===" etc.) 
+ raw_obs = info.get("raw_obs", None) if info else None + obs_mask = self._build_env_feedback_mask( + response_ids, observation, raw_obs, self.tokenizer + ) + agent_data.observation_mask += obs_mask + if agent_data.response_logprobs: # Pad logprobs with 0.0 for observation tokens agent_data.response_logprobs += [0.0] * len(response_ids) @@ -693,6 +712,68 @@ async def _handle_interacting_state( else: return GenericAgentState.GENERATING + @staticmethod + def _build_env_feedback_mask( + response_ids: list[int], + observation: str, + raw_obs: str | None, + tokenizer, + ) -> list[int]: + """Build a per-token mask marking only pure environment feedback tokens. + + Uses character-offset mapping to avoid BPE boundary mismatch: + 1. Tokenize the full observation string with offset_mapping + 2. Find raw_obs character range within observation + 3. Map character range → token indices in observation encoding + 4. Find observation tokens as contiguous subsequence in response_ids + (response_ids = chat_template_prefix + obs_tokens + chat_template_suffix) + 5. Transfer the per-token mask to response_ids positions + + Falls back to all-1 mask if any step fails. 
+ """ + n = len(response_ids) + if not raw_obs or not observation: + return [1] * n + + # Step 1-2: find raw_obs character range in the formatted observation + char_start = observation.find(raw_obs) + if char_start < 0: + return [1] * n + char_end = char_start + len(raw_obs) + + # Step 3: tokenize observation with offset mapping + try: + enc = tokenizer( + observation, add_special_tokens=False, return_offsets_mapping=True + ) + obs_ids = enc["input_ids"] + offsets = enc["offset_mapping"] + except Exception: + # Tokenizer doesn't support offset_mapping; fall back + return [1] * n + + # Mark which observation-level tokens overlap with raw_obs char range + obs_level_mask = [] + for s, e in offsets: + if e > char_start and s < char_end: + obs_level_mask.append(1) + else: + obs_level_mask.append(0) + + # Step 4: find obs_ids as contiguous subsequence in response_ids + # (chat template special tokens don't merge with content tokens) + m = len(obs_ids) + for i in range(n - m + 1): + if response_ids[i : i + m] == obs_ids: + # Step 5: transfer mask + mask = [0] * n + for j in range(m): + mask[i + j] = obs_level_mask[j] + return mask + + # obs_ids not found in response_ids — fall back + return [1] * n + async def _save_debug_images(self, image_data: list, request_id: str): """Save debug images to disk when SAVE_DEBUG_IMAGES env var is set. 
diff --git a/opentinker/server/http_training_server.py b/opentinker/server/http_training_server.py index d8d13e28..04800e84 100755 --- a/opentinker/server/http_training_server.py +++ b/opentinker/server/http_training_server.py @@ -84,7 +84,6 @@ ) - # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -684,9 +683,25 @@ def init_workers(self, total_steps: int) -> Dict[str, Any]: algo_wm_coeff = self.config.algorithm.get("world_model_coeff", 0.0) if algo_wm_coeff > 0: from omegaconf import open_dict + with open_dict(self.config): - self.config.actor_rollout_ref.actor.world_model_coeff = algo_wm_coeff - logger.info(f"Forwarded world_model_coeff={algo_wm_coeff} to actor config") + self.config.actor_rollout_ref.actor.world_model_coeff = ( + algo_wm_coeff + ) + # Also forward wm_loss_top_ratio if set + algo_wm_top_ratio = self.config.algorithm.get( + "wm_loss_top_ratio", None + ) + if algo_wm_top_ratio is not None: + self.config.actor_rollout_ref.actor.wm_loss_top_ratio = float( + algo_wm_top_ratio + ) + logger.info( + f"Forwarded wm_loss_top_ratio={algo_wm_top_ratio} to actor config" + ) + logger.info( + f"Forwarded world_model_coeff={algo_wm_coeff} to actor config" + ) logger.info("Initializing workers...") @@ -719,7 +734,9 @@ def init_workers(self, total_steps: int) -> Dict[str, Any]: self.async_rollout_manager = self.trainer.async_rollout_manager # World model SFT loss coeff (actual loss computed in dp_actor via config.world_model_coeff) - self.wm_coeff = self.config.actor_rollout_ref.actor.get("world_model_coeff", 0.0) + self.wm_coeff = self.config.actor_rollout_ref.actor.get( + "world_model_coeff", 0.0 + ) if self.wm_coeff > 0: logger.info(f"World model SFT loss enabled with coeff={self.wm_coeff}") @@ -959,7 +976,9 @@ def train_step(self, batch: DataProto) -> Dict[str, Any]: # episodes into individual per-turn training samples, the gen_batch_output # batch size is larger than the original batch. 
We need to expand the # original batch to match using the expansion index. - expansion_index = gen_batch_output.meta_info.pop('per_turn_expansion_index', None) + expansion_index = gen_batch_output.meta_info.pop( + "per_turn_expansion_index", None + ) if expansion_index is not None: logger.info( f"[Per-turn training] Expanding original batch from {len(batch)} to " @@ -972,6 +991,7 @@ def train_step(self, batch: DataProto) -> Dict[str, Any]: elif batch.batch is not None: # Empty TensorDict (all keys were popped) — create new one with expanded size from tensordict import TensorDict + batch.batch = TensorDict({}, batch_size=[len(expansion_index)]) # Expand non-tensor batch expanded_non_tensor = {} @@ -1186,13 +1206,41 @@ def train_step(self, batch: DataProto) -> Dict[str, Any]: ) metrics.update(critic_output_metrics) - # 10.5 Compute observation_mask for world model loss + # 10.5 Build observation_mask for world model loss + # Prefer fine-grained mask from agent loop (only pure env feedback tokens) + # Fall back to coarse mask (all non-action response tokens) if not available if self.wm_coeff > 0: resp_len = batch.batch["response_mask"].shape[1] - attn_response = batch.batch["attention_mask"][:, -resp_len:] - batch.batch["observation_mask"] = ( - attn_response.bool() & ~batch.batch["response_mask"].bool() - ).float() + if "observation_mask" in batch.non_tensor_batch: + # Fine-grained mask from agent loop: 1 = pure env feedback only + obs_mask_lists = batch.non_tensor_batch["observation_mask"] + padded = [] + for m in obs_mask_lists: + m = list(m) if m is not None else [] + if len(m) < resp_len: + m = m + [0] * (resp_len - len(m)) + else: + m = m[:resp_len] + padded.append(m) + batch.batch["observation_mask"] = torch.tensor( + padded, + dtype=torch.float32, + device=batch.batch["response_mask"].device, + ) + logger.info( + f"[WM] Using fine-grained observation_mask from agent loop " + f"(env_feedback_tokens={batch.batch['observation_mask'].sum().item():.0f})" + ) + 
else: + # Fallback: all non-action tokens in response portion + attn_response = batch.batch["attention_mask"][:, -resp_len:] + batch.batch["observation_mask"] = ( + attn_response.bool() & ~batch.batch["response_mask"].bool() + ).float() + logger.info( + f"[WM] Using coarse observation_mask (all non-action tokens, " + f"total={batch.batch['observation_mask'].sum().item():.0f})" + ) # 11. Update actor (check critic warmup) if self.config.trainer.critic_warmup <= self.global_steps: @@ -1282,6 +1330,23 @@ def train_step(self, batch: DataProto) -> Dict[str, Any]: logger.info(f"Training step {self.global_steps} completed successfully") + # Free large intermediates to prevent Ray object store spilling to disk + del batch, gen_batch, gen_batch_output, reward_tensor + if "old_log_prob" in dir(): + del old_log_prob + if "ref_log_prob" in dir(): + del ref_log_prob + if "values" in dir(): + del values + if "actor_output" in dir(): + del actor_output + if "critic_output" in dir(): + del critic_output + import gc + + gc.collect() + torch.cuda.empty_cache() + return { "status": "success", "metrics": metrics, @@ -1559,7 +1624,9 @@ def validate_step(self, batch: DataProto) -> Dict[str, Any]: # 6. 
Merge original batch and generated output # Per-turn training expansion: expand batch if gen output is larger - expansion_index = gen_batch_output.meta_info.pop('per_turn_expansion_index', None) + expansion_index = gen_batch_output.meta_info.pop( + "per_turn_expansion_index", None + ) if expansion_index is not None: logger.info( f"[Per-turn training] Validation: Expanding original batch from {len(batch)} to " @@ -1571,6 +1638,7 @@ def validate_step(self, batch: DataProto) -> Dict[str, Any]: elif batch.batch is not None: # Empty TensorDict (all keys were popped) — create new one with expanded size from tensordict import TensorDict + batch.batch = TensorDict({}, batch_size=[len(expansion_index)]) expanded_non_tensor = {} for k, v in batch.non_tensor_batch.items(): diff --git a/programs.md b/programs.md new file mode 100644 index 00000000..efc0d18b --- /dev/null +++ b/programs.md @@ -0,0 +1,220 @@ +# ALFWorld 3B WMC-ERC Iteration Plan + +## Objective + +**Iterate on the WMC-ERC (World Model Correction) algorithm until it consistently beats baseline PPO in reward growth speed on ALFWorld.** + +The first milestone is: **WMC-ERC reward increases faster than baseline PPO over 150 training steps.** + +If it doesn't, we iterate on the WMC-ERC hyperparameters (mu_base, lambda_wm) and algorithm design until it does. + +## Iteration Loop + +``` +1. Run baseline PPO → record reward curve (one-time) +2. Run WMC-ERC + PPO → record reward curve +3. Compare: does WMC-ERC reward grow faster? 
+ - YES → success, move to larger model / more steps + - NO → analyze why, adjust WMC-ERC params, go to step 2 +``` + +### What to tune in WMC-ERC: + +- `mu_base`: base clipping coefficient (controls tightness of the gate) +- `lambda_wm`: how much WM uncertainty widens the gate +- `entropy_target`, `base_entropy_coeff`: adaptive entropy control via beta_token +- Algorithm logic in `opentinker/backend_patch/verl/trainer/ppo/wmc_erc.py` + +## Setup + +- **Model**: Qwen/Qwen2.5-3B-Instruct +- **GPUs**: 0, 1 (2 GPUs) +- **Steps per experiment**: 150 +- **Batch size**: 8 (reduced from 24 due to 3B model memory) +- **RL Algorithm**: PPO (adv_estimator: gae, rollout_n: 1) +- **Environment**: ALFWorld (alfworld_server, NOT sciworld_server) +- **Env shards**: 8 (8 parallel env server processes on ports 8092-8099) + +--- + +## Full Restart Procedure + +**IMPORTANT: Every time you restart, you MUST kill and restart ALL THREE services in this order:** + +1. **Kill everything** +2. **Start Scheduler** (FIRST) +3. **Start ALFWorld Server** (SECOND) +4. 
**Start RL Training** (THIRD) + +### Step 1: Kill Everything + +```bash +source ~/anaconda3/etc/profile.d/conda.sh && conda activate opentinker + +pkill -9 -f "alfworld_rl" -u haofeiy2 +pkill -9 -f "launch_scheduler" -u haofeiy2 +pkill -9 -f "ray::" -u haofeiy2 +pkill -9 -f "alfworld_server" -u haofeiy2 +ray stop --force +sleep 5 + +# Verify everything is dead +ps aux | grep -E "alfworld|scheduler|ray::" | grep -v grep | wc -l # Should be 0 +``` + +### Step 2: Start Scheduler (FIRST) + +```bash +ROLLOUT_TRACE_DIR=./traces TORCH_CUDA_ARCH_LIST="9.0" FLASHINFER_HOMOGENEOUS_MS=1 \ +nohup python opentinker/scheduler/launch_scheduler_kill.py \ + available_gpus='[0,1]' gpus_per_job=2 \ + port_range=null num_ports=200 scheduler_port=8780 \ + > /tmp/scheduler_8780.log 2>&1 & + +sleep 15 +curl -s http://0.0.0.0:8780/ # Should return scheduler JSON +``` + +### Step 3: Start ALFWorld Server (SECOND) + +```bash +nohup python opentinker/environment/alfworld/alfworld_server.py \ + --port 8092 --shards 8 \ + > /tmp/alfworld_server.log 2>&1 & + +sleep 10 +curl -s http://0.0.0.0:8092/health # Should return {"status":"healthy"} +``` + +### Step 4: Start RL Training (THIRD) + +**Experiment A — WMC-ERC + PPO:** + +```bash +python opentinker/client/alfworld_rl.py \ + --config-name alfworld_wmc_erc_param \ + tokenizer_path=Qwen/Qwen2.5-3B-Instruct \ + num_gpus=2 num_steps=150 +``` + +**Experiment B — Baseline PPO (no WMC-ERC):** + +```bash +python opentinker/client/alfworld_rl.py \ + --config-name alfworld_param \ + tokenizer_path=Qwen/Qwen2.5-3B-Instruct \ + num_gpus=2 num_steps=150 +``` + +--- + +## Extract Reward Data + +```bash +# Find the latest wandb output log +LOG=$(ls -t outputs/*/wandb/run-*/files/output.log | head -1) + +# Extract reward per step +grep "training_step_reward" $LOG | grep -oP "training_step_reward:\K[^ ]+" | \ + awk '{n++; printf "step %2d: %+.3f\n", n, $0}' + +# Extract completion rate +grep "game/completion_rate" $LOG | grep -oP "game/completion_rate:\K[^ ]+" 
| \ + awk '{n++; printf "step %2d: %.1f%%\n", n, $0*100}' +``` + +## Key Config Differences + +| Setting | WMC-ERC (`alfworld_wmc_erc_param`) | Baseline (`alfworld_param`) | +| -------------------------- | ---------------------------------- | --------------------------- | +| batch_size | 8 | 8 | +| wmc_erc.enable | true | N/A | +| wmc_erc.mu_base | 1.0 | N/A | +| wmc_erc.lambda_wm | 10.0 | N/A | +| wmc_erc.entropy_target | 2.0 | N/A | +| wmc_erc.base_entropy_coeff | 0.02 | N/A | +| wmc_erc.entropy_floor | 0.1 | N/A | +| entropy_coeff | 0.0 (adaptive via beta_token) | 0 | +| adv_estimator | gae (PPO) | gae (PPO) | +| rollout_n | 1 | 1 | +| env_shards | 8 | 8 | + +## Previous 3B Results (for reference) + +WMC-ERC with Qwen2.5-3B-Instruct (36 steps, grpo_per_step): + +``` +Step 1: -0.030 Step 10: +0.375 Step 20: +1.244 Step 30: +1.580 +Step 5: -0.001 Step 15: +1.447 Step 25: +0.926 Step 36: +3.262 +``` + +Reward grew from -0.03 to +3.26, completion rate reached ~35%. + +## 0.5B Experiment Results (2026-03-16) + +### WMC-ERC Run 5 (best): Adaptive Entropy via beta_token + +Config: `mu_base=1.0, lambda_wm=10.0, entropy_floor=0.1, entropy_target=2.0, base_entropy_coeff=0.02, entropy_coeff=0.0` + +``` +Step 1: -1.335 (entropy: 1.96) Step 50: -0.187 (entropy: 1.44) +Step 10: -1.051 (entropy: 2.17) Step 90: -0.092 (entropy: 3.58) +Step 30: -0.344 (entropy: 1.05) Step 150: -0.098 (entropy: 3.93) +``` + +### Baseline PPO (0.5B, 150 steps) + +``` +Step 1: -1.424 (entropy: 2.02) Step 50: -0.160 (entropy: 0.02) +Step 10: -0.866 (entropy: 1.51) Step 90: -0.130 (entropy: 0.32) +Step 30: -0.173 (entropy: 0.04) Step 150: -0.073 (entropy: 0.01) +``` + +### 0.5B Conclusion + +Both WMC-ERC and baseline PPO converged to similar rewards (~-0.07 to -0.10) with 0% completion rate. The 0.5B model lacks capacity for ALFWorld. Moving to 3B. + +### Key Findings from 0.5B + +1. **Entropy collapse** is the #1 failure mode for small models on ALFWorld +2. 
Fixed `entropy_coeff` is too blunt — too weak (0.005) doesn't prevent collapse, too strong (0.01) causes explosion +3. **Adaptive beta_token** (WMC-ERC v2) solves this: per-turn entropy → per-token entropy coefficient +4. `lambda_wm=10.0` needed because H_WM values are small (0.03-0.09) +5. Completion rate remained 0% across all 0.5B runs — model too small + +### Algorithm Changes Made (from 0.5B iteration) + +1. `wmc_erc.py`: Added `compute_per_turn_entropy()`, entropy floor gating, adaptive beta_token injection +2. `http_training_client.py`: Pass `wmc_erc` and `entropy_coeff` config to server +3. Config files: batch_size adjustable, steps 150 + +## Code Changes Made + +1. `http_training_server.py`: Added `NCCL_CUMEM_ENABLE=0`, `VLLM_DISABLE_SLEEP_MODE=1`, `RAY_memory_usage_threshold=0.99`, `VLLM_GPU_MEMORY_UTILIZATION=0.15` to `ray.init()` runtime_env (`run.sh` separately exports `VLLM_GPU_MEMORY_UTILIZATION=0.25`) +2. `vllm_async_server.py`: Added env var override for `gpu_memory_utilization` +3. `job_scheduler.py`: pmon check disabled for GPU sharing; idle-memory threshold raised to 2000 MB +4. `alfworld_wmc_erc_param.yaml`: `adv_estimator` changed to `gae`, `rollout_n` to 1, `env_shards` to 8 +5. `alfworld_param.yaml`: `adv_estimator` changed to `gae`, `rollout_n` to 1, `env_shards` to 8 +6. 
`http_training_server.py`: Added `gc.collect()` + `torch.cuda.empty_cache()` after each training step + +## 3B GRPO Baseline (2026-03-17) + +**Setup**: GPU 2,7 | Scheduler port 8781 | ALFWorld server port 8100 | 150 steps +**Config**: `adv_estimator=grpo, rollout_n=4, batch_size=8, Qwen2.5-3B-Instruct` +**W&B**: `alfworld_grpo_baseline_3b` (run f1nf4cy1) +**Log**: `/tmp/alfworld_grpo_baseline.log` + +### Results + +_(pending — check with `tail -20 /tmp/alfworld_grpo_baseline.log`)_ + +--- + +## Troubleshooting + +- **QUEUED forever**: Check `grep "OCCUPIED" /tmp/scheduler_8780.log` — GPU not free +- **cumem error**: Ensure `VLLM_DISABLE_SLEEP_MODE=1` in ray.init runtime_env +- **env_shards connection error**: Ensure env server started with `--shards 8` matching config `env_shards: 8` +- **Java zombie processes**: Run `pkill -9 -f scienceworld.jar` — sciworld server leaks Java +- **RAM OOM**: Check `free -g` and kill Java zombies if present +- **vLLM GPU OOM**: Lower `VLLM_GPU_MEMORY_UTILIZATION` (currently 0.25) diff --git a/run.sh b/run.sh index 514257a8..2f095114 100755 --- a/run.sh +++ b/run.sh @@ -1,9 +1,9 @@ #!/usr/bin/env bash -# Usage: ./run.sh [config] [gpus] [scheduler_port] [env_port] [steps] [model] [mode] [wm_coeff] +# Usage: ./run.sh [config] [gpus] [scheduler_port] [env_port] [steps] [model] [mode] [wm_coeff] [wm_top_ratio] # Example: # ./run.sh # all defaults # ./run.sh alfworld_param 4,5 8782 8110 150 -# ./run.sh alfworld_param 4,5 8782 8110 150 Qwen/Qwen2.5-3B-Instruct grpo_wm 0.1 +# ./run.sh alfworld_param 4,5 8782 8110 150 Qwen/Qwen2.5-3B-Instruct grpo_wm 0.1 0.1 # Modes: # grpo : standard GRPO # grpo_wm : GRPO + world model SFT loss (adds +world_model_coeff=wm_coeff) @@ -17,6 +17,7 @@ STEPS="${5:-600}" MODEL="${6:-Qwen/Qwen2.5-3B-Instruct}" MODE="${7:-grpo}" WM_COEFF="${8:-0.1}" +WM_TOP_RATIO="${9:-}" # Normalize GPU list so Hydra always receives a clean list override. # Accepted input forms: "4,6" or "[4,6]". 
@@ -45,7 +46,12 @@ case "$MODE" in ;; grpo_wm|grpo+wm|wm|wm_sft) EXTRA_HYDRA_ARGS+=("+world_model_coeff=${WM_COEFF}") - MODE_TAG="grpo_wm_${WM_COEFF}" + if [[ -n "${WM_TOP_RATIO:-}" ]]; then + EXTRA_HYDRA_ARGS+=("+wm_loss_top_ratio=${WM_TOP_RATIO}") + MODE_TAG="grpo_wm_${WM_COEFF}_top${WM_TOP_RATIO}" + else + MODE_TAG="grpo_wm_${WM_COEFF}" + fi ;; *) echo "Unsupported mode: $MODE" diff --git a/run_grpo.sh b/run_grpo.sh index fde6d4f5..df59d1bf 100755 --- a/run_grpo.sh +++ b/run_grpo.sh @@ -4,4 +4,4 @@ cd "$(dirname "$0")" # GRPO baseline # Usage: ./run_grpo.sh [gpus] [scheduler_port] [env_port] [steps] -exec ./run.sh alfworld_param "${1:-4,6}" "${2:-8782}" "${3:-8120}" "${4:-1000}" Qwen/Qwen2.5-3B-Instruct grpo +exec ./run.sh alfworld_param "${1:-2,6}" "${2:-8782}" "${3:-8120}" "${4:-1000}" Qwen/Qwen2.5-3B-Instruct grpo diff --git a/run_grpo_wm.sh b/run_grpo_wm.sh index a953977e..fb0b976c 100755 --- a/run_grpo_wm.sh +++ b/run_grpo_wm.sh @@ -1,4 +1,4 @@ #!/usr/bin/env bash # GRPO + World Model SFT loss # Usage: ./run_grpo_wm.sh [gpus] [scheduler_port] [env_port] [steps] [wm_coeff] -exec ./run.sh alfworld_param "${1:-2,7}" "${2:-8782}" "${3:-8120}" "${4:-1000}" Qwen/Qwen2.5-3B-Instruct grpo_wm "${5:-0.1}" +exec ./run.sh alfworld_param "${1:-1,9}" "${2:-8782}" "${3:-8120}" "${4:-1000}" Qwen/Qwen2.5-3B-Instruct grpo_wm "${5:-0.01}"