"""
PHYBench Evaluation Environment for Atropos.

This environment evaluates models on PHYBench - a benchmark for evaluating
physical perception and reasoning capabilities in Large Language Models.

Dataset: Eureka-Lab/PHYBench
Paper: https://arxiv.org/abs/2504.16074
Website: https://www.phybench.cn/

PHYBench is a human-curated benchmark with 500 original physics problems spanning:
- Mechanics (MECHANICS)
- Electromagnetism (ELECTRICITY)
- Thermodynamics (THERMODYNAMICS)
- Optics (OPTICS)
- Modern Physics (MODERN)
- Advanced Physics (ADVANCED)

Key features:
- Original problems to prevent data contamination
- Symbolic expression answers in LaTeX format
- Two evaluation metrics:
  1. Binary Accuracy: Exact match using SymPy equivalence
  2. EED Score: Expression Edit Distance for partial credit (0-100)

The EED Score provides:
- 204% improved sample efficiency over binary scoring
- Continuous scoring that captures partial correctness
- Differentiation between minor coefficient errors and structural errors

Supports thinking mode with tags for extended reasoning.
"""

import asyncio
import os
import random
import re
from collections import Counter
from typing import Dict, List, Optional, Tuple

import wandb
from datasets import load_dataset
from eed_score import EED_AVAILABLE, compute_eed_score, extract_all_boxed
from eval_helpers import (
    THINK_CONTENT_AFTER_PATTERN,
    create_system_content,
    extract_thinking_content,
    get_default_thinking_prompt,
    save_eval_results,
    validate_thinking_format,
)
from pydantic import Field
from tqdm.asyncio import tqdm_asyncio

from atroposlib.envs.base import (
    APIServerConfig,
    BaseEnv,
    BaseEnvConfig,
    EvalHandlingEnum,
)

# Physics domain tags in PHYBench
PHYBENCH_TAGS = [
    "MECHANICS",
    "ELECTRICITY",
    "THERMODYNAMICS",
    "OPTICS",
    "MODERN",
    "ADVANCED",
]

# NOTE(review): the line breaks inside the two templates below were rebuilt from
# the sentence/bullet structure of the text; confirm the exact layout against
# the reference prompt before relying on byte-exact reproduction.

# Prompt template for PHYBench evaluation
PHYBENCH_PROMPT_TEMPLATE = """You are a physics expert. Please read the following question and provide a step-by-step solution.
Put your final answer, which must be a readable LaTeX formula, in a \\boxed{{}} environment.

Question: {problem}

Answer:"""

# Alternative prompt with more detailed instructions
PHYBENCH_DETAILED_PROMPT_TEMPLATE = """Solve the following physics problem. Show your reasoning step by step.

Your final answer should be a single symbolic expression (e.g., $\\sqrt{{\\frac{{2g}}{{3R}}}}$).
- Equivalent forms are accepted
- No numerical approximations
- No equation chains

Put your final answer in \\boxed{{}} format.
For example: \\boxed{{2mg + \\frac{{4mv_0^2}}{{l}}}}

Problem: {problem}

Solution:"""


class PHYBenchEvalConfig(BaseEnvConfig):
    """Configuration for PHYBench evaluation environment."""

    # Dataset configuration
    dataset_name: str = Field(
        default="Eureka-Lab/PHYBench",
        description="HuggingFace dataset name",
    )
    eval_split: str = Field(
        default="train",
        description="Split to evaluate on (PHYBench only has train split)",
    )
    shuffle_seed: int = Field(
        default=42,
        description="Random seed for shuffling",
    )
    max_samples: Optional[int] = Field(
        default=None,
        description="Maximum number of samples to evaluate (None = all)",
    )
    tags_filter: Optional[List[str]] = Field(
        default=None,
        description="Filter to specific physics domains (e.g., ['MECHANICS', 'OPTICS'])",
    )

    # Generation parameters
    eval_temperature: float = Field(
        default=0.6,
        description="Temperature for evaluation generation",
    )
    eval_max_tokens: int = Field(
        default=0,
        description="Max tokens for evaluation (0 = use model default)",
    )

    # System prompt configuration
    custom_system_prompt: Optional[str] = Field(
        default=None,
        description="Optional custom system prompt",
    )

    # Thinking mode configuration
    thinking_mode: bool = Field(
        default=False,
        description="Whether to use thinking mode with tags",
    )
    custom_thinking_prompt: Optional[str] = Field(
        default=None,
        description="Optional custom thinking prompt",
    )

    # Prompt configuration
    use_detailed_prompt: bool = Field(
        default=False,
        description="Use detailed prompt with more instructions",
    )

    # Scoring configuration
    compute_eed_score: bool = Field(
        default=True,
        description="Whether to compute EED Score (requires latex2sympy2_extended)",
    )

    # Retry and debug configuration
    max_retries: int = Field(
        default=3,
        description="Maximum retries for failed API calls",
    )
    retry_delay: float = Field(
        default=1.0,
        description="Delay between retries in seconds",
    )
    min_response_length: int = Field(
        default=1,
        description="Minimum response length to consider valid",
    )
    full_debug: bool = Field(
        default=False,
        description="Enable full debug output",
    )


