diff --git a/environments/README.md b/environments/README.md
index dadaac50..6f73ca2c 100644
--- a/environments/README.md
+++ b/environments/README.md
@@ -626,7 +626,7 @@ Please act as an impartial judge and evaluate the quality of the responses provi
 **Evaluation Methodology:**
 1. **Model Response Generation**: Generate response to Arena-Hard prompt using configured temperature/tokens
 2. **Thinking Validation**: If thinking mode enabled, validate exactly one `<think></think>` pair and extract content after tags
-3. **Dual-Round Judging**: 
+3. **Dual-Round Judging**:
    - Round 1: Judge model response (A) vs GPT-4 baseline (B)
    - Round 2: Judge GPT-4 baseline (A) vs model response (B)
 4. **Score Combination**: Average the two judgment scores using Arena-Hard logic
@@ -635,7 +635,7 @@ Please act as an impartial judge and evaluate the quality of the responses provi
 **Reward Function:**
 - **Training**: Scores range from -1.0 to 1.0 based on combined judgment results
   - 1.0: Model response clearly better than baseline
-  - 0.0: Tie between model and baseline 
+  - 0.0: Tie between model and baseline
   - -1.0: Baseline clearly better than model response
 - **Invalid Thinking**: Automatic 0.0 score for malformed `<think>` tags
 - **Evaluation**: Converted to Arena-Hard winrate format (0.0 to 1.0)
diff --git a/environments/arena_hard_environment.py b/environments/arena_hard_environment.py
index bb003839..0f34d61d 100644
--- a/environments/arena_hard_environment.py
+++ b/environments/arena_hard_environment.py
@@ -200,7 +200,7 @@ class ArenaHardEnv(BaseEnv):
             if self.config.custom_thinking_prompt
             else "You are a deep thinking AI assistant. Before providing your response, you should think through the problem carefully. Use <think></think> tags to enclose your internal reasoning and thought process, then provide your final response after the thinking tags."
         )
-        
+
         # Append custom system prompt if provided
         if self.config.custom_system_prompt:
             return f"{base_thinking_prompt}\n\n{self.config.custom_system_prompt}"
@@ -214,41 +214,69 @@ class ArenaHardEnv(BaseEnv):
     def _load_dataset(self, dataset_path: str, split: str = None) -> List[Dict]:
         """Load dataset using HuggingFace load_dataset."""
         import os
-        
+
         try:
             # Check if it's a local file
             if os.path.exists(dataset_path):
-                if dataset_path.endswith('.jsonl') or dataset_path.endswith('.json'):
-                    dataset = load_dataset("json", data_files=dataset_path, split=split or "train", trust_remote_code=True)
-                elif dataset_path.endswith('.csv'):
-                    dataset = load_dataset("csv", data_files=dataset_path, split=split or "train", trust_remote_code=True)
-                elif dataset_path.endswith('.parquet'):
-                    dataset = load_dataset("parquet", data_files=dataset_path, split=split or "train", trust_remote_code=True)
+                if dataset_path.endswith(".jsonl") or dataset_path.endswith(".json"):
+                    dataset = load_dataset(
+                        "json",
+                        data_files=dataset_path,
+                        split=split or "train",
+                        trust_remote_code=True,
+                    )
+                elif dataset_path.endswith(".csv"):
+                    dataset = load_dataset(
+                        "csv",
+                        data_files=dataset_path,
+                        split=split or "train",
+                        trust_remote_code=True,
+                    )
+                elif dataset_path.endswith(".parquet"):
+                    dataset = load_dataset(
+                        "parquet",
+                        data_files=dataset_path,
+                        split=split or "train",
+                        trust_remote_code=True,
+                    )
                 else:
-                    dataset = load_dataset("json", data_files=dataset_path, split=split or "train", trust_remote_code=True)
-                
-                print(f"Loaded local dataset from {dataset_path} with {len(dataset)} examples")
-                
+                    dataset = load_dataset(
+                        "json",
+                        data_files=dataset_path,
+                        split=split or "train",
+                        trust_remote_code=True,
+                    )
+
+                print(
+                    f"Loaded local dataset from {dataset_path} with {len(dataset)} examples"
+                )
+
             else:
                 # HuggingFace dataset
                 if split:
-                    dataset = load_dataset(dataset_path, split=split, trust_remote_code=True)
+                    dataset = load_dataset(
+                        dataset_path, split=split, trust_remote_code=True
+                    )
                 else:
                     dataset_dict = load_dataset(dataset_path, trust_remote_code=True)
-                    if hasattr(dataset_dict, 'keys'):
+                    if hasattr(dataset_dict, "keys"):
                         available_splits = list(dataset_dict.keys())
                         if available_splits:
                             dataset = dataset_dict[available_splits[0]]
-                            print(f"No split specified, using '{available_splits[0]}' split")
+                            print(
+                                f"No split specified, using '{available_splits[0]}' split"
+                            )
                         else:
                             dataset = dataset_dict
                     else:
                         dataset = dataset_dict
-            
-            print(f"Loaded HuggingFace dataset {dataset_path} with {len(dataset)} examples")
-            
+
+            print(
+                f"Loaded HuggingFace dataset {dataset_path} with {len(dataset)} examples"
+            )
+
             return dataset
-            
+
         except Exception as e:
             print(f"Error loading dataset {dataset_path}: {e}")
             raise
@@ -299,9 +327,15 @@ class ArenaHardEnv(BaseEnv):
         """Set up the environment by loading datasets."""
         # Load training datasets
         try:
-            self.train_prompts = self._load_dataset(self.config.train_prompt_dataset, self.config.train_split)
-            self.train_baselines = self._load_dataset(self.config.train_baseline_dataset, self.config.train_split)
-            print(f"Loaded training datasets: {len(self.train_prompts)} prompts, {len(self.train_baselines)} baselines")
+            self.train_prompts = self._load_dataset(
+                self.config.train_prompt_dataset, self.config.train_split
+            )
+            self.train_baselines = self._load_dataset(
+                self.config.train_baseline_dataset, self.config.train_split
+            )
+            print(
+                f"Loaded training datasets: {len(self.train_prompts)} prompts, {len(self.train_baselines)} baselines"
+            )
         except Exception as e:
             print(f"Error loading training datasets: {e}")
             # Don't create fallback data as requested
@@ -309,16 +343,26 @@ class ArenaHardEnv(BaseEnv):
 
         # Load evaluation datasets
         try:
-            self.eval_prompts = self._load_dataset(self.config.eval_prompt_dataset, self.config.eval_split)
-            self.eval_baselines = self._load_dataset(self.config.eval_baseline_dataset, self.config.eval_split)
-            print(f"Loaded evaluation datasets: {len(self.eval_prompts)} prompts, {len(self.eval_baselines)} baselines")
+            self.eval_prompts = self._load_dataset(
+                self.config.eval_prompt_dataset, self.config.eval_split
+            )
+            self.eval_baselines = self._load_dataset(
+                self.config.eval_baseline_dataset, self.config.eval_split
+            )
+            print(
+                f"Loaded evaluation datasets: {len(self.eval_prompts)} prompts, {len(self.eval_baselines)} baselines"
+            )
         except Exception as e:
             print(f"Error loading evaluation datasets: {e}")
             raise
 
         # Create UID to item mappings for quick lookup
-        self.train_baseline_by_uid = {item.get("uid"): item for item in self.train_baselines}
-        self.eval_baseline_by_uid = {item.get("uid"): item for item in self.eval_baselines}
+        self.train_baseline_by_uid = {
+            item.get("uid"): item for item in self.train_baselines
+        }
+        self.eval_baseline_by_uid = {
+            item.get("uid"): item for item in self.eval_baselines
+        }
 
         # Show configuration info
         print(f"\nArena-Hard Configuration:")
@@ -329,8 +373,12 @@ class ArenaHardEnv(BaseEnv):
         print(f"  - Group size: {self.config.group_size}")
         print(f"  - Thinking mode: {self.config.thinking_mode}")
         if self.config.thinking_mode:
-            print(f"  - Custom thinking prompt: {'Yes' if self.config.custom_thinking_prompt else 'No (using default)'}")
-            print(f"  - Custom system prompt: {'Yes' if self.config.custom_system_prompt else 'No'}")
+            print(
+                f"  - Custom thinking prompt: {'Yes' if self.config.custom_thinking_prompt else 'No (using default)'}"
+            )
+            print(
+                f"  - Custom system prompt: {'Yes' if self.config.custom_system_prompt else 'No'}"
+            )
 
         self.iter = 0
 
@@ -347,7 +395,9 @@ class ArenaHardEnv(BaseEnv):
         last_100 = text_clean[-100:]
         return f"{label}: '{first_100}...{last_100}' (total {len(text_clean)} chars)"
 
-    def _log_full_debug_request(self, messages: List[Dict], params: Dict, context: str = ""):
+    def _log_full_debug_request(
+        self, messages: List[Dict], params: Dict, context: str = ""
+    ):
         """Log full debug information for API requests."""
         if not self.config.full_debug:
             return
@@ -358,7 +408,9 @@ class ArenaHardEnv(BaseEnv):
         for i, message in enumerate(messages):
            role = message.get("role", "unknown")
            content = message.get("content", "")
-            print(f"  Message {i+1} ({role}): {self._format_debug_text(content, 'Content')}")
+            print(
+                f"  Message {i+1} ({role}): {self._format_debug_text(content, 'Content')}"
+            )
 
     def _log_full_debug_response(self, completion, context: str = ""):
         """Log full debug information for API responses."""
@@ -367,14 +419,20 @@ class ArenaHardEnv(BaseEnv):
 
         print(f"\nšŸ” FULL DEBUG - API RESPONSE [{context}]")
 
-        if hasattr(completion, 'usage'):
+        if hasattr(completion, "usage"):
             print(f"  Usage: {completion.usage}")
 
-        if hasattr(completion, 'choices') and completion.choices:
+        if hasattr(completion, "choices") and completion.choices:
             for i, choice in enumerate(completion.choices):
-                content = choice.message.content if hasattr(choice, 'message') else ""
-                finish_reason = choice.finish_reason if hasattr(choice, 'finish_reason') else "unknown"
-                print(f"  Choice {i+1}: {self._format_debug_text(content, 'Response')}")
+                content = choice.message.content if hasattr(choice, "message") else ""
+                finish_reason = (
+                    choice.finish_reason
+                    if hasattr(choice, "finish_reason")
+                    else "unknown"
+                )
+                print(
+                    f"  Choice {i+1}: {self._format_debug_text(content, 'Response')}"
+                )
                 print(f"  Finish reason: {finish_reason}")
         else:
             print("  No choices in response")
@@ -394,7 +452,7 @@ class ArenaHardEnv(BaseEnv):
 
         # Get next prompt sequentially
         prompt_item = self.train_prompts[self.iter % len(self.train_prompts)]
-        
+
         # Find corresponding baseline response
         baseline_item = self.train_baseline_by_uid.get(prompt_item.get("uid"))
         if baseline_item is None:
@@ -412,46 +470,55 @@ class ArenaHardEnv(BaseEnv):
         if self.config.thinking_mode:
             messages = [
                 {"role": "system", "content": self.thinking_system_prompt},
-                {"role": "user", "content": prompt_text}
+                {"role": "user", "content": prompt_text},
             ]
-            prompt = tuple([
-                frozenset({"role": "system", "content": self.thinking_system_prompt}.items()),
-                frozenset({"role": "user", "content": prompt_text}.items()),
-            ])
+            prompt = tuple(
+                [
+                    frozenset(
+                        {
+                            "role": "system",
+                            "content": self.thinking_system_prompt,
+                        }.items()
+                    ),
+                    frozenset({"role": "user", "content": prompt_text}.items()),
+                ]
+            )
         else:
             system_prompt = self._get_system_prompt()
             if system_prompt:
                 messages = [
                     {"role": "system", "content": system_prompt},
-                    {"role": "user", "content": prompt_text}
+                    {"role": "user", "content": prompt_text},
                 ]
-                prompt = tuple([
-                    frozenset({"role": "system", "content": system_prompt}.items()),
-                    frozenset({"role": "user", "content": prompt_text}.items()),
-                ])
+                prompt = tuple(
+                    [
+                        frozenset({"role": "system", "content": system_prompt}.items()),
+                        frozenset({"role": "user", "content": prompt_text}.items()),
+                    ]
+                )
             else:
-                messages = [
-                    {"role": "user", "content": prompt_text}
-                ]
-                prompt = tuple([
-                    frozenset({"role": "user", "content": prompt_text}.items()),
-                ])
+                messages = [{"role": "user", "content": prompt_text}]
+                prompt = tuple(
+                    [
+                        frozenset({"role": "user", "content": prompt_text}.items()),
+                    ]
+                )
 
         # Return item with prompt and baseline for later judgment
         return (prompt, {"prompt_item": prompt_item, "baseline_item": baseline_item})
 
-    async def judge_responses(self, question: str, answer_a: str, answer_b: str) -> Tuple[str, str]:
+    async def judge_responses(
+        self, question: str, answer_a: str, answer_b: str
+    ) -> Tuple[str, str]:
         """Use Claude Sonnet to judge two responses."""
         # Format the judge prompt
         user_content = self.judge_prompt_template.format(
-            question=question,
-            answer_a=answer_a,
-            answer_b=answer_b
+            question=question, answer_a=answer_a, answer_b=answer_b
         )
 
         messages = [
             {"role": "system", "content": self.judge_system_prompt},
-            {"role": "user", "content": user_content}
+            {"role": "user", "content": user_content},
         ]
 
         # Retry logic for judge calls
@@ -459,8 +526,11 @@ class ArenaHardEnv(BaseEnv):
             try:
                 self._log_full_debug_request(
                     messages,
-                    {"temperature": self.config.judge_temperature, "max_tokens": self.config.judge_max_tokens},
-                    f"JUDGE attempt {attempt + 1}/{self.config.max_retries}"
+                    {
+                        "temperature": self.config.judge_temperature,
+                        "max_tokens": self.config.judge_max_tokens,
+                    },
+                    f"JUDGE attempt {attempt + 1}/{self.config.max_retries}",
                 )
 
                 completion = await self.judge_client.chat.completions.create(
@@ -470,7 +540,9 @@ class ArenaHardEnv(BaseEnv):
                     max_tokens=self.config.judge_max_tokens,
                 )
 
-                self._log_full_debug_response(completion, f"JUDGE attempt {attempt + 1}/{self.config.max_retries}")
+                self._log_full_debug_response(
+                    completion, f"JUDGE attempt {attempt + 1}/{self.config.max_retries}"
+                )
 
                 if not completion.choices:
                     if attempt < self.config.max_retries - 1:
@@ -493,11 +565,15 @@ class ArenaHardEnv(BaseEnv):
 
             except Exception as e:
                 if attempt < self.config.max_retries - 1:
-                    print(f"Judge API call failed (attempt {attempt + 1}/{self.config.max_retries}): {e}")
+                    print(
+                        f"Judge API call failed (attempt {attempt + 1}/{self.config.max_retries}): {e}"
+                    )
                     await asyncio.sleep(self.config.retry_delay)
                     continue
                 else:
-                    print(f"Judge API call failed after {self.config.max_retries} attempts: {e}")
+                    print(
+                        f"Judge API call failed after {self.config.max_retries} attempts: {e}"
+                    )
                     return "ERROR", f"Judge error: {e}"
 
         return "ERROR", "Judge failed after all retries"
@@ -544,7 +620,7 @@ class ArenaHardEnv(BaseEnv):
     def _validate_and_extract_thinking(self, response: str) -> Tuple[bool, str]:
         """
         Validate thinking format and extract content after tags.
-        
+
         Returns:
             (is_valid, extracted_content)
             - is_valid: True if thinking format is valid (exactly one pair of think tags)
@@ -552,17 +628,17 @@ class ArenaHardEnv(BaseEnv):
         """
         if not self.config.thinking_mode:
             return True, response
-        
+
         if not response or not isinstance(response, str):
             return False, ""
-        
+
         # Check for exactly one pair of think tags
         think_open_count = len(self._think_pattern.findall(response))
         think_close_count = len(self._think_close_pattern.findall(response))
-        
+
         if think_open_count != 1 or think_close_count != 1:
             return False, ""
-        
+
         # Extract content after tags
         match = self._think_content_pattern.search(response)
         if match:
@@ -595,14 +671,17 @@ class ArenaHardEnv(BaseEnv):
                 self._log_full_debug_request(
                     messages,
                     completion_params,
-                    f"MODEL_TRAIN attempt {attempt + 1}/{self.config.max_retries}"
+                    f"MODEL_TRAIN attempt {attempt + 1}/{self.config.max_retries}",
                 )
 
                 completions = await self.server.chat_completion(
                     messages=messages, **completion_params
                 )
 
-                self._log_full_debug_response(completions, f"MODEL_TRAIN attempt {attempt + 1}/{self.config.max_retries}")
+                self._log_full_debug_response(
+                    completions,
+                    f"MODEL_TRAIN attempt {attempt + 1}/{self.config.max_retries}",
+                )
 
                 if not completions.choices:
                     if attempt < self.config.max_retries - 1:
@@ -614,9 +693,12 @@ class ArenaHardEnv(BaseEnv):
                 # Validate completions
                 valid_completions = []
                 for completion_choice in completions.choices:
-                    if (completion_choice.message.content is not None and
-                        isinstance(completion_choice.message.content, str) and
-                        len(completion_choice.message.content.strip()) >= self.config.min_response_length):
+                    if (
+                        completion_choice.message.content is not None
+                        and isinstance(completion_choice.message.content, str)
+                        and len(completion_choice.message.content.strip())
+                        >= self.config.min_response_length
+                    ):
                         valid_completions.append(completion_choice)
 
                 if len(valid_completions) < len(completions.choices) // 2:
@@ -628,33 +710,43 @@ class ArenaHardEnv(BaseEnv):
 
             except Exception as e:
                 if attempt < self.config.max_retries - 1:
-                    print(f"Model completion failed (attempt {attempt + 1}/{self.config.max_retries}): {e}")
+                    print(
+                        f"Model completion failed (attempt {attempt + 1}/{self.config.max_retries}): {e}"
+                    )
                    await asyncio.sleep(self.config.retry_delay)
                    continue
                else:
-                    print(f"Model completion failed after {self.config.max_retries} attempts: {e}")
+                    print(
+                        f"Model completion failed after {self.config.max_retries} attempts: {e}"
+                    )
                     return None, []
 
         # Judge each response against baseline
         to_score = []
         question = prompt_item.get("prompt", "")
-        
+
         # Extract baseline answer from simplified structure
         baseline_answer = baseline_item.get("answer", "")
         if not baseline_answer:
-            print(f"Warning: Could not extract baseline answer for {prompt_item.get('uid')}")
+            print(
+                f"Warning: Could not extract baseline answer for {prompt_item.get('uid')}"
+            )
            return None, []
 
        for completion_choice in valid_completions:
            model_answer = completion_choice.message.content
 
            # Validate thinking format if thinking mode is enabled
-            is_valid_thinking, extracted_content = self._validate_and_extract_thinking(model_answer)
-            
+            is_valid_thinking, extracted_content = self._validate_and_extract_thinking(
+                model_answer
+            )
+
            if not is_valid_thinking:
                # Score 0 for invalid thinking format without judging
-                trajectory_messages = messages + [{"role": "assistant", "content": model_answer}]
+                trajectory_messages = messages + [
+                    {"role": "assistant", "content": model_answer}
+                ]
                to_score.append((tuple(trajectory_messages), 0.0))
                continue
 
@@ -662,16 +754,22 @@ class ArenaHardEnv(BaseEnv):
                content_for_judging = extracted_content
 
            # Judge model answer vs baseline (round 1: model=A, baseline=B)
-            score_1, judgment_1 = await self.judge_responses(question, content_for_judging, baseline_answer)
-            
-            # Judge baseline vs model answer (round 2: baseline=A, model=B)
-            score_2, judgment_2 = await self.judge_responses(question, baseline_answer, content_for_judging)
+            score_1, judgment_1 = await self.judge_responses(
+                question, content_for_judging, baseline_answer
+            )
+
+            # Judge baseline vs model answer (round 2: baseline=A, model=B)
+            score_2, judgment_2 = await self.judge_responses(
+                question, baseline_answer, content_for_judging
+            )
 
            # Combine scores using Arena-Hard logic
            final_score = self._combine_scores(score_1, score_2)
 
            # Create trajectory
-            trajectory_messages = messages + [{"role": "assistant", "content": model_answer}]
+            trajectory_messages = messages + [
+                {"role": "assistant", "content": model_answer}
+            ]
            to_score.append((tuple(trajectory_messages), final_score))
 
        # Score the trajectories
@@ -680,14 +778,19 @@ class ArenaHardEnv(BaseEnv):
 
    def _combine_scores(self, score_1: str, score_2: str) -> float:
        """Combine two judgment scores using Arena-Hard logic."""
+
        # Convert scores to numeric values
        def score_to_value(score: str, is_first_round: bool) -> float:
            if score == "ERROR":
                return 0.0
            elif score == "A>B":
-                return 1.0 if is_first_round else -1.0  # A>B in round 1 = model wins, A>B in round 2 = baseline wins
+                return (
+                    1.0 if is_first_round else -1.0
+                )  # A>B in round 1 = model wins, A>B in round 2 = baseline wins
            elif score == "B>A":
-                return -1.0 if is_first_round else 1.0  # B>A in round 1 = baseline wins, B>A in round 2 = model wins
+                return (
+                    -1.0 if is_first_round else 1.0
+                )  # B>A in round 1 = baseline wins, B>A in round 2 = model wins
            elif score == "A=B":
                return 0.0  # Tie
            else:
@@ -761,7 +864,7 @@ class ArenaHardEnv(BaseEnv):
        try:
            prompt_item = eval_item
            baseline_item = self.eval_baseline_by_uid.get(prompt_item.get("uid"))
-            
+
            if baseline_item is None:
                return {"score": 0.0, "sample": None}
 
@@ -773,18 +876,18 @@ class ArenaHardEnv(BaseEnv):
            if self.config.thinking_mode:
                messages = [
                    {"role": "system", "content": self.thinking_system_prompt},
-                    {"role": "user", "content": question}
+                    {"role": "user", "content": question},
                ]
            else:
                system_prompt = self._get_system_prompt()
                if system_prompt:
                    messages = [
                        {"role": "system", "content": system_prompt},
-                        {"role": "user", "content": question}
+                        {"role": "user", "content": question},
                    ]
                else:
                    messages = [{"role": "user", "content": question}]
-            
+
            completion_params = {
                "n": 1,
                "max_tokens": self.config.eval_max_tokens,
@@ -798,16 +901,22 @@ class ArenaHardEnv(BaseEnv):
                    self._log_full_debug_request(
                        messages,
                        completion_params,
-                        f"MODEL_EVAL attempt {attempt + 1}/{self.config.max_retries}"
+                        f"MODEL_EVAL attempt {attempt + 1}/{self.config.max_retries}",
                    )
 
                    completion = await self.server.chat_completion(
                        messages=messages, **completion_params
                    )
 
-                    self._log_full_debug_response(completion, f"MODEL_EVAL attempt {attempt + 1}/{self.config.max_retries}")
+                    self._log_full_debug_response(
+                        completion,
+                        f"MODEL_EVAL attempt {attempt + 1}/{self.config.max_retries}",
+                    )
 
-                    if not completion.choices or not completion.choices[0].message.content:
+                    if (
+                        not completion.choices
+                        or not completion.choices[0].message.content
+                    ):
                        if attempt < self.config.max_retries - 1:
                            await asyncio.sleep(self.config.retry_delay)
                            continue
@@ -815,7 +924,7 @@ class ArenaHardEnv(BaseEnv):
                            return {"score": 0.0, "sample": None}
 
                    model_answer = completion.choices[0].message.content
-                    
+
                    if len(model_answer.strip()) < self.config.min_response_length:
                        if attempt < self.config.max_retries - 1:
                            await asyncio.sleep(self.config.retry_delay)
@@ -833,8 +942,10 @@ class ArenaHardEnv(BaseEnv):
                            return {"score": 0.0, "sample": None}
 
            # Validate thinking format and extract content for judging
-            is_valid_thinking, content_for_judging = self._validate_and_extract_thinking(model_answer)
-            
+            is_valid_thinking, content_for_judging = (
+                self._validate_and_extract_thinking(model_answer)
+            )
+
            if not is_valid_thinking:
                # Return 0 score for invalid thinking format
                return {"score": 0.0, "sample": None}
@@ -846,19 +957,23 @@ class ArenaHardEnv(BaseEnv):
                return {"score": 0.0, "sample": None}
 
            # Judge both rounds using extracted content
-            score_1, judgment_1 = await self.judge_responses(question, content_for_judging, baseline_answer)
-            score_2, judgment_2 = await self.judge_responses(question, baseline_answer, content_for_judging)
+            score_1, judgment_1 = await self.judge_responses(
+                question, content_for_judging, baseline_answer
+            )
+            score_2, judgment_2 = await self.judge_responses(
+                question, baseline_answer, content_for_judging
+            )
 
            # Calculate final score for Arena-Hard compatibility
            final_score = self._combine_scores(score_1, score_2)
-            
+
            # Convert to Arena-Hard winrate format (0.0 to 1.0)
            arena_score = max((final_score + 1) / 2, 0.0)
 
            sample = {
                "question": question,
                "model_answer": model_answer,  # Full response including thinking tags
-                "model_answer_for_judging": content_for_judging, # Content used for judging
+                "model_answer_for_judging": content_for_judging,  # Content used for judging
                "baseline_answer": baseline_answer,
                "score_round_1": score_1,
                "judgment_round_1": judgment_1,
@@ -892,7 +1007,8 @@ class ArenaHardEnv(BaseEnv):
 
        # Filter valid results
        valid_results = [
-            result for result in results
+            result
+            for result in results
            if result and result.get("sample") is not None
        ]
 
@@ -910,7 +1026,7 @@ class ArenaHardEnv(BaseEnv):
 
        # Calculate overall winrate (Arena-Hard style)
        overall_winrate = sum(scores) / len(scores) if scores else 0.0
-        
+
        # Calculate category-specific winrates
        category_scores = {}
        for sample in samples:
@@ -978,28 +1094,34 @@ class ArenaHardEnv(BaseEnv):
        if wandb_metrics is None:
            wandb_metrics = {}
 
-        # Training accuracy metrics 
+        # Training accuracy metrics
        if self.percent_correct_buffer:
-            wandb_metrics["train/winrate"] = sum(self.percent_correct_buffer) / len(self.percent_correct_buffer)
+            wandb_metrics["train/winrate"] = sum(self.percent_correct_buffer) / len(
+                self.percent_correct_buffer
+            )
 
        # Judge outcome distribution
        if self.total_judgments > 0:
-            wandb_metrics.update({
-                "train/win_rate": self.win_count / self.total_judgments,
-                "train/tie_rate": self.tie_count / self.total_judgments,
-                "train/loss_rate": self.loss_count / self.total_judgments,
-                "train/total_judgments": self.total_judgments,
-            })
+            wandb_metrics.update(
+                {
+                    "train/win_rate": self.win_count / self.total_judgments,
+                    "train/tie_rate": self.tie_count / self.total_judgments,
+                    "train/loss_rate": self.loss_count / self.total_judgments,
+                    "train/total_judgments": self.total_judgments,
+                }
+            )
 
        # Configuration metrics
-        wandb_metrics.update({
-            "config/group_size": self.config.group_size,
-            "config/max_token_length": self.config.max_token_length,
-            "config/judge_temperature": self.config.judge_temperature,
-            "config/eval_temperature": self.config.eval_temperature,
-            "config/rollout_temperature": self.config.rollout_temperature,
-            "config/thinking_mode": 1.0 if self.config.thinking_mode else 0.0,
-        })
+        wandb_metrics.update(
+            {
+                "config/group_size": self.config.group_size,
+                "config/max_token_length": self.config.max_token_length,
+                "config/judge_temperature": self.config.judge_temperature,
+                "config/eval_temperature": self.config.eval_temperature,
+                "config/rollout_temperature": self.config.rollout_temperature,
+                "config/thinking_mode": 1.0 if self.config.thinking_mode else 0.0,
+            }
+        )
 
        # Reset training metrics
        self._reset_metrics()
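
Note on the scoring path described in the README hunk and implemented by `_combine_scores` and the evaluation code above: each judging round returns a verdict string ("A>B", "B>A", "A=B", or "ERROR"), the two rounds are folded into a single reward in [-1.0, 1.0], and evaluation converts that reward to a 0.0-1.0 winrate via `max((final_score + 1) / 2, 0.0)`. The sketch below is illustrative only, not the module's code: the helper names are made up, and taking the plain average of the two per-round values is an assumption based on the README's "Average the two judgment scores", since the averaging line itself is outside the hunks shown.

```python
# Illustrative sketch of the dual-round scoring described in the README hunk above.
# Assumption: the final reward is the plain average of the two per-round values.


def score_to_value(score: str, is_first_round: bool) -> float:
    """Map one judge verdict to a model-centric value in [-1.0, 1.0]."""
    if score == "ERROR":
        return 0.0
    if score == "A>B":
        # Round 1 puts the model in slot A; round 2 puts the baseline in slot A.
        return 1.0 if is_first_round else -1.0
    if score == "B>A":
        return -1.0 if is_first_round else 1.0
    if score == "A=B":
        return 0.0  # Tie
    return 0.0  # Unknown / unparsable verdict


def combine_scores(score_1: str, score_2: str) -> float:
    """Fold the two judging rounds into one training reward in [-1.0, 1.0]."""
    return (score_to_value(score_1, True) + score_to_value(score_2, False)) / 2


def to_arena_winrate(final_score: float) -> float:
    """Convert the training reward to the 0.0-1.0 winrate used at evaluation."""
    return max((final_score + 1) / 2, 0.0)


# Example: model wins round 1 ("A>B") and ties round 2 ("A=B"):
# reward 0.5, winrate 0.75.
print(combine_scores("A>B", "A=B"), to_arena_winrate(combine_scores("A>B", "A=B")))
```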
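Similarly, for the thinking-mode validation step (`_validate_and_extract_thinking`): the diff shows that exactly one opening and one closing think tag are required and that only the content after the tags is passed to the judge, but the `_think_pattern`, `_think_close_pattern`, and `_think_content_pattern` regexes themselves are not part of this diff. A minimal standalone sketch, assuming simple literal `<think>` / `</think>` patterns:

```python
# Illustrative sketch of the thinking-format check; the real regexes used by
# the environment are not shown in the diff, so literal tags are assumed here.
import re
from typing import Tuple

THINK_OPEN = re.compile(r"<think>")
THINK_CLOSE = re.compile(r"</think>")
THINK_CONTENT = re.compile(r"</think>\s*(.*)\Z", re.DOTALL)


def validate_and_extract_thinking(response: str) -> Tuple[bool, str]:
    """Require exactly one <think></think> pair; return the text after it."""
    if not response or not isinstance(response, str):
        return False, ""
    if len(THINK_OPEN.findall(response)) != 1 or len(THINK_CLOSE.findall(response)) != 1:
        return False, ""  # Malformed thinking -> automatic 0.0 reward
    match = THINK_CONTENT.search(response)
    return (True, match.group(1).strip()) if match else (False, "")


print(validate_and_extract_thinking("<think>plan the answer</think>Final answer."))
# -> (True, 'Final answer.')
```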