Eval N completions per prompt (#374)

* feat: Add support for generating multiple completions per prompt
* feat: Track best and mean scores for multiple completions per prompt
* feat: Add checkpoint and resume functionality to evaluation script
Andreas Köpf 2025-03-15 16:39:36 +01:00 committed by GitHub
parent 1d410cc600
commit 424ee6751a
12 changed files with 426 additions and 126 deletions

@@ -12,11 +12,13 @@ Options:
--output-dir DIR Override output directory specified in config
--category CATEGORY Evaluate only datasets from this category
--max-concurrent NUM Maximum number of concurrent API calls
--n NUM Number of completions to generate per prompt (default: 1, each completion is a separate API call)
--base-url URL API base URL (default: https://openrouter.ai/api/v1)
--save-metadata Save entry metadata in results
--full-results Save the full results file
--verbose Print detailed model responses
--debug Enable debug logging
--resume DIR Resume evaluation from the specified directory
Environment variables:
OPENROUTER_API_KEY Required API key for OpenRouter
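For illustration, a hypothetical invocation combining the new options (the script name, the config arguments, and the results directory below are placeholders, not taken from this diff; only the flags and the OPENROUTER_API_KEY variable come from the usage text above):

    export OPENROUTER_API_KEY=...
    python eval.py <config args> --n 4 --max-concurrent 8 --resume results/<previous_run_dir>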
@@ -31,7 +33,7 @@ import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Optional, Union
from typing import Any, Optional
from eval_config import CategoryConfig, DatasetConfig, EvalConfig
from openai import AsyncOpenAI
@@ -40,6 +42,104 @@ from tqdm.asyncio import tqdm_asyncio
import reasoning_gym
from reasoning_gym.utils import extract_answer
class CheckpointManager:
"""Manages checkpoints for resumable evaluation."""
def __init__(self, output_dir: Path):
"""Initialize the checkpoint manager.
Args:
output_dir: Directory where checkpoints and results are stored
"""
self.output_dir = output_dir
self.checkpoint_path = output_dir / "checkpoint.json"
self.completed_datasets = set()
self.previous_category_results = {} # Store previously completed category results
self.load_checkpoint()
def load_checkpoint(self) -> None:
"""Load existing checkpoint and previous results if available."""
# Load checkpoint file
if self.checkpoint_path.exists():
with open(self.checkpoint_path, "r") as f:
checkpoint_data = json.load(f)
self.completed_datasets = set(checkpoint_data.get("completed_datasets", []))
# Load previous category results
for category_dir in self.output_dir.iterdir():
if category_dir.is_dir():
category_name = category_dir.name
self.previous_category_results[category_name] = []
# Load each dataset result file in this category
for dataset_file in category_dir.glob("*.json"):
try:
with open(dataset_file, "r") as f:
dataset_result = json.load(f)
self.previous_category_results[category_name].append(dataset_result)
except Exception as e:
logging.warning(f"Error loading previous result {dataset_file}: {str(e)}")
def is_dataset_completed(self, category: str, dataset: str) -> bool:
"""Check if a dataset has been completed.
Args:
category: Category name
dataset: Dataset name
Returns:
True if the dataset has been completed, False otherwise
"""
return f"{category}/{dataset}" in self.completed_datasets
def mark_dataset_completed(self, category: str, dataset: str) -> None:
"""Mark a dataset as completed and update checkpoint file.
Args:
category: Category name
dataset: Dataset name
"""
self.completed_datasets.add(f"{category}/{dataset}")
self._save_checkpoint()
def get_dataset_result(self, category: str, dataset: str) -> Optional[dict[str, Any]]:
"""Get previously completed dataset result if available.
Args:
category: Category name
dataset: Dataset name
Returns:
Dataset result dict if found, None otherwise
"""
# Try to find the dataset in previously loaded results first
if category in self.previous_category_results:
for dataset_result in self.previous_category_results[category]:
if dataset_result["name"] == dataset:
return dataset_result
# If not found in memory, try to load from file
dataset_path = self.output_dir / category / f"{dataset}.json"
if dataset_path.exists():
try:
with open(dataset_path, "r") as f:
return json.load(f)
except Exception as e:
logging.error(f"Error loading dataset result from {dataset_path}: {str(e)}")
return None
def _save_checkpoint(self) -> None:
"""Save checkpoint to disk."""
checkpoint_data = {
"completed_datasets": list(self.completed_datasets),
"last_updated": datetime.now().isoformat(),
}
with open(self.checkpoint_path, "w") as f:
json.dump(checkpoint_data, f, indent=2)
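The checkpoint written by _save_checkpoint is a small JSON file; a sketch of its contents, with illustrative category and dataset names:

    {
      "completed_datasets": [
        "arithmetic/basic_arithmetic",
        "arithmetic/chain_sum"
      ],
      "last_updated": "2025-03-15T16:39:36.000000"
    }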
# Configure logging
logging.basicConfig(
level=logging.INFO,
@@ -105,8 +205,87 @@ class AsyncModelEvaluator:
self.git_hash = get_git_hash()
self.start_time = datetime.now()
async def get_model_response(self, prompt: str) -> str:
"""Get response from model with retry logic via OpenRouter.
# Checkpoint and resume related attributes
self.resume_dir = None
self.output_dir = None
self.checkpoint_manager = None
def create_output_dir(self) -> Path:
"""Create output directory or use existing one for resuming.
Returns:
Path to the output directory
"""
# Check if we're resuming from a previous run
if self.resume_dir:
output_dir = Path(self.resume_dir)
if not output_dir.exists():
raise ValueError(f"Resume directory {output_dir} does not exist")
self.logger.info(f"Resuming evaluation from {output_dir}")
return output_dir
# Create new output directory
timestamp = self.start_time.strftime("%Y%m%d_%H%M%S")
model_name = self.config.model.replace("/", "_")
if len(self.config.categories) == 1:
# Include category name in the output directory when evaluating a single category
category_name = self.config.categories[0].category
output_dir = Path(self.config.output_dir) / f"{model_name}_{category_name}_{timestamp}"
else:
# Original format for multiple categories
output_dir = Path(self.config.output_dir) / f"{model_name}_{timestamp}"
output_dir.mkdir(parents=True, exist_ok=True)
return output_dir
def _save_dataset_results(self, category_name: str, dataset_name: str, results: dict[str, Any]) -> None:
"""Save individual dataset results to file.
Args:
category_name: Category name
dataset_name: Dataset name
results: Dataset evaluation results
"""
category_dir = self.output_dir / category_name
category_dir.mkdir(exist_ok=True)
dataset_path = category_dir / f"{dataset_name}.json"
with open(dataset_path, "w") as f:
json.dump(results, f, indent=2)
def _update_partial_summary(self, category_results: list[dict[str, Any]]) -> None:
"""Update partial summary after each category completes.
Args:
category_results: List of category results completed so far
"""
partial_results = {
"metadata": {
"timestamp": self.start_time.isoformat(),
"model": self.config.model,
"provider": self.config.provider,
"git_hash": self.git_hash,
"duration_seconds": (datetime.now() - self.start_time).total_seconds(),
"max_tokens": self.config.max_tokens,
"temperature": self.config.temperature,
"top_p": self.config.top_p,
"partial": True,
},
"categories": category_results,
}
# Generate partial summary
partial_results["summary"] = self.generate_summary(partial_results)
# Save partial summary
summary_path = self.output_dir / "summary.json"
with open(summary_path, "w") as f:
json.dump(partial_results["summary"], f, indent=2)
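Taken together, create_output_dir, _save_dataset_results, _update_partial_summary and the CheckpointManager produce a run directory with roughly this layout (names are illustrative; results.json is written only when full results are enabled):

    <model>_<timestamp>/
        checkpoint.json
        summary.json
        results.json
        <category>/
            <dataset>.json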
async def get_single_response(self, prompt: str) -> str:
"""Get a single response from model with retry logic via OpenRouter.
Args:
prompt: The prompt to send to the model
@@ -150,7 +329,6 @@ class AsyncModelEvaluator:
response = completion.choices[0].message.content
if self.verbose:
self.logger.info(f"Prompt: {prompt}")
self.logger.info(f"Response: {response}")
return response
@@ -163,49 +341,140 @@ class AsyncModelEvaluator:
raise Exception(f"Failed to get model response after {max_retries} attempts")
async def get_model_response(self, prompt: str) -> list[str]:
"""Get multiple responses from model by making multiple API calls.
Args:
prompt: The prompt to send to the model
Returns:
A list of model response texts
Raises:
Exception: If all attempts fail
"""
if self.verbose:
self.logger.info(f"Prompt: {prompt}")
self.logger.info(f"Generating {self.config.completions_per_prompt} completions...")
# Create tasks for multiple completions
tasks = []
for i in range(self.config.completions_per_prompt):
tasks.append(self.get_single_response(prompt))
# Execute all tasks concurrently
responses = await asyncio.gather(*tasks, return_exceptions=True)
# Handle any exceptions
valid_responses = []
for i, response in enumerate(responses):
if isinstance(response, Exception):
self.logger.error(f"Completion {i+1} failed: {str(response)}")
else:
valid_responses.append(response)
if self.verbose:
self.logger.info(f"Response {len(valid_responses)}: {response}")
if not valid_responses:
raise Exception("All completion attempts failed")
return valid_responses
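A minimal standalone sketch of the fan-out pattern used above, assuming a generic async callable (the names here are illustrative, not part of this diff):

    import asyncio

    async def fan_out(call, n: int) -> list:
        # Launch n independent calls; return_exceptions=True keeps one failed
        # call from cancelling the rest of the batch.
        results = await asyncio.gather(*(call() for _ in range(n)), return_exceptions=True)
        # Drop failures, mirroring how failed completions are filtered above.
        return [r for r in results if not isinstance(r, Exception)]

Each completion is therefore an independent API call, so a partial failure degrades the sample for that prompt instead of aborting the whole entry.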
async def process_entry(
self, dataset: reasoning_gym.dataset.ProceduralDataset, entry: dict[str, Any]
self,
dataset: reasoning_gym.dataset.ProceduralDataset,
entry: dict[str, Any],
entry_index: int,
dataset_name: str,
) -> dict[str, Any]:
"""Process a single dataset entry.
Args:
dataset: The dataset instance
entry: The entry to process
entry_index: Index of the entry in the dataset
dataset_name: Name of the dataset
Returns:
Dict with processing results
"""
response = None
responses = None
try:
# Get model response first
response = await self.get_model_response(entry["question"])
# Get multiple model responses
responses = await self.get_model_response(entry["question"])
# Try to extract answer and score it
try:
model_answer = extract_answer(response)
except Exception as extract_error:
self.logger.error(f"Error extracting answer: {str(extract_error)}")
raise Exception(f"Answer extraction error: {str(extract_error)}")
# Process each response
completion_results = []
best_score = 0.0
total_score = 0.0
best_answer = None
best_response = None
try:
score = dataset.score_answer(answer=model_answer, entry=entry)
except Exception as score_error:
self.logger.error(f"Error scoring answer: {str(score_error)}")
raise Exception(f"Answer scoring error: {str(score_error)}")
# Count total completions for mean score calculation
total_completions = len(responses)
if self.verbose:
print(f"Question: {entry['question']}")
print(f"Expected: {entry['answer']}")
print(f"Answer: {model_answer}")
print(f"Score: {score}")
print("-" * 40)
for i, response in enumerate(responses):
try:
# Try to extract answer and score it
model_answer = extract_answer(response)
score = dataset.score_answer(answer=model_answer, entry=entry)
completion_result = {
"model_answer": model_answer,
"full_model_response": response,
"score": score,
}
# Track scores
if score > best_score:
best_score = score
best_answer = model_answer
best_response = response
total_score += score
completion_results.append(completion_result)
if self.verbose:
print(f"Question: {entry['question']}")
print(f"Expected: {entry['answer']}")
print(f"Completion {i+1} Answer: {model_answer}")
print(f"Completion {i+1} Score: {score}")
print("-" * 40)
except Exception as e:
self.logger.error(f"Error processing completion {i+1}: {str(e)}")
# Add failed completion with score 0.0 (already counted in total_completions)
completion_results.append(
{
"model_answer": "ERROR",
"full_model_response": response,
"score": 0.0,
"error": str(e),
}
)
# If we have no valid completions, log a warning instead of raising an exception
if not best_answer:
self.logger.warning(
f"Failed to extract a valid answer from model responses for dataset '{dataset_name}', entry index {entry_index}"
)
# Use None instead of empty string as the best answer
best_answer = None
best_response = responses[0] if responses and len(responses) > 0 else None
best_score = 0.0
# Calculate mean score - count all completions including failures
mean_score = total_score / total_completions if total_completions > 0 else 0.0
result = {
"question": entry["question"],
"expected_answer": str(entry["answer"]),
"model_answer": model_answer,
"full_model_response": response,
"score": score,
"best_model_answer": best_answer,
"best_full_model_response": best_response,
"best_score": best_score,
"mean_score": mean_score,
"completions": completion_results,
}
# Only include metadata if configured to do so
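A worked example of the scoring above (values are illustrative): with --n 4 and per-completion scores of 1.0, 0.5 and 0.0 plus one failed extraction recorded as 0.0, the entry gets best_score = 1.0 and mean_score = (1.0 + 0.5 + 0.0 + 0.0) / 4 = 0.375.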
@@ -219,10 +488,18 @@ class AsyncModelEvaluator:
result = {
"question": entry["question"],
"expected_answer": str(entry["answer"]),
"model_answer": "ERROR",
"full_model_response": response if response is not None else f"Error: {str(e)}",
"score": 0.0,
"best_model_answer": None,
# First check if we already have a best_response from partial processing
# If not, then fall back to the first response or None
"best_full_model_response": (
best_response
if best_response is not None
else (responses[0] if responses and len(responses) > 0 else None)
),
"best_score": best_score if best_score > 0 else 0.0,
"mean_score": total_score / total_completions if total_completions > 0 else 0.0,
"error": str(e),
"completions": completion_results if "completion_results" in locals() else [],
}
# Only include metadata if configured to do so
@@ -242,6 +519,20 @@ class AsyncModelEvaluator:
Dict with evaluation results
"""
dataset_name = dataset_config.dataset
# Check if this dataset has already been completed
if self.checkpoint_manager.is_dataset_completed(category_name, dataset_name):
# Get the dataset result from checkpoint manager
dataset_result = self.checkpoint_manager.get_dataset_result(category_name, dataset_name)
if dataset_result:
self.logger.info(f"Skipping already completed dataset: {dataset_name}")
return dataset_result
# If we can't load the result, we'll need to re-evaluate the dataset
self.logger.info(f"Re-evaluating dataset: {dataset_name}")
# Remove from completed datasets so it will be processed
self.checkpoint_manager.completed_datasets.discard(f"{category_name}/{dataset_name}")
self.logger.info(f"Evaluating dataset: {dataset_name}")
try:
@@ -268,30 +559,41 @@ class AsyncModelEvaluator:
# Get all entries
all_entries = list(dataset)
# Process entries with progress bar
tasks = [self.process_entry(dataset, entry) for entry in all_entries]
# Process entries with progress bar, passing the entry index and dataset name
tasks = [self.process_entry(dataset, entry, idx, dataset_name) for idx, entry in enumerate(all_entries)]
results = await tqdm_asyncio.gather(*tasks, desc=f"Processing {dataset_name}", leave=True)
# Calculate metrics
total_score = sum(r["score"] for r in results)
average_score = total_score / len(results) if results else 0
total_best_score = sum(r["best_score"] for r in results)
total_mean_score = sum(r["mean_score"] for r in results)
average_best_score = total_best_score / len(results) if results else 0
average_mean_score = total_mean_score / len(results) if results else 0
return {
dataset_results = {
"name": dataset_name,
"category": category_name,
"average_score": average_score,
"average_best_score": average_best_score,
"average_mean_score": average_mean_score,
"total_examples": len(results),
"config": {"size": dataset_config.size, "seed": dataset_config.seed, **dataset_config.params},
"system_prompt": self.config.get_system_prompt(),
"completions_per_prompt": self.config.completions_per_prompt,
"results": results,
}
# Mark dataset as completed and save results
self.checkpoint_manager.mark_dataset_completed(category_name, dataset_name)
self._save_dataset_results(category_name, dataset_name, dataset_results)
return dataset_results
except Exception as e:
self.logger.error(f"Error evaluating dataset {dataset_name}: {str(e)}")
return {
"name": dataset_name,
"category": category_name,
"average_score": 0.0,
"average_best_score": 0.0,
"average_mean_score": 0.0,
"total_examples": 0,
"config": {"size": dataset_config.size, "seed": dataset_config.seed, **dataset_config.params},
"system_prompt": self.config.get_system_prompt(),
@@ -311,9 +613,26 @@ class AsyncModelEvaluator:
category_name = category_config.category
self.logger.info(f"Evaluating category: {category_name}")
tasks = [self.evaluate_dataset(category_name, dataset_config) for dataset_config in category_config.datasets]
# Check if all datasets in this category are already completed
all_completed = True
for dataset_config in category_config.datasets:
if not self.checkpoint_manager.is_dataset_completed(category_name, dataset_config.dataset):
all_completed = False
break
dataset_results = await asyncio.gather(*tasks)
# If all datasets are completed and we have previous results, use them
if all_completed and category_name in self.checkpoint_manager.previous_category_results:
self.logger.info(f"Using previously completed results for category: {category_name}")
return {
"name": category_name,
"datasets": self.checkpoint_manager.previous_category_results[category_name],
}
# Process datasets sequentially to ensure proper checkpointing
dataset_results = []
for dataset_config in category_config.datasets:
result = await self.evaluate_dataset(category_name, dataset_config)
dataset_results.append(result)
return {
"name": category_name,
@@ -321,15 +640,25 @@ class AsyncModelEvaluator:
}
async def evaluate_all(self) -> dict[str, Any]:
"""Evaluate all categories and datasets.
"""Evaluate all categories and datasets, resuming from checkpoint if available.
Returns:
Dict with all evaluation results and summary
"""
self.logger.info(f"Starting evaluation of {len(self.config.categories)} categories")
tasks = [self.evaluate_category(category) for category in self.config.categories]
category_results = await asyncio.gather(*tasks)
# Initialize output directory and checkpoint manager
self.output_dir = self.create_output_dir()
self.checkpoint_manager = CheckpointManager(self.output_dir)
# Process each category sequentially to ensure proper checkpointing
category_results = []
for category in self.config.categories:
category_result = await self.evaluate_category(category)
category_results.append(category_result)
# Update partial summary after each category
self._update_partial_summary(category_results)
# Generate results structure
results = {
@@ -342,6 +671,7 @@ class AsyncModelEvaluator:
"max_tokens": self.config.max_tokens,
"temperature": self.config.temperature,
"top_p": self.config.top_p,
"partial": False, # Mark as complete
},
"categories": category_results,
}
@@ -363,7 +693,8 @@ class AsyncModelEvaluator:
summary = {
"total_datasets": 0,
"total_examples": 0,
"dataset_scores": {},
"dataset_best_scores": {},
"dataset_mean_scores": {},
}
# Iterate through categories and datasets in the original order from config
@@ -378,7 +709,8 @@ class AsyncModelEvaluator:
for dataset in category["datasets"]:
if dataset["name"] == dataset_name:
# Add to summary in original order
summary["dataset_scores"][dataset_name] = dataset["average_score"]
summary["dataset_best_scores"][dataset_name] = dataset["average_best_score"]
summary["dataset_mean_scores"][dataset_name] = dataset["average_mean_score"]
summary["total_datasets"] += 1
summary["total_examples"] += dataset["total_examples"]
dataset_found = True
@@ -386,7 +718,8 @@ class AsyncModelEvaluator:
# If dataset wasn't found in results (error), add with score 0
if not dataset_found:
summary["dataset_scores"][dataset_name] = 0.0
summary["dataset_best_scores"][dataset_name] = 0.0
summary["dataset_mean_scores"][dataset_name] = 0.0
summary["total_datasets"] += 1
return summary
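For reference, a sketch of the summary this produces (dataset names and values are illustrative):

    {
      "total_datasets": 2,
      "total_examples": 100,
      "dataset_best_scores": {
        "basic_arithmetic": 0.92,
        "chain_sum": 0.85
      },
      "dataset_mean_scores": {
        "basic_arithmetic": 0.78,
        "chain_sum": 0.61
      }
    }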
@@ -400,26 +733,12 @@ class AsyncModelEvaluator:
Returns:
Tuple of (results_path, summary_path)
"""
# Create output directory with timestamp
timestamp = self.start_time.strftime("%Y%m%d_%H%M%S")
model_name = self.config.model.replace("/", "_")
# Format directory name with model, category (if single category), and timestamp
if len(self.config.categories) == 1:
# Include category name in the output directory when evaluating a single category
category_name = self.config.categories[0].category
output_dir = Path(self.config.output_dir) / f"{model_name}_{category_name}_{timestamp}"
else:
# Original format for multiple categories
output_dir = Path(self.config.output_dir) / f"{model_name}_{timestamp}"
output_dir.mkdir(parents=True, exist_ok=True)
# Output directory is already created during evaluation
results_path = None
# Save full results if configured to do so
if self.config.save_full_results:
results_path = output_dir / "results.json"
results_path = self.output_dir / "results.json"
with open(results_path, "w") as f:
json.dump(results, f, indent=2)
@@ -435,22 +754,16 @@ class AsyncModelEvaluator:
summary_data["max_tokens"] = self.config.max_tokens
summary_data["temperature"] = self.config.temperature
summary_data["top_p"] = self.config.top_p
summary_data["completions_per_prompt"] = self.config.completions_per_prompt
summary_data["duration_seconds"] = results["metadata"]["duration_seconds"]
summary_data["partial"] = False # Mark as complete
# Save summary
summary_path = output_dir / "summary.json"
summary_path = self.output_dir / "summary.json"
with open(summary_path, "w") as f:
json.dump(summary_data, f, indent=2)
# Save individual dataset results
for category in results["categories"]:
category_dir = output_dir / category["name"]
category_dir.mkdir(exist_ok=True)
for dataset in category["datasets"]:
dataset_path = category_dir / f"{dataset['name']}.json"
with open(dataset_path, "w") as f:
json.dump(dataset, f, indent=2)
# Individual dataset results are already saved during evaluation
return str(results_path) if results_path else None, str(summary_path)
@@ -471,12 +784,18 @@ class AsyncModelEvaluator:
print(f"Max Tokens: {self.config.max_tokens}")
print(f"Temperature: {self.config.temperature}")
print(f"Top-p: {self.config.top_p}")
print(f"Completions per prompt: {self.config.completions_per_prompt}")
print(f"Git Hash: {self.git_hash}")
print(f"Duration: {results['metadata']['duration_seconds']:.2f} seconds")
print()
print("Dataset Scores (in configuration order):")
for dataset_name, score in summary["dataset_scores"].items():
print(" Dataset Name Best Score Mean Score Examples")
print(" ------------------------------------------------------------------")
for dataset_name in summary["dataset_best_scores"].keys():
best_score = summary["dataset_best_scores"][dataset_name]
mean_score = summary["dataset_mean_scores"][dataset_name]
# Find the number of examples for this dataset
examples = 0
for category in results["categories"]:
@@ -485,7 +804,8 @@ class AsyncModelEvaluator:
examples = dataset["total_examples"]
break
print(f" {dataset_name}: {score:.1%} ({examples} examples)")
# Use fixed-width formatting for better alignment
print(f" {dataset_name:<30} {best_score:>8.1%} {mean_score:>8.1%} {examples:>8}")
print()
print(f"Total datasets: {summary['total_datasets']}")
@@ -500,6 +820,7 @@ async def main_async():
parser.add_argument("--output-dir", help="Override output directory specified in config")
parser.add_argument("--category", help="Evaluate only datasets from this category")
parser.add_argument("--max-concurrent", type=int, help="Maximum number of concurrent API calls")
parser.add_argument("--n", type=int, default=1, help="Number of completions to generate per prompt")
parser.add_argument("--base-url", default="https://openrouter.ai/api/v1", help="API base URL")
parser.add_argument(
"--api-key",
@@ -509,6 +830,7 @@ async def main_async():
parser.add_argument("--full-results", action="store_true", help="Save the full results file")
parser.add_argument("--verbose", action="store_true", help="Print detailed model responses")
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
parser.add_argument("--resume", help="Resume evaluation from the specified directory")
args = parser.parse_args()
@@ -541,6 +863,8 @@ async def main_async():
config.output_dir = args.output_dir
if args.max_concurrent:
config.max_concurrent = args.max_concurrent
if args.n:
config.completions_per_prompt = args.n
if args.save_metadata:
config.save_metadata = True
if args.full_results:
@@ -560,6 +884,10 @@ async def main_async():
config=config, api_key=api_key, base_url=args.base_url, verbose=args.verbose, debug=args.debug
)
# Set resume directory if specified
if args.resume:
evaluator.resume_dir = args.resume
# Run evaluation
try:
results = await evaluator.evaluate_all()