diff --git a/reasoning_gym/combinatorics/combinatorics.py b/reasoning_gym/combinatorics/combinatorics.py index 0345bb59..d2573acf 100644 --- a/reasoning_gym/combinatorics/combinatorics.py +++ b/reasoning_gym/combinatorics/combinatorics.py @@ -1,5 +1,6 @@ import math import random +from collections import Counter from dataclasses import dataclass, field from typing import Any, Optional @@ -8,7 +9,24 @@ from ..factory import ProceduralDataset, register_dataset DATASET_NAME = "combinatorics" -TASK_TYPES = ("ncr", "npr", "permutations_repetition", "inclusion_exclusion", "stars_and_bars", "pigeonhole") +TASK_TYPES = ( + "ncr", + "npr", + "permutations_repetition", + "inclusion_exclusion", + "stars_and_bars", + "pigeonhole", + "multinomial", + "grid_paths", + "constrained_selection", + "circular_permutation", + "geometric_counting", + "dictionary_rank", + "derangement", + "group_division", + "legendres_formula", + "integral_solutions", +) @dataclass @@ -16,7 +34,13 @@ class CombinatoricsConfig: min_n: int = 5 max_n: int = 15 task_types: tuple[str, ...] 
= TASK_TYPES - task_weights: list[float] = field(default_factory=lambda: [0.2, 0.15, 0.2, 0.2, 0.15, 0.1]) + task_weights: list[float] = field( + default_factory=lambda: [ + 0.08, 0.06, 0.08, 0.08, 0.06, 0.04, + 0.07, 0.07, 0.07, 0.07, 0.07, 0.07, + 0.06, 0.06, 0.06, 0.06, + ] + ) seed: Optional[int] = None size: int = 500 @@ -119,6 +143,219 @@ class CombinatoricsDataset(ProceduralDataset): ) return {"question": question, "answer": str(answer), "task_type": "pigeonhole"} + # --- Advanced Counting Principles --- + + def _make_multinomial(self, rng: random.Random) -> dict: + num_vars = rng.randint(2, 4) + n = rng.randint(self.config.min_n, self.config.max_n) + parts = self._random_partition(rng, n, num_vars) + var_names = ["x", "y", "z", "w"][:num_vars] + + numerator = math.factorial(n) + denominator = 1 + for p in parts: + denominator *= math.factorial(p) + answer = numerator // denominator + + term_strs = [f"{v}^{e}" for v, e in zip(var_names, parts)] + sum_str = " + ".join(var_names) + question = ( + f"What is the coefficient of {' * '.join(term_strs)} in the expansion of " + f"({sum_str})^{n}? Give your answer as a single integer." + ) + return {"question": question, "answer": str(answer), "task_type": "multinomial"} + + @staticmethod + def _random_partition(rng: random.Random, n: int, k: int) -> list[int]: + """Generate a random composition of n into k positive parts.""" + if k == 1: + return [n] + cuts = sorted(rng.sample(range(1, n), k - 1)) + parts = [cuts[0]] + [cuts[i] - cuts[i - 1] for i in range(1, len(cuts))] + [n - cuts[-1]] + return parts + + def _make_grid_paths(self, rng: random.Random) -> dict: + m = rng.randint(2, self.config.max_n) + n = rng.randint(2, self.config.max_n) + answer = math.comb(m + n, m) + question = ( + f"How many shortest paths are there from the top-left corner to the bottom-right corner " + f"of a {m} x {n} grid, if you can only move right or down? " + f"Give your answer as a single integer." 
+ ) + return {"question": question, "answer": str(answer), "task_type": "grid_paths"} + + def _make_constrained_selection(self, rng: random.Random) -> dict: + total_men = rng.randint(3, max(4, self.config.max_n)) + total_women = rng.randint(3, max(4, self.config.max_n)) + committee_size = rng.randint(3, min(total_men + total_women - 1, 8)) + min_women = rng.randint(1, min(total_women, committee_size - 1)) + + answer = 0 + for w in range(min_women, min(total_women, committee_size) + 1): + men_needed = committee_size - w + if men_needed > total_men: + continue + answer += math.comb(total_women, w) * math.comb(total_men, men_needed) + + question = ( + f"A committee of {committee_size} people is to be formed from {total_men} men and " + f"{total_women} women. If at least {min_women} woman/women must be included, how many " + f"ways can the committee be formed? Give your answer as a single integer." + ) + return {"question": question, "answer": str(answer), "task_type": "constrained_selection"} + + # --- Special Permutations & Geometry --- + + def _make_circular_permutation(self, rng: random.Random) -> dict: + n = rng.randint(self.config.min_n, self.config.max_n) + identical_rotations = rng.choice([True, False]) + + if identical_rotations: + answer = math.factorial(n - 1) // 2 + question = ( + f"How many distinct ways can {n} people be seated around a circular table, " + f"where clockwise and counter-clockwise arrangements are considered the same? " + f"Give your answer as a single integer." + ) + else: + answer = math.factorial(n - 1) + question = ( + f"How many distinct ways can {n} people be seated around a circular table? " + f"Give your answer as a single integer." 
+ ) + return {"question": question, "answer": str(answer), "task_type": "circular_permutation"} + + def _make_geometric_counting(self, rng: random.Random) -> dict: + sub_type = rng.choice(["triangles", "diagonals"]) + if sub_type == "triangles": + n = rng.randint(max(6, self.config.min_n), max(7, self.config.max_n)) + m = rng.randint(3, n - 3) + answer = math.comb(n, 3) - math.comb(m, 3) + question = ( + f"There are {n} points in a plane, of which {m} are collinear. " + f"How many distinct triangles can be formed using these points as vertices? " + f"Give your answer as a single integer." + ) + else: + n = rng.randint(max(4, self.config.min_n), max(5, self.config.max_n)) + answer = n * (n - 3) // 2 + question = ( + f"How many diagonals does a {n}-sided convex polygon have? " + f"Give your answer as a single integer." + ) + return {"question": question, "answer": str(answer), "task_type": "geometric_counting"} + + def _make_dictionary_rank(self, rng: random.Random) -> dict: + length = rng.randint(3, min(6, max(4, self.config.max_n))) + letters = sorted(rng.sample("ABCDEFGHIJKLMNOPQRSTUVWXYZ", length)) + word_letters = letters[:] + rng.shuffle(word_letters) + word = "".join(word_letters) + + rank = 1 + remaining = sorted(word_letters) + for i, ch in enumerate(word): + pos = remaining.index(ch) + rank += pos * math.factorial(len(remaining) - 1) + remaining.pop(pos) + + question = ( + f"If all permutations of the letters {', '.join(sorted(set(word)))} are arranged " + f"in alphabetical (dictionary) order, what is the rank (position) of the word '{word}'? " + f"Give your answer as a single integer." 
+ ) + return {"question": question, "answer": str(rank), "task_type": "dictionary_rank"} + + # --- Distribution & Partitioning --- + + def _make_derangement(self, rng: random.Random) -> dict: + n = rng.randint(self.config.min_n, min(self.config.max_n, 12)) + answer = self._subfactorial(n) + question = ( + f"How many derangements (permutations where no element appears in its original position) " + f"are there of a set of {n} elements? Give your answer as a single integer." + ) + return {"question": question, "answer": str(answer), "task_type": "derangement"} + + @staticmethod + def _subfactorial(n: int) -> int: + if n == 0: + return 1 + if n == 1: + return 0 + d_prev2, d_prev1 = 1, 0 + for i in range(2, n + 1): + d_curr = (i - 1) * (d_prev1 + d_prev2) + d_prev2, d_prev1 = d_prev1, d_curr + return d_prev1 + + def _make_group_division(self, rng: random.Random) -> dict: + num_groups = rng.randint(2, 4) + n = rng.randint(max(self.config.min_n, num_groups * 2), max(self.config.min_n + 1, self.config.max_n)) + group_sizes = self._random_partition(rng, n, num_groups) + group_sizes.sort(reverse=True) + + numerator = math.factorial(n) + denominator = 1 + for g in group_sizes: + denominator *= math.factorial(g) + size_counts = Counter(group_sizes) + for cnt in size_counts.values(): + if cnt > 1: + denominator *= math.factorial(cnt) + answer = numerator // denominator + + sizes_str = ", ".join(str(s) for s in group_sizes) + question = ( + f"In how many ways can {n} distinct items be divided into unlabeled groups of sizes " + f"{sizes_str}? Give your answer as a single integer." 
+ ) + return {"question": question, "answer": str(answer), "task_type": "group_division"} + + # --- Number Theory in Combinatorics --- + + def _make_legendres_formula(self, rng: random.Random) -> dict: + n = rng.randint(self.config.min_n, self.config.max_n) + primes = [p for p in [2, 3, 5, 7, 11, 13] if p <= n] + if not primes: + primes = [2] + p = rng.choice(primes) + + exponent = 0 + pk = p + while pk <= n: + exponent += n // pk + pk *= p + + question = ( + f"What is the largest power of {p} that divides {n}!? " + f"In other words, find the largest k such that {p}^k divides {n}!. " + f"Give your answer as a single integer (the value of k)." + ) + return {"question": question, "answer": str(exponent), "task_type": "legendres_formula"} + + def _make_integral_solutions(self, rng: random.Random) -> dict: + r = rng.randint(2, 5) + variant = rng.choice(["non_negative", "positive"]) + n = rng.randint(max(self.config.min_n, r), self.config.max_n) + + if variant == "non_negative": + answer = math.comb(n + r - 1, r - 1) + var_list = " + ".join(f"x{i+1}" for i in range(r)) + question = ( + f"How many non-negative integer solutions are there to the equation " + f"{var_list} = {n}? Give your answer as a single integer." + ) + else: + answer = math.comb(n - 1, r - 1) + var_list = " + ".join(f"x{i+1}" for i in range(r)) + question = ( + f"How many positive integer solutions are there to the equation " + f"{var_list} = {n}? Give your answer as a single integer." 
+ ) + return {"question": question, "answer": str(answer), "task_type": "integral_solutions"} + def __getitem__(self, idx: int) -> dict: rng = random.Random(self.seed + idx) task_type = rng.choices(self.config.task_types, weights=self.config.task_weights, k=1)[0] @@ -130,6 +367,16 @@ class CombinatoricsDataset(ProceduralDataset): "inclusion_exclusion": self._make_inclusion_exclusion, "stars_and_bars": self._make_stars_and_bars, "pigeonhole": self._make_pigeonhole, + "multinomial": self._make_multinomial, + "grid_paths": self._make_grid_paths, + "constrained_selection": self._make_constrained_selection, + "circular_permutation": self._make_circular_permutation, + "geometric_counting": self._make_geometric_counting, + "dictionary_rank": self._make_dictionary_rank, + "derangement": self._make_derangement, + "group_division": self._make_group_division, + "legendres_formula": self._make_legendres_formula, + "integral_solutions": self._make_integral_solutions, } result = generators[task_type](rng) return { diff --git a/reasoning_gym/probability/__init__.py b/reasoning_gym/probability/__init__.py index ee3e1c1e..35bf7da9 100644 --- a/reasoning_gym/probability/__init__.py +++ b/reasoning_gym/probability/__init__.py @@ -8,6 +8,11 @@ from .conditional_probability import ( ConditionalProbabilityCurriculum, ConditionalProbabilityDataset, ) +from .probability_problems import ( + ProbabilityProblemsConfig, + ProbabilityProblemsCurriculum, + ProbabilityProblemsDataset, +) __all__ = [ "CoinFlipDataset", @@ -16,4 +21,7 @@ __all__ = [ "ConditionalProbabilityDataset", "ConditionalProbabilityConfig", "ConditionalProbabilityCurriculum", + "ProbabilityProblemsDataset", + "ProbabilityProblemsConfig", + "ProbabilityProblemsCurriculum", ] diff --git a/reasoning_gym/probability/probability_problems.py b/reasoning_gym/probability/probability_problems.py new file mode 100644 index 00000000..0f0e9f8f --- /dev/null +++ b/reasoning_gym/probability/probability_problems.py @@ -0,0 +1,359 @@ 
@dataclass
class ProbabilityProblemsConfig:
    """Configuration for the probability-problems dataset.

    ``task_types`` and ``task_weights`` are parallel sequences: item
    generation samples a task type with the matching weight.
    """

    # Lower/upper bounds the task generators use when sampling problem sizes.
    min_n: int = 3
    max_n: int = 10
    task_types: tuple[str, ...] = TASK_TYPES
    # One weight per entry of task_types, in the same order.
    task_weights: list[float] = field(
        default_factory=lambda: [
            0.12,
            0.11,
            0.12,
            0.12,
            0.11,
            0.10,
            0.11,
            0.10,
            0.11,
        ]
    )
    seed: Optional[int] = None
    size: int = 500

    def validate(self) -> None:
        """Check the configuration and raise ``AssertionError`` if it is invalid."""
        assert self.size > 0, "size must be positive"
        assert self.min_n >= 2, "min_n must be >= 2"
        assert self.max_n >= self.min_n, "max_n must be >= min_n"
        assert len(self.task_types) > 0, "must have at least one task type"
        assert all(t in TASK_TYPES for t in self.task_types), "invalid task type"
        assert len(self.task_weights) == len(self.task_types), "weights must match types"
        # random.choices rejects an all-zero weight vector and misbehaves on
        # negative weights; catch both here rather than at item-generation time.
        assert all(w >= 0 for w in self.task_weights), "weights must be non-negative"
        assert sum(self.task_weights) > 0, "at least one weight must be positive"
= ( + f"Events A and B are independent with P(A) = {pa} and P(B) = {pb}. " + f"What is P(A and B)? Give your answer as a simplified fraction." + ) + elif variant == "union": + answer = pa + pb - pa * pb + question = ( + f"Events A and B are independent with P(A) = {pa} and P(B) = {pb}. " + f"What is P(A or B)? Give your answer as a simplified fraction." + ) + else: + answer = (1 - pa) * (1 - pb) + question = ( + f"Events A and B are independent with P(A) = {pa} and P(B) = {pb}. " + f"What is the probability that neither A nor B occurs? " + f"Give your answer as a simplified fraction." + ) + return {"question": question, "answer": str(answer), "task_type": "independent_events"} + + def _make_compound_events(self, rng: random.Random) -> dict: + total = rng.randint(max(4, self.config.min_n), max(4, self.config.max_n)) + color_a_count = rng.randint(2, total - 2) + color_b_count = total - color_a_count + + colors = ["red", "blue", "green", "white", "black"] + color_a = rng.choice(colors) + color_b = rng.choice([c for c in colors if c != color_a]) + + seq = rng.choice(["ab", "ba"]) + if seq == "ab": + prob = Fraction(color_a_count, total) * Fraction(color_b_count, total - 1) + seq_desc = f"the first is {color_a} and the second is {color_b}" + else: + prob = Fraction(color_b_count, total) * Fraction(color_a_count, total - 1) + seq_desc = f"the first is {color_b} and the second is {color_a}" + + question = ( + f"A bag contains {color_a_count} {color_a} balls and {color_b_count} {color_b} balls. " + f"You draw 2 balls one after another without replacement. " + f"What is the probability that {seq_desc}? " + f"Give your answer as a simplified fraction." 
+ ) + return {"question": question, "answer": str(prob), "task_type": "compound_events"} + + # --- Section 2: Total Probability & Bayes' Theorem --- + + def _make_total_probability(self, rng: random.Random) -> dict: + num_bags = rng.randint(2, 3) + bags = [] + for _ in range(num_bags): + red = rng.randint(1, self.config.max_n) + blue = rng.randint(1, self.config.max_n) + bags.append((red, blue)) + + p_bag = Fraction(1, num_bags) + p_red = Fraction(0) + for red, blue in bags: + p_red += p_bag * Fraction(red, red + blue) + + bag_desc = ". ".join( + f"Bag {i + 1} contains {r} red and {b} blue balls" + for i, (r, b) in enumerate(bags) + ) + question = ( + f"{bag_desc}. " + f"One bag is chosen uniformly at random and a ball is drawn from it. " + f"What is the probability the ball is red? " + f"Give your answer as a simplified fraction." + ) + return {"question": question, "answer": str(p_red), "task_type": "total_probability"} + + def _make_bayes_theorem(self, rng: random.Random) -> dict: + num_bags = rng.randint(2, 3) + bags = [] + for _ in range(num_bags): + red = rng.randint(1, self.config.max_n) + blue = rng.randint(1, self.config.max_n) + bags.append((red, blue)) + + target_bag = rng.randint(0, num_bags - 1) + + p_bag = Fraction(1, num_bags) + p_red = Fraction(0) + for red, blue in bags: + p_red += p_bag * Fraction(red, red + blue) + + red_t, blue_t = bags[target_bag] + p_red_given_target = Fraction(red_t, red_t + blue_t) + p_target_given_red = (p_bag * p_red_given_target) / p_red + + bag_desc = ". ".join( + f"Bag {i + 1} contains {r} red and {b} blue balls" + for i, (r, b) in enumerate(bags) + ) + question = ( + f"{bag_desc}. " + f"One bag is chosen uniformly at random and a ball is drawn. The ball is red. " + f"What is the probability that it came from Bag {target_bag + 1}? " + f"Give your answer as a simplified fraction." 
+ ) + return {"question": question, "answer": str(p_target_given_red), "task_type": "bayes_theorem"} + + # --- Section 3: Probability Distributions --- + + def _make_binomial_probability(self, rng: random.Random) -> dict: + p_choices = [ + Fraction(1, 6), Fraction(1, 4), Fraction(1, 3), + Fraction(1, 2), Fraction(2, 3), Fraction(3, 4), + ] + p = rng.choice(p_choices) + q = 1 - p + n = rng.randint(self.config.min_n, min(self.config.max_n, 8)) + r = rng.randint(0, n) + + prob = Fraction(math.comb(n, r)) * (p ** r) * (q ** (n - r)) + question = ( + f"A biased coin has a probability of heads equal to {p}. " + f"If it is flipped {n} times, what is the probability of getting exactly {r} heads? " + f"Give your answer as a simplified fraction." + ) + return {"question": question, "answer": str(prob), "task_type": "binomial_probability"} + + def _make_binomial_stats(self, rng: random.Random) -> dict: + p_choices = [ + Fraction(1, 6), Fraction(1, 4), Fraction(1, 3), + Fraction(1, 2), Fraction(2, 3), Fraction(3, 4), + ] + p = rng.choice(p_choices) + q = 1 - p + n = rng.randint(self.config.min_n, self.config.max_n) + variant = rng.choice(["mean", "variance"]) + + if variant == "mean": + answer = Fraction(n) * p + question = ( + f"A random variable X follows a binomial distribution with {n} trials " + f"and success probability {p}. What is E(X), the expected value? " + f"Give your answer as a simplified fraction or integer." + ) + else: + answer = Fraction(n) * p * q + question = ( + f"A random variable X follows a binomial distribution with {n} trials " + f"and success probability {p}. What is Var(X), the variance? " + f"Give your answer as a simplified fraction or integer." 
+ ) + return {"question": question, "answer": str(answer), "task_type": "binomial_stats"} + + # --- Section 4: Geometric Probability --- + + def _make_geometric_series(self, rng: random.Random) -> dict: + p_choices = [ + Fraction(1, 6), Fraction(1, 5), Fraction(1, 4), + Fraction(1, 3), Fraction(1, 2), + ] + p = rng.choice(p_choices) + q = rng.choice(p_choices) + + # A and B alternate; A goes first. + # P(A wins) = p / (1 - (1-p)(1-q)) via infinite geometric series + answer = p / (1 - (1 - p) * (1 - q)) + + name_a = rng.choice(["Alice", "Arun", "Alex"]) + name_b = rng.choice(["Bob", "Bala", "Beth"]) + + question = ( + f"{name_a} and {name_b} play a game where they take alternate turns, " + f"with {name_a} going first. On each of her turns, {name_a} has a probability " + f"of {p} of winning the game. If {name_a} does not win, {name_b} then has a " + f"probability of {q} of winning on his turn. If neither wins, the process " + f"repeats. What is the probability that {name_a} wins the game? " + f"Give your answer as a simplified fraction." + ) + return {"question": question, "answer": str(answer), "task_type": "geometric_series"} + + def _make_geometric_region(self, rng: random.Random) -> dict: + a = rng.randint(max(2, self.config.min_n), self.config.max_n) + variant = rng.choice(["leq", "geq"]) + + if variant == "leq": + c = rng.randint(1, a) + answer = Fraction(c * c, 2 * a * a) + question = ( + f"Two numbers x and y are each chosen uniformly at random from [0, {a}]. " + f"What is the probability that x + y <= {c}? " + f"Give your answer as a simplified fraction." + ) + else: + c = rng.randint(a + 1, 2 * a - 1) + side = 2 * a - c + answer = Fraction(side * side, 2 * a * a) + question = ( + f"Two numbers x and y are each chosen uniformly at random from [0, {a}]. " + f"What is the probability that x + y >= {c}? " + f"Give your answer as a simplified fraction." 
+ ) + return {"question": question, "answer": str(answer), "task_type": "geometric_region"} + + # --- Section 5: Random Variables & Expectation --- + + def _make_expectation_variance(self, rng: random.Random) -> dict: + k = rng.randint(3, 5) + outcomes = sorted(rng.sample(range(1, 11), k)) + weights = [rng.randint(1, 10) for _ in range(k)] + total_weight = sum(weights) + probs = [Fraction(w, total_weight) for w in weights] + + variant = rng.choice(["expectation", "variance"]) + + ex = sum(Fraction(x) * p for x, p in zip(outcomes, probs)) + + if variant == "expectation": + answer = ex + stat_name = "E(X), the expected value" + else: + ex2 = sum(Fraction(x * x) * p for x, p in zip(outcomes, probs)) + answer = ex2 - ex * ex + stat_name = "Var(X), the variance" + + table_lines = " | ".join(f"P(X={x}) = {p}" for x, p in zip(outcomes, probs)) + question = ( + f"A discrete random variable X has the following probability distribution: " + f"{table_lines}. " + f"What is {stat_name}? " + f"Give your answer as a simplified fraction or integer." 
def score_answer(self, answer: Optional[str], entry: dict[str, Any]) -> float:
    """Score a submitted answer against the entry's oracle answer.

    Args:
        answer: The submitted answer text, or ``None`` if nothing was given.
        entry: Dataset item; only ``entry["answer"]`` is read.

    Returns:
        1.0 for an exact string match or an exactly equal rational value,
        0.9 if the values differ by less than 1e-4, 0.5 if they differ by
        less than 1e-2, and 0.0 otherwise (including unparsable input).
    """
    if not isinstance(answer, str):
        # Covers None and any non-text value a caller might pass through.
        return 0.0
    oracle = entry["answer"]
    if answer.strip() == oracle.strip():
        return 1.0
    try:
        ans_frac = Fraction(answer.strip())
        oracle_frac = Fraction(oracle.strip())
    except (ValueError, ZeroDivisionError):
        return 0.0
    if ans_frac == oracle_frac:
        return 1.0
    # Compare exactly in rational arithmetic: converting to float raises
    # OverflowError for huge inputs such as "1e400".
    diff = abs(ans_frac - oracle_frac)
    if diff < Fraction(1, 10**4):
        return 0.9
    if diff < Fraction(1, 10**2):
        return 0.5
    return 0.0