class PHYBenchEvalEnv(BaseEnv):
    """
    PHYBench Evaluation Environment.

    Evaluates models on physics problems requiring symbolic expression answers.
    Uses both binary accuracy and EED Score for comprehensive evaluation.
    """

    name = "phybench_eval"
    env_config_cls = PHYBenchEvalConfig

    def __init__(
        self,
        config: PHYBenchEvalConfig,
        server_configs: List[APIServerConfig],
        slurm: bool = False,
        testing: bool = False,
    ):
        super().__init__(config, server_configs, slurm, testing)
        self.config: PHYBenchEvalConfig = config
        self.eval_items: List[Dict] = []
        self._dataset_loaded = False

        # Pre-compiled pattern for \boxed{...} with one level of nested braces.
        # Not referenced elsewhere in this file (extraction goes through
        # extract_all_boxed); kept so the attribute remains available.
        self._boxed_pattern = re.compile(r"\\boxed\{([^{}]*(?:\{[^{}]*\}[^{}]*)*)\}")

        # Check EED availability
        if self.config.compute_eed_score and not EED_AVAILABLE:
            print(
                "Warning: EED Score requested but latex2sympy2_extended not available. "
                "Install with: pip install latex2sympy2_extended sympy"
            )

    @classmethod
    def config_init(cls) -> Tuple[PHYBenchEvalConfig, List[APIServerConfig]]:
        """Initialize default configuration for the environment."""
        env_config = PHYBenchEvalConfig(
            tokenizer_name="NousResearch/Hermes-3-Llama-3.1-8B",
            group_size=1,
            use_wandb=True,
            max_num_workers_per_node=128,
            rollout_server_url="http://localhost:8000",
            total_steps=1,
            batch_size=1,
            steps_per_eval=1,
            inference_weight=1.0,
            wandb_name="phybench_eval",
            eval_handling=EvalHandlingEnum.STOP_TRAIN,
            max_eval_workers=256,
            max_num_workers=1024,
            # PHYBench specific defaults
            dataset_name="Eureka-Lab/PHYBench",
            eval_split="train",
            eval_temperature=0.6,
            eval_max_tokens=0,  # Use model default
            thinking_mode=False,
            compute_eed_score=True,
        )

        server_configs = [
            APIServerConfig(
                model_name="gpt-4.1",
                base_url="https://api.openai.com/v1",
                api_key=os.getenv("OPENAI_API_KEY", "none"),
                num_max_requests_at_once=32,
                num_requests_for_eval=1024,
            ),
        ]

        return env_config, server_configs

    async def setup(self) -> None:
        """Initialize the environment and load the dataset."""
        if not self._dataset_loaded:
            await self._load_dataset()

        print("\nPHYBench Evaluation Setup:")
        print(f" Dataset: {self.config.dataset_name}")
        print(f" Evaluation split: {self.config.eval_split}")
        print(f" Thinking mode: {self.config.thinking_mode}")
        print(f" EED Score enabled: {self.config.compute_eed_score and EED_AVAILABLE}")
        if self.config.thinking_mode:
            thinking_prompt = get_default_thinking_prompt(
                self.config.custom_thinking_prompt
            )
            print(f" Thinking prompt: {thinking_prompt[:80]}...")
        if self.config.tags_filter:
            print(f" Tags filter: {self.config.tags_filter}")
        print(f" Loaded {len(self.eval_items)} evaluation items")

    async def _load_dataset(self) -> None:
        """
        Load and process the PHYBench dataset.

        Items without content or answer are skipped, duplicate problems are
        dropped (first occurrence wins), the optional tag filter is applied,
        and the remaining items are shuffled deterministically before the
        optional ``max_samples`` cut.

        Raises:
            ValueError: If the configured split is not present in the dataset.
            Exception: Any error raised by ``load_dataset`` is printed and re-raised.
        """
        print(f"Loading PHYBench dataset: {self.config.dataset_name}...")

        try:
            # NOTE(review): trust_remote_code=True allows code shipped with the
            # dataset repository to run locally - only use with trusted datasets.
            dataset = load_dataset(
                self.config.dataset_name,
                trust_remote_code=True,
            )
        except Exception as e:
            print(f"Error loading dataset: {e}")
            raise

        if self.config.eval_split not in dataset:
            available_splits = list(dataset.keys())
            raise ValueError(
                f"Split '{self.config.eval_split}' not found. Available: {available_splits}"
            )

        split_data = dataset[self.config.eval_split]

        # An empty or missing filter means "keep every tag".
        allowed_tags = (
            set(self.config.tags_filter) if self.config.tags_filter else None
        )
        if allowed_tags:
            unknown_tags = sorted(allowed_tags - set(PHYBENCH_TAGS))
            if unknown_tags:
                print(f"Warning: tags_filter contains unknown tags: {unknown_tags}")

        # Process items (deduplicate by content - dataset has duplicates)
        self.eval_items = []
        seen_content: set = set()

        for item in split_data:
            problem_id = item.get("id", "")
            tag = item.get("tag", "UNKNOWN")
            content = item.get("content", "")
            solution = item.get("solution", "")
            answer = item.get("answer", "")

            # Skip if no content or answer
            if not content or not answer:
                continue

            # Skip duplicates (dataset contains each question twice)
            if content in seen_content:
                continue
            seen_content.add(content)

            # Apply tag filter if specified
            if allowed_tags and tag not in allowed_tags:
                continue

            self.eval_items.append(
                {
                    "id": problem_id,
                    "tag": tag,
                    "content": content,
                    "solution": solution,
                    "answer": answer,
                }
            )

        # Shuffle with a private RNG: same order as seeding the module-level
        # generator, but without clobbering global random state.
        random.Random(self.config.shuffle_seed).shuffle(self.eval_items)

        # Apply max_samples limit if specified (None, 0 or negative = no limit)
        max_samples = self.config.max_samples
        if max_samples is not None and max_samples > 0:
            self.eval_items = self.eval_items[:max_samples]

        self._dataset_loaded = True

        # Report the distribution of the items that will actually be evaluated
        # (i.e. after filtering and truncation).
        tag_counts = Counter(entry["tag"] for entry in self.eval_items)
        print(f"Loaded {len(self.eval_items)} items")
        print("Tag distribution:")
        for tag, count in sorted(tag_counts.items()):
            print(f" {tag}: {count}")

    def _format_prompt(self, item: Dict) -> str:
        """Format the problem into a prompt."""
        if self.config.use_detailed_prompt:
            return PHYBENCH_DETAILED_PROMPT_TEMPLATE.format(problem=item["content"])
        return PHYBENCH_PROMPT_TEMPLATE.format(problem=item["content"])

    def _create_system_content(self) -> str:
        """Create system message content based on thinking mode."""
        return (
            create_system_content(
                self.config.thinking_mode,
                self.config.custom_thinking_prompt,
                self.config.custom_system_prompt,
            )
            or ""
        )

    def _extract_answer(
        self, response: str, debug: bool = False
    ) -> Tuple[Optional[str], str]:
        """
        Extract the answer from the model's response.

        Looks for \\boxed{} content. If multiple found, uses the last one.

        Args:
            response: Model's response text
            debug: Whether to print debug info

        Returns:
            Tuple of (extracted_answer, extraction_method)
        """
        if not response:
            return None, "empty_response"

        # Find all boxed answers
        boxed_answers = extract_all_boxed(response)

        if not boxed_answers:
            if debug:
                print(" No \\boxed{} found in response")
            return None, "no_boxed"

        if len(boxed_answers) > 1:
            if debug:
                print(
                    f" Multiple \\boxed{{}} found ({len(boxed_answers)}), using last one"
                )
            return boxed_answers[-1], "boxed_last"

        return boxed_answers[0], "boxed"

    def _check_equivalence(
        self,
        predicted: str,
        gold: str,
        debug: bool = False,
    ) -> Tuple[bool, str]:
        """
        Check if predicted answer is equivalent to gold answer.

        Tries an exact string match on the stripped answers first, then falls
        back to the EED Score (a score of 100 counts as equivalent) when EED
        scoring is enabled and available.

        Args:
            predicted: Predicted answer in LaTeX
            gold: Gold answer in LaTeX
            debug: Whether to print debug info

        Returns:
            Tuple of (is_correct, method)
        """
        if not predicted:
            return False, "empty_prediction"

        # Clean up the answers
        pred_clean = predicted.strip()
        gold_clean = gold.strip()

        # Exact string match
        if pred_clean == gold_clean:
            return True, "exact_match"

        # Try EED Score - if score is 100, they're equivalent
        if self.config.compute_eed_score and EED_AVAILABLE:
            try:
                score, _, _, _ = compute_eed_score(
                    gold_clean, pred_clean, debug_mode=False
                )
            except Exception as e:
                # Best effort: a scoring failure means "not equivalent", but it
                # is surfaced in debug mode instead of being silently dropped.
                if debug:
                    print(f" Equivalence check error: {e}")
            else:
                if score == 100:
                    return True, "sympy_equivalent"

        return False, "not_equivalent"

    def _compute_scores(
        self,
        predicted: Optional[str],
        gold: str,
        debug: bool = False,
    ) -> Dict:
        """
        Compute both accuracy and EED Score.

        The EED Score is computed at most once per call; it provides both the
        partial-credit fields and the equivalence verdict (score of 100).

        Args:
            predicted: Predicted answer (None or empty yields the default result)
            gold: Gold answer
            debug: Whether to print debug info

        Returns:
            Dictionary with keys ``is_correct``, ``match_method``, ``eed_score``,
            ``eed_rel_distance``, ``eed_tree_size`` and ``eed_distance``.
        """
        result = {
            "is_correct": False,
            "match_method": "none",
            "eed_score": 0.0,
            "eed_rel_distance": -1,
            "eed_tree_size": -1,
            "eed_distance": -1,
        }

        if not predicted:
            return result

        # Binary accuracy, step 1: exact string match on the stripped answers.
        if predicted.strip() == gold.strip():
            result["is_correct"] = True
            result["match_method"] = "exact_match"
        else:
            result["match_method"] = "not_equivalent"

        # Compute EED Score if enabled
        if self.config.compute_eed_score and EED_AVAILABLE:
            try:
                eed_score, rel_dist, tree_size, distance = compute_eed_score(
                    gold, predicted, debug_mode=debug
                )
            except Exception as e:
                if debug:
                    print(f" EED Score error: {e}")
            else:
                result["eed_score"] = eed_score
                result["eed_rel_distance"] = rel_dist
                result["eed_tree_size"] = tree_size
                result["eed_distance"] = distance

                # Binary accuracy, step 2: a perfect EED score means the
                # expressions are equivalent even though the strings differ.
                if eed_score == 100 and not result["is_correct"]:
                    result["is_correct"] = True
                    result["match_method"] = "sympy_equivalent"

        return result

    async def rollout_and_score_eval(self, item: Dict) -> Optional[Dict]:
        """
        Run evaluation on a single item and return the result.

        Args:
            item: Evaluation item with ``id``, ``tag``, ``content`` and ``answer``.

        Returns:
            A result dictionary for the item, or None when no non-empty
            response could be obtained after all attempts.
        """
        if self.config.full_debug:
            print(
                f"[DEBUG] Starting eval for item: {item.get('id', 'unknown')}",
                flush=True,
            )

        prompt = self._format_prompt(item)
        system_content = self._create_system_content()

        messages = []
        if system_content:
            messages.append({"role": "system", "content": system_content})
        messages.append({"role": "user", "content": prompt})

        # Build API call parameters
        kwargs = {
            "messages": messages,
            "n": 1,
            "temperature": self.config.eval_temperature,
            "split": "eval",
        }
        if self.config.eval_max_tokens > 0:
            kwargs["max_tokens"] = self.config.eval_max_tokens

        # Always make at least one request, even if max_retries is set to 0.
        max_attempts = max(1, self.config.max_retries)

        response_text = ""
        for attempt in range(max_attempts):
            try:
                if self.config.full_debug:
                    print(
                        f" Making API request (attempt {attempt + 1}/{max_attempts})...",
                        flush=True,
                    )
                    print(
                        f" Temperature: {self.config.eval_temperature}", flush=True
                    )
                    print(
                        f" Max tokens: {self.config.eval_max_tokens if self.config.eval_max_tokens > 0 else 'model default'}",  # noqa: E501
                        flush=True,
                    )

                response = await self.server.chat_completion(**kwargs)
                response_text = response.choices[0].message.content or ""

                # A too-short response is retried; the last one received is
                # still used if no attempt meets the minimum length.
                if len(response_text) >= self.config.min_response_length:
                    break

            except Exception as e:
                if self.config.full_debug:
                    print(f" API error (attempt {attempt + 1}): {e}")
                if attempt < max_attempts - 1:
                    await asyncio.sleep(self.config.retry_delay)
                continue

        if not response_text:
            return None

        # Validate thinking format (the second return value is not used here;
        # the content for extraction is derived below).
        is_valid_format, _ = validate_thinking_format(
            response_text, self.config.thinking_mode
        )

        # Extract thinking content if present
        thinking_content = (
            extract_thinking_content(response_text)
            if self.config.thinking_mode
            else None
        )

        # Get content for answer extraction
        answer_content = response_text
        if self.config.thinking_mode:
            match = THINK_CONTENT_AFTER_PATTERN.search(response_text)
            if match:
                answer_content = match.group(1)

        # Extract answer
        extracted_answer, extraction_method = self._extract_answer(
            answer_content, debug=self.config.full_debug
        )

        # Compute scores
        gold_answer = item["answer"]
        scores = self._compute_scores(
            extracted_answer, gold_answer, debug=self.config.full_debug
        )

        if self.config.full_debug:
            status = "✓" if scores["is_correct"] else "✗"
            eed = scores["eed_score"]
            print(
                f" [{status}] {item['tag']}: EED={eed:.1f}, gold={gold_answer[:50]}..."
            )

        return {
            "item_id": item["id"],
            "tag": item["tag"],
            "content": item["content"][:200],
            "gold_answer": gold_answer,
            "extracted_answer": extracted_answer,
            "extraction_method": extraction_method,
            "is_correct": scores["is_correct"],
            "match_method": scores["match_method"],
            "eed_score": scores["eed_score"],
            "eed_rel_distance": scores["eed_rel_distance"],
            "eed_tree_size": scores["eed_tree_size"],
            "eed_distance": scores["eed_distance"],
            "format_valid": is_valid_format,
            "response": response_text,
            "response_length": len(response_text),
            "thinking_content": thinking_content,
            "has_thinking": thinking_content is not None,
        }

    async def evaluate(self, *args, **kwargs) -> Dict:
        """
        Run the full PHYBench evaluation.

        Returns:
            Metrics dictionary (overall accuracy, average EED score, per-tag
            breakdown, extraction statistics), or an error dictionary with
            ``accuracy`` 0.0 when no item produced a valid result.
        """
        print(f"\n{'='*60}")
        print("Starting PHYBench Evaluation")
        print(f"{'='*60}")
        print(f" Total questions: {len(self.eval_items)}")
        print(f" Thinking mode: {self.config.thinking_mode}")
        print(f" EED Score: {self.config.compute_eed_score and EED_AVAILABLE}")
        print(f"{'='*60}\n")

        # Create evaluation tasks
        eval_tasks = [self.rollout_and_score_eval(item) for item in self.eval_items]

        # Run with progress bar
        results = await tqdm_asyncio.gather(*eval_tasks, desc="Evaluating PHYBench")

        # Filter out failed results
        valid_results = [r for r in results if r is not None]

        if not valid_results:
            print("Warning: No valid evaluation results obtained")
            return {"error": "No valid results", "accuracy": 0.0}

        # Calculate metrics (total is > 0 from here on)
        total = len(valid_results)
        correct = sum(1 for r in valid_results if r["is_correct"])
        accuracy = correct / total

        # Calculate average EED Score
        eed_scores = [r["eed_score"] for r in valid_results if r["eed_score"] >= 0]
        avg_eed_score = sum(eed_scores) / len(eed_scores) if eed_scores else 0.0

        # Calculate per-tag metrics
        tag_metrics: Dict[str, Dict] = {}
        for r in valid_results:
            bucket = tag_metrics.setdefault(
                r.get("tag", "UNKNOWN"),
                {"total": 0, "correct": 0, "eed_scores": []},
            )
            bucket["total"] += 1
            if r["is_correct"]:
                bucket["correct"] += 1
            if r["eed_score"] >= 0:
                bucket["eed_scores"].append(r["eed_score"])

        for bucket in tag_metrics.values():
            t_total = bucket["total"]
            t_eed_scores = bucket["eed_scores"]
            bucket["accuracy"] = bucket["correct"] / t_total if t_total > 0 else 0.0
            bucket["avg_eed_score"] = (
                sum(t_eed_scores) / len(t_eed_scores) if t_eed_scores else 0.0
            )

        # Calculate extraction method statistics (plain dict for serialization)
        extraction_methods: Dict[str, int] = dict(
            Counter(r.get("extraction_method", "unknown") for r in valid_results)
        )

        # Format compliance and thinking utilization
        format_valid = sum(1 for r in valid_results if r.get("format_valid", True))
        has_thinking = sum(1 for r in valid_results if r.get("has_thinking", False))
        has_boxed = sum(
            1 for r in valid_results if r.get("extracted_answer") is not None
        )

        # Average response length
        avg_response_length = (
            sum(r.get("response_length", 0) for r in valid_results) / total
        )

        metrics = {
            "accuracy": accuracy,
            "avg_eed_score": avg_eed_score,
            "total_evaluated": total,
            "total_correct": correct,
            "has_boxed_rate": has_boxed / total,
            "format_compliance_rate": format_valid / total,
            "thinking_utilization_rate": has_thinking / total,
            "avg_response_length": avg_response_length,
            "tag_metrics": tag_metrics,
            "extraction_methods": extraction_methods,
        }

        # Print summary
        print(f"\n{'='*60}")
        print("PHYBench Evaluation Results")
        print(f"{'='*60}")
        print(f" Overall Accuracy: {accuracy:.2%} ({correct}/{total})")
        print(f" Average EED Score: {avg_eed_score:.1f}/100")
        print(f" Has \\boxed{{}} Rate: {has_boxed / total:.2%}")
        print(f" Avg Response Length: {avg_response_length:.0f} chars")
        if self.config.thinking_mode:
            print(f" Format Compliance: {format_valid / total:.2%}")
            print(f" Thinking Utilization: {has_thinking / total:.2%}")

        print("\n Per-Tag Breakdown:")
        for tag in sorted(tag_metrics.keys()):
            data = tag_metrics[tag]
            acc = data["accuracy"]
            eed = data["avg_eed_score"]
            cnt = data["total"]
            print(f" {tag}: Acc={acc:.2%}, EED={eed:.1f} ({cnt} items)")

        print("\n Extraction Methods:")
        for method, count in sorted(extraction_methods.items(), key=lambda x: -x[1]):
            print(f" {method}: {count}")
        print(f"{'='*60}\n")

        # Save results
        if self.config.data_dir_to_save_evals:
            self._save_results(metrics, valid_results)

        return metrics

    def _save_results(self, metrics: Dict, results: List[Dict]) -> None:
        """Save evaluation results to disk."""
        save_eval_results(self.config.data_dir_to_save_evals, metrics, results)

    async def wandb_log(self, metrics: Dict, step: int = 0) -> None:
        """Log metrics to Weights & Biases."""
        if not self.config.use_wandb:
            return

        log_dict = {
            "phybench/accuracy": metrics.get("accuracy", 0),
            "phybench/avg_eed_score": metrics.get("avg_eed_score", 0),
            "phybench/total_evaluated": metrics.get("total_evaluated", 0),
            "phybench/has_boxed_rate": metrics.get("has_boxed_rate", 0),
            "phybench/format_compliance_rate": metrics.get("format_compliance_rate", 0),
            "phybench/thinking_utilization_rate": metrics.get(
                "thinking_utilization_rate", 0
            ),
            "phybench/avg_response_length": metrics.get("avg_response_length", 0),
        }

        # Log per-tag metrics
        for tag, data in metrics.get("tag_metrics", {}).items():
            safe_tag = tag.lower()
            log_dict[f"phybench/accuracy_{safe_tag}"] = data.get("accuracy", 0)
            log_dict[f"phybench/eed_score_{safe_tag}"] = data.get("avg_eed_score", 0)

        wandb.log(log_dict, step=step)

    # Required abstract method implementations
    async def get_next_item(self) -> Optional[Dict]:
        """Not used in evaluation mode."""
        return None

    async def collect_trajectories(self, item) -> List:
        """Not used in evaluation mode."""
        return []

    async def score(self, rollout_group_data) -> Optional[List]:
        """Not used in evaluation mode."""
        return None


if __name__ == "__main__":
    PHYBenchEvalEnv.cli()