consolidate eval scripts to have single eval.py

Andreas Koepf 2025-02-25 16:13:22 +01:00
parent bea806fe3c
commit e7ae82a831
12 changed files with 104 additions and 337 deletions
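The consolidated script is driven entirely by a YAML file passed via --yaml and writes one JSON result file per dataset plus a summary.json under {eval_dir}/{category}. The exact schema is defined in eval_config.py (one of the other changed files, not shown here); going only by the attributes the new code reads from EvalConfig, a config could look roughly like this hypothetical sketch:

    # Hypothetical eval config; field names inferred from EvalConfig usage in the diff below.
    model: deepseek/deepseek-r1    # example model id, not from this commit
    provider: Nebius
    category: algorithmic          # results land in {eval_dir}/{category}
    eval_dir: results
    dataset_size: 50
    dataset_seed: 42
    developer_role: system
    developer_prompt: "..."        # prompt sent as the first message of every request
    datasets:
      - spell_backward
      - word_sorting

Invocation would then be python eval.py --yaml config.yaml, with OPENROUTER_API_KEY set in the environment (plus optional OR_SITE_URL and OR_APP_NAME for the OpenRouter headers).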

eval.py
@@ -1,171 +1,136 @@
 import argparse
 import asyncio
 import json
+import logging
 import os
-import re
-import time
+from dataclasses import asdict
 from datetime import datetime
 from typing import Any
-from openai import AsyncOpenAI
-from tqdm.asyncio import tqdm_asyncio
-from reasoning_gym.factory import create_dataset
-from reasoning_gym.utils import SYSTEM_PROMPTS
+import aiohttp
+from eval_config import EvalConfig
+from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_attempt, wait_exponential
+import reasoning_gym
+from reasoning_gym.utils import extract_answer
-class AsyncOpenRouterEvaluator:
-    def __init__(self, model: str, max_concurrent: int = 10):
-        self.client = AsyncOpenAI(base_url="https://openrouter.ai/api/v1", api_key=os.getenv("OPENROUTER_API_KEY"))
+class OpenRouterEvaluator:
+    def __init__(self, model: str, config: EvalConfig):
+        self.logger = logging.getLogger(f"OpenRouterEvaluator.{model}")
+        self.config = config
+        self.output_dir = f"{config.eval_dir}/{config.category}"
+        os.makedirs(self.output_dir, exist_ok=True)
+        self.base_url = "https://openrouter.ai/api/v1/chat/completions"
+        self.api_key = os.getenv("OPENROUTER_API_KEY")
         self.model = model
-        self.extra_headers = {}
-        self.max_concurrent = max_concurrent
-        self.semaphore = asyncio.Semaphore(max_concurrent)
+        self.headers = {
+            "Authorization": f"Bearer {self.api_key}",
+            "HTTP-Referer": os.getenv("OR_SITE_URL", "localhost"),
+            "X-Title": os.getenv("OR_APP_NAME", "Model Evaluation"),
+            "Content-Type": "application/json",
+        }
+        self.semaphore = asyncio.Semaphore(10)  # Control concurrency
-    async def get_model_response(self, prompt: str) -> str:
-        """Get response from the model via OpenRouter API with rate limiting."""
-        async with self.semaphore:
-            try:
-                completion = await self.client.chat.completions.create(
-                    extra_headers=self.extra_headers,
-                    model=self.model,
-                    messages=[
-                        {"role": "system", "content": SYSTEM_PROMPTS["default"]},
-                        {"role": "user", "content": prompt},
-                    ],
-                )
-                return completion.choices[0].message.content
-            except Exception as e:
-                print(f"Error calling OpenRouter API: {str(e)}")
-                raise
-    def parse_model_response(self, response: str) -> str:
-        """Gather the final answer between the <answer> and </answer> tags."""
-        match = re.search(r"<answer>(.*?)</answer>", response, re.DOTALL)
-        return match.group(1).strip() if match else response
-    async def process_single_question(self, entry: dict, dataset) -> dict:
-        """Process a single question and return the result."""
-        response = await self.get_model_response(entry["question"])
-        answer = self.parse_model_response(response)
-        score = dataset.score_answer(answer=answer, entry=entry)
-        return {
-            "question": entry["question"],
-            "expected_answer": entry["answer"],
-            "model_answer": answer,
-            "full_model_response": response,
-            "score": score,
-            "metadata": entry["metadata"],
-        }
-    async def evaluate_dataset(self, dataset_config: dict[str, Any]) -> dict[str, Any]:
-        """Evaluate a single dataset with concurrent question processing."""
-        dataset_name = dataset_config.pop("name")
-        print(f"\nEvaluating dataset: {dataset_name}")
-        try:
-            # Create dataset with its specific configuration
-            data = create_dataset(dataset_name, **dataset_config)
-            all_entries = list(data)
-            # Process all questions concurrently
-            tasks = [self.process_single_question(entry, data) for entry in all_entries]
-            # Use tqdm to track progress
-            results = await tqdm_asyncio.gather(*tasks, desc=f"Processing {dataset_name}")
-            # Calculate aggregate metrics
-            total_score = sum(r["score"] for r in results)
-            metrics = {
-                "dataset_name": dataset_name,
-                "model": self.model,
-                "size": len(data),
-                "average_score": total_score / len(results) if results else 0,
-                "total_examples": len(results),
-                "timestamp": datetime.now().isoformat(),
-                "config": dataset_config,
-            }
-            return {"metrics": metrics, "results": results}
-        except Exception as e:
-            print(f"Error evaluating dataset {dataset_name}: {str(e)}")
-            return None
-    async def evaluate_datasets(self, dataset_configs: list[dict[str, Any]]) -> list[dict[str, Any]]:
-        """Evaluate multiple datasets concurrently."""
-        tasks = [self.evaluate_dataset(config) for config in dataset_configs]
-        results = await asyncio.gather(*tasks)
-        return [r for r in results if r is not None]
+    def save_results(self, results: list[dict[str, Any]], dataset, dataset_name) -> dict[str, Any]:
+        file_name = f"{self.output_dir}/{dataset_name}.json"
+        total_score = sum(r["score"] for r in results)
+        metrics = {
+            "dataset_name": dataset_name,
+            "model": self.model,
+            "size": dataset.size,
+            "provider": self.config.provider,
+            "average_score": total_score / len(results) if results else 0,
+            "total_examples": len(results),
+            "timestamp": datetime.now().isoformat(),
+            "config": asdict(dataset.config),
+            "results": results,
+        }
+        with open(file_name, "w") as f:
+            json.dump(metrics, f, indent=2)
+        return metrics
+    async def get_model_response(self, session: aiohttp.ClientSession, prompt: str) -> str:
+        payload = {
+            "model": self.model,
+            "messages": [
+                {"role": self.config.developer_role, "content": self.config.developer_prompt},
+                {"role": "user", "content": prompt},
+            ],
+            "provider": {"order": ["Nebius"], "allow_fallbacks": False},
+        }
+        async for attempt in AsyncRetrying(
+            stop=stop_after_attempt(20),
+            wait=wait_exponential(multiplier=1, min=1, max=60),
+            retry=retry_if_exception_type(
+                (aiohttp.ClientError, asyncio.TimeoutError, json.JSONDecodeError, ValueError)
+            ),
+        ):
+            with attempt:
+                async with session.post(self.base_url, json=payload) as response:
+                    data = await response.json()
+                    if not data:
+                        raise ValueError("Empty response")
+                    if not data.get("choices"):
+                        raise ValueError("Missing choices in response")
+                    return data["choices"][0]["message"]["content"]
+        raise Exception("Failed to get valid response after retries")
+    async def process_entry(self, session: aiohttp.ClientSession, dataset: Any, entry: Any) -> dict[str, Any]:
+        """Process a single entry with concurrency control."""
+        async with self.semaphore:
+            response = await self.get_model_response(session, entry["question"])
+            model_answer = extract_answer(response)
+            score = dataset.score_answer(answer=model_answer, entry=entry)
+            return {
+                "question": entry["question"],
+                "expected_answer": str(entry["answer"]),
+                "model_answer": model_answer,
+                "full_model_response": response,
+                "score": score,
+                "metadata": str(entry["metadata"]),
+            }
+    async def evaluate_dataset(self, session: aiohttp.ClientSession, dataset_name: str) -> dict[str, Any]:
+        """Evaluate a single dataset asynchronously."""
+        self.logger.info(f"\nEvaluating dataset: {dataset_name}")
+        dataset = reasoning_gym.create_dataset(
+            dataset_name, size=self.config.dataset_size, seed=self.config.dataset_seed
+        )
+        # Process all entries concurrently
+        tasks = [self.process_entry(session, dataset, entry) for entry in dataset]
+        results = await asyncio.gather(*tasks)
+        return self.save_results(results, dataset, dataset_name)
+    async def evaluate_datasets(self) -> list[dict[str, Any]]:
+        """Main async evaluation entry point."""
+        async with aiohttp.ClientSession(headers=self.headers) as session:
+            return await asyncio.gather(*(self.evaluate_dataset(session, name) for name in self.config.datasets))
-async def main_async():
+async def async_main():
     parser = argparse.ArgumentParser(description="Evaluate models on reasoning datasets")
-    parser.add_argument("--model", required=True, help="Model to evaluate")
-    parser.add_argument("--config", required=True, help="Path to JSON configuration file")
-    parser.add_argument("--output-dir", default="results", help="Output directory")
-    parser.add_argument("--max-concurrent", type=int, default=10, help="Maximum number of concurrent API calls")
+    parser.add_argument("--yaml", required=True, help="Path to YAML configuration file")
     args = parser.parse_args()
-    # Create output directory if it doesn't exist
-    os.makedirs(args.output_dir, exist_ok=True)
-    # Load dataset configurations
-    with open(args.config, "r") as f:
-        dataset_configs = json.load(f)
-    evaluator = AsyncOpenRouterEvaluator(model=args.model, max_concurrent=args.max_concurrent)
-    eval_start_time = time.time()
-    all_results = await evaluator.evaluate_datasets(dataset_configs)
-    print(f"Time taken to collect evaluation data: {time.time() - eval_start_time:.2f} seconds")
-    # Save results
-    output_file = os.path.join(
-        args.output_dir, f"evaluation_{args.model.replace('/', '_')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
-    )
-    with open(output_file, "w") as f:
-        json.dump(all_results, f, indent=2)
-    # Create and save summary
-    summary = []
-    for result in all_results:
-        metrics = result["metrics"]
-        summary_entry = {
-            "dataset_name": metrics["dataset_name"],
-            "model": metrics["model"],
-            "average_score": metrics["average_score"],
-            "total_examples": metrics["total_examples"],
-            "timestamp": metrics["timestamp"],
-            "config": metrics["config"],
-        }
-        summary.append(summary_entry)
-    summary_file = os.path.join(
-        args.output_dir, f"summary_{args.model.replace('/', '_')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
-    )
-    with open(summary_file, "w") as f:
-        json.dump(summary, f, indent=2)
-    # Print summary
-    print("\nEvaluation Summary:")
-    for entry in summary:
-        print(f"\nDataset: {entry['dataset_name']}")
-        print(f"Average Score: {entry['average_score']:.2%}")
-        print(f"Total Examples: {entry['total_examples']}")
-    print(f"\nDetailed results saved to: {output_file}")
-    print(f"Summary saved to: {summary_file}")
-def main():
-    asyncio.run(main_async())
+    config = EvalConfig.from_yaml(args.yaml)
+    evaluator = OpenRouterEvaluator(model=config.model, config=config)
+    results = await evaluator.evaluate_datasets()
+    output_dir = f"{config.eval_dir}/{config.category}"
+    os.makedirs(output_dir, exist_ok=True)
+    with open(f"{output_dir}/summary.json", "w") as f:
+        json.dump(results, f, indent=2)
 if __name__ == "__main__":
-    main()
+    asyncio.run(async_main())
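eval_config.py itself is not part of this hunk. A minimal sketch of what it might contain, assuming a plain dataclass populated from YAML (the field set is inferred from the attribute accesses above; everything else is guesswork):

    # Sketch only -- not the committed eval_config.py.
    from dataclasses import dataclass, field

    import yaml


    @dataclass
    class EvalConfig:
        model: str
        provider: str
        category: str
        eval_dir: str
        dataset_size: int
        dataset_seed: int
        developer_role: str
        developer_prompt: str
        datasets: list[str] = field(default_factory=list)

        @classmethod
        def from_yaml(cls, path: str) -> "EvalConfig":
            # Load the YAML mapping and bind its keys to the dataclass fields.
            with open(path) as f:
                return cls(**yaml.safe_load(f))

One wrinkle worth noting: the request payload pins the OpenRouter provider order to Nebius with allow_fallbacks disabled, independent of the config's provider field, which is only recorded in the result metadata.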