ProbabilityProblemsDataset, ProbabilityProblemsConfig, ProbabilityProblemsCurriculum +) diff --git a/tests/test_combinatorics.py b/tests/test_combinatorics.py index b4410921..8af9c03f 100644 --- a/tests/test_combinatorics.py +++ b/tests/test_combinatorics.py @@ -1,6 +1,13 @@ +import math + import pytest -from reasoning_gym.combinatorics.combinatorics import CombinatoricsConfig, CombinatoricsCurriculum, CombinatoricsDataset +from reasoning_gym.combinatorics.combinatorics import ( + TASK_TYPES, + CombinatoricsConfig, + CombinatoricsCurriculum, + CombinatoricsDataset, +) def test_config_validation(): @@ -62,9 +69,147 @@ def test_curriculum(): def test_task_types(): - for task_type in ("ncr", "npr", "permutations_repetition", "inclusion_exclusion", "stars_and_bars", "pigeonhole"): + for task_type in TASK_TYPES: config = CombinatoricsConfig(seed=42, size=10, task_types=(task_type,), task_weights=[1.0]) ds = CombinatoricsDataset(config) for i in range(len(ds)): item = ds[i] assert item["metadata"]["task_type"] == task_type + assert item["answer"].lstrip("-").isdigit() + + +# --- Targeted tests for new task types --- + + +def test_multinomial_known_values(): + config = CombinatoricsConfig(seed=100, size=20, task_types=("multinomial",), task_weights=[1.0]) + ds = CombinatoricsDataset(config) + for i in range(len(ds)): + item = ds[i] + assert int(item["answer"]) > 0 + assert "coefficient" in item["question"].lower() + + +def test_grid_paths_known_values(): + config = CombinatoricsConfig(seed=42, size=20, task_types=("grid_paths",), task_weights=[1.0]) + ds = CombinatoricsDataset(config) + for i in range(len(ds)): + item = ds[i] + assert int(item["answer"]) >= 1 + assert "grid" in item["question"].lower() + + +def test_constrained_selection_known_values(): + config = CombinatoricsConfig(seed=42, size=20, task_types=("constrained_selection",), task_weights=[1.0]) + ds = CombinatoricsDataset(config) + for i in range(len(ds)): + item = ds[i] + assert int(item["answer"]) >= 1 + 
def test_dictionary_rank_manual():
    """Check generated dictionary ranks against a brute-force enumeration.

    The oracle sorts every permutation of the word's letters and looks up the
    word's 1-based position, so it shares no code with the generator.
    """
    import re
    from itertools import permutations

    def brute_force_rank(word: str) -> int:
        ordered = sorted(permutations(sorted(word)))
        return ordered.index(tuple(word)) + 1

    # Sanity-check the oracle on a hand-worked case: ABC=1, ACB=2, BAC=3.
    assert brute_force_rank("BAC") == 3

    config = CombinatoricsConfig(seed=42, size=20, task_types=("dictionary_rank",), task_weights=[1.0])
    ds = CombinatoricsDataset(config)
    for i in range(len(ds)):
        item = ds[i]
        match = re.search(r"the word '([A-Z]+)'", item["question"])
        assert match, f"Item {i}: could not find the word in the question"
        word = match.group(1)
        expected = brute_force_rank(word)
        assert int(item["answer"]) == expected, f"Item {i}: rank of {word} should be {expected}"
