Remove solver graph folder

This commit is contained in:
Andreas Koepf 2025-02-04 00:07:01 +01:00
parent 04cd81dd76
commit 2b6315474a
7 changed files with 0 additions and 619 deletions

View file

@ -1,57 +0,0 @@
import itertools
from collections import defaultdict
import sat_utils
import solver as solver
from z3 import *
def check_singe_clues(puzzle_idx, answer_header, answer_value, running_clues, idx2clue, used_clue, all_cells):
    """Find the single unused clue that, added to running_clues, solves the most new cells.

    Returns a one-element list [clue_idx] when some single clue solves at
    least one new cell, otherwise the sentinel [1000] meaning "no single
    clue makes progress" (the caller then tries clue combinations).
    """
    solved_cell = defaultdict(int)
    print("check single clues")
    for idx in idx2clue:
        if idx in used_clue:
            continue
        # Re-solve the puzzle with this one extra clue applied.
        new_clues = running_clues + idx2clue[idx].as_cnf()
        numbered_cnf, num2var = sat_utils.translate(new_clues)
        single_solver = solver.my_solver(puzzle_idx, answer_header, answer_value, numbered_cnf, num2var)
        cell_info = single_solver.check_cell_difficulty()
        # Count only cells that were not already solved before this clue.
        solved_cell[idx] = sum(1 for cell in cell_info if cell not in all_cells)
        del single_solver
    if not solved_cell:
        # No unused clues left to try: report "no progress".
        return [1000]
    best_idx = max(solved_cell, key=solved_cell.get)
    if solved_cell[best_idx] != 0:
        return [best_idx]
    return [1000]
def check(puzzle_idx, answer_header, answer_value, running_clues, idx2clue, used_clue, all_cells):
    """Pick the next clue(s) to apply: single clues first, then combinations.

    Returns a sequence of clue indices whose CNF, added to running_clues,
    solves at least one new cell. Returns None only if no combination of up
    to 9 unused clues makes progress (not expected for well-formed puzzles).
    """
    idx = check_singe_clues(puzzle_idx, answer_header, answer_value, running_clues, idx2clue, used_clue, all_cells)
    if idx[0] < 1000:
        return idx
    print("check multiple clues")
    all_left_clues_idx = [i for i in idx2clue if i not in used_clue]
    print(all_left_clues_idx)
    # Try progressively larger combinations of the remaining clues.
    for n in range(2, 10):
        for comb in itertools.combinations(all_left_clues_idx, n):
            new_clues = list(running_clues)
            for comb_idx in comb:
                new_clues += idx2clue[comb_idx].as_cnf()
            numbered_cnf, num2var = sat_utils.translate(new_clues)
            single_solver = solver.my_solver(puzzle_idx, answer_header, answer_value, numbered_cnf, num2var)
            cell_info = single_solver.check_cell_difficulty()
            num_cell = sum(1 for cell in cell_info if cell not in all_cells)
            del single_solver
            if num_cell != 0:
                return comb
    return None

View file

@ -1,32 +0,0 @@
def print_clue(clue_idxs, idx2clue, new_cell, first=False):
    """Render a reasoning sentence for applying one clue or a combination of clues.

    clue_idxs: indices of the clues applied in this step.
    idx2clue: mapping from clue index to clue object.
    new_cell: textual statements of the cells solved by this step.
    first: whether this is the first step (changes the leading phrase).
    """
    if len(clue_idxs) == 1:
        # A single clue gets the dedicated phrasing.
        return single_find_at(idx2clue[clue_idxs[0]], new_cell, first)
    result = "First combining clues: " if first else "Then combining clues: "
    result += "".join("<{}>".format(idx2clue[i]) for i in clue_idxs)
    result += " Unique Values Rules and the fixed table structure. We know that "
    result += "".join(cell + ". " for cell in new_cell)
    return result


