From 8202f234be14a955a6fc0d8da8d88a0de9d846d5 Mon Sep 17 00:00:00 2001 From: Andreas Koepf Date: Sat, 1 Feb 2025 23:56:11 +0000 Subject: [PATCH] reduce veRL example size --- examples/veRL/main_ppo_custom_reward.py | 180 ++++++++---------------- 1 file changed, 57 insertions(+), 123 deletions(-) diff --git a/examples/veRL/main_ppo_custom_reward.py b/examples/veRL/main_ppo_custom_reward.py index db2d67d7..2addb8e9 100644 --- a/examples/veRL/main_ppo_custom_reward.py +++ b/examples/veRL/main_ppo_custom_reward.py @@ -1,24 +1,5 @@ -# This example is a modified version of: +# This example is an adapted version of Bytedance's code: # https://github.com/volcengine/verl/blob/a65c9157bc0b85b64cd753de19f94e80a11bd871/verl/trainer/main_ppo.py - - -# Copyright 2024 Bytedance Ltd. and/or its affiliates -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Note that we don't combine the main with ray_trainer as ray_trainer is used by other main. -""" - from typing import Optional import hydra @@ -38,79 +19,11 @@ import reasoning_gym.utils from reasoning_gym.utils import extract_answer -class RewardManager: - """The reward manager.""" - - def __init__(self, tokenizer, num_examine, compute_score) -> None: - self.tokenizer = tokenizer - self.num_examine = num_examine # the number of batches of decoded responses to print to the console - self.compute_score = compute_score - - def __call__(self, data: DataProto): - """We will expand this function gradually based on the available datasets""" - - # If there is rm score, we directly return rm score. Otherwise, we compute via rm_score_fn - if "rm_scores" in data.batch.keys(): - return data.batch["rm_scores"] - - reward_tensor = torch.zeros_like(data.batch["responses"], dtype=torch.float32) - - already_print_data_sources = {} - - for i in range(len(data)): - data_item = data[i] # DataProtoItem - - prompt_ids = data_item.batch["prompts"] - - prompt_length = prompt_ids.shape[-1] - - valid_prompt_length = data_item.batch["attention_mask"][:prompt_length].sum() - valid_prompt_ids = prompt_ids[-valid_prompt_length:] - - response_ids = data_item.batch["responses"] - valid_response_length = data_item.batch["attention_mask"][prompt_length:].sum() - valid_response_ids = response_ids[:valid_response_length] - - # decode - sequences = torch.cat((valid_prompt_ids, valid_response_ids)) - sequences_str = self.tokenizer.decode(sequences) - - data_source = data_item.non_tensor_batch["data_source"] - ground_truth = data_item.non_tensor_batch["answer"] - index = data_item.non_tensor_batch["index"] - - score = self.compute_score( - data_source=data_source, - solution_str=sequences_str, - ground_truth=ground_truth, - index=index, - ) - reward_tensor[i, valid_response_length - 1] = score - - if data_source not in already_print_data_sources: - already_print_data_sources[data_source] = 0 - - if already_print_data_sources[data_source] < self.num_examine: - already_print_data_sources[data_source] += 1 - print(sequences_str) - - return reward_tensor - - -@hydra.main(config_path="config", config_name="ppo_trainer", version_base=None) -def main(config): - if not ray.is_initialized(): - # this is for local ray cluster - ray.init(runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}) - - ray.get(main_task.remote(config)) - - class ReasoningGymDataset(Dataset): def __init__( self, - dataset_name: str, tokenizer: PreTrainedTokenizer, + dataset_name: str, seed: int, size: int, developer_prompt: Optional[str] = None, @@ -177,8 +90,6 @@ class RayPPOTrainerCustom(RayPPOTrainer): role_worker_mapping: dict, resource_pool_manager, ray_worker_group_cls, - reward_fn=None, - val_reward_fn=None, dataset_name: str = "chain_sum", dataset_size: int = 10000, ): @@ -187,22 +98,23 @@ class RayPPOTrainerCustom(RayPPOTrainer): developer_prompt = reasoning_gym.utils.SYSTEM_PROMPTS["DeepSeekZero"] self.train_dataset = ReasoningGymDataset( - dataset_name=self.dataset_name, tokenizer=tokenizer, + dataset_name=self.dataset_name, seed=1, size=self.dataset_size, developer_prompt=developer_prompt, ) self.val_dataset = ReasoningGymDataset( - dataset_name=self.dataset_name, tokenizer=tokenizer, + dataset_name=self.dataset_name, seed=2, size=self.dataset_size, developer_prompt=developer_prompt, ) - reward_fn = RewardManager(tokenizer=tokenizer, num_examine=0, compute_score=self._compute_score) + train_reward_fn = lambda data: self._score_output(data, num_examine=0) + val_reward_fn = lambda data: self._score_output(data, num_examine=1) super().__init__( config, @@ -210,15 +122,51 @@ class RayPPOTrainerCustom(RayPPOTrainer): role_worker_mapping, resource_pool_manager, ray_worker_group_cls, - reward_fn, + train_reward_fn, val_reward_fn, ) - def _compute_score(self, data_source, solution_str, ground_truth, index) -> float: - print("Solution:", solution_str, ground_truth, index, data_source) + def _score_output(self, data: DataProto, num_examine: int = 0) -> torch.Tensor: + reward_tensor = torch.zeros_like(data.batch["responses"], dtype=torch.float32) + + num_printed = 0 + for i in range(len(data)): + data_item = data[i] # DataProtoItem + + prompt_ids = data_item.batch["prompts"] # tokenized prompts + prompt_length = prompt_ids.shape[-1] + + valid_prompt_length = data_item.batch["attention_mask"][:prompt_length].sum() + valid_prompt_ids = prompt_ids[-valid_prompt_length:] + + response_ids = data_item.batch["responses"] + valid_response_length = data_item.batch["attention_mask"][prompt_length:].sum() + valid_response_ids = response_ids[:valid_response_length] + + # decode + sequences = torch.cat((valid_prompt_ids, valid_response_ids)) + sequences_str = self.tokenizer.decode(sequences) + + index = data_item.non_tensor_batch["index"] + + score = self._compute_score( + solution_str=sequences_str, + index=index, + ) + reward_tensor[i, valid_response_length - 1] = score + + if num_printed < num_examine: + print(f"reward={score}, seq={sequences_str}") + num_printed += 1 + + return reward_tensor + + def _compute_score(self, solution_str: str, index: int) -> float: found_answer = extract_answer(solution_str, tag_name="answer") entry = self.train_dataset.data[index] - return self.train_dataset.data.score_answer(found_answer, entry=entry) + reward = self.train_dataset.data.score_answer(found_answer, entry=entry) + # print(f"found answer={found_answer}; reward: {reward};") + return reward def _create_dataloader(self): self.train_dataloader = DataLoader( @@ -259,12 +207,11 @@ class RayPPOTrainerCustom(RayPPOTrainer): @ray.remote -def main_task(config, compute_score=None): +def main_task(config): # print initial config from pprint import pprint - from omegaconf import OmegaConf - from transformers import AutoTokenizer + from verl.utils import hf_tokenizer from verl.utils.fs import copy_local_path_from_hdfs pprint(OmegaConf.to_container(config, resolve=True)) # resolve=True will eval symbol values @@ -274,8 +221,6 @@ def main_task(config, compute_score=None): local_path = copy_local_path_from_hdfs(config.actor_rollout_ref.model.path) # instantiate tokenizer - from verl.utils import hf_tokenizer - tokenizer = hf_tokenizer(local_path) # define worker classes @@ -314,25 +259,6 @@ def main_task(config, compute_score=None): Role.RefPolicy: global_pool_id, } - # we should adopt a multi-source reward function here - # - for rule-based rm, we directly call a reward score - # - for model-based rm, we call a model - # - for code related prompt, we send to a sandbox if there are test cases - # - finally, we combine all the rewards together - # - The reward type depends on the tag of the data - if config.reward_model.enable: - if config.reward_model.strategy == "fsdp": - from verl.workers.fsdp_workers import RewardModelWorker - elif config.reward_model.strategy == "megatron": - from verl.workers.megatron_workers import RewardModelWorker - else: - raise NotImplementedError - role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker) - mapping[Role.RewardModel] = global_pool_id - - # Note that we always use function-based RM for validation - val_reward_fn = RewardManager(tokenizer=tokenizer, num_examine=1, compute_score=compute_score) - resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=mapping) trainer = RayPPOTrainerCustom( @@ -341,11 +267,19 @@ def main_task(config, compute_score=None): role_worker_mapping=role_worker_mapping, resource_pool_manager=resource_pool_manager, ray_worker_group_cls=ray_worker_group_cls, - val_reward_fn=val_reward_fn, ) trainer.init_workers() trainer.fit() +@hydra.main(config_path="config", config_name="ppo_trainer", version_base=None) +def main(config): + if not ray.is_initialized(): + # this is for local ray cluster + ray.init(runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN"}}) + + ray.get(main_task.remote(config)) + + if __name__ == "__main__": main()