def test_legendres_formula_manual():
    """With n fixed at 10, check every generated exponent against hand-computed values.

    10! = 2^8 * 3^4 * 5^2 * 7. For example, the power of 2 is
    floor(10/2) + floor(10/4) + floor(10/8) = 5 + 2 + 1 = 8.
    """
    import re

    expected = {2: "8", 3: "4", 5: "2", 7: "1"}
    config = CombinatoricsConfig(
        seed=0, size=50, task_types=("legendres_formula",), task_weights=[1.0], min_n=10, max_n=10
    )
    ds = CombinatoricsDataset(config)
    # Every item is checked, so the test cannot pass without asserting anything.
    for i in range(len(ds)):
        item = ds[i]
        match = re.search(r"largest power of (\d+) that divides 10!", item["question"])
        assert match, f"Item {i}: unexpected question text"
        p = int(match.group(1))
        assert p in expected, f"Item {i}: unexpected prime {p}"
        assert item["answer"] == expected[p], f"Power of {p} in 10!: expected {expected[p]}, got {item['answer']}"
reasoning_gym.probability.probability_problems import ( + TASK_TYPES, + ProbabilityProblemsConfig, + ProbabilityProblemsCurriculum, + ProbabilityProblemsDataset, +) + + +def test_config_validation(): + with pytest.raises(AssertionError): + config = ProbabilityProblemsConfig(min_n=1) + config.validate() + + with pytest.raises(AssertionError): + config = ProbabilityProblemsConfig(min_n=10, max_n=5) + config.validate() + + with pytest.raises(AssertionError): + config = ProbabilityProblemsConfig(size=0) + config.validate() + + +def test_deterministic(): + config = ProbabilityProblemsConfig(seed=42, size=10) + ds1 = ProbabilityProblemsDataset(config) + ds2 = ProbabilityProblemsDataset(config) + for i in range(len(ds1)): + assert ds1[i] == ds2[i] + + +def test_item_structure(): + config = ProbabilityProblemsConfig(seed=42, size=50) + ds = ProbabilityProblemsDataset(config) + for i in range(len(ds)): + item = ds[i] + assert isinstance(item, dict) + assert "question" in item + assert "answer" in item + assert "metadata" in item + assert item["metadata"]["source_dataset"] == "probability_problems" + + +def test_answer_is_valid_fraction(): + config = ProbabilityProblemsConfig(seed=42, size=100) + ds = ProbabilityProblemsDataset(config) + for i in range(len(ds)): + item = ds[i] + frac = Fraction(item["answer"]) + assert frac.denominator > 0 + + +def test_score_oracle(): + config = ProbabilityProblemsConfig(seed=42, size=50) + ds = ProbabilityProblemsDataset(config) + for i in range(len(ds)): + item = ds[i] + score = ds.score_answer(item["answer"], item) + assert score == 1.0, f"Item {i}: oracle scored {score}" + + +def test_score_none(): + config = ProbabilityProblemsConfig(seed=42, size=10) + ds = ProbabilityProblemsDataset(config) + item = ds[0] + assert ds.score_answer(None, item) == 0.0 + + +def test_score_wrong_answer(): + config = ProbabilityProblemsConfig(seed=42, size=10) + ds = ProbabilityProblemsDataset(config) + item = ds[0] + assert ds.score_answer("not a 
fraction", item) == 0.0 + + +def test_score_equivalent_fraction(): + config = ProbabilityProblemsConfig( + seed=42, size=10, task_types=("independent_events",), task_weights=[1.0] + ) + ds = ProbabilityProblemsDataset(config) + item = ds[0] + oracle_frac = Fraction(item["answer"]) + unsimplified = f"{oracle_frac.numerator * 3}/{oracle_frac.denominator * 3}" + score = ds.score_answer(unsimplified, item) + assert score == 1.0 + + +def test_curriculum(): + curriculum = ProbabilityProblemsCurriculum() + base_value = {"size": 50, "seed": 1} + base_cfg = curriculum.generate_configuration(base_value) + assert base_cfg.seed == 1 + + curriculum.increment_attr_level("n_range") + increased_cfg = curriculum.generate_configuration(base_value) + assert increased_cfg.max_n >= base_cfg.max_n + + +def test_task_types(): + for task_type in TASK_TYPES: + config = ProbabilityProblemsConfig(seed=42, size=10, task_types=(task_type,), task_weights=[1.0]) + ds = ProbabilityProblemsDataset(config) + for i in range(len(ds)): + item = ds[i] + assert item["metadata"]["task_type"] == task_type + score = ds.score_answer(item["answer"], item) + assert score == 1.0, f"Task {task_type}, item {i}: oracle scored {score}" + + +# --- Targeted tests for individual task types --- + + +def test_independent_events_math(): + config = ProbabilityProblemsConfig(seed=100, size=30, task_types=("independent_events",), task_weights=[1.0]) + ds = ProbabilityProblemsDataset(config) + for i in range(len(ds)): + item = ds[i] + frac = Fraction(item["answer"]) + assert 0 < frac <= 1 + + +def test_compound_events_math(): + config = ProbabilityProblemsConfig(seed=42, size=20, task_types=("compound_events",), task_weights=[1.0]) + ds = ProbabilityProblemsDataset(config) + for i in range(len(ds)): + item = ds[i] + frac = Fraction(item["answer"]) + assert 0 < frac < 1 + + +def test_total_probability_in_range(): + config = ProbabilityProblemsConfig(seed=42, size=20, task_types=("total_probability",), task_weights=[1.0]) + 
ds = ProbabilityProblemsDataset(config) + for i in range(len(ds)): + item = ds[i] + frac = Fraction(item["answer"]) + assert 0 < frac < 1, f"Item {i}: P(red) = {frac} not in (0,1)" + + +def test_bayes_theorem_in_range(): + config = ProbabilityProblemsConfig(seed=42, size=20, task_types=("bayes_theorem",), task_weights=[1.0]) + ds = ProbabilityProblemsDataset(config) + for i in range(len(ds)): + item = ds[i] + frac = Fraction(item["answer"]) + assert 0 < frac <= 1, f"Item {i}: P(Bag|red) = {frac} not in (0,1]" + + +def test_binomial_probability_in_range(): + config = ProbabilityProblemsConfig(seed=42, size=20, task_types=("binomial_probability",), task_weights=[1.0]) + ds = ProbabilityProblemsDataset(config) + for i in range(len(ds)): + item = ds[i] + frac = Fraction(item["answer"]) + assert 0 < frac <= 1 + + +def test_binomial_stats_positive(): + config = ProbabilityProblemsConfig(seed=42, size=20, task_types=("binomial_stats",), task_weights=[1.0]) + ds = ProbabilityProblemsDataset(config) + for i in range(len(ds)): + item = ds[i] + frac = Fraction(item["answer"]) + assert frac > 0 + + +def test_geometric_series_in_range(): + config = ProbabilityProblemsConfig(seed=42, size=20, task_types=("geometric_series",), task_weights=[1.0]) + ds = ProbabilityProblemsDataset(config) + for i in range(len(ds)): + item = ds[i] + frac = Fraction(item["answer"]) + assert 0 < frac < 1, f"Item {i}: P(A wins) = {frac} not in (0,1)" + + +def test_geometric_series_manual(): + """With p=q=1/2: P(A wins) = (1/2)/(1 - 1/4) = 2/3.""" + config = ProbabilityProblemsConfig(seed=0, size=50, task_types=("geometric_series",), task_weights=[1.0]) + ds = ProbabilityProblemsDataset(config) + for i in range(len(ds)): + item = ds[i] + if "1/2" in item["question"]: + q = item["question"] + if q.count("1/2") >= 2: + assert item["answer"] == "2/3", f"With p=q=1/2, expected 2/3, got {item['answer']}" + break + + +def test_geometric_region_in_range(): + config = ProbabilityProblemsConfig(seed=42, 
size=20, task_types=("geometric_region",), task_weights=[1.0]) + ds = ProbabilityProblemsDataset(config) + for i in range(len(ds)): + item = ds[i] + frac = Fraction(item["answer"]) + assert 0 < frac <= Fraction(1, 2), f"Item {i}: region prob = {frac}" + + +def test_expectation_variance_types(): + config = ProbabilityProblemsConfig(seed=42, size=30, task_types=("expectation_variance",), task_weights=[1.0]) + ds = ProbabilityProblemsDataset(config) + seen_exp = False + seen_var = False + for i in range(len(ds)): + item = ds[i] + frac = Fraction(item["answer"]) + if "E(X)" in item["question"]: + assert frac > 0 + seen_exp = True + if "Var(X)" in item["question"]: + assert frac >= 0 + seen_var = True + assert seen_exp and seen_var, "Should generate both expectation and variance problems"