def single_find_at(clue, new_cell, first=False):
    """Render a reasoning sentence for applying a single clue."""
    result = "First applying clue: " if first else "Then applying clue: "
    if len(new_cell) > 1:
        # More than one cell solved implies the unique-values rule was used too.
        result += "<{}> and Unique Values We know that ".format(clue)
    else:
        result += "<{}> We know that ".format(clue)
    result += "".join(cell + ". " for cell in new_cell)
    return result

View file

@ -1,49 +0,0 @@
import argparse
import json
import pickle
import sat_utils
import solver
from tqdm import tqdm
from z3 import *
def solve_logic_grid_puzzle(inputfile, ground_truth):
    """Solve every puzzle in `inputfile`, verify against `ground_truth`, and
    pickle a per-puzzle difficulty measure to ./logic_grid_puzzles.test.difficulty.pkl.
    """
    # Ground truth is JSON-lines; only the first line's array is used.
    with open(ground_truth, "r") as f:
        answers = [json.loads(line) for line in f][0]
    cell_difficulty = {}
    with open(inputfile, "rb") as f:
        puzzles = pickle.load(f)
    for i in tqdm(range(len(puzzles))):
        d = puzzles[i]
        assert d["idx"] == answers[i]["idx"]
        answer_header = answers[i]["solution"]["table_header"]
        answer_value = answers[i]["solution"]["table_rows"]
        # Translate the symbolic CNF into numbered form for the solver.
        symbolic_cnf = d["puzzle"].as_cnf()
        numbered_cnf, num2var = sat_utils.translate(symbolic_cnf)
        single_solver = solver.my_solver(d["idx"], answer_header, answer_value, numbered_cnf, num2var)
        solution, unique = single_solver.check_solution()
        if unique:
            # Sanity check: in this encoding, every cell of row r decodes to r.
            for row_num in range(len(solution)):
                for column_num in range(len(solution[row_num])):
                    assert row_num == solution[row_num][column_num]
            difficulty = single_solver.check_problem_difficulty()
            cell_difficulty[d["idx"]] = difficulty
    with open("./logic_grid_puzzles.test.difficulty.pkl", "wb") as outputfile:
        pickle.dump(cell_difficulty, outputfile)
def main():
    """CLI entry point: parse arguments and run the solver over the test set."""
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--input_data", type=str, default="../logic_grid_puzzles.test.pkl")
    arg_parser.add_argument("--ground_truth", type=str, default="../logic_grid_puzzles.test.json")
    opts = arg_parser.parse_args()
    solve_logic_grid_puzzle(opts.input_data, opts.ground_truth)
    return 1


if __name__ == "__main__":
    main()

View file

@ -1,190 +0,0 @@
import argparse
import json
import os
from copy import deepcopy
from pathlib import Path
from typing import Sequence
from tqdm import tqdm
def parse_table(table_string):
    """Parse rendered "$ House: N | Attr: value | ..." rows into a nested dict.

    Returns {"House N": {attr_name: attr_value, ...}, ...}. Lines not starting
    with "$" and cells without a ":" are ignored.
    """
    table = {}
    for flat_row in table_string.split("\n"):
        if not flat_row.startswith("$"):
            continue
        flat_cells = [c for c in flat_row.split("|") if c]
        house, attributes = flat_cells[0], flat_cells[1:]
        # The first cell must look like "$ House: N".
        h_prefix, h_number = house.split(":", 1)
        assert h_prefix == "$ House"
        house_key = f"House {h_number.strip()}"
        table[house_key] = {}
        for att in attributes:
            if ":" not in att:
                continue
            # maxsplit=1 keeps attribute values containing ":" intact.
            name, value = att.split(":", 1)
            table[house_key][name.strip()] = value.strip()
    return table
def copy_table(table):
    """Return a table with the same houses and attribute names but all values None."""
    return {house: {att_name: None for att_name in attributes} for house, attributes in table.items()}
