"""
Shared helper functions for evaluation environments.
This module contains common utilities used across multiple eval environments,
making it easier to maintain consistent behavior and update logic in one place.
Includes:
- MCQA answer extraction (letter-based)
- Numbered choice extraction
- Freeform answer extraction
- Thinking mode validation
- Math answer verification (using math_verify library)
- System prompt creation
- Results saving utilities
"""
import json
import os
import re
from concurrent.futures import ProcessPoolExecutor
from string import ascii_uppercase
from typing import Dict, List, Optional, Set, Tuple
# Try to import math_verify libraries (optional dependency for math evals)
try:
from latex2sympy2_extended import NormalizationConfig
from math_verify import LatexExtractionConfig, parse, verify
from math_verify.errors import TimeoutException
MATH_VERIFY_AVAILABLE = True
except ImportError:
MATH_VERIFY_AVAILABLE = False
NormalizationConfig = None
LatexExtractionConfig = None
parse = None
verify = None
TimeoutException = Exception
# Pre-compiled regex for answer tag extraction.
# BUG FIX: the tag literals had been stripped from these patterns, leaving
# r"(.*?)" (matches the empty string at position 0) and r"" (matches
# everywhere), which made every extraction return "empty_answer_tag" and
# broke thinking-mode validation. Restored the <answer>/<think> tag
# delimiters implied by the extractor names below.
ANSWER_TAG_PATTERN = re.compile(r"<answer>(.*?)</answer>", re.DOTALL | re.IGNORECASE)
# Pre-compiled regex for thinking mode
THINK_OPEN_PATTERN = re.compile(r"<think>", re.IGNORECASE)
THINK_CLOSE_PATTERN = re.compile(r"</think>", re.IGNORECASE)
# Everything after the closing think tag (greedy to the end of the response)
THINK_CONTENT_AFTER_PATTERN = re.compile(r"</think>\s*(.*)", re.DOTALL | re.IGNORECASE)
# Everything between the first open/close think tag pair (non-greedy)
THINK_CONTENT_INSIDE_PATTERN = re.compile(
    r"<think>(.*?)</think>", re.DOTALL | re.IGNORECASE
)
# Common prefixes that models use before stating their answer
# These will be stripped to help isolate the actual answer
ANSWER_PREFIXES = [
    # "Final Answer" variants
    r"(?:the\s+)?final\s+answer\s+is\s*:?\s*",
    r"(?:my\s+)?final\s+answer\s*:?\s*",
    # "Answer" variants
    r"(?:the\s+)?answer\s+is\s*:?\s*",
    r"(?:the\s+)?correct\s+answer\s+is\s*:?\s*",
    r"(?:my\s+)?answer\s*:?\s*",
    r"answer\s*:\s*",
    # "Choice/Option" variants
    r"(?:the\s+)?(?:correct\s+)?(?:choice|option)\s+is\s*:?\s*",
    r"(?:i\s+)?(?:choose|select|pick)\s*:?\s*",
    # "It is/It's" variants
    r"(?:it\s+is|it's)\s*:?\s*",
    r"(?:that\s+would\s+be|that's)\s*:?\s*",
    # "I think/believe" variants
    r"(?:i\s+)?(?:think|believe|would\s+say)\s+(?:it\s+is|it's|the\s+answer\s+is)\s*:?\s*",
]
# Compile the prefix patterns (anchored at the start of the tag content)
ANSWER_PREFIX_PATTERN = re.compile(
    r"^(?:" + "|".join(ANSWER_PREFIXES) + r")", re.IGNORECASE
)
def extract_letter_from_answer_tag(
    response: str,
    valid_letters: Set[str],
    debug: bool = False,
    choices: Optional[List[str]] = None,
) -> Tuple[Optional[str], str]:
    """
    Extract a single letter answer from the first answer tag in ``response``.

    The tag is located via the module-level ANSWER_TAG_PATTERN; its inner
    text is then interpreted with several strategies, tried in order:
    1. Strict letter match (after stripping common answer prefixes)
    2. Exact choice-text matching (if choices provided)
    3. Word-boundary matching for a single letter

    Handles many common variations:
    - A -> A (single letter)
    - A. -> A (letter with punctuation)
    - (A) -> A (letter in parentheses)
    - Final Answer: B -> B (common prefix stripped)
    - The answer is C -> C (common prefix stripped)
    - Tom Holland -> A (if "Tom Holland" is choice A)
    - A) Tom Holland -> A (choice text with letter prefix)
    - A - Tom Holland -> A (choice text with letter prefix)

    Rejects ambiguous cases like:
    - A is better than B -> None (multiple valid letters)
    - Between A and C -> None (multiple valid letters)

    Args:
        response: The model's response (content after the think block in
            thinking mode)
        valid_letters: Set of valid answer letters (e.g., {'A', 'B', 'C', 'D'})
        debug: Whether to print debug information
        choices: Optional list of choice texts in order (A, B, C, D...)

    Returns:
        Tuple of (extracted_letter or None, extraction_method). The method
        string identifies which strategy fired or why extraction failed.
    """
    # Find the first answer tag; everything below operates on its inner text.
    answer_tag_match = ANSWER_TAG_PATTERN.search(response)
    if not answer_tag_match:
        return None, "no_answer_tag"
    answer_content = answer_tag_match.group(1).strip()
    if not answer_content:
        return None, "empty_answer_tag"
    # Try stripping common prefixes ("The answer is", ...) to isolate the answer
    content_to_check = answer_content
    prefix_match = ANSWER_PREFIX_PATTERN.match(content_to_check)
    if prefix_match:
        content_to_check = content_to_check[prefix_match.end() :].strip()
        if debug:
            print(f" Stripped prefix, remaining content: '{content_to_check}'")
    # STRICT CHECK: If after stripping, we have just a letter (with optional punctuation)
    # This is the ideal case: B or Final Answer: B
    cleaned = content_to_check.strip(".,)(:;!? \t\n")
    if cleaned.upper() in valid_letters:
        if debug:
            print(
                f" Extracted '{cleaned.upper()}' using method 'answer_tag' (strict letter match)"
            )
        return cleaned.upper(), "answer_tag"
    # CHOICE TEXT MATCHING: Check if answer matches a choice text
    # (matching is done on the ORIGINAL tag content, not the prefix-stripped one)
    if choices:
        letter_from_choice = _match_choice_text(
            answer_content, choices, valid_letters, debug
        )
        if letter_from_choice:
            return letter_from_choice, "answer_tag_choice_match"
    # WORD BOUNDARY CHECK: Find all valid letters as standalone words
    letters_pattern = "|".join(sorted(valid_letters))
    word_bounded_pattern = re.compile(rf"\b({letters_pattern})\b", re.IGNORECASE)
    # Find ALL valid letters in the (prefix-stripped) content
    found_letters = word_bounded_pattern.findall(content_to_check.upper())
    # Only accept if EXACTLY ONE valid letter is found (avoids ambiguity)
    if len(found_letters) == 1:
        letter = found_letters[0].upper()
        if debug:
            print(
                f" Extracted '{letter}' using method 'answer_tag' (single word-bounded letter)"
            )
        return letter, "answer_tag"
    elif len(found_letters) > 1:
        # If multiple found after prefix strip, try the original content
        # (the stripping itself may have created the ambiguity)
        found_in_original = word_bounded_pattern.findall(answer_content.upper())
        if len(found_in_original) == 1:
            letter = found_in_original[0].upper()
            if debug:
                print(
                    f" Extracted '{letter}' using method 'answer_tag' (from original content)"
                )
            return letter, "answer_tag"
        if debug:
            print(
                f" Multiple letters found in answer tag: {found_letters} - rejecting"
            )
        return None, "answer_tag_ambiguous"
    else:
        # No letters found after prefix strip, try original content
        found_in_original = word_bounded_pattern.findall(answer_content.upper())
        if len(found_in_original) == 1:
            letter = found_in_original[0].upper()
            if debug:
                print(
                    f" Extracted '{letter}' using method 'answer_tag' (from original content)"
                )
            return letter, "answer_tag"
        if debug:
            print(
                f" No valid letters found in answer tag content: '{answer_content}'"
            )
        return None, "answer_tag_no_letter"
