reasoning-gym/notebooks/codeio.ipynb
Zafir Stojanovski b47bf882ce filtering
2025-02-25 22:21:26 +01:00

609 lines
24 KiB
Text

{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import abc\n",
"import asyncio\n",
"from collections import defaultdict\n",
"import json\n",
"import os\n",
"import random\n",
"from random import Random\n",
"import re\n",
"import signal\n",
"from typing import Union\n",
"\n",
"import aiohttp\n",
"import datasets\n",
"import numpy as np\n",
"from sentence_transformers import SentenceTransformer\n",
"from tenacity import (\n",
" AsyncRetrying,\n",
" retry_if_exception_type,\n",
" stop_after_attempt,\n",
" wait_exponential,\n",
")\n",
"import torch\n",
"from tqdm.notebook import tqdm"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dataset = datasets.load_dataset(\"hkust-nlp/CodeIO-PyEdu-Reasoning\")['train']"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Extract the relevant parts of the prompt"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Splits a CodeIO prompt into three captured groups:\n",
"#   1. the task description,\n",
"#   2. the input/output specification,\n",
"#   3. the reference code snippet (everything after the \"Tip: ...\" sentence).\n",
"pattern = re.compile(\n",
"    r'(?s)'  # DOTALL so . matches newlines\n",
"    r'You are given a question that requires some input and output variables as follows:\\s*(.*?)'\n",
"    r'\\s*The input and output requirements are as follows:\\s*(.*?)'\n",
"    r'\\s*Given the following.*?Tip: Here is a reference code snippet for this question\\. '\n",
"    r'You can refer to this code to guide your reasoning but not copy spans of code directly\\.\\s*(.*)'\n",
")\n",
"\n",
"EXTRACTED_JSONL = \"data/codeio-pyedu-extracted.jsonl\"\n",
"\n",
"# A fresh checkout has no `data/` directory; create it so `open` cannot fail.\n",
"os.makedirs(os.path.dirname(EXTRACTED_JSONL), exist_ok=True)\n",
"\n",
"seen = set()  # dedup keys of the entries already written\n",
"duplicate = 0  # prompts skipped because an identical entry was already written\n",
"unmatched = 0  # prompts the regex could not parse\n",
"\n",
"# The file is only written here, never read back, so plain \"w\" is enough.\n",
"with open(EXTRACTED_JSONL, \"w\") as f:\n",
"    for i, item in tqdm(enumerate(dataset), total=len(dataset)):\n",
"        match = pattern.search(item[\"prompt\"])\n",
"        if not match:\n",
"            unmatched += 1\n",
"            print(f\"No match found for item {i}\")\n",
"            continue\n",
"\n",
"        # Extract relevant info\n",
"        task_description = match.group(1).strip()\n",
"        input_output_spec = match.group(2).strip()\n",
"        code_sample = match.group(3).strip()\n",
"\n",
"        # Two entries are duplicates only if all three parts coincide; storing the\n",
"        # hashes instead of the texts keeps the `seen` set small.\n",
"        hash_entry = f\"{hash(task_description)}-{hash(input_output_spec)}-{hash(code_sample)}\"\n",
"        if hash_entry in seen:\n",
"            duplicate += 1\n",
"            continue\n",
"        seen.add(hash_entry)\n",
"\n",
"        # Save to disk, one JSON object per line\n",
"        json.dump({\n",
"            \"task_description\": task_description,\n",
"            \"input_output_spec\": input_output_spec,\n",
"            \"code_sample\": code_sample\n",
"        }, f)\n",
"        f.write(\"\\n\")\n",
"\n",
"print(f\"There were {duplicate} out of {len(dataset)} duplicate entries\")\n",
"print(f\"There were {unmatched} out of {len(dataset)} prompts that did not match the expected format\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Subsample the data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class IdentitySampler:\n",
" def run(\n",
" self, features: Union[torch.Tensor, np.ndarray]\n",
" ) -> Union[torch.Tensor, np.ndarray]:\n",
" return features\n",
"\n",
"\n",
"class BaseSampler(abc.ABC):\n",
" def __init__(self, percentage: float):\n",
" if not 0 < percentage < 1:\n",
" raise ValueError(\"Percentage value not in (0, 1).\")\n",
" self.percentage = percentage\n",
"\n",
" @abc.abstractmethod\n",
" def run(\n",
" self, features: Union[torch.Tensor, np.ndarray]\n",
" ) -> Union[torch.Tensor, np.ndarray]:\n",
" pass\n",
"\n",
" def _store_type(self, features: Union[torch.Tensor, np.ndarray]) -> None:\n",
" self.features_is_numpy = isinstance(features, np.ndarray)\n",
" if not self.features_is_numpy:\n",
" self.features_device = features.device\n",
"\n",
" def _restore_type(self, features: torch.Tensor) -> Union[torch.Tensor, np.ndarray]:\n",
" if self.features_is_numpy:\n",
" return features.cpu().numpy()\n",
" return features.to(self.features_device)\n",
"\n",
"\n",
"class GreedyCoresetSampler(BaseSampler):\n",
"    \"\"\"Greedy coreset sampler: repeatedly picks the point farthest from those already picked.\n",
"\n",
"    Note that `run` returns the row *indices* of the selected points, not the points.\n",
"    \"\"\"\n",
"\n",
"    def __init__(\n",
"        self,\n",
"        percentage: float,\n",
"        device: torch.device,\n",
"        dtype: torch.dtype = torch.float32,\n",
"        dimension_to_project_features_to=128,\n",
"    ):\n",
"        \"\"\"Configure the sampler.\n",
"\n",
"        Args:\n",
"            percentage: fraction of rows to select; must lie in (0, 1).\n",
"            device: device on which the random projection is evaluated.\n",
"            dtype: dtype of the random projection layer.\n",
"            dimension_to_project_features_to: width the features are randomly projected\n",
"                to; features that already have this width are used unchanged.\n",
"\n",
"        Raises:\n",
"            ValueError: if `percentage` is not in (0, 1).\n",
"        \"\"\"\n",
"        super().__init__(percentage)\n",
"\n",
"        self.device = device\n",
"        self.dtype = dtype\n",
"        self.dimension_to_project_features_to = dimension_to_project_features_to\n",
"\n",
"    def _reduce_features(self, features: torch.Tensor) -> torch.Tensor:\n",
"        \"\"\"Project `features` ([N x D]) to the configured width with a random linear map.\n",
"\n",
"        Features whose width already matches are returned as-is (same device and dtype).\n",
"        \"\"\"\n",
"        if features.shape[1] == self.dimension_to_project_features_to:\n",
"            return features\n",
"        mapper = torch.nn.Linear(\n",
"            features.shape[1], self.dimension_to_project_features_to, bias=False, dtype=self.dtype,\n",
"        ).to(self.device)\n",
"        # The projection is random and never trained, so no autograd graph is needed.\n",
"        # The input is cast too: a dtype mismatch with the layer would raise otherwise.\n",
"        with torch.no_grad():\n",
"            return mapper(features.to(device=self.device, dtype=self.dtype))\n",
"\n",
"    def run(self, features: Union[torch.Tensor, np.ndarray]) -> torch.Tensor:\n",
"        \"\"\"Subsamples features using Greedy Coreset.\n",
"\n",
"        Args:\n",
"            features: [N x D]\n",
"\n",
"        Returns:\n",
"            int64 tensor holding the indices of the int(N * percentage) selected rows,\n",
"            in selection order.\n",
"        \"\"\"\n",
"        # No `percentage == 1` shortcut: BaseSampler.__init__ rejects that value.\n",
"        self._store_type(features)\n",
"        if isinstance(features, np.ndarray):\n",
"            features = torch.from_numpy(features)\n",
"        reduced_features = self._reduce_features(features)\n",
"        return self._compute_greedy_coreset_indices(reduced_features)\n",
"\n",
"    @staticmethod\n",
"    def _compute_batchwise_differences(\n",
"        matrix_a: torch.Tensor, matrix_b: torch.Tensor\n",
"    ) -> torch.Tensor:\n",
"        \"\"\"Computes batchwise Euclidean distances using PyTorch.\n",
"\n",
"        Args:\n",
"            matrix_a: [N x D]\n",
"            matrix_b: [M x D]\n",
"\n",
"        Returns:\n",
"            [N x M] matrix of distances between the rows of `matrix_a` and `matrix_b`.\n",
"        \"\"\"\n",
"        # ||a - b||^2 = a.a - 2 a.b + b.b; bmm of each row with itself yields the squared norms.\n",
"        a_times_a = matrix_a.unsqueeze(1).bmm(matrix_a.unsqueeze(2)).reshape(-1, 1)\n",
"        b_times_b = matrix_b.unsqueeze(1).bmm(matrix_b.unsqueeze(2)).reshape(1, -1)\n",
"        a_times_b = matrix_a.mm(matrix_b.T)\n",
"\n",
"        # Rounding can push the squared distance slightly below zero; clamp before sqrt.\n",
"        return (-2 * a_times_b + a_times_a + b_times_b).clamp(0, None).sqrt()\n",
"\n",
"    def _compute_greedy_coreset_indices(self, features: torch.Tensor) -> torch.Tensor:\n",
"        \"\"\"Runs iterative greedy coreset selection.\n",
"\n",
"        Args:\n",
"            features: [NxD] input feature bank to sample.\n",
"\n",
"        Returns:\n",
"            int64 tensor of selected row indices, placed on `features.device`.\n",
"        \"\"\"\n",
"        coreset_indices = []\n",
"        num_coreset_samples = int(len(features) * self.percentage)\n",
"\n",
"        # Selection only needs values, never gradients.\n",
"        with torch.no_grad():\n",
"            distance_matrix = self._compute_batchwise_differences(features, features)\n",
"            # Initial score of a point: norm of its distances to all points.\n",
"            coreset_anchor_distances = torch.norm(distance_matrix, dim=1)\n",
"\n",
"            for _ in range(num_coreset_samples):\n",
"                select_idx = torch.argmax(coreset_anchor_distances).item()\n",
"                coreset_indices.append(select_idx)\n",
"\n",
"                # Each point keeps the smaller of its current score and its distance to\n",
"                # the new pick; the pick itself drops to 0.\n",
"                coreset_anchor_distances = torch.minimum(\n",
"                    coreset_anchor_distances, distance_matrix[:, select_idx]\n",
"                )\n",
"\n",
"        return torch.tensor(coreset_indices, device=features.device, dtype=torch.int64)\n",
"\n",
"\n",
"class ApproximateGreedyCoresetSampler(GreedyCoresetSampler):\n",
"    \"\"\"Greedy coreset sampler that avoids building the full N x N distance matrix.\"\"\"\n",
"\n",
"    def __init__(\n",
"        self,\n",
"        percentage: float,\n",
"        device: torch.device,\n",
"        dtype: torch.dtype = torch.float32,\n",
"        number_of_starting_points: int = 10,\n",
"        dimension_to_project_features_to: int = 128,\n",
"    ):\n",
"        \"\"\"Configure the sampler.\n",
"\n",
"        Args:\n",
"            percentage: fraction of rows to select; must lie in (0, 1).\n",
"            device: device on which the random projection is evaluated.\n",
"            dtype: dtype of the random projection layer.\n",
"            number_of_starting_points: number of randomly drawn rows used to initialise\n",
"                the distance estimate (capped at the number of rows).\n",
"            dimension_to_project_features_to: width the features are projected to.\n",
"        \"\"\"\n",
"        self.number_of_starting_points = number_of_starting_points\n",
"        super().__init__(percentage, device, dtype, dimension_to_project_features_to)\n",
"\n",
"    def _compute_greedy_coreset_indices(self, features: torch.Tensor) -> torch.Tensor:\n",
"        \"\"\"Runs approximate iterative greedy coreset selection.\n",
"\n",
"        This greedy coreset implementation does not require computation of the\n",
"        full N x N distance matrix and thus requires a lot less memory, however\n",
"        at the cost of increased sampling times.\n",
"\n",
"        Args:\n",
"            features: [NxD] input feature bank to sample.\n",
"\n",
"        Returns:\n",
"            int64 tensor of selected row indices, placed on `features.device`.\n",
"        \"\"\"\n",
"        number_of_starting_points = min(self.number_of_starting_points, len(features))\n",
"        # Drawn from NumPy's global RNG: seed it beforehand for a reproducible subset.\n",
"        start_points = np.random.choice(\n",
"            len(features), number_of_starting_points, replace=False\n",
"        ).tolist()\n",
"\n",
"        coreset_indices = []\n",
"        num_coreset_samples = int(len(features) * self.percentage)\n",
"\n",
"        with torch.no_grad():\n",
"            # Initial score of a point: mean distance to the random starting points.\n",
"            approximate_distance_matrix = self._compute_batchwise_differences(\n",
"                features, features[start_points]\n",
"            )\n",
"            approximate_coreset_anchor_distances = torch.mean(\n",
"                approximate_distance_matrix, dim=-1\n",
"            ).reshape(-1, 1)\n",
"\n",
"            # `tqdm` is the progress-bar class itself (`from tqdm.notebook import tqdm`),\n",
"            # so it is called directly; `tqdm.tqdm` does not exist on it.\n",
"            for _ in tqdm(range(num_coreset_samples), desc=\"Subsampling...\"):\n",
"                select_idx = torch.argmax(approximate_coreset_anchor_distances).item()\n",
"                coreset_indices.append(select_idx)\n",
"                # [N x 1] distances from every point to the new pick.\n",
"                coreset_select_distance = self._compute_batchwise_differences(\n",
"                    features, features[select_idx : select_idx + 1]  # noqa: E203\n",
"                )\n",
"                # Keep, per point, the smaller of its current score and that distance.\n",
"                approximate_coreset_anchor_distances = torch.minimum(\n",
"                    approximate_coreset_anchor_distances, coreset_select_distance\n",
"                )\n",
"\n",
"        return torch.tensor(coreset_indices, device=features.device, dtype=torch.int64)\n",
"\n",
"\n",
"class RandomSampler(BaseSampler):\n",
"    \"\"\"Sampler that selects a uniformly random subset of the rows.\"\"\"\n",
"\n",
"    def __init__(self, percentage: float):\n",
"        \"\"\"Store the sampling fraction; raises ValueError unless 0 < percentage < 1.\"\"\"\n",
"        super().__init__(percentage)\n",
"\n",
"    def run(self, features: Union[torch.Tensor, np.ndarray]) -> torch.Tensor:\n",
"        \"\"\"Randomly samples input feature collection.\n",
"\n",
"        Args:\n",
"            features: [N x D]\n",
"\n",
"        Returns:\n",
"            int64 tensor with int(N * percentage) distinct row indices. It lives on\n",
"            `features.device` for tensor input and on the CPU for NumPy input.\n",
"        \"\"\"\n",
"        num_random_samples = int(len(features) * self.percentage)\n",
"        subset_indices = np.random.choice(\n",
"            len(features), num_random_samples, replace=False\n",
"        )\n",
"        # NumPy arrays have no `.device` attribute, so fall back to the CPU for them.\n",
"        if isinstance(features, torch.Tensor):\n",
"            index_device = features.device\n",
"        else:\n",
"            index_device = torch.device(\"cpu\")\n",
"        return torch.tensor(subset_indices, device=index_device, dtype=torch.int64)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# I ran this cell on Google Colab because I don't have a GPU on my local machine,\n",
"# hence why you see the Google Drive paths\n",
"DRIVE_EXTRACTED_JSONL = \"./drive/MyDrive/reasoning-gym/codeio-pyedu-extracted.jsonl\"\n",
"DRIVE_BEST_COVERAGE_JSONL = \"./drive/MyDrive/reasoning-gym/codeio-pyedu-best-coverage.jsonl\"\n",
"SUBSAMPLE_SEED = 42\n",
"\n",
"device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n",
"model = SentenceTransformer(\"nomic-ai/modernbert-embed-base\")\n",
"print(model)\n",
"\n",
"def get_entry_info(entry) -> str:\n",
"    \"\"\"Return the text that represents an entry for embedding: its task description.\"\"\"\n",
"    return entry['task_description']\n",
"\n",
"def get_embeddings(text) -> torch.Tensor:\n",
"    \"\"\"Embed one string ([D]) or a list of strings ([N x D]) as a bfloat16 tensor.\"\"\"\n",
"    # Show the encoder's progress bar only for batch input.\n",
"    encoded = model.encode(text, show_progress_bar=not isinstance(text, str))\n",
"    return torch.from_numpy(encoded).to(torch.bfloat16)\n",
"\n",
"# Read the file once and keep the raw lines, so the selected indices are guaranteed to\n",
"# refer to the same lines when the subset is written out below.\n",
"with open(DRIVE_EXTRACTED_JSONL) as f:\n",
"    extracted_lines = f.readlines()\n",
"\n",
"# A single `encode` call lets the model batch the texts instead of one pass per line.\n",
"task_texts = [get_entry_info(json.loads(line)) for line in extracted_lines]\n",
"embeddings = get_embeddings(task_texts).to(device)\n",
"assert embeddings.shape[0] == len(extracted_lines), \"expected one embedding per input line\"\n",
"print(embeddings.shape)\n",
"\n",
"# The sampler draws its starting points with `np.random.choice`, and draws a random\n",
"# projection from torch when the widths differ; seed both so the subset is reproducible.\n",
"np.random.seed(SUBSAMPLE_SEED)\n",
"torch.manual_seed(SUBSAMPLE_SEED)\n",
"\n",
"sampler = ApproximateGreedyCoresetSampler(\n",
"    percentage=0.05,\n",
"    device=device,\n",
"    dtype=torch.bfloat16,\n",
"    dimension_to_project_features_to=768,\n",
")\n",
"subsampled = sampler.run(embeddings)\n",
"\n",
"# Keep the selected lines in their original file order.\n",
"indices = set(subsampled.cpu().tolist())\n",
"with open(DRIVE_BEST_COVERAGE_JSONL, \"w\") as f_out:\n",
"    f_out.writelines(line for i, line in enumerate(extracted_lines) if i in indices)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create input generators for each problem separately"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"\n",
"SYSTEM_PROMPT = \"\"\"You are a helpful assistant that generates valid Python functions that act as input generators for a given code snippet.\n",
"\n",
"You have access to `random.Random`, therefore you SHOULD NOT import it again. You should use this random number generator to make the input generation process stochastic on each call.\n",
"\n",
"When the user asks you to generate an input for a code snippet, you should strictly respond in the following format:\n",
"<function>\n",
"def generate_input(rng: Random) -> dict:\n",
" # Your code here\n",
" pass\n",
"</function>\n",
"\n",
"The output of the function should be a dictionary where the keys are the variable names and the values are the generated values.\n",
"\n",
"It must contain all the variables that listed in the user's input specification, or more precisely in the `main_solution` function signature. \n",
"\"\"\"\n",
"\n",
"USER_PROMPT = \"\"\"Following are a task description, input/output specification, and relevant code snippet for a Python programming task.\n",
"\n",
"<task_description>\n",
"{task_description}\n",
"</task_description>\n",
"\n",
"<input_output_spec>\n",
"{input_output_spec}\n",
"</input_output_spec>\n",
"\n",
"<code_sample>\n",
"{code_sample}\n",
"</code_sample>\n",
"\n",
"Your task is to write a Python function `def generate_input(rng: Random) -> dict:` that generates valid inputs for the given code snippet, based on the provided information.\n",
"\"\"\"\n",
"\n",
"# We'll control concurrency with a semaphore\n",
"CONCURRENCY_LIMIT = 10\n",
"sem = asyncio.Semaphore(CONCURRENCY_LIMIT)\n",
"\n",
"def _extract_input_generator(data) -> str:\n",
"    \"\"\"Return the text inside the `<function>...</function>` block of a chat-completion payload.\n",
"\n",
"    Raises:\n",
"        ValueError: if the payload has no choices, no message content or no `<function>`\n",
"            block. ValueError is part of the retry policy, so the request is re-sent.\n",
"    \"\"\"\n",
"    choices = data.get(\"choices\") if isinstance(data, dict) else None\n",
"    if not choices:\n",
"        raise ValueError(\"No choices found in response\")\n",
"    try:\n",
"        content = choices[0][\"message\"][\"content\"]\n",
"    except (KeyError, IndexError, TypeError) as exc:\n",
"        raise ValueError(\"Malformed choice in response\") from exc\n",
"    match = re.search(r\"<function>(.*?)</function>\", content or \"\", re.DOTALL)\n",
"    if not match:\n",
"        raise ValueError(\"Could not find <function>...</function> block in response\")\n",
"    return match.group(1).strip()\n",
"\n",
"\n",
"async def fetch_input_generator(session: aiohttp.ClientSession, entry: dict) -> dict:\n",
"    \"\"\"\n",
"    Sends a POST request to OpenRouter with the system & user prompts,\n",
"    extracts the function from the response, and returns the updated entry.\n",
"\n",
"    Args:\n",
"        session: open aiohttp session used for the request.\n",
"        entry: dict with the keys `task_description`, `input_output_spec` and `code_sample`.\n",
"            It is modified in place: `input_generator` is added on success.\n",
"\n",
"    Returns:\n",
"        The same `entry` object. If every attempt failed it is returned without an\n",
"        `input_generator` key, so one bad entry cannot abort the whole batch.\n",
"\n",
"    Raises:\n",
"        RuntimeError: if the OPENROUTER_API_KEY environment variable is not set.\n",
"    \"\"\"\n",
"    # Fail fast on a missing key instead of sending \"Bearer None\" for every entry.\n",
"    api_key = os.getenv(\"OPENROUTER_API_KEY\")\n",
"    if not api_key:\n",
"        raise RuntimeError(\"OPENROUTER_API_KEY environment variable is not set\")\n",
"\n",
"    url = \"https://openrouter.ai/api/v1/chat/completions\"\n",
"    headers = {\n",
"        \"Authorization\": f\"Bearer {api_key}\",\n",
"        \"Content-Type\": \"application/json\",\n",
"    }\n",
"\n",
"    payload = {\n",
"        \"model\": \"deepseek/deepseek-chat\",\n",
"        \"messages\": [\n",
"            {\"role\": \"system\", \"content\": SYSTEM_PROMPT},\n",
"            {\n",
"                \"role\": \"user\",\n",
"                \"content\": USER_PROMPT.format(**entry)\n",
"            },\n",
"        ],\n",
"    }\n",
"\n",
"    retryable = (aiohttp.ClientError, asyncio.TimeoutError, json.JSONDecodeError, ValueError)\n",
"\n",
"    try:\n",
"        async with sem:\n",
"            async for attempt in AsyncRetrying(\n",
"                stop=stop_after_attempt(5),\n",
"                wait=wait_exponential(multiplier=1, min=1, max=60),\n",
"                retry=retry_if_exception_type(retryable),\n",
"                # Re-raise the last real error instead of tenacity's RetryError wrapper,\n",
"                # so the `except` below can report what actually went wrong.\n",
"                reraise=True,\n",
"            ):\n",
"                with attempt:\n",
"                    async with session.post(url, headers=headers, json=payload) as response:\n",
"                        # 4xx/5xx raise ClientResponseError (a ClientError) and are retried.\n",
"                        response.raise_for_status()\n",
"                        data = await response.json()\n",
"\n",
"                    entry[\"input_generator\"] = _extract_input_generator(data)\n",
"                    return entry\n",
"    except retryable as exc:\n",
"        print(f\"Failed to get valid input generator after retries: {exc!r}\")\n",
"\n",
"    return entry\n",
"\n",
"async def process_file(input_file: str, output_file: str):\n",
"    \"\"\"\n",
"    Reads each line from `input_file`, processes each entry concurrently,\n",
"    and writes augmented entries to `output_file`.\n",
"\n",
"    Entries are written in input order. If the request for an entry raised, the\n",
"    entry is written unchanged (without `input_generator`), so completed work is\n",
"    never lost to a single failure.\n",
"\n",
"    Args:\n",
"        input_file: path of a JSONL file with one entry per line; blank lines are skipped.\n",
"        output_file: path of the JSONL file to write (overwritten if it exists).\n",
"    \"\"\"\n",
"    # Read all entries first (synchronously)\n",
"    with open(input_file, \"r\") as f_in:\n",
"        entries = [json.loads(line) for line in f_in if line.strip()]\n",
"\n",
"    async with aiohttp.ClientSession() as session:\n",
"        # Create a task for each entry\n",
"        tasks = [\n",
"            asyncio.create_task(fetch_input_generator(session, entry))\n",
"            for entry in entries\n",
"        ]\n",
"\n",
"        with tqdm(total=len(tasks)) as progress:\n",
"            # Advance the bar as each task finishes, whatever its outcome.\n",
"            for task in tasks:\n",
"                task.add_done_callback(lambda _task: progress.update(1))\n",
"            # `gather` preserves input order; `return_exceptions=True` hands failures\n",
"            # back as values instead of cancelling the remaining work.\n",
"            outcomes = await asyncio.gather(*tasks, return_exceptions=True)\n",
"\n",
"    failures = [outcome for outcome in outcomes if isinstance(outcome, BaseException)]\n",
"    if failures:\n",
"        print(f\"{len(failures)} of {len(outcomes)} requests failed; first error: {failures[0]!r}\")\n",
"\n",
"    # Write all results out\n",
"    with open(output_file, \"w\") as f_out:\n",
"        for entry, outcome in zip(entries, outcomes):\n",
"            record = entry if isinstance(outcome, BaseException) else outcome\n",
"            f_out.write(json.dumps(record))\n",
"            f_out.write(\"\\n\")\n",
"\n",
"# Finally, run the entire pipeline\n",
"await process_file(\n",
" input_file=\"data/codeio-pyedu-best-coverage.jsonl\",\n",
" output_file=\"data/codeio-pyedu-with-input-generator.jsonl\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Filter out invalid input generators\n",
"\n",
"**NOTE**: The code below is commented out because it is buggy: it appears to have a memory leak (I think). Every time `exec` runs a code snippet, the variables the snippet defines are stored in a copy of the notebook's global scope, and over time this consumes all the memory. Besides, running `exec` on untrusted (LLM-generated) code is unsafe and should only be done inside an isolated sandbox."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# def timeout_handler(signum, frame):\n",
"# raise TimeoutError(\"Function call timed out\")\n",
"\n",
"# def get_input_generator_func(code_sample: str, input_generator_str: str) -> dict:\n",
"# env = globals().copy()\n",
"# exec(code_sample, env, env)\n",
"# exec(input_generator_str, env, env)\n",
"# return env['generate_input']\n",
"\n",
"# def execute_code_sample(code_sample: str, input_dict: dict) -> dict:\n",
"# env = globals().copy()\n",
"# exec(code_sample, env, env)\n",
"# main_solution = env['main_solution']\n",
"# return main_solution(**input_dict)\n",
"\n",
"# NUM_INPUT_GENERATE = 1_000 # how many inputs to try and generate\n",
"# ALARM_TOLERANCE = 1 # in seconds\n",
"# PERCENT_UNIQUE_INPUTS = 0.30 # what fraction of generated inputs should be unique\n",
"# PERCENT_UNIQUE_OUTPUTS = 0.30 # what fraction of generated outputs should be unique\n",
"\n",
"# signal.signal(signal.SIGALRM, timeout_handler)\n",
"\n",
"# rng = random.Random()\n",
"# rng.seed(42)\n",
"\n",
"# errors = defaultdict(int)\n",
"# total_entries = sum(1 for _ in open(\"data/codeio-pyedu-with-input-generator.jsonl\", \"r\"))\n",
"\n",
"# with open(\"data/codeio-pyedu-with-input-generator.jsonl\", \"r\") as f_in, \\\n",
"# open(\"data/codeio-pyedu-with-input-generator-filtered.jsonl\", \"w+\") as f_out:\n",
"\n",
"# iterator = tqdm(enumerate(f_in), total=total_entries)\n",
"\n",
"# for i, line in iterator:\n",
"# iterator.set_description(f\"Processing {i}/{total_entries} | \" + \" | \".join(f\"{k}: {v}\" for k, v in errors.items()) + f\" | total: {sum(errors.values())}\")\n",
"# entry = json.loads(line)\n",
"# # Check if input generator is present\n",
"# if not \"input_generator\" in entry:\n",
"# errors[\"missing_input_generator\"] += 1\n",
"# continue\n",
" \n",
"# # Check if input generator is valid function\n",
"# try:\n",
"# input_generator_func = get_input_generator_func(entry['code_sample'], entry['input_generator'])\n",
"# except Exception as e:\n",
"# errors[\"cannot_instantiate_input_generator\"] += 1\n",
"# continue\n",
"\n",
"# skip = False\n",
"# seen_inputs, seen_outputs = set(), set()\n",
"\n",
"# for _ in range(NUM_INPUT_GENERATE):\n",
"# try:\n",
"# # Check if you can generate input\n",
"# signal.alarm(ALARM_TOLERANCE)\n",
"# random_input = input_generator_func(rng)\n",
"# signal.alarm(0)\n",
"# seen_inputs.add(hash(json.dumps(random_input)))\n",
"\n",
"# # Check if code snippet can execute with generated input\n",
"# signal.alarm(ALARM_TOLERANCE)\n",
"# random_output = execute_code_sample(entry[\"code_sample\"], random_input)\n",
"# signal.alarm(0)\n",
"# seen_outputs.add(hash(json.dumps(random_output)))\n",
"# except Exception as e:\n",
"# signal.alarm(0)\n",
"# errors[\"unreliable_input_generator\"] += 1\n",
"# skip = True\n",
"# break\n",
"# if skip: \n",
"# continue\n",
" \n",
"# if len(seen_inputs) / NUM_INPUT_GENERATE < PERCENT_UNIQUE_INPUTS:\n",
"# errors[\"insufficient_unique_inputs\"] += 1\n",
"# continue\n",
" \n",
"# if len(seen_outputs) / NUM_INPUT_GENERATE < PERCENT_UNIQUE_OUTPUTS:\n",
"# errors[\"insufficient_unique_outputs\"] += 1\n",
"# continue\n",
"\n",
"# f_out.write(json.dumps(entry))\n",
"# f_out.write(\"\\n\")\n",
"\n",
"# for k, v in errors.items():\n",
"# print(f\"{k}: {v}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "reasoning_gym",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.11"
}
},
"nbformat": 4,
"nbformat_minor": 2
}