def parse_step(step_text, table_to_fill):
    """Extract clue names and "The X in house N is V" facts from one step's text.

    Mutates table_to_fill in place with the parsed facts.
    Returns (parsed_clues, table_to_fill).
    """
    parsed_clues = [s.split(">")[0] for s in step_text.split("<") if ">" in s]
    ans_texts = [s.strip() for s in step_text.split("We know that ")[-1].split(".") if s.strip().startswith("The ")]
    for ans_text in ans_texts:
        try:
            att_name = ans_text.split(" in house ")[0].split("The ")[-1].strip()
            house_num = ans_text.split(" in house ")[1].split(" is ")[0].strip()
            att_value = ans_text.split(" is ")[-1].strip()
            table_to_fill[f"House {house_num}"][att_name] = att_value
        except (IndexError, KeyError):
            # Sentence did not match the expected pattern (or names an unknown
            # house/attribute); log the failure and skip it.
            print(parsed_clues)
            print(ans_text)
            print(table_to_fill)
            print()
            continue
    return parsed_clues, table_to_fill
def pre_process(src_file: os.PathLike, dest_dir: os.PathLike, overwrite: bool = False):
    """Parse one raw CoT output file into per-step partial tables.

    Writes parsed_<name>.json into dest_dir. When the output already exists
    and overwrite is False, returns early without loading the source file.
    """
    src_file = Path(src_file)
    os.makedirs(dest_dir, exist_ok=True)
    save_file = os.path.join(dest_dir, f"parsed_{src_file.name}")
    # Check before loading: avoids reading a potentially large JSON for nothing.
    if not overwrite and os.path.exists(save_file):
        return
    with open(src_file, "r") as f:
        outputs = json.load(f)["data"]
    items = []
    for output in tqdm(outputs, desc=f"{src_file.stem}"):
        groundtruth_table = parse_table(output["truth_outputs"][0])
        if isinstance(output["output_text"], list):
            output_text = output["output_text"][0].split("\n")
        else:
            output_text = output["output_text"].split("\n")
        step_texts = [o for o in output_text if o.startswith("Step ")]
        steps = []
        # Start from an empty table with the ground truth's structure.
        partial_table = copy_table(groundtruth_table)
        for s_text in step_texts:
            clues, filled_table = parse_step(s_text, partial_table)
            steps.append(
                {
                    "clues": clues,
                    "partial_table": filled_table,
                }
            )
            # Deep-copy so the next step's mutations don't alter this snapshot.
            partial_table = deepcopy(filled_table)
        predict_table_texts = [o for o in output_text if o.startswith("$ House")]
        if not predict_table_texts:
            # No final table emitted; fall back to the last partial table.
            predicted_table = partial_table
        else:
            predicted_table = parse_table("\n".join(predict_table_texts))
        items.append({"groundtruth_table": groundtruth_table, "predicted_table": predicted_table, "steps": steps})
    with open(save_file, "w") as f:
        json.dump(items, f, indent=4)
