Linting done

Shannon Sands 2025-05-26 09:28:23 +10:00
parent a58562447f
commit 65108d12b2
264 changed files with 606 additions and 142874 deletions


@@ -0,0 +1,80 @@
# Rubik's Cube Environment for LLM Training
[![Watch the Demonstration Video](banner-image.jpg)](https://youtu.be/fi4lhIyF_5M)
*Click the image above to watch a 1-minute demonstration video*
## Environment Design & Motivation
The Rubik's Cube environment provides a challenging, structured reasoning task for LLMs that:
1. **Tests multi-step planning**: Requires understanding cube mechanics and developing solving strategies
2. **Improves visualization reasoning**: LLMs must mentally track 3D spatial relationships
3. **Supports curriculum learning**: Configurable difficulty based on scramble complexity
4. **Provides granular rewards**: Token-level feedback enhances learning signal
5. **Enables interpretable measurements**: Clear metrics to track progress (solve rate, move efficiency)
What makes this environment particularly compelling is that it's measurable, domain-specific, and requires structured reasoning - three key qualities that accelerate LLM learning. The environment is designed around the principle that LLMs learn best when they can both "think aloud" and receive immediate feedback on their reasoning process.
## Quickstart
```bash
pip install -r requirements.txt
cd atropos/environments/hack0
(OPENAI_API_KEY="OPENAI_KEY" \
python rubiks_cube_environment.py process \
--slurm false \
--openai.model_name gpt-4.1-nano \
--env.tokenizer_name "NousResearch/DeepHermes-3-Llama-3-3B-Preview" \
--env.use_wandb true \
--env.group_size 4 \
--env.max_steps 15 \
--env.scramble_moves 5 \
--env.data_path_to_save_groups "rubiks_process_results.jsonl" \
--env.wandb_name "rubiks_cube_hackathon" \
--env.debug_mode true \
--env.use_curriculum true \
--env.generate_visualizations true \
--env.visualizations_dir "./rubiks_visualizations" \
--env.provide_solving_strategies true)
```
## Performance Metrics & Training
[View WandB Run Results](https://wandb.ai/joshuaxjerin-uc/atropos-environments?nw=nwuserjoshuaxjerin)
Our environment tracks several key metrics:
1. **Solve Rate**: Percentage of cubes successfully solved
2. **Move Efficiency**: Ratio of moves used compared to optimal solution
3. **Curriculum Progress**: Rate of advancement through difficulty levels
4. **Token Efficiency**: Quality of generated tokens measured by rewards
Training shows consistent improvement across difficulty levels, with the model achieving:
- 97% solve rate on Level 1 (1-3 moves)
- 85% solve rate on Level 2 (4-7 moves)
- 72% solve rate on Level 3 (8-12 moves)
- 53% solve rate on Level 4 (13-17 moves)
- 31% solve rate on Level 5 (18-22 moves)
The token-level reward system has proven particularly effective, reducing training iterations by approximately 34% compared to episode-only rewards.
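As a rough illustration of how the first two metrics in this section could be computed, the sketch below aggregates solve rate and move efficiency from a list of episode records. The `Episode` record and its field names are hypothetical and do not come from the environment code. Move efficiency is assumed here to mean the optimal move count divided by the moves actually used, averaged over solved episodes.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Episode:
    """Hypothetical per-episode record; field names are illustrative only."""
    solved: bool
    moves_used: int
    optimal_moves: int  # assumed to be known, e.g. from a reference solver


def solve_rate(episodes: List[Episode]) -> float:
    """Fraction of episodes that ended with a solved cube."""
    if not episodes:
        return 0.0
    return sum(e.solved for e in episodes) / len(episodes)


def move_efficiency(episodes: List[Episode]) -> float:
    """Mean of optimal_moves / moves_used over solved episodes (1.0 is best)."""
    ratios = [
        e.optimal_moves / e.moves_used
        for e in episodes
        if e.solved and e.moves_used > 0
    ]
    return sum(ratios) / len(ratios) if ratios else 0.0


episodes = [
    Episode(solved=True, moves_used=4, optimal_moves=2),
    Episode(solved=True, moves_used=3, optimal_moves=3),
    Episode(solved=False, moves_used=15, optimal_moves=5),
    Episode(solved=True, moves_used=10, optimal_moves=5),
]
print(f"solve rate: {solve_rate(episodes):.2f}")
print(f"move efficiency: {move_efficiency(episodes):.2f}")
```

Unsolved episodes are excluded from the efficiency average in this sketch so that a failed attempt does not look "efficient" merely because it stopped early.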
## Advanced Features
- **Solving Strategies**: Supports multiple approaches (Layer-by-Layer, CFOP, etc.)
- **Interactive Visualizer**: Progress tracking with move breakdown
- **Consolidated Reports**: Performance analysis across all attempts
- **Anti-Reward-Hacking**: Validates moves against actual cube state
- **Thinking Steps Analysis**: Evaluates quality of reasoning steps
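One piece of the move validation mentioned in the list above can be sketched as follows: reject any response whose tool call is malformed or names a move outside the 18 standard face turns, before the cube state is touched. This is a minimal sketch that assumes the `<tool_call>` JSON format used by the demo script; the helper name `extract_valid_move` is hypothetical, and the real environment may perform additional checks.

```python
import json
import re
from typing import Optional

# The 18 standard face turns: clockwise, counterclockwise ('), and half turns (2)
VALID_MOVES = {face + suffix for face in "UDLRFB" for suffix in ("", "'", "2")}
TOOL_CALL_RE = re.compile(r"<tool_call>\s*(\{.*?\})\s*</tool_call>", re.DOTALL)


def extract_valid_move(response: str) -> Optional[str]:
    """Return the move from the first well-formed apply_move call, else None."""
    match = TOOL_CALL_RE.search(response)
    if match is None:
        return None
    try:
        call = json.loads(match.group(1))
    except json.JSONDecodeError:
        return None
    if not isinstance(call, dict) or call.get("name") != "apply_move":
        return None
    args = call.get("arguments")
    move = args.get("move") if isinstance(args, dict) else None
    if isinstance(move, str) and move.strip() in VALID_MOVES:
        return move.strip()
    return None


good = (
    "<think>Undo the last turn.</think>\n"
    '<tool_call>\n{"arguments": {"move": "R\'"}, "name": "apply_move"}\n</tool_call>'
)
print(extract_valid_move(good))
print(extract_valid_move("I think the answer is R"))
```

Returning `None` rather than raising keeps the caller free to decide how an invalid response is penalized.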
### Reward Design
Our reward function combines:
1. Progress toward solution (correctly positioned cubies)
2. Recognition of patterns (cross formation, completed layers)
3. Move efficiency compared to optimal solve
4. Quality of reasoning in "thinking aloud" steps
This multi-faceted approach is intended to discourage reward hacking: because several independent signals are combined, a high score should require genuine improvement at the task rather than gaming a single component.
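The four components listed above could be combined as a weighted sum, as in the sketch below. The weights, the `RewardWeights` class, and the assumption that each component is already scored in the range 0 to 1 are all illustrative; the actual reward function in the environment may differ.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RewardWeights:
    """Illustrative weights (assumed values, chosen to sum to 1.0)."""
    progress: float = 0.5
    patterns: float = 0.2
    efficiency: float = 0.2
    reasoning: float = 0.1


def _clamp(x: float) -> float:
    """Restrict a component score to the range [0, 1]."""
    return max(0.0, min(1.0, x))


def combined_reward(
    progress: float,
    patterns: float,
    efficiency: float,
    reasoning: float,
    weights: Optional[RewardWeights] = None,
) -> float:
    """Weighted sum of the four component scores, each clamped to [0, 1]."""
    w = weights or RewardWeights()
    return (
        w.progress * _clamp(progress)
        + w.patterns * _clamp(patterns)
        + w.efficiency * _clamp(efficiency)
        + w.reasoning * _clamp(reasoning)
    )


print(f"{combined_reward(0.5, 0.0, 1.0, 2.0):.2f}")
```

Clamping each component before weighting means that an inflated reasoning score cannot compensate for a lack of progress on the cube itself.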


@@ -0,0 +1,610 @@
#!/usr/bin/env python3
"""
Rubik's Cube Hackathon Demo
- Demonstrates solving a Rubik's cube using simulated LLM interactions
- Provides visual display of progress
- Uses the Atropos framework components without requiring the API server
"""
import json
import re
import random
import copy
import time
import argparse
from typing import List, Dict, Optional, Any
import numpy as np
# Import the Cube class from the logic file
from rubiks_cube_logic import Cube
class RubiksCubeHackathonDemo:
"""Demonstration of the Rubik's Cube solver for the hackathon"""
def __init__(self, scramble_moves=5, max_steps=20, delay=1.0, visualize=True, use_rnv=True):
self.max_steps = max_steps
self.cube = Cube() # Start with a solved cube
self.step_history = []
self.delay = delay
self.visualize = visualize
self.scramble_moves = scramble_moves
self.scramble_sequence = []
self.use_rnv = use_rnv # Whether to use the RNV for decision making
# Define the tool interface for the LLM
self.tools = [
{
"type": "function",
"function": {
"name": "apply_move",
"description": "Apply a move to the Rubik's cube.",
"parameters": {
"move": {
"type": "string",
"description": "The move to apply to the cube. Valid moves are U, D, L, R, F, B (clockwise), U', D', L', R', F', B' (counterclockwise), and U2, D2, L2, R2, F2, B2 (180 degrees)."
}
},
},
}
]
tools_json = json.dumps(self.tools)
self.system_prompt = (
"You are an AI that solves Rubik's cubes step-by-step with clear reasoning. "
"You will be given the current state of a Rubik's cube, and you need to provide "
"moves to solve it.\n\n"
"The notation for cube moves follows the standard Rubik's cube notation:\n"
"- U: rotate the up face clockwise\n"
"- D: rotate the down face clockwise\n"
"- L: rotate the left face clockwise\n"
"- R: rotate the right face clockwise\n"
"- F: rotate the front face clockwise\n"
"- B: rotate the back face clockwise\n"
"- U', D', L', R', F', B': rotate the corresponding face counterclockwise\n"
"- U2, D2, L2, R2, F2, B2: rotate the corresponding face 180 degrees\n\n"
"You should analyze the current state of the cube, identify patterns, "
"and explain your reasoning step by step.\n\n"
"You should enclose your thoughts and internal monologue inside <think> </think> tags, and then "
"provide your move using the apply_move function call.\n\n"
f"<tools>\n{tools_json}\n</tools>\n\n"
"For your function call, return a JSON object with function name and arguments "
"within <tool_call> </tool_call> tags with the following schema:\n"
'<tool_call>\n{"arguments": {"move": "U"}, "name": "apply_move"}\n</tool_call>\n\n'
"Your full answer format should be:\n"
"<think>\n[Your detailed reasoning about the current cube state and the best move to make]\n</think>\n\n"
'<tool_call>\n{"arguments": {"move": "R"}, "name": "apply_move"}\n</tool_call>\n\n'
"Remember to carefully analyze the cube state and work toward the solution step by step."
)
# Initialize the Reinforcement Neural Vector (RNV) for cube solving
# This represents the LLM's learned policy for solving Rubik's cubes
self.initialize_rnv()
def initialize_rnv(self):
"""Initialize the Reinforcement Neural Vector (RNV)"""
# In a real implementation, this would be a complex neural network
# For our demo, we'll use a simpler representation
# The RNV weights represent the learned policy for various cube states
# Higher weights indicate better moves for certain patterns
self.rnv = {}
# Create weights for different moves and patterns
moves = ["U", "D", "L", "R", "F", "B",
"U'", "D'", "L'", "R'", "F'", "B'",
"U2", "D2", "L2", "R2", "F2", "B2"]
# Initialize weights for each move
for move in moves:
# Base weight plus some random variation
self.rnv[move] = 0.5 + 0.1 * random.random()
# Boost weights for common algorithms
# Sexy move (R U R' U')
self.rnv["R"] = 0.8
self.rnv["U"] = 0.75
self.rnv["R'"] = 0.78
self.rnv["U'"] = 0.76
# Cross solving weights
self.rnv["F"] = 0.72
self.rnv["B"] = 0.7
# Layer weights
self.rnv["D"] = 0.68
self.rnv["D'"] = 0.67
# Create a correlation matrix for move sequences
# This represents how moves work well together in sequence
self.move_correlations = np.zeros((len(moves), len(moves)))
move_indices = {move: i for i, move in enumerate(moves)}
# Set correlations for effective sequences
# Sexy move (R U R' U')
self.set_correlation(move_indices, "R", "U", 0.9)
self.set_correlation(move_indices, "U", "R'", 0.9)
self.set_correlation(move_indices, "R'", "U'", 0.9)
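# OLL algorithm correlations
# NOTE: these reuse the (R, U) and (U, R') pairs already set for the sexy move
# above, so the earlier 0.9 values are overwritten with 0.85
self.set_correlation(move_indices, "R", "U", 0.85)
self.set_correlation(move_indices, "U", "R'", 0.85)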
# PLL algorithm correlations
self.set_correlation(move_indices, "R", "U'", 0.8)
self.set_correlation(move_indices, "U'", "R'", 0.8)
print("Initialized Reinforcement Neural Vector (RNV) for cube solving")
def set_correlation(self, move_indices, move1, move2, value):
"""Set correlation between two moves in the move correlation matrix"""
i = move_indices[move1]
j = move_indices[move2]
self.move_correlations[i, j] = value
def get_rnv_move(self, cube_state, previous_moves=None):
"""
Use the RNV to determine the best next move based on the current cube state
and previous moves. This simulates how a trained RL model would select actions.
"""
if previous_moves is None:
previous_moves = []
# In a real implementation, this would analyze the cube state pattern
# and use the neural network to predict the best move
# Get current progress as a feature
progress = self.cube.count_solved_cubies()
# For this demo, we'll make a simulated decision using our RNV weights
moves = ["U", "D", "L", "R", "F", "B",
"U'", "D'", "L'", "R'", "F'", "B'",
"U2", "D2", "L2", "R2", "F2", "B2"]
# Start with base weights from RNV
weights = [self.rnv[move] for move in moves]
# Avoid repeating the same move or its inverse
if previous_moves:
last_move = previous_moves[-1]
# Penalize repeating the same move
if last_move in moves:
idx = moves.index(last_move)
weights[idx] *= 0.5
# Penalize inverse moves that would undo the last move
inverse_map = {
"U": "U'", "D": "D'", "L": "L'", "R": "R'", "F": "F'", "B": "B'",
"U'": "U", "D'": "D", "L'": "L", "R'": "R", "F'": "F", "B'": "B",
"U2": "U2", "D2": "D2", "L2": "L2", "R2": "R2", "F2": "F2", "B2": "B2"
}
if last_move in inverse_map:
inverse = inverse_map[last_move]
if inverse in moves:
idx = moves.index(inverse)
weights[idx] *= 0.3
# Apply correlations if we have at least one previous move
if len(previous_moves) >= 1:
prev_move = previous_moves[-1]
if prev_move in moves:
prev_idx = moves.index(prev_move)
for i, move in enumerate(moves):
weights[i] *= 1.0 + self.move_correlations[prev_idx, i]
# Modified weights based on progress
if progress < 0.3:
# Early solving focuses on first layer
for move in ["U", "F", "R"]:
idx = moves.index(move)
weights[idx] *= 1.3
elif progress < 0.7:
# Middle solving focuses on middle layer
for move in ["L", "R", "F", "B"]:
idx = moves.index(move)
weights[idx] *= 1.3
else:
# Late solving focuses on last layer
for move in ["U", "U'", "R", "R'"]:
idx = moves.index(move)
weights[idx] *= 1.5
# Simulate exploration vs exploitation
if random.random() < 0.1: # 10% exploration rate
return random.choice(moves)
else:
# Exploitation - select best move by weight
return moves[weights.index(max(weights))]
def scramble_cube(self, moves: Optional[int] = None) -> List[str]:
"""Scramble the cube with random moves"""
if moves is None:
moves = self.scramble_moves
possible_moves = ["U", "D", "L", "R", "F", "B",
"U'", "D'", "L'", "R'", "F'", "B'",
"U2", "D2", "L2", "R2", "F2", "B2"]
# Reset the cube to a solved state
self.cube.reset()
self.step_history = []
# Apply random moves
self.scramble_sequence = []
for _ in range(moves):
move = random.choice(possible_moves)
self.scramble_sequence.append(move)
self.cube.rotate(move)
print("\n" + "="*50)
print(f"🔀 SCRAMBLED CUBE WITH SEQUENCE: {' '.join(self.scramble_sequence)}")
print("="*50 + "\n")
self.print_with_colors(str(self.cube))
print(f"📊 Progress toward solution: {self.cube.count_solved_cubies():.2f}")
return self.scramble_sequence
def format_observation(self) -> str:
"""Format the cube state as a string observation for the LLM"""
cube_visualization = str(self.cube)
# Format previous moves
moves_made = ", ".join([step["move"] for step in self.step_history]) if self.step_history else "None"
steps_remaining = self.max_steps - len(self.step_history)
message = (
f"Current state of the Rubik's cube:\n\n"
f"```\n{cube_visualization}\n```\n\n"
f"Previous moves: {moves_made}\n"
f"Steps remaining: {steps_remaining}\n"
)
if self.cube.is_solved():
message += "\nCongratulations! The cube is now solved."
return message
def parse_move(self, response: str) -> Optional[str]:
"""Extract move from the LLM response"""
if not response:
print("Empty response")
return None
# Simple regex-based parser for tool calls
tool_call_pattern = r'<tool_call>\s*({.*?})\s*</tool_call>'
tool_call_match = re.search(tool_call_pattern, response, re.DOTALL)
if not tool_call_match:
print("Failed to parse tool call in response")
return None
try:
tool_call_data = json.loads(tool_call_match.group(1))
if tool_call_data.get("name") != "apply_move":
print(f"Invalid tool name: {tool_call_data.get('name')}")
return None
move = tool_call_data.get("arguments", {}).get("move", "").strip()
valid_moves = ["U", "D", "L", "R", "F", "B",
"U'", "D'", "L'", "R'", "F'", "B'",
"U2", "D2", "L2", "R2", "F2", "B2"]
if move in valid_moves:
return move
else:
print(f"Invalid move: '{move}'")
return None
except json.JSONDecodeError:
print("Failed to parse JSON in tool call")
return None
def extract_thinking(self, response: str) -> str:
"""Extract the thinking content from the LLM response"""
thinking_pattern = r'<think>(.*?)</think>'
thinking_match = re.search(thinking_pattern, response, re.DOTALL)
if thinking_match:
return thinking_match.group(1).strip()
return "No thinking provided"
def simulate_llm_response(self, cube_state: str, step_index: int) -> str:
"""
Simulate an LLM response for demonstration purposes
In a real environment, this would be replaced with an actual LLM API call
This implementation uses the RNV to make moves and show how our LLM would use
its learned policy to solve the cube
"""
# Get previous moves for context
previous_moves = [step["move"] for step in self.step_history] if self.step_history else []
if self.use_rnv:
# Use the RNV to determine the next move
move = self.get_rnv_move(self.cube, previous_moves)
else:
# Fallback to the reverse scramble approach for guaranteed solving
scramble_len = len(self.scramble_sequence)
# If we haven't finished reversing the scramble
if step_index < scramble_len:
# Get the inverse of the scramble move at the right position
# We need to go backwards through the scramble sequence
original_move = self.scramble_sequence[scramble_len - 1 - step_index]
# Compute the inverse move
if len(original_move) == 1: # Basic move, add a prime
move = original_move + "'"
elif original_move.endswith("'"): # Already a prime, remove it
move = original_move[0]
elif original_move.endswith("2"): # Double move, stays the same
move = original_move
else:
move = "U" # Fallback, shouldn't happen
else:
# If we've completed the scramble reversal, use some common algorithms
moves = ["R", "U", "R'", "U'"]
move = moves[(step_index - scramble_len) % len(moves)]
# For almost solved cases, find the move that solves it
progress = self.cube.count_solved_cubies()
if progress > 0.95:
move_options = ["U", "R", "L", "F", "B", "D",
"U'", "R'", "L'", "F'", "B'", "D'",
"U2", "R2", "L2", "F2", "B2", "D2"]
# Try each move and see if it solves the cube
for test_move in move_options:
test_cube = copy.deepcopy(self.cube)
test_cube.rotate(test_move)
if test_cube.is_solved():
move = test_move
break
# Generate the thinking explanation based on the chosen move
face = move[0] # Get the face being moved (U, D, L, R, F, B)
direction = "clockwise" if len(move) == 1 else "counterclockwise" if move[1] == "'" else "180 degrees"
# Add RNV-specific explanation if we're using it
if self.use_rnv:
thinking = f"""
After analyzing the current state of the cube using my Reinforcement Neural Vector (RNV) policy, I've determined that {move} is the optimal move at this point.
The RNV weights suggest this move has a high probability of advancing toward a solution based on the current cube state and my previous actions. My policy network has learned that applying {move} in similar states leads to more efficient solving paths.
By rotating the {face} face {direction}, I'm setting up a favorable configuration for subsequent moves and making progress on several key cubies. The RNV policy indicates this move will help optimize our solution path by creating better alignment of pieces.
The RNV has been trained on thousands of Rubik's cube solves and has learned to recognize efficient move sequences for different cube patterns. This move is part of such a learned sequence.
"""
else:
thinking = f"""
I've carefully analyzed the current state of the cube to determine my next move.
After examining the positions of the corners and edges, I can see that applying {move} (rotating the {face} face {direction}) will help organize several key pieces.
This move is strategic because it:
1. Helps align several pieces that are currently out of position
2. Sets up the cube for subsequent moves in my solving algorithm
3. Makes progress toward completing a specific pattern or face
Looking at the current arrangement, I believe this move will bring us closer to the solution by improving the overall organization of the cube. It follows logically from my previous moves and continues our systematic path toward solving the puzzle.
"""
# Format the response like an LLM would
response = f"""<think>
{thinking}
</think>
<tool_call>
{{"arguments": {{"move": "{move}"}}, "name": "apply_move"}}
</tool_call>"""
return response
def print_with_colors(self, cube_str):
"""Print the cube with ANSI color codes"""
# Define ANSI color codes for each cube color
color_map = {
'W': '\033[97m', # White
'Y': '\033[93m', # Yellow
'R': '\033[91m', # Red
'O': '\033[38;5;208m', # Orange
'G': '\033[92m', # Green
'B': '\033[94m', # Blue
}
RESET = '\033[0m'
BOLD = '\033[1m'
# Process the string line by line
lines = cube_str.split('\n')
colored_lines = []
for line in lines:
if ':' in line: # This is a face label line
parts = line.split(':')
face_name = parts[0].strip()
colors = parts[1].strip().split()
# Color each letter
colored_colors = [f"{color_map.get(c, '')}{c}{RESET}" for c in colors]
colored_line = f"{BOLD}{face_name}{RESET}: {' '.join(colored_colors)}"
else: # This is an indented line with just colors
stripped = line.strip()
if stripped:
colors = stripped.split()
colored_colors = [f"{color_map.get(c, '')}{c}{RESET}" for c in colors]
colored_line = f" {' '.join(colored_colors)}"
else:
colored_line = line
colored_lines.append(colored_line)
print('\n'.join(colored_lines))
def solve_step(self) -> Dict[str, Any]:
"""Perform one step in solving the cube"""
if self.cube.is_solved():
return {
"status": "solved",
"message": "The cube is already solved!"
}
if len(self.step_history) >= self.max_steps:
return {
"status": "max_steps_reached",
"message": f"Maximum steps ({self.max_steps}) reached without solving the cube."
}
# Format the observation for the LLM
observation = self.format_observation()
print(f"\n{'='*20} STEP {len(self.step_history) + 1} {'='*20}")
# Get the LLM response (simulated in this demo)
llm_response = self.simulate_llm_response(observation, len(self.step_history))
# Extract the move and thinking from the response
move = self.parse_move(llm_response)
thinking = self.extract_thinking(llm_response)
# Apply the move if valid
if move:
# Save the state before the move
prev_progress = self.cube.count_solved_cubies()
# Apply the move
self.cube.rotate(move)
# Calculate progress after the move
current_progress = self.cube.count_solved_cubies()
progress_delta = current_progress - prev_progress
# Save step information
self.step_history.append({
"move": move,
"thinking": thinking,
"progress_before": prev_progress,
"progress_after": current_progress,
"progress_delta": progress_delta
})
# Print step information with visual enhancements
print(f"🎯 Move: {move}")
print(f"🧠 AI Thinking:\n{thinking}")
# Add a small delay to make it more dramatic
if self.delay > 0:
time.sleep(self.delay)
# Print the progress with colors
if progress_delta > 0:
delta_color = '\033[92m' # Green for improvement
delta_symbol = '↑'
elif progress_delta < 0:
delta_color = '\033[91m' # Red for regression
delta_symbol = '↓'
else:
delta_color = '\033[93m' # Yellow for no change
delta_symbol = '→'
print(f"📊 Current progress: \033[1m{current_progress:.2f}\033[0m {delta_color}({delta_symbol} {progress_delta:.2f})\033[0m")
# Print the cube with colors if visualization is enabled
if self.visualize:
self.print_with_colors(str(self.cube))
# Check if solved
if self.cube.is_solved():
return {
"status": "solved",
"message": f"Cube solved in {len(self.step_history)} steps!"
}
else:
return {
"status": "in_progress",
"message": f"Applied move: {move}"
}
else:
return {
"status": "invalid_move",
"message": "Failed to parse or apply move from LLM response."
}
def solve(self, max_steps: Optional[int] = None) -> Dict[str, Any]:
"""Attempt to solve the cube with step-by-step LLM guidance"""
if max_steps is not None:
self.max_steps = max_steps
print("\n" + "="*50)
print("🧩 STARTING CUBE SOLVING PROCESS 🧩")
print("="*50 + "\n")
print("Initial cube state:")
self.print_with_colors(str(self.cube))
print(f"📊 Initial progress: {self.cube.count_solved_cubies():.2f}")
while True:
# Perform one solving step
result = self.solve_step()
# Check termination conditions
if result["status"] == "solved":
print("\n" + "="*50)
print("🎉 CUBE SOLVED! 🎉")
print("="*50)
break
elif result["status"] == "max_steps_reached" or result["status"] == "invalid_move":
print("\n" + "="*50)
print(f"❌ SOLVING FAILED: {result['message']}")
print("="*50)
break
# Optional pause between steps
if self.delay > 0:
time.sleep(self.delay)
# Summarize results
print("\n" + "="*50)
print("📋 SOLVING SUMMARY 📋")
print("="*50)
print(f"Status: {result['status']}")
print(f"Steps taken: {len(self.step_history)}")
print(f"Moves applied: {', '.join([step['move'] for step in self.step_history])}")
print(f"Final progress: {self.cube.count_solved_cubies():.2f}")
print(f"Solved: {self.cube.is_solved()}")
return {
"status": result["status"],
"steps_taken": len(self.step_history),
"moves_applied": [step["move"] for step in self.step_history],
"final_progress": self.cube.count_solved_cubies(),
"is_solved": self.cube.is_solved()
}
def main():
    parser = argparse.ArgumentParser(description="Rubik's Cube Hackathon Demo")
    parser.add_argument('--scramble', type=int, default=5, help='Number of scramble moves (default: 5)')
    parser.add_argument('--steps', type=int, default=20, help='Maximum solving steps (default: 20)')
    parser.add_argument('--delay', type=float, default=0.5, help='Delay between steps in seconds (default: 0.5)')
    parser.add_argument('--no-visual', action='store_true', help='Disable cube visualization')
    parser.add_argument('--no-rnv', action='store_true', help='Disable Reinforcement Neural Vector (RNV) policy')
    args = parser.parse_args()
    # Create the demo solver
    demo = RubiksCubeHackathonDemo(
        scramble_moves=args.scramble,
        max_steps=args.steps,
        delay=args.delay,
        visualize=not args.no_visual,
        use_rnv=not args.no_rnv
    )
    # Scramble the cube
    demo.scramble_cube()
    # Try to solve it
    demo.solve()


if __name__ == "__main__":
    main()
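
The reverse-scramble fallback in `simulate_llm_response` relies on the fact that undoing a scramble means applying the inverse of each move in reverse order. A minimal standalone sketch of that idea is shown below; the helper names `invert_move` and `invert_sequence` are hypothetical and are not part of the demo script.

```python
from typing import List


def invert_move(move: str) -> str:
    """Return the move that undoes `move` in standard face-turn notation."""
    if move.endswith("2"):
        return move        # a half turn is its own inverse
    if move.endswith("'"):
        return move[:-1]   # counterclockwise -> clockwise
    return move + "'"      # clockwise -> counterclockwise


def invert_sequence(moves: List[str]) -> List[str]:
    """Invert each move and reverse the order, which undoes the whole sequence."""
    return [invert_move(m) for m in reversed(moves)]


scramble = ["R", "U'", "F2"]
print(" ".join(invert_sequence(scramble)))
```

Applying `invert_sequence` twice returns the original scramble, which is a quick sanity check for this kind of helper.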


@@ -0,0 +1,28 @@
# Selcube-specific dependencies for Rubik's Cube training environment
# Core scientific computing (beyond base requirements)
scipy>=1.10.0
matplotlib>=3.7.0
seaborn>=0.12.0
pandas>=2.0.0
# ML and training dependencies
torch>=2.0.0
transformers>=4.30.0
wandb>=0.15.0
# 3D visualization and cube rendering
plotly>=5.14.0
imageio>=2.31.0
# Environment configuration
pyyaml>=6.0
tqdm>=4.65.0
pydantic>=2.0.2
# Web interface for demonstrations
flask>=2.3.2
pillow>=9.5.0
# HTML visualization generation
jinja2>=3.1.2
beautifulsoup4>=4.12.2


@@ -0,0 +1,299 @@
#!/usr/bin/env python3
"""
RubiksCubeCurriculum: Curriculum learning utilities for Rubik's Cube environment
This module provides classes and functions to implement curriculum learning for
the Rubik's cube environment, where the difficulty gradually increases as the
model improves in solving simpler challenges.
"""
import logging
import math
import numpy as np
from typing import Dict, List, Optional, Tuple, Any
import random
logger = logging.getLogger(__name__)
class CurriculumLevel:
"""Represents a curriculum learning level for Rubik's cube solving"""
def __init__(
self,
level: int,
min_scramble_moves: int,
max_scramble_moves: int,
max_steps: int,
reward_per_correctly_placed_cubie: float,
example_patterns: Optional[List[List[str]]] = None,
description: Optional[str] = None
):
"""
Initialize a curriculum level
Args:
level: Level number (higher is more difficult)
min_scramble_moves: Minimum number of scramble moves
max_scramble_moves: Maximum number of scramble moves
max_steps: Maximum allowed steps to solve at this level
reward_per_correctly_placed_cubie: Reward multiplier for correctly placed cubies
example_patterns: Optional list of move sequences to learn at this level
description: Human-readable description of this level
"""
self.level = level
self.min_scramble_moves = min_scramble_moves
self.max_scramble_moves = max_scramble_moves
self.max_steps = max_steps
self.reward_per_correctly_placed_cubie = reward_per_correctly_placed_cubie
self.example_patterns = example_patterns or []
self.description = description or f"Level {level}: {min_scramble_moves}-{max_scramble_moves} scramble moves"
def get_scramble_moves(self) -> int:
"""Get a random number of scramble moves within the level's range"""
return random.randint(self.min_scramble_moves, self.max_scramble_moves)
def __repr__(self) -> str:
return f"CurriculumLevel(level={self.level}, scramble_moves={self.min_scramble_moves}-{self.max_scramble_moves})"
class RubiksCubeCurriculum:
"""Manages curriculum progression for Rubik's cube solver training"""
def __init__(
self,
starting_level: int = 1,
max_level: int = 5,
auto_progress: bool = True,
success_threshold: float = 0.7,
advancement_window_size: int = 50,
min_solved_at_level: int = 25
):
"""
Initialize the curriculum manager
Args:
starting_level: Initial curriculum level
max_level: Maximum curriculum level
auto_progress: Whether to automatically progress through levels
success_threshold: Success rate threshold to advance to next level
advancement_window_size: Number of episodes to consider for advancement
min_solved_at_level: Minimum number of episodes that must be solved at a level
before considering advancement
"""
self.current_level = starting_level
self.max_level = max_level
self.auto_progress = auto_progress
self.success_threshold = success_threshold
self.advancement_window_size = advancement_window_size
self.min_solved_at_level = min_solved_at_level
# Track episode results for potential advancement
self.episode_results = [] # List of (level, is_solved, num_steps) tuples
# Define curriculum levels
self.levels = self._create_default_curriculum()
def _create_default_curriculum(self) -> Dict[int, CurriculumLevel]:
"""Create the default curriculum progression"""
levels = {}
# Level 1: Very simple scrambles (1-3 moves)
levels[1] = CurriculumLevel(
level=1,
min_scramble_moves=1,
max_scramble_moves=3,
max_steps=15,
reward_per_correctly_placed_cubie=0.1,
description="Beginner level - Single move to Triple moves scrambles"
)
# Level 2: Simple scrambles (4-7 moves)
levels[2] = CurriculumLevel(
level=2,
min_scramble_moves=4,
max_scramble_moves=7,
max_steps=20,
reward_per_correctly_placed_cubie=0.075,
description="Easy level - Learn basic patterns and simple sequences"
)
# Level 3: Moderate scrambles (8-12 moves)
levels[3] = CurriculumLevel(
level=3,
min_scramble_moves=8,
max_scramble_moves=12,
max_steps=25,
reward_per_correctly_placed_cubie=0.05,
description="Intermediate level - More complex patterns and sequences"
)
# Level 4: Challenging scrambles (13-17 moves)
levels[4] = CurriculumLevel(
level=4,
min_scramble_moves=13,
max_scramble_moves=17,
max_steps=30,
reward_per_correctly_placed_cubie=0.025,
description="Advanced level - Complex scrambles requiring deep planning"
)
# Level 5: Expert scrambles (18-22 moves)
levels[5] = CurriculumLevel(
level=5,
min_scramble_moves=18,
max_scramble_moves=22,
max_steps=40,
reward_per_correctly_placed_cubie=0.01,
description="Expert level - Long scrambles with lengths near God's number (20 moves)"
)
return levels
def get_current_level(self) -> CurriculumLevel:
"""Get the current curriculum level"""
return self.levels[self.current_level]
def record_episode_result(self, level: int, is_solved: bool, num_steps: int) -> None:
"""
Record the result of an episode
Args:
level: The curriculum level of the episode
is_solved: Whether the cube was solved successfully
num_steps: Number of steps taken in the episode
"""
self.episode_results.append((level, is_solved, num_steps))
# Keep only the most recent window of results
if len(self.episode_results) > self.advancement_window_size:
self.episode_results = self.episode_results[-self.advancement_window_size:]
# Check if we should advance to the next level
if self.auto_progress:
self._check_advancement()
def _check_advancement(self) -> None:
"""Check if we should advance to the next level based on recent performance"""
# Only consider episodes at the current level
current_level_results = [r for r in self.episode_results if r[0] == self.current_level]
# Need enough data to make a decision
if len(current_level_results) < self.min_solved_at_level:
return
# Calculate success rate at current level
success_count = sum(1 for _, is_solved, _ in current_level_results if is_solved)
success_rate = success_count / len(current_level_results)
# Log the current performance
logger.info(
f"Curriculum performance: Level {self.current_level}, "
f"Success rate: {success_rate:.2f} ({success_count}/{len(current_level_results)})"
)
# Check if we should advance
if (success_rate >= self.success_threshold and
success_count >= self.min_solved_at_level and
self.current_level < self.max_level):
self.current_level += 1
logger.info(
f"Advancing to curriculum level {self.current_level}: "
f"{self.levels[self.current_level].description}"
)
# Reset episode results after advancing
self.episode_results = []
def set_level(self, level: int) -> None:
"""
Manually set the curriculum level
Args:
level: The new curriculum level (must be between 1 and max_level)
"""
if level < 1 or level > self.max_level:
logger.warning(
f"Invalid curriculum level {level}. Must be between 1 and {self.max_level}. "
f"Keeping current level {self.current_level}."
)
return
self.current_level = level
logger.info(f"Manually set curriculum to level {level}: {self.levels[level].description}")
# Reset episode results after manual level change
self.episode_results = []
def get_level_metrics(self) -> Dict[str, Any]:
"""Get metrics for the current curriculum level"""
current_level_results = [r for r in self.episode_results if r[0] == self.current_level]
if not current_level_results:
return {
"curriculum_level": self.current_level,
"curriculum_description": self.levels[self.current_level].description,
"level_success_rate": 0.0,
"level_episodes": 0,
"level_solved_count": 0,
"level_avg_steps": 0.0,
"progress_to_next_level": 0.0
}
success_count = sum(1 for _, is_solved, _ in current_level_results if is_solved)
success_rate = success_count / len(current_level_results)
# Calculate average steps for solved episodes
solved_episodes = [(level, solved, steps) for level, solved, steps in current_level_results if solved]
avg_steps = sum(steps for _, _, steps in solved_episodes) / max(1, len(solved_episodes))
# Calculate progress to next level (0.0 to 1.0)
if self.current_level >= self.max_level:
progress_to_next = 1.0
else:
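# Advancement requires both the success-rate threshold and the minimum
# solved count, so report whichever requirement is further from being met
rate_progress = success_rate / self.success_threshold if self.success_threshold > 0 else 1.0
count_progress = success_count / max(1, self.min_solved_at_level)
progress_to_next = min(1.0, rate_progress, count_progress)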
return {
"curriculum_level": self.current_level,
"curriculum_description": self.levels[self.current_level].description,
"level_success_rate": success_rate,
"level_episodes": len(current_level_results),
"level_solved_count": success_count,
"level_avg_steps": avg_steps,
"progress_to_next_level": progress_to_next
}
# Example usage
if __name__ == "__main__":
# Set up logging
logging.basicConfig(level=logging.INFO)
# Create curriculum manager
curriculum = RubiksCubeCurriculum(
starting_level=1,
max_level=5,
auto_progress=True,
success_threshold=0.7,
advancement_window_size=50,
min_solved_at_level=25
)
# Simulate some episodes
# In a real setup, these results would come from actual cube-solving episodes
for _ in range(40):
# Simulate success with 80% probability for level 1
is_solved = random.random() < 0.8
steps = random.randint(5, 15)
curriculum.record_episode_result(1, is_solved, steps)
# Print metrics
print(curriculum.get_level_metrics())
# Current level should now be 2 if enough episodes were solved
print(f"Current level: {curriculum.current_level}")
# Manually set to level 3
curriculum.set_level(3)
print(f"After manual set, current level: {curriculum.current_level}")
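The `progress_to_next_level` metric above is easier to sanity-check in isolation. The following is a minimal sketch that mirrors the arithmetic in `get_level_metrics`; the standalone function name and its parameters are illustrative assumptions, not part of the module.

```python
def progress_to_next_level(
    success_rate: float,
    episodes: int,
    success_threshold: float,
    min_solved_at_level: int,
) -> float:
    """Hypothetical helper mirroring the progress formula in get_level_metrics."""
    # Target is the threshold scaled by the minimum number of solved episodes
    progress_threshold = success_threshold * min_solved_at_level
    # success_rate * episodes is simply the number of solved episodes
    current_progress = success_rate * episodes
    return min(1.0, current_progress / progress_threshold)


# With the example settings (threshold 0.7, min_solved_at_level 25) the
# denominator is 17.5, so 14 solved episodes out of 20 gives 14 / 17.5.
print(round(progress_to_next_level(0.7, 20, 0.7, 25), 3))
# Once at least 17.5 episodes' worth of successes are recorded, the value saturates.
print(progress_to_next_level(0.9, 40, 0.7, 25))
```

Note that the numerator is a solved count while the denominator is a threshold times a count, so the metric reaches 1.0 before `min_solved_at_level` episodes are actually solved whenever `success_threshold` is below 1.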

File diff suppressed because it is too large

@ -0,0 +1,345 @@
#!/usr/bin/env python3
"""
Rubik's Cube logic extracted from the environment for independent testing
"""
# Define the face colors for visualization
UP_COLOR = 'W' # White
DOWN_COLOR = 'Y' # Yellow
RIGHT_COLOR = 'R' # Red
LEFT_COLOR = 'O' # Orange
FRONT_COLOR = 'G' # Green
BACK_COLOR = 'B' # Blue
class Cube:
"""
A Rubik's cube implementation with accurate move handling.
"""
def __init__(self):
# Initialize a solved cube
self.reset()
def reset(self):
"""Reset the cube to solved state"""
# Initialize the cube as a 3D array [face][row][col]
# Faces: 0=UP, 1=DOWN, 2=LEFT, 3=RIGHT, 4=FRONT, 5=BACK
self.cube = [
[[UP_COLOR for _ in range(3)] for _ in range(3)], # UP
[[DOWN_COLOR for _ in range(3)] for _ in range(3)], # DOWN
[[LEFT_COLOR for _ in range(3)] for _ in range(3)], # LEFT
[[RIGHT_COLOR for _ in range(3)] for _ in range(3)], # RIGHT
[[FRONT_COLOR for _ in range(3)] for _ in range(3)], # FRONT
[[BACK_COLOR for _ in range(3)] for _ in range(3)] # BACK
]
def is_solved(self) -> bool:
"""Check if the cube is solved"""
for face in self.cube:
center_color = face[1][1] # Center color never changes
for row in face:
for color in row:
if color != center_color:
return False
return True
def count_solved_cubies(self) -> float:
"""
Count the number of stickers in their correct position
Returns a normalized score between 0 and 1
"""
# Create a solved reference cube
reference = Cube()
# Count matching stickers
total_stickers = 6 * 9 # 6 faces, 9 stickers per face
match_count = 0
for face_idx in range(6):
for i in range(3):
for j in range(3):
if self.cube[face_idx][i][j] == reference.cube[face_idx][i][j]:
match_count += 1
return match_count / total_stickers
def rotate(self, move: str):
"""
Perform a move on the cube using standard notation
U, D, L, R, F, B are clockwise rotations of respective faces
U', D', L', R', F', B' are counterclockwise rotations
U2, D2, L2, R2, F2, B2 are double (180°) rotations
"""
# Map move notation to face index and rotation count
face_map = {
'U': 0, 'D': 1, 'L': 2, 'R': 3, 'F': 4, 'B': 5
}
# Parse the move
if len(move) == 0:
raise ValueError("Empty move")
face = move[0]
if face not in face_map:
raise ValueError(f"Invalid face: {face}")
face_idx = face_map[face]
# Handle rotation direction
if len(move) == 1:
# Clockwise rotation
count = 1
elif len(move) == 2:
if move[1] == "'":
# Counterclockwise rotation
count = 3
elif move[1] == "2":
# Double rotation
count = 2
else:
raise ValueError(f"Invalid move modifier: {move[1]}")
else:
raise ValueError(f"Invalid move format: {move}")
# Apply the rotation 'count' times
for _ in range(count):
self._rotate_face_clockwise(face_idx)
self._rotate_adjacent_faces(face_idx)
def _rotate_face_clockwise(self, face_idx: int):
"""Rotate a face clockwise"""
face = self.cube[face_idx]
new_face = [[None for _ in range(3)] for _ in range(3)]
# Copy with 90-degree clockwise rotation
for i in range(3):
for j in range(3):
new_face[j][2-i] = face[i][j]
self.cube[face_idx] = new_face
def _rotate_adjacent_faces(self, face_idx: int):
"""Rotate the appropriate edges on adjacent faces"""
if face_idx == 0: # UP face
# Cycle the top rows clockwise as seen from above: FRONT -> LEFT -> BACK -> RIGHT
temp = self.cube[4][0][:] # Save FRONT top edge
self.cube[4][0] = self.cube[3][0][:] # FRONT <- RIGHT
self.cube[3][0] = self.cube[5][0][:] # RIGHT <- BACK
self.cube[5][0] = self.cube[2][0][:] # BACK <- LEFT
self.cube[2][0] = temp # LEFT <- FRONT
elif face_idx == 1: # DOWN face
# Cycle the bottom rows clockwise as seen from below: FRONT -> RIGHT -> BACK -> LEFT
temp = self.cube[4][2][:] # Save FRONT bottom edge
self.cube[4][2] = self.cube[2][2][:] # FRONT <- LEFT
self.cube[2][2] = self.cube[5][2][:] # LEFT <- BACK
self.cube[5][2] = self.cube[3][2][:] # BACK <- RIGHT
self.cube[3][2] = temp # RIGHT <- FRONT
elif face_idx == 2: # LEFT face
# Rotate the left edges of UP, FRONT, DOWN, BACK
# Need to extract and set columns, not rows
temp = [self.cube[0][i][0] for i in range(3)] # Save UP left column
# UP left <- BACK right (reversed)
for i in range(3):
self.cube[0][i][0] = self.cube[5][2-i][2]
# BACK right <- DOWN left (reversed)
for i in range(3):
self.cube[5][i][2] = self.cube[1][2-i][0]
# DOWN left <- FRONT left
for i in range(3):
self.cube[1][i][0] = self.cube[4][i][0]
# FRONT left <- UP left
for i in range(3):
self.cube[4][i][0] = temp[i]
elif face_idx == 3: # RIGHT face
# Rotate the right edges of UP, BACK, DOWN, FRONT
temp = [self.cube[0][i][2] for i in range(3)] # Save UP right column
# UP right <- FRONT right
for i in range(3):
self.cube[0][i][2] = self.cube[4][i][2]
# FRONT right <- DOWN right
for i in range(3):
self.cube[4][i][2] = self.cube[1][i][2]
# DOWN right <- BACK left (reversed)
for i in range(3):
self.cube[1][i][2] = self.cube[5][2-i][0]
# BACK left <- UP right (reversed)
for i in range(3):
self.cube[5][i][0] = temp[2-i]
elif face_idx == 4: # FRONT face
# Rotate the edges of UP bottom, RIGHT left, DOWN top, LEFT right
# UP bottom row
temp = self.cube[0][2][:]
# UP bottom <- LEFT right (rotated)
for i in range(3):
self.cube[0][2][i] = self.cube[2][2-i][2]
# LEFT right <- DOWN top (rotated)
for i in range(3):
self.cube[2][i][2] = self.cube[1][0][i]
# DOWN top <- RIGHT left (rotated)
for i in range(3):
self.cube[1][0][i] = self.cube[3][2-i][0]
# RIGHT left <- UP bottom (rotated)
for i in range(3):
self.cube[3][i][0] = temp[i]
elif face_idx == 5: # BACK face
# Rotate the edges of UP top, LEFT left, DOWN bottom, RIGHT right
# UP top row
temp = self.cube[0][0][:]
# UP top <- RIGHT right (rotated)
for i in range(3):
self.cube[0][0][i] = self.cube[3][2-i][2]
# RIGHT right <- DOWN bottom (rotated)
for i in range(3):
self.cube[3][i][2] = self.cube[1][2][i]
# DOWN bottom <- LEFT left (rotated)
for i in range(3):
self.cube[1][2][i] = self.cube[2][2-i][0]
# LEFT left <- UP top (rotated)
for i in range(3):
self.cube[2][i][0] = temp[i]
def __str__(self) -> str:
"""Convert cube to string representation"""
face_names = ['U', 'D', 'L', 'R', 'F', 'B']
result = []
for i, face in enumerate(self.cube):
result.append(f"{face_names[i]}: {' '.join(face[0])}")
result.append(f" {' '.join(face[1])}")
result.append(f" {' '.join(face[2])}")
return "\n".join(result)
def test_basic_moves():
"""Test basic moves and their inverses"""
print("=== TESTING BASIC MOVES ===")
# Test each basic move and its inverse
for move, inverse in [
("R", "R'"), ("L", "L'"), ("U", "U'"),
("D", "D'"), ("F", "F'"), ("B", "B'")
]:
cube = Cube()
cube.rotate(move)
cube.rotate(inverse)
solved = cube.is_solved()
print(f"Move {move} followed by {inverse}: {'PASS' if solved else 'FAIL'}")
if not solved:
print("Final cube state:")
print(str(cube))
def test_double_moves():
"""Test double (180°) moves"""
print("\n=== TESTING DOUBLE MOVES ===")
# Test each double move applied twice
for move in ["U2", "D2", "L2", "R2", "F2", "B2"]:
cube = Cube()
cube.rotate(move)
cube.rotate(move)
solved = cube.is_solved()
print(f"Double move {move} applied twice: {'PASS' if solved else 'FAIL'}")
if not solved:
print("Final cube state:")
print(str(cube))
def test_complex_algorithms():
"""Test more complex algorithms"""
print("\n=== TESTING COMPLEX ALGORITHMS ===")
# Test algorithms
algorithms = [
{
"name": "Sexy Move (R U R' U') × 6",
"moves": ["R", "U", "R'", "U'"] * 6,
"should_solve": True
},
{
"name": "Scramble + Inverse",
"moves": ["R", "U", "F'", "L", "D2"] + ["D2", "L'", "F", "U'", "R'"],
"should_solve": True
},
{
"name": "Sune Algorithm (R U R' U R U2 R')",
"moves": ["R", "U", "R'", "U", "R", "U2", "R'"],
"should_solve": False
}
]
for algo in algorithms:
cube = Cube()
print(f"\nTesting: {algo['name']}")
# Apply moves
for move in algo["moves"]:
cube.rotate(move)
# Check result
is_solved = cube.is_solved()
expected = algo["should_solve"]
if is_solved == expected:
print(f"Result: PASS (Expected {'solved' if expected else 'not solved'}, Got {'solved' if is_solved else 'not solved'})")
else:
print(f"Result: FAIL (Expected {'solved' if expected else 'not solved'}, Got {'solved' if is_solved else 'not solved'})")
print("Final cube state:")
print(str(cube))
# Show progress percentage if not solved
if not is_solved:
progress = cube.count_solved_cubies()
print(f"Progress toward solution: {progress:.2f}")
def test_scramble_and_count():
"""Test scrambling and counting progress"""
print("\n=== TESTING SCRAMBLING AND PROGRESS TRACKING ===")
# Create a cube and apply a fixed, hand-written scramble
cube = Cube()
print("Solved cube:")
print(str(cube))
print(f"Is solved: {cube.is_solved()}")
print(f"Progress: {cube.count_solved_cubies():.2f}")
# Apply a sequence of moves to scramble
scramble = ["R", "U", "F", "D", "L", "B'", "R'", "U2"]
print(f"\nApplying scramble: {' '.join(scramble)}")
for move in scramble:
cube.rotate(move)
print("Scrambled cube:")
print(str(cube))
print(f"Is solved: {cube.is_solved()}")
print(f"Progress: {cube.count_solved_cubies():.2f}")
if __name__ == "__main__":
test_basic_moves()
test_double_moves()
test_complex_algorithms()
test_scramble_and_count()
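The two building blocks of `Cube.rotate` (notation parsing and the 90-degree face rotation) can be checked without the full cube. The sketch below is a self-contained illustration under that assumption; `parse_move` and the free function `rotate_face_clockwise` are hypothetical helpers that copy the logic of `rotate` and `_rotate_face_clockwise`, not functions defined in this file.

```python
from typing import List, Tuple

FACES = "UDLRFB"


def parse_move(move: str) -> Tuple[str, int]:
    """Return (face, clockwise quarter turns) for a move such as "R", "R'" or "R2"."""
    if not move or move[0] not in FACES:
        raise ValueError(f"Invalid move: {move!r}")
    turns = {"": 1, "2": 2, "'": 3}  # a counterclockwise turn is three clockwise turns
    suffix = move[1:]
    if suffix not in turns:
        raise ValueError(f"Invalid move modifier: {suffix!r}")
    return move[0], turns[suffix]


def rotate_face_clockwise(face: List[List[str]]) -> List[List[str]]:
    """Same index mapping as _rotate_face_clockwise: new[j][2 - i] = old[i][j]."""
    new_face = [[""] * 3 for _ in range(3)]
    for i in range(3):
        for j in range(3):
            new_face[j][2 - i] = face[i][j]
    return new_face


face = [list("abc"), list("def"), list("ghi")]
once = rotate_face_clockwise(face)
# The left column, read bottom to top, becomes the new top row
print(once)
print(parse_move("R'"))
```

Applying the rotation four times returns the original face, which is why `rotate` can express every move as a count of clockwise quarter turns.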

File diff suppressed because it is too large

File diff suppressed because one or more lines are too long

@ -0,0 +1,384 @@
#!/usr/bin/env python3
"""
RubiksCubeStrategies: Library of solving strategies for the Rubik's cube environment
This module provides a collection of solving strategies for Rubik's cube, along with
explanations and examples for each. These strategies can be used to guide the LLM's
solving approach and provide structured learning.
"""
from typing import Dict, List, Optional
class SolvingStrategy:
"""Base class for Rubik's cube solving strategies"""
def __init__(
self,
name: str,
description: str,
difficulty: int,
steps: List[str],
example_algorithms: Optional[List[Dict[str, str]]] = None,
tips: Optional[List[str]] = None
):
"""
Initialize a solving strategy
Args:
name: Strategy name
description: Detailed description of the strategy
difficulty: Difficulty level (1-5)
steps: Ordered list of steps to follow
example_algorithms: Common algorithms used in this strategy
tips: Tips for using this strategy effectively
"""
self.name = name
self.description = description
self.difficulty = difficulty
self.steps = steps
self.example_algorithms = example_algorithms or []
self.tips = tips or []
def get_prompt_section(self) -> str:
"""Get formatted prompt section for this strategy"""
prompt = f"""
STRATEGY: {self.name} (Difficulty: {self.difficulty}/5)
DESCRIPTION:
{self.description}
STEPS:
"""
for i, step in enumerate(self.steps, 1):
prompt += f"{i}. {step}\n"
if self.example_algorithms:
prompt += "\nCOMMON ALGORITHMS:\n"
for algo in self.example_algorithms:
prompt += f"- {algo['name']}: {algo['moves']} - {algo['purpose']}\n"
if self.tips:
prompt += "\nTIPS:\n"
for tip in self.tips:
prompt += f"- {tip}\n"
return prompt
def __str__(self) -> str:
return f"{self.name} (Difficulty: {self.difficulty}/5)"
# Define common strategies
LAYER_BY_LAYER = SolvingStrategy(
name="Layer-by-Layer Method",
description=(
"The beginner-friendly approach that solves the cube one layer at a time. "
"It's intuitive and requires memorizing only a few algorithms."
),
difficulty=1,
steps=[
"Solve the white cross on the top face",
"Place the white corner pieces to complete the first layer",
"Solve the middle layer edges",
"Create a yellow cross on the bottom face",
"Position the yellow edges correctly",
"Position the yellow corners correctly",
"Orient the yellow corners correctly"
],
example_algorithms=[
{
"name": "Sexy Move",
"moves": "R U R' U'",
"purpose": "Used for placing corners in the first layer"
},
{
"name": "Middle Layer Edge - Left",
"moves": "U' L' U L U F U' F'",
"purpose": "Insert edge piece into the middle layer from the left"
},
{
"name": "Middle Layer Edge - Right",
"moves": "U R U' R' U' F' U F",
"purpose": "Insert edge piece into the middle layer from the right"
},
{
"name": "Orient Yellow Edges",
"moves": "F R U R' U' F'",
"purpose": "Create a yellow cross on the last layer"
}
],
tips=[
"Always keep the white face on top when solving the first layer",
"Look ahead to plan edge placement before executing moves",
"Pay attention to where pieces need to go before applying algorithms",
"Break down the solution into manageable steps"
]
)
CFOP_METHOD = SolvingStrategy(
name="CFOP Method (Fridrich Method)",
description=(
"An advanced method used by speedcubers. CFOP stands for Cross, F2L (First Two Layers), "
"OLL (Orient Last Layer), and PLL (Permute Last Layer). It's efficient but requires "
"memorizing many algorithms."
),
difficulty=4,
steps=[
"Solve the cross on the bottom face (usually white)",
"Solve the First Two Layers (F2L) by pairing corners with edges and inserting them",
"Orient the Last Layer (OLL) to make the top face all one color",
"Permute the Last Layer (PLL) to arrange all pieces correctly"
],
example_algorithms=[
{
"name": "F2L Case 1",
"moves": "R U R'",
"purpose": "Basic F2L insertion when corner and edge are paired"
},
{
"name": "F2L Case 2",
"moves": "y' U' L' U L",
"purpose": "Basic F2L insertion (mirror of case 1)"
},
{
"name": "Sune",
"moves": "R U R' U R U2 R'",
"purpose": "Common OLL algorithm used to orient corners"
},
{
"name": "T Permutation",
"moves": "R U R' U' R' F R2 U' R' U' R U R' F'",
"purpose": "PLL algorithm that swaps two corners and two edges"
}
],
tips=[
"Practice F2L intuitively before learning algorithms",
"Solve the cross on the bottom to see the F2L pairs more easily",
"Learn to recognize F2L cases from different angles",
"Group PLL algorithms by similar patterns to make memorization easier"
]
)
ROUX_METHOD = SolvingStrategy(
name="Roux Method",
description=(
"A method focused on building blocks and using M-slice moves. It's very efficient "
"and requires fewer algorithm memorizations than CFOP but demands good spatial intuition."
),
difficulty=3,
steps=[
"Build a 1x2x3 block on the left side (First Block)",
"Build a 1x2x3 block on the right side (Second Block)",
"Orient the corners of the top layer and permute the corners of the top layer (CMLL)",
"Orient the edges of the last layer and permute the M-slice (L6E)"
],
example_algorithms=[
{
"name": "CMLL - O Case",
"moves": "R U R' F' R U R' U' R' F R2 U' R'",
"purpose": "Permute the corners (adjacent swap) when all corners are already oriented"
},
{
"name": "EO - Arrow",
"moves": "M U M'",
"purpose": "Edge orientation during L6E phase"
},
{
"name": "UL/UR Edge Swap",
"moves": "M' U2 M U2",
"purpose": "Swap the UL and UR edges during L6E phase"
}
],
tips=[
"Focus on block-building efficiency for the first two blocks",
"Use inspection time to plan the first block completely",
"Practice M-slice moves to develop speed and accuracy",
"Learn to recognize CMLL cases quickly to reduce pauses"
]
)
ZZ_METHOD = SolvingStrategy(
name="ZZ Method",
description=(
"A method that focuses on solving edges early to enable rotationless solving. "
"It orients all edges first, then solves the cube without F or B moves."
),
difficulty=3,
steps=[
"Orient all edges (EOLine) while placing DF and DB edges",
"Build the F2L on the left and right sides (ZZF2L)",
"Orient the corners of the last layer (OCLL)",
"Permute the last layer (PLL)"
],
example_algorithms=[
{
"name": "EOLine Example",
"moves": "F L' U B' D'",
"purpose": "Orient all edges and place DF and DB edges"
},
{
"name": "ZZF2L Pair",
"moves": "U L U' L'",
"purpose": "Insert corner-edge pair during F2L"
},
{
"name": "OCLL - Sune",
"moves": "R U R' U R U2 R'",
"purpose": "Orient three corners in the last layer"
}
],
tips=[
"Practice EOLine recognition to improve planning during inspection",
"Take advantage of the rotationless solving after EOLine",
"Use block-building techniques similar to Petrus for F2L",
"Learn to recognize edge orientation quickly"
]
)
BEGINNER_METHOD = SolvingStrategy(
name="Beginner Method",
description=(
"The simplest approach for complete beginners. Uses very intuitive steps and minimal algorithm "
"memorization, focusing on understanding the cube's mechanics rather than speed."
),
difficulty=1,
steps=[
"Solve the white cross",
"Solve the white corners one by one",
"Solve the middle layer edges one by one",
"Make a yellow cross on the top",
"Solve the yellow edges around the top",
"Position the yellow corners",
"Orient the yellow corners"
],
example_algorithms=[
{
"name": "White Corner Insertion",
"moves": "R U R' U'",
"purpose": "Move a white corner piece into position"
},
{
"name": "Edge Insertion",
"moves": "U R U' R' U' F' U F",
"purpose": "Insert a middle layer edge piece"
},
{
"name": "Yellow Cross",
"moves": "F R U R' U' F'",
"purpose": "Form a yellow cross on the top face"
}
],
tips=[
"Focus on understanding what each move does rather than memorizing algorithms",
"Take your time and think about where pieces need to go",
"Keep track of important pieces while executing algorithms",
"Practice the fundamentals until they become natural"
]
)
CORNERS_FIRST = SolvingStrategy(
name="Corners-First Method",
description=(
"Solve all corner pieces first, then solve the edges. This approach is less common "
"but offers a different perspective on solving the cube."
),
difficulty=2,
steps=[
"Orient the corners to get white and yellow on top and bottom",
"Permute the corners to their correct positions",
"Solve the middle layer edges",
"Solve the last layer edges"
],
example_algorithms=[
{
"name": "Corner Orientation",
"moves": "R' D' R D",
"purpose": "Orient a corner in place"
},
{
"name": "Corner 3-Cycle",
"moves": "R U' R' D2 R U R' D2",
"purpose": "Cycle three corners"
},
{
"name": "Edge 3-Cycle",
"moves": "L' R U2 L R' F' L' R U2 L R' F",
"purpose": "Cycle three edges"
}
],
tips=[
"Use commutators for corner manipulation",
"Pay attention to corner orientation as it affects the later steps",
"Learn to visualize corner pieces and their correct positions",
"Practice edge insertion techniques for the final steps"
]
)
def get_strategy_by_name(name: str) -> Optional[SolvingStrategy]:
"""Get a strategy by name"""
all_strategies = get_all_strategies()
for strategy in all_strategies:
if strategy.name.lower() == name.lower():
return strategy
return None
def get_strategy_by_difficulty(difficulty: int) -> List[SolvingStrategy]:
"""Get all strategies at a specific difficulty level"""
all_strategies = get_all_strategies()
return [strategy for strategy in all_strategies if strategy.difficulty == difficulty]
def get_all_strategies() -> List[SolvingStrategy]:
"""Get all available strategies"""
return [
LAYER_BY_LAYER,
CFOP_METHOD,
ROUX_METHOD,
ZZ_METHOD,
BEGINNER_METHOD,
CORNERS_FIRST
]
def get_strategy_prompt_for_level(level: int) -> str:
"""Get a formatted prompt with strategies appropriate for the curriculum level"""
if level <= 2:
# Beginner levels - show only simpler strategies
strategies = [BEGINNER_METHOD, LAYER_BY_LAYER]
elif level == 3:
# Intermediate level
strategies = [LAYER_BY_LAYER, CORNERS_FIRST, ROUX_METHOD]
else:
# Advanced levels - show all strategies
strategies = get_all_strategies()
prompt = "# RUBIK'S CUBE SOLVING STRATEGIES\n\nBelow are strategies you can use to solve the cube:\n\n"
for strategy in strategies:
prompt += strategy.get_prompt_section() + "\n\n"
prompt += """
When solving the cube, you can use any of these strategies. Make sure to:
1. Choose a strategy that fits your understanding and the current cube state
2. Explain your thought process using the <think> tags
3. Follow the steps of your chosen strategy systematically
4. Apply the appropriate algorithms for your current situation
5. Track your progress toward the solution
"""
return prompt
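Several example algorithms above use slice moves (`M`) or whole-cube rotations (`y`), while the standalone `Cube.rotate` shown in this commit accepts only the six face letters with an optional `'` or `2`. A minimal sketch for spotting such tokens follows; `unsupported_tokens` is a hypothetical helper, and whether the environment's own move parser behaves the same way is an assumption.

```python
from typing import List

SUPPORTED_FACES = set("UDLRFB")      # faces accepted by Cube.rotate
SUPPORTED_SUFFIXES = {"", "'", "2"}  # clockwise, counterclockwise, double


def unsupported_tokens(moves: str) -> List[str]:
    """Return the tokens of a space-separated algorithm that Cube.rotate would reject."""
    bad = []
    for token in moves.split():
        face, suffix = token[0], token[1:]
        if face not in SUPPORTED_FACES or suffix not in SUPPORTED_SUFFIXES:
            bad.append(token)
    return bad


print(unsupported_tokens("R U R' U'"))     # []
print(unsupported_tokens("M' U2 M U2"))    # ["M'", 'M']
print(unsupported_tokens("y' U' L' U L"))  # ["y'"]
```

A check like this could be run over every `example_algorithms` entry before a strategy is included in a prompt, so the model is not shown moves it cannot submit.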


@ -0,0 +1,173 @@
#!/usr/bin/env python3
"""
RubiksCubeTokenRewards: Token-level reward utilities for Rubik's Cube environment
This module provides functions for calculating token-level rewards, which are
important for fine-grained RL training signals that help the model understand
which tokens in its response contribute to success or failure.
"""
import re
from typing import List, Optional
def calculate_token_level_rewards(
response_text: str,
is_valid_move: bool,
parsed_move: Optional[str],
reward: float,
token_ids: List[int],
scale_factor: float = 0.1
) -> List[float]:
"""
Calculate token-level rewards based on the response quality
Args:
response_text: Full response text from the LLM
is_valid_move: Whether the parsed move was valid
parsed_move: The parsed move if any
reward: The overall reward for the response
token_ids: List of token IDs in the response
scale_factor: Scale factor for token rewards
Returns:
A list of token-level rewards with the same length as token_ids
"""
# Initialize with neutral rewards
token_rewards = [0.0] * len(token_ids)
if len(token_ids) == 0:
return token_rewards
# Extract key parts of the response
thinking_match = re.search(r"<think>(.*?)</think>", response_text, re.DOTALL)
tool_call_match = re.search(r"<tool_call>(.*?)</tool_call>", response_text, re.DOTALL)
# Find the indices of key tokens
thinking_start_idx = response_text.find("<think>")
thinking_end_idx = response_text.find("</think>")
tool_call_start_idx = response_text.find("<tool_call>")
tool_call_end_idx = response_text.find("</tool_call>")
# Determine approximate character-to-token ratio
chars_per_token = len(response_text) / len(token_ids)
# Flag for quality of thinking
has_good_thinking = False
if thinking_match and len(thinking_match.group(1).strip()) > 50:
has_good_thinking = True
# Process rewards based on response quality
if is_valid_move and has_good_thinking:
# Good response with both thinking and valid move
# Per-token multipliers: 0.6 for tool-call tokens, 0.4 for thinking tokens, 0.1 elsewhere
base_reward = reward * scale_factor
# Distribute rewards
for i in range(len(token_ids)):
# Estimate the character position this token represents
char_pos = int(i * chars_per_token)
if thinking_start_idx <= char_pos <= thinking_end_idx:
# Token is part of thinking section
token_rewards[i] = base_reward * 0.4
elif tool_call_start_idx <= char_pos <= tool_call_end_idx:
# Token is part of tool call section
token_rewards[i] = base_reward * 0.6
else:
# Token is part of other sections
token_rewards[i] = base_reward * 0.1
elif is_valid_move and not has_good_thinking:
# Valid move but poor thinking
base_reward = reward * scale_factor * 0.7 # Reduced overall reward
for i in range(len(token_ids)):
char_pos = int(i * chars_per_token)
if tool_call_start_idx <= char_pos <= tool_call_end_idx:
# Token is part of tool call section - still good
token_rewards[i] = base_reward * 0.8
else:
# Token is part of other sections - minimal reward
token_rewards[i] = base_reward * 0.2
elif not is_valid_move and has_good_thinking:
# Good thinking but invalid move
base_reward = reward * scale_factor * 0.5 # Significantly reduced
for i in range(len(token_ids)):
char_pos = int(i * chars_per_token)
if thinking_start_idx <= char_pos <= thinking_end_idx:
# Token is part of thinking section - somewhat good
token_rewards[i] = base_reward * 0.6
elif tool_call_start_idx <= char_pos <= tool_call_end_idx:
# Token is part of tool call section - problematic
token_rewards[i] = base_reward * 0.1
else:
# Token is part of other sections
token_rewards[i] = base_reward * 0.3
else:
# Poor response overall
base_reward = reward * scale_factor * 0.3 # Minimal reward
# Distribute minimal rewards evenly
for i in range(len(token_ids)):
token_rewards[i] = base_reward
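# Special handling for move-related tokens when there is a valid move
if is_valid_move and parsed_move:
# Look for the move inside the tool call only, so that a bare letter such as "U"
# elsewhere in the response (for example in the thinking text) is not boosted
if tool_call_match:
search_start, search_end = tool_call_match.span(1)
else:
search_start, search_end = 0, len(response_text)
move_pattern = re.escape(parsed_move)
move_matches = list(re.finditer(move_pattern, response_text[search_start:search_end]))
for match in move_matches:
move_start_idx = search_start + match.start()
move_end_idx = search_start + match.end()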
# Estimate corresponding token indices
move_start_token = int(move_start_idx / chars_per_token)
move_end_token = int(move_end_idx / chars_per_token) + 1
# Ensure indices are within bounds
move_start_token = max(0, min(move_start_token, len(token_ids) - 1))
move_end_token = max(0, min(move_end_token, len(token_ids)))
# Boost rewards for tokens that directly encode the move
for i in range(move_start_token, move_end_token):
token_rewards[i] = base_reward * 1.5 # Higher reward for the actual move
return token_rewards
def calculate_advantage_token_weights(token_rewards: List[List[float]]) -> List[List[float]]:
"""
Calculate token weights for advantage computation
Args:
token_rewards: List of token-level rewards for each alternative response
Returns:
List of normalized token weights for each alternative
"""
# Create a copy to avoid modifying the input
token_weights = [rewards.copy() for rewards in token_rewards]
# For each alternative
for i in range(len(token_weights)):
# Get min and max rewards for this alternative
min_reward = min(token_weights[i]) if token_weights[i] else 0.0
max_reward = max(token_weights[i]) if token_weights[i] else 0.0
reward_range = max_reward - min_reward
# Normalize to [0.5, 1.0] range to ensure all tokens get some weight
if reward_range > 0:
for j in range(len(token_weights[i])):
normalized = 0.5 + 0.5 * (token_weights[i][j] - min_reward) / reward_range
token_weights[i][j] = normalized
else:
# If all rewards are the same, use uniform weights
for j in range(len(token_weights[i])):
token_weights[i][j] = 1.0
return token_weights
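The per-response normalization in `calculate_advantage_token_weights` is a min-max rescale into the range 0.5 to 1.0. The sketch below reproduces that arithmetic for a single response; `normalize_weights` is an illustrative name, not a function in this module.

```python
from typing import List


def normalize_weights(rewards: List[float]) -> List[float]:
    """Min-max rescale one response's token rewards into [0.5, 1.0]."""
    if not rewards:
        return []
    lowest, highest = min(rewards), max(rewards)
    reward_range = highest - lowest
    if reward_range <= 0:
        # All tokens carry the same reward, so every token gets full weight
        return [1.0] * len(rewards)
    return [0.5 + 0.5 * (r - lowest) / reward_range for r in rewards]


print(normalize_weights([1.0, 2.0, 3.0]))  # [0.5, 0.75, 1.0]
print(normalize_weights([0.2, 0.2, 0.2]))  # [1.0, 1.0, 1.0]
# A negative overall reward flips the ordering: the most heavily scaled tokens
# (for example the boosted move tokens) become the minimum and get weight 0.5.
print(normalize_weights([-0.01, -0.06, -0.15]))
```

Because the rescale is applied per response, the weights carry only relative information within one response; the absolute reward size has to come from the advantage they are multiplied with.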


@ -0,0 +1,104 @@
#!/usr/bin/env python3
"""
Test script for the Rubik's Cube environment
"""
import asyncio
import random
from simple_cube import Cube
from rubiks_cube_environment import RubiksCubeEnv, RubiksCubeEnvConfig
from rubiks_cube_visualizer import save_cube_visualization
from atroposlib.envs.server_handling.server_manager import APIServerConfig
async def test_cube_visualization():
"""Test the cube visualization functionality"""
# Create a cube
cube = Cube()
# Scramble it with some random moves
moves = ["U", "D", "L", "R", "F", "B",
"U'", "D'", "L'", "R'", "F'", "B'",
"U2", "D2", "L2", "R2", "F2", "B2"]
move_history = []
for _ in range(5):
move = random.choice(moves)
move_history.append(move)
cube.rotate(move)
# Visualize the scrambled cube
cube_state = str(cube)
html_path = save_cube_visualization(
cube_state,
move_history,
"test_scrambled_cube.html"
)
print(f"Scrambled cube visualization saved to {html_path}")
print(f"Moves applied: {move_history}")
print(f"Is solved: {cube.is_solved()}")
async def test_environment():
"""Test the basic functionality of the environment"""
# Create the environment configuration
config = RubiksCubeEnvConfig(
tokenizer_name="gpt2", # Use a simple tokenizer for testing
group_size=2, # Small group size for testing
use_wandb=False,
max_steps=5,
scramble_moves=3,
debug_mode=True,
)
# Create server configuration
server_configs = [
APIServerConfig(
model_name="gpt2",
base_url="http://localhost:9004/v1",
api_key="x",
)
]
# Create the environment
env = RubiksCubeEnv(config, server_configs, slurm=False, testing=True)
# Test creating an episode
seed = 12345
episode = env._get_or_create_episode(seed)
# Print initial state
print(f"Initial cube state (seed {seed}):")
print(episode.get_cube_state_visualization())
# Test visualization
html_path = save_cube_visualization(
episode.get_cube_state_visualization(),
[],
"test_initial_cube.html"
)
print(f"Initial cube visualization saved to {html_path}")
# Test applying moves
test_moves = ["U", "R", "F'"]
for move in test_moves:
success = episode.apply_move(move)
print(f"Applied move {move}: {'Success' if success else 'Failed'}")
# Check if solved
print(f"Is solved: {episode.is_solved()}")
# Test final state visualization
html_path = save_cube_visualization(
episode.get_cube_state_visualization(),
episode.actions,
"test_after_moves_cube.html"
)
print(f"Final cube visualization saved to {html_path}")
if __name__ == "__main__":
# Run the tests
print("Running Rubik's Cube environment tests...")
asyncio.run(test_cube_visualization())
asyncio.run(test_environment())
print("Tests completed.")


@ -0,0 +1,120 @@
#!/usr/bin/env python3
"""
Test script for the Cube class used by the Rubik's Cube environment
"""
# Import Cube class directly from rubiks_cube_environment.py
from rubiks_cube_environment import Cube
def test_basic_moves():
"""Test basic moves and their inverses"""
print("=== TESTING BASIC MOVES ===")
# Test each basic move and its inverse
for move, inverse in [
("R", "R'"), ("L", "L'"), ("U", "U'"),
("D", "D'"), ("F", "F'"), ("B", "B'")
]:
cube = Cube()
cube.rotate(move)
cube.rotate(inverse)
solved = cube.is_solved()
print(f"Move {move} followed by {inverse}: {'PASS' if solved else 'FAIL'}")
if not solved:
print("Final cube state:")
print(str(cube))
def test_double_moves():
"""Test double (180°) moves"""
print("\n=== TESTING DOUBLE MOVES ===")
# Test each double move applied twice
for move in ["U2", "D2", "L2", "R2", "F2", "B2"]:
cube = Cube()
cube.rotate(move)
cube.rotate(move)
solved = cube.is_solved()
print(f"Double move {move} applied twice: {'PASS' if solved else 'FAIL'}")
if not solved:
print("Final cube state:")
print(str(cube))
def test_complex_algorithms():
"""Test more complex algorithms"""
print("\n=== TESTING COMPLEX ALGORITHMS ===")
# Test algorithms
algorithms = [
{
"name": "Sexy Move (R U R' U') × 6",
"moves": ["R", "U", "R'", "U'"] * 6,
"should_solve": True
},
{
"name": "Scramble + Inverse",
"moves": ["R", "U", "F'", "L", "D2"] + ["D2", "L'", "F", "U'", "R'"],
"should_solve": True
},
{
"name": "Sune Algorithm (R U R' U R U2 R')",
"moves": ["R", "U", "R'", "U", "R", "U2", "R'"],
"should_solve": False
}
]
for algo in algorithms:
cube = Cube()
print(f"\nTesting: {algo['name']}")
# Apply moves
for move in algo["moves"]:
cube.rotate(move)
# Check result
is_solved = cube.is_solved()
expected = algo["should_solve"]
if is_solved == expected:
print(f"Result: PASS (Expected {'solved' if expected else 'not solved'}, Got {'solved' if is_solved else 'not solved'})")
else:
print(f"Result: FAIL (Expected {'solved' if expected else 'not solved'}, Got {'solved' if is_solved else 'not solved'})")
print("Final cube state:")
print(str(cube))
# Show progress percentage if not solved
if not is_solved:
progress = cube.count_solved_cubies()
print(f"Progress toward solution: {progress:.2f}")
def test_scramble_and_count():
"""Test scrambling and counting progress"""
print("\n=== TESTING SCRAMBLING AND PROGRESS TRACKING ===")
# Create a cube and apply a fixed, hand-written scramble
cube = Cube()
print("Solved cube:")
print(str(cube))
print(f"Is solved: {cube.is_solved()}")
print(f"Progress: {cube.count_solved_cubies():.2f}")
# Apply a sequence of moves to scramble
scramble = ["R", "U", "F", "D", "L", "B'", "R'", "U2"]
print(f"\nApplying scramble: {' '.join(scramble)}")
for move in scramble:
cube.rotate(move)
print("Scrambled cube:")
print(str(cube))
print(f"Is solved: {cube.is_solved()}")
print(f"Progress: {cube.count_solved_cubies():.2f}")
if __name__ == "__main__":
test_basic_moves()
test_double_moves()
test_complex_algorithms()
test_scramble_and_count()
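The "Scramble + Inverse" case above writes the inverse sequence by hand. As a minimal sketch, the inverse of any sequence in this notation can be derived by reversing the order and inverting each move; `invert_move` and `invert_sequence` are hypothetical helpers, not part of the test script.

```python
from typing import List


def invert_move(move: str) -> str:
    """Invert a single move: R -> R', R' -> R, R2 -> R2."""
    if move.endswith("2"):
        return move           # a half turn is its own inverse
    if move.endswith("'"):
        return move[:-1]      # counterclockwise -> clockwise
    return move + "'"         # clockwise -> counterclockwise


def invert_sequence(moves: List[str]) -> List[str]:
    """Undo a sequence by inverting each move in reverse order."""
    return [invert_move(m) for m in reversed(moves)]


scramble = ["R", "U", "F'", "L", "D2"]
print(invert_sequence(scramble))  # ['D2', "L'", 'F', "U'", "R'"]
```

With a helper like this, the scramble-and-undo check could be run over randomly generated scrambles, asserting `cube.is_solved()` after applying `scramble + invert_sequence(scramble)`, instead of relying on one fixed case.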