def _match_choice_text(
answer_content: str,
choices: List[str],
valid_letters: Set[str],
debug: bool = False,
) -> Optional[str]:
"""
Match answer content against choice texts.
Handles formats like:
- "Tom Holland" (exact choice text)
- "A) Tom Holland" (letter prefix + choice text)
- "A - Tom Holland" (letter prefix + choice text)
- "A. Tom Holland" (letter prefix + choice text)
Args:
answer_content: The content inside tags
choices: List of choice texts in order (index 0 = A, 1 = B, etc.)
valid_letters: Set of valid letters
debug: Whether to print debug info
Returns:
The letter if a match is found, None otherwise
"""
answer_lower = answer_content.lower().strip()
answer_normalized = re.sub(r"\s+", " ", answer_lower) # Normalize whitespace
for i, choice_text in enumerate(choices):
if i >= len(valid_letters):
break
letter = ascii_uppercase[i]
if letter not in valid_letters:
continue
choice_lower = choice_text.lower().strip()
choice_normalized = re.sub(r"\s+", " ", choice_lower)
# Check exact match with choice text
if answer_normalized == choice_normalized:
if debug:
print(
f" Extracted '{letter}' via exact choice text match: '{choice_text}'"
)
return letter
# Check if answer contains choice text (for longer answers)
if choice_normalized and choice_normalized in answer_normalized:
# Make sure it's a substantial match (not just a single word)
if len(choice_normalized) >= 3:
if debug:
print(
f" Extracted '{letter}' via choice text containment: '{choice_text}'"
)
return letter
# Check for "A) choice text", "A - choice text", "A. choice text" patterns
prefixed_patterns = [
rf"^{letter}\s*[\)\-\.:\]]\s*", # A), A-, A., A:, A]
rf"^\({letter}\)\s*", # (A)
]
for pattern in prefixed_patterns:
stripped = re.sub(pattern, "", answer_content, flags=re.IGNORECASE).strip()
stripped_normalized = re.sub(r"\s+", " ", stripped.lower())
if stripped_normalized == choice_normalized:
if debug:
print(
f" Extracted '{letter}' via prefixed choice text match: '{choice_text}'"
)
return letter
return None
def extract_number_from_answer_tag(
    response: str, num_choices: int, debug: bool = False
) -> Tuple[Optional[int], str]:
    """
    Extract a single number answer from the first answer tag in ``response``.

    The tag is located via the module-level ANSWER_TAG_PATTERN. Uses word
    boundary matching to find valid numbers. Only returns a match
    if EXACTLY ONE valid number (1 to num_choices) is found.

    Handles variations like:
    - 2 -> 2
    - 2. -> 2
    - Choice 2 -> 2
    - The answer is 3 -> 3
    - Option 1 -> 1

    Args:
        response: The model's response (content after the think block in
            thinking mode)
        num_choices: Number of valid choices (e.g., 5 means valid range is 1-5)
        debug: Whether to print debug information

    Returns:
        Tuple of (extracted_number or None, extraction_method). The method
        string identifies which strategy fired or why extraction failed.
    """
    # Find the first answer tag; everything below operates on its inner text.
    answer_tag_match = ANSWER_TAG_PATTERN.search(response)
    if not answer_tag_match:
        return None, "no_answer_tag"
    answer_content = answer_tag_match.group(1).strip()
    if not answer_content:
        return None, "empty_answer_tag"
    # Try stripping common prefixes ("The answer is", ...)
    content_to_check = answer_content
    prefix_match = ANSWER_PREFIX_PATTERN.match(content_to_check)
    if prefix_match:
        content_to_check = content_to_check[prefix_match.end() :].strip()
        if debug:
            print(f" Stripped prefix, remaining content: '{content_to_check}'")
    # STRICT CHECK: If after stripping, we have just a number
    cleaned = content_to_check.strip(".,)(:;!? \t\n")
    try:
        num = int(cleaned)
        if 1 <= num <= num_choices:
            if debug:
                print(
                    f" Extracted '{num}' using method 'answer_tag' (strict match after prefix strip)"
                )
            return num, "answer_tag"
    except ValueError:
        pass
    # WORD BOUNDARY CHECK: Find ALL word-bounded numbers
    word_bounded_numbers = re.findall(r"\b(\d+)\b", content_to_check)
    # Filter to valid range [1, num_choices]
    valid_numbers = []
    for num_str in word_bounded_numbers:
        try:
            num = int(num_str)
            if 1 <= num <= num_choices:
                valid_numbers.append(num)
        except ValueError:
            continue
    # Only accept if EXACTLY ONE valid number is found (avoids ambiguity)
    if len(valid_numbers) == 1:
        number = valid_numbers[0]
        if debug:
            print(
                f" Extracted '{number}' using method 'answer_tag' (single word-bounded number)"
            )
        return number, "answer_tag"
    elif len(valid_numbers) > 1:
        # Ambiguous after prefix strip -- try original content instead,
        # since the stripping itself may have created the ambiguity
        original_numbers = re.findall(r"\b(\d+)\b", answer_content)
        valid_in_original = [
            int(n) for n in original_numbers if 1 <= int(n) <= num_choices
        ]
        if len(valid_in_original) == 1:
            if debug:
                print(
                    f" Extracted '{valid_in_original[0]}' using method 'answer_tag' (from original)"
                )
            return valid_in_original[0], "answer_tag"
        if debug:
            print(
                f" Multiple valid numbers found in answer tag: {valid_numbers} - rejecting"
            )
        return None, "answer_tag_ambiguous"
    else:
        # Nothing found after prefix strip -- try original content
        original_numbers = re.findall(r"\b(\d+)\b", answer_content)
        valid_in_original = [
            int(n) for n in original_numbers if 1 <= int(n) <= num_choices
        ]
        if len(valid_in_original) == 1:
            if debug:
                print(
                    f" Extracted '{valid_in_original[0]}' using method 'answer_tag' (from original)"
                )
            return valid_in_original[0], "answer_tag"
        if debug:
            print(
                f" No valid numbers found in answer tag content: '{answer_content}'"
            )
        return None, "answer_tag_no_number"
