Merge branch 'main' of https://github.com/open-thought/reasoning-gym into env/matrix-manipulation

This commit is contained in:
Zafir Stojanovski 2025-02-10 20:40:41 +01:00
commit 696fdf8be7
16 changed files with 813 additions and 20 deletions

View file

@ -7,6 +7,7 @@ Algorithmic tasks for training reasoning capabilities:
"""
from .base_conversion import BaseConversionConfig, BaseConversionDataset
from .binary_matrix import BinaryMatrixConfig, BinaryMatrixDataset
from .caesar_cipher import CaesarCipherConfig, CaesarCipherDataset
from .group_anagrams import GroupAnagramsConfig, GroupAnagramsDataset
from .isomorphic_strings import IsomorphicStringsConfig, IsomorphicStringsDataset
@ -63,4 +64,6 @@ __all__ = [
"RotateMatrixDataset",
"ManipulateMatrixConfig",
"ManipulateMatrixDataset",
"BinaryMatrixConfig",
"BinaryMatrixDataset",
]

View file

@ -0,0 +1,125 @@
"""Find the distance to the nearest 0 for each cell in a binary matrix.
A popular Leetcode problem:
https://leetcode.com/problems/01-matrix/description/
"""
from collections import deque
from dataclasses import dataclass
from random import Random
from typing import Optional
from ..factory import ProceduralDataset, register_dataset
QUESTION_TEMPLATE = """Given a square matrix, your job is to find the taxicab distance of the nearest 0 for each cell.
Example:
Input: Find the distance to the nearest 0 for each cell in the matrix below:
0 0 0
0 1 0
1 1 1
Output:
0 0 0
0 1 0
1 2 1
Find the distance to the nearest 0 for each cell in the matrix below:
{matrix}
"""
@dataclass
class BinaryMatrixConfig:
"""Configuration for Binary Matrix dataset generation"""
max_n: int = 10 # Maximum size of the matrix
p_zero: float = 0.25 # Probability of a cell being 0
size: int = 500 # Virtual dataset size
seed: Optional[int] = None
def validate(self):
"""Validate configuration parameters"""
assert 1 <= self.max_n, "max_n must be at least 1"
assert 0 < self.p_zero <= 1, "p_zero must be between 0 and 1"
class BinaryMatrixDataset(ProceduralDataset):
"""Generates Binary Matrix exercises with configurable difficulty"""
def __init__(self, config: BinaryMatrixConfig):
super().__init__(config=config, seed=config.seed, size=config.size)
def _get_binary_matrix(self, rng: Random) -> list[list[int]]:
"""Generate a random binary matrix"""
n = rng.randint(1, self.config.max_n)
# Ensure at least one 0 in the matrix, so that a solution exists
numbers = [0] + [0 if rng.random() < self.config.p_zero else 1 for _ in range(n**2 - 1)]
rng.shuffle(numbers)
matrix = [numbers[i * n : (i + 1) * n] for i in range(n)]
return matrix
def _get_distances(self, matrix: list[list[int]]) -> list[list[int]]:
"""Get the distance to the nearest 0 for each cell in the matrix"""
n = len(matrix)
directions = [[1, 0], [-1, 0], [0, 1], [0, -1]]
visited = set()
queue = deque()
output = [[float("inf")] * n for _ in range(n)]
for r in range(n):
for c in range(n):
if matrix[r][c] == 0:
output[r][c] = 0
visited.add((r, c))
queue.append((r, c))
clock = 1
while True:
temp = deque()
while queue:
r, c = queue.popleft()
for dr, dc in directions:
new_r, new_c = r + dr, c + dc
if (
0 <= new_r < n
and 0 <= new_c < n
and (new_r, new_c) not in visited
and matrix[new_r][new_c] == 1
):
output[new_r][new_c] = clock
visited.add((new_r, new_c))
temp.append((new_r, new_c))
if temp:
queue = temp
else:
break
clock += 1
return output
def _matrix_to_str(self, matrix: list[list[int]]) -> str:
"""Get a string representation of the matrix"""
return "\n".join(" ".join(str(x) for x in row) for row in matrix)
def __getitem__(self, idx: int) -> dict:
"""Generate a single Binary Matrix question"""
rng = Random(self.seed + idx)
matrix = self._get_binary_matrix(rng)
matrix_str = self._matrix_to_str(matrix)
answer = self._get_distances(matrix)
answer_str = self._matrix_to_str(answer)
return {
"question": QUESTION_TEMPLATE.format(matrix=matrix_str),
"answer": answer_str,
"metadata": {"matrix": matrix, "solution": answer},
}
register_dataset("binary_matrix", BinaryMatrixDataset, BinaryMatrixConfig)

View file

@ -83,7 +83,7 @@ class RotateMatrixDataset(ProceduralDataset):
return "\n".join(" ".join(str(x) for x in row) for row in matrix)
def __getitem__(self, idx: int) -> dict:
"""Generate a single Spiral Matrix question"""
"""Generate a single Rotate Matrix question"""
rng = Random(self.seed + idx)
matrix = self._get_matrix(rng)

View file

@ -5,8 +5,7 @@ from dataclasses import dataclass
from random import Random
from typing import Dict, List, Optional, Set, Tuple
from reasoning_gym.data import read_data_file
from ..data import get_data_file_path
from ..factory import ProceduralDataset, register_dataset
@ -64,6 +63,7 @@ class WordLadderDataset(ProceduralDataset):
self.config = config
self.word_sets = {}
self.word_graphs = {}
self._vocabulary = None # A large list of dictionary words to validate words against
# Load words from CSV
self.word_sets = self._load_words_from_csv(
@ -84,28 +84,24 @@ class WordLadderDataset(ProceduralDataset):
assert 3 <= min_length <= max_length <= 5, "Word length must be between 3 and 5 inclusive"
import csv
from io import StringIO
word_sets = {}
try:
# Get CSV content as string
csv_content = read_data_file("words.csv")
with get_data_file_path("words.csv").open("r", encoding="utf-8") as csv_file:
reader = csv.DictReader(csv_file)
# Use StringIO to create a file-like object from the string
csv_file = StringIO(csv_content)
reader = csv.DictReader(csv_file)
for row in reader:
# Process each word length column using config range
for length in range(min_length, max_length + 1):
col_name = f"{length}_letter"
word = row.get(col_name, "")
for row in reader:
# Process each word length column using config range
for length in range(min_length, max_length + 1):
col_name = f"{length}_letter"
word = row.get(col_name, "")
if not word: # Skip empty entries
continue
if not word: # Skip empty entries
continue
word_sets.setdefault(length, set()).add(word.upper())
word_sets.setdefault(length, set()).add(word.upper())
except Exception as e:
raise RuntimeError(f"Error processing words.csv content: {e}") from e
@ -220,5 +216,43 @@ class WordLadderDataset(ProceduralDataset):
"metadata": {"start_word": start, "end_word": end, "word_length": length, "chain_length": len(path)},
}
def score_answer(self, answer: Optional[str], entry: Dict[str, any]) -> float:
if answer is None:
return 0
answer_words = tuple(s.strip() for s in answer.upper().split(","))
metadata = entry["metadata"]
start_word = metadata["start_word"]
end_word = metadata["end_word"]
word_length = len(end_word)
known_words = self.word_sets[word_length]
# Check conditions:
# 1. start and end word match question
# 2. all words have the correct length
# 3. every changed word is a single letter change from the previous word
# 4. all words are in our vocabulary
if len(answer_words) < 2:
return 0
if answer_words[0] != start_word or answer_words[-1] != end_word:
return 0.01
if not all(len(w) == word_length for w in answer_words):
return 0.01
for i in range(1, len(answer_words)):
if sum(1 for a, b in zip(answer_words[i - 1], answer_words[i]) if a != b) != 1:
return 0.01
reward = 1.0
for word in answer_words:
if not word in known_words:
reward *= 0.5
return reward
register_dataset("word_ladder", WordLadderDataset, WordLadderConfig)