Add parallel compilation; add setup instructions

Artem Yatsenko 2025-05-16 19:53:46 -07:00
parent 45aece515f
commit fbfe06771b


@@ -1,7 +1,30 @@
# kernelbench_env.py
"""
KernelBench Environment Setup Instructions
----------------------------------------
Before running this script, you need to install KernelBench:
1. Install KernelBench from source:
git clone git@github.com:ScalingIntelligence/KernelBench.git
cd KernelBench
pip install -r requirements.txt
pip install -e .
cd -
2. Set the variables at the top of this script:
KERNELBENCH_DIR: Path to your local KernelBench clone
KERNELBENCH_LEVEL: The difficulty level (1-3)
KERNELBENCH_PROBLEM_NUMBER: The specific problem number to solve
These variables configure the evaluation environment.
"""
from datasets import load_dataset
import os
import json
import subprocess
import asyncio
import multiprocessing as mp
from pathlib import Path
from typing import Dict, List, Optional, Tuple, TypedDict, Union
@@ -10,48 +33,92 @@ from atroposlib.type_definitions import Item, number
from atroposlib.utils.tokenize_for_trainer import tokenize_for_trainer
# KernelBench imports
from src.eval import eval_kernel_against_ref
from src.eval import eval_kernel_against_ref # <- new import
# Set the start method to 'spawn' for CUDA compatibility
mp.set_start_method('spawn', force=True)
KERNELBENCH_DIR = Path("/home/artem_nous/KernelBench")
KERNELBENCH_LEVEL = 1
KERNELBENCH_PROBLEM_NUMBER = 1
KERNELBENCH_DIR = Path("/path/to/KernelBench") # ← point here to your clone
os.environ["TORCH_CUDA_ARCH_LIST"] = "9.0"
def get_kernelbench_code(level: int, problem_id: int) -> str:
"""
Return the `code` string for a given KernelBench level/problem combo.
Raises ValueError if the problem_id is not found in that level.
"""
split = f"level_{level}"
ds = load_dataset("ScalingIntelligence/KernelBench", split=split)
# Keep only rows whose `problem_id` exactly matches the desired one
row = ds.filter(lambda x: x["problem_id"] == problem_id)
if len(row) == 0:
raise ValueError(f"{problem_id=} not found in {split=}")
return row[0]["code"]
class KBRow(TypedDict):
"""Singletask record (prompt text plus meta)."""
prompt: str # full prompt given to the LLM
sample_path: str
def evaluate_single_kernel(args):
"""Helper function to evaluate a single kernel in a process."""
item, build_dir, ref_code = args
generated_src = item["messages"][-1]["content"].strip("```python\n").strip("```")
class KBEnv(BaseEnv):
"""
A stripped-down Atropos environment that only handles Level-1 / problem-1
(square matrix multiplication). It generates one kernel per rollout
group, writes the kernel to the expected `runs/{run_name}` layout, then
invokes KernelBench's evaluation script to obtain a scalar reward.
"""
# Initialize CUDA in the child process
import torch
torch.cuda.init()
name = "kernelbench"
eval_result = eval_kernel_against_ref(
original_model_src=ref_code,
custom_model_src=generated_src,
measure_performance=True,
verbose=True,
num_correct_trials=1,
num_perf_trials=1,
build_dir=build_dir,
device="cuda:7"
)
compiled_flag = bool(getattr(eval_result, "compiled", False))
runtime_val = float(getattr(eval_result, "runtime", -1.0))
reward = 0.3 * (1 if compiled_flag else 0) + runtime_val
# Note: We can't use the tokenizer here since it's not pickleable
# We'll return the raw data and tokenize in the main process
return {
"messages": item["messages"],
"finish_reason": item["finish_reason"],
"reward": reward
}
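A minimal dispatch sketch for this helper, assuming a two-worker pool and a list of (item, build_dir, ref_code) tuples named eval_args; keeping evaluate_single_kernel at module level keeps it picklable under the spawn start method set above.
pool = mp.Pool(processes=2)  # spawn-based workers, per mp.set_start_method above
async_results = [pool.apply_async(evaluate_single_kernel, args=(a,)) for a in eval_args]
scored = [r.get() for r in async_results]  # blocks until each child finishes
pool.close()
pool.join()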
class KernelBenchEnv(BaseEnv):
name = "kernelbench_parallel"
# ---------- Static config helpers ----------------------------------------
@classmethod
def config_init(cls) -> Tuple[BaseEnvConfig, List[APIServerConfig]]:
env_cfg = BaseEnvConfig(
tokenizer_name="NousResearch/DeepHermes-3-Llama-3-3B-Preview",
group_size=4, # 4 candidate kernels per step
tokenizer_name="Qwen/Qwen3-4B",
group_size=2,
max_token_length=2048,
batch_size=4,
steps_per_eval=50,
batch_size=1,
steps_per_eval=1,
total_steps=1000,
rollout_server_url="http://localhost:8000",
use_wandb=False, # flip on if you want logging
wandb_name="kb_level1_prob1",
use_wandb=False,
wandb_name=f"kb_level{KERNELBENCH_LEVEL}_prob{KERNELBENCH_PROBLEM_NUMBER}_parallel",
)
server_cfgs = [
APIServerConfig(
model_name="NousResearch/DeepHermes-3-Llama-3-3B-Preview",
model_name="Qwen/Qwen3-4B",
base_url="http://localhost:9001/v1",
api_key="DUMMY_KB_KEY", # fill in if proxy requires it
api_key="DUMMY_KB_KEY",
num_requests_for_eval=64,
)
]
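A short usage sketch of config_init, assuming it is called directly to inspect the defaults rather than through the CLI entry point:
env_cfg, server_cfgs = KernelBenchEnv.config_init()
print(env_cfg.group_size, server_cfgs[0].model_name)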
@@ -59,23 +126,20 @@ class KBEnv(BaseEnv):
# --------------------- Data ------------------------------------------------
async def setup(self):
"""
Nothing to load from disk; we construct the single prompt on-the-fly
with KernelBench's PromptConstructor so that it exactly matches their
evaluation format.
"""
# Hardcode the HF dataset identifier for problem 1
self.problem_spec = {
"level": 1,
"problem_id": 1,
"problem_file": "1_Square_matrix_multiplication_.py",
"level": KERNELBENCH_LEVEL,
"problem_id": KERNELBENCH_PROBLEM_NUMBER,
"problem_file": f"{KERNELBENCH_PROBLEM_NUMBER}_Square_matrix_multiplication_.py",
}
self.iter = 0
with open("prompt.txt", "r", encoding="utf-8") as f:
self.prompt = f.read()
self.sample_path="./sample.py"
# Get reference code directly from the dataset
self.ref_code = get_kernelbench_code(KERNELBENCH_LEVEL, KERNELBENCH_PROBLEM_NUMBER)
self.reward_buffer = list()
# Create a process pool for parallel processing
self.pool = mp.Pool(processes=24)
# --------------------- Rollout / scoring ----------------------------------
async def collect_trajectories(
@@ -97,11 +161,11 @@ class KBEnv(BaseEnv):
)
# Path: runs/<RUN_NAME>/level_1/1/
run_dir = run_dir = KERNELBENCH_DIR / "runs" / self.config.wandb_name / "level_1" / "1"
run_dir = KERNELBENCH_DIR / "runs" / "wandb" / "level_1" / "1"
run_dir.mkdir(parents=True, exist_ok=True)
to_score: List[Dict] = []
to_backlog: list()
to_backlog: List = []
for i, choice in enumerate(chat_completions.choices):
kernel_code = choice.message.content
sample_path = run_dir / f"sample_{i}.cu"
@@ -111,7 +175,6 @@ class KBEnv(BaseEnv):
to_score.append(
{
"messages": messages,
"sample_path": str(sample_path),
"finish_reason": choice.finish_reason,
}
)
@@ -119,58 +182,50 @@ class KBEnv(BaseEnv):
to_postprocess = await self.score(to_score)
return to_postprocess, to_backlog
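A small inspection sketch, assuming at least one rollout group has already been collected; it lists the candidate kernels written under the run directory used above:
run_dir = KERNELBENCH_DIR / "runs" / "wandb" / "level_1" / "1"
print(sorted(p.name for p in run_dir.glob("sample_*.cu")))  # e.g. ['sample_0.cu', 'sample_1.cu']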
async def score(
self, rollout_group_data: List[Dict]
) -> Union[Optional[ScoredDataGroup], List[Optional[ScoredDataGroup]]]:
scores = ScoredDataGroup(tokens=[], masks=[], scores=[])
scores["tokens"] = list()
scores["masks"] = list()
scores["scores"] = list()
# where we will build + compile kernels
build_dir = os.path.join("build", "kernelbench", str(KERNELBENCH_LEVEL), str(KERNELBENCH_PROBLEM_NUMBER))
os.makedirs(build_dir, exist_ok=True)
for item in rollout_group_data:
generated_src = item["prompt"]
custom_model_src = Path(item["sample_path"]).read_text()
# Create arguments for parallel evaluation
eval_args = [(item, build_dir, self.ref_code) for item in rollout_group_data]
eval_result = eval_kernel_against_ref(
ref_arch_src=generated_src, # blank per instructions
custom_model_src=custom_model_src,
measure_performance=True,
verbose=True,
num_correct_trials=1,
num_perf_trials=1,
build_dir=build_dir,
# Run evaluations in parallel
results = []
for args in eval_args:
result = self.pool.apply_async(evaluate_single_kernel, args=(args,))
results.append(result)
# Wait for all evaluations to complete and process results
for result in results:
eval_result = result.get() # This will wait for the result
reward = eval_result["reward"]
# Tokenize in the main process since tokenizer isn't pickleable
out_dict = tokenize_for_trainer(
self.tokenizer,
eval_result["messages"],
eval_result["finish_reason"]
)
compiled_flag = bool(getattr(eval_result, "compiled", False))
runtime_val = float(getattr(eval_result, "runtime", -1.0))
reward = 0.3 * (1 if compiled_flag else 0) + runtime_val
out_dict = tokenize_for_trainer(self.tokenizer, item["messages"], item["finish_reason"])
scores["tokens"].append(out_dict["tokens"])
scores["masks"].append(out_dict["masks"])
scores["scores"].append(reward)
for score in scores["scores"]:
self.reward_buffer.append(max(score, 0))
self.reward_buffer.append(max(reward, 0))
return scores if scores["tokens"] else None
async def get_next_item(self) -> KBRow:
"""Return the same single problem every time (env is tiny)."""
return KBRow(prompt=self.prompt, sample_path=self.sample_path)
return KBRow(prompt=self.prompt, sample_path="") # sample_path is no longer used
async def evaluate(self, *args, **kwargs):
"""Evaluate the current model on a set of test problems."""
# For now, we'll just log the average reward from the reward buffer
if self.reward_buffer:
avg_reward = sum(self.reward_buffer) / len(self.reward_buffer)
self.eval_metrics.append(("eval/avg_reward", avg_reward))
@@ -180,26 +235,24 @@ class KBEnv(BaseEnv):
if wandb_metrics is None:
wandb_metrics = {}
# Log the mean reward over the buffer; skip if the buffer is empty (avoid division by zero)
try:
wandb_metrics["train/reward"] = sum(
self.reward_buffer
) / len(self.reward_buffer)
except ZeroDivisionError:
# Skip if buffer is empty
pass
self.reward_buffer = list()
for item in self.eval_metrics:
wandb_metrics[item[0]] = item[1]
self.eval_metrics = list()
# Call the parent method to handle the server metrics
await super().wandb_log(wandb_metrics)
async def cleanup(self):
"""Clean up resources when done."""
self.pool.close()
self.pool.join()
await super().cleanup()
# -----------------------------------------------------------------------------
if __name__ == "__main__":
KBEnv.cli()
KernelBenchEnv.cli()