def extract_freeform_from_answer_tag(
    response: str, debug: bool = False
) -> Tuple[Optional[str], str]:
    """
    Return the raw text found inside the first answer tag.

    No interpretation is applied beyond whitespace stripping; used for
    open-ended questions like DROP and SimpleQA.

    Args:
        response: The model's response (content after the think block in
            thinking mode)
        debug: Whether to print debug information

    Returns:
        Tuple of (extracted_text or None, extraction_method)
    """
    tag_match = ANSWER_TAG_PATTERN.search(response)
    if tag_match is None:
        return None, "no_answer_tag"
    content = tag_match.group(1).strip()
    if not content:
        return None, "empty_answer_tag"
    if debug:
        # Truncate long answers so debug output stays readable.
        preview = content if len(content) <= 50 else content[:50] + "..."
        print(f" Extracted '{preview}' using method 'answer_tag'")
    return content, "answer_tag"
def validate_thinking_format(
    response: str, thinking_mode: bool = True
) -> Tuple[bool, str]:
    """
    Check the thinking-tag structure and return the answer portion.

    In thinking mode the response must contain exactly one opening and one
    closing think tag; the text after the closing tag is what answer
    extraction should operate on. Outside thinking mode the full response
    is always valid.

    Args:
        response: The model's full response
        thinking_mode: Whether thinking mode is enabled

    Returns:
        Tuple of (is_valid, content_for_extraction)
    """
    if not thinking_mode:
        return True, response
    # Require exactly one open tag and exactly one close tag.
    n_open = len(THINK_OPEN_PATTERN.findall(response))
    n_close = len(THINK_CLOSE_PATTERN.findall(response))
    if (n_open, n_close) != (1, 1):
        return False, response
    # Hand back only the text following the think block.
    tail = THINK_CONTENT_AFTER_PATTERN.search(response)
    if tail is None:
        return False, response
    return True, tail.group(1).strip()