def error_analysis(home_path: os.PathLike, file_names: Sequence[str] = None):
    """Classify each reasoning step and print per-step-index error statistics.

    Categories: "correct" (value right, all ancestors right); "type3" (value
    right but an earlier step was wrong); "type2" (value wrong, operation
    judged correct); "type1" (value wrong, operation judged wrong).
    """
    if not file_names:
        parsed_files = list(Path(home_path).glob("parsed_table*judged.json"))
    else:
        parsed_files = [os.path.join(home_path, f) for f in file_names]
    datasets = []
    for parsed_file in parsed_files:
        # Each file is JSON-lines: one record per line.
        with open(parsed_file, "r") as f:
            datasets.extend(json.loads(s) for s in f)
    error_types = ["correct", "type1", "type2", "type3"]
    error_stats = {}
    for data in tqdm(datasets):
        gt_table = data["groundtruth_table"]
        previous_type = None
        for i, step in enumerate(data["steps"]):
            partial_table = step["partial_table"]
            # Collect the cells newly filled in by this step.
            value_to_check = []
            for house_name, attributes in partial_table.items():
                for att, value in attributes.items():
                    if not i:
                        previous_empty = True
                    else:
                        if att not in data["steps"][i - 1]["partial_table"][house_name]:
                            previous_empty = False
                        else:
                            previous_empty = data["steps"][i - 1]["partial_table"][house_name][att] is None
                    if value is not None and previous_empty:
                        value_to_check.append((house_name, att))
            value_correct = all(
                partial_table[house_name][att] == gt_table[house_name][att] for house_name, att in value_to_check
            )
            # "label" is the judgement of whether the step's operation is valid.
            operation_correct = step["label"]
            ancestor_correct = True if not i else previous_type == "correct"
            if value_correct:
                node_type = "correct" if ancestor_correct else "type3"
            else:
                node_type = "type2" if operation_correct else "type1"
            previous_type = node_type
            if i not in error_stats:
                error_stats[i] = {e: 0 for e in error_types}
            error_stats[i][node_type] += 1
    print(json.dumps(error_stats))
    # Convert per-step counts into fractions.
    percent_stats = {}
    number_nodes = {}
    for key, values in error_stats.items():
        percent_stats[key] = {}
        total_nodes = sum(values[t] for t in error_types)
        number_nodes[key] = total_nodes
        for t in error_types:
            percent_stats[key][t] = values[t] / total_nodes
    print(json.dumps(percent_stats))
    print(number_nodes)
def main():
    """Parse CLI args, preprocess every CoT table file, then run error analysis."""
    cli = argparse.ArgumentParser()
    cli.add_argument("--cot_dir", type=str, default=None)
    cli.add_argument("--output_dir", type=str, default="tmp")
    cli.add_argument("--overwrite", action="store_true", default=False)
    opts = cli.parse_args()
    cot_dir = Path(opts.cot_dir)
    assert cot_dir.exists(), "cot_dir does not exist"
    assert cot_dir.is_dir(), "cot_dir is not a directory"
    for cot_file in cot_dir.glob("table*.json"):
        pre_process(cot_file, opts.output_dir, opts.overwrite)
    error_analysis(opts.output_dir)


if __name__ == "__main__":
    main()

View file

@ -1,112 +0,0 @@
import argparse
import json
import pickle
from collections import defaultdict
import check_clues
import graph_literals
import sat_utils
import solver as solver
from tqdm import tqdm
from z3 import *
def get_idx2clue(clues):
    """Number the clues and index them both by number and by clue type.

    Returns (clue_num2_clue, clue_type2_clue_num): the first maps clue
    number -> clue object, the second maps type(clue) -> list of numbers.
    """
    # A plain dict suffices here; the original defaultdict(int) default was
    # never meaningful for storing clue objects.
    clue_num2_clue = {}
    clue_type2_clue_num = defaultdict(list)
    for clue_num, clue in enumerate(clues):
        clue_num2_clue[clue_num] = clue
        clue_type2_clue_num[type(clue)].append(clue_num)
    return clue_num2_clue, clue_type2_clue_num
