updated async impl and added r1

joesharratt1229 2025-02-13 03:51:01 +00:00
parent 1a3728ec3a
commit b2e3ccf3d6
2 changed files with 77 additions and 72 deletions


@@ -1,4 +1,5 @@
 import argparse
+import asyncio
 import json
 import logging
 import os
@@ -6,10 +7,9 @@ from dataclasses import asdict
 from datetime import datetime
 from typing import Any, Dict, List
 
-import requests
+import aiohttp
 from eval_config import EvalConfig
-from requests.exceptions import RequestException
-from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
+from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_attempt, wait_exponential
 
 import reasoning_gym
 from reasoning_gym.utils import extract_answer
@@ -30,9 +30,9 @@ class OpenRouterEvaluator:
             "X-Title": os.getenv("OR_APP_NAME", "Model Evaluation"),
             "Content-Type": "application/json",
         }
+        self.semaphore = asyncio.Semaphore(10)  # Control concurrency
 
     def save_results(self, results: List[Dict[str, Any]], dataset, dataset_name) -> Dict[str, Any]:
         file_name = f"{self.output_dir}/{dataset_name}.json"
         total_score = sum(r["score"] for r in results)
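The new semaphore caps in-flight requests at 10. A minimal standalone sketch of the pattern (the fetch coroutine and its timing are illustrative, not from this commit):

import asyncio

sem = asyncio.Semaphore(10)  # at most 10 coroutines inside the guarded block at once

async def fetch(i: int) -> int:
    async with sem:  # waits here once 10 tasks already hold the semaphore
        await asyncio.sleep(0.1)  # stand-in for a real HTTP request
        return i

async def main() -> None:
    # all 100 tasks are created eagerly, but only 10 make progress at a time
    results = await asyncio.gather(*(fetch(i) for i in range(100)))
    print(len(results))

asyncio.run(main())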
@@ -45,7 +45,7 @@ class OpenRouterEvaluator:
             "total_examples": len(results),
             "timestamp": datetime.now().isoformat(),
             "config": asdict(dataset.config),
-            "results": results,  # save results to allow for performance recalculation
+            "results": results,
         }
 
         with open(file_name, "w") as f:
@@ -53,87 +53,93 @@ class OpenRouterEvaluator:
         return metrics
 
-    def prepare_messages(self, prompt: str) -> List[Dict[str, str]]:
-        messages = [
-            {"role": self.config.developer_role, "content": self.config.developer_prompt},
-            {"role": "user", "content": prompt},
-        ]
-        payload = {
-            "model": self.model,
-            "messages": messages,
-            "provider": {"order": ["Nebius"], "allow_fallbacks": False},
-        }  # make sure only one provider is used
-        return payload
-
-    @retry(
-        retry=retry_if_exception_type(RequestException),
-        stop=stop_after_attempt(5),
-        wait=wait_exponential(multiplier=1, min=4, max=60),
-    )
-    def get_model_response(self, prompt: str) -> str:
-        """Get response from the model via OpenRouter API."""
-        payload = self.prepare_messages(prompt)
-        try:
-            response = requests.post(self.base_url, headers=self.headers, json=payload, timeout=30)
-            response.raise_for_status()
-        except requests.exceptions.RequestException as e:
-            raise RequestException(
-                f"API request failed: {str(e)}", {"endpoint": self.base_url, "model": self.model}
-            ) from e
-        return response.json()["choices"][0]["message"]["content"]
+    async def get_model_response(self, session: aiohttp.ClientSession, prompt: str) -> str:
+        payload = {
+            "model": self.model,
+            "messages": [
+                {"role": self.config.developer_role, "content": self.config.developer_prompt},
+                {"role": "user", "content": prompt},
+            ],
+            "provider": {"order": ["Nebius"], "allow_fallbacks": False},
+        }
+
+        async for attempt in AsyncRetrying(
+            stop=stop_after_attempt(20),
+            wait=wait_exponential(multiplier=1, min=1, max=60),
+            retry=retry_if_exception_type(
+                (aiohttp.ClientError, asyncio.TimeoutError, json.JSONDecodeError, ValueError)
+            ),
+        ):
+            with attempt:
+                async with session.post(self.base_url, json=payload) as response:
+                    data = await response.json()
+                    if not data:
+                        raise ValueError("Empty response")
+                    if not data.get("choices"):
+                        raise ValueError("Missing choices in response")
+                    return data["choices"][0]["message"]["content"]
+
+        raise Exception("Failed to get valid response after retries")
 
-    def evaluate_datasets(self) -> List[Dict[str, Any]]:
-        """Evaluate model on multiple datasets with their respective configurations."""
-        all_results = []
-        for dataset_name in self.config.datasets:
-            self.logger.info(f"\nEvaluating dataset: {dataset_name}")
-            # Create dataset with its specific configuration
-            dataset = reasoning_gym.create_dataset(
-                dataset_name, size=self.config.dataset_size, seed=self.config.dataset_seed
-            )
-            results = []
-            for i, entry in enumerate(dataset):
-                print(f"On example {i+1} of {len(dataset)}")
-                response = self.get_model_response(entry["question"])
-                model_answer = extract_answer(response)
-                score = dataset.score_answer(answer=model_answer, entry=entry)
-                result = {
-                    "question": entry["question"],
-                    "expected_answer": str(entry["answer"]),
-                    "model_answer": model_answer,
-                    "score": score,
-                    "metadata": str(entry["metadata"]),
-                }
-                results.append(result)
-            metrics = self.save_results(results, dataset, dataset_name)
-            all_results.append({"metrics": metrics, "results": results})
-        return all_results
+    async def process_entry(self, session: aiohttp.ClientSession, dataset: Any, entry: Any) -> Dict[str, Any]:
+        """Process a single entry with concurrency control."""
+        async with self.semaphore:
+            response = await self.get_model_response(session, entry["question"])
+            model_answer = extract_answer(response)
+            score = dataset.score_answer(answer=model_answer, entry=entry)
+            print(f"Question: {entry['question']}")
+            return {
+                "question": entry["question"],
+                "expected_answer": str(entry["answer"]),
+                "model_answer": model_answer,
+                "score": score,
+                "metadata": str(entry["metadata"]),
+            }
+
+    async def evaluate_dataset(self, session: aiohttp.ClientSession, dataset_name: str) -> Dict[str, Any]:
+        """Evaluate a single dataset asynchronously."""
+        self.logger.info(f"\nEvaluating dataset: {dataset_name}")
+        dataset = reasoning_gym.create_dataset(
+            dataset_name, size=self.config.dataset_size, seed=self.config.dataset_seed
+        )
+        tasks = [self.process_entry(session, dataset, entry) for entry in dataset]
+        results = await asyncio.gather(*tasks)
+        return self.save_results(results, dataset, dataset_name)
+
+    async def evaluate_datasets(self) -> List[Dict[str, Any]]:
+        """Main async evaluation entry point."""
+        async with aiohttp.ClientSession(headers=self.headers) as session:
+            return await asyncio.gather(*(self.evaluate_dataset(session, name) for name in self.config.datasets))
 
 
-def main():
+async def async_main():
     parser = argparse.ArgumentParser(description="Evaluate models on reasoning datasets")
     parser.add_argument("--yaml", required=True, help="Path to YAML configuration file")
     args = parser.parse_args()
 
     config = EvalConfig.from_yaml(args.yaml)
+    evaluator = OpenRouterEvaluator(model=config.model, config=config)
+    results = await evaluator.evaluate_datasets()
+
     output_dir = f"{config.eval_dir}/{config.category}"
     os.makedirs(output_dir, exist_ok=True)
-    evaluator = OpenRouterEvaluator(model=config.model, config=config)
-    all_results = evaluator.evaluate_datasets()
     with open(f"{output_dir}/summary.json", "w") as f:
-        json.dump(all_results, f, indent=2)
+        json.dump(results, f, indent=2)
 
 
 if __name__ == "__main__":
-    main()
+    asyncio.run(async_main())
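The core of the change is the AsyncRetrying loop wrapped around session.post. A self-contained sketch of that retry pattern, with smaller attempt limits and a placeholder endpoint (httpbin.org and post_with_retries are illustrative, not part of the commit):

import asyncio
import json

import aiohttp
from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_attempt, wait_exponential

async def post_with_retries(session: aiohttp.ClientSession, url: str, payload: dict) -> dict:
    # each loop iteration is one attempt; tenacity sleeps between attempts
    async for attempt in AsyncRetrying(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type(
            (aiohttp.ClientError, asyncio.TimeoutError, json.JSONDecodeError, ValueError)
        ),
    ):
        with attempt:  # an exception raised in this block marks the attempt failed and triggers a retry
            async with session.post(url, json=payload) as response:
                data = await response.json()
                if not data:
                    raise ValueError("Empty response")  # treated as retryable
                return data
    # when attempts are exhausted, tenacity raises RetryError out of the loop,
    # so control never reaches here; a trailing raise mirrors the commit's style

async def main() -> None:
    async with aiohttp.ClientSession() as session:
        data = await post_with_retries(session, "https://httpbin.org/post", {"ping": "pong"})
        print(data["json"])

asyncio.run(main())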


@@ -1,9 +1,8 @@
+model: deepseek/deepseek-r1
 category: algorithmic
 datasets:
 - base_conversion
 - binary_matrix
-- caesar _cipher
+- caesar_cipher
 - group_anagrams
 - isomorphic_strings
 - letter_counting
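The evaluator is driven entirely by this YAML: model selects the OpenRouter model, category determines the output subdirectory (config.eval_dir/config.category), and each listed dataset is generated with the dataset_size and dataset_seed from the same config. Assuming the script and config are saved as eval.py and algorithmic.yaml (file names are not visible in this view), a run would look like:

python eval.py --yaml algorithmic.yaml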