def extract_thinking_content(response: str) -> Optional[str]:
    """
    Return the text enclosed by the first think-tag pair, if any.

    Args:
        response: The model's full response

    Returns:
        Content inside the think tags, or None if not found
    """
    found = THINK_CONTENT_INSIDE_PATTERN.search(response)
    return found.group(1).strip() if found else None
def get_default_thinking_prompt(custom_prompt: Optional[str] = None) -> str:
    """
    Get the thinking system prompt.

    Args:
        custom_prompt: Optional custom thinking prompt to use instead of default

    Returns:
        The thinking prompt string
    """
    if custom_prompt:
        return custom_prompt
    # BUG FIX: the tag names had been dropped from this string ("inside
    # tags"), leaving the model with no usable instruction. Restored the
    # literal <think> </think> tags that the THINK_* patterns in this
    # module expect to find in responses.
    return (
        "You are a deep thinking AI, you may use extremely long chains of thought to deeply consider the "
        "problem and deliberate with yourself via systematic reasoning processes to help come to a correct "
        "solution prior to answering. You should enclose your thoughts and internal monologue inside "
        "<think> </think> tags, and then provide your solution or response to the problem."
    )
# Fallback regex patterns for MCQA when answer tags don't work
def build_mcqa_fallback_patterns(num_choices: int = 4):
    """
    Build fallback regex patterns for extracting MCQA answers.

    These are used when answer tags are not present or ambiguous.
    Patterns are ordered by priority (lower number = higher priority).

    Args:
        num_choices: Number of valid choices (determines valid letters)

    Returns:
        List of (priority, pattern, method_name) tuples
    """
    allowed = ascii_uppercase[:num_choices]
    # One capture group: a bare letter or a parenthesized letter.
    letter_re = rf"([{allowed}]|\([{allowed}]\))"
    # (priority, raw pattern, flags, method name) -- compiled below.
    specs = [
        # Priority 0: "final answer is: X" with "I hope"
        (
            0,
            rf"(?i:final\s+answer\s+is)\s*:?\s*{letter_re}\.?\s*I\s*hope",
            re.IGNORECASE,
            "final_answer_hope",
        ),
        # Priority 50: "final answer ... is X"
        (
            50,
            rf"(?i:final\s+answer).{{0,100}}?\s+is\s*:?\s*{letter_re}",
            re.IGNORECASE | re.DOTALL,
            "final_answer_is",
        ),
        # Priority 75: "the answer is X"
        (
            75,
            rf"(?i:the\s+answer\s+is)\s*:?\s*{letter_re}",
            re.IGNORECASE,
            "the_answer_is",
        ),
        # Priority 100: "answer: X"
        (
            100,
            rf"(?i:answer)\s*:\s*.{{0,50}}?{letter_re}",
            re.IGNORECASE | re.DOTALL,
            "answer_colon",
        ),
        # Priority 150: "answer X"
        (150, rf"(?i:answer)\s+{letter_re}", re.IGNORECASE, "answer_space"),
        # Priority 200: Response starts with letter
        (200, rf"^\s*\**{letter_re}\**[\s\.\)\:]", re.IGNORECASE, "start"),
        # Priority 210: Letter at start of any line
        (210, rf"\n\s*\**{letter_re}\**[\s\.\)\:]", re.IGNORECASE, "line_start"),
        # Priority 250: Standalone letter with word boundaries
        (250, rf"\b{letter_re}\b", re.IGNORECASE, "standalone"),
    ]
    return [
        (priority, re.compile(expr, flags), method)
        for priority, expr, flags, method in specs
    ]
def extract_mcqa_answer_with_fallback(
    response: str,
    num_choices: int = 4,
    fallback_patterns: Optional[list] = None,
    debug: bool = False,
) -> Tuple[Optional[str], str]:
    """
    Extract MCQA answer using answer tags first, then fallback patterns.

    Args:
        response: The model's response (content after the think block in
            thinking mode)
        num_choices: Number of valid choices
        fallback_patterns: Pre-built fallback patterns (optional, will be built if not provided)
        debug: Whether to print debug information

    Returns:
        Tuple of (extracted_letter or None, extraction_method)
    """
    if not response:
        return None, "empty_response"
    valid_letters = set(ascii_uppercase[:num_choices])
    # PRIMARY: Try answer tags first
    letter, method = extract_letter_from_answer_tag(response, valid_letters, debug)
    if letter:
        return letter, method
    # FALLBACK: Use regex patterns
    if fallback_patterns is None:
        fallback_patterns = build_mcqa_fallback_patterns(num_choices)
    # Patterns anchored on an explicit "answer ..." phrase take the LAST
    # occurrence (the final statement is most reliable); positional patterns
    # take the first.
    last_match_methods = {
        "final_answer_is",
        "the_answer_is",
        "answer_colon",
        "answer_space",
    }
    for priority, pattern, method_name in fallback_patterns:
        matches = pattern.findall(response)
        if not matches:
            continue
        match = matches[-1] if method_name in last_match_methods else matches[0]
        if isinstance(match, tuple):
            match = match[0]
        letter = match.strip("()").upper()
        if letter in valid_letters:
            if debug:
                print(
                    f" Extracted '{letter}' using fallback method '{method_name}' (priority {priority})"
                )
            return letter, f"fallback_{method_name}"
    # Last resort: scan for any valid letter, preferring the alphabetically
    # last one. BUG FIX: the original iterated reversed(list(valid_letters)),
    # and set iteration order depends on (randomized) string hashing, so the
    # chosen letter could differ between runs. Sorting makes the scan order
    # deterministic.
    upper_response = response.upper()
    for letter in sorted(valid_letters, reverse=True):
        if letter in upper_response:
            if debug:
                print(f" Extracted '{letter}' using fallback 'last_valid_letter'")
            return letter, "fallback_last_valid_letter"
    return None, "no_match"