def logic_grid_puzzle(inputfile, ground_truth, size, lower_part, higher_part):
    """Generate step-by-step natural-language reasoning for a slice of puzzles.

    Processes puzzles whose idx matches the given size and whose per-size index
    lies in [lower_part, higher_part), then writes the answer records augmented
    with a "reasoning" string to a JSON file under ./data/.
    """
    reasoning_result = []
    answers = json.load(open(ground_truth, "r"))
    puzzles = pickle.load(open(inputfile, "rb"))
    # Extract the dataset split name between "puzzles." and ".pkl" in the file name.
    mode = inputfile[inputfile.find("puzzles.") + 8 : inputfile.find(".pkl")]
    print("Number of puzzles", len(answers))
    assert len(answers) == len(puzzles)
    new_data = []
    for i in tqdm(range(len(puzzles[:]))):
        d = puzzles[i]
        per_size_idx = int(puzzles[i]["idx"].split("-")[-1])
        # Only handle puzzles of the requested size whose index falls in the window.
        if (
            d["idx"].startswith("lgp-" + mode + "-" + size)
            and per_size_idx >= lower_part
            and per_size_idx < higher_part
        ):
            print("Puzzle id:", d["idx"])
            print("Puzzle", d)
            print("Solving puzzle" + "===============" * 7 + "Solving puzzle")
            assert d["idx"] == answers[i]["idx"]
            answer_header = answers[i]["solution"]["table_header"]
            answer_value = answers[i]["solution"]["table_rows"]
            puzzle_clues = d["puzzle"].clues
            idx2clue, clue_type2_idx = get_idx2clue(puzzle_clues)
            running_clues = []  # CNF accumulated from applied clues/constraints
            all_cells = []  # textual cell statements solved so far
            used_clue = []  # indices of clues already applied
            self_constraints = d["puzzle"].constraints
            running_clues.extend(self_constraints)
            # for i in range(len(idx2clue)):
            first = True
            step_num = 1
            reasoning = ""
            # Greedily apply clues (singles first, then combinations) until all used.
            while len(used_clue) < len(idx2clue):
                reasoning += "Step {}: ".format(step_num)
                step_num += 1
                clue_idxs = check_clues.check(
                    d["idx"], answer_header, answer_value, running_clues, idx2clue, used_clue, all_cells
                )
                for current_clue_idx in clue_idxs:
                    running_clues.extend(idx2clue[current_clue_idx].as_cnf())
                    used_clue.append(current_clue_idx)
                numbered_cnf, num2var = sat_utils.translate(running_clues)
                single_solver = solver.my_solver(d["idx"], answer_header, answer_value, numbered_cnf, num2var)
                cell_info = single_solver.check_cell_difficulty()
                # Record which cells became solvable in this step.
                new_cell = []
                for cell in cell_info:
                    if cell not in all_cells:
                        new_cell.append(cell)
                        all_cells.append(cell)
                reasoning += graph_literals.print_clue(clue_idxs, idx2clue, new_cell, first)
                first = False
                reasoning += "\n"
            # Every non-House cell of the answer table must be solved by now.
            assert len(all_cells) == len(answer_value) * (len(answer_value[0]) - 1)
            reasoning += "The puzzle is solved."
            reasoning_result.append(reasoning)
            print(reasoning)
            single_data = answers[i]
            single_data["reasoning"] = reasoning
            new_data.append(single_data)
    with open(
        "./data/logic_grid_puzzles.reasoning." + mode + size + "-" + str(lower_part) + "_" + str(higher_part) + ".json",
        "w",
    ) as outputfile:
        json.dump(new_data, outputfile)
