Merge branch 'main' into codeio-sampler

This commit is contained in:
Oliver 2025-02-23 20:28:06 +00:00
commit 3a5dc2080f
28 changed files with 932 additions and 164 deletions

View file

@ -9,7 +9,7 @@ main.py Orchestrates the overall flow:
import sys
import uuid
from pathlib import Path
from typing import Any, Dict
from typing import Any
from examples.word_ladder.utils import create_word_ladders, generate_reasoning

1
notebooks/codeio/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
raw_files/

View file

@ -0,0 +1,396 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## CodeI/O\n",
"\n",
"Original paper (DeepSeek): https://arxiv.org/pdf/2502.07316\n",
"\n",
"The approach begins by obtaining high quality raw code data and preprocessing it by prompting an LLM. The output of this preprocessing, for each raw code file used, should be:\n",
"\n",
"- cleaned reference code, with a main entrypoint function\n",
"- a query, converting the reference code into a question (along the lines of \"given [function parameters...] how can we obtain [desired outputs...]\")\n",
"- a natural language description of all inputs (function parameters) and outputs (function return values)\n",
"- an input generator, which can generate a dictionary of valid inputs for the function\n",
"\n",
"This notebook seeks to experiment with prompting an LLM to this end, as a starting point. The raw code data is from this GitHub repository that the DeepSeek paper mentions as one of their raw code sources: https://github.com/TheAlgorithms/Python\n",
"\n",
"NOTE: Be careful with the raw code you input into this, as cells later execute the LLM-generated outputs."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import random\n",
"from pathlib import Path\n",
"from dotenv import load_dotenv\n",
"load_dotenv()\n",
"raw_files = list(Path(\"raw_files/\").iterdir())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that the below prompt is built for DeepSeekV3. It may not work with other LLMs."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"format_prompt_template = \"\"\"\n",
"You are tasked with preprocessing a raw file of Python code into a standard format. The format is made up of several components. Here is a very simple example of a raw code file:\n",
"\n",
"def kg_to_pounds(weights):\n",
" return [w * 2.20462 for w in weights]\n",
"\n",
"def filter_weekly(original_measurements, days):\n",
" return [m for i, m in enumerate(original_measurements) if i % 7 == 0]\n",
"\n",
"def main(kgs, days):\n",
" lbs = kg_to_pounds(kgs)\n",
"\n",
" for measurement in filter_weekly(lbs, days):\n",
" print(measurement)\n",
"\n",
"1. Cleaned reference code, with a main entrypoint function that takes all required arguments as parameters and returns all outputs.\n",
"\n",
"The name of the main entrypoint function should be `main`. The parameters should be clearly named but do not require type hints. The function should return a dict mapping output names to values. The function should contain all the necessary code to perform the functionality, without splitting into several functions. The function should not print or otherwise output anything; results should be returned as part of the result dict. Ensure you include any imports necessary, prior to the function definition.\n",
"\n",
"Example function signature: `def main(weights_kg, days):`\n",
"\n",
"2. A query, defined as natural language description of the question the function answers.\n",
"\n",
"Example query: \"You are given two lists of integers, `weights_kg` and `days`. The unit of `weights_kg` is kilograms. `days` refers to the number of days passed, starting from zero. Your task is to convert the integers to pounds and filter to only one weight measurement every 7 days. Return the list of integers in pounds.\"\n",
"\n",
"The query should be as detailed as the code requires to be fully explained. It should be clear what the function does, what the inputs are, and what the outputs are.\n",
"\n",
"3. A natural language description of all inputs (function parameters) and outputs (return values) of the function.\n",
"\n",
"Example description:\n",
"\n",
"Input:\n",
" weights_kg (list of int): List of weight values in kilograms.\n",
" days (list of int): List of integers representing the number of days passed, starting from zero.\n",
"\n",
"Output:\n",
" return (dict): A dictionary with one key:\n",
" - weights_lb (list of int): List of filtered weight values in pounds.\n",
"\n",
"4. Python 3.11 code for an input generator, which randomly generates valid sets of inputs for the functions.\n",
"\n",
"The input generator should return a dict mapping parameter names to values. The values should be randomly generated, but should be valid inputs for the function.\n",
"\n",
"Example input generator:\n",
"\n",
"def input_generator():\n",
" weights = [np.random.uniform(0, 100) for _ in range(40)]\n",
" days = list(range(40))\n",
" return {{\"weights_kg\": weights, \"days\": days}}\n",
"\n",
"Using the guidelines and example above, preprocess the following raw code file into the standard format:\n",
"\n",
"{0}\n",
"\n",
"Output the components (reference code, query, description, input generator) in order. Separate each component with a line of dashes (---). Avoid code blocks and do not output any Markdown formatting. Respond only with the four components, no prefix or additional text.\n",
"\"\"\""
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import time\n",
"from openai import OpenAI\n",
"from openai.types.chat import ChatCompletion, ChatCompletionMessageParam\n",
"from typing import Any, Iterable\n",
"\n",
"def llm_generate(\n",
" client: OpenAI,\n",
" messages: Iterable[ChatCompletionMessageParam],\n",
" sampling_params: dict[str, Any],\n",
") -> ChatCompletion:\n",
" max_retry = 3\n",
" for trial in range(max_retry):\n",
" try:\n",
" return client.chat.completions.create(\n",
" messages=messages,\n",
" **sampling_params,\n",
" )\n",
" except Exception as e:\n",
" print(\"failure response:\", e)\n",
" time.sleep(trial * trial) # quadratic backoff\n",
" if trial == max_retry - 1:\n",
" raise\n",
"\n",
"open_router_client = OpenAI(\n",
" base_url=\"https://openrouter.ai/api/v1\",\n",
" api_key=os.getenv(\"OPENROUTER_API_KEY\"),\n",
" timeout=90.0,\n",
")\n",
"\n",
"sampling_params = {\n",
" \"model\": \"deepseek/deepseek-chat:free\",\n",
" \"max_tokens\": 8192,\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"raw_files/climbing_stairs.py\n",
"def main(number_of_steps):\n",
" assert isinstance(number_of_steps, int) and number_of_steps > 0, (\n",
" f\"number_of_steps needs to be positive integer, your input {number_of_steps}\"\n",
" )\n",
" if number_of_steps == 1:\n",
" return {\"distinct_ways\": 1}\n",
" previous, current = 1, 1\n",
" for _ in range(number_of_steps - 1):\n",
" current, previous = current + previous, current\n",
" return {\"distinct_ways\": current}\n",
"\n",
"---\n",
"You are given an integer `number_of_steps` representing the number of steps on a staircase. Your task is to calculate the number of distinct ways to climb the staircase, where each time you can either climb 1 or 2 steps. Return the number of distinct ways as an integer.\n",
"\n",
"---\n",
"Input:\n",
" number_of_steps (int): The number of steps on the staircase. Must be a positive integer.\n",
"\n",
"Output:\n",
" return (dict): A dictionary with one key:\n",
" - distinct_ways (int): The number of distinct ways to climb the staircase.\n",
"\n",
"---\n",
"def input_generator():\n",
" import random\n",
" number_of_steps = random.randint(1, 100)\n",
" return {\"number_of_steps\": number_of_steps}\n"
]
}
],
"source": [
"raw_file = random.choice(raw_files)\n",
"\n",
"print(raw_file)\n",
"\n",
"raw_code = raw_file.read_text()\n",
"\n",
"prompt = format_prompt_template.format(raw_code)\n",
"\n",
"messages = [\n",
" {\"role\": \"user\", \"content\": prompt},\n",
"]\n",
"\n",
"response = llm_generate(open_router_client, messages, sampling_params)\n",
"print(response.choices[0].message.content)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"code, query, parameters, generator = response.choices[0].message.content.split(\"\\n---\\n\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The below cell executes arbitrary code, so be careful with what you run."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"def generate_io_pairs(main_code: str, input_generator_code: str, num_pairs: int = 100):\n",
" local_vars = {}\n",
" exec(main_code, {}, local_vars)\n",
" exec(input_generator_code, {}, local_vars)\n",
" io_pairs = []\n",
" for _ in range(num_pairs):\n",
" inputs = local_vars[\"input_generator\"]()\n",
" outputs = local_vars[\"main\"](**inputs)\n",
" io_pairs.append((inputs, outputs))\n",
" return io_pairs\n",
"\n",
"io_pairs = generate_io_pairs(code, generator, num_pairs=2)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[({'number_of_steps': 65}, {'distinct_ways': 27777890035288}),\n",
" ({'number_of_steps': 19}, {'distinct_ways': 6765})]"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"io_pairs"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next we need to synthesize chains of thought from the LLM for use in building a supervised finetuning dataset. From the paper:\n",
"\n",
"> Since we aim for the input-output prediction tasks, we construct the prompt using a designed template to combine the function, the query, the reference code, and either a specific input or output. The response should ideally be a natural language CoT to reason about how to derive the correct output or a feasible input.\n",
"\n",
"The below prompts are from the paper."
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"synthetic_cot_prompt_prefix = \"\"\"\n",
"You are given a question that requires some input and output variables as follows:\n",
"\n",
"{0}\n",
"\n",
"The input and output requirements are as follows:\n",
"\n",
"{1}\n",
"\"\"\"\n",
"\n",
"synthetic_cot_prompt_suffix = \"\"\"\n",
"Tip: Here is a reference code snippet for this question. You can refer to this code to guide your reasoning but not copy spans of code directly.\n",
"\n",
"{3}\n",
"\"\"\"\n",
"\n",
"synthetic_cot_prompt_input_prediction = synthetic_cot_prompt_prefix + \"\"\"\n",
"Given the following output:\n",
"\n",
"{2}\n",
"\n",
"Can you predict a feasible input without writing any code? Please reason and put your final answer in the following json format: \"input\": <your input>, where <your input> should be a dictionary, even if the there is only one input variable, with keys strictly matching the input variables' names as specified.\n",
"\"\"\" + synthetic_cot_prompt_suffix\n",
"\n",
"synthetic_cot_prompt_output_prediction = synthetic_cot_prompt_prefix + \"\"\"\n",
"Given the following input:\n",
"\n",
"{2}\n",
"\n",
"Can you predict the output without writing any code? Please reason and put your final answer in the following json format: \"output\": <your output>, where <your output> should strictly match the the output requirement as specified.\n",
"\"\"\" + synthetic_cot_prompt_suffix"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'To determine the input `number_of_steps` that results in the output `{\\'distinct_ways\\': 27777890035288}`, we need to understand that this problem is related to the Fibonacci sequence. Specifically, the number of distinct ways to climb `n` steps, where you can climb either 1 or 2 steps at a time, is equal to the `(n+1)`-th Fibonacci number.\\n\\nGiven the output `27777890035288`, we need to find the integer `n` such that the `(n+1)`-th Fibonacci number is `27777890035288`.\\n\\nThe Fibonacci sequence grows exponentially, and the number `27777890035288` is a very large Fibonacci number. To find the corresponding `n`, we can use the fact that the Fibonacci sequence follows the recurrence relation:\\n\\n\\\\[ F(n) = F(n-1) + F(n-2) \\\\]\\n\\nGiven that `F(73) = 806515533049393` and `F(72) = 498454011879264`, it is clear that `27777890035288` is much smaller than `F(73)`. We need to find the exact `n` such that `F(n+1) = 27777890035288`.\\n\\nHowever, calculating Fibonacci numbers manually for large `n` is impractical. Instead, we can use the fact that `F(75) = 2111485077978050`, which is larger than `27777890035288`. Therefore, the `n` we are looking for must be between 72 and 75.\\n\\nBy checking Fibonacci numbers closer to `27777890035288`, we find that:\\n\\n\\\\[ F(74) = 1304969544928657 \\\\]\\n\\\\[ F(75) = 2111485077978050 \\\\]\\n\\nSince `27777890035288` is significantly larger than `F(74)` but smaller than `F(75)`, it is clear that `n` is 74.\\n\\nThus, the input `number_of_steps` should be 74, which corresponds to `F(75) = 27777890035288`.\\n\\nTherefore, the feasible input is:\\n\\n```json\\n{\"number_of_steps\": 74}\\n```'"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def predict_input(query, parameters, output, reference_code):\n",
" messages = [\n",
" {\"role\": \"user\", \"content\": synthetic_cot_prompt_input_prediction.format(query, parameters, output, reference_code)},\n",
" ]\n",
" response = llm_generate(open_router_client, messages, sampling_params)\n",
" return response.choices[0].message.content\n",
"\n",
"predict_input(query, parameters, io_pairs[0][1], code)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'To solve this problem, we need to calculate the number of distinct ways to climb a staircase with `number_of_steps` steps, where you can either take 1 or 2 steps at a time. This problem is a classic example of a dynamic programming problem and is very similar to the Fibonacci sequence.\\n\\n### Reasoning:\\n- The number of distinct ways to climb `n` steps is equal to the sum of the number of distinct ways to climb `n-1` steps and the number of distinct ways to climb `n-2` steps. This is because from the `n-1`th step, you can take a single step to reach the `n`th step, and from the `n-2`th step, you can take two steps to reach the `n`th step.\\n- The base cases are:\\n - For `n = 1`, there is only 1 way to climb the staircase (taking a single step).\\n - For `n = 2`, there are 2 ways to climb the staircase (taking two single steps or one double step).\\n\\nThe number of distinct ways to climb `n` steps follows the Fibonacci sequence. The Fibonacci sequence is defined as follows:\\n- F(0) = 0\\n- F(1) = 1\\n- F(n) = F(n-1) + F(n-2) for n ≥ 2\\n\\nHowever, in our problem, the number of ways to climb `n` steps corresponds to F(n+1) in the Fibonacci sequence. For example:\\n- For `n = 1` (F(2)), there is 1 way.\\n- For `n = 2` (F(3)), there are 2 ways.\\n- For `n = 3` (F(4)), there are 3 ways.\\n- For `n = 4` (F(5)), there are 5 ways.\\n\\nGiven `number_of_steps = 19`, we need to calculate F(20).\\n\\nThe Fibonacci sequence up to F(20) is as follows:\\n- F(0) = 0\\n- F(1) = 1\\n- F(2) = 1\\n- F(3) = 2\\n- F(4) = 3\\n- F(5) = 5\\n- F(6) = 8\\n- F(7) = 13\\n- F(8) = 21\\n- F(9) = 34\\n- F(10) = 55\\n- F(11) = 89\\n- F(12) = 144\\n- F(13) = 233\\n- F(14) = 377\\n- F(15) = 610\\n- F(16) = 987\\n- F(17) = 1597\\n- F(18) = 2584\\n- F(19) = 4181\\n- F(20) = 6765\\n\\nTherefore, the number of distinct ways to climb a staircase with 19 steps is 6765.\\n\\n### Final Answer:\\n```json\\n{\"output\": {\"distinct_ways\": 6765}}\\n```'"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def predict_output(query, parameters, input, reference_code):\n",
" messages = [\n",
" {\"role\": \"user\", \"content\": synthetic_cot_prompt_output_prediction.format(query, parameters, input, reference_code)},\n",
" ]\n",
" response = llm_generate(open_router_client, messages, sampling_params)\n",
" return response.choices[0].message.content\n",
"\n",
"predict_output(query, parameters, io_pairs[1][0], code)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.1"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View file

@ -1,7 +1,6 @@
import importlib.util
import os
from datetime import datetime
from typing import Any, Dict
def test_generator_files(directory_path: str) -> None:

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "reasoning_gym"
version = "0.1.9"
version = "0.1.11"
authors = [
{ name = "Open-Thought community", email = "andreas.koepf@xamla.com" },
]

View file

@ -5,7 +5,7 @@ Reasoning Gym - A library of procedural dataset generators for training reasonin
from . import algebra, algorithmic, arc, arithmetic, code, cognition, data, games, geometry, graphs, induction, logic
from .factory import create_dataset, register_dataset
__version__ = "0.1.9"
__version__ = "0.1.11"
__all__ = [
"arc",
"algebra",

View file

@ -1,6 +1,6 @@
import random
from dataclasses import dataclass
from typing import Any, Dict, Optional
from typing import Any, Optional
import sympy

View file

@ -1,7 +1,7 @@
import random
from dataclasses import dataclass
from fractions import Fraction
from typing import Any, Dict, Optional
from typing import Any, Optional
import sympy

View file

@ -13,7 +13,7 @@ No leading letter can be zero (unless allow_leading_zero=True).
from dataclasses import dataclass
from random import Random
from typing import Any, Dict, Optional
from typing import Any, Optional
from ..factory import ProceduralDataset, register_dataset

View file

@ -4,12 +4,12 @@ from collections import deque
from dataclasses import dataclass
from functools import reduce
from random import Random
from typing import Dict, List, Optional, Tuple
from typing import Any, Optional
from ..factory import ProceduralDataset, register_dataset
def min_moves_n(jug_capacities: List[int], target: int) -> Optional[int]:
def min_moves_n(jug_capacities: list[int], target: int) -> Optional[int]:
"""
Compute the minimum number of moves required to have exactly `target` gallons
in any one jug for a puzzle with multiple jugs.
@ -73,7 +73,7 @@ def min_moves_n(jug_capacities: List[int], target: int) -> Optional[int]:
return None
def generate_puzzle(rng: Random, num_jugs: int = 3, difficulty: int = 6, max_attempts: int = 10000) -> Dict[str, any]:
def generate_puzzle(rng: Random, num_jugs: int = 3, difficulty: int = 6, max_attempts: int = 10000) -> dict[str, Any]:
"""
Generate a multi-jug water puzzle.
@ -181,7 +181,7 @@ def verify_solution(puzzle, moves):
return (any(w == target for w in state), states)
def generate_jug_solution(jug_capacities: Tuple[int, int, int], target: int) -> List[str]:
def generate_jug_solution(jug_capacities: tuple[int, int, int], target: int) -> list[str]:
"""Solves the jug puzzle and returns a sequence of formatted steps."""
capacities = list(jug_capacities)
initial_state = (0, 0, 0)
@ -283,14 +283,14 @@ Reply as a JSON-parsable list of moves which result in any of the jugs being fil
"metadata": {"puzzle": puzzle},
}
def score_answer(self, answer: Optional[str], entry: Dict[str, any]) -> float:
def score_answer(self, answer: Optional[str], entry: dict[str, Any]) -> float:
"""Determine if the solution provided solves the Jugs task.
The function awards 1.0 for a correct answer.
Args:
answer (Optional[str]): The user's answer.
entry (Dict[str, any]): The original dataset entry containing the correct answer.
entry (dict[str, Any]): The original dataset entry containing the correct answer.
Returns:
float: The computed score between 0.0 and 1.0.

View file

@ -1,7 +1,7 @@
import random
import string
from dataclasses import dataclass
from typing import Any, Dict, Optional
from typing import Any, Optional
from ..factory import ProceduralDataset, register_dataset

View file

@ -7,7 +7,7 @@ https://leetcode.com/problems/rotting-oranges/description/
from collections import deque
from dataclasses import dataclass
from random import Random
from typing import Dict, Optional
from typing import Optional
from ..factory import ProceduralDataset, register_dataset

View file

@ -3,7 +3,7 @@
import re
from dataclasses import dataclass
from random import Random
from typing import Any, Dict, Optional
from typing import Any, Optional
from ..data import read_data_file
from ..factory import ProceduralDataset, register_dataset

View file

@ -3,7 +3,7 @@
import re
from dataclasses import dataclass
from random import Random
from typing import Any, Dict, Optional
from typing import Any, Optional
from ..data import read_data_file
from ..factory import ProceduralDataset, register_dataset

View file

@ -18,7 +18,7 @@ class Arc1DConfig:
def validate(self) -> None:
"""Validate configuration parameters"""
assert self.min_size > 0, "min_size must be positive"
assert self.min_size >= 8, "min_size must be >= 8"
assert self.max_size >= self.min_size, "max_size must be >= min_size"
assert self.num_train > 0, "num_train must be positive"
assert self.size > 0, "size must be positive"

View file

@ -38,7 +38,7 @@ def task_move_n_pix(rng: Random, size: int, move_pix: int, solid: bool) -> Optio
def task_move_n_pix_wrapped(rng: Random, size: int, move_pix: int, solid: bool) -> Optional[dict[str, list[int]]]:
"""Generate a task where a block is moved to the right by move_pix pixels with wrapping."""
block_size = rng.randint(1, size)
block_pos = rng.randint(0, size)
block_pos = rng.randint(0, size - 1)
if solid:
color = rng.randint(1, 9)
@ -95,8 +95,8 @@ def task_block_touch_dot(rng: Random, size: int) -> Optional[dict[str, list[int]
dot_color = 1
block_color = rng.randint(2, 9)
block_size = rng.randint(1, size)
dot_pos = rng.randint(0, size)
block_size = rng.randint(1, size - 1)
dot_pos = rng.randint(0, size - 1)
can_place_left = dot_pos >= block_size
can_place_right = dot_pos + block_size < size
@ -105,7 +105,7 @@ def task_block_touch_dot(rng: Random, size: int) -> Optional[dict[str, list[int]
return None
if can_place_left and can_place_right:
side = rng.choice(["left", "right"])
side = rng.choice(("left", "right"))
elif can_place_left:
side = "left"
else:
@ -134,8 +134,8 @@ def task_block_touch_dot_n_pix(rng: Random, size: int, move_pix: int) -> Optiona
dot_color = 2
block_color = rng.randint(3, 9)
block_size = rng.randint(1, size)
dot_pos = rng.randint(0, size)
block_size = rng.randint(1, size - 1)
dot_pos = rng.randint(0, size - 1)
can_place_left = dot_pos >= block_size
can_place_right = dot_pos + block_size < size
@ -144,7 +144,7 @@ def task_block_touch_dot_n_pix(rng: Random, size: int, move_pix: int) -> Optiona
return None
if can_place_left and can_place_right:
side = rng.choice(["left", "right"])
side = rng.choice(("left", "right"))
elif can_place_left:
side = "left"
else:
@ -177,8 +177,8 @@ def task_block_scale_to_dot(rng: Random, size: int) -> Optional[dict[str, list[i
dot_color = 2
block_color = rng.randint(3, 9)
block_size = rng.randint(1, size)
dot_pos = rng.randint(0, size)
block_size = rng.randint(1, size - 1)
dot_pos = rng.randint(0, size - 1)
can_place_left = dot_pos >= block_size
can_place_right = dot_pos + block_size < size
@ -187,7 +187,7 @@ def task_block_scale_to_dot(rng: Random, size: int) -> Optional[dict[str, list[i
return None
if can_place_left and can_place_right:
side = rng.choice(["left", "right"])
side = rng.choice(("left", "right"))
elif can_place_left:
side = "left"
else:
@ -238,13 +238,9 @@ def task_two_points_and_fill(rng: Random, size: int) -> Optional[dict[str, list[
def task_reflect_block_with_border_pixel(rng: Random, size: int) -> Optional[dict[str, list[int]]]:
"""Generate a task where a block with a border pixel is reflected."""
block_size = rng.randint(2, size)
if block_size > size:
return None
c1 = rng.randint(1, 9)
c2 = rng.randint(1, 9)
if c1 == c2:
return None
c2 = rng.choice(tuple(c for c in range(1, 9) if c != c1))
side = "left" if rng.random() < 0.5 else "right"
pos = rng.randint(0, size - block_size)
@ -265,22 +261,17 @@ def task_reflect_block_with_border_pixel(rng: Random, size: int) -> Optional[dic
def task_reflect_block_with_border_pixel_random(rng: Random, size: int) -> Optional[dict[str, list[int]]]:
"""Generate a task where a random-colored block with a border pixel is reflected."""
block_size = rng.randint(2, size)
if block_size > size:
return None
side = "left" if rng.random() < 0.5 else "right"
pos = rng.randint(0, size - block_size)
block = [rng.randint(1, 9) for _ in range(block_size)]
border_color = rng.randint(1, 9)
other_colors = tuple(c for c in range(1, 9) if c != border_color)
block = [rng.choice(other_colors) for _ in range(block_size)]
if side == "left":
if block[0] == border_color:
return None
block[0] = border_color
else:
if block[block_size - 1] == border_color:
return None
block[block_size - 1] = border_color
question = write_block(pos, block, gen_field(size))
@ -294,8 +285,8 @@ def task_reflect_block_around_dot(rng: Random, size: int) -> Optional[dict[str,
"""Generate a task where a block is reflected around a dot."""
dot_color = 2
dot_pos = rng.randint(0, size)
block_size = rng.randint(1, size)
dot_pos = rng.randint(0, size - 1)
block_size = rng.randint(1, size - 1)
block_pos = rng.randint(0, size - block_size)
block_end = block_pos + block_size - 1
@ -331,8 +322,6 @@ def task_reflect_block_around_dot(rng: Random, size: int) -> Optional[dict[str,
def task_block_and_noise_remove(rng: Random, size: int) -> Optional[dict[str, list[int]]]:
"""Generate a task where noise around a block needs to be removed."""
block_size = rng.randint(2, size)
if block_size > size:
return None
block_pos = rng.randint(0, size - block_size)
color = rng.randint(1, 9)
@ -356,7 +345,7 @@ def task_block_and_noise_remove(rng: Random, size: int) -> Optional[dict[str, li
noise_positions = []
for _ in range(noise_count):
allowed = [i for i in range(size) if not forbidden[i]]
allowed = tuple(i for i in range(size) if not forbidden[i])
if not allowed:
break
noise_pos = rng.choice(allowed)
@ -385,8 +374,6 @@ def task_block_and_noise_remove_inside(rng: Random, size: int) -> Optional[dict[
return None
block_size = rng.randint(6, size)
if block_size > size:
return None
block_pos = rng.randint(0, size - block_size)
color = rng.randint(1, 9)
@ -471,7 +458,7 @@ def task_copy_block_to_dots_colors(rng: Random, size: int) -> Optional[dict[str,
dot_colors = []
pos = block_size + block_size // 2 + 1
while pos < size - block_size:
while pos <= size - block_size:
if rng.random() < 0.5:
dot_color = rng.randint(1, 9)
dot_positions.append(pos)
@ -759,13 +746,14 @@ def task_duplicate_block_from_seeds(rng: Random, size: int) -> Optional[dict[str
return None
# Position block with space for seeds
block_pos = rng.randint(2, size - block_size - 1)
block_pos = rng.randint(2, size - block_size - 2)
# Decide seed placement
left_seed = rng.random() < 0.5
right_seed = rng.random() < 0.5
if not (left_seed or right_seed):
return None
left_seed = False
right_seed = False
while not left_seed and not right_seed:
left_seed = rng.random() < 0.5
right_seed = rng.random() < 0.5
# Create input
question = gen_field(size)
@ -814,12 +802,13 @@ def task_duplicate_block_from_seeds(rng: Random, size: int) -> Optional[dict[str
def task_fill_from_pixel(rng: Random, size: int) -> Optional[dict[str, list[int]]]:
"""Generate a task where a pixel fills in one direction until hitting another pixel."""
block_size = rng.randint(3, 6)
if block_size >= size - 2:
if size < 8:
return None
block_size = rng.randint(3, size - 5)
# Position block with space for seed
block_pos = rng.randint(1, size - block_size - 1)
block_pos = rng.randint(2, size - block_size - 2)
# Create input
question = gen_field(size)
@ -830,9 +819,9 @@ def task_fill_from_pixel(rng: Random, size: int) -> Optional[dict[str, list[int]
question[block_pos + i] = block_color
# Place seed pixel and determine fill direction
seed_color = rng.randint(1, 9)
while seed_color == block_color:
seed_color = rng.randint(1, 9)
seed_color = rng.randint(1, 8)
if seed_color >= block_color:
seed_color += 1
is_left = rng.random() < 0.5
@ -858,48 +847,51 @@ def task_fill_from_pixel(rng: Random, size: int) -> Optional[dict[str, list[int]
def task_mark_size_two_blocks(rng: Random, size: int) -> Optional[dict[str, list[int]]]:
"""Generate a task where size-2 blocks are marked with surrounding pixels."""
blocks = []
pos = 0
if size < 8:
return None
# Generate blocks with minimum gap of 2
# Start with one size-2 block
blocks = [2]
pos = 4 # Space for first block (2) + gap (2)
# Generate more blocks
while pos < size:
if rng.random() < 0.4:
block_size = rng.randint(1, 3)
# Check if we have space for block and potential markers
needed_space = block_size + (2 if block_size == 2 else 0)
if pos + needed_space < size:
blocks.append((pos, block_size))
pos += block_size + 2 # Minimum gap of 2
if pos + block_size <= size:
blocks.append(block_size)
pos += block_size + 2 # block + gap
else:
blocks.append(0)
pos += 1
pos += 1
# Shuffle block sizes
rng.shuffle(blocks)
if len(blocks) < 2:
return None
# Assign positions with proper gaps
block_positions = []
pos = 0
# Verify gaps between blocks (including markers)
valid = True
for i in range(len(blocks) - 1):
pos1, size1 = blocks[i]
pos2, _ = blocks[i + 1]
needed_gap = 3 if size1 == 2 else 2
if pos2 - (pos1 + size1) < needed_gap:
valid = False
break
if not valid:
return None
for block_size in blocks:
if block_size == 0:
pos += 1
else:
block_positions.append((pos, block_size))
pos += block_size + 2 # Move past block + gap
# Create input with blocks
question = gen_field(size)
for pos, block_size in blocks:
# Place block
for pos, block_size in block_positions:
block_color = rng.randint(1, 8)
if block_color >= 3: # avoid marker color 3
block_color += 1
for i in range(block_size):
question[pos + i] = 1
question[pos + i] = block_color
# Create answer with markers
answer = question.copy()
for pos, block_size in blocks:
for pos, block_size in block_positions:
if block_size == 2:
# Add markers for size 2 blocks
if pos > 0:
answer[pos - 1] = 3
if pos + block_size < size:
@ -946,7 +938,10 @@ def task_fill_until_collision(rng: Random, size: int) -> Optional[dict[str, list
# Color random pixels
for pos in positions:
question[pos] = rng.randint(1, 9)
c = rng.randint(1, 8)
if c >= 5: # don't use side marker color 5
c += 1
question[pos] = c
positions.sort()
@ -1039,8 +1034,8 @@ def task_color_left_half_blocks(rng: Random, size: int) -> Optional[dict[str, li
# Generate blocks with gap 1
while pos < size:
if rng.random() < 0.4:
block_size = rng.randint(2, 8)
if pos + block_size >= size:
block_size = rng.randint(2, size // 2)
if pos + block_size > size:
break
blocks.append((pos, block_size))

View file

@ -1,6 +1,6 @@
from dataclasses import dataclass, field
from random import Random
from typing import Any, Callable, Dict, Optional
from typing import Any, Callable, Optional
from ..factory import ProceduralDataset, register_dataset
from .board_format import ARC_PROMPT_TEMPLATE, BoardFormattingOptions, format_board, format_board_pair, parse_board

View file

@ -1,23 +1,23 @@
# types
from typing import Any, Callable, Container, FrozenSet, Tuple, Union
from typing import Any, Callable, Container, FrozenSet, Union
Boolean = bool
Integer = int
IntegerTuple = Tuple[Integer, Integer]
Numerical = Union[Integer, IntegerTuple]
Integertuple = tuple[Integer, Integer]
Numerical = Union[Integer, Integertuple]
IntegerSet = FrozenSet[Integer]
Grid = Tuple[Tuple[Integer]]
Cell = Tuple[Integer, IntegerTuple]
Grid = tuple[tuple[Integer]]
Cell = tuple[Integer, Integertuple]
Object = FrozenSet[Cell]
Objects = FrozenSet[Object]
Indices = FrozenSet[IntegerTuple]
Indices = FrozenSet[Integertuple]
IndicesSet = FrozenSet[Indices]
Patch = Union[Object, Indices]
Element = Union[Object, Grid]
Piece = Union[Grid, Patch]
TupleTuple = Tuple[Tuple]
tupletuple = tuple[tuple]
ContainerContainer = Container[Container]
@ -160,17 +160,17 @@ def difference(a: Container, b: Container) -> Container:
return type(a)(e for e in a if e not in b)
def dedupe(iterable: Tuple) -> Tuple:
def dedupe(iterable: tuple) -> tuple:
"""remove duplicates"""
return tuple(e for i, e in enumerate(iterable) if iterable.index(e) == i)
def order(container: Container, compfunc: Callable) -> Tuple:
def order(container: Container, compfunc: Callable) -> tuple:
"""order container by custom key"""
return tuple(sorted(container, key=compfunc))
def repeat(item: Any, num: Integer) -> Tuple:
def repeat(item: Any, num: Integer) -> tuple:
"""repetition of item within vector"""
return tuple(item for i in range(num))
@ -277,12 +277,12 @@ def positive(x: Integer) -> Boolean:
return x > 0
def toivec(i: Integer) -> IntegerTuple:
def toivec(i: Integer) -> Integertuple:
"""vector pointing vertically"""
return (i, 0)
def tojvec(j: Integer) -> IntegerTuple:
def tojvec(j: Integer) -> Integertuple:
"""vector pointing horizontally"""
return (0, j)
@ -302,7 +302,7 @@ def extract(container: Container, condition: Callable) -> Any:
return next(e for e in container if condition(e))
def totuple(container: FrozenSet) -> Tuple:
def totuple(container: FrozenSet) -> tuple:
"""conversion to tuple"""
return tuple(container)
@ -332,12 +332,12 @@ def other(container: Container, value: Any) -> Any:
return first(remove(value, container))
def interval(start: Integer, stop: Integer, step: Integer) -> Tuple:
def interval(start: Integer, stop: Integer, step: Integer) -> tuple:
"""range"""
return tuple(range(start, stop, step))
def astuple(a: Integer, b: Integer) -> IntegerTuple:
def astuple(a: Integer, b: Integer) -> Integertuple:
"""constructs a tuple"""
return (a, b)
@ -347,7 +347,7 @@ def product(a: Container, b: Container) -> FrozenSet:
return frozenset((i, j) for j in b for i in a)
def pair(a: Tuple, b: Tuple) -> TupleTuple:
def pair(a: tuple, b: tuple) -> tupletuple:
"""zipping of two tuples"""
return tuple(zip(a, b))
@ -421,12 +421,12 @@ def mapply(function: Callable, container: ContainerContainer) -> FrozenSet:
return merge(apply(function, container))
def papply(function: Callable, a: Tuple, b: Tuple) -> Tuple:
def papply(function: Callable, a: tuple, b: tuple) -> tuple:
"""apply function on two vectors"""
return tuple(function(i, j) for i, j in zip(a, b))
def mpapply(function: Callable, a: Tuple, b: Tuple) -> Tuple:
def mpapply(function: Callable, a: tuple, b: tuple) -> tuple:
"""apply function on two vectors and merge"""
return merge(papply(function, a, b))
@ -466,7 +466,7 @@ def width(piece: Piece) -> Integer:
return rightmost(piece) - leftmost(piece) + 1
def shape(piece: Piece) -> IntegerTuple:
def shape(piece: Piece) -> Integertuple:
"""height and width of grid or patch"""
return (height(piece), width(piece))
@ -503,27 +503,27 @@ def ofcolor(grid: Grid, value: Integer) -> Indices:
return frozenset((i, j) for i, r in enumerate(grid) for j, v in enumerate(r) if v == value)
def ulcorner(patch: Patch) -> IntegerTuple:
def ulcorner(patch: Patch) -> Integertuple:
"""index of upper left corner"""
return tuple(map(min, zip(*toindices(patch))))
def urcorner(patch: Patch) -> IntegerTuple:
def urcorner(patch: Patch) -> Integertuple:
"""index of upper right corner"""
return tuple(map(lambda ix: {0: min, 1: max}[ix[0]](ix[1]), enumerate(zip(*toindices(patch)))))
def llcorner(patch: Patch) -> IntegerTuple:
def llcorner(patch: Patch) -> Integertuple:
"""index of lower left corner"""
return tuple(map(lambda ix: {0: max, 1: min}[ix[0]](ix[1]), enumerate(zip(*toindices(patch)))))
def lrcorner(patch: Patch) -> IntegerTuple:
def lrcorner(patch: Patch) -> Integertuple:
"""index of lower right corner"""
return tuple(map(max, zip(*toindices(patch))))
def crop(grid: Grid, start: IntegerTuple, dims: IntegerTuple) -> Grid:
def crop(grid: Grid, start: Integertuple, dims: Integertuple) -> Grid:
"""subgrid specified by start and dimension"""
return tuple(r[start[1] : start[1] + dims[1]] for r in grid[start[0] : start[0] + dims[0]])
@ -542,7 +542,7 @@ def recolor(value: Integer, patch: Patch) -> Object:
return frozenset((value, index) for index in toindices(patch))
def shift(patch: Patch, directions: IntegerTuple) -> Patch:
def shift(patch: Patch, directions: Integertuple) -> Patch:
"""shift patch"""
if len(patch) == 0:
return patch
@ -559,19 +559,19 @@ def normalize(patch: Patch) -> Patch:
return shift(patch, (-uppermost(patch), -leftmost(patch)))
def dneighbors(loc: IntegerTuple) -> Indices:
def dneighbors(loc: Integertuple) -> Indices:
"""directly adjacent indices"""
return frozenset({(loc[0] - 1, loc[1]), (loc[0] + 1, loc[1]), (loc[0], loc[1] - 1), (loc[0], loc[1] + 1)})
def ineighbors(loc: IntegerTuple) -> Indices:
def ineighbors(loc: Integertuple) -> Indices:
"""diagonally adjacent indices"""
return frozenset(
{(loc[0] - 1, loc[1] - 1), (loc[0] - 1, loc[1] + 1), (loc[0] + 1, loc[1] - 1), (loc[0] + 1, loc[1] + 1)}
)
def neighbors(loc: IntegerTuple) -> Indices:
def neighbors(loc: Integertuple) -> Indices:
"""adjacent indices"""
return dneighbors(loc) | ineighbors(loc)
@ -690,7 +690,7 @@ def bordering(patch: Patch, grid: Grid) -> Boolean:
)
def centerofmass(patch: Patch) -> IntegerTuple:
def centerofmass(patch: Patch) -> Integertuple:
"""center of mass"""
return tuple(map(lambda x: sum(x) // len(patch), zip(*toindices(patch))))
@ -895,14 +895,14 @@ def subgrid(patch: Patch, grid: Grid) -> Grid:
return crop(grid, ulcorner(patch), shape(patch))
def hsplit(grid: Grid, n: Integer) -> Tuple:
def hsplit(grid: Grid, n: Integer) -> tuple:
"""split grid horizontally"""
h, w = len(grid), len(grid[0]) // n
offset = len(grid[0]) % n != 0
return tuple(crop(grid, (0, w * i + i * offset), (h, w)) for i in range(n))
def vsplit(grid: Grid, n: Integer) -> Tuple:
def vsplit(grid: Grid, n: Integer) -> tuple:
"""split grid vertically"""
h, w = len(grid) // n, len(grid[0])
offset = len(grid) % n != 0
@ -933,12 +933,12 @@ def switch(grid: Grid, a: Integer, b: Integer) -> Grid:
return tuple(tuple(v if (v != a and v != b) else {a: b, b: a}[v] for v in r) for r in grid)
def center(patch: Patch) -> IntegerTuple:
def center(patch: Patch) -> Integertuple:
"""center of the patch"""
return (uppermost(patch) + height(patch) // 2, leftmost(patch) + width(patch) // 2)
def position(a: Patch, b: Patch) -> IntegerTuple:
def position(a: Patch, b: Patch) -> Integertuple:
"""relative position between two patches"""
ia, ja = center(toindices(a))
ib, jb = center(toindices(b))
@ -952,7 +952,7 @@ def position(a: Patch, b: Patch) -> IntegerTuple:
return (-1, 1 if ja < jb else -1)
def index(grid: Grid, loc: IntegerTuple) -> Integer:
def index(grid: Grid, loc: Integertuple) -> Integer:
"""color at location"""
i, j = loc
h, w = len(grid), len(grid[0])
@ -961,7 +961,7 @@ def index(grid: Grid, loc: IntegerTuple) -> Integer:
return grid[loc[0]][loc[1]]
def canvas(value: Integer, dimensions: IntegerTuple) -> Grid:
def canvas(value: Integer, dimensions: Integertuple) -> Grid:
"""grid construction"""
return tuple(tuple(value for j in range(dimensions[1])) for i in range(dimensions[0]))
@ -971,7 +971,7 @@ def corners(patch: Patch) -> Indices:
return frozenset({ulcorner(patch), urcorner(patch), llcorner(patch), lrcorner(patch)})
def connect(a: IntegerTuple, b: IntegerTuple) -> Indices:
def connect(a: Integertuple, b: Integertuple) -> Indices:
"""line between two points"""
ai, aj = a
bi, bj = b
@ -1000,7 +1000,7 @@ def trim(grid: Grid) -> Grid:
return tuple(r[1:-1] for r in grid[1:-1])
def move(grid: Grid, obj: Object, offset: IntegerTuple) -> Grid:
def move(grid: Grid, obj: Object, offset: Integertuple) -> Grid:
"""move object on grid"""
return paint(cover(grid, obj), shift(obj, offset))
@ -1025,12 +1025,12 @@ def righthalf(grid: Grid) -> Grid:
return rot270(bottomhalf(rot90(grid)))
def vfrontier(location: IntegerTuple) -> Indices:
def vfrontier(location: Integertuple) -> Indices:
"""vertical frontier"""
return frozenset((i, location[1]) for i in range(30))
def hfrontier(location: IntegerTuple) -> Indices:
def hfrontier(location: Integertuple) -> Indices:
"""horizontal frontier"""
return frozenset((location[0], j) for j in range(30))
@ -1052,7 +1052,7 @@ def delta(patch: Patch) -> Indices:
return backdrop(patch) - toindices(patch)
def gravitate(source: Patch, destination: Patch) -> IntegerTuple:
def gravitate(source: Patch, destination: Patch) -> Integertuple:
"""direction to move source until adjacent to destination"""
source_i, source_j = center(source)
destination_i, destination_j = center(destination)
@ -1108,7 +1108,7 @@ def box(patch: Patch) -> Indices:
return frozenset(vlines | hlines)
def shoot(start: IntegerTuple, direction: IntegerTuple) -> Indices:
def shoot(start: Integertuple, direction: Integertuple) -> Indices:
"""line from starting point and direction"""
return connect(start, (start[0] + 42 * direction[0], start[1] + 42 * direction[1]))

View file

@ -1,7 +1,7 @@
import random
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, Dict, Optional
from typing import Any, Optional
from ..coaching import AttributeType, BaseCurriculum, RangeAttributeDefinition
from ..factory import ProceduralDataset, register_dataset

View file

@ -1,7 +1,7 @@
import math
from fractions import Fraction
from random import Random
from typing import Any, Dict
from typing import Any
from reasoning_gym.utils import format_number, is_integer

View file

@ -1,6 +1,6 @@
from fractions import Fraction
from random import Random
from typing import Any, Dict
from typing import Any
from reasoning_gym.utils import format_number, is_integer

View file

@ -2,7 +2,7 @@
from dataclasses import dataclass
from random import Random
from typing import Dict, Optional
from typing import Optional
from ..factory import ProceduralDataset, register_dataset

View file

@ -1,7 +1,6 @@
import re
from dataclasses import dataclass
from random import Random
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from ..factory import ProceduralDataset, register_dataset
@ -20,7 +19,7 @@ class NeedleHaystackConfig:
assert self.num_statements < 168387000, f"num_statements must be less than {168387000}"
def generate_unique_triplets(names: List[str], verbs: List[str], subjects: List[str], n: int, rng) -> Dict[str, Any]:
def generate_unique_triplets(names: list[str], verbs: list[str], subjects: list[str], n: int, rng) -> dict[str, Any]:
"""
Generate n unique random triplets (name, verb, subject) without generating the full Cartesian product in memory.
@ -29,14 +28,14 @@ def generate_unique_triplets(names: List[str], verbs: List[str], subjects: List[
randomly chosen as the 'needle'.
Args:
names (List[str]): List of names.
verbs (List[str]): List of verbs.
subjects (List[str]): List of subjects.
names (list[str]): List of names.
verbs (list[str]): List of verbs.
subjects (list[str]): List of subjects.
n (int): Number of unique triplets to generate.
rng (random.Random): A pre-seeded random number generator.
Returns:
Dict[str, Any]: A dictionary with:
dict[str, Any]: A dictionary with:
- "triplets": a list of n unique triplets (tuples of (name, verb, subject)),
- "needle": one triplet randomly chosen from the list.
@ -47,7 +46,7 @@ def generate_unique_triplets(names: List[str], verbs: List[str], subjects: List[
# Use a range for memory efficiency and sample n unique indices.
indices = rng.sample(range(total_possible), n)
triplets: List[Tuple[str, str, str]] = []
triplets: list[tuple[str, str, str]] = []
num_verbs = len(verbs)
num_subjects = len(subjects)
@ -101,12 +100,12 @@ class NeedleHaystackDataset(ProceduralDataset):
"metadata": {"question": question},
}
def score_answer(self, answer: Optional[str], entry: Dict[str, any]) -> float:
def score_answer(self, answer: Optional[str], entry: dict[str, Any]) -> float:
"""Determine if the solution provided solves the task.
Args:
answer (Optional[str]): The user's answer.
entry (Dict[str, any]): The original dataset entry containing the correct answer.
entry (dict[str, Any]): The original dataset entry containing the correct answer.
Returns:
float: The computed score between 0.0 and 1.0.

View file

@ -7,6 +7,7 @@ Game tasks for training reasoning capabilities:
"""
from .countdown import CountdownConfig, CountdownDataset
from .emoji_mystery import EmojiMysteryConfig, EmojiMysteryDataset
from .futoshiki import FutoshikiConfig, FutoshikiDataset
from .knight_swap import KnightSwapConfig, KnightSwapDataset
from .maze import MazeConfig, MazeDataset
@ -21,6 +22,8 @@ from .tsumego import TsumegoConfig, TsumegoDataset
__all__ = [
"CountdownConfig",
"CountdownDataset",
"EmojiMysteryConfig",
"EmojiMysteryDataset",
"FutoshikiConfig",
"FutoshikiDataset",
"MiniSudokuConfig",

View file

@ -0,0 +1,235 @@
import re
from dataclasses import dataclass
from random import Random
from typing import Any, Optional
from ..data import read_data_file
from ..factory import ProceduralDataset, register_dataset
_EMOJIS = [
"😀",
"😃",
"😄",
"😁",
"😆",
"😅",
"🤣",
"😂",
"🙂",
"🙃",
"😉",
"😊",
"😇",
"🥰",
"😍",
"🤩",
"😘",
"😗",
"😚",
"😙",
"🥲",
"😋",
"😛",
"😜",
"🤪",
"😝",
"🤑",
"🤗",
"🤭",
"🤫",
"🤔",
"🤐",
"🤨",
"😐",
"😑",
"😶",
"😏",
"😒",
"🙄",
"😬",
"😮",
"😯",
"😲",
"😳",
"🥺",
"😦",
"😧",
"😨",
"😰",
"😥",
"😢",
"😭",
"😱",
"😖",
"😣",
"😞",
"😓",
"😩",
"😫",
"🥱",
"😤",
"😡",
"😠",
"🤬",
"😈",
"👿",
"💀",
"",
"💩",
"🤡",
"👹",
"👺",
"👻",
"👽",
"👾",
"🤖",
"😺",
"😸",
"😹",
"😻",
"😼",
"😽",
"🙀",
"😿",
"😾",
"🙈",
"🙉",
"🙊",
"💋",
"💌",
"💘",
"💝",
"💖",
"💗",
"💓",
"💞",
"💕",
"💟",
"",
"💔",
"❤️",
"🧡",
"💛",
"💚",
"💙",
"💜",
"🤎",
"🖤",
"🤍",
]
hint_function = """
```python
def variance_selector_to_byte(variation_selector):
variation_selector_codepoint = ord(variation_selector)
if 0xFE00 <= variation_selector_codepoint <= 0xFE0F:
return variation_selector_codepoint - 0xFE00
elif 0xE0100 <= variation_selector_codepoint <= 0xE01EF:
return variation_selector_codepoint - 0xE0100 + 16
else:
return None
def decode(encoded_sentence):
decoded_bytes = []
variation_selectors_part = encoded_sentence[1:]
for char in variation_selectors_part:
byte_val = variance_selector_to_byte(char)
if byte_val is not None:
decoded_bytes.append(byte_val)
return bytes(decoded_bytes).decode('utf-8')
```
"""
QUESTION_TEMPLATE = "\n".join(
[
"The following emoji is encoded with a sentence.",
"Decode the following sentence from the emoji: {sentence}",
"Here is a hint: {hint_function}",
"Return the secret sentence as your final answer.",
]
)
@dataclass
class EmojiMysteryConfig:
"""Configuration for Emoji Mystery task generation"""
size: int = 1000
seed: Optional[int] = None
min_words_in_sentence: int = 3
max_words_in_sentence: int = 35
def validate(self):
assert self.min_words_in_sentence > 0, "min_words_in_sentence must be positive"
assert (
self.max_words_in_sentence >= self.min_words_in_sentence
), "max_words_in_sentence must be >= min_words_in_sentence"
assert self.size > 0, "size must be positive"
class EmojiMysteryDataset(ProceduralDataset):
def __init__(self, config: EmojiMysteryConfig):
super().__init__(config=config, seed=config.seed, size=config.size)
text = read_data_file("in_the_year_2889.txt")
self.emojis = _EMOJIS
self.sentences = [
sentence.strip()
for sentence in re.findall(r"[^.!?]+[.!?]", text)
if self.config.min_words_in_sentence
<= len(re.findall(r"\b\w+\b", sentence))
<= self.config.max_words_in_sentence
]
def __getitem__(self, idx: int) -> dict[str, Any]:
rng = Random(self.seed + idx)
secret_emoji = rng.choice(self.emojis)
secret_sentence = rng.choice(self.sentences).strip().replace("\n", " ")
encoded_sentence = self.encode(secret_sentence, secret_emoji)
question = QUESTION_TEMPLATE.format(sentence=encoded_sentence, hint_function=hint_function)
return {"question": question, "answer": secret_sentence, "metadata": {"emoji": secret_emoji}}
def variance_selector_to_byte(self, variation_selector: str) -> Optional[int]:
variation_selector_codepoint = ord(variation_selector)
if 0xFE00 <= variation_selector_codepoint <= 0xFE0F:
return variation_selector_codepoint - 0xFE00
elif 0xE0100 <= variation_selector_codepoint <= 0xE01EF:
return variation_selector_codepoint - 0xE0100 + 16
def decode(self, encoded_sentence: str) -> str:
decoded_bytes = []
variation_selectors_part = encoded_sentence[1:]
for char in variation_selectors_part:
byte_val = self.variance_selector_to_byte(char)
if byte_val is not None:
decoded_bytes.append(byte_val)
return bytes(decoded_bytes).decode("utf-8")
def byte_to_variance_selector(self, byte: bytes) -> bytes:
if byte < 16:
return chr(0xFE00 + byte)
else:
return chr(0xE0100 + (byte - 16))
def encode(self, sentence: str, base: str) -> str:
encoded_bytes = sentence.encode("utf-8")
return base + "".join(self.byte_to_variance_selector(b) for b in encoded_bytes)
def score_answer(self, answer: str | None, entry: dict[str, Any]) -> int:
reward = 0.0
if answer is not None:
try:
if answer == entry["answer"]:
return 1.0
elif len(answer) == len(entry["answer"]):
score = [1.0 if a == b else 0.0 for a, b in zip(answer, entry["answer"])]
reward = sum(score) / len(score)
else:
reward = 0.01
except:
reward = 0.01
return reward
register_dataset("emoji_mystery", EmojiMysteryDataset, EmojiMysteryConfig)

View file

@ -1,6 +1,6 @@
import random
from random import Random
from typing import Any, Dict
from typing import Any
NUM_OF_PAIRS_GENERATED = 5
@ -65,7 +65,7 @@ def create_numbers_divisible_by_five_or_ten(rng: Random):
return result
def generate_0(rng: Random) -> Dict[str, Any]:
def generate_0(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where input remains unchanged"""
pairs = {}
@ -78,7 +78,7 @@ def generate_0(rng: Random) -> Dict[str, Any]:
return pairs
def generate_1(rng: Random) -> Dict[str, Any]:
def generate_1(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is a list of the third element
after removing all other elements
"""
@ -95,7 +95,7 @@ def generate_1(rng: Random) -> Dict[str, Any]:
return pairs
def generate_2(rng: Random) -> Dict[str, Any]:
def generate_2(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is a reversed list of the input"""
pairs = {}
for _ in range(NUM_OF_PAIRS_GENERATED):
@ -108,7 +108,7 @@ def generate_2(rng: Random) -> Dict[str, Any]:
return pairs
def generate_3(rng: Random) -> Dict[str, Any]:
def generate_3(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is the sum of unique elements in the list less than 30"""
pairs = {}
for _ in range(NUM_OF_PAIRS_GENERATED):
@ -127,7 +127,7 @@ def generate_3(rng: Random) -> Dict[str, Any]:
return pairs
def generate_4(rng: Random) -> Dict[str, Any]:
def generate_4(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is the count of elements equal to 5"""
pairs = {}
for i in range(NUM_OF_PAIRS_GENERATED):
@ -151,7 +151,7 @@ def generate_4(rng: Random) -> Dict[str, Any]:
return pairs
def generate_5(rng: Random) -> Dict[str, Any]:
def generate_5(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is a list of elements that are followed by an even number
NOTE: This is suppose to be a relatively hard problem
@ -173,7 +173,7 @@ def generate_5(rng: Random) -> Dict[str, Any]:
return pairs
def generate_6(rng: Random) -> Dict[str, Any]:
def generate_6(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is a list of elements where each element in input is added to its position(Using zero-indexing)"""
pairs = {}
for i in range(NUM_OF_PAIRS_GENERATED):
@ -190,7 +190,7 @@ def generate_6(rng: Random) -> Dict[str, Any]:
return pairs
def generate_7(rng: Random) -> Dict[str, Any]:
def generate_7(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is a list of element whose position is indicated by the last element in the input
EXAMPLE:
@ -213,7 +213,7 @@ def generate_7(rng: Random) -> Dict[str, Any]:
return pairs
def generate_8(rng: Random) -> Dict[str, Any]:
def generate_8(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is count of elements in the input"""
pairs = {}
for _ in range(NUM_OF_PAIRS_GENERATED):
@ -227,7 +227,7 @@ def generate_8(rng: Random) -> Dict[str, Any]:
return pairs
def generate_9(rng: Random) -> Dict[str, Any]:
def generate_9(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is sum total of elements in the input"""
pairs = {}
for _ in range(NUM_OF_PAIRS_GENERATED):
@ -241,7 +241,7 @@ def generate_9(rng: Random) -> Dict[str, Any]:
return pairs
def generate_10(rng: Random) -> Dict[str, Any]:
def generate_10(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is a list of the elements in ascending order"""
pairs = {}
for _ in range(NUM_OF_PAIRS_GENERATED):
@ -255,7 +255,7 @@ def generate_10(rng: Random) -> Dict[str, Any]:
return pairs
def generate_11(rng: Random) -> Dict[str, Any]:
def generate_11(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is a list of the elements in descending order"""
pairs = {}
for _ in range(NUM_OF_PAIRS_GENERATED):
@ -269,7 +269,7 @@ def generate_11(rng: Random) -> Dict[str, Any]:
return pairs
def generate_12(rng: Random) -> Dict[str, Any]:
def generate_12(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is a list of the elements where the first and last element in input are replaced by their
successor. Example, for an integer 4, successor is 5
"""
@ -288,7 +288,7 @@ def generate_12(rng: Random) -> Dict[str, Any]:
return pairs
def generate_13(rng: Random) -> Dict[str, Any]:
def generate_13(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is [1] if list of input elements is in ascending order, [0] in descending order"""
pairs = {}
for i in range(NUM_OF_PAIRS_GENERATED):
@ -307,7 +307,7 @@ def generate_13(rng: Random) -> Dict[str, Any]:
return pairs
def generate_14(rng: Random) -> Dict[str, Any]:
def generate_14(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is [1] if input element is divisible by 10, [0] if divisible by 5"""
pairs = {}
@ -327,7 +327,7 @@ def generate_14(rng: Random) -> Dict[str, Any]:
return pairs
def generate_15(rng: Random) -> Dict[str, Any]:
def generate_15(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is a twice the amount of last element in the input"""
pairs = {}
for _ in range(NUM_OF_PAIRS_GENERATED):
@ -348,7 +348,7 @@ def generate_15(rng: Random) -> Dict[str, Any]:
return pairs
def generate_16(rng: Random) -> Dict[str, Any]:
def generate_16(rng: Random) -> dict[str, Any]:
"""Generate input and output pairs where output is built from a function 2x - 4
NOTE: This is suppose to be amazingly hard for the LLM.
"""

View file

@ -1,3 +1,5 @@
from random import Random
import pytest
from reasoning_gym.arc import Arc1DConfig, Arc1DDataset
@ -69,7 +71,7 @@ def test_arc_1d_items():
def test_arc_1d_iteration():
"""Test that iteration respects dataset size"""
config = Arc1DConfig(size=5, seed=42) # Small size for testing
config = Arc1DConfig(size=100, seed=42) # Small size for testing
dataset = Arc1DDataset(config)
# Test manual iteration
@ -105,3 +107,38 @@ def test_arc_1d_scoring():
# Test None answer
assert dataset.score_answer(None, entry) == 0.0
@pytest.mark.parametrize("board_size", [8, 9, 10, 12, 15, 20])
def test_arc_1d_sizes(board_size: int):
config = Arc1DConfig(size=1000, seed=42 + board_size, min_size=board_size, max_size=board_size)
dataset = Arc1DDataset(config)
for entry in dataset:
assert len(entry["metadata"]["test_example"]["input"]) == board_size
assert len(entry["metadata"]["test_example"]["output"]) == board_size
assert dataset.score_answer(entry["answer"], entry) == 1.0
@pytest.mark.parametrize("min_size,max_size", [(8, 10), (9, 13), (10, 12), (12, 20)])
def test_arc_1d_size_ranges(min_size: int, max_size: int):
config = Arc1DConfig(size=1000, seed=42, min_size=min_size, max_size=max_size)
dataset = Arc1DDataset(config)
for entry in dataset:
assert min_size <= len(entry["metadata"]["test_example"]["input"]) <= max_size
assert min_size <= len(entry["metadata"]["test_example"]["output"]) <= max_size
assert dataset.score_answer(entry["answer"], entry) == 1.0
def test_arc_1d_generate_all_tasks():
config = Arc1DConfig(size=100, seed=17, min_size=8, max_size=10)
dataset = Arc1DDataset(config)
tasks = dataset.ARC_1D_TASKS
rng = Random(999)
for task_name, (generator_fn, args) in tasks.items():
for j in range(3):
for i in range(20):
x = generator_fn(rng=rng, size=10, **args)
if x is not None:
break
assert i < 20
print(task_name, j, i, x)

103
tests/test_emoji_mystery.py Normal file
View file

@ -0,0 +1,103 @@
from random import Random
import pytest
from reasoning_gym.games.emoji_mystery import EmojiMysteryConfig, EmojiMysteryDataset
def test_emoji_mystery_config_validation():
"""Test that config validation works"""
config = EmojiMysteryConfig(size=-1)
with pytest.raises(AssertionError):
config.validate()
def test_emoji_mystery_deterministic():
"""Test that dataset generates same items with same seed"""
config = EmojiMysteryConfig(seed=42, size=10)
dataset1 = EmojiMysteryDataset(config)
dataset2 = EmojiMysteryDataset(config)
for i in range(len(dataset1)):
assert dataset1[i] == dataset2[i]
def test_emoji_mystery_items():
"""Test basic properties of generated items"""
config = EmojiMysteryConfig(size=100, seed=42)
dataset = EmojiMysteryDataset(config)
for i in range(len(dataset)):
item = dataset[i]
assert isinstance(item, dict)
assert "question" in item
assert "answer" in item
assert isinstance(item["question"], str)
assert isinstance(item["answer"], str)
def test_emoji_mystery_iteration():
"""Test that iteration respects dataset size"""
config = EmojiMysteryConfig(size=5, seed=42) # Small size for testing
dataset = EmojiMysteryDataset(config)
# Test manual iteration
items = []
for item in dataset:
items.append(item)
assert len(items) == config.size, "Iterator should yield exactly size items"
# Test list conversion
items = list(dataset)
assert len(items) == config.size, "Iterator should yield exactly size items"
# Test multiple iterations
first_items = list(dataset)
second_items = list(dataset)
assert first_items == second_items, "Multiple iterations should yield same items"
def test_emoji_mystery_encoding_decoding():
"""Test the encoding and decoding functionality"""
config = EmojiMysteryConfig()
dataset = EmojiMysteryDataset(config)
# Test with a simple sentence
test_sentence = "Hello, World!"
test_emoji = "😀"
# Test encoding
encoded = dataset.encode(test_sentence, test_emoji)
assert encoded.startswith(test_emoji)
# Test decoding
decoded = dataset.decode(encoded)
assert decoded == test_sentence
# Test with various sentences
test_cases = [
"A simple test.",
"More complex sentence with numbers 123!",
"Special characters: @#$%^&*()",
]
for sentence in test_cases:
encoded = dataset.encode(sentence, test_emoji)
decoded = dataset.decode(encoded)
assert decoded == sentence
def test_emoji_mystery_scoring():
"""Test the scoring functionality"""
config = EmojiMysteryConfig()
dataset = EmojiMysteryDataset(config)
# Test exact match
entry = {"answer": "Test answer"}
assert dataset.score_answer("Test answer", entry) == 1.0
# Test partial match
assert dataset.score_answer("Test answe", entry) == 0.01 # Different length
# Test None answer
assert dataset.score_answer(None, entry) == 0.0