# =============================================================================
# MATH ANSWER VERIFICATION HELPERS
# =============================================================================
# These functions use the math_verify library for robust mathematical answer
# verification. They support \boxed{} extraction, symbolic comparison, and
# string normalization fallback.
# Matches \boxed{...}, tolerating one level of nested braces in the payload.
BOXED_PATTERN = re.compile(r"\\boxed\{([^{}]*(?:\{[^{}]*\}[^{}]*)*)\}")
# Lazily-created module-wide ProcessPoolExecutor shared by math verification.
_math_executor: Optional[ProcessPoolExecutor] = None
def get_math_executor(max_workers: int = 64) -> ProcessPoolExecutor:
    """
    Return the shared ProcessPoolExecutor, creating it on first use.

    Running verification in worker processes protects callers against hangs
    from sympy/latex parsing.

    Args:
        max_workers: Maximum number of worker processes (only honored by the
            call that actually creates the pool)

    Returns:
        ProcessPoolExecutor instance
    """
    global _math_executor
    if _math_executor is None:
        _math_executor = ProcessPoolExecutor(max_workers=max_workers)
    return _math_executor
def extract_boxed_answers(text: str) -> List[str]:
    """
    Return every \\boxed{} payload found in ``text``, in order.

    Args:
        text: The text to search for boxed answers

    Returns:
        List of extracted boxed contents
    """
    return BOXED_PATTERN.findall(text)
def extract_first_boxed_answer(
    response: str, after_think: bool = True, debug: bool = False
) -> Tuple[Optional[str], str, bool]:
    """
    Extract the single \\boxed{} answer from a response.

    Only succeeds when the searched region contains exactly ONE boxed
    answer; zero or multiple boxed answers are reported as failures.

    Args:
        response: The model's full response
        after_think: Whether to restrict the search to content after the
            think block (falls back to the full response if no block found)
        debug: Whether to print debug information

    Returns:
        Tuple of (extracted_answer or None, extraction_method, has_multiple_boxed)
    """
    # Decide which portion of the response to scan.
    search_content = response
    if after_think:
        tail = THINK_CONTENT_AFTER_PATTERN.search(response)
        if tail:
            search_content = tail.group(1)
    candidates = extract_boxed_answers(search_content)
    if not candidates:
        if debug:
            print(" No \\boxed{} found in response")
        return None, "no_boxed", False
    if len(candidates) > 1:
        if debug:
            print(f" Multiple \\boxed{{}} found ({len(candidates)}) - rejecting")
        return None, "multiple_boxed", True
    # Exactly one boxed answer -- accept it.
    answer = candidates[0].strip()
    if debug:
        preview = answer if len(answer) <= 50 else answer[:50] + "..."
        print(f" Extracted '{preview}' from \\boxed{{}}")
    return answer, "boxed", False
def math_normalize_string(text: str) -> str:
    """
    Normalize a math answer string for comparison.

    This is a fallback when symbolic verification fails.
    Based on lighteval's math_normalizer.

    Args:
        text: The text to normalize

    Returns:
        Normalized string
    """
    if not text:
        return ""
    normalized = text.strip()
    # Unwrap a \boxed{} payload if one is present.
    boxed = BOXED_PATTERN.search(normalized)
    if boxed:
        normalized = boxed.group(1)
    # Collapse runs of whitespace to single spaces.
    normalized = " ".join(normalized.split())
    # Drop LaTeX spacing commands and unwrap value-preserving markup.
    normalized = re.sub(r"\\[,;:!]", "", normalized)  # \, \; etc.
    normalized = re.sub(r"\\quad|\\qquad", " ", normalized)
    normalized = re.sub(r"\\text\{([^}]*)\}", r"\1", normalized)  # \text{...} -> ...
    normalized = re.sub(r"\\mathrm\{([^}]*)\}", r"\1", normalized)
    normalized = re.sub(r"\\mathbf\{([^}]*)\}", r"\1", normalized)
    # Map LaTeX operators onto plain-text equivalents; drop $ delimiters.
    for latex_token, plain_token in (
        ("\\times", "*"),
        ("\\cdot", "*"),
        ("\\div", "/"),
        ("\\pm", "+-"),
        ("$", ""),
    ):
        normalized = normalized.replace(latex_token, plain_token)
    # Trim trailing punctuation and compare case-insensitively.
    return normalized.rstrip(".,;:").lower().strip()
