diff --git a/eval/r1/eval.py b/eval/r1/eval.py
index 737707c7..3dbc39b1 100644
--- a/eval/r1/eval.py
+++ b/eval/r1/eval.py
@@ -1,4 +1,5 @@
 import argparse
+import asyncio
 import json
 import logging
 import os
@@ -6,10 +7,9 @@ from dataclasses import asdict
 from datetime import datetime
 from typing import Any, Dict, List
 
-import requests
+import aiohttp
 from eval_config import EvalConfig
-from requests.exceptions import RequestException
-from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
+from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_attempt, wait_exponential
 
 import reasoning_gym
 from reasoning_gym.utils import extract_answer
@@ -30,9 +30,9 @@ class OpenRouterEvaluator:
             "X-Title": os.getenv("OR_APP_NAME", "Model Evaluation"),
             "Content-Type": "application/json",
         }
+        self.semaphore = asyncio.Semaphore(10)  # Control concurrency
 
     def save_results(self, results: List[Dict[str, Any]], dataset, dataset_name) -> Dict[str, Any]:
-
         file_name = f"{self.output_dir}/{dataset_name}.json"
 
         total_score = sum(r["score"] for r in results)
@@ -45,7 +45,7 @@ class OpenRouterEvaluator:
             "total_examples": len(results),
             "timestamp": datetime.now().isoformat(),
             "config": asdict(dataset.config),
-            "results": results,  # save results to allow for performance recalculation
+            "results": results,
         }
 
         with open(file_name, "w") as f:
@@ -53,87 +53,93 @@ class OpenRouterEvaluator:
         return metrics
 
     def prepare_messages(self, prompt: str) -> List[Dict[str, str]]:
-        messages = [
-            {"role": self.config.developer_role, "content": self.config.developer_prompt},
-            {"role": "user", "content": prompt},
-        ]
+        return {
+            "model": self.model,
+            "messages": [
+                {"role": self.config.developer_role, "content": self.config.developer_prompt},
+                {"role": "user", "content": prompt},
+            ],
+            "provider": {"order": ["Nebius"], "allow_fallbacks": False},
+        }
+
+    async def get_model_response(self, session: aiohttp.ClientSession, prompt: str) -> str:
         payload = {
             "model": self.model,
-            "messages": messages,
-            "provider": {"order": ["Nebius"], "allow_fallbacks": False},
-        }  # make sure only one provider is used
+            "messages": [
+                {"role": self.config.developer_role, "content": self.config.developer_prompt},
+                {"role": "user", "content": prompt},
+            ],
+        }
 
-        return payload
+        async for attempt in AsyncRetrying(
+            stop=stop_after_attempt(20),
+            wait=wait_exponential(multiplier=1, min=1, max=60),
+            retry=retry_if_exception_type(
+                (aiohttp.ClientError, asyncio.TimeoutError, json.JSONDecodeError, ValueError)
+            ),
+        ):
+            with attempt:
+                async with session.post(self.base_url, json=payload) as response:
+                    data = await response.json()
 
-    @retry(
-        retry=retry_if_exception_type(RequestException),
-        stop=stop_after_attempt(5),
-        wait=wait_exponential(multiplier=1, min=4, max=60),
-    )
-    def get_model_response(self, prompt: str) -> str:
-        """Get response from the model via OpenRouter API."""
+                    if not data:
+                        raise ValueError("Empty response")
 
-        payload = self.prepare_messages(prompt)
-        try:
-            response = requests.post(self.base_url, headers=self.headers, json=payload, timeout=30)
-            response.raise_for_status()
-        except requests.exceptions.RequestException as e:
-            raise RequestException(
-                f"API request failed: {str(e)}", {"endpoint": self.base_url, "model": self.model}
-            ) from e
-        return response.json()["choices"][0]["message"]["content"]
+                    if not data.get("choices"):
+                        raise ValueError("Missing choices in response")
 