def main():
    """CLI entry point: select a puzzle slice and generate reasoning for it."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_data", type=str, default="./data/logic_grid_puzzles.test_id_xl.pkl")
    # Fixed typo in the default path ("ogic_" -> "logic_") so it matches --input_data.
    parser.add_argument("--ground_truth", type=str, default="./data/logic_grid_puzzles.test_id_xl.json")
    parser.add_argument("--size", type=str, default="2x")
    parser.add_argument("--lower_part", type=int, default=0)  # min data index (inclusive)
    parser.add_argument("--higher_part", type=int, default=100)  # max data index (exclusive)
    args = parser.parse_args()
    logic_grid_puzzle(args.input_data, args.ground_truth, args.size, args.lower_part, args.higher_part)
    return 1


if __name__ == "__main__":
    main()

View file

@ -1,143 +0,0 @@
import collections
import numpy as np
import solver_utils
from z3 import *
class my_solver:
    """Z3-based solver for one logic-grid puzzle given in numbered CNF form.

    One Int variable is created per (house, feature) cell, each clue is
    asserted as a tracked constraint, and several difficulty measures are
    derived from unsat cores and propagation statistics.
    """

    def __init__(self, id, feature_name, feature_value, numbered_cnf, num2var):
        # num2var maps CNF variable numbers back to "Feature.value house" strings
        # (see decode_var for the exact format).
        self.id = id
        self.num2var = num2var
        self.clue_num = 0
        self.feature_name = feature_name
        self.feature_value = feature_value
        self.numbered_cnf = numbered_cnf
        self.init_solver(feature_name, feature_value, numbered_cnf)

    def init_solver(self, feature_name, feature_value, numbered_cnf):
        """(Re)build the Z3 solver, its cell variables, and all clue constraints."""
        set_param(proof=True)
        self.s = Solver()
        self.s.set(unsat_core=True)
        house_total = len(feature_value)
        self.house_total = house_total
        features, mapping = self.define_feature(feature_name, feature_value)
        # houses[n][f] is an Int constrained to [0, house_total) encoding the
        # value index of feature f in house n.
        self.houses = [
            [
                solver_utils.instanciate_int_constrained("%s%d" % (prop, n), self.s, house_total)
                for prop in features.keys()
            ]
            for n in range(house_total)
        ]
        self.mapping = mapping
        self.feature = features
        # because we read feature from GT, we can make this assumption to calculate difficulty.
        self.ground_truth = self.feature
        for single_cnf in numbered_cnf:
            # Unit clauses and multi-literal clauses are asserted differently.
            if len(single_cnf) == 1:
                self.add_single_clue(single_cnf)
            else:
                self.add_mul_val_clue(single_cnf)
            self.clue_num += 1

    def define_feature(self, feature_name, feature_value):
        """Build {feature name: per-house value list} (skipping the "House" column)
        and a feature-name -> column-index mapping."""
        features = collections.OrderedDict()
        for i in range(len(feature_name)):
            if feature_name[i] != "House":
                features[feature_name[i]] = []
                for house_j in feature_value:
                    features[feature_name[i]].append(house_j[i])
        feature_mapping = {list(features.keys())[i]: i for i in range(len(features.keys()))}
        return features, feature_mapping

    def decode_var(self, var_num):
        """Decode a CNF variable number into (house index, feature name, value).

        num2var entries look like "Feature.value house"; underscores in the
        value stand for spaces, and house numbers are 1-based in the string.
        """
        attribute, house_num = self.num2var[var_num].split(" ")
        attribute_name, attribute_value = attribute.split(".")
        if "_" in attribute_value:
            attribute_value = attribute_value.replace("_", " ")
        house_num = int(house_num) - 1
        return house_num, attribute_name, attribute_value

    def normal_vs_not_constraints(self, house_num, attribute_name, attribute_value):
        """Return the (possibly negated) equality constraint for one literal.

        A leading "~" on the feature name marks a negated literal.
        """
        if attribute_name.startswith("~"):
            attribute_name = attribute_name[1:]
            return Not(
                self.houses[house_num][self.mapping[attribute_name]]
                == self.feature[attribute_name].index(attribute_value)
            )
        else:
            return self.houses[house_num][self.mapping[attribute_name]] == self.feature[attribute_name].index(
                attribute_value
            )

    def add_single_clue(self, single_cnf):
        """Assert a unit clause as a tracked constraint named a<clue_num>."""
        house_num, attribute_name, attribute_value = self.decode_var(single_cnf[0])
        self.s.assert_and_track(
            self.normal_vs_not_constraints(house_num, attribute_name, attribute_value), "a" + str(self.clue_num)
        )

    def add_mul_val_clue(self, single_cnf):
        """Assert a multi-literal clause as a tracked disjunction named a<clue_num>."""
        cons = []
        for i in range(len(single_cnf)):
            house_num, attribute_name, attribute_value = self.decode_var(single_cnf[i])
            cons.append(self.normal_vs_not_constraints(house_num, attribute_name, attribute_value))
        self.s.assert_and_track(Or(*cons), "a" + str(self.clue_num))

    def check_solution(self):
        """Solve the puzzle; return (solution matrix, uniqueness flag).

        NOTE(review): when the puzzle is unsat this prints core/proof info but
        `solution` and `unique` are never bound, so the final return raises
        UnboundLocalError — callers appear to assume a satisfiable puzzle.
        """
        if self.s.check() == unsat:
            c = self.s.unsat_core()
            print("Size of the unsat core:", len(c))
            print("Unsat core:", ", ".join([str(i) for i in c]))
            proof_str = str(self.s.proof())
            print(self.s.proof())
            print("Proof length:", len(proof_str))
        else:
            m = self.s.model()
            solution = [[m[case].as_long() for case in line] for line in self.houses]
            unique = True
            # More than one model means the clues don't pin down the solution.
            if count_solutions(self.s) != 1:
                print(self.id)
                unique = False
            # print("Number of solutions:", count_solutions(self.s))
        return solution, unique

    def print_solution(self, solution):
        # Prints the ground-truth feature table (the `solution` arg is unused here).
        print("Solution:")
        print(self.feature)

    def check_cell_difficulty(self):
        """Return text statements for every cell that is forced by the current clues.

        A cell is "solved" when asserting its negation makes the system unsat.
        The solver is rebuilt after each probe to discard the test assertion.
        """
        results = []
        clue_num = 0
        for i in range(self.house_total):
            for feature in self.ground_truth:
                self.s.assert_and_track(Not(self.houses[i][self.mapping[feature]] == i), "check" + str(clue_num))
                if self.s.check() == unsat:
                    results.append("The {} in house {} is {}".format(feature, i + 1, self.ground_truth[feature][i]))
                self.s.reset()
                self.init_solver(self.feature_name, self.feature_value, self.numbered_cnf)
        return results

    def check_statement_difficulty(self):
        """Return a (house x feature) matrix of propagation counts for refuting
        each cell's negation.

        NOTE(review): unlike check_cell_difficulty, the solver is NOT reset
        between probes, so negation assertions accumulate — confirm intended.
        """
        clue_num = 0
        proof_length = np.zeros((self.house_total, len(self.ground_truth)))
        for i in range(self.house_total):
            for feature in self.ground_truth:
                self.s.assert_and_track(Not(self.houses[i][self.mapping[feature]] == i), "check" + str(clue_num))
                clue_num += 1
                if self.s.check() == unsat:
                    c = self.s.unsat_core()
                    proof_length[i, self.mapping[feature]] = self.s.statistics().propagations
        print(proof_length)
        return proof_length

    def check_problem_difficulty(self):
        """Return a 1x1 matrix holding the solver's propagation count as a
        whole-problem difficulty proxy."""
        proof_length = np.zeros((1, 1))
        proof_length[0, 0] = self.s.statistics().propagations
        return proof_length

View file

@ -1,36 +0,0 @@
from z3 import *
def column(matrix, i):
    """Return the i-th column of a row-major matrix as a list."""
    return [row[i] for row in matrix]
def instanciate_int_constrained(name, s, card):
    """Create a Z3 Int named `name`, constrained in solver `s` to [0, card - 1]."""
    var = Int(name)
    # The integer encodes an index into the feature's value list.
    s.add(var >= 0, var <= card - 1)
    return var
def count_solutions(s, max=1e9):
    """Count models of solver `s` by repeatedly blocking the current model.

    Stops early once `max` solutions have been found.
    NOTE(review): the parameter shadows the builtin `max`; left unchanged to
    keep the signature stable for keyword callers.
    """
    count = 0
    while s.check() == sat:
        count += 1
        if count >= max:
            return count
        m = s.model()
        # Create a new constraint that blocks the current model
        block = []
        for d in m:
            # d is a declaration
            if d.arity() > 0:
                raise Z3Exception("uninterpreted functions are not supported")
            # create a constant from declaration
            c = d()
            if is_array(c) or c.sort().kind() == Z3_UNINTERPRETED_SORT:
                raise Z3Exception("arrays and uninterpreted sorts are not supported")
            block.append(c != m[d])
        s.add(Or(block))
    return count