diff --git a/reasoning_gym/logic/contrib/logic_puzzle/graph/check_clues.py b/reasoning_gym/logic/contrib/logic_puzzle/graph/check_clues.py deleted file mode 100644 index 407cfb23..00000000 --- a/reasoning_gym/logic/contrib/logic_puzzle/graph/check_clues.py +++ /dev/null @@ -1,57 +0,0 @@ -import itertools -from collections import defaultdict - -import sat_utils -import solver as solver -from z3 import * - - -def check_singe_clues(puzzle_idx, answer_header, answer_value, running_clues, idx2clue, used_clue, all_cells): - solved_cell = defaultdict(int) - valid_solutions = defaultdict(int) - print("check single clues") - for idx in idx2clue: - if idx not in used_clue: - new_clues = running_clues + idx2clue[idx].as_cnf() - numbered_cnf, num2var = sat_utils.translate(new_clues) - single_solver = solver.my_solver(puzzle_idx, answer_header, answer_value, numbered_cnf, num2var) - cell_info = single_solver.check_cell_difficulty() - num_cell = 0 - for cell in cell_info: - if cell not in all_cells: - num_cell += 1 - solved_cell[idx] = num_cell - del single_solver - idx = sorted(solved_cell.items(), key=lambda x: x[1], reverse=True)[0][0] - if solved_cell[idx] != 0: - return [idx] - else: - return [1000] - - -def check(puzzle_idx, answer_header, answer_value, running_clues, idx2clue, used_clue, all_cells): - idx = check_singe_clues(puzzle_idx, answer_header, answer_value, running_clues, idx2clue, used_clue, all_cells) - if idx[0] < 1000: - return idx - else: - print("check multiple clues") - solved_cell = defaultdict(int) - all_left_clues_idx = [i for i in idx2clue if i not in used_clue] - print(all_left_clues_idx) - for n in range(2, 10): - combinations = itertools.combinations(all_left_clues_idx, n) - for comb in combinations: - new_clues = [clues for clues in running_clues] - for comb_idx in comb: - new_clues += idx2clue[comb_idx].as_cnf() - numbered_cnf, num2var = sat_utils.translate(new_clues) - single_solver = solver.my_solver(puzzle_idx, answer_header, answer_value, numbered_cnf, num2var) - cell_info = single_solver.check_cell_difficulty() - num_cell = 0 - for cell in cell_info: - if cell not in all_cells: - num_cell += 1 - solved_cell[comb] = num_cell - del single_solver - if num_cell != 0: - return comb diff --git a/reasoning_gym/logic/contrib/logic_puzzle/graph/graph_literals.py b/reasoning_gym/logic/contrib/logic_puzzle/graph/graph_literals.py deleted file mode 100644 index ef90dc4b..00000000 --- a/reasoning_gym/logic/contrib/logic_puzzle/graph/graph_literals.py +++ /dev/null @@ -1,32 +0,0 @@ -def print_clue(clue_idxs, idx2clue, new_cell, first=False): - if len(clue_idxs) == 1: - # print(type(idx2clue[clue_idxs[0]])) - # if type(idx2clue[clue_idxs[0]]) == clues.found_at: - return single_find_at(idx2clue[clue_idxs[0]], new_cell, first) - else: - if first: - result = "First combining clues: " - else: - result = "Then combining clues: " - for current_idx in clue_idxs: - result += "<{}>".format(idx2clue[current_idx]) - result += " Unique Values Rules and the fixed table structure. We know that " - for cell in new_cell: - result += cell - result += ". " - return result - - -def single_find_at(clue, new_cell, first=False): - if first: - result = "First applying clue: " - else: - result = "Then applying clue: " - if len(new_cell) > 1: - result += "<{}> and Unique Values We know that ".format(clue) - else: - result += "<{}> We know that ".format(clue) - for cell in new_cell: - result += cell - result += ". " - return result diff --git a/reasoning_gym/logic/contrib/logic_puzzle/graph/main.py b/reasoning_gym/logic/contrib/logic_puzzle/graph/main.py deleted file mode 100644 index 763a31ab..00000000 --- a/reasoning_gym/logic/contrib/logic_puzzle/graph/main.py +++ /dev/null @@ -1,49 +0,0 @@ -import argparse -import json -import pickle - -import sat_utils -import solver -from tqdm import tqdm -from z3 import * - - -def solve_logic_grid_puzzle(inputfile, ground_truth): - answers = [json.loads(answer) for answer in list(open(ground_truth, "r"))][0] - cell_difficulty = {} - with open(inputfile, "rb") as f: - puzzles = pickle.load(f) - for i in tqdm(range(len(puzzles[:]))): - d = puzzles[i] - assert d["idx"] == answers[i]["idx"] - answer_header = answers[i]["solution"]["table_header"] - answer_value = answers[i]["solution"]["table_rows"] - # read cnf form - symbolic_cnf = d["puzzle"].as_cnf() - numbered_cnf, num2var = sat_utils.translate(symbolic_cnf) - - single_solver = solver.my_solver(d["idx"], answer_header, answer_value, numbered_cnf, num2var) - solution, unique = single_solver.check_solution() - if unique: - for row_num in range(len(solution)): - for column_num in range(len(solution[row_num])): - assert row_num == solution[row_num][column_num] - difficulty = single_solver.check_problem_difficulty() - cell_difficulty[d["idx"]] = difficulty - - with open("./logic_grid_puzzles.test.difficulty.pkl", "wb") as outputfile: - pickle.dump(cell_difficulty, outputfile) - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument("--input_data", type=str, default="../logic_grid_puzzles.test.pkl") - parser.add_argument("--ground_truth", type=str, default="../logic_grid_puzzles.test.json") - args = parser.parse_args() - - solve_logic_grid_puzzle(args.input_data, args.ground_truth) - return 1 - - -if __name__ == "__main__": - main() diff --git a/reasoning_gym/logic/contrib/logic_puzzle/graph/puzzle_analysis.py b/reasoning_gym/logic/contrib/logic_puzzle/graph/puzzle_analysis.py deleted file mode 100644 index c09a50ef..00000000 --- a/reasoning_gym/logic/contrib/logic_puzzle/graph/puzzle_analysis.py +++ /dev/null @@ -1,190 +0,0 @@ -import argparse -import json -import os -from copy import deepcopy -from pathlib import Path -from typing import Sequence - -from tqdm import tqdm - - -def parse_table(table_string): - flat_rows = [r for r in table_string.split("\n") if r.startswith("$")] - table = {} - for flat_row in flat_rows: - flat_cells = [c for c in flat_row.split("|") if c] - house, attributes = flat_cells[0], flat_cells[1:] - h_prefix, h_number = house.split(":") - assert h_prefix == "$ House" - house_key = f"House {h_number.strip()}" - table[house_key] = {} - - for att in attributes: - if not ":" in att: - continue - name, value = att.split(":") - table[house_key][name.strip()] = value.strip() - - return table - - -def copy_table(table): - new_table = {} - for house, attributes in table.items(): - new_table[house] = {} - for att_name, att_value in attributes.items(): - new_table[house][att_name] = None - return new_table - - -def parse_step(step_text, table_to_fill): - parsed_clues = [s.split(">")[0] for s in step_text.split("<") if ">" in s] - ans_texts = [s.strip() for s in step_text.split("We know that ")[-1].split(".") if s.strip().startswith("The ")] - for ans_text in ans_texts: - try: - att_name = ans_text.split(" in house ")[0].split("The ")[-1].strip() - house_num = ans_text.split(" in house ")[1].split(" is ")[0].strip() - att_value = ans_text.split(" is ")[-1].strip() - table_to_fill[f"House {house_num}"][att_name] = att_value - except: - print(parsed_clues) - print(ans_text) - print(table_to_fill) - print() - continue - return parsed_clues, table_to_fill - - -def pre_process(src_file: os.PathLike, dest_dir: os.PathLike, overwrite: bool = False): - outputs = json.load(open(src_file, "r"))["data"] - - src_file = Path(src_file) - os.makedirs(dest_dir, exist_ok=True) - save_file = os.path.join(dest_dir, f"parsed_{src_file.name}") - - if not overwrite and os.path.exists(save_file): - return - - items = [] - for output in tqdm(outputs, desc=f"{src_file.stem}"): - groundtruth_table = parse_table(output["truth_outputs"][0]) - if type(output["output_text"]) == list: - output_text = output["output_text"][0].split("\n") - else: - output_text = output["output_text"].split("\n") - step_texts = [o for o in output_text if o.startswith("Step ")] - steps = [] - partial_table = copy_table(groundtruth_table) - for s_text in step_texts: - clues, filled_table = parse_step(s_text, partial_table) - steps.append( - { - "clues": clues, - "partial_table": filled_table, - } - ) - partial_table = deepcopy(filled_table) - - predict_table_texts = [o for o in output_text if o.startswith("$ House")] - if not predict_table_texts: - predicted_table = partial_table - else: - predicted_table = parse_table("\n".join(predict_table_texts)) - - items.append({"groundtruth_table": groundtruth_table, "predicted_table": predicted_table, "steps": steps}) - - with open(save_file, "w") as f: - json.dump(items, f, indent=4) - - -def error_analysis(home_path: os.PathLike, file_names: Sequence[str] = None): - if not file_names: - parsed_files = [p for p in Path(home_path).glob("parsed_table*judged.json")] - else: - parsed_files = [os.path.join(home_path, f) for f in file_names] - - datasets = [] - for parsed_file in parsed_files: - datasets.extend([json.loads(s) for s in open(parsed_file, "r").readlines()]) - - error_types = ["correct", "type1", "type2", "type3"] - error_stats = {} - for data in tqdm(datasets): - gt_table = data["groundtruth_table"] - previous_type = None - for i, step in enumerate(data["steps"]): - partial_table = step["partial_table"] - # whether value correct: - value_to_check = [] - for house_name, attributes in partial_table.items(): - for att, value in attributes.items(): - if not i: - previous_empty = True - else: - if att not in data["steps"][i - 1]["partial_table"][house_name]: - previous_empty = False - else: - previous_empty = data["steps"][i - 1]["partial_table"][house_name][att] is None - if value is not None and previous_empty: - value_to_check.append((house_name, att)) - - value_correct = all( - [partial_table[house_name][att] == gt_table[house_name][att] for house_name, att in value_to_check] - ) - operation_correct = step["label"] - if not i: - ancestor_correct = True - else: - ancestor_correct = previous_type == "correct" - - if value_correct: - if ancestor_correct: - node_type = "correct" - else: - node_type = "type3" - else: - if operation_correct: - node_type = "type2" - else: - node_type = "type1" - - previous_type = node_type - - if i not in error_stats.keys(): - error_stats[i] = {e: 0 for e in error_types} - error_stats[i][node_type] += 1 - - print(json.dumps(error_stats)) - percent_stats = {} - number_nodes = {} - for key, values in error_stats.items(): - percent_stats[key] = {} - total_nodes = sum([values[t] for t in error_types]) - number_nodes[key] = total_nodes - for t in error_types: - percent_stats[key][t] = values[t] / total_nodes - print(json.dumps(percent_stats)) - print(number_nodes) - - -def main(): - parser = argparse.ArgumentParser() - - parser.add_argument("--cot_dir", type=str, default=None) - parser.add_argument("--output_dir", type=str, default="tmp") - parser.add_argument("--overwrite", action="store_true", default=False) - - args = parser.parse_args() - - cot_dir = Path(args.cot_dir) - assert cot_dir.exists(), "cot_dir does not exist" - assert cot_dir.is_dir(), "cot_dir is not a directory" - - for cot_file in cot_dir.glob("table*.json"): - pre_process(cot_file, args.output_dir, args.overwrite) - - error_analysis(args.output_dir) - - -if __name__ == "__main__": - main() diff --git a/reasoning_gym/logic/contrib/logic_puzzle/graph/reasoning_path.py b/reasoning_gym/logic/contrib/logic_puzzle/graph/reasoning_path.py deleted file mode 100644 index cc63a80e..00000000 --- a/reasoning_gym/logic/contrib/logic_puzzle/graph/reasoning_path.py +++ /dev/null @@ -1,112 +0,0 @@ -import argparse -import json -import pickle -from collections import defaultdict - -import check_clues -import graph_literals -import sat_utils -import solver as solver -from tqdm import tqdm -from z3 import * - - -def get_idx2clue(clues): - clue_num2_clue = defaultdict(int) - clue_type2_clue_num = defaultdict(list) - clue_num = 0 - for clue in list(clues): - clue_num2_clue[clue_num] = clue - clue_type2_clue_num[type(clue)].append(clue_num) - clue_num += 1 - return clue_num2_clue, clue_type2_clue_num - - -def logic_grid_puzzle(inputfile, ground_truth, size, lower_part, higher_part): - reasoning_result = [] - answers = json.load(open(ground_truth, "r")) - puzzles = pickle.load(open(inputfile, "rb")) - mode = inputfile[inputfile.find("puzzles.") + 8 : inputfile.find(".pkl")] - print("Number of puzzles", len(answers)) - assert len(answers) == len(puzzles) - new_data = [] - for i in tqdm(range(len(puzzles[:]))): - d = puzzles[i] - per_size_idx = int(puzzles[i]["idx"].split("-")[-1]) - if ( - d["idx"].startswith("lgp-" + mode + "-" + size) - and per_size_idx >= lower_part - and per_size_idx < higher_part - ): - print("Puzzle id:", d["idx"]) - print("Puzzle", d) - print("Solving puzzle" + "===============" * 7 + "Solving puzzle") - - assert d["idx"] == answers[i]["idx"] - answer_header = answers[i]["solution"]["table_header"] - answer_value = answers[i]["solution"]["table_rows"] - - puzzle_clues = d["puzzle"].clues - idx2clue, clue_type2_idx = get_idx2clue(puzzle_clues) - - running_clues = [] - all_cells = [] - used_clue = [] - self_constraints = d["puzzle"].constraints - running_clues.extend(self_constraints) - - # for i in range(len(idx2clue)): - first = True - step_num = 1 - reasoning = "" - while len(used_clue) < len(idx2clue): - reasoning += "Step {}: ".format(step_num) - step_num += 1 - clue_idxs = check_clues.check( - d["idx"], answer_header, answer_value, running_clues, idx2clue, used_clue, all_cells - ) - for current_clue_idx in clue_idxs: - running_clues.extend(idx2clue[current_clue_idx].as_cnf()) - used_clue.append(current_clue_idx) - - numbered_cnf, num2var = sat_utils.translate(running_clues) - single_solver = solver.my_solver(d["idx"], answer_header, answer_value, numbered_cnf, num2var) - cell_info = single_solver.check_cell_difficulty() - new_cell = [] - for cell in cell_info: - if cell not in all_cells: - new_cell.append(cell) - all_cells.append(cell) - reasoning += graph_literals.print_clue(clue_idxs, idx2clue, new_cell, first) - first = False - reasoning += "\n" - assert len(all_cells) == len(answer_value) * (len(answer_value[0]) - 1) - reasoning += "The puzzle is solved." - reasoning_result.append(reasoning) - print(reasoning) - single_data = answers[i] - single_data["reasoning"] = reasoning - new_data.append(single_data) - - with open( - "./data/logic_grid_puzzles.reasoning." + mode + size + "-" + str(lower_part) + "_" + str(higher_part) + ".json", - "w", - ) as outputfile: - json.dump(new_data, outputfile) - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument("--input_data", type=str, default="./data/logic_grid_puzzles.test_id_xl.pkl") - parser.add_argument("--ground_truth", type=str, default="./data/ogic_grid_puzzles.test_id_xl.json") - parser.add_argument("--size", type=str, default="2x") - parser.add_argument("--lower_part", type=int, default=0) # min data index - parser.add_argument("--higher_part", type=int, default=100) # max data index - args = parser.parse_args() - - logic_grid_puzzle(args.input_data, args.ground_truth, args.size, args.lower_part, args.higher_part) - return 1 - - -if __name__ == "__main__": - main() diff --git a/reasoning_gym/logic/contrib/logic_puzzle/graph/solver.py b/reasoning_gym/logic/contrib/logic_puzzle/graph/solver.py deleted file mode 100644 index ed462270..00000000 --- a/reasoning_gym/logic/contrib/logic_puzzle/graph/solver.py +++ /dev/null @@ -1,143 +0,0 @@ -import collections - -import numpy as np -import solver_utils -from z3 import * - - -class my_solver: - def __init__(self, id, feature_name, feature_value, numbered_cnf, num2var): - self.id = id - self.num2var = num2var - self.clue_num = 0 - - self.feature_name = feature_name - self.feature_value = feature_value - self.numbered_cnf = numbered_cnf - - self.init_solver(feature_name, feature_value, numbered_cnf) - - def init_solver(self, feature_name, feature_value, numbered_cnf): - set_param(proof=True) - self.s = Solver() - - self.s.set(unsat_core=True) - house_total = len(feature_value) - self.house_total = house_total - features, mapping = self.define_feature(feature_name, feature_value) - - self.houses = [ - [ - solver_utils.instanciate_int_constrained("%s%d" % (prop, n), self.s, house_total) - for prop in features.keys() - ] - for n in range(house_total) - ] - self.mapping = mapping - self.feature = features - # because we read feature from GT, we can make this assumption to calculate difficulty. - self.ground_truth = self.feature - for single_cnf in numbered_cnf: - if len(single_cnf) == 1: - self.add_single_clue(single_cnf) - else: - self.add_mul_val_clue(single_cnf) - self.clue_num += 1 - - def define_feature(self, feature_name, feature_value): - features = collections.OrderedDict() - for i in range(len(feature_name)): - if feature_name[i] != "House": - features[feature_name[i]] = [] - for house_j in feature_value: - features[feature_name[i]].append(house_j[i]) - - feature_mapping = {list(features.keys())[i]: i for i in range(len(features.keys()))} - return features, feature_mapping - - def decode_var(self, var_num): - attribute, house_num = self.num2var[var_num].split(" ") - attribute_name, attribute_value = attribute.split(".") - if "_" in attribute_value: - attribute_value = attribute_value.replace("_", " ") - house_num = int(house_num) - 1 - return house_num, attribute_name, attribute_value - - def normal_vs_not_constraints(self, house_num, attribute_name, attribute_value): - if attribute_name.startswith("~"): - attribute_name = attribute_name[1:] - return Not( - self.houses[house_num][self.mapping[attribute_name]] - == self.feature[attribute_name].index(attribute_value) - ) - else: - return self.houses[house_num][self.mapping[attribute_name]] == self.feature[attribute_name].index( - attribute_value - ) - - def add_single_clue(self, single_cnf): - house_num, attribute_name, attribute_value = self.decode_var(single_cnf[0]) - self.s.assert_and_track( - self.normal_vs_not_constraints(house_num, attribute_name, attribute_value), "a" + str(self.clue_num) - ) - - def add_mul_val_clue(self, single_cnf): - cons = [] - for i in range(len(single_cnf)): - house_num, attribute_name, attribute_value = self.decode_var(single_cnf[i]) - cons.append(self.normal_vs_not_constraints(house_num, attribute_name, attribute_value)) - self.s.assert_and_track(Or(*cons), "a" + str(self.clue_num)) - - def check_solution(self): - if self.s.check() == unsat: - c = self.s.unsat_core() - print("Size of the unsat core:", len(c)) - print("Unsat core:", ", ".join([str(i) for i in c])) - - proof_str = str(self.s.proof()) - print(self.s.proof()) - print("Proof length:", len(proof_str)) - - else: - m = self.s.model() - solution = [[m[case].as_long() for case in line] for line in self.houses] - unique = True - if count_solutions(self.s) != 1: - print(self.id) - unique = False - # print("Number of solutions:", count_solutions(self.s)) - return solution, unique - - def print_solution(self, solution): - print("Solution:") - print(self.feature) - - def check_cell_difficulty(self): - results = [] - clue_num = 0 - for i in range(self.house_total): - for feature in self.ground_truth: - self.s.assert_and_track(Not(self.houses[i][self.mapping[feature]] == i), "check" + str(clue_num)) - if self.s.check() == unsat: - results.append("The {} in house {} is {}".format(feature, i + 1, self.ground_truth[feature][i])) - self.s.reset() - self.init_solver(self.feature_name, self.feature_value, self.numbered_cnf) - return results - - def check_statement_difficulty(self): - clue_num = 0 - proof_length = np.zeros((self.house_total, len(self.ground_truth))) - for i in range(self.house_total): - for feature in self.ground_truth: - self.s.assert_and_track(Not(self.houses[i][self.mapping[feature]] == i), "check" + str(clue_num)) - clue_num += 1 - if self.s.check() == unsat: - c = self.s.unsat_core() - proof_length[i, self.mapping[feature]] = self.s.statistics().propagations - print(proof_length) - return proof_length - - def check_problem_difficulty(self): - proof_length = np.zeros((1, 1)) - proof_length[0, 0] = self.s.statistics().propagations - return proof_length diff --git a/reasoning_gym/logic/contrib/logic_puzzle/graph/solver_utils.py b/reasoning_gym/logic/contrib/logic_puzzle/graph/solver_utils.py deleted file mode 100644 index 021d0763..00000000 --- a/reasoning_gym/logic/contrib/logic_puzzle/graph/solver_utils.py +++ /dev/null @@ -1,36 +0,0 @@ -from z3 import * - - -def column(matrix, i): - return [matrix[j][i] for j in range(len(matrix))] - - -def instanciate_int_constrained(name, s, card): - x = Int(name) - # Each int represent an index in p[name] - s.add(x >= 0, x <= card - 1) - return x - - -def count_solutions(s, max=1e9): - count = 0 - while s.check() == sat: - count += 1 - if count >= max: - return count - m = s.model() - - # Create a new constraint the blocks the current model - block = [] - for d in m: - # d is a declaration - if d.arity() > 0: - raise Z3Exception("uninterpreted functions are not supported") - # create a constant from declaration - c = d() - if is_array(c) or c.sort().kind() == Z3_UNINTERPRETED_SORT: - raise Z3Exception("arrays and uninterpreted sorts are not supported") - block.append(c != m[d]) - - s.add(Or(block)) - return count