def compare_math_strings(pred: str, gold: str) -> bool:
    """
    Compare two math answer strings after normalization.

    This is a fallback when symbolic verification fails.

    Args:
        pred: Predicted answer
        gold: Gold answer

    Returns:
        True if answers match, False otherwise
    """
    pred_norm = math_normalize_string(pred)
    gold_norm = math_normalize_string(gold)
    if not pred_norm:
        return False
    # Exact textual match after normalization.
    if pred_norm == gold_norm:
        return True
    # Numeric comparison (tolerates thousands separators and spaces).
    try:
        pred_val = float(pred_norm.replace(",", "").replace(" ", ""))
        gold_val = float(gold_norm.replace(",", "").replace(" ", ""))
    except (ValueError, TypeError):
        pass
    else:
        if pred_val == gold_val:
            return True
        # Allow tiny relative error for floating point representations.
        if gold_val != 0 and abs(pred_val - gold_val) / abs(gold_val) < 1e-6:
            return True
    # Integer comparison (for AIME-style answers in 0-999).
    try:
        return int(float(pred_norm.replace(",", ""))) == int(
            float(gold_norm.replace(",", ""))
        )
    except (ValueError, TypeError):
        return False
def _score_math_answer_worker(
    gold: str, response: str, wrap_gold_boxed: bool = True
) -> Tuple[Optional[bool], str]:
    """
    Worker function for scoring math answers (runs in a separate process).

    Designed to run inside a ProcessPoolExecutor to protect the caller
    against hangs from sympy/latex parsing. When the math_verify library is
    unavailable, or any parsing step fails, it degrades to normalized string
    comparison of the first \\boxed{} payload (see compare_math_strings).

    Args:
        gold: The gold answer
        response: The model's response (content to extract answer from)
        wrap_gold_boxed: Whether to wrap gold in \\boxed{} if not present

    Returns:
        Tuple of (is_correct or None, method_used). ``None`` means the
        answer could not be scored at all; the method string records which
        path (or fallback) produced the result.
    """
    if not MATH_VERIFY_AVAILABLE:
        # Library missing entirely -- fall back to string comparison
        boxed = extract_boxed_answers(response)
        if boxed:
            return compare_math_strings(boxed[0], gold), "string_fallback_no_lib"
        return None, "no_math_verify"
    try:
        # Prepare gold answer (math_verify extracts from \boxed{} anchors)
        if wrap_gold_boxed and "\\boxed" not in gold:
            gold_text = f"\\boxed{{{gold}}}"
        else:
            gold_text = gold
        # Parse gold
        gold_parsed = parse(
            gold_text,
            extraction_mode="first_match",
            extraction_config=[LatexExtractionConfig()],
        )
        if len(gold_parsed) == 0:
            # Gold couldn't be parsed, try string comparison
            boxed = extract_boxed_answers(response)
            if boxed:
                return (
                    compare_math_strings(boxed[0], gold),
                    "string_fallback_gold_parse",
                )
            return None, "gold_parse_failed"
        # Parse response (boxed_match_priority=0 prefers \boxed{} content;
        # try_extract_without_anchor=False rejects unanchored expressions)
        response_parsed = parse(
            response,
            extraction_config=[
                LatexExtractionConfig(
                    normalization_config=NormalizationConfig(
                        nits=False,
                        malformed_operators=False,
                        basic_latex=True,
                        equations=True,
                        boxed="all",
                        units=True,
                    ),
                    boxed_match_priority=0,
                    try_extract_without_anchor=False,
                )
            ],
            extraction_mode="first_match",
        )
        if len(response_parsed) == 0:
            # Response couldn't be parsed, try string comparison
            boxed = extract_boxed_answers(response)
            if boxed:
                return (
                    compare_math_strings(boxed[0], gold),
                    "string_fallback_response_parse",
                )
            return None, "response_parse_failed"
        # Symbolic verification of the parsed expressions
        is_correct = verify(response_parsed, gold_parsed)
        return is_correct, "math_verify"
    except TimeoutException:
        # Timeout during parsing/verification, try string comparison
        boxed = extract_boxed_answers(response)
        if boxed:
            return compare_math_strings(boxed[0], gold), "string_fallback_timeout"
        return None, "timeout"
    except Exception as e:
        # Any other error, try string comparison (method records error type)
        boxed = extract_boxed_answers(response)
        if boxed:
            return compare_math_strings(boxed[0], gold), "string_fallback_error"
        return None, f"error_{type(e).__name__}"
