diff --git a/examples/veRL/basic_curriculum/config/ppo_trainer.yaml b/examples/veRL/basic_curriculum/config/ppo_trainer.yaml deleted file mode 100644 index a3d167ea..00000000 --- a/examples/veRL/basic_curriculum/config/ppo_trainer.yaml +++ /dev/null @@ -1,171 +0,0 @@ -data: - tokenizer: null - train_files: ~/data/rlhf/gsm8k/train.parquet - val_files: ~/data/rlhf/gsm8k/test.parquet - prompt_key: prompt - max_prompt_length: 512 - max_response_length: 512 - train_batch_size: 1024 - val_batch_size: 1312 - return_raw_input_ids: False # This should be set to true when the tokenizer between policy and rm differs - return_raw_chat: False - -actor_rollout_ref: - hybrid_engine: True - model: - path: ~/models/deepseek-llm-7b-chat - external_lib: null - override_config: { } - enable_gradient_checkpointing: True - use_remove_padding: False - actor: - strategy: fsdp # This is for backward-compatibility - ppo_mini_batch_size: 256 - ppo_micro_batch_size: null # will be deprecated, use ppo_micro_batch_size_per_gpu - ppo_micro_batch_size_per_gpu: null - use_dynamic_bsz: False - ppo_max_token_len_per_gpu: 16384 # n * ${data.max_prompt_length} + ${data.max_response_length} - grad_clip: 1.0 - clip_ratio: 0.2 - entropy_coeff: 0.001 - use_kl_loss: False # True for GRPO - kl_loss_coef: 0.001 # for grpo - kl_loss_type: low_var_kl # for grpo - ppo_epochs: 1 - shuffle: False - ulysses_sequence_parallel_size: 1 # sp size - optim: - lr: 1e-6 - lr_warmup_steps_ratio: 0. # the total steps will be injected during runtime - min_lr_ratio: null # only useful for warmup with cosine - warmup_style: constant # select from constant/cosine - total_training_steps: -1 # must be override by program - fsdp_config: - wrap_policy: - # transformer_layer_cls_to_wrap: None - min_num_params: 0 - param_offload: False - optimizer_offload: False - fsdp_size: -1 - ref: - fsdp_config: - param_offload: False - wrap_policy: - # transformer_layer_cls_to_wrap: None - min_num_params: 0 - log_prob_micro_batch_size: null # will be deprecated, use log_prob_micro_batch_size_per_gpu - log_prob_micro_batch_size_per_gpu: null - log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz} - log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu} - ulysses_sequence_parallel_size: ${actor_rollout_ref.actor.ulysses_sequence_parallel_size} # sp size - rollout: - name: vllm - temperature: 1.0 - top_k: -1 # 0 for hf rollout, -1 for vllm rollout - top_p: 1 - prompt_length: ${data.max_prompt_length} # not use for opensource - response_length: ${data.max_response_length} - # for vllm rollout - dtype: bfloat16 # should align with FSDP - gpu_memory_utilization: 0.5 - ignore_eos: False - enforce_eager: True - free_cache_engine: True - load_format: dummy_dtensor - tensor_model_parallel_size: 2 - max_num_batched_tokens: 8192 - max_num_seqs: 1024 - log_prob_micro_batch_size: null # will be deprecated, use log_prob_micro_batch_size_per_gpu - log_prob_micro_batch_size_per_gpu: null - log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz} - log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu} - disable_log_stats: True - enable_chunked_prefill: True # could get higher throughput - # for hf rollout - do_sample: True - # number of responses (i.e. num sample times) - n: 1 # > 1 for grpo - -critic: - strategy: fsdp - optim: - lr: 1e-5 - lr_warmup_steps_ratio: 0. 
# the total steps will be injected during runtime - min_lr_ratio: null # only useful for warmup with cosine - warmup_style: constant # select from constant/cosine - total_training_steps: -1 # must be override by program - model: - path: ~/models/deepseek-llm-7b-chat - tokenizer_path: ${actor_rollout_ref.model.path} - override_config: { } - external_lib: ${actor_rollout_ref.model.external_lib} - enable_gradient_checkpointing: True - use_remove_padding: False - fsdp_config: - param_offload: False - optimizer_offload: False - wrap_policy: - # transformer_layer_cls_to_wrap: None - min_num_params: 0 - fsdp_size: -1 - ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size} - ppo_micro_batch_size: null # will be deprecated, use ppo_micro_batch_size_per_gpu - ppo_micro_batch_size_per_gpu: null - forward_micro_batch_size: ${critic.ppo_micro_batch_size} - forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu} - use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz} - ppo_max_token_len_per_gpu: 32768 # (${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}) * 2 - forward_max_token_len_per_gpu: ${critic.ppo_max_token_len_per_gpu} - ulysses_sequence_parallel_size: 1 # sp size - ppo_epochs: ${actor_rollout_ref.actor.ppo_epochs} - shuffle: ${actor_rollout_ref.actor.shuffle} - grad_clip: 1.0 - cliprange_value: 0.5 - -reward_model: - enable: False - strategy: fsdp - model: - input_tokenizer: ${actor_rollout_ref.model.path} # set this to null if the chat template is identical - path: ~/models/FsfairX-LLaMA3-RM-v0.1 - external_lib: ${actor_rollout_ref.model.external_lib} - use_remove_padding: False - fsdp_config: - min_num_params: 0 - param_offload: False - fsdp_size: -1 - micro_batch_size: null # will be deprecated, use micro_batch_size_per_gpu - micro_batch_size_per_gpu: null # set a number - max_length: null - ulysses_sequence_parallel_size: 1 # sp size - use_dynamic_bsz: ${critic.use_dynamic_bsz} - forward_max_token_len_per_gpu: ${critic.forward_max_token_len_per_gpu} - -algorithm: - gamma: 1.0 - lam: 1.0 - adv_estimator: gae - kl_penalty: kl # how to estimate kl divergence - kl_ctrl: - type: fixed - kl_coef: 0.001 - -trainer: - total_epochs: 30 - total_training_steps: null - project_name: verl_examples - experiment_name: gsm8k - logger: [ 'console', 'wandb' ] - val_generations_to_log_to_wandb: 0 - nnodes: 1 - n_gpus_per_node: 8 - save_freq: -1 - # auto: find the last ckpt to resume. 
If can't find, start from scratch - resume_mode: auto # or auto or resume_path if - resume_from_path: False - test_freq: -1 - critic_warmup: 0 - default_hdfs_dir: null - remove_previous_ckpt_in_save: False - del_local_ckpt_after_load: False - default_local_dir: checkpoints/${trainer.project_name}/${trainer.experiment_name} diff --git a/examples/veRL/basic_curriculum/launch.sh b/examples/veRL/basic_curriculum/launch.sh deleted file mode 100755 index 6681a2c8..00000000 --- a/examples/veRL/basic_curriculum/launch.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash - -export N_GPUS=4 -export BASE_MODEL=meta-llama/Llama-3.2-3B-Instruct # meta-llama/Llama-3.2-1B-Instruct -export ROLLOUT_TP_SIZE=2 -export EXPERIMENT_NAME=basic_curriculum -export VLLM_ATTENTION_BACKEND=XFORMERS - -bash ./train_grpo.sh diff --git a/examples/veRL/basic_curriculum/ppo_curriculum.py b/examples/veRL/basic_curriculum/ppo_curriculum.py deleted file mode 100644 index 15bc0fa9..00000000 --- a/examples/veRL/basic_curriculum/ppo_curriculum.py +++ /dev/null @@ -1,299 +0,0 @@ -# This example is an adapted version of Bytedance's code: -# https://github.com/volcengine/verl/blob/a65c9157bc0b85b64cd753de19f94e80a11bd871/verl/trainer/main_ppo.py -from io import StringIO -from typing import Optional - -import hydra -import ray -import torch -import verl.utils.torch_functional as verl_F -from omegaconf import OmegaConf, open_dict -from torch.utils.data import Dataset -from torchdata.stateful_dataloader import StatefulDataLoader -from transformers import PreTrainedTokenizer -from verl import DataProto -from verl.trainer.ppo.ray_trainer import RayPPOTrainer -from verl.utils.dataset.rl_dataset import collate_fn -from verl.utils.model import compute_position_id_with_mask - -import reasoning_gym -import reasoning_gym.utils -from reasoning_gym.coaching.curriculum_config import CurriculumExperimentConfig -from reasoning_gym.coaching.experiment import CurriculumExperiment -from reasoning_gym.utils import extract_answer - -curriculum_config_yaml = """ - curricula: - leg_counting: - attribute_levels: - num_animals: 2 - products: - attribute_levels: - num_terms: 4 - num_digits: 4 - chain_sum: - attribute_levels: - num_terms: 4 - num_digits: 4 - weight: 1.0 - """ - - -class ReasoningGymDataset(Dataset): - def __init__( - self, - tokenizer: PreTrainedTokenizer, - experiment_name: str, - seed: int, - size: int, - developer_prompt: Optional[str] = None, - developer_role: str = "system", - max_prompt_length: int = 2048, - truncation: str = "error", ## ['left', 'right', 'error'] - return_raw_chat: bool = False, - ): - self.tokenizer = tokenizer - curriculum_config = CurriculumExperimentConfig.from_yaml_stream(StringIO(curriculum_config_yaml)) - self.experiment = CurriculumExperiment(experiment_name, curriculum_config, size=size, seed=seed) - self.developer_prompt = developer_prompt - self.developer_role = developer_role - self.max_prompt_length = max_prompt_length - self.truncation = truncation - self.return_raw_chat = return_raw_chat - - def __len__(self) -> int: - return len(self.experiment.composite) - - def __getitem__(self, index: int): - row_dict = self.experiment.get_dataset_entry(index).copy() - q = row_dict["question"] - - chat = [] - if self.developer_prompt is not None: - chat.append({"role": self.developer_role, "content": self.developer_prompt}) - chat.append({"role": "user", "content": q}) - - prompt = self.tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True) - - input_ids, attention_mask = 
verl_F.tokenize_and_postprocess_data( - prompt=prompt, - tokenizer=self.tokenizer, - max_length=self.max_prompt_length, - pad_token_id=self.tokenizer.pad_token_id, - left_pad=True, - truncation=self.truncation, - ) - - position_ids = compute_position_id_with_mask(attention_mask) - - row_dict["data_source"] = "reasoning_gym/" + self.dataset_name - row_dict["input_ids"] = input_ids[0] - row_dict["attention_mask"] = attention_mask[0] - row_dict["position_ids"] = position_ids[0] - - # encode prompts without chat template - if self.return_raw_chat: - row_dict["raw_prompt"] = chat.tolist() - - return row_dict - - -class RayPPOTrainerCustom(RayPPOTrainer): - def __init__( - self, - config, - tokenizer, - role_worker_mapping: dict, - resource_pool_manager, - ray_worker_group_cls, - experiment_name: str = "basic_curriculum", - dataset_size: int = 10000, - ): - self.dataset_size = dataset_size - - developer_prompt = reasoning_gym.utils.SYSTEM_PROMPTS["DeepSeekZero"] - self.train_dataset = ReasoningGymDataset( - tokenizer=tokenizer, - experiment_name=experiment_name, - seed=1, - size=self.dataset_size, - developer_prompt=developer_prompt, - ) - - self.val_dataset = ReasoningGymDataset( - tokenizer=tokenizer, - experiment_name=experiment_name, - seed=2, - size=self.dataset_size, - developer_prompt=developer_prompt, - ) - - train_reward_fn = lambda data: self._score_output(data, num_examine=0) - val_reward_fn = lambda data: self._score_output(data, num_examine=1) - - super().__init__( - config, - tokenizer, - role_worker_mapping, - resource_pool_manager, - ray_worker_group_cls, - train_reward_fn, - val_reward_fn, - ) - - def _score_output(self, data: DataProto, num_examine: int = 0) -> torch.Tensor: - reward_tensor = torch.zeros_like(data.batch["responses"], dtype=torch.float32) - - num_printed = 0 - for i in range(len(data)): - data_item = data[i] # DataProtoItem - - prompt_ids = data_item.batch["prompts"] # tokenized prompts - prompt_length = prompt_ids.shape[-1] - - valid_prompt_length = data_item.batch["attention_mask"][:prompt_length].sum() - valid_prompt_ids = prompt_ids[-valid_prompt_length:] - - response_ids = data_item.batch["responses"] - valid_response_length = data_item.batch["attention_mask"][prompt_length:].sum() - valid_response_ids = response_ids[:valid_response_length] - - # decode - sequences = torch.cat((valid_prompt_ids, valid_response_ids)) - sequences_str = self.tokenizer.decode(sequences) - - entry_id = data_item.non_tensor_batch["metadata"]["entry_id"] - - score = self._compute_score( - solution_str=sequences_str, - entry_id=entry_id, - ) - reward_tensor[i, valid_response_length - 1] = score - - if num_printed < num_examine: - print(f"reward={score}, seq={sequences_str}") - num_printed += 1 - - return reward_tensor - - def _compute_score(self, solution_str: str, entry_id: str) -> float: - found_answer = extract_answer(solution_str, tag_name="answer") - reward = self.train_dataset.experiment.score_answer_with_id(found_answer, entry_id=entry_id) - print(f"entry_id: {entry_id}; found answer={found_answer}; reward: {reward};") - return reward - - def _create_dataloader(self): - self.train_dataloader = StatefulDataLoader( - dataset=self.train_dataset, - batch_size=self.config.data.train_batch_size, - shuffle=True, - drop_last=True, - collate_fn=collate_fn, - ) - - self.val_dataloader = StatefulDataLoader( - dataset=self.val_dataset, - batch_size=len(self.val_dataset), - shuffle=True, - drop_last=True, - collate_fn=collate_fn, - ) - - assert len(self.train_dataloader) >= 1 - assert 
len(self.val_dataloader) >= 1 - - print(f"Size of train dataloader: {len(self.train_dataloader)}") - print(f"Size of val dataloader: {len(self.val_dataloader)}") - - # inject total_training_steps to actor/critic optim_config. This is hacky. - total_training_steps = len(self.train_dataloader) * self.config.trainer.total_epochs - - if self.config.trainer.total_training_steps is not None: - total_training_steps = self.config.trainer.total_training_steps - - self.total_training_steps = total_training_steps - print(f"Total training steps: {self.total_training_steps}") - - OmegaConf.set_struct(self.config, True) - with open_dict(self.config): - self.config.actor_rollout_ref.actor.optim.total_training_steps = total_training_steps - self.config.critic.optim.total_training_steps = total_training_steps - - -@ray.remote -def main_task(config): - # print initial config - from pprint import pprint - - from verl.utils import hf_tokenizer - from verl.utils.fs import copy_local_path_from_hdfs - - pprint(OmegaConf.to_container(config, resolve=True)) # resolve=True will eval symbol values - OmegaConf.resolve(config) - - # download the checkpoint from hdfs - local_path = copy_local_path_from_hdfs(config.actor_rollout_ref.model.path) - - # instantiate tokenizer - tokenizer = hf_tokenizer(local_path) - - # define worker classes - if config.actor_rollout_ref.actor.strategy == "fsdp": - assert config.actor_rollout_ref.actor.strategy == config.critic.strategy - from verl.single_controller.ray import RayWorkerGroup - from verl.workers.fsdp_workers import ActorRolloutRefWorker, CriticWorker - - ray_worker_group_cls = RayWorkerGroup - - elif config.actor_rollout_ref.actor.strategy == "megatron": - assert config.actor_rollout_ref.actor.strategy == config.critic.strategy - from verl.single_controller.ray.megatron import NVMegatronRayWorkerGroup - from verl.workers.megatron_workers import ActorRolloutRefWorker, CriticWorker - - ray_worker_group_cls = NVMegatronRayWorkerGroup - - else: - raise NotImplementedError - - from verl.trainer.ppo.ray_trainer import ResourcePoolManager, Role - - role_worker_mapping = { - Role.ActorRollout: ray.remote(ActorRolloutRefWorker), - Role.Critic: ray.remote(CriticWorker), - Role.RefPolicy: ray.remote(ActorRolloutRefWorker), - } - - global_pool_id = "global_pool" - resource_pool_spec = { - global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes, - } - mapping = { - Role.ActorRollout: global_pool_id, - Role.Critic: global_pool_id, - Role.RefPolicy: global_pool_id, - } - - resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=mapping) - - trainer = RayPPOTrainerCustom( - config=config, - tokenizer=tokenizer, - role_worker_mapping=role_worker_mapping, - resource_pool_manager=resource_pool_manager, - ray_worker_group_cls=ray_worker_group_cls, - ) - trainer.init_workers() - trainer.fit() - - -@hydra.main(config_path="config", config_name="ppo_trainer", version_base=None) -def main(config): - if not ray.is_initialized(): - # this is for local ray cluster - ray.init(runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}) - - ray.get(main_task.remote(config)) - - -if __name__ == "__main__": - main() diff --git a/examples/veRL/basic_curriculum/train_grpo.sh b/examples/veRL/basic_curriculum/train_grpo.sh deleted file mode 100644 index 6bfa35be..00000000 --- a/examples/veRL/basic_curriculum/train_grpo.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash -set -x - -python3 -u ppo_curriculum.py \ - 
algorithm.adv_estimator=grpo \ - data.train_files=$DATA_DIR/train.parquet \ - data.val_files=$DATA_DIR/test.parquet \ - data.train_batch_size=512 \ - data.val_batch_size=512 \ - data.max_prompt_length=512 \ - data.max_response_length=1024 \ - actor_rollout_ref.model.path=$BASE_MODEL \ - actor_rollout_ref.actor.optim.lr=1e-6 \ - actor_rollout_ref.model.use_remove_padding=True \ - actor_rollout_ref.actor.ppo_mini_batch_size=256 \ - actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=80 \ - actor_rollout_ref.actor.use_kl_loss=True \ - actor_rollout_ref.actor.kl_loss_coef=0.001 \ - actor_rollout_ref.actor.kl_loss_type=low_var_kl \ - actor_rollout_ref.model.enable_gradient_checkpointing=True \ - actor_rollout_ref.actor.fsdp_config.param_offload=False \ - actor_rollout_ref.actor.fsdp_config.optimizer_offload=False \ - actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=160 \ - actor_rollout_ref.rollout.tensor_model_parallel_size=$ROLLOUT_TP_SIZE \ - actor_rollout_ref.rollout.name=vllm \ - actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \ - actor_rollout_ref.rollout.n=8 \ - actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=160 \ - actor_rollout_ref.ref.fsdp_config.param_offload=True \ - algorithm.kl_ctrl.kl_coef=0.001 \ - trainer.critic_warmup=0 \ - trainer.logger=['wandb'] \ - trainer.project_name='verl_chain_sum_grpo' \ - trainer.experiment_name=$EXPERIMENT_NAME \ - trainer.n_gpus_per_node=$N_GPUS \ - trainer.nnodes=1 \ - trainer.save_freq=100 \ - trainer.test_freq=100 \ - trainer.total_epochs=15 $@ 2>&1 | tee verl_output.log diff --git a/examples/veRL/chain_sum/config/ppo_trainer.yaml b/examples/veRL/chain_sum/config/ppo_trainer.yaml deleted file mode 100644 index 5e4e97be..00000000 --- a/examples/veRL/chain_sum/config/ppo_trainer.yaml +++ /dev/null @@ -1,175 +0,0 @@ -data: - tokenizer: null - train_files: ~/data/rlhf/gsm8k/train.parquet - val_files: ~/data/rlhf/gsm8k/test.parquet - prompt_key: prompt - max_prompt_length: 512 - max_response_length: 512 - train_batch_size: 1024 - val_batch_size: 1312 - return_raw_input_ids: False # This should be set to true when the tokenizer between policy and rm differs - return_raw_chat: False - -actor_rollout_ref: - hybrid_engine: True - model: - path: ~/models/deepseek-llm-7b-chat - external_lib: null - override_config: { } - enable_gradient_checkpointing: True - use_remove_padding: False - actor: - strategy: fsdp # This is for backward-compatibility - ppo_mini_batch_size: 256 - ppo_micro_batch_size: null # will be deprecated, use ppo_micro_batch_size_per_gpu - ppo_micro_batch_size_per_gpu: null - use_dynamic_bsz: False - ppo_max_token_len_per_gpu: 16384 # n * ${data.max_prompt_length} + ${data.max_response_length} - grad_clip: 1.0 - clip_ratio: 0.2 - entropy_coeff: 0.001 - use_kl_loss: False # True for GRPO - kl_loss_coef: 0.001 # for grpo - kl_loss_type: low_var_kl # for grpo - ppo_epochs: 1 - shuffle: False - ulysses_sequence_parallel_size: 1 # sp size - optim: - lr: 1e-6 - lr_warmup_steps_ratio: 0. 
# the total steps will be injected during runtime - min_lr_ratio: null # only useful for warmup with cosine - warmup_style: constant # select from constant/cosine - total_training_steps: -1 # must be override by program - fsdp_config: - wrap_policy: - # transformer_layer_cls_to_wrap: None - min_num_params: 0 - param_offload: False - optimizer_offload: False - fsdp_size: -1 - ref: - fsdp_config: - param_offload: False - wrap_policy: - # transformer_layer_cls_to_wrap: None - min_num_params: 0 - log_prob_micro_batch_size: null # will be deprecated, use log_prob_micro_batch_size_per_gpu - log_prob_micro_batch_size_per_gpu: null - log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz} - log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu} - ulysses_sequence_parallel_size: ${actor_rollout_ref.actor.ulysses_sequence_parallel_size} # sp size - rollout: - name: vllm - temperature: 1.0 - top_k: -1 # 0 for hf rollout, -1 for vllm rollout - top_p: 1 - prompt_length: ${data.max_prompt_length} # not use for opensource - response_length: ${data.max_response_length} - # for vllm rollout - dtype: bfloat16 # should align with FSDP - gpu_memory_utilization: 0.5 - ignore_eos: False - enforce_eager: True - free_cache_engine: True - load_format: dummy_dtensor - tensor_model_parallel_size: 2 - max_num_batched_tokens: 8192 - max_num_seqs: 1024 - log_prob_micro_batch_size: null # will be deprecated, use log_prob_micro_batch_size_per_gpu - log_prob_micro_batch_size_per_gpu: null - log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz} - log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu} - disable_log_stats: True - enable_chunked_prefill: True # could get higher throughput - # for hf rollout - do_sample: True - use_fire_sampling: False - # number of responses (i.e. num sample times) - n: 1 # > 1 for grpo - val_kwargs: - do_sample: True - -critic: - strategy: fsdp - optim: - lr: 1e-5 - lr_warmup_steps_ratio: 0. 
# the total steps will be injected during runtime - min_lr_ratio: null # only useful for warmup with cosine - warmup_style: constant # select from constant/cosine - total_training_steps: -1 # must be override by program - model: - path: ~/models/deepseek-llm-7b-chat - tokenizer_path: ${actor_rollout_ref.model.path} - override_config: { } - external_lib: ${actor_rollout_ref.model.external_lib} - enable_gradient_checkpointing: True - use_remove_padding: False - fsdp_config: - param_offload: False - optimizer_offload: False - wrap_policy: - # transformer_layer_cls_to_wrap: None - min_num_params: 0 - fsdp_size: -1 - ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size} - ppo_micro_batch_size: null # will be deprecated, use ppo_micro_batch_size_per_gpu - ppo_micro_batch_size_per_gpu: null - forward_micro_batch_size: ${critic.ppo_micro_batch_size} - forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu} - use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz} - ppo_max_token_len_per_gpu: 32768 # (${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}) * 2 - forward_max_token_len_per_gpu: ${critic.ppo_max_token_len_per_gpu} - ulysses_sequence_parallel_size: 1 # sp size - ppo_epochs: ${actor_rollout_ref.actor.ppo_epochs} - shuffle: ${actor_rollout_ref.actor.shuffle} - grad_clip: 1.0 - cliprange_value: 0.5 - -reward_model: - enable: False - strategy: fsdp - model: - input_tokenizer: ${actor_rollout_ref.model.path} # set this to null if the chat template is identical - path: ~/models/FsfairX-LLaMA3-RM-v0.1 - external_lib: ${actor_rollout_ref.model.external_lib} - use_remove_padding: False - fsdp_config: - min_num_params: 0 - param_offload: False - fsdp_size: -1 - micro_batch_size: null # will be deprecated, use micro_batch_size_per_gpu - micro_batch_size_per_gpu: null # set a number - max_length: null - ulysses_sequence_parallel_size: 1 # sp size - use_dynamic_bsz: ${critic.use_dynamic_bsz} - forward_max_token_len_per_gpu: ${critic.forward_max_token_len_per_gpu} - -algorithm: - gamma: 1.0 - lam: 1.0 - adv_estimator: gae - kl_penalty: kl # how to estimate kl divergence - kl_ctrl: - type: fixed - kl_coef: 0.001 - -trainer: - balance_batch: True - total_epochs: 30 - total_training_steps: null - project_name: verl_examples - experiment_name: gsm8k - logger: [ 'console', 'wandb' ] - val_generations_to_log_to_wandb: 0 - nnodes: 1 - n_gpus_per_node: 8 - save_freq: -1 - # auto: find the last ckpt to resume. 
If can't find, start from scratch - resume_mode: auto # or auto or resume_path if - resume_from_path: False - test_freq: -1 - critic_warmup: 0 - default_hdfs_dir: null - remove_previous_ckpt_in_save: False - del_local_ckpt_after_load: False - default_local_dir: checkpoints/${trainer.project_name}/${trainer.experiment_name} diff --git a/examples/veRL/chain_sum/launch_on_2gpu_server.sh b/examples/veRL/chain_sum/launch_on_2gpu_server.sh deleted file mode 100755 index 4f2efc46..00000000 --- a/examples/veRL/chain_sum/launch_on_2gpu_server.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash - -export N_GPUS=2 -export BASE_MODEL=meta-llama/Llama-3.2-1B-Instruct -export ROLLOUT_TP_SIZE=2 -export EXPERIMENT_NAME=chain_sum_llama -export VLLM_ATTENTION_BACKEND=XFORMERS - -bash ./train_grpo_server.sh diff --git a/examples/veRL/chain_sum/launch_on_4gpu.sh b/examples/veRL/chain_sum/launch_on_4gpu.sh deleted file mode 100755 index f2b309c6..00000000 --- a/examples/veRL/chain_sum/launch_on_4gpu.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash - -export N_GPUS=4 -export BASE_MODEL=meta-llama/Llama-3.2-1B-Instruct -export ROLLOUT_TP_SIZE=2 -export EXPERIMENT_NAME=chain_sum_llama -export VLLM_ATTENTION_BACKEND=XFORMERS - -bash ./train_grpo.sh diff --git a/examples/veRL/chain_sum/main_ppo_custom_reward_server.py b/examples/veRL/chain_sum/main_ppo_custom_reward_server.py deleted file mode 100644 index 5dc5cfa7..00000000 --- a/examples/veRL/chain_sum/main_ppo_custom_reward_server.py +++ /dev/null @@ -1,346 +0,0 @@ -# This example is an adapted version of Bytedance's code: -# https://github.com/volcengine/verl/blob/a65c9157bc0b85b64cd753de19f94e80a11bd871/verl/trainer/main_ppo.py -import os -from typing import Optional - -import hydra -import ray -import torch -import verl.utils.torch_functional as verl_F -from omegaconf import OmegaConf, open_dict -from torch.utils.data import Dataset -from torchdata.stateful_dataloader import StatefulDataLoader -from transformers import PreTrainedTokenizer -from verl import DataProto -from verl.trainer.ppo.ray_trainer import RayPPOTrainer -from verl.utils.dataset.rl_dataset import collate_fn -from verl.utils.model import compute_position_id_with_mask - -import reasoning_gym -import reasoning_gym.utils -from reasoning_gym.utils import extract_answer -from tools.server.models import AnswerItem, BatchEntry, ExperimentCreate - - -class ReasoningGymDataset(Dataset): - def __init__( - self, - tokenizer: PreTrainedTokenizer, - dataset_name: str, - seed: int, - size: int, - developer_prompt: Optional[str] = None, - developer_role: str = "system", - max_prompt_length: int = 2048, - truncation: str = "error", ## ['left', 'right', 'error'] - return_raw_chat: bool = False, - server_url: str = "http://localhost:8000", - api_key: Optional[str] = None, - batch_size: int = 32, - ): - from tools.cli.rgc.client import RGClient - - self.tokenizer = tokenizer - self.dataset_name = dataset_name - self.developer_prompt = developer_prompt - self.developer_role = developer_role - self.max_prompt_length = max_prompt_length - self.truncation = truncation - self.return_raw_chat = return_raw_chat - self.size = size - self.batch_size = batch_size - - # Initialize client and create experiment if needed - self.client = RGClient(base_url=server_url, api_key=api_key) - - # Check if experiment exists, create if not - experiments = self.client.list_experiments() - if dataset_name not in experiments.experiments: - config = ExperimentCreate( - name=dataset_name, - size=size, - seed=seed, - datasets={dataset_name: 
{"weight": 1.0, "config": {"seed": seed, "size": size}}}, - ) - self.client.create_experiment(dataset_name, config) - - # Cache for batches - self._batch_cache: dict[int, list[BatchEntry]] = {} - - def __len__(self) -> int: - return self.size - - def _get_batch(self, batch_idx: int) -> list[BatchEntry]: - """Fetch or retrieve cached batch""" - if batch_idx not in self._batch_cache: - base_index = batch_idx * self.batch_size - response = self.client.get_batch(self.dataset_name, base_index=base_index, batch_size=self.batch_size) - self._batch_cache[batch_idx] = response.entries - - # # Basic cache management - keep only last N batches - # if len(self._batch_cache) > 10: - # oldest_batch = min(self._batch_cache.keys()) - # del self._batch_cache[oldest_batch] - - return self._batch_cache[batch_idx] - - def __getitem__(self, index): - # Get batch containing this index - batch_idx = index // self.batch_size - - batch = self._get_batch(batch_idx) - entry = batch[index % self.batch_size] - - # Format chat/prompt - chat = [] - if self.developer_prompt is not None: - chat.append({"role": self.developer_role, "content": self.developer_prompt}) - chat.append({"role": "user", "content": entry.question}) - - prompt = self.tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True) - - # Tokenize - input_ids, attention_mask = verl_F.tokenize_and_postprocess_data( - prompt=prompt, - tokenizer=self.tokenizer, - max_length=self.max_prompt_length, - pad_token_id=self.tokenizer.pad_token_id, - left_pad=True, - truncation=self.truncation, - ) - - position_ids = compute_position_id_with_mask(attention_mask) - - row_dict = { - "data_source": "reasoning_gym/" + self.dataset_name, - "input_ids": input_ids[0], - "attention_mask": attention_mask[0], - "position_ids": position_ids[0], - "entry_id": entry.entry_id, - "metadata": entry.metadata, - "index": index, - "raw_prompt_ids": self.tokenizer.encode(prompt, add_special_tokens=False), - } - - # Add raw chat if requested - if self.return_raw_chat: - row_dict["raw_prompt"] = chat - - return row_dict - - -class RayPPOTrainerCustom(RayPPOTrainer): - def __init__( - self, - config, - tokenizer, - role_worker_mapping: dict, - resource_pool_manager, - ray_worker_group_cls, - dataset_name: str = "chain_sum", - dataset_size: int = 10000, - ): - self.dataset_name = dataset_name - self.dataset_size = dataset_size - - developer_prompt = reasoning_gym.utils.SYSTEM_PROMPTS["DeepSeekZero"] - rg_api_key = os.getenv("REASONING_GYM_API_KEY", "your-secret-key") - self.train_dataset = ReasoningGymDataset( - tokenizer=tokenizer, - dataset_name=self.dataset_name, - seed=1, - size=self.dataset_size, - developer_prompt=developer_prompt, - api_key=rg_api_key, - ) - - self.val_dataset = ReasoningGymDataset( - tokenizer=tokenizer, - dataset_name=self.dataset_name, - seed=2, - size=self.dataset_size, - developer_prompt=developer_prompt, - api_key=rg_api_key, - ) - - train_reward_fn = lambda data: self._score_output(data, num_examine=0) - val_reward_fn = lambda data: self._score_output(data, num_examine=1) - - super().__init__( - config, - tokenizer, - role_worker_mapping, - resource_pool_manager, - ray_worker_group_cls, - train_reward_fn, - val_reward_fn, - ) - - def _score_output(self, data: DataProto, num_examine: int = 0) -> torch.Tensor: - reward_tensor = torch.zeros_like(data.batch["responses"], dtype=torch.float32) - - # Prepare batch of answers to score - answer_items = [] - valid_response_lengths = [] - sequences_strs = [] - - for i in range(len(data)): - data_item 
= data[i] - - # Get prompt and response - prompt_ids = data_item.batch["prompts"] - prompt_length = prompt_ids.shape[-1] - valid_prompt_length = data_item.batch["attention_mask"][:prompt_length].sum() - valid_prompt_ids = prompt_ids[-valid_prompt_length:] - - response_ids = data_item.batch["responses"] - valid_response_length = data_item.batch["attention_mask"][prompt_length:].sum() - valid_response_ids = response_ids[:valid_response_length] - valid_response_lengths.append(valid_response_length) - - # Decode full sequence - sequences = torch.cat((valid_prompt_ids, valid_response_ids)) - sequences_str = self.tokenizer.decode(sequences) - sequences_strs.append(sequences_str) - - # Extract answer and prepare scoring item - found_answer = extract_answer(sequences_str, tag_name="answer") - - index = data_item.non_tensor_batch["index"] - entry_id = self.train_dataset[index]["entry_id"] - # print( - # "found_answer", - # entry_id, - # found_answer, - # ) - - answer_items.append(AnswerItem(entry_id=entry_id, answer=found_answer)) - - # Score all answers in one request - response = self.train_dataset.client.score_outputs(self.train_dataset.dataset_name, answer_items) - # print("response", response) - - # Fill reward tensor - for i, (score, valid_response_length) in enumerate(zip(response.scores, valid_response_lengths)): - reward_tensor[i, valid_response_length - 1] = score - - if i < num_examine: - print(f"reward={score}, seq={sequences_strs[i]}") - - return reward_tensor - - def _create_dataloader(self): - self.train_dataloader = StatefulDataLoader( - dataset=self.train_dataset, - batch_size=self.config.data.train_batch_size, - shuffle=False, - drop_last=True, - collate_fn=collate_fn, - ) - - self.val_dataloader = StatefulDataLoader( - dataset=self.val_dataset, - batch_size=len(self.val_dataset), - shuffle=False, - drop_last=True, - collate_fn=collate_fn, - ) - - assert len(self.train_dataloader) >= 1 - assert len(self.val_dataloader) >= 1 - - print(f"Size of train dataloader: {len(self.train_dataloader)}") - print(f"Size of val dataloader: {len(self.val_dataloader)}") - - # inject total_training_steps to actor/critic optim_config. This is hacky. 
- total_training_steps = len(self.train_dataloader) * self.config.trainer.total_epochs - - if self.config.trainer.total_training_steps is not None: - total_training_steps = self.config.trainer.total_training_steps - - self.total_training_steps = total_training_steps - print(f"Total training steps: {self.total_training_steps}") - - OmegaConf.set_struct(self.config, True) - with open_dict(self.config): - self.config.actor_rollout_ref.actor.optim.total_training_steps = total_training_steps - self.config.critic.optim.total_training_steps = total_training_steps - - -@ray.remote -def main_task(config): - # print initial config - from pprint import pprint - - from verl.utils import hf_tokenizer - from verl.utils.fs import copy_local_path_from_hdfs - - pprint(OmegaConf.to_container(config, resolve=True)) # resolve=True will eval symbol values - OmegaConf.resolve(config) - - # download the checkpoint from hdfs - local_path = copy_local_path_from_hdfs(config.actor_rollout_ref.model.path) - - # instantiate tokenizer - tokenizer = hf_tokenizer(local_path) - - # define worker classes - if config.actor_rollout_ref.actor.strategy == "fsdp": - assert config.actor_rollout_ref.actor.strategy == config.critic.strategy - from verl.single_controller.ray import RayWorkerGroup - from verl.workers.fsdp_workers import ActorRolloutRefWorker, CriticWorker - - ray_worker_group_cls = RayWorkerGroup - - elif config.actor_rollout_ref.actor.strategy == "megatron": - assert config.actor_rollout_ref.actor.strategy == config.critic.strategy - from verl.single_controller.ray.megatron import NVMegatronRayWorkerGroup - from verl.workers.megatron_workers import ActorRolloutRefWorker, CriticWorker - - ray_worker_group_cls = NVMegatronRayWorkerGroup - - else: - raise NotImplementedError - - from verl.trainer.ppo.ray_trainer import ResourcePoolManager, Role - - role_worker_mapping = { - Role.ActorRollout: ray.remote(ActorRolloutRefWorker), - Role.Critic: ray.remote(CriticWorker), - Role.RefPolicy: ray.remote(ActorRolloutRefWorker), - } - - global_pool_id = "global_pool" - resource_pool_spec = { - global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes, - } - mapping = { - Role.ActorRollout: global_pool_id, - Role.Critic: global_pool_id, - Role.RefPolicy: global_pool_id, - } - - resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=mapping) - - trainer = RayPPOTrainerCustom( - config=config, - tokenizer=tokenizer, - role_worker_mapping=role_worker_mapping, - resource_pool_manager=resource_pool_manager, - ray_worker_group_cls=ray_worker_group_cls, - ) - trainer.init_workers() - trainer.fit() - - -@hydra.main(config_path="config", config_name="ppo_trainer", version_base=None) -def main(config): - if not ray.is_initialized(): - # this is for local ray cluster - ray.init(runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}) - - ray.get(main_task.remote(config)) - - -if __name__ == "__main__": - main() diff --git a/examples/veRL/chain_sum/train_grpo.sh b/examples/veRL/chain_sum/train_grpo.sh deleted file mode 100644 index 3f39714c..00000000 --- a/examples/veRL/chain_sum/train_grpo.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash -set -x - -python3 -u main_ppo_custom_reward.py \ - algorithm.adv_estimator=grpo \ - data.train_files=$DATA_DIR/train.parquet \ - data.val_files=$DATA_DIR/test.parquet \ - data.train_batch_size=1024 \ - data.val_batch_size=1312 \ - data.max_prompt_length=512 \ - data.max_response_length=1024 \ - 
actor_rollout_ref.model.path=$BASE_MODEL \ - actor_rollout_ref.actor.optim.lr=1e-6 \ - actor_rollout_ref.model.use_remove_padding=True \ - actor_rollout_ref.actor.ppo_mini_batch_size=256 \ - actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=80 \ - actor_rollout_ref.actor.use_kl_loss=True \ - actor_rollout_ref.actor.kl_loss_coef=0.001 \ - actor_rollout_ref.actor.kl_loss_type=low_var_kl \ - actor_rollout_ref.model.enable_gradient_checkpointing=True \ - actor_rollout_ref.actor.fsdp_config.param_offload=False \ - actor_rollout_ref.actor.fsdp_config.optimizer_offload=False \ - actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=160 \ - actor_rollout_ref.rollout.tensor_model_parallel_size=$ROLLOUT_TP_SIZE \ - actor_rollout_ref.rollout.name=vllm \ - actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \ - actor_rollout_ref.rollout.n=8 \ - actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=160 \ - actor_rollout_ref.ref.fsdp_config.param_offload=True \ - algorithm.kl_ctrl.kl_coef=0.001 \ - trainer.critic_warmup=0 \ - trainer.logger=['console'] \ - trainer.project_name='verl_chain_sum_grpo' \ - trainer.experiment_name=$EXPERIMENT_NAME \ - trainer.n_gpus_per_node=$N_GPUS \ - trainer.nnodes=1 \ - trainer.save_freq=100 \ - trainer.test_freq=100 \ - trainer.total_epochs=15 $@ 2>&1 | tee verl_output.log diff --git a/examples/veRL/chain_sum/train_grpo_server.sh b/examples/veRL/chain_sum/train_grpo_server.sh deleted file mode 100644 index 34b956ad..00000000 --- a/examples/veRL/chain_sum/train_grpo_server.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash -set -x - -python3 -u main_ppo_custom_reward_server.py \ - algorithm.adv_estimator=grpo \ - data.train_files=$DATA_DIR/train.parquet \ - data.val_files=$DATA_DIR/test.parquet \ - data.train_batch_size=32 \ - data.val_batch_size=32 \ - data.max_prompt_length=512 \ - data.max_response_length=1024 \ - actor_rollout_ref.model.path=$BASE_MODEL \ - actor_rollout_ref.actor.optim.lr=1e-6 \ - actor_rollout_ref.model.use_remove_padding=True \ - actor_rollout_ref.actor.ppo_mini_batch_size=32 \ - actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=32 \ - actor_rollout_ref.actor.use_kl_loss=True \ - actor_rollout_ref.actor.kl_loss_coef=0.001 \ - actor_rollout_ref.actor.kl_loss_type=low_var_kl \ - actor_rollout_ref.model.enable_gradient_checkpointing=True \ - actor_rollout_ref.actor.fsdp_config.param_offload=False \ - actor_rollout_ref.actor.fsdp_config.optimizer_offload=False \ - actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=32 \ - actor_rollout_ref.rollout.tensor_model_parallel_size=$ROLLOUT_TP_SIZE \ - actor_rollout_ref.rollout.name=vllm \ - actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \ - actor_rollout_ref.rollout.n=8 \ - actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=32 \ - actor_rollout_ref.ref.fsdp_config.param_offload=True \ - algorithm.kl_ctrl.kl_coef=0.001 \ - trainer.critic_warmup=0 \ - trainer.logger=['console'] \ - trainer.project_name='verl_chain_sum_grpo' \ - trainer.experiment_name=$EXPERIMENT_NAME \ - trainer.n_gpus_per_node=$N_GPUS \ - trainer.nnodes=1 \ - trainer.save_freq=100 \ - trainer.test_freq=100 \ - trainer.total_epochs=15 $@ 2>&1 | tee verl_output.log diff --git a/examples/veRL/chain_sum/train_ppo.sh b/examples/veRL/chain_sum/train_ppo.sh deleted file mode 100755 index 45f53be5..00000000 --- a/examples/veRL/chain_sum/train_ppo.sh +++ /dev/null @@ -1,30 +0,0 @@ -#!/bin/bash -python3 -u main_ppo_custom_reward.py \ -data.train_files=$DATA_DIR/train.parquet \ -data.val_files=$DATA_DIR/test.parquet \ 
-data.train_batch_size=256 \ -data.val_batch_size=1312 \ -data.max_prompt_length=256 \ -data.max_response_length=1024 \ -actor_rollout_ref.model.path=$BASE_MODEL \ -actor_rollout_ref.actor.optim.lr=1e-6 \ -actor_rollout_ref.actor.ppo_mini_batch_size=128 \ -actor_rollout_ref.actor.ppo_micro_batch_size=8 \ -actor_rollout_ref.rollout.log_prob_micro_batch_size=8 \ -actor_rollout_ref.rollout.tensor_model_parallel_size=$ROLLOUT_TP_SIZE \ -actor_rollout_ref.rollout.gpu_memory_utilization=0.4 \ -actor_rollout_ref.ref.log_prob_micro_batch_size=4 \ -critic.optim.lr=1e-5 \ -critic.model.path=$BASE_MODEL \ -critic.ppo_micro_batch_size=8 \ -algorithm.kl_ctrl.kl_coef=0.001 \ -trainer.logger=['wandb'] \ -+trainer.val_before_train=False \ -trainer.default_hdfs_dir=null \ -trainer.n_gpus_per_node=$N_GPUS \ -trainer.nnodes=1 \ -trainer.save_freq=100 \ -trainer.test_freq=100 \ -trainer.project_name='verl_chain_sum_ppo' \ -trainer.experiment_name=$EXPERIMENT_NAME \ -trainer.total_epochs=15 2>&1 | tee verl_output.log diff --git a/examples/veRL/multi_env/README.md b/examples/veRL/multi_env/README.md new file mode 100644 index 00000000..88fe2fed --- /dev/null +++ b/examples/veRL/multi_env/README.md @@ -0,0 +1,72 @@
+# Multi-Environment Training with veRL
+
+This example demonstrates how to train a language model using veRL (Volcano Engine Reinforcement Learning) with reasoning-gym environments, drawing training data from a composite of procedurally generated reasoning tasks.
+
+Requirements:
+
+- Python >= 3.10
+
+## Installation
+
+1. **Install veRL**: Follow the installation instructions at the [veRL repository](https://github.com/volcengine/verl)
+
+2. **Install reasoning-gym**:
+   ```bash
+   pip install reasoning-gym
+   ```
+
+## Training
+
+To start training the model on the configured task composite:
+
+```bash
+python grpo_train.py --config-path config --config-name grpo_trainer
+```
+
+### Configuration
+
+You can modify the training run by editing the configuration file or by overriding arguments on the command line directly.
+
+#### Change dataset
+
+It is easiest to modify the `config/grpo_trainer.yaml` file with a custom training composite. Here is an example experiment which uses a composite of algorithmic training tasks (a minimal sketch of the corresponding Python API follows the config):
+
+```yaml
+reasoning_gym:
+  dataset_size: 20000
+  developer_prompt: DeepSeekZero
+  datasets:
+    ab:
+      weight: 1
+    base_conversion:
+      weight: 1
+    binary_alternation:
+      weight: 1
+      config:
+        p_solvable: 0.9
+    binary_matrix:
+      weight: 1
+      config:
+        min_n: 2
+        max_n: 6
+    caesar_cipher:
+      weight: 1
+      config:
+        max_words: 10
+    cryptarithm:
+      weight: 1
+    isomorphic_strings:
+      weight: 1
+      config:
+        max_string_length: 8
+```
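+Under the hood, `grpo_train.py` turns each entry under `datasets:` into a `DatasetSpec` and merges them into one weighted composite dataset (see its `prepare_datasets` helper). A minimal sketch, with illustrative dataset names and options:
+
+```python
+import reasoning_gym
+from reasoning_gym.composite import DatasetSpec
+
+# One DatasetSpec per entry under `datasets:`; `config` carries the per-dataset options.
+specs = [
+    DatasetSpec(name="ab", weight=1.0, config={}),
+    DatasetSpec(name="binary_matrix", weight=1.0, config={"min_n": 2, "max_n": 6}),
+]
+
+# The "composite" dataset samples from all listed tasks according to their weights.
+train_data = reasoning_gym.create_dataset("composite", seed=1, size=20000, datasets=specs)
+print(len(train_data))  # 20000 procedurally generated entries
+```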
+#### Change configuration
+
+Set `project_name` and `experiment_name` if logging your runs to W&B. This config assumes a single GPU node, but you can configure this too. The following command would be for 2 GPUs, with 1 used for vLLM rollouts:
+
+```bash
+python3 -u grpo_train.py --config-path config --config-name grpo_trainer \
+    actor_rollout_ref.rollout.tensor_model_parallel_size=1 \
+    trainer.n_gpus_per_node=2 \
+    trainer.project_name=rg-grpo \
+    trainer.experiment_name=algorithmic_qwen2.5_3b
+```
+
+Or similarly you could define this in a config file directly, as sketched below.
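+For example, a hypothetical `config/my_experiment.yaml` (the file name is illustrative) could compose the provided `grpo_trainer` config via Hydra defaults and bake in the same overrides:
+
+```yaml
+defaults:
+  - grpo_trainer
+  - _self_
+
+actor_rollout_ref:
+  rollout:
+    tensor_model_parallel_size: 1
+
+trainer:
+  n_gpus_per_node: 2
+  project_name: rg-grpo
+  experiment_name: algorithmic_qwen2.5_3b
+```
+
+You would then launch it with `python3 -u grpo_train.py --config-path config --config-name my_experiment`.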
diff --git a/examples/veRL/chain_sum/config/grpo_trainer.yaml b/examples/veRL/multi_env/config/grpo_trainer.yaml similarity index 79% rename from examples/veRL/chain_sum/config/grpo_trainer.yaml rename to examples/veRL/multi_env/config/grpo_trainer.yaml index a4277028..a915585d 100644 --- a/examples/veRL/chain_sum/config/grpo_trainer.yaml +++ b/examples/veRL/multi_env/config/grpo_trainer.yaml @@ -1,34 +1,48 @@ +defaults: + - ppo_trainer + - _self_ + +reasoning_gym: + dataset_size: 20000 + developer_prompt: DeepSeekZero + datasets: + ab: + weight: 1 + data: tokenizer: null - train_files: ~/data/rlhf/gsm8k/train.parquet - val_files: ~/data/rlhf/gsm8k/test.parquet + train_files: null + val_files: null prompt_key: prompt max_prompt_length: 512 max_response_length: 512 - train_batch_size: 1024 - val_batch_size: 1312 - return_raw_input_ids: False # This should be set to true when the tokenizer between policy and rm differs - return_raw_chat: False + train_batch_size: 16 + val_batch_size: 1 actor_rollout_ref: hybrid_engine: True model: - path: ~/models/deepseek-llm-7b-chat + path: Qwen/Qwen2.5-Math-1.5B external_lib: null override_config: { } enable_gradient_checkpointing: True use_remove_padding: False actor: + loss_agg_mode: "token-mean" strategy: fsdp # This is for backward-compatibility - ppo_mini_batch_size: 256 + ppo_mini_batch_size: 16 ppo_micro_batch_size: null # will be deprecated, use ppo_micro_batch_size_per_gpu - ppo_micro_batch_size_per_gpu: null + ppo_micro_batch_size_per_gpu: 4 use_dynamic_bsz: False ppo_max_token_len_per_gpu: 16384 # n * ${data.max_prompt_length} + ${data.max_response_length} grad_clip: 1.0 - clip_ratio: 0.2 - entropy_coeff: 0.001 - use_kl_loss: True # True for GRPO + clip_ratio: 0.2 # default value if clip_ratio_low and clip_ratio_high are not specified + clip_ratio_low: 0.2 + clip_ratio_high: 0.2 + clip_ratio_c: 3.0 # lower bound of the value for Dual-clip PPO from https://arxiv.org/pdf/1912.09729 + entropy_coeff: 0 + use_kl_loss: False # True for GRPO + use_torch_compile: True # False to disable torch compile kl_loss_coef: 0.001 # for grpo kl_loss_type: low_var_kl # for grpo ppo_epochs: 1 @@ -47,6 +61,8 @@ actor_rollout_ref: param_offload: False optimizer_offload: False fsdp_size: -1 + checkpoint: + contents: ['model', 'optimizer', 'extra'] ref: fsdp_config: param_offload: False @@ -54,13 +70,15 @@ actor_rollout_ref: # transformer_layer_cls_to_wrap: None min_num_params: 0 log_prob_micro_batch_size: null # will be deprecated, use log_prob_micro_batch_size_per_gpu - log_prob_micro_batch_size_per_gpu: null + log_prob_micro_batch_size_per_gpu: 4 log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz} log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu} ulysses_sequence_parallel_size: ${actor_rollout_ref.actor.ulysses_sequence_parallel_size} # sp size rollout: name: vllm + mode: sync temperature: 1.0 + max_model_len: 2048 top_k: -1 # 0 for hf rollout, -1 for vllm rollout top_p: 1 prompt_length: ${data.max_prompt_length} # not use for opensource response_length: ${data.max_response_length} @@ -72,11 +90,11 @@ actor_rollout_ref: enforce_eager: True free_cache_engine: True load_format: dummy_dtensor - tensor_model_parallel_size: 2 + tensor_model_parallel_size: 1 max_num_batched_tokens: 8192 max_num_seqs: 1024 log_prob_micro_batch_size: null # will be deprecated, use log_prob_micro_batch_size_per_gpu - log_prob_micro_batch_size_per_gpu: null + log_prob_micro_batch_size_per_gpu: 4 log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz} log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu} disable_log_stats: True @@ -88,6 +106,11 @@ actor_rollout_ref: n: 16 # > 1 for grpo val_kwargs: do_sample: True + multi_turn: + enable: False # set to True for multi-turn tool interaction tasks; should set rollout.name to sglang as well + max_turns: null # null for no limit (default max_length // 3) + tool_config_path: null # null for no tool + format: chatml critic: strategy: fsdp @@ -143,26 +166,32 @@ reward_model: ulysses_sequence_parallel_size: 1 # sp size use_dynamic_bsz: ${critic.use_dynamic_bsz} forward_max_token_len_per_gpu: ${critic.forward_max_token_len_per_gpu} + launch_reward_fn_async: False algorithm: + use_kl_in_reward: False gamma: 1.0 lam: 1.0 - adv_estimator: gae + adv_estimator: grpo kl_penalty: kl # how to estimate kl divergence kl_ctrl: type: fixed kl_coef: 0.001 + use_pf_ppo: False + pf_ppo: + reweight_method: pow # ["pow", "max_min", "max_random"] + weight_pow: 2.0 trainer: balance_batch: True total_epochs: 30 total_training_steps: null project_name: verl_examples - experiment_name: gsm8k + experiment_name: chain_sum logger: [ 'console', 'wandb' ] val_generations_to_log_to_wandb: 0 nnodes: 1 - n_gpus_per_node: 8 + n_gpus_per_node: 1 save_freq: -1 # auto: find the last ckpt to resume. If can't find, start from scratch resume_mode: auto # or auto or resume_path if diff --git a/examples/veRL/chain_sum/main_ppo_custom_reward.py b/examples/veRL/multi_env/grpo_train.py similarity index 64% rename from examples/veRL/chain_sum/main_ppo_custom_reward.py rename to examples/veRL/multi_env/grpo_train.py index dc6418c6..fe24486c 100644 --- a/examples/veRL/chain_sum/main_ppo_custom_reward.py +++ b/examples/veRL/multi_env/grpo_train.py @@ -3,6 +3,7 @@ from typing import Optional import hydra +import numpy as np import ray import torch import verl.utils.torch_functional as verl_F @@ -12,11 +13,14 @@ from torchdata.stateful_dataloader import StatefulDataLoader from transformers import PreTrainedTokenizer from verl import DataProto from verl.trainer.ppo.ray_trainer import RayPPOTrainer -from verl.utils.dataset.rl_dataset import collate_fn +from verl.utils.dataset.rl_dataset import collate_fn as verl_collate_fn from verl.utils.model import compute_position_id_with_mask import reasoning_gym import reasoning_gym.utils +from reasoning_gym.coaching.experiment import Experiment +from reasoning_gym.composite import CompositeDataset, DatasetSpec +from reasoning_gym.dataset import ProceduralDataset from reasoning_gym.utils import extract_answer @@ -24,23 +28,25 @@ class ReasoningGymDataset(Dataset): def __init__( self, tokenizer: PreTrainedTokenizer, - dataset_name: str, - seed: int, - size: int, + procedural_dataset: Optional[ProceduralDataset] = None, + experiment: Optional[Experiment] = None, developer_prompt: Optional[str] = None, developer_role: str = "system", max_prompt_length: int = 2048, truncation: str = "error", ## ['left', 'right', 'error'] - return_raw_chat: bool = False, ): + assert procedural_dataset or experiment, "One of `procedural_dataset` or `experiment` must be provided" + assert (
procedural_dataset is None or experiment is None + ), "Only one of `procedural_dataset` or `experiment` may be provided" + self.tokenizer = tokenizer - self.dataset_name = dataset_name - self.data = reasoning_gym.create_dataset(dataset_name, seed=seed, size=size) + self.data = procedural_dataset or experiment.composite + self.experiment = experiment self.developer_prompt = developer_prompt self.developer_role = developer_role self.max_prompt_length = max_prompt_length self.truncation = truncation - self.return_raw_chat = return_raw_chat def __len__(self) -> int: return len(self.data) @@ -67,21 +73,69 @@ class ReasoningGymDataset(Dataset): position_ids = compute_position_id_with_mask(attention_mask) - row_dict["data_source"] = "reasoning_gym/" + self.dataset_name - row_dict["input_ids"] = input_ids[0] - row_dict["attention_mask"] = attention_mask[0] - row_dict["position_ids"] = position_ids[0] - row_dict["raw_prompt_ids"] = self.tokenizer.encode(prompt, add_special_tokens=False) + item = {} + item["index"] = index - # encode prompts without chat template - if self.return_raw_chat: - row_dict["raw_prompt"] = chat.tolist() + item["input_ids"] = input_ids[0] + item["attention_mask"] = attention_mask[0] + item["position_ids"] = position_ids[0] - # add index for each prompt - # index = row_dict.get("extra_info", {}).get("index", 0) - row_dict["index"] = index + item["raw_prompt_ids"] = item["input_ids"].tolist() - return row_dict + return item + + +def make_dataset( + tokenizer, + data_source: Experiment | ProceduralDataset, + developer_prompt: str, + max_prompt_length: int = 2048, +) -> ReasoningGymDataset: + """ + Create ReasoningGymDataset object using either a ProceduralDataset or Experiment as the underlying data source. + """ + if isinstance(data_source, Experiment): + return ReasoningGymDataset( + tokenizer=tokenizer, + experiment=data_source, + developer_prompt=developer_prompt, + developer_role="system", + max_prompt_length=max_prompt_length, + truncation="error", + ) + else: + return ReasoningGymDataset( + tokenizer=tokenizer, + procedural_dataset=data_source, + developer_prompt=developer_prompt, + developer_role="system", + max_prompt_length=max_prompt_length, + truncation="error", + ) + + +def prepare_datasets(config, tokenizer) -> tuple[ReasoningGymDataset, ReasoningGymDataset]: + """Prepare training and validation datasets.""" + dataset_size = config.reasoning_gym.dataset_size + developer_prompt_setting = config.reasoning_gym.developer_prompt + developer_prompt = reasoning_gym.utils.SYSTEM_PROMPTS[developer_prompt_setting] + dataset_specs = [ + DatasetSpec( + name=name, + weight=ds.weight, + config=OmegaConf.to_container(ds.config, resolve=True) if "config" in ds else {}, + ) + for name, ds in config.reasoning_gym.datasets.items() + ] + train_data_source = reasoning_gym.create_dataset("composite", seed=1, size=dataset_size, datasets=dataset_specs) + val_data_source = reasoning_gym.create_dataset("composite", seed=2, size=dataset_size, datasets=dataset_specs) + train_dataset = make_dataset( + tokenizer, train_data_source, developer_prompt, max_prompt_length=config.data.max_prompt_length + ) + val_dataset = make_dataset( + tokenizer, val_data_source, developer_prompt, max_prompt_length=config.data.max_prompt_length + ) + return train_dataset, val_dataset class RayPPOTrainerCustom(RayPPOTrainer): @@ -92,6 +146,8 @@ class RayPPOTrainerCustom(RayPPOTrainer): role_worker_mapping: dict, resource_pool_manager, ray_worker_group_cls, + train_dataset: ReasoningGymDataset, + val_dataset: 
ReasoningGymDataset, dataset_name: str = "chain_sum", dataset_size: int = 10000, ): @@ -99,24 +155,22 @@ class RayPPOTrainerCustom(RayPPOTrainer): self.dataset_size = dataset_size developer_prompt = reasoning_gym.utils.SYSTEM_PROMPTS["DeepSeekZero"] - self.train_dataset = ReasoningGymDataset( - tokenizer=tokenizer, - dataset_name=self.dataset_name, - seed=1, - size=self.dataset_size, - developer_prompt=developer_prompt, - ) - self.val_dataset = ReasoningGymDataset( - tokenizer=tokenizer, - dataset_name=self.dataset_name, - seed=2, - size=self.dataset_size, - developer_prompt=developer_prompt, - ) + self.train_dataset = train_dataset + self.val_dataset = val_dataset - train_reward_fn = lambda data: self._score_output(data, num_examine=0) - val_reward_fn = lambda data: self._score_output(data, num_examine=1) + def make_reward_fn(num_examine: int): + def reward_fn(data: DataProto, return_dict: bool = False, **unused_kwargs): + tensor = self._score_output(data, num_examine=num_examine) + if return_dict: + # wrap it so trainer can pull out extras + return {"reward_tensor": tensor, "reward_extra_info": {}} + return tensor + + return reward_fn + + train_reward_fn = make_reward_fn(num_examine=0) + val_reward_fn = make_reward_fn(num_examine=1) super().__init__( config, @@ -126,6 +180,9 @@ class RayPPOTrainerCustom(RayPPOTrainer): ray_worker_group_cls, train_reward_fn, val_reward_fn, + train_dataset=train_dataset, + val_dataset=val_dataset, + train_sampler=None, ) def _score_output(self, data: DataProto, num_examine: int = 0) -> torch.Tensor: @@ -146,15 +203,16 @@ class RayPPOTrainerCustom(RayPPOTrainer): valid_response_ids = response_ids[:valid_response_length] # decode - sequences = torch.cat((valid_prompt_ids, valid_response_ids)) - sequences_str = self.tokenizer.decode(sequences) + prompt_str = self.tokenizer.decode(valid_prompt_ids) + response_str = self.tokenizer.decode(valid_response_ids) + sequences_str = prompt_str + response_str index = data_item.non_tensor_batch["index"] - score = self._compute_score( - solution_str=sequences_str, + solution_str=response_str, index=index, ) + reward_tensor[i, valid_response_length - 1] = score if num_printed < num_examine: @@ -167,12 +225,15 @@ class RayPPOTrainerCustom(RayPPOTrainer): found_answer = extract_answer(solution_str, tag_name="answer") entry = self.train_dataset.data[index] reward = self.train_dataset.data.score_answer(found_answer, entry=entry) - # print(f"found answer={found_answer}; reward: {reward};") return reward - def _create_dataloader(self): + def _create_dataloader(self, train_dataset, val_dataset, collate_fn=None, sampler=None): + + if collate_fn is None: + collate_fn = verl_collate_fn + self.train_dataloader = StatefulDataLoader( - dataset=self.train_dataset, + dataset=train_dataset, batch_size=self.config.data.train_batch_size, shuffle=True, drop_last=True, @@ -180,8 +241,8 @@ class RayPPOTrainerCustom(RayPPOTrainer): ) self.val_dataloader = StatefulDataLoader( - dataset=self.val_dataset, - batch_size=len(self.val_dataset), + dataset=val_dataset, + batch_size=self.config.data.val_batch_size, shuffle=True, drop_last=True, collate_fn=collate_fn, @@ -224,6 +285,7 @@ def main_task(config): # instantiate tokenizer tokenizer = hf_tokenizer(local_path) + train_dataset, val_dataset = prepare_datasets(config, tokenizer) # define worker classes if config.actor_rollout_ref.actor.strategy == "fsdp": @@ -269,12 +331,14 @@ def main_task(config): role_worker_mapping=role_worker_mapping, resource_pool_manager=resource_pool_manager, 
ray_worker_group_cls=ray_worker_group_cls, + train_dataset=train_dataset, + val_dataset=val_dataset, ) trainer.init_workers() trainer.fit() -@hydra.main(config_path="config", config_name="ppo_trainer", version_base=None) +@hydra.main(config_path="config", config_name="grpo_trainer", version_base=None) def main(config): if not ray.is_initialized(): # this is for local ray cluster