-    def evaluate_datasets(self) -> List[Dict[str, Any]]:
-        """Evaluate model on multiple datasets with their respective configurations."""
+                    return data["choices"][0]["message"]["content"]
+
+        raise Exception("Failed to get valid response after retries")
+
+    async def process_entry(self, session: aiohttp.ClientSession, dataset: Any, entry: Any) -> Dict[str, Any]:
+        """Process a single entry with concurrency control."""
+        async with self.semaphore:
+            response = await self.get_model_response(session, entry["question"])
+            model_answer = extract_answer(response)
+            score = dataset.score_answer(answer=model_answer, entry=entry)
+            print(f"Question: {entry['question']}")
+
+            return {
+                "question": entry["question"],
+                "expected_answer": str(entry["answer"]),
+                "model_answer": model_answer,
+                "score": score,
+                "metadata": str(entry["metadata"]),
+            }
+
+    async def evaluate_dataset(self, session: aiohttp.ClientSession, dataset_name: str) -> Dict[str, Any]:
+        """Evaluate a single dataset asynchronously."""
+        self.logger.info(f"\nEvaluating dataset: {dataset_name}")
+        dataset = reasoning_gym.create_dataset(
+            dataset_name, size=self.config.dataset_size, seed=self.config.dataset_seed
+        )
+
+        tasks = [self.process_entry(session, dataset, entry) for entry in dataset]
+        results = await asyncio.gather(*tasks)
+        return self.save_results(results, dataset, dataset_name)
+
+    async def evaluate_datasets(self) -> List[Dict[str, Any]]:
+        """Main async evaluation entry point."""
         all_results = []
-
-        for dataset_name in self.config.datasets:
-            self.logger.info(f"\nEvaluating dataset: {dataset_name}")
-
-            # Create dataset with its specific configuration
-            dataset = reasoning_gym.create_dataset(
-                dataset_name, size=self.config.dataset_size, seed=self.config.dataset_seed
-            )
-            results = []
-
-            for i, entry in enumerate(dataset):
-                print(f"On example {i+1} of {len(dataset)}")
-                response = self.get_model_response(entry["question"])
-                model_answer = extract_answer(response)
-
-                score = dataset.score_answer(answer=model_answer, entry=entry)
-
-                result = {
-                    "question": entry["question"],
-                    "expected_answer": str(entry["answer"]),
-                    "model_answer": model_answer,
-                    "score": score,
-                    "metadata": str(entry["metadata"]),
-                }
-                results.append(result)
-
-            metrics = self.save_results(results, dataset, dataset_name)
-
-            all_results.append({"metrics": metrics, "results": results})
-
-        return all_results
+        async with aiohttp.ClientSession(headers=self.headers) as session:
+            return await asyncio.gather(*(self.evaluate_dataset(session, name) for name in self.config.datasets))
 
 
-def main():
+async def async_main():
     parser = argparse.ArgumentParser(description="Evaluate models on reasoning datasets")
     parser.add_argument("--yaml", required=True, help="Path to YAML configuration file")
-
     args = parser.parse_args()
+
     config = EvalConfig.from_yaml(args.yaml)
+    evaluator = OpenRouterEvaluator(model=config.model, config=config)
+    results = await evaluator.evaluate_datasets()
+
     output_dir = f"{config.eval_dir}/{config.category}"
     os.makedirs(output_dir, exist_ok=True)
-
-    evaluator = OpenRouterEvaluator(model=config.model, config=config)
-    all_results = evaluator.evaluate_datasets()
-
     with open(f"{output_dir}/summary.json", "w") as f:
-        json.dump(all_results, f, indent=2)
+        json.dump(results, f, indent=2)
 
 
 if __name__ == "__main__":
-    main()
+    asyncio.run(async_main())
diff --git a/eval/r1/yaml/algorithmic.yaml b/eval/r1/yaml/algorithmic.yaml
index c1c043ce..5d0d630a 100644
--- a/eval/r1/yaml/algorithmic.yaml
+++ b/eval/r1/yaml/algorithmic.yaml
@@ -1,9 +1,8 @@
 model: deepseek/deepseek-r1
 category: algorithmic
 datasets:
-  - base_conversion
   - binary_matrix
-  - caesar _cipher
+  - caesar_cipher
   - group_anagrams
   - isomorphic_strings
   - letter_counting