def score_math_answer(
    gold: str,
    response: str,
    after_think: bool = True,
    wrap_gold_boxed: bool = True,
    executor: Optional[ProcessPoolExecutor] = None,
    debug: bool = False,
) -> Tuple[Optional[bool], str, bool]:
    """
    Score a math answer using math_verify, optionally with process isolation.

    This is the synchronous entry point for scoring math answers. It:
    1. Extracts content after the think block if thinking mode
    2. Checks for multiple \\boxed{} (fails if multiple)
    3. Uses math_verify for symbolic comparison
    4. Falls back to string normalization if that fails

    Args:
        gold: The gold answer
        response: The model's full response
        after_think: Whether to extract content after the think block
        wrap_gold_boxed: Whether to wrap gold in \\boxed{} if not present
        executor: Optional ProcessPoolExecutor; when given, verification runs
            in a worker process (protects against sympy/latex hangs)
        debug: Whether to print debug information

    Returns:
        Tuple of (is_correct or None, method_used, has_multiple_boxed)
    """
    # Get content to score
    if after_think:
        match = THINK_CONTENT_AFTER_PATTERN.search(response)
        score_content = match.group(1) if match else response
    else:
        score_content = response
    # Check for multiple boxed answers
    boxed_answers = extract_boxed_answers(score_content)
    if len(boxed_answers) > 1:
        if debug:
            print(f" Multiple \\boxed{{}} found ({len(boxed_answers)}) - rejecting")
        return None, "multiple_boxed", True
    if len(boxed_answers) == 0:
        if debug:
            print(" No \\boxed{} found")
        return None, "no_boxed", False
    # Use executor if provided, otherwise run directly
    if executor is not None:
        try:
            # BUG FIX: the previous implementation used asyncio's
            # loop.run_in_executor() and then called .result(timeout=30) on
            # the returned asyncio.Future — asyncio futures take no timeout
            # argument and cannot be resolved synchronously, so this path
            # always raised. In a sync function, submit directly to the
            # concurrent.futures executor; its Future.result() supports a
            # timeout. (Async callers should use score_math_answer_async.)
            future = executor.submit(
                _score_math_answer_worker, gold, score_content, wrap_gold_boxed
            )
            is_correct, method = future.result(timeout=30)
        except Exception as e:
            if debug:
                print(f" Executor error: {e}")
            # Fallback to string comparison
            if boxed_answers:
                is_correct = compare_math_strings(boxed_answers[0], gold)
                method = "string_fallback_executor_error"
            else:
                return None, f"executor_error_{type(e).__name__}", False
    else:
        is_correct, method = _score_math_answer_worker(
            gold, score_content, wrap_gold_boxed
        )
    if debug:
        print(f" Score: {is_correct} (method: {method})")
    return is_correct, method, False
async def score_math_answer_async(
    gold: str,
    response: str,
    after_think: bool = True,
    wrap_gold_boxed: bool = True,
    executor: Optional[ProcessPoolExecutor] = None,
    debug: bool = False,
) -> Tuple[Optional[bool], str, bool]:
    """
    Async version of score_math_answer for use in async evaluation loops.

    Uses a ProcessPoolExecutor to run math verification in a separate
    process, protecting against hangs from sympy/latex parsing.

    Args:
        gold: The gold answer
        response: The model's full response
        after_think: Whether to extract content after the think block
        wrap_gold_boxed: Whether to wrap gold in \\boxed{} if not present
        executor: Optional ProcessPoolExecutor to use (the shared module
            executor is used when omitted)
        debug: Whether to print debug information

    Returns:
        Tuple of (is_correct or None, method_used, has_multiple_boxed)
    """
    import asyncio
    # Get content to score
    if after_think:
        match = THINK_CONTENT_AFTER_PATTERN.search(response)
        score_content = match.group(1) if match else response
    else:
        score_content = response
    # Check for multiple boxed answers
    boxed_answers = extract_boxed_answers(score_content)
    if len(boxed_answers) > 1:
        if debug:
            print(f" Multiple \\boxed{{}} found ({len(boxed_answers)}) - rejecting")
        return None, "multiple_boxed", True
    if len(boxed_answers) == 0:
        if debug:
            print(" No \\boxed{} found")
        return None, "no_boxed", False
    # Get executor
    if executor is None:
        executor = get_math_executor()
    try:
        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() is deprecated for this use and can create a
        # stray loop when none is running.
        loop = asyncio.get_running_loop()
        is_correct, method = await loop.run_in_executor(
            executor, _score_math_answer_worker, gold, score_content, wrap_gold_boxed
        )
    except Exception as e:
        if debug:
            print(f" Executor error: {e}")
        # Fallback to string comparison
        if boxed_answers:
            is_correct = compare_math_strings(boxed_answers[0], gold)
            method = "string_fallback_executor_error"
        else:
            return None, f"executor_error_{type(e).__name__}", False
    if debug:
        print(f" Score: {is_correct} (method: {method})")
    return is_correct, method, False
def format_math_answer_instruction(include_hope: bool = True) -> str:
    """
    Get the standard instruction for math answer format.

    Based on lighteval's AIME prompt which works well with math_verify.

    Args:
        include_hope: Whether to include "I hope it is correct" suffix

    Returns:
        Instruction string
    """
    # Two fixed variants; the "I hope" phrasing mirrors lighteval's AIME prompt.
    with_hope = (
        "The last line of your response should be of the following format: "
        "'Therefore, the final answer is: $\\boxed{ANSWER}$. I hope it is correct' "
        "(without quotes) where ANSWER is just the final number or expression that solves the problem."
    )
    without_hope = (
        "Put your final answer in \\boxed{} format. "
        "For example: \\boxed{42} or \\boxed{\\frac{1}{2}}"
    )
    return with_hope if include_hope else without_hope
# =============================================================================
# SYSTEM PROMPT AND CONFIGURATION HELPERS
# =============================================================================
# These functions handle common system prompt creation patterns used across
# evaluation environments with thinking mode support.
def create_system_content(
    thinking_mode: bool,
    custom_thinking_prompt: Optional[str] = None,
    custom_system_prompt: Optional[str] = None,
) -> Optional[str]:
    """
    Create system message content based on thinking mode configuration.

    This is the standard pattern used across all eval environments:
    - In thinking mode: thinking_prompt + optional system_prompt
    - In non-thinking mode: just the system_prompt (or None)

    Args:
        thinking_mode: Whether thinking mode is enabled
        custom_thinking_prompt: Optional custom thinking prompt (uses default if None)
        custom_system_prompt: Optional additional system prompt

    Returns:
        System content string, or None if no content needed
    """
    if not thinking_mode:
        return custom_system_prompt
    base = get_default_thinking_prompt(custom_thinking_prompt)
    # Append the extra system prompt, separated by a blank line, when given.
    return f"{base}\n\n{custom_system_prompt}" if custom_system_prompt else base
# =============================================================================
# RESULTS SAVING UTILITIES
# =============================================================================
# Common patterns for saving evaluation results to disk.
def save_eval_results(
    save_dir: str,
    metrics: Dict,
    results: List[Dict],
    metrics_filename: str = "metrics.json",
    results_filename: str = "results.jsonl",
    print_confirmation: bool = True,
) -> Tuple[str, str]:
    """
    Save evaluation results to disk in standard format.

    Creates two files:
    - metrics.json: Summary metrics dict
    - results.jsonl: Per-item results (one JSON object per line)

    Args:
        save_dir: Directory to save results to (created if doesn't exist)
        metrics: Dictionary of evaluation metrics
        results: List of per-item result dictionaries
        metrics_filename: Name for metrics file
        results_filename: Name for results file
        print_confirmation: Whether to print confirmation message

    Returns:
        Tuple of (metrics_path, results_path)
    """
    os.makedirs(save_dir, exist_ok=True)
    metrics_path = os.path.join(save_dir, metrics_filename)
    results_path = os.path.join(save_dir, results_filename)
    # Summary metrics: one pretty-printed JSON object.
    # default=str stringifies anything json can't serialize natively.
    with open(metrics_path, "w") as f:
        json.dump(metrics, f, indent=2, default=str)
    # Per-item results: JSON Lines, one object per line.
    with open(results_path, "w") as f:
        f.writelines(json.dumps(item, default=str) + "\n" for item in results)
    if print_confirmation:
        print(f"Results saved to {save_dir}")
    return metrics_path, results_path
def load_eval_results(
    save_dir: str,
    metrics_filename: str = "metrics.json",
    results_filename: str = "results.jsonl",
) -> Tuple[Dict, List[Dict]]:
    """
    Load evaluation results from disk.

    Reads the metrics JSON file and the JSON-Lines results file written by
    save_eval_results.

    Args:
        save_dir: Directory containing results
        metrics_filename: Name of metrics file
        results_filename: Name of results file

    Returns:
        Tuple of (metrics dict, list of result dicts)
    """
    with open(os.path.join(save_dir, metrics_filename), "r") as f:
        metrics = json.load(f)
    # Skip blank lines so trailing newlines don't produce decode errors.
    with open(os.path.join(save_dir, results_filename), "r") as f:
        results = [json.loads(line) for line in f if line.strip()]
    return metrics, results
# =============================================================================
# COMMON EVALUATION UTILITIES
# =============================================================================
# Helper functions used across multiple eval environments.
def calculate_accuracy(
    results: List[Dict],
    score_key: str = "is_correct",
    filter_fn: Optional[callable] = None,
) -> float:
    """
    Calculate accuracy from a list of result dictionaries.

    Args:
        results: List of result dictionaries
        score_key: Key to look up score/correctness (should be bool or 0/1)
        filter_fn: Optional function to filter results before calculation

    Returns:
        Accuracy as float between 0 and 1 (0.0 for an empty pool)
    """
    pool = [r for r in results if filter_fn(r)] if filter_fn else results
    if not pool:
        return 0.0
    # Truthy score values count as correct (works for bools and 0/1 ints);
    # missing keys count as incorrect.
    hits = sum(bool(r.get(score_key, False)) for r in pool)
    return hits / len(pool)
def group_results_by_key(results: List[Dict], key: str) -> Dict[str, List[Dict]]:
    """
    Group results by a specific key value.

    Useful for computing per-category or per-subset metrics. Items missing
    the key are collected under "unknown".

    Args:
        results: List of result dictionaries
        key: Key to group by

    Returns:
        Dictionary mapping key values to lists of results
    """
    grouped: Dict[str, List[Dict]] = {}
    for item in results:
        grouped.setdefault(item.get(key, "unknown"), []).append(item)
    return grouped
def format_percentage(value: float, decimals: int = 2) -> str:
    """
    Format a float as a percentage string.

    Args:
        value: Float value (0-1 scale)
        decimals: Number of decimal places

    Returns:
        Formatted percentage string (e.g., "75.50%")
    """
    scaled = value * 100
    return f"{scaled:.{decimals}f}%"
def print_eval_summary(title: str, metrics: Dict[str, float], width: int = 60) -> None:
    """
    Print a formatted evaluation summary.

    Args:
        title: Summary title
        metrics: Dictionary of metric name -> value
        width: Width of separator lines
    """
    rule = "=" * width
    print(f"\n{rule}")
    print(title)
    print(rule)
    for name, value in metrics.items():
        # Floats in [0, 1] are rendered as percentages; everything else
        # (including ints and bools) is printed verbatim.
        if isinstance(value, float) and 0 <= value <= 1:
            print(f" {name}: {format_percentage(value)}")
        else:
            print(f" {name}: {value}")
    print(f"{rule}\n")