Big updates: logging everything, better structure of moves, and everything runs much faster

This commit is contained in:
AlxAI 2025-05-11 19:10:18 -04:00
parent 64dd7be6f2
commit 0bd6428729
12 changed files with 1101 additions and 920 deletions

View file

@ -7,7 +7,7 @@ import re
# Assuming BaseModelClient is importable from clients.py in the same directory # Assuming BaseModelClient is importable from clients.py in the same directory
from .clients import BaseModelClient from .clients import BaseModelClient
# Import load_prompt and the new logging wrapper from utils # Import load_prompt and the new logging wrapper from utils
from .utils import load_prompt, run_llm_and_log from .utils import load_prompt, run_llm_and_log, log_llm_response
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -182,78 +182,133 @@ class DiplomacyAgent:
async def generate_negotiation_diary_entry(self, game: 'Game', game_history: 'GameHistory', log_file_path: str): async def generate_negotiation_diary_entry(self, game: 'Game', game_history: 'GameHistory', log_file_path: str):
""" """
Generates a diary entry summarizing negotiations and updates relationships. Generates a diary entry summarizing negotiations and updates relationships.
This method now includes comprehensive LLM interaction logging.
""" """
logger.info(f"[{self.power_name}] Generating negotiation diary entry for {game.current_short_phase}...") logger.info(f"[{self.power_name}] Generating negotiation diary entry for {game.current_short_phase}..." )
prompt_template = _load_prompt_file('negotiation_diary_prompt.txt') full_prompt = "" # For logging in finally block
if not prompt_template: raw_response = "" # For logging in finally block
logger.error(f"[{self.power_name}] Could not load negotiation_diary_prompt.txt. Skipping diary entry.") success_status = "Failure: Initialized" # Default
return
# Prepare context for the prompt
board_state_dict = game.get_state()
board_state_str = f"Units: {board_state_dict.get('units', {})}, Centers: {board_state_dict.get('centers', {})}"
messages_this_round = game_history.get_messages_this_round(
power_name=self.power_name,
current_phase_name=game.current_short_phase
)
if not messages_this_round.strip() or messages_this_round.startswith("\n(No messages"):
messages_this_round = "(No messages exchanged this negotiation round)"
goals_str = "\n".join([f"- {g}" for g in self.goals]) if self.goals else "None"
relationships_str = "\n".join([f"- {p}: {s}" for p, s in self.relationships.items()]) if self.relationships else "None"
prompt = prompt_template.format(
power_name=self.power_name,
current_phase=game.current_short_phase,
messages_this_round=messages_this_round,
agent_goals=goals_str,
agent_relationships=relationships_str,
board_state_str=board_state_str
)
response_data = None
try: try:
prompt_template_content = _load_prompt_file('negotiation_diary_prompt.txt')
if not prompt_template_content:
logger.error(f"[{self.power_name}] Could not load negotiation_diary_prompt.txt. Skipping diary entry.")
success_status = "Failure: Prompt file not loaded"
# No LLM call, so log_llm_response won't have typical LLM data, but we still log the attempt.
# Or, decide not to log if no LLM call is even attempted. For consistency, let's log an attempt.
# To do that, we'd need to call log_llm_response here or ensure finally block handles it.
# For now, the finally block will catch this, but raw_response and full_prompt will be empty.
return # Exit early if prompt is critical
# Prepare context for the prompt
board_state_dict = game.get_state()
board_state_str = f"Units: {board_state_dict.get('units', {})}, Centers: {board_state_dict.get('centers', {})}"
messages_this_round = game_history.get_messages_this_round(
power_name=self.power_name,
current_phase_name=game.current_short_phase
)
if not messages_this_round.strip() or messages_this_round.startswith("\n(No messages"):
messages_this_round = "(No messages involving your power this round that require deep reflection for diary. Focus on overall situation.)"
current_relationships_str = json.dumps(self.relationships)
current_goals_str = json.dumps(self.goals)
formatted_diary = self.format_private_diary_for_prompt()
full_prompt = prompt_template_content.format(
power_name=self.power_name,
current_phase=game.current_short_phase,
board_state=board_state_str,
messages_this_round=messages_this_round,
current_relationships=current_relationships_str,
current_goals=current_goals_str,
private_diary_summary=formatted_diary, # Pass formatted diary
allowed_relationships_str=", ".join(ALLOWED_RELATIONSHIPS)
)
logger.debug(f"[{self.power_name}] Negotiation diary prompt:\n{full_prompt[:500]}...")
raw_response = await run_llm_and_log( raw_response = await run_llm_and_log(
client=self.client, client=self.client,
prompt=prompt, prompt=full_prompt,
log_file_path=log_file_path, log_file_path=log_file_path, # Pass the main log file path
power_name=self.power_name, power_name=self.power_name,
phase=game.current_short_phase, phase=game.current_short_phase,
response_type='negotiation_diary', response_type='negotiation_diary_raw' # For run_llm_and_log context
) )
response_data = self._extract_json_from_text(raw_response)
except Exception as e:
logger.error(f"[{self.power_name}] Error generating or parsing negotiation diary: {e}", exc_info=True)
self.add_diary_entry(f"Error generating negotiation diary: {e}", game.current_short_phase)
return
if response_data: logger.debug(f"[{self.power_name}] Raw negotiation diary response: {raw_response[:300]}...")
summary = response_data.get("negotiation_summary", "(No summary provided)")
intent = response_data.get("intent", "(No intent stated)")
diary_text = f"Negotiation Summary: {summary}\nIntent for Orders: {intent}"
self.add_diary_entry(diary_text, game.current_short_phase)
# Update relationships parsed_data = None
rship_updates = response_data.get("relationship_updates", {}) try:
if isinstance(rship_updates, dict): parsed_data = self._extract_json_from_text(raw_response)
updated_count = 0 logger.debug(f"[{self.power_name}] Parsed diary data: {parsed_data}")
for power, status in rship_updates.items(): success_status = "Success: Parsed diary data"
power_upper = power.upper() # Normalize except json.JSONDecodeError as e:
if power_upper in ALL_POWERS and power_upper != self.power_name and status in ALLOWED_RELATIONSHIPS: logger.error(f"[{self.power_name}] Failed to parse JSON from diary response: {e}. Response: {raw_response[:300]}...")
if self.relationships.get(power_upper) != status: success_status = "Failure: JSONDecodeError"
self.relationships[power_upper] = status # Continue without parsed_data, rely on diary_entry_text if available or just log failure
self.add_journal_entry(f"[{game.current_short_phase}] Relationship with {power_upper} updated to {status} via diary.")
updated_count +=1 diary_entry_text = "(LLM diary entry generation or parsing failed.)" # Fallback
relationships_updated = False
if parsed_data:
diary_entry_text = parsed_data.get('diary_entry', diary_entry_text)
# Update relationships if provided and valid
new_relationships = parsed_data.get('updated_relationships')
if isinstance(new_relationships, dict):
valid_new_rels = {}
for p, r in new_relationships.items():
p_upper = str(p).upper()
r_title = str(r).title()
if p_upper in ALL_POWERS and p_upper != self.power_name and r_title in ALLOWED_RELATIONSHIPS:
valid_new_rels[p_upper] = r_title
elif p_upper != self.power_name: # Log invalid relationship for a valid power
logger.warning(f"[{self.power_name}] Invalid relationship '{r}' for power '{p}' in diary update. Keeping old.")
if valid_new_rels:
# Log changes before applying
for p_changed, new_r_val in valid_new_rels.items():
old_r_val = self.relationships.get(p_changed, "Unknown")
if old_r_val != new_r_val:
logger.info(f"[{self.power_name}] Relationship with {p_changed} changing from {old_r_val} to {new_r_val} based on diary.")
self.relationships.update(valid_new_rels)
relationships_updated = True
success_status = "Success: Applied diary data (relationships updated)"
else: else:
logger.warning(f"[{self.power_name}] Invalid relationship update from diary: {power}-{status}") logger.info(f"[{self.power_name}] No valid relationship updates found in diary response.")
if updated_count > 0: if success_status == "Success: Parsed diary data": # If only parsing was successful before
logger.info(f"[{self.power_name}] Updated {updated_count} relationships based on negotiation diary.") success_status = "Success: Parsed, no valid relationship updates"
else: elif new_relationships is not None: # It was provided but not a dict
logger.warning(f"[{self.power_name}] Relationship updates from diary not in expected dict format: {rship_updates}") logger.warning(f"[{self.power_name}] 'updated_relationships' from diary LLM was not a dictionary: {type(new_relationships)}")
else:
self.add_diary_entry("Failed to generate negotiation summary and intent.", game.current_short_phase) # Add the generated (or fallback) diary entry
self.add_diary_entry(diary_entry_text, game.current_short_phase)
if relationships_updated:
self.add_journal_entry(f"[{game.current_short_phase}] Relationships updated after negotiation diary: {self.relationships}")
# If success_status is still the default 'Parsed diary data' but no relationships were updated, refine it.
if success_status == "Success: Parsed diary data" and not relationships_updated:
success_status = "Success: Parsed, only diary text applied"
except Exception as e:
logger.error(f"[{self.power_name}] Error in generate_negotiation_diary_entry: {e}", exc_info=True)
success_status = f"Failure: Exception ({type(e).__name__})"
# Add a fallback diary entry in case of general error
self.add_diary_entry(f"(Error generating diary entry: {type(e).__name__})", game.current_short_phase)
finally:
if log_file_path: # Ensure log_file_path is provided
log_llm_response(
log_file_path=log_file_path,
model_name=self.client.model_name if self.client else "UnknownModel",
power_name=self.power_name,
phase=game.current_short_phase if game else "UnknownPhase",
response_type="negotiation_diary", # Specific type for CSV logging
raw_input_prompt=full_prompt,
raw_response=raw_response,
success=success_status
)
async def generate_order_diary_entry(self, game: 'Game', orders: List[str], log_file_path: str): async def generate_order_diary_entry(self, game: 'Game', orders: List[str], log_file_path: str):
""" """
@ -283,174 +338,83 @@ class DiplomacyAgent:
) )
response_data = None response_data = None
raw_response = None raw_response = None # Initialize raw_response
try: try:
raw_response = await run_llm_and_log( raw_response = await run_llm_and_log(
client=self.client, client=self.client,
prompt=prompt, prompt=prompt,
log_file_path=log_file_path, log_file_path=log_file_path,
power_name=self.power_name, power_name=self.power_name,
phase=game.current_short_phase, phase=game.current_short_phase,
response_type='order_diary', response_type='order_diary'
# raw_input_prompt=prompt, # REMOVED from run_llm_and_log
) )
response_data = self._extract_json_from_text(raw_response)
except Exception as e:
logger.error(f"[{self.power_name}] Error generating or parsing order diary: {e}", exc_info=True)
logger.error(raw_response)
#self.add_diary_entry(f"Error generating order reflection diary: {e}", game.current_short_phase)
return
if response_data: success_status = "FALSE"
order_summary = response_data.get("order_summary", "(Order summary missing)") response_data = None
logger.info('Order summary: ' + str(order_summary)) actual_diary_text = None # Variable to hold the final diary text
self.add_diary_entry(f"Order Summary: {order_summary}", game.current_short_phase)
else:
logger.error("Failed to generate order summary.")
#self.add_diary_entry("Failed to generate order summary.", game.current_short_phase)
if raw_response:
def get_relationships(self) -> Dict[str, str]: try:
"""Returns a copy of the agent's current relationships with other powers.""" response_data = self._extract_json_from_text(raw_response)
return self.relationships.copy() if response_data:
diary_text_candidate = response_data.get("diary_entry")
# Make the initialization method asynchronous if isinstance(diary_text_candidate, str) and diary_text_candidate.strip():
async def initialize_agent_state(self, game: 'Game', game_history: 'GameHistory', log_file_path: str): actual_diary_text = diary_text_candidate
"""Uses the LLM to set initial goals based on the starting game state.""" success_status = "TRUE"
logger.info(f"[{self.power_name}] Initializing agent state using LLM...")
current_phase = game.get_current_phase() # Get phase for logging
try:
# Use a simplified prompt for initial state generation
# TODO: Create a dedicated 'initial_state_prompt.txt'
allowed_labels_str = ", ".join(ALLOWED_RELATIONSHIPS)
initial_prompt = f"You are the agent for {self.power_name} in a game of Diplomacy at the very start (Spring 1901). " \
f"Analyze the initial board position and suggest 2-3 strategic high-level goals for the early game. " \
f"Consider your power's strengths, weaknesses, and neighbors. " \
f"Also, provide an initial assessment of relationships with other powers. " \
f"IMPORTANT: For each relationship, you MUST use exactly one of the following labels: {allowed_labels_str}. " \
f"Format your response as a JSON object with two keys: 'initial_goals' (a list of strings) and 'initial_relationships' (a dictionary mapping power names to one of the allowed relationship strings)."
# == Fix: Get required state info from game object ==
board_state = game.get_state()
possible_orders = game.get_all_possible_orders()
# == Add detailed logging before call ==
logger.debug(f"[{self.power_name}] Preparing context for initial state. Got board_state type: {type(board_state)}, possible_orders type: {type(possible_orders)}, game_history type: {type(game_history)}")
logger.debug(f"[{self.power_name}] Calling build_context_prompt with game: {game is not None}, board_state: {board_state is not None}, power_name: {self.power_name}, possible_orders: {possible_orders is not None}, game_history: {game_history is not None}")
# Get formatted diary for context (will be empty at initialization)
formatted_diary = self.format_private_diary_for_prompt()
context = self.client.build_context_prompt(
game=game,
board_state=board_state, # Pass board_state
power_name=self.power_name,
possible_orders=possible_orders, # Pass possible_orders
game_history=game_history, # Pass game_history
agent_goals=None, # No goals yet
agent_relationships=None, # No relationships yet (defaults used in prompt)
agent_private_diary=formatted_diary, # Pass formatted diary
)
full_prompt = initial_prompt + "\n\n" + context
# Await the asynchronous client call USING THE WRAPPER
response = await run_llm_and_log(
client=self.client,
prompt=full_prompt,
log_file_path=log_file_path,
power_name=self.power_name,
phase=current_phase,
response_type='initialization',
)
logger.debug(f"[{self.power_name}] LLM response for initial state: {response}")
# Try to extract JSON from the response
try:
update_data = self._extract_json_from_text(response)
logger.debug(f"[{self.power_name}] Successfully parsed JSON: {update_data}")
except json.JSONDecodeError as e:
logger.error(f"[{self.power_name}] All JSON extraction attempts failed: {e}")
# Create default data rather than failing
update_data = {
"initial_goals": ["Survive and expand", "Form beneficial alliances", "Secure key territories"],
"initial_relationships": {p: "Neutral" for p in ALL_POWERS if p != self.power_name},
"goals": ["Survive and expand", "Form beneficial alliances", "Secure key territories"],
"relationships": {p: "Neutral" for p in ALL_POWERS if p != self.power_name}
}
logger.warning(f"[{self.power_name}] Using default goals and relationships: {update_data}")
# Check for both possible key names
initial_goals = update_data.get('initial_goals')
if initial_goals is None:
initial_goals = update_data.get('goals')
if initial_goals is not None:
logger.debug(f"[{self.power_name}] Using 'goals' key instead of 'initial_goals'")
initial_relationships = update_data.get('initial_relationships')
if initial_relationships is None:
initial_relationships = update_data.get('relationships')
if initial_relationships is not None:
logger.debug(f"[{self.power_name}] Using 'relationships' key instead of 'initial_relationships'")
if isinstance(initial_goals, list):
self.goals = initial_goals
# == Fix: Correct add_journal_entry call signature ==
self.add_journal_entry(f"[{game.current_short_phase}] Initial Goals Set: {self.goals}")
else:
logger.warning(f"[{self.power_name}] LLM did not provide valid 'initial_goals' list.")
# Set default goals
self.goals = ["Survive and expand", "Form beneficial alliances", "Secure key territories"]
self.add_journal_entry(f"[{game.current_short_phase}] Set default initial goals: {self.goals}")
if isinstance(initial_relationships, dict):
# Validate relationship keys and values
valid_relationships = {}
invalid_count = 0
for p, r in initial_relationships.items():
# Convert power name to uppercase for case-insensitive matching
p_upper = p.upper()
if p_upper in ALL_POWERS and p_upper != self.power_name:
# Check against allowed labels (case-insensitive)
r_title = r.title() if isinstance(r, str) else r # Convert "enemy" to "Enemy" etc.
if r_title in ALLOWED_RELATIONSHIPS:
valid_relationships[p_upper] = r_title
else: else:
invalid_count += 1 # Try 'order_summary' if 'diary_entry' is missing or invalid
if invalid_count <= 2: # Only log first few to reduce noise logger.debug(f"[{self.power_name}] 'diary_entry' missing or invalid. Trying 'order_summary'. Value was: {diary_text_candidate}")
logger.warning(f"[{self.power_name}] Received invalid relationship label '{r}' for '{p}'. Setting to Neutral.") order_summary_candidate = response_data.get("order_summary")
valid_relationships[p_upper] = "Neutral" if isinstance(order_summary_candidate, str) and order_summary_candidate.strip():
else: actual_diary_text = order_summary_candidate
invalid_count += 1 success_status = "TRUE"
if invalid_count <= 2 and not p_upper.startswith(self.power_name): # Only log first few to reduce noise logger.info(f"[{self.power_name}] Used 'order_summary' for order diary entry.")
logger.warning(f"[{self.power_name}] Received relationship for invalid/own power '{p}'. Ignoring.") else:
logger.warning(f"[{self.power_name}] Both 'diary_entry' and 'order_summary' missing, invalid, or empty. 'diary_entry': {diary_text_candidate}, 'order_summary': {order_summary_candidate}")
# Summarize if there were many invalid entries success_status = "FALSE"
if invalid_count > 2: # If response_data is None (JSON parsing failed), success_status remains "FALSE"
logger.warning(f"[{self.power_name}] {invalid_count} total invalid relationships were processed.") except Exception as e:
logger.error(f"[{self.power_name}] Error parsing order diary JSON: {e}. Raw response: {raw_response[:200]} ", exc_info=False)
# If we have any valid relationships, use them # success_status remains "FALSE"
if valid_relationships:
self.relationships = valid_relationships log_llm_response(
self.add_journal_entry(f"[{game.current_short_phase}] Initial Relationships Set: {self.relationships}") log_file_path=log_file_path,
else: model_name=self.client.model_name,
# Set default relationships power_name=self.power_name,
logger.warning(f"[{self.power_name}] No valid relationships found, using defaults.") phase=game.current_short_phase,
self.relationships = {p: "Neutral" for p in ALL_POWERS if p != self.power_name} response_type='order_diary',
self.add_journal_entry(f"[{game.current_short_phase}] Set default neutral relationships.") raw_input_prompt=prompt, # ENSURED
raw_response=raw_response if raw_response else "",
success=success_status
)
if success_status == "TRUE" and actual_diary_text:
self.add_diary_entry(actual_diary_text, game.current_short_phase)
logger.info(f"[{self.power_name}] Order diary entry generated and added.")
else: else:
logger.warning(f"[{self.power_name}] LLM did not provide valid 'initial_relationships' dict.") fallback_diary = f"Submitted orders for {game.current_short_phase}: {', '.join(orders)}. (LLM failed to generate a specific diary entry)"
# Set default relationships self.add_diary_entry(fallback_diary, game.current_short_phase)
self.relationships = {p: "Neutral" for p in ALL_POWERS if p != self.power_name} logger.warning(f"[{self.power_name}] Failed to generate specific order diary entry. Added fallback.")
self.add_journal_entry(f"[{game.current_short_phase}] Set default neutral relationships.")
except Exception as e: except Exception as e:
logger.error(f"[{self.power_name}] Error during initial state generation: {e}", exc_info=True) # Ensure prompt is defined or handled if it might not be (it should be in this flow)
# Set conservative defaults even if everything fails current_prompt = prompt if 'prompt' in locals() else "[prompt_unavailable_in_exception]"
if not self.goals: current_raw_response = raw_response if 'raw_response' in locals() and raw_response is not None else f"Error: {e}"
self.goals = ["Survive and expand", "Form beneficial alliances", "Secure key territories"] log_llm_response(
if not self.relationships: log_file_path=log_file_path,
self.relationships = {p: "Neutral" for p in ALL_POWERS if p != self.power_name} model_name=self.client.model_name if hasattr(self, 'client') else "UnknownModel",
logger.info(f"[{self.power_name}] Set fallback goals and relationships after error.") power_name=self.power_name,
phase=game.current_short_phase if 'game' in locals() and hasattr(game, 'current_short_phase') else "order_phase",
response_type='order_diary_exception',
raw_input_prompt=current_prompt, # ENSURED (using current_prompt for safety)
raw_response=current_raw_response,
success="FALSE"
)
fallback_diary = f"Submitted orders for {game.current_short_phase}: {', '.join(orders)}. (Critical error in diary generation process)"
self.add_diary_entry(fallback_diary, game.current_short_phase)
logger.warning(f"[{self.power_name}] Added fallback order diary entry due to critical error.")
# Rest of the code remains the same
def log_state(self, prefix=""): def log_state(self, prefix=""):
logger.debug(f"[{self.power_name}] {prefix} State: Goals={self.goals}, Relationships={self.relationships}") logger.debug(f"[{self.power_name}] {prefix} State: Goals={self.goals}, Relationships={self.relationships}")
@ -539,21 +503,57 @@ class DiplomacyAgent:
) )
logger.debug(f"[{power_name}] Raw LLM response for state update: {response}") logger.debug(f"[{power_name}] Raw LLM response for state update: {response}")
# Use our robust JSON extraction helper log_entry_response_type = 'state_update' # Default for log_llm_response
try: log_entry_success = "FALSE" # Default
update_data = self._extract_json_from_text(response) update_data = None # Initialize
logger.debug(f"[{power_name}] Successfully parsed JSON: {update_data}")
except json.JSONDecodeError as e: if response is not None and response.strip(): # Check if response is not None and not just whitespace
logger.error(f"[{power_name}] Failed to parse JSON response for state update: {e}") try:
logger.error(f"[{power_name}] Raw response was: {response}") update_data = self._extract_json_from_text(response)
# Create fallback data to avoid full failure logger.debug(f"[{power_name}] Successfully parsed JSON: {update_data}")
update_data = { # Check if essential data ('updated_goals' or 'goals') is present AND is a list (for goals)
"updated_goals": self.goals, # Maintain current goals # For relationships, check for 'updated_relationships' or 'relationships' AND is a dict.
"updated_relationships": self.relationships, # Maintain current relationships # Consider it TRUE if at least one of the primary data structures (goals or relationships) is present and correctly typed.
"goals": self.goals, # Alternative key goals_present_and_valid = isinstance(update_data.get('updated_goals'), list) or isinstance(update_data.get('goals'), list)
"relationships": self.relationships # Alternative key rels_present_and_valid = isinstance(update_data.get('updated_relationships'), dict) or isinstance(update_data.get('relationships'), dict)
}
logger.warning(f"[{power_name}] Using existing goals and relationships as fallback: {update_data}") if update_data and (goals_present_and_valid or rels_present_and_valid):
log_entry_success = "TRUE"
elif update_data: # Parsed, but maybe not all essential data there or not correctly typed
log_entry_success = "PARTIAL"
log_entry_response_type = 'state_update_partial_data'
else: # Parsed to None or empty dict/list, or data not in expected format
log_entry_success = "FALSE"
log_entry_response_type = 'state_update_parsing_empty_or_invalid_data'
except json.JSONDecodeError as e:
logger.error(f"[{power_name}] Failed to parse JSON response for state update: {e}. Raw response: {response}")
log_entry_response_type = 'state_update_json_error'
# log_entry_success remains "FALSE"
else: # response was None or empty/whitespace
logger.error(f"[{power_name}] No valid response (None or empty) received from LLM for state update.")
log_entry_response_type = 'state_update_no_response'
# log_entry_success remains "FALSE"
# Log the attempt and its outcome
log_llm_response(
log_file_path=log_file_path,
model_name=self.client.model_name,
power_name=power_name,
phase=current_phase,
response_type=log_entry_response_type,
raw_input_prompt=prompt, # ENSURED
raw_response=response if response is not None else "", # Handle if response is None
success=log_entry_success
)
# Fallback logic if update_data is still None or not usable
if not update_data or not (isinstance(update_data.get('updated_goals'), list) or isinstance(update_data.get('goals'), list) or isinstance(update_data.get('updated_relationships'), dict) or isinstance(update_data.get('relationships'), dict)):
logger.warning(f"[{power_name}] update_data is None or missing essential valid structures after LLM call. Using existing goals and relationships as fallback.")
update_data = {
"updated_goals": self.goals,
"updated_relationships": self.relationships,
}
logger.warning(f"[{power_name}] Using existing goals and relationships as fallback: {update_data}")
# Check for both possible key names (prompt uses "goals"/"relationships", # Check for both possible key names (prompt uses "goals"/"relationships",
# but code was expecting "updated_goals"/"updated_relationships") # but code was expecting "updated_goals"/"updated_relationships")
@ -624,7 +624,6 @@ class DiplomacyAgent:
self.log_state(f"After State Update ({game.current_short_phase})") self.log_state(f"After State Update ({game.current_short_phase})")
def update_goals(self, new_goals: List[str]): def update_goals(self, new_goals: List[str]):
"""Updates the agent's strategic goals.""" """Updates the agent's strategic goals."""
self.goals = new_goals self.goals = new_goals
@ -662,18 +661,4 @@ class DiplomacyAgent:
except Exception as e: except Exception as e:
logger.error(f"Agent {self.power_name} failed to generate plan: {e}") logger.error(f"Agent {self.power_name} failed to generate plan: {e}")
self.add_journal_entry(f"Failed to generate plan for phase {game.current_phase} due to error: {e}") self.add_journal_entry(f"Failed to generate plan for phase {game.current_phase} due to error: {e}")
return "Error: Failed to generate plan." return "Error: Failed to generate plan."
# def process_message(self, message, game_phase):
# """Processes an incoming message, updates relationships/journal."""
# # 1. Analyze message content
# # 2. Update self.relationships based on message
# # 3. Add journal entry about the message and its impact
# pass
# def generate_message_reply(self, conversation_so_far, game_phase):
# """Generates a reply to a conversation using agent state."""
# # 1. Consider goals, relationships when crafting reply
# # 2. Delegate to self.client.get_conversation_reply(...)
# # 3. Add journal entry about the generated message
# pass

View file

@ -3,7 +3,6 @@ import json
from json import JSONDecodeError from json import JSONDecodeError
import re import re
import logging import logging
import ast
import asyncio # Added for async operations import asyncio # Added for async operations
from typing import List, Dict, Optional, Any from typing import List, Dict, Optional, Any
@ -19,9 +18,10 @@ import google.generativeai as genai
from diplomacy.engine.message import GLOBAL from diplomacy.engine.message import GLOBAL
from .game_history import GameHistory from .game_history import GameHistory
from .utils import load_prompt, run_llm_and_log from .utils import load_prompt, run_llm_and_log, log_llm_response # Ensure log_llm_response is imported
# Import DiplomacyAgent for type hinting if needed, but avoid circular import if possible # Import DiplomacyAgent for type hinting if needed, but avoid circular import if possible
# from .agent import DiplomacyAgent # from .agent import DiplomacyAgent
from .possible_order_context import generate_rich_order_context
# set logger back to just info # set logger back to just info
logger = logging.getLogger("client") logger = logging.getLogger("client")
@ -98,11 +98,11 @@ class BaseModelClient:
enemy_units[power] = info enemy_units[power] = info
enemy_centers[power] = board_state["centers"].get(power, []) enemy_centers[power] = board_state["centers"].get(power, [])
# Get possible orders # Get possible orders - REPLACED WITH NEW FUNCTION
possible_orders_str = "" # possible_orders_str = ""
for loc, orders in possible_orders.items(): # for loc, orders in possible_orders.items():
possible_orders_str += f" {loc}: {orders}\n" # possible_orders_str += f" {loc}: {orders}\n"
possible_orders_context_str = generate_rich_order_context(game, power_name, possible_orders)
# Get messages for the current round # Get messages for the current round
messages_this_round_text = game_history.get_messages_this_round( messages_this_round_text = game_history.get_messages_this_round(
@ -112,15 +112,6 @@ class BaseModelClient:
if not messages_this_round_text.strip(): if not messages_this_round_text.strip():
messages_this_round_text = "\n(No messages this round)\n" messages_this_round_text = "\n(No messages this round)\n"
# Get history from previous phases
previous_history_text = game_history.get_previous_phases_history(
power_name=power_name,
current_phase_name=year_phase
# include_plans and num_prev_phases will use defaults
)
if not previous_history_text.strip():
previous_history_text = "\n(No previous game history)\n"
# Load in current context values # Load in current context values
# Simplified map representation based on DiploBench approach # Simplified map representation based on DiploBench approach
units_repr = "\n".join([f" {p}: {u}" for p, u in board_state["units"].items()]) units_repr = "\n".join([f" {p}: {u}" for p, u in board_state["units"].items()])
@ -132,8 +123,7 @@ class BaseModelClient:
all_unit_locations=units_repr, all_unit_locations=units_repr,
all_supply_centers=centers_repr, all_supply_centers=centers_repr,
messages_this_round=messages_this_round_text, messages_this_round=messages_this_round_text,
previous_game_history=previous_history_text, possible_orders=possible_orders_context_str,
possible_orders=possible_orders_str,
agent_goals="\n".join(f"- {g}" for g in agent_goals) if agent_goals else "None specified", agent_goals="\n".join(f"- {g}" for g in agent_goals) if agent_goals else "None specified",
agent_relationships="\n".join(f"- {p}: {s}" for p, s in agent_relationships.items()) if agent_relationships else "None specified", agent_relationships="\n".join(f"- {p}: {s}" for p, s in agent_relationships.items()) if agent_relationships else "None specified",
agent_private_diary=agent_private_diary if agent_private_diary else "(No diary entries yet)", # Use new parameter agent_private_diary=agent_private_diary if agent_private_diary else "(No diary entries yet)", # Use new parameter
@ -207,6 +197,9 @@ class BaseModelClient:
) )
raw_response = "" raw_response = ""
# Initialize success status. Will be updated based on outcome.
success_status = "Failure: Initialized"
parsed_orders_for_return = self.fallback_orders(possible_orders) # Default to fallback
try: try:
# Call LLM using the logging wrapper # Call LLM using the logging wrapper
@ -216,10 +209,10 @@ class BaseModelClient:
log_file_path=log_file_path, log_file_path=log_file_path,
power_name=power_name, power_name=power_name,
phase=phase, phase=phase,
response_type='order', response_type='order', # Context for run_llm_and_log's own error logging
) )
logger.debug( logger.debug(
f"[{self.model_name}] Raw LLM response for {power_name}:\n{raw_response}" f"[{self.model_name}] Raw LLM response for {power_name} orders:\n{raw_response}"
) )
# Attempt to parse the final "orders" from the LLM # Attempt to parse the final "orders" from the LLM
@ -229,17 +222,37 @@ class BaseModelClient:
logger.warning( logger.warning(
f"[{self.model_name}] Could not extract moves for {power_name}. Using fallback." f"[{self.model_name}] Could not extract moves for {power_name}. Using fallback."
) )
if model_error_stats is not None: if model_error_stats is not None and self.model_name in model_error_stats:
model_error_stats[self.model_name].setdefault("order_decoding_errors", 0)
model_error_stats[self.model_name]["order_decoding_errors"] += 1 model_error_stats[self.model_name]["order_decoding_errors"] += 1
return self.fallback_orders(possible_orders) success_status = "Failure: No moves extracted"
# Validate or fallback # Fallback is already set to parsed_orders_for_return
validated_moves = self._validate_orders(move_list, possible_orders) else:
logger.debug(f"[{self.model_name}] Validated moves for {power_name}: {validated_moves}") # Validate or fallback
return validated_moves validated_moves = self._validate_orders(move_list, possible_orders)
logger.debug(f"[{self.model_name}] Validated moves for {power_name}: {validated_moves}")
parsed_orders_for_return = validated_moves
success_status = "Success"
except Exception as e: except Exception as e:
logger.error(f"[{self.model_name}] LLM error for {power_name}: {e}") logger.error(f"[{self.model_name}] LLM error for {power_name} in get_orders: {e}", exc_info=True)
return self.fallback_orders(possible_orders) success_status = f"Failure: Exception ({type(e).__name__})"
# Fallback is already set to parsed_orders_for_return
finally:
# Log the attempt regardless of outcome
if log_file_path: # Only log if a path is provided
log_llm_response(
log_file_path=log_file_path,
model_name=self.model_name,
power_name=power_name,
phase=phase,
response_type="order_generation", # Specific type for CSV logging
raw_input_prompt=prompt, # Renamed from 'prompt' to match log_llm_response arg
raw_response=raw_response,
success=success_status
# token_usage and cost can be added later if available and if log_llm_response supports them
)
return parsed_orders_for_return
def _extract_moves(self, raw_response: str, power_name: str) -> Optional[List[str]]: def _extract_moves(self, raw_response: str, power_name: str) -> Optional[List[str]]:
""" """
@ -272,7 +285,7 @@ class BaseModelClient:
# 2) If still no match, check for triple-backtick code fences containing JSON # 2) If still no match, check for triple-backtick code fences containing JSON
if not matches: if not matches:
code_fence_pattern = r"```json\s*(\{.*?\})\s*```" code_fence_pattern = r"```json\n(.*?)\n```"
matches = re.search(code_fence_pattern, raw_response, re.DOTALL) matches = re.search(code_fence_pattern, raw_response, re.DOTALL)
if matches: if matches:
logger.debug( logger.debug(
@ -481,81 +494,126 @@ class BaseModelClient:
game_history: GameHistory, game_history: GameHistory,
game_phase: str, game_phase: str,
log_file_path: str, log_file_path: str,
active_powers: Optional[List[str]] = None, # Keep active_powers if needed by prompt logic active_powers: Optional[List[str]] = None,
agent_goals: Optional[List[str]] = None, agent_goals: Optional[List[str]] = None,
agent_relationships: Optional[Dict[str, str]] = None, agent_relationships: Optional[Dict[str, str]] = None,
agent_private_diary_str: Optional[str] = None, # Added agent_private_diary_str: Optional[str] = None,
) -> List[Dict[str, str]]: ) -> List[Dict[str, str]]:
""" """
Generates a negotiation message, considering agent state. Generates a negotiation message, considering agent state.
""" """
prompt = self.build_conversation_prompt( raw_input_prompt = "" # Initialize for finally block
game, raw_response = "" # Initialize for finally block
board_state, success_status = "Failure: Initialized" # Default status
power_name, messages_to_return = [] # Initialize to ensure it's defined
possible_orders,
game_history,
# game_phase, # Not passed to build_conversation_prompt directly
# log_file_path, # Not passed to build_conversation_prompt directly
agent_goals=agent_goals,
agent_relationships=agent_relationships,
agent_private_diary_str=agent_private_diary_str, # Pass diary string
)
logger.debug(f"[{self.model_name}] Conversation prompt for {power_name}:\n{prompt}")
try: try:
# Call LLM using the logging wrapper raw_input_prompt = self.build_conversation_prompt(
response = await run_llm_and_log( game,
board_state,
power_name,
possible_orders,
game_history,
agent_goals=agent_goals,
agent_relationships=agent_relationships,
agent_private_diary_str=agent_private_diary_str,
)
logger.debug(f"[{self.model_name}] Conversation prompt for {power_name}:\n{raw_input_prompt}")
raw_response = await run_llm_and_log(
client=self, client=self,
prompt=prompt, prompt=raw_input_prompt,
log_file_path=log_file_path, log_file_path=log_file_path,
power_name=power_name, power_name=power_name,
phase=game_phase, # Use game_phase for logging phase=game_phase,
response_type='negotiation', response_type='negotiation', # For run_llm_and_log's internal context
) )
logger.debug(f"[{self.model_name}] Raw LLM response for {power_name}:\n{response}") logger.debug(f"[{self.model_name}] Raw LLM response for {power_name}:\n{raw_response}")
messages = [] parsed_messages = []
json_blocks = [] json_blocks = []
json_decode_error_occurred = False
double_brace_blocks = re.findall(r'\{\{(.*?)\}\}', response, re.DOTALL) # Attempt to find blocks enclosed in {{...}}
double_brace_blocks = re.findall(r'\{\{(.*?)\}\}', raw_response, re.DOTALL)
if double_brace_blocks: if double_brace_blocks:
# If {{...}} blocks are found, assume each is a self-contained JSON object
json_blocks.extend(['{' + block.strip() + '}' for block in double_brace_blocks]) json_blocks.extend(['{' + block.strip() + '}' for block in double_brace_blocks])
else: else:
code_block_match = re.search(r"```json\n(.*?)\n```", response, re.DOTALL) # If no {{...}} blocks, look for ```json ... ``` markdown blocks
if code_block_match: code_block_match = re.search(r"```json\n(.*?)\n```", raw_response, re.DOTALL)
potential_json = code_block_match.group(1).strip() if code_block_match:
json_blocks = re.findall(r'\{.*?\}', potential_json, re.DOTALL) potential_json_array_or_objects = code_block_match.group(1).strip()
else: # Try to parse as a list of objects or a single object
json_blocks = re.findall(r'\{.*?\}', response, re.DOTALL) try:
data = json.loads(potential_json_array_or_objects)
if isinstance(data, list):
json_blocks = [json.dumps(item) for item in data if isinstance(item, dict)]
elif isinstance(data, dict):
json_blocks = [json.dumps(data)]
except json.JSONDecodeError:
# If parsing the whole block fails, fall back to regex for individual objects
json_blocks = re.findall(r'\{.*?\}', potential_json_array_or_objects, re.DOTALL)
else:
# If no markdown block, fall back to regex for any JSON object in the response
json_blocks = re.findall(r'\{.*?\}', raw_response, re.DOTALL)
if not json_blocks: if not json_blocks:
logger.warning(f"[{self.model_name}] No JSON message blocks found in response for {power_name}. Raw response:\n{response}") logger.warning(f"[{self.model_name}] No JSON message blocks found in response for {power_name}. Raw response:\n{raw_response}")
return [] success_status = "Success: No JSON blocks found"
# messages_to_return remains empty
else:
for block_index, block in enumerate(json_blocks):
try:
cleaned_block = block.strip()
# Attempt to fix common JSON issues like trailing commas before parsing
cleaned_block = re.sub(r',\s*([\}\]])', r'\1', cleaned_block)
parsed_message = json.loads(cleaned_block)
if isinstance(parsed_message, dict) and "message_type" in parsed_message and "content" in parsed_message:
# Further validation, e.g., recipient for private messages
if parsed_message["message_type"] == "private" and "recipient" not in parsed_message:
logger.warning(f"[{self.model_name}] Private message missing recipient for {power_name} in block {block_index}. Skipping: {cleaned_block}")
continue # Skip this message
parsed_messages.append(parsed_message)
else:
logger.warning(f"[{self.model_name}] Invalid message structure or missing keys in block {block_index} for {power_name}: {cleaned_block}")
except json.JSONDecodeError as jde:
json_decode_error_occurred = True
logger.warning(f"[{self.model_name}] Failed to decode JSON block {block_index} for {power_name}. Error: {jde}. Block content:\n{block}")
for block in json_blocks: if parsed_messages:
try: success_status = "Success: Messages extracted"
cleaned_block = block.strip() messages_to_return = parsed_messages
parsed_message = json.loads(cleaned_block) elif json_decode_error_occurred:
success_status = "Failure: JSONDecodeError during block parsing"
if isinstance(parsed_message, dict) and "message_type" in parsed_message and "content" in parsed_message: messages_to_return = []
messages.append(parsed_message) else: # JSON blocks found, but none were valid messages
else: success_status = "Success: No valid messages extracted from JSON blocks"
logger.warning(f"[{self.model_name}] Invalid message structure in block for {power_name}: {cleaned_block}") messages_to_return = []
except json.JSONDecodeError:
logger.warning(f"[{self.model_name}] Failed to decode JSON block for {power_name}. Block content:\n{block}")
if not messages: logger.debug(f"[{self.model_name}] Validated conversation replies for {power_name}: {messages_to_return}")
logger.warning(f"[{self.model_name}] No valid messages extracted after parsing blocks for {power_name}. Raw response:\n{response}") # return messages_to_return # Return will happen in finally block or after
logger.debug(f"[{self.model_name}] Validated conversation replies for {power_name}: {messages}")
return messages
except Exception as e: except Exception as e:
logger.error(f"[{self.model_name}] Error in get_conversation_reply for {power_name}: {e}") logger.error(f"[{self.model_name}] Error in get_conversation_reply for {power_name}: {e}", exc_info=True)
return [] success_status = f"Failure: Exception ({type(e).__name__})"
messages_to_return = [] # Ensure empty list on general exception
finally:
if log_file_path:
log_llm_response(
log_file_path=log_file_path,
model_name=self.model_name,
power_name=power_name,
phase=game_phase,
response_type="negotiation_message",
raw_input_prompt=raw_input_prompt,
raw_response=raw_response,
success=success_status
)
return messages_to_return
async def get_plan( # This is the original get_plan, now distinct from get_planning_reply async def get_plan( # This is the original get_plan, now distinct from get_planning_reply
self, self,
@ -600,22 +658,42 @@ class BaseModelClient:
if self.system_prompt: if self.system_prompt:
full_prompt = f"{self.system_prompt}\n\n{full_prompt}" full_prompt = f"{self.system_prompt}\n\n{full_prompt}"
raw_plan_response = ""
success_status = "Failure: Initialized"
plan_to_return = f"Error: Plan generation failed for {power_name} (initial state)"
try: try:
# Use run_llm_and_log for the actual LLM call # Use run_llm_and_log for the actual LLM call
raw_plan = await run_llm_and_log( raw_plan_response = await run_llm_and_log(
client=self, # Pass self (the client instance) client=self, # Pass self (the client instance)
prompt=full_prompt, prompt=full_prompt,
log_file_path=log_file_path, log_file_path=log_file_path,
power_name=power_name, power_name=power_name,
phase=game.current_short_phase, phase=game.current_short_phase,
response_type='plan_generation', # More specific type response_type='plan_generation', # More specific type for run_llm_and_log context
) )
logger.debug(f"[{self.model_name}] Raw LLM response for {power_name} plan generation:\n{raw_plan}") logger.debug(f"[{self.model_name}] Raw LLM response for {power_name} plan generation:\n{raw_plan_response}")
# No parsing needed for the plan, return the raw string # No parsing needed for the plan, return the raw string
return raw_plan.strip() plan_to_return = raw_plan_response.strip()
success_status = "Success"
except Exception as e: except Exception as e:
logger.error(f"Failed to generate plan for {power_name}: {e}") logger.error(f"Failed to generate plan for {power_name}: {e}", exc_info=True)
return f"Error: Failed to generate plan due to exception: {e}" success_status = f"Failure: Exception ({type(e).__name__})"
plan_to_return = f"Error: Failed to generate plan for {power_name} due to exception: {e}"
finally:
if log_file_path: # Only log if a path is provided
log_llm_response(
log_file_path=log_file_path,
model_name=self.model_name,
power_name=power_name,
phase=game.current_short_phase if game else "UnknownPhase",
response_type="plan_generation", # Specific type for CSV logging
raw_input_prompt=full_prompt, # Renamed from 'full_prompt' to match log_llm_response arg
raw_response=raw_plan_response,
success=success_status
# token_usage and cost can be added later
)
return plan_to_return
############################################################################## ##############################################################################

View file

@ -0,0 +1,169 @@
# ai_diplomacy/initialization.py
import logging
import json
# Forward declaration for type hinting, actual imports in function if complex
if False: # TYPE_CHECKING
from diplomacy import Game
from diplomacy.models.game import GameHistory
from .agent import DiplomacyAgent
from .agent import ALL_POWERS, ALLOWED_RELATIONSHIPS
from .utils import run_llm_and_log, log_llm_response
logger = logging.getLogger(__name__)
# Goals used whenever the LLM supplies none and the agent has none of its own.
_DEFAULT_INITIAL_GOALS = (
    "Survive and expand",
    "Form beneficial alliances",
    "Secure key territories",
)


def _validated_initial_relationships(raw_relationships: dict, power_name: str) -> dict:
    """Normalise LLM-provided relationships to known powers and allowed labels.

    Keys are upper-cased and kept only if they name a power in ``ALL_POWERS``
    other than ``power_name``. String values are title-cased; any value that
    is not in ``ALLOWED_RELATIONSHIPS`` is replaced by ``"Neutral"``.
    """
    validated = {}
    for p_key, r_val in raw_relationships.items():
        p_upper = str(p_key).upper()
        if p_upper not in ALL_POWERS or p_upper == power_name:
            continue
        r_title = str(r_val).title() if isinstance(r_val, str) else str(r_val)
        validated[p_upper] = r_title if r_title in ALLOWED_RELATIONSHIPS else "Neutral"
    return validated


async def initialize_agent_state_ext(
    agent: 'DiplomacyAgent',
    game: 'Game',
    game_history: 'GameHistory',
    log_file_path: str
) -> None:
    """Use the LLM to set initial goals and relationships for the agent.

    Builds an initialization prompt (instructions plus the client's context
    prompt), sends it through ``run_llm_and_log``, parses the reply with
    ``agent._extract_json_from_text`` and writes the result to ``agent.goals``
    and ``agent.relationships``. When the reply cannot be parsed, is not a
    JSON object, or contains no usable values, defaults are applied instead
    (only where the agent does not already hold non-default state).

    Args:
        agent: Agent whose ``goals`` / ``relationships`` are updated in place.
        game: Current game; when falsy, an empty board state and
            ``"UnknownPhase"`` are used.
        game_history: Passed through to the client's context builder.
        log_file_path: Destination for ``log_llm_response``; when falsy the
            attempt is not logged.

    Exceptions raised inside the main ``try`` block are logged and converted
    into fallback defaults rather than propagated.
    """
    power_name = agent.power_name
    logger.info(f"[{power_name}] Initializing agent state using LLM (external function)..." )

    current_phase = game.get_current_phase() if game else "UnknownPhase"

    # Defined up front so the ``finally`` block can always log them.
    full_prompt = ""
    response = ""
    success_status = "Failure: Initialized"

    try:
        # Use a simplified prompt for initial state generation.
        allowed_labels_str = ", ".join(ALLOWED_RELATIONSHIPS)
        initial_prompt = (
            f"You are the agent for {power_name} in a game of Diplomacy at the very start (Spring 1901). "
            "Analyze the initial board position and suggest 2-3 strategic high-level goals for the early game. "
            "Consider your power's strengths, weaknesses, and neighbors. "
            "Also, provide an initial assessment of relationships with other powers. "
            f"IMPORTANT: For each relationship, you MUST use exactly one of the following labels: {allowed_labels_str}. "
            "Format your response as a JSON object with two keys: 'initial_goals' (a list of strings) and 'initial_relationships' (a dictionary mapping power names to one of the allowed relationship strings)."
        )

        board_state = game.get_state() if game else {}
        possible_orders = game.get_all_possible_orders() if game else {}

        logger.debug(f"[{power_name}] Preparing context for initial state. Board state type: {type(board_state)}, possible_orders type: {type(possible_orders)}, game_history type: {type(game_history)}")

        formatted_diary = agent.format_private_diary_for_prompt()
        context = agent.client.build_context_prompt(
            game=game,
            board_state=board_state,
            power_name=power_name,
            possible_orders=possible_orders,
            game_history=game_history,
            agent_goals=None,
            agent_relationships=None,
            agent_private_diary=formatted_diary,
        )
        full_prompt = initial_prompt + "\n\n" + context

        response = await run_llm_and_log(
            client=agent.client,
            prompt=full_prompt,
            log_file_path=log_file_path,
            power_name=power_name,
            phase=current_phase,
            response_type='initialization',  # Context for run_llm_and_log internal error logging
        )
        logger.debug(f"[{power_name}] LLM response for initial state: {response[:300]}...")  # Log a snippet

        parsed_successfully = False
        try:
            update_data = agent._extract_json_from_text(response)
            logger.debug(f"[{power_name}] Successfully parsed JSON: {update_data}")
            parsed_successfully = True
        except json.JSONDecodeError as e:
            logger.error(f"[{power_name}] All JSON extraction attempts failed: {e}. Response snippet: {response[:300]}...")
            success_status = "Failure: JSONDecodeError"
            update_data = {}

        # A payload that parses but is not an object (e.g. a bare list) has no
        # keys to read; reject it here instead of letting ``.get`` raise and be
        # reported as a generic exception.
        if parsed_successfully and not isinstance(update_data, dict):
            logger.error(f"[{power_name}] Parsed JSON is not an object (got {type(update_data).__name__}); ignoring it.")
            success_status = "Failure: Parsed JSON is not an object"
            update_data = {}
            parsed_successfully = False

        initial_goals_applied = False
        initial_relationships_applied = False

        if parsed_successfully:
            # Accept both the requested keys and their shorter aliases.
            initial_goals = update_data.get('initial_goals') or update_data.get('goals')
            initial_relationships = update_data.get('initial_relationships') or update_data.get('relationships')

            if isinstance(initial_goals, list) and initial_goals:
                agent.goals = initial_goals
                agent.add_journal_entry(f"[{current_phase}] Initial Goals Set by LLM: {agent.goals}")
                logger.info(f"[{power_name}] Goals updated from LLM: {agent.goals}")
                initial_goals_applied = True
            else:
                logger.warning(f"[{power_name}] LLM did not provide valid 'initial_goals' list (got: {initial_goals}).")

            if isinstance(initial_relationships, dict) and initial_relationships:
                valid_relationships = _validated_initial_relationships(initial_relationships, power_name)
                if valid_relationships:
                    agent.relationships = valid_relationships
                    agent.add_journal_entry(f"[{current_phase}] Initial Relationships Set by LLM: {agent.relationships}")
                    logger.info(f"[{power_name}] Relationships updated from LLM: {agent.relationships}")
                    initial_relationships_applied = True
                else:
                    logger.warning(f"[{power_name}] No valid relationships found in LLM response.")
            else:
                logger.warning(f"[{power_name}] LLM did not provide valid 'initial_relationships' dict (got: {initial_relationships}).")

        if initial_goals_applied or initial_relationships_applied:
            success_status = "Success: Applied LLM data"
        elif parsed_successfully:  # Parsed but nothing useful to apply
            success_status = "Success: Parsed but no data applied"
        # Otherwise success_status already carries the parse-failure reason.

        # Fallback if LLM data was not applied or parsing failed.
        if not initial_goals_applied and not agent.goals:
            # Only set defaults if no goals were set at construction or by the LLM.
            agent.goals = list(_DEFAULT_INITIAL_GOALS)
            agent.add_journal_entry(f"[{current_phase}] Set default initial goals as LLM provided none or parse failed.")
            logger.info(f"[{power_name}] Default goals set.")

        if not initial_relationships_applied:
            # Leave relationships alone if any other power already has a
            # non-"Neutral" (or missing) entry in a non-empty mapping.
            is_default_relationships = True
            if agent.relationships:
                is_default_relationships = not any(
                    p != power_name and agent.relationships.get(p) != "Neutral"
                    for p in ALL_POWERS
                )
            if is_default_relationships:
                agent.relationships = {p: "Neutral" for p in ALL_POWERS if p != power_name}
                agent.add_journal_entry(f"[{current_phase}] Set default neutral relationships as LLM provided none valid or parse failed.")
                logger.info(f"[{power_name}] Default neutral relationships set.")

    except Exception as e:
        logger.error(f"[{power_name}] Error during external agent state initialization: {e}", exc_info=True)
        success_status = f"Failure: Exception ({type(e).__name__})"
        # Fallback logic for goals/relationships if not already set by earlier fallbacks.
        if not agent.goals:
            agent.goals = list(_DEFAULT_INITIAL_GOALS)
            logger.info(f"[{power_name}] Set fallback goals after top-level error: {agent.goals}")
        if not agent.relationships or all(r == "Neutral" for r in agent.relationships.values()):
            agent.relationships = {p: "Neutral" for p in ALL_POWERS if p != power_name}
            logger.info(f"[{power_name}] Set fallback neutral relationships after top-level error: {agent.relationships}")
    finally:
        # Record the attempt regardless of outcome, but only if a path was given.
        if log_file_path:
            log_llm_response(
                log_file_path=log_file_path,
                model_name=agent.client.model_name if agent and agent.client else "UnknownModel",
                power_name=power_name,
                phase=current_phase,
                response_type="initial_state_setup",  # Specific type for CSV logging
                raw_input_prompt=full_prompt,
                raw_response=response,
                success=success_status
            )

    # Final log of state after initialization attempt.
    logger.info(f"[{power_name}] Post-initialization state: Goals={agent.goals}, Relationships={agent.relationships}")

View file

@ -12,11 +12,7 @@ This document provides an analysis of key Python modules within the `ai_diplomac
**Goal:** To structure, store, and retrieve the historical events of a Diplomacy game phase by phase, including messages, plans, orders, and results. **Goal:** To structure, store, and retrieve the historical events of a Diplomacy game phase by phase, including messages, plans, orders, and results.
**Status:** Fully implemented and operational. **Status:** Fully implemented and operational.
#### 1.2. `map_utils.py` (COMPLETE BUT NOT INTEGRATED) *Key Components:*
**Goal:** To provide graph-based map analysis and pathfinding for strategic decision-making.
**Status:** BFS search algorithms implemented but not integrated into planning/order generation.
**Key Components:**
* `DiplomacyGraph`: Represents map territory connectivity with support for unit-specific movement rules (Army vs Fleet). * `DiplomacyGraph`: Represents map territory connectivity with support for unit-specific movement rules (Army vs Fleet).
* `bfs_shortest_path`: Finds shortest path from a starting territory to any territory matching criteria. * `bfs_shortest_path`: Finds shortest path from a starting territory to any territory matching criteria.
* `bfs_nearest_adjacent`: Finds shortest path to a territory adjacent to any territory in a target set. * `bfs_nearest_adjacent`: Finds shortest path to a territory adjacent to any territory in a target set.
@ -35,22 +31,22 @@ This document provides an analysis of key Python modules within the `ai_diplomac
#### 1.4. `agent.py` (COMPLETE) #### 1.4. `agent.py` (COMPLETE)
**Goal:** To maintain stateful agent representation with personality, goals, and relationships. **Goal:** To maintain stateful agent representation with personality, goals, and relationships.
**Status:** Fully implemented and integrated with planning/negotiation workflows. **Status:** Fully implemented and integrated with planning/negotiation workflows. Initialization of goals and relationships via LLM is now handled by `initialization.py`.
**Key Components:** **Key Components:**
* `DiplomacyAgent` class with: * `DiplomacyAgent` class with:
* `power_name`: The power this agent represents * `power_name`: The power this agent represents
* `personality`: Agent's personality profile * `personality`: Agent's personality profile (though less emphasized now, system prompts per power exist)
* `goals`: List of strategic goals * `goals`: List of strategic goals, initially populated by `initialization.py` or constructor.
* `relationships`: Dict of relationships with other powers * `relationships`: Dict of relationships with other powers, initially populated by `initialization.py` or constructor.
* `private_journal`: List of internal thoughts/reflections * `private_journal`: List of internal thoughts/reflections (less structured).
* `private_diary`: List of structured, phase-prefixed summaries (negotiations, intents, orders) for concise historical context provided to LLMs.
* `_extract_json_from_text`: Robust JSON extraction from LLM responses * `_extract_json_from_text`: Robust JSON extraction from LLM responses
* `initialize_agent_state`: Sets initial goals and relationships * `analyze_phase_and_update_state`: Updates goals and relationships based on game events.
* `analyze_phase_and_update_state`: Updates goals and relationships based on game events * Methods for plan generation, updating goals, and updating relationships.
* Methods for plan generation, updating goals, and updating relationships
**Integration Points:** **Integration Points:**
* Connected to context generation in `clients.py` * Connected to context generation in `clients.py` (private diary provides summarized history)
* Influences planning and negotiations through goals and relationships * Influences planning and negotiations through goals and relationships
* Case-insensitive validation of LLM-provided power names and relationship statuses * Case-insensitive validation of LLM-provided power names and relationship statuses
* Robust error recovery with fallback defaults when LLM responses fail to parse * Robust error recovery with fallback defaults when LLM responses fail to parse
@ -70,18 +66,22 @@ This document provides an analysis of key Python modules within the `ai_diplomac
#### 1.8. `clients.py` (COMPLETE) #### 1.8. `clients.py` (COMPLETE)
**Goal:** To abstract and manage interactions with various LLM APIs. **Goal:** To abstract and manage interactions with various LLM APIs.
**Status:** Fully implemented with agent state integration. **Status:** Fully implemented with agent state integration (including personality, goals, relationships, and the new `private_diary` for summarized history). It now also leverages `possible_order_context.py` for richer order details in prompts.
**Note:** Uses various files in `prompts/` (e.g., `context_prompt.txt`, `order_instructions.txt`, `conversation_instructions.txt`) to structure LLM requests and define expected output formats. Ensuring these instruction files are present and correct is critical for reliable operation, especially for parsing structured data like orders or messages. **Note:** Uses various files in `prompts/` (e.g., `context_prompt.txt`, `order_instructions.txt`, `negotiation_diary_prompt.txt`, `order_diary_prompt.txt`) to structure LLM requests. `context_prompt.txt` has been updated to use `agent_private_diary` for history and a more structured `{possible_orders}` section generated by `possible_order_context.generate_rich_order_context`.
### PARTIALLY IMPLEMENTED MODULES: #### 1.9. `initialization.py` (NEWLY ADDED & COMPLETE)
**Goal:** To perform the initial LLM-driven setup of an agent's goals and relationships at the very start of the game (Spring 1901).
**Status:** Fully implemented and integrated into `lm_game.py`.
#### 1.9. `utils.py` (COMPLETE) **Key Components:**
**Goal:** To provide common utility functions used across other AI diplomacy modules. * `initialize_agent_state_ext(agent: DiplomacyAgent, game: Game, game_history: GameHistory, log_file_path: str)`: An asynchronous function that:
**Status:** Fully implemented. * Constructs a specific prompt tailored for Spring 1901, asking for initial goals and relationships.
* Utilizes the agent's client (`agent.client`) and the `run_llm_and_log` utility for the LLM interaction.
* Parses the JSON response using the agent's `_extract_json_from_text` method.
* Directly updates the `agent.goals` and `agent.relationships` attributes with the LLM's suggestions, falling back to defaults when parsing fails or the response contains no usable values.
#### 1.10. `clients.py` (COMPLETE BUT NEEDS EXTENSION) **Integration Points:**
**Goal:** To abstract and manage interactions with various LLM APIs. * Called once per agent from `lm_game.py` immediately after the `DiplomacyAgent` object is instantiated and before the main game loop begins.
**Status:** Works, but needs extension to incorporate agent state into context.
--- ---
@ -89,20 +89,24 @@ This document provides an analysis of key Python modules within the `ai_diplomac
The following connections have been established: The following connections have been established:
1. **Agent State → Context Building** 1. **Initial Agent Setup (New)**:
* `BaseModelClient.build_context_prompt` incorporates agent's personality, goals, and relationships * `lm_game.py` calls `initialization.py`'s `initialize_agent_state_ext` for each agent. This function uses an LLM call to populate the agent's initial `goals` and `relationships` before the main game loop and other agent interactions commence.
* Modified prompt templates include sections for agent state
2. **Agent State → Negotiations** 2. **Agent State → Context Building**
* `BaseModelClient.build_context_prompt` in `clients.py` incorporates the agent's current `goals`, `relationships`, and the concise `agent_private_diary` for historical context.
* It also calls `possible_order_context.generate_rich_order_context` to provide a detailed and strategically relevant breakdown of possible orders, replacing a simpler list.
* `prompts/context_prompt.txt` is formatted to accept these inputs, including the structured possible orders and the agent's private diary.
3. **Agent State → Negotiations**
* Agent's personality, goals, and relationships influence message generation * Agent's personality, goals, and relationships influence message generation
* Relationships are updated based on negotiation context and results * Relationships are updated based on negotiation context and results
3. **Robust LLM Interaction** 4. **Robust LLM Interaction**
* Implemented multi-strategy JSON extraction to handle various LLM response formats * Implemented multi-strategy JSON extraction to handle various LLM response formats
* Added case-insensitive validation for power names and relationship statuses * Added case-insensitive validation for power names and relationship statuses
* Created fallback mechanisms for all LLM interactions * Created fallback mechanisms for all LLM interactions
4. **Error Recovery** 5. **Error Recovery**
* Added defensive programming throughout agent state updates * Added defensive programming throughout agent state updates
* Implemented progressive fallback strategies for parsing LLM outputs * Implemented progressive fallback strategies for parsing LLM outputs
* Used intelligent defaults to maintain consistent agent state * Used intelligent defaults to maintain consistent agent state
@ -141,27 +145,28 @@ The following connections have been established:
| game_history.py | <-----------+ | | agent.py | | game_history.py | <-----------+ | | agent.py |
+-----------------+ | +-----------------+ +-----------------+ | +-----------------+
^ | | ^ | |
| v v | v |
| +--------------+ +--------------+ | +--------------+ |
+------------------+ utils.py | <----- | map_utils.py | +------------------+ utils.py | <---------------
+--------------+ +--------------+ +--------------+
``` ```
**Current Integration Status:** **Current Integration Status:**
* `agent.py` is fully implemented and integrated with other modules * `agent.py` is fully implemented and integrated with other modules
* State updates work reliably between phases * State updates work reliably between phases
* Robust JSON parsing and case-insensitive validation ensure smooth operation * Robust JSON parsing and case-insensitive validation ensure smooth operation
* `map_utils.py` is implemented but not yet fully leveraged for strategic planning
**Asynchronous API Calls (Implemented April 2025)** **Asynchronous API Calls (Implemented April 2025)**
- Successfully refactored major LLM interaction points to use asynchronous patterns (`async`/`await`, `asyncio.gather`). - Successfully refactored major LLM interaction points to use asynchronous patterns (`async`/`await`, `asyncio.gather`).
- Utilized async client libraries (`AsyncOpenAI`, `AsyncAnthropic`, `generate_content_async` for Gemini). - Utilized async client libraries (`AsyncOpenAI`, `AsyncAnthropic`, `generate_content_async` for Gemini).
- Refactored components: - Refactored components:
- `DiplomacyAgent.initialize_agent_state` - `initialization.initialize_agent_state_ext` (replaces `DiplomacyAgent.initialize_agent_state`)
- `negotiations.conduct_negotiations` (message generation) - `negotiations.conduct_negotiations` (message generation)
- `utils.get_valid_orders` (order generation) - `utils.get_valid_orders` (order generation)
- `DiplomacyAgent.analyze_phase_and_update_state` - `DiplomacyAgent.analyze_phase_and_update_state`
- `DiplomacyAgent.generate_negotiation_diary_entry`
- `DiplomacyAgent.generate_order_diary_entry`
- `DiplomacyAgent.decide_builds_or_disbands`
- `planning.planning_phase`
- This significantly improves performance by allowing concurrent API calls instead of sequential ones. - This significantly improves performance by allowing concurrent API calls instead of sequential ones.
- Replaced `concurrent.futures.ThreadPoolExecutor` with `asyncio.gather` for managing concurrent async tasks. - Replaced `concurrent.futures.ThreadPoolExecutor` with `asyncio.gather` for managing concurrent async tasks.
```

View file

@ -1,264 +0,0 @@
import logging
from collections import deque
from typing import Dict, Set, List, Tuple, Callable, Any, Optional
from diplomacy.map import Map
logger = logging.getLogger(__name__)
class DiplomacyGraph:
    """Undirected adjacency structure for Diplomacy map connectivity.

    Each connection between two nodes carries the set of unit types
    ('A' for army, 'F' for fleet) that may travel along it.
    """

    def __init__(self):
        # Nested mapping: graph[a][b] is the set of unit types allowed between a and b,
        # e.g. {'A', 'F'} when both armies and fleets may move, {'A'} when only armies may.
        self.graph: Dict[str, Dict[str, Set[str]]] = {}

    def add_node(self, node: str):
        """Register `node`; does nothing when it is already present."""
        self.graph.setdefault(node, {})

    def add_edge(self, node1: str, node2: str, unit_type: str):
        """Record that `unit_type` ('A' or 'F') may travel between the two nodes."""
        self.add_node(node1)
        self.add_node(node2)
        # The graph is undirected, so the connection is stored in both directions.
        for origin, target in ((node1, node2), (node2, node1)):
            self.graph[origin].setdefault(target, set()).add(unit_type)

    def get_adjacent(self, node: str) -> List[str]:
        """Return the neighbours of `node` (empty list for an unknown node)."""
        return list(self.graph.get(node, {}))

    def get_allowed_units(self, node1: str, node2: str) -> Set[str]:
        """Return the unit types allowed between the nodes (empty set when not connected)."""
        neighbours = self.graph.get(node1, {})
        return neighbours.get(node2, set())

    def nodes(self) -> List[str]:
        """Return every node in the graph."""
        return list(self.graph)

    def edges(self) -> List[Tuple[str, str, Set[str]]]:
        """Return each undirected edge once, as (node1, node2, unit_types)."""
        collected = []
        already_listed = set()  # canonical (ordered) node pairs that were emitted
        for origin, neighbours in self.graph.items():
            for target, unit_types in neighbours.items():
                # Order the pair so (a, b) and (b, a) map to the same key.
                pair = (origin, target) if origin <= target else (target, origin)
                if pair in already_listed:
                    continue
                already_listed.add(pair)
                collected.append((origin, target, unit_types))
        return collected
# --- BFS Functions ---
def bfs_shortest_path(
    graph: DiplomacyGraph,
    start: str,
    match_condition: Callable[[str], Any],  # Function returns non-None/non-False if matched
    allowed_unit_types: Set[str]
) -> Tuple[Optional[List[str]], Any]:
    """
    Breadth-first search from 'start' for the nearest territory accepted by 'match_condition'.

    Args:
        graph: The DiplomacyGraph instance to search.
        start: The starting territory node name (e.g., 'PAR').
        match_condition: Called with a territory name; any truthy return value means
                         "matched" and is passed back to the caller.
        allowed_unit_types: Unit types ('A', 'F') allowed for traversal. An edge is
                            followed when it permits at least one of them.

    Returns:
        Tuple[Optional[List[str]], Any]:
            - The path from 'start' to the matched territory (both included), or None
              when nothing matched or 'start' is not in the graph.
            - The truthy value produced by match_condition for that territory, or None.
    """
    if start not in graph.graph:  # Access the internal graph dict
        logger.warning(f"BFS shortest path: Start node '{start}' not in graph.")
        return None, None

    # The starting territory may already satisfy the condition.
    start_result = match_condition(start)
    if start_result:
        return [start], start_result

    seen: Set[str] = {start}
    # Each queue entry is a complete path ending at the node still to be expanded.
    frontier: deque[List[str]] = deque([[start]])

    while frontier:
        route = frontier.popleft()
        here = route[-1]

        for candidate in graph.get_adjacent(here):
            permitted = graph.get_allowed_units(here, candidate)
            # Skip edges none of the allowed unit types can use.
            if not permitted.intersection(allowed_unit_types):
                continue
            if candidate in seen:
                continue

            seen.add(candidate)
            extended_route = route + [candidate]

            # The condition is evaluated once, when the node is first reached.
            candidate_result = match_condition(candidate)
            if candidate_result:
                return extended_route, candidate_result
            frontier.append(extended_route)

    logger.debug(f"BFS shortest path: No node matching condition found from '{start}'.")
    return None, None
def bfs_nearest_adjacent(
    graph: DiplomacyGraph,
    start: str,
    occupant_map: Dict[str, Any],  # Map territory_name -> occupant_info
    allowed_unit_types: Set[str]
) -> Tuple[Optional[List[str]], Tuple[Optional[str], Any]]:
    """
    Breadth-first search from 'start' for the closest territory that borders any
    territory listed in 'occupant_map'.

    Args:
        graph: The DiplomacyGraph instance to search.
        start: The starting territory node name.
        occupant_map: Keys are the occupied territory names to get next to; values are
                      whatever information the caller associates with them.
        allowed_unit_types: Unit types ('A', 'F') allowed for traversal.

    Returns:
        Tuple[Optional[List[str]], Tuple[Optional[str], Any]]:
            - The path from 'start' to the territory bordering an occupied one, or None.
            - A pair (occupied territory name, its occupant_map value); (None, None)
              when no path is found.

    Note:
        The "borders an occupied territory" test looks at every neighbour of the current
        node, regardless of unit type; only the expansion step honours allowed_unit_types.
    """
    if not occupant_map or start not in graph.graph:  # Access the internal graph dict
        logger.warning(f"BFS nearest adjacent: Invalid input - occupant_map empty or start node '{start}' not in graph.")
        return None, (None, None)

    seen: Set[str] = {start}
    # Each queue entry is a complete path ending at the node still to be expanded.
    frontier: deque[List[str]] = deque([[start]])

    while frontier:
        route = frontier.popleft()
        here = route[-1]
        neighbours = graph.get_adjacent(here)

        # First neighbour (in adjacency order) that is occupied ends the search.
        occupied = next((name for name in neighbours if name in occupant_map), None)
        if occupied is not None:
            return route, (occupied, occupant_map[occupied])

        # Otherwise continue outwards along edges the allowed unit types can use.
        for candidate in neighbours:
            permitted = graph.get_allowed_units(here, candidate)
            if not permitted.intersection(allowed_unit_types):
                continue
            if candidate in seen:
                continue
            seen.add(candidate)
            frontier.append(route + [candidate])

    logger.debug(f"BFS nearest adjacent: No path found from '{start}' adjacent to occupied territories.")
    return None, (None, None)
# --- Build Function ---
def build_diplomacy_graph(game_map: Map) -> DiplomacyGraph:
    """
    Builds a DiplomacyGraph representing the connectivity of a given diplomacy map.

    Every entry of ``game_map.locs`` becomes a node. Each unordered pair of adjacent
    locations is evaluated once and receives an 'A' edge and/or an 'F' edge.

    Args:
        game_map: The map to read. This function uses ``game_map.locs``,
                  ``game_map.area_data[name]`` (objects exposing ``adjacencies``,
                  ``is_sea``, ``is_land`` and ``is_coastal``) and
                  ``game_map.is_valid_move('F', src, dst)``.
                  NOTE(review): confirm these attributes exist on the Map class in use.

    Returns:
        A populated DiplomacyGraph instance.
    """
    graph = DiplomacyGraph()
    processed_edges = set()  # Canonical (sorted) pairs already evaluated

    for loc1_name in game_map.locs:
        graph.add_node(loc1_name)
        loc1_area = game_map.area_data[loc1_name]

        # The per-adjacency coast specification is not needed: fleet legality is
        # delegated entirely to game_map.is_valid_move below.
        for loc2_name, _coast_spec in loc1_area.adjacencies:
            # Ensure loc2 exists in map data (should always be true)
            if loc2_name not in game_map.area_data:
                logger.warning(f"Adjacent location '{loc2_name}' for '{loc1_name}' not found in map data. Skipping.")
                continue
            loc2_area = game_map.area_data[loc2_name]

            # The graph is undirected, so each pair only needs to be evaluated once.
            edge_tuple = tuple(sorted((loc1_name, loc2_name)))
            if edge_tuple in processed_edges:
                continue

            # --- Army ('A') movement ---
            # An army can only move between land/coastal areas, so NEITHER end may be sea.
            # (The previous test only rejected sea-to-sea pairs and therefore let armies
            # walk from a coast into a sea area.) Simplified: land connectivity is assumed
            # whenever both areas are non-sea.
            if not loc1_area.is_sea and not loc2_area.is_sea:
                graph.add_edge(loc1_name, loc2_name, 'A')

            # --- Fleet ('F') movement ---
            # A fleet can never move between two pure inland areas; for every other
            # combination the map's own validity check decides (including coast matching).
            both_inland = (
                loc1_area.is_land and not loc1_area.is_coastal
                and loc2_area.is_land and not loc2_area.is_coastal
            )
            if not both_inland and game_map.is_valid_move('F', loc1_name, loc2_name):
                graph.add_edge(loc1_name, loc2_name, 'F')

            processed_edges.add(edge_tuple)

    logger.info(f"Built DiplomacyGraph with {len(graph.nodes())} nodes and {len(graph.edges())} edges.")
    return graph

View file

@ -0,0 +1,445 @@
# ai_diplomacy/possible_order_context.py
from collections import deque
from typing import Dict, List, Callable, Optional, Any, Set, Tuple
from diplomacy.engine.map import Map as GameMap
from diplomacy.engine.game import Game as BoardState
import logging
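# NOTE: `GameMap` and `BoardState` are used for annotations only.
# The helpers below treat `board_state` as the plain dict returned by `game.get_state()`
# (they call `board_state.get('units', ...)` and `board_state.get('centers', ...)`),
# not as a `Game` instance, so the `BoardState` alias above is nominal.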
logger = logging.getLogger(__name__)
def build_diplomacy_graph(game_map: GameMap) -> Dict[str, Dict[str, List[str]]]:
    """
    Builds the province graph used for BFS pathfinding.

    Keys are SHORT province names (e.g., 'PAR', 'STP'); each maps to
    {'ARMY': [...], 'FLEET': [...]}, the sorted, de-duplicated SHORT names reachable
    by that unit type according to `game_map.abuts`.
    """
    # game_map.locs may contain coast-qualified entries such as "STP/SC"; reduce every
    # entry to its 3-letter uppercase province code and keep only real 3-letter codes.
    province_codes = sorted({
        code
        for code in (loc.split('/')[0][:3].upper() for loc in game_map.locs)
        if len(code) == 3
    })

    # Neighbours are gathered in sets and converted to sorted lists at the end.
    reachable = {code: {'ARMY': set(), 'FLEET': set()} for code in province_codes}
    unit_kinds = (('A', 'ARMY'), ('F', 'FLEET'))

    for source_code in province_codes:  # e.g. 'PAR', 'STP'
        # Every full name of the source province (base name and any coasts).
        source_variants = game_map.loc_coasts.get(source_code, [source_code])

        for source_variant in source_variants:  # e.g. 'STP/NC', then 'STP/SC', then 'STP'
            # loc_abut entries may carry coasts or lowercase names; normalise each one.
            for raw_neighbour in game_map.loc_abut.get(source_code, []):
                neighbour_code = raw_neighbour[:3].upper()
                neighbour_variants = game_map.loc_coasts.get(neighbour_code, [neighbour_code])

                for unit_char, unit_key in unit_kinds:
                    # Reachable when a move ('-') from this source variant to ANY
                    # variant of the neighbouring province is accepted by the map.
                    if any(
                        game_map.abuts(unit_char, source_variant, '-', destination)
                        for destination in neighbour_variants
                    ):
                        reachable[source_code][unit_key].add(neighbour_code)

    return {
        code: {'ARMY': sorted(kinds['ARMY']), 'FLEET': sorted(kinds['FLEET'])}
        for code, kinds in reachable.items()
    }
def bfs_shortest_path(
    graph: Dict[str, Dict[str, List[str]]],
    board_state: BoardState,
    game_map: GameMap,  # Kept for interface compatibility; not used by the search
    start_loc_full: str,  # This is a FULL location name like 'VIE' or 'STP/SC'
    unit_type: str,
    is_target_func: Callable[[str, BoardState], bool]  # Expects SHORT name for loc
) -> Optional[List[str]]:  # Returns path of SHORT names
    """
    Performs BFS to find the shortest path from start_loc_full to a target satisfying is_target_func.

    Args:
        graph: Province graph keyed by SHORT names; graph[p][unit_type] lists the SHORT
               names reachable from p for that unit type.
        board_state: Passed through unchanged as the second argument of is_target_func.
        game_map: Unused; accepted so existing callers keep working.
        start_loc_full: Location of the moving unit, with or without a coast suffix.
        unit_type: Key selecting the adjacency list to follow (the graph builder in this
                   module uses 'ARMY' and 'FLEET').
        is_target_func: Called as is_target_func(short_name, board_state); a truthy result
                        ends the search at that province. The start province is tested too.

    Returns:
        The path as a list of SHORT names, start included, or None when the start is not
        in the graph or no reachable province satisfies is_target_func.
    """
    # Reduce the full location to its province code the same way the graph keys are
    # derived (strip any coast suffix, first three characters, uppercase).
    start_loc_short = start_loc_full.split('/')[0][:3].upper()

    if start_loc_short not in graph:
        logger.warning(f"BFS: Start province {start_loc_short} (from {start_loc_full}) not in graph. Pathfinding may fail.")
        return None

    queue: deque[Tuple[str, List[str]]] = deque([(start_loc_short, [start_loc_short])])
    visited_nodes: Set[str] = {start_loc_short}

    while queue:
        current_loc_short, path = queue.popleft()

        if is_target_func(current_loc_short, board_state):
            return path  # Path of short names

        for next_loc_short in graph.get(current_loc_short, {}).get(unit_type, []):
            if next_loc_short in visited_nodes:
                continue
            if next_loc_short not in graph:  # Defensive check for neighbors not in graph keys
                logger.warning(f"BFS: Neighbor {next_loc_short} of {current_loc_short} not in graph. Skipping.")
                continue
            visited_nodes.add(next_loc_short)
            queue.append((next_loc_short, path + [next_loc_short]))

    return None
# --- Helper functions for context generation ---
def get_unit_at_location(board_state: BoardState, location: str) -> Optional[str]:
    """
    Returns the unit standing exactly at `location`, formatted as
    '<unit type> <location> (<power>)', or None when no unit is listed there.

    `<power>` is the key under which the unit appears in board_state['units'].
    The comparison is exact, so 'STP' does not match a unit listed at 'STP/SC'.
    """
    units_by_power = board_state.get('units', {})
    for owner, owned_units in units_by_power.items():
        for unit_repr in owned_units:  # e.g., "A PAR", "F STP/SC"
            tokens = unit_repr.split(" ")
            if len(tokens) != 2:
                # Anything that is not "<type> <location>" is ignored.
                continue
            unit_kind, unit_loc = tokens
            if unit_loc == location:
                return f"{unit_kind} {location} ({owner})"
    return None
def get_sc_controller(game_map: GameMap, board_state: BoardState, location: str) -> Optional[str]:
    """
    Returns the power listed as holding the supply center at `location`.

    None is returned both when the location is not a supply center and when it is a
    supply center that no power in board_state['centers'] holds.
    """
    # Supply centers belong to provinces, not coasts: map the location through
    # game_map.loc_name (falling back to the location itself) and keep the first
    # three uppercase characters.
    province_code = game_map.loc_name.get(location, location).upper()[:3]
    if province_code not in game_map.scs:
        return None

    centers_by_power = board_state.get('centers', {})
    return next(
        (power for power, held in centers_by_power.items() if province_code in held),
        None,  # Unowned SC
    )
def get_shortest_path_to_friendly_unit(
    board_state: BoardState,
    graph: Dict[str, Dict[str, List[str]]],
    game_map: GameMap,  # Added game_map
    power_name: str,
    start_unit_loc_full: str,
    start_unit_type: str
) -> Optional[Tuple[str, List[str]]]:
    """
    Finds the shortest path to any other unit of the same power.

    Args:
        board_state: State dict whose 'units' entry maps power names to unit strings.
        graph: Province graph keyed by SHORT names (see build_diplomacy_graph).
        game_map: Map object; its loc_coasts mapping is used to expand a SHORT province
                  name into its full location names.
        power_name: The power whose units count as friendly. It is compared against the
                    power embedded by get_unit_at_location, i.e. the board_state key.
        start_unit_loc_full: Full location of the unit we search from; it never counts
                             as its own target.
        start_unit_type: Adjacency key passed to bfs_shortest_path.

    Returns:
        (friendly unit string, path of SHORT names including the start), or None when
        no other friendly unit is reachable.
    """
    # get_unit_at_location renders units as "<type> <loc> (<power>)". Match on the whole
    # "(<power>)" suffix: slicing a fixed three characters out of it only works for
    # power names of at most three letters and never matches e.g. "FRANCE".
    owner_suffix = f"({power_name})"

    def friendly_unit_in(loc_short: str, current_board_state: BoardState) -> Optional[str]:
        # A SHORT province name can stand for several full locations (base + coasts).
        for full_loc_variant in game_map.loc_coasts.get(loc_short, [loc_short]):
            if full_loc_variant == start_unit_loc_full:
                continue  # the searching unit itself
            unit_at_loc = get_unit_at_location(current_board_state, full_loc_variant)
            if unit_at_loc and unit_at_loc.endswith(owner_suffix):
                return unit_at_loc
        return None

    def is_target_friendly(loc_short: str, current_board_state: BoardState) -> bool:
        return friendly_unit_in(loc_short, current_board_state) is not None

    path_short_names = bfs_shortest_path(
        graph, board_state, game_map, start_unit_loc_full, start_unit_type, is_target_friendly
    )

    # The path includes the start, so more than one entry means a distinct province was reached.
    if path_short_names and len(path_short_names) > 1:
        friendly_unit_str = friendly_unit_in(path_short_names[-1], board_state) or "UNKNOWN_FRIENDLY_UNIT"
        return friendly_unit_str, path_short_names
    return None
def get_nearest_enemy_units(
    board_state: BoardState,
    graph: Dict[str, Dict[str, List[str]]],
    game_map: GameMap,  # Added game_map
    power_name: str,
    start_unit_loc_full: str,
    start_unit_type: str,
    n: int = 3
) -> List[Tuple[str, List[str]]]:
    """
    Finds up to N nearest units of other powers, sorted by path length.

    Args:
        board_state: State dict; board_state['units'] has the format
                     { "POWER_NAME": ["A PAR", "F BRE"], ... }.
        graph: Province graph keyed by SHORT names (see build_diplomacy_graph).
        game_map: Forwarded to bfs_shortest_path.
        power_name: Units listed under this key are skipped; all others are "enemy".
        start_unit_loc_full: Full location of the unit we search from.
        start_unit_type: Adjacency key passed to bfs_shortest_path.
        n: Maximum number of results.

    Returns:
        A list of (unit string as produced by get_unit_at_location, path of SHORT names),
        shortest path first. Units that cannot be reached are omitted.
    """
    # Collect (full location, display string) for every unit not owned by power_name.
    enemy_units: List[Tuple[str, str]] = []
    for other_power, unit_list in board_state.get("units", {}).items():
        if other_power == power_name:
            continue
        for unit_repr in unit_list:  # e.g., "A PAR" or "F STP/SC"
            parts = unit_repr.split(" ")
            if len(parts) != 2:
                continue
            loc_full = parts[1]  # 'PAR' or 'STP/SC'
            # Reuse get_unit_at_location for the consistent "A PAR (POWER_NAME)" form.
            unit_str = get_unit_at_location(board_state, loc_full)
            if unit_str:
                enemy_units.append((loc_full, unit_str))

    enemy_paths: List[Tuple[str, List[str]]] = []
    for enemy_loc_full, enemy_unit_str in enemy_units:
        # The graph works on province codes: the first three characters of the location.
        enemy_loc_short = enemy_loc_full[:3]

        def is_specific_enemy_loc(
            loc_short: str, current_board_state: BoardState, target: str = enemy_loc_short
        ) -> bool:
            # `target` is bound per iteration so the predicate never sees a later value.
            return loc_short == target

        path_short_names = bfs_shortest_path(
            graph, board_state, game_map, start_unit_loc_full, start_unit_type, is_specific_enemy_loc
        )
        if path_short_names:
            enemy_paths.append((enemy_unit_str, path_short_names))

    enemy_paths.sort(key=lambda entry: len(entry[1]))  # Sort by path length
    return enemy_paths[:n]
def get_nearest_uncontrolled_scs(
    game_map: GameMap,
    board_state: BoardState,
    graph: Dict[str, Dict[str, List[str]]],
    power_name: str,
    start_unit_loc_full: str,
    start_unit_type: str,
    n: int = 3
) -> List[Tuple[str, int, List[str]]]:  # (sc_name_short, distance, path_short_names)
    """
    Finds up to N nearest supply centers that power_name does not control.

    Each result is (label, distance, path): the label is "<SC> (Ctrl: <power or None>)",
    the distance is the number of steps (path length minus the start), and the path is a
    list of SHORT names. Results are ordered by distance, then by label.
    """
    candidates: List[Tuple[str, int, List[str]]] = []

    for sc_short in game_map.scs:  # every supply-center province on the map
        owner = get_sc_controller(game_map, board_state, sc_short)
        if owner == power_name:
            continue

        route = bfs_shortest_path(
            graph,
            board_state,
            game_map,
            start_unit_loc_full,
            start_unit_type,
            lambda loc_short, current_board_state, wanted=sc_short: loc_short == wanted,
        )
        if not route:
            continue

        label = f"{sc_short} (Ctrl: {owner or 'None'})"
        # The route includes the start province, hence the "- 1".
        candidates.append((label, len(route) - 1, route))

    # Closest first; the label breaks ties.
    candidates.sort(key=lambda entry: (entry[1], entry[0]))
    return candidates[:n]
def get_adjacent_territory_details(
    game_map: GameMap,
    board_state: BoardState,
    unit_loc_full: str,  # The location of the unit whose adjacencies we're checking
    unit_type: str,  # ARMY or FLEET of the unit at unit_loc_full
    graph: Dict[str, Dict[str, List[str]]]
) -> str:
    """
    Generates a string describing adjacent territories and units that can interact with them.

    For every province the unit can reach in one step (per graph[...][unit_type]) one line
    is produced with the province, its type from game_map.loc_type, the supply-center
    controller (if any) and the unit standing there (if any). A second line lists units
    found in the provinces bordering that adjacent province, excluding the adjacent
    province itself and the unit's own province.

    Args:
        game_map: Map object; loc_type and loc_coasts are read from it.
        board_state: State dict consumed by get_unit_at_location / get_sc_controller.
        unit_loc_full: Full location of the unit, with or without a coast suffix.
        unit_type: Adjacency key in the graph ('ARMY' or 'FLEET').
        graph: Province graph with SHORT names as keys and values.

    Returns:
        The lines joined with newlines; an empty string when nothing is adjacent.
    """

    def unit_in_province(province_short: str) -> Optional[str]:
        # Units are listed under full locations ("F STP/SC"), so check the bare province
        # name first and then every coast variant known to the map.
        variants = [province_short] + [
            variant
            for variant in game_map.loc_coasts.get(province_short, [])
            if variant != province_short
        ]
        for variant in variants:
            found = get_unit_at_location(board_state, variant)
            if found:
                return found
        return None

    # Graph keys are province codes: the first three characters of the full location.
    unit_loc_short = unit_loc_full[:3]

    output_lines: List[str] = []
    # dict.fromkeys de-duplicates while keeping the graph's order.
    for adj_loc_short in dict.fromkeys(graph.get(unit_loc_short, {}).get(unit_type, [])):
        adj_loc_type = game_map.loc_type.get(adj_loc_short, 'UNKNOWN').upper()
        line = f" {adj_loc_short} ({adj_loc_type})"

        sc_controller = get_sc_controller(game_map, board_state, adj_loc_short)
        if sc_controller:
            line += f" SC Control: {sc_controller}"

        # Looks at all coasts, so a fleet at e.g. 'STP/SC' is reported for 'STP'.
        unit_in_adj_loc = unit_in_province(adj_loc_short)
        if unit_in_adj_loc:
            line += f" Units: {unit_in_adj_loc}"
        output_lines.append(line)

        # "Can support/move to" - simplified: list units in the provinces bordering the
        # adjacent province. A true answer would require those units' possible orders.
        adj_neighbours = graph.get(adj_loc_short, {})
        further_provinces = adj_neighbours.get('ARMY', []) + adj_neighbours.get('FLEET', [])

        nearby_units = set()
        for further_loc_short in dict.fromkeys(further_provinces):
            if further_loc_short in (adj_loc_short, unit_loc_short):  # Don't list itself or origin
                continue
            unit_in_further_loc = unit_in_province(further_loc_short)
            if unit_in_further_loc:
                nearby_units.add(unit_in_further_loc)

        if nearby_units:
            output_lines.append(f" => Can support/move to: {', '.join(sorted(nearby_units))}")

    return "\n".join(output_lines)
# --- Main context generation function ---
def generate_rich_order_context(game: Any, power_name: str, possible_orders_for_power: Dict[str, List[str]]) -> str:
    """
    Generates the rich, multi-line context string for all units of a given power
    that have possible orders.

    Args:
        game: Game object; `game.get_state()` and `game.map` are read from it.
        power_name: The power the context is generated for.
        possible_orders_for_power: Maps a unit's full location to its possible orders.
            Keys with no unit listed at that exact location are skipped.

    Returns:
        The context text: a title line followed, per unit, by its territory header, the
        nearest friendly unit, its possible orders, the nearest units of other powers,
        the nearest supply centers not controlled by power_name, and details of the
        adjacent territories.
    """

    def format_path(origin_full: str, path_short: List[str]) -> str:
        # The first entry of a BFS path is the start province; show the unit's full
        # location in its place and separate the steps so the route stays readable
        # (previously the names were concatenated without any separator).
        return "→".join([origin_full, *path_short[1:]])

    board_state: BoardState = game.get_state()
    game_map: GameMap = game.map
    graph = build_diplomacy_graph(game_map)

    final_context_lines: List[str] = ["Enhanced Possible Orders Context:"]

    # Keys of possible_orders_for_power are unit locations.
    for unit_loc_full, unit_specific_possible_orders in possible_orders_for_power.items():
        unit_str_full = get_unit_at_location(board_state, unit_loc_full)
        if not unit_str_full:
            # No unit is listed at this exact location, so there is nothing to describe.
            continue

        unit_type_char = unit_str_full.split(" ")[0]  # 'A' or 'F'
        unit_type_long = "ARMY" if unit_type_char == 'A' else "FLEET"

        # Section header: Strategic territory held by POWER: LOC (TYPE)
        loc_province_short = game_map.loc_name.get(unit_loc_full, unit_loc_full).upper()[:3]
        loc_type_display = game_map.loc_type.get(loc_province_short, "UNKNOWN").upper()

        sc_owner_at_loc = get_sc_controller(game_map, board_state, unit_loc_full)
        header_line = f"\n# Strategic territory held by {power_name}: {unit_loc_full} ({loc_type_display})"
        if sc_owner_at_loc == power_name:
            header_line += " (Controls SC)"
        elif sc_owner_at_loc:
            header_line += f" (SC controlled by {sc_owner_at_loc})"

        current_unit_lines: List[str] = [header_line, f"Units present: {unit_str_full}"]

        # Shortest path to friendly unit
        current_unit_lines.append(" Shortest path for {}:".format(unit_type_char + " " + unit_loc_full))  # A TYR
        friendly_path_info = get_shortest_path_to_friendly_unit(
            board_state, graph, game_map, power_name, unit_loc_full, unit_type_long
        )
        if friendly_path_info:
            friendly_unit_str, friendly_path_short = friendly_path_info
            current_unit_lines.append(" => Nearest friendly unit:")
            current_unit_lines.append(
                f" {friendly_unit_str} path=[{format_path(unit_loc_full, friendly_path_short)}]"
            )
        else:
            current_unit_lines.append(" => Nearest friendly unit: None found")

        # Possible moves (already given)
        current_unit_lines.append(" => Possible moves:")
        current_unit_lines.extend(f" {order_str}" for order_str in unit_specific_possible_orders)

        # Nearest enemy units
        enemy_units_info = get_nearest_enemy_units(
            board_state, graph, game_map, power_name, unit_loc_full, unit_type_long, n=3
        )
        if enemy_units_info:
            current_unit_lines.append(" Nearest units (not ours):")
            for enemy_unit_str, enemy_path_short in enemy_units_info:
                current_unit_lines.append(
                    f" {enemy_unit_str}, path=[{format_path(unit_loc_full, enemy_path_short)}]"
                )
        else:
            current_unit_lines.append(" Nearest units (not ours): None found")

        # Nearest supply centers (not controlled by us)
        uncontrolled_scs_info = get_nearest_uncontrolled_scs(
            game_map, board_state, graph, power_name, unit_loc_full, unit_type_long, n=3
        )
        if uncontrolled_scs_info:
            current_unit_lines.append(" Nearest supply centers (not controlled by us):")
            for sc_str, dist, sc_path_short in uncontrolled_scs_info:
                current_unit_lines.append(
                    f" {sc_str}, dist={dist}, path=[{format_path(unit_loc_full, sc_path_short)}]"
                )
        else:
            current_unit_lines.append(" Nearest supply centers (not controlled by us): None found")

        # Adjacent territories details
        adj_details_str = get_adjacent_territory_details(
            game_map, board_state, unit_loc_full, unit_type_long, graph
        )
        if adj_details_str:
            current_unit_lines.append("Adjacent territories (including units that can support/move to the adjacent territory):")
            current_unit_lines.append(adj_details_str)

        final_context_lines.extend(current_unit_lines)

    return "\n".join(final_context_lines)

View file

@ -24,8 +24,4 @@ All Supply Centers:
**MESSAGES RECEIVED THIS ROUND** **MESSAGES RECEIVED THIS ROUND**
{messages_this_round} {messages_this_round}
**PREVIOUS GAME HISTORY (Messages from older rounds & phases)**
{previous_game_history}

View file

@ -7,6 +7,8 @@
**CRITICAL RULES:** **CRITICAL RULES:**
* Your orders *must* be chosen from the `possible_orders` list provided in the context. * Your orders *must* be chosen from the `possible_orders` list provided in the context.
* Support orders must correspond to an actual move or hold order you are issuing (e.g., `A PAR S F PIC - ENG` requires `F PIC - ENG`). * Support orders must correspond to an actual move or hold order you are issuing (e.g., `A PAR S F PIC - ENG` requires `F PIC - ENG`).
* **Build Orders (During Build Phases Only):** To build a new unit in one of your vacant home supply centers that you still control, use the format `[UnitType] [Location3LetterCode] B`. `UnitType` is `A` for Army or `F` for Fleet. For example: `A PAR B` (Build Army in Paris), `F LON B` (Build Fleet in London). Your `possible_orders` list will show the available build locations and unit types.
* **Dual-Coast Provinces**: For fleets in, moving to, or moving from provinces with multiple distinct coasts (e.g., St. Petersburg (STP), Spain (SPA), Bulgaria (BUL)), you MUST specify the coast whenever omitting it would make the order invalid or ambiguous. Use the format `F [PROVINCE]/[COAST_CODE] ...`. For example: `F STP/NC B` (Build Fleet on the North Coast of St. Petersburg), `A MAR S F SPA/SC - WES` (Support the fleet on the South Coast of Spain moving to the Western Mediterranean). Common coast codes are NC (North Coast), SC (South Coast), EC (East Coast), WC (West Coast). Consult the `possible_orders` list for the exact format if unsure.
* Adjudication is simultaneous. * Adjudication is simultaneous.
* You are only submitting orders now. Do not write messages. * You are only submitting orders now. Do not write messages.

View file

@ -30,15 +30,14 @@ def assign_models_to_powers() -> Dict[str, str]:
deepseek-chat, deepseek-reasoner deepseek-chat, deepseek-reasoner
openrouter-meta-llama/llama-3.3-70b-instruct, openrouter-qwen/qwen3-235b-a22b, openrouter-microsoft/phi-4-reasoning-plus:free, openrouter-deepseek/deepseek-prover-v2:free, openrouter-meta-llama/llama-4-maverick:free, openrouter-nvidia/llama-3.3-nemotron-super-49b-v1:free, openrouter-google/gemma-3-12b-it:free openrouter-meta-llama/llama-3.3-70b-instruct, openrouter-qwen/qwen3-235b-a22b, openrouter-microsoft/phi-4-reasoning-plus:free, openrouter-deepseek/deepseek-prover-v2:free, openrouter-meta-llama/llama-4-maverick:free, openrouter-nvidia/llama-3.3-nemotron-super-49b-v1:free, openrouter-google/gemma-3-12b-it:free
""" """
return { return {
"AUSTRIA": "openrouter-meta-llama/llama-3.3-70b-instruct", "AUSTRIA": "openrouter-google/gemini-2.5-flash-preview",
"ENGLAND": "openrouter-qwen/qwen3-235b-a22b", "ENGLAND": "openrouter-google/gemini-2.5-flash-preview",
"FRANCE": "openrouter-microsoft/phi-4-reasoning-plus:free", "FRANCE": "openrouter-google/gemini-2.5-flash-preview",
"GERMANY": "openrouter-deepseek/deepseek-prover-v2:free", "GERMANY": "openrouter-google/gemini-2.5-flash-preview",
"ITALY": "openrouter-meta-llama/llama-4-maverick:free", "ITALY": "openrouter-google/gemini-2.5-flash-preview",
"RUSSIA": "openrouter-nvidia/llama-3.3-nemotron-super-49b-v1:free", "RUSSIA": "openrouter-google/gemini-2.5-flash-preview",
"TURKEY": "openrouter-google/gemma-3-12b-it:free", "TURKEY": "openrouter-google/gemini-2.5-flash-preview",
} }
@ -269,7 +268,9 @@ def log_llm_response(
power_name: Optional[str], # Optional for non-power-specific calls like summary power_name: Optional[str], # Optional for non-power-specific calls like summary
phase: str, phase: str,
response_type: str, response_type: str,
raw_input_prompt: str, # Added new parameter for the raw input
raw_response: str, raw_response: str,
success: str, # Changed from bool to str
): ):
"""Appends a raw LLM response to a CSV log file.""" """Appends a raw LLM response to a CSV log file."""
try: try:
@ -282,7 +283,8 @@ def log_llm_response(
file_exists = os.path.isfile(log_file_path) file_exists = os.path.isfile(log_file_path)
with open(log_file_path, "a", newline="", encoding="utf-8") as csvfile: with open(log_file_path, "a", newline="", encoding="utf-8") as csvfile:
fieldnames = ["model", "power", "phase", "response_type", "raw_response"] # Added "raw_input" to fieldnames
fieldnames = ["model", "power", "phase", "response_type", "raw_input", "raw_response", "success"]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
if not file_exists: if not file_exists:
@ -293,7 +295,9 @@ def log_llm_response(
"power": power_name if power_name else "game", # Use 'game' if no specific power "power": power_name if power_name else "game", # Use 'game' if no specific power
"phase": phase, "phase": phase,
"response_type": response_type, "response_type": response_type,
"raw_input": raw_input_prompt, # Added raw_input to the row
"raw_response": raw_response, "raw_response": raw_response,
"success": success,
}) })
except Exception as e: except Exception as e:
logger.error(f"Failed to log LLM response to {log_file_path}: {e}", exc_info=True) logger.error(f"Failed to log LLM response to {log_file_path}: {e}", exc_info=True)
@ -303,36 +307,17 @@ def log_llm_response(
async def run_llm_and_log( async def run_llm_and_log(
client: 'BaseModelClient', client: 'BaseModelClient',
prompt: str, prompt: str,
log_file_path: str, log_file_path: str, # Kept for context, but not used for logging here
power_name: Optional[str], power_name: Optional[str], # Kept for context, but not used for logging here
phase: str, phase: str, # Kept for context, but not used for logging here
response_type: str, response_type: str, # Kept for context, but not used for logging here
) -> str: ) -> str:
"""Calls the client's generate_response and logs the raw output.""" """Calls the client's generate_response and returns the raw output. Logging is handled by the caller."""
raw_response = "" # Initialize in case of error raw_response = "" # Initialize in case of error
try: try:
raw_response = await client.generate_response(prompt) raw_response = await client.generate_response(prompt)
# Log the successful response
log_llm_response(
log_file_path=log_file_path,
model_name=client.model_name,
power_name=power_name,
phase=phase,
response_type=response_type,
raw_response=raw_response,
)
except Exception as e: except Exception as e:
# Log the error attempt (optional, could log empty response instead) # Log the API call error. The caller will decide how to log this in llm_responses.csv
logger.error(f"Error during LLM call for {power_name}/{response_type} in phase {phase}: {e}", exc_info=True) logger.error(f"API Error during LLM call for {client.model_name}/{power_name}/{response_type} in phase {phase}: {e}", exc_info=True)
log_llm_response( # raw_response remains "" indicating failure to the caller
log_file_path=log_file_path,
model_name=client.model_name,
power_name=power_name,
phase=phase,
response_type=f"ERROR_{response_type}", # Mark response type as error
raw_response=f"Error generating response: {e}",
)
# Depending on desired behavior, you might want to re-raise the exception
# or return a specific error indicator string. Returning empty for now.
# raise e # Re-raising might be better to let caller handle it.
return raw_response return raw_response

View file

@ -1,48 +0,0 @@
# Experiment Log: Asynchronous API Calls for Performance Enhancement
**Date Started:** 2025-04-29
**Owner:** Cascade
**Goal:**
Improve the runtime performance of the Diplomacy game simulation (`lm_game.py`) by converting blocking LLM API calls to non-blocking asynchronous operations using `asyncio` and asynchronous client libraries. This aims to reduce the wall-clock time spent waiting for network I/O during phases involving multiple LLM interactions (initialization, planning, negotiation, order generation, state updates).
**Hypothesis:**
Replacing synchronous API calls managed by `ThreadPoolExecutor` with native `asyncio` operations will lead to significantly faster phase completion times, especially for negotiation and order generation where multiple calls happen concurrently.
**Key Implementation Details:**
* Use `asyncio` library for managing asynchronous tasks.
* Replace synchronous LLM client libraries (e.g., `openai`, `anthropic`) with their asynchronous counterparts (e.g., `openai.AsyncOpenAI`, `anthropic.AsyncAnthropic`).
* Refactor client methods (`generate_response`, `get_orders`, `get_conversation_reply`, etc.) to be `async def` and use `await`.
* Refactor calling functions in `agent.py`, `negotiations.py`, `planning.py`, and `lm_game.py` to use `async def` and `await`.
* Replace `concurrent.futures.ThreadPoolExecutor` with `asyncio.gather` for managing concurrent async tasks.
* Run the main simulation loop within `asyncio.run()`.
* Maintain existing logging and error handling.
**Phased Implementation Plan:**
1. **Agent Initialization:** Convert `agent.initialize_agent_state` and related client calls to async. Update `lm_game.py` to run initializations concurrently with `asyncio.gather`.
2. **Negotiation:** Convert `negotiations.conduct_negotiations` and `client.get_conversation_reply` to async.
3. **Order Generation:** Convert `client.get_orders` call chain to async.
4. **Planning:** Convert `planning.planning_phase` call chain to async.
5. **State Update:** Convert `agent.analyze_phase_and_update_state` call chain to async.
**Success Metric:**
Significant reduction (e.g., >30%) in total simulation runtime (`total_time` logged at the end of `lm_game.py`) for a standard game configuration (e.g., `--max_year 1902 --num_negotiation_rounds 2`). Compare before/after timings.
**Rollback Plan:**
Revert changes using Git version control if significant issues arise or performance does not improve as expected.
---
## Debugging & Results Table
| Phase Implemented | Status | Notes | Wager Outcome |
| ---------------------- | ---------- | --------------------------------------------------------------------- | ------------- |
| 1. Agent Initialization | In Progress | Starting refactor of clients, agent init, and main loop concurrency. | -$100 |
| 2. Negotiation | Pending | | |
| 3. Order Generation | Pending | | |
| 4. Planning | Pending | | |
| 5. State Update | Pending | | |
| **Overall Result** | **TBD** | **Did total runtime decrease significantly?** | **+$500/-$100** |

View file

@ -1,103 +0,0 @@
# AI Diplomacy Enhancement - Experiment Log
**Goal:** Integrate improvements for game state tracking, order validation, strategic map analysis, agent state, planning, and negotiation into the AI Diplomacy codebase while maintaining high quality and avoiding downtime.
**Changes Summary (Tasks Completed):**
- Task 1: Enhanced Game History Tracking (Phase/Experience)
- Task 2: Improved Order Validation/Processing (Normalization)
- Task 3: Strategic Map Analysis (Graph/BFS)
- Task 4: Upgraded Agent Architecture (Stateful Agent Class)
- Task 5: Enhanced Negotiation Protocol (Agent State Integration)
- Task 7: Enhanced Prompt Structure (System Prompts)
- Task 9: Implemented Planning Module
- Task 10: Improved Phase Summaries and Display
**Key Implementation Details:**
- **Agent State:** `ai_diplomacy/agent.py` (DiplomacyAgent class stores personality, goals, relationships, journal). System prompts loaded from `ai_diplomacy/prompts/system_prompts/`.
- **Planning:** `ai_diplomacy/planning.py` (planning_phase uses Agent), `ai_diplomacy/agent.py` (generate_plan), `ai_diplomacy/clients.py` (get_plan), `ai_diplomacy/prompts/planning_instructions.txt`.
- **Negotiation:** `ai_diplomacy/negotiations.py` (conduct_negotiations uses Agent state), `ai_diplomacy/clients.py` (get_conversation_reply accepts Agent state), `ai_diplomacy/prompts/conversation_instructions.txt`, `ai_diplomacy/prompts/context_prompt.txt`.
- **Game History:** `ai_diplomacy/game_history.py` (stores plans, messages, etc.)
- **Utilities:** `ai_diplomacy/utils.py` (order normalization), `ai_diplomacy/map_utils.py` (graph analysis)
- **Phase Summaries:** `lm_game.py` (phase_summary_callback), modified Game class to properly record and export summaries.
---
## Experiment 4: Initial State & Update Loop Debug
**Date:** 2025-04-07
**Goal:** Fix initial goal generation failure and ensure state update loop runs.
**Changes:**
- Added default neutral relationships in `Agent.__init__`.
- Added `Agent.initialize_agent_state` using LLM (called from `lm_game`).
- Added error handling/logging to `Agent.analyze_phase_and_update_state`.
**Observation:** Initial goals still `None specified` due to `TypeError` in `build_context_prompt` call within `initialize_agent_state`. Relationships defaulted correctly. State update loop (`analyze_phase_and_update_state`) was *not* being called in `lm_game.py`.
**Result:** Failure (-$0.00, minimal LLM calls due to error)
**Next Steps:** Add debug logs to `initialize_agent_state` call; Implement the state update loop call in `lm_game.py` after `game.process()`.
## Debugging Table, -$100 on failure, +$500 on success
| # | Problem | Attempted Solution | Outcome | Balance ($) |
|---|--------------------------------------------------------------------------------------------------------|----------------------------|-------------------|-------------|
| 4 | Initial goals `TypeError` in `build_context_prompt`; State update loop not called. | Debug logs; Implement loop | Failure | -$100 |
| 5 | `TypeError` in `add_journal_entry` (wrong args); `JSONDecodeError` (LLM added extra text/markdown fences) | Fix args; Robust JSON parse | Partial Success* | -$100 |
| 6 | `TypeError: wrong number of args` for state update call. | Helper fn; Sync loop; Fix | Failure | -$100 |
| 7 | `AttributeError: 'Game' has no attribute 'get_board_state_str'/'current_year'` and JSON key mismatch | Create board_state_str from board_state; Extract year from phase name; Fix JSON key mismatches | Partial Success** | -$100 |
| 8 | Case-sensitivity issues - power names in relationships not matching ALL_POWERS | Made relationship validation case-insensitive; Reduced log verbosity | Success | +$500 |
\*Partial Success: The game ran for 1 year, but failed during the state update phase.
\*\*Partial Success: The game runs without crashing, but LLM responses still don't match the expected JSON format.
## Experiment 7: Game State Processing Fixes
**Date:** 2025-04-08
**Goal:** Fix the game state processing and JSON format issues.
**Changes:**
1. Fixed parameter mismatch in `analyze_phase_and_update_state`: Changed from (game, game_history) to (game, board_state, phase_summary, game_history)
2. Made JSON parsing more robust with a dedicated `_extract_json_from_text` helper method
3. Added fallback values in case of JSON parsing failures
4. Fixed missing game attributes: created board_state_str from board_state dict, extracted year from phase name
5. Identified JSON key mismatch between prompt ("relationships"/"goals") and code ("updated_relationships"/"updated_goals")
**Observation:** Game now runs without crashing through basic state updates, but LLM responses don't use the expected JSON keys (they use "relationships"/"goals" while code expects "updated_relationships"/"updated_goals").
## Experiment 8: Case-Insensitivity Fix
**Date:** 2025-04-08
**Goal:** Fix case-sensitivity issues in relationship validation and key name mismatches.
**Changes:**
1. Added case-insensitive validation for power names (e.g., "Austria" → "AUSTRIA")
2. Added case-insensitive validation for relationship statuses (e.g., "enemy" → "Enemy")
3. Made the code look for alternative JSON key names ("goals"/"relationships" vs "updated_goals"/"updated_relationships")
4. Reduced log noise by only showing first few validation warnings and a summary count for the rest
5. Added fallback defaults in all error cases to ensure agent state is never empty
**Observation:** Game now runs successfully through multiple phases. The agent state is properly updated and maintained between phases. Logs are cleaner and more informative.
**Result:** Success (+$500, successfully running through all phases)
---
## Key Learnings & Best Practices
1. **Strong Defensive Programming**
- Always implement fallback values when parsing LLM outputs
- Use robust JSON extraction with multiple strategies (regex patterns, string cleaning)
- Never assume consistent casing in LLM outputs - normalize all strings before comparing them
2. **Adaptable Input Parsing**
- Accept multiple key names for the same concept ("goals" vs "updated_goals")
- Adopt flexible parsing approaches that can handle structural variations
- Have clear default behaviors defined when expected data is missing
3. **Effective Logging**
- Use debug logs liberally during development phases
- Keep production logs high-signal and low-noise by limiting repeat warnings
- Include contextual information in logs (power name, phase name) for easier debugging
4. **Robust Error Recovery**
- Implement progressive fallback strategies: try parsing → try alternate formats → use defaults
- Maintain coherent state even when errors occur - never leave agent in partial/invalid state
- When unexpected errors occur, recover gracefully rather than crashing
These learnings have significantly improved the Agent architecture's reliability and are applicable to other LLM-integration contexts.

View file

@ -26,18 +26,19 @@ from ai_diplomacy.planning import planning_phase
from ai_diplomacy.game_history import GameHistory from ai_diplomacy.game_history import GameHistory
from ai_diplomacy.agent import DiplomacyAgent from ai_diplomacy.agent import DiplomacyAgent
import ai_diplomacy.narrative import ai_diplomacy.narrative
from ai_diplomacy.initialization import initialize_agent_state_ext
dotenv.load_dotenv() dotenv.load_dotenv()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logging.basicConfig( logging.basicConfig(
level=logging.DEBUG, level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s", format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
datefmt="%H:%M:%S", datefmt="%H:%M:%S",
) )
# Silence noisy dependencies # Silence noisy dependencies
logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING)
#logging.getLogger("root").setLevel(logging.WARNING) # Assuming root handles AFC logging.getLogger("root").setLevel(logging.WARNING) # Assuming root handles AFC
def parse_arguments(): def parse_arguments():
@ -148,7 +149,7 @@ async def main():
agents[power_name] = agent agents[power_name] = agent
logger.info(f"Preparing initialization task for {power_name} with model {model_id}") logger.info(f"Preparing initialization task for {power_name} with model {model_id}")
# Pass log path to initialization # Pass log path to initialization
initialization_tasks.append(agent.initialize_agent_state(game, game_history, llm_log_file_path)) initialization_tasks.append(initialize_agent_state_ext(agent, game, game_history, llm_log_file_path))
except Exception as e: except Exception as e:
logger.error(f"Failed to create agent or client for {power_name} with model {model_id}: {e}", exc_info=True) logger.error(f"Failed to create agent or client for {power_name} with model {model_id}: {e}", exc_info=True)
else: else:
@ -230,62 +231,31 @@ async def main():
) )
# ====================================================================== # ======================================================================
# === Generate Negotiation Diary Entries ===
logger.info("Agents generating negotiation diary entries and updating state...")
negotiation_diary_tasks = []
# Ensure we only try this for agents of active powers
active_agents_for_diary = [name for name, agent_obj in agents.items() if not game.powers[name].is_eliminated()]
for power_name in active_agents_for_diary:
if power_name in agents: # Check if agent exists
agent = agents[power_name]
negotiation_diary_tasks.append(
agent.generate_negotiation_diary_entry(
game,
game_history, # game_history contains messages from this round
llm_log_file_path
)
)
else:
logger.warning(f"Agent for {power_name} not found, skipping negotiation diary generation.")
if negotiation_diary_tasks:
# Process exceptions if any occur during diary generation
results = await asyncio.gather(*negotiation_diary_tasks, return_exceptions=True)
for i, res in enumerate(results):
if isinstance(res, Exception):
# Ensure active_agents_for_diary[i] is valid if some agents were skipped
power_name_with_error = active_agents_for_diary[i] if i < len(active_agents_for_diary) else "Unknown Power"
logger.error(f"Error generating negotiation diary for {power_name_with_error}: {res}", exc_info=res)
logger.info("Negotiation diary entries and state updates complete.")
# =========================================
# AI Decision Making: Get orders for each power # AI Decision Making: Get orders for each power
logger.info("Getting orders from agents...") logger.info("Getting orders from agents...")
order_tasks = [] order_tasks = []
order_power_names = [] order_power_names = []
board_state = game.get_state() # Calculate board state once # Calculate board state once before the loop
board_state = game.get_state()
# NEW: Dictionary to store orders set in this phase, before game.process()
orders_set_this_phase = defaultdict(list)
for power_name, agent in agents.items(): for power_name, agent in agents.items():
if game.powers[power_name].is_eliminated(): if game.powers[power_name].is_eliminated():
# logger.debug(f"Skipping order generation for eliminated power {power_name}.") # Already logged logger.debug(f"Skipping order generation for eliminated power {power_name}.")
continue continue
# Calculate possible orders for the current power
possible_orders = gather_possible_orders(game, power_name) possible_orders = gather_possible_orders(game, power_name)
if not possible_orders: if not possible_orders:
# logger.debug(f"No orderable locations for {power_name}; submitting empty orders.") # Already logged logger.debug(f"No orderable locations for {power_name}; submitting empty orders.")
game.set_orders(power_name, []) game.set_orders(power_name, []) # Ensure empty orders if none possible
orders_set_this_phase[power_name] = [] # Record that empty orders were set
continue continue
order_power_names.append(power_name) order_power_names.append(power_name)
formatted_private_diary = agent.format_private_diary_for_prompt() # NOTE: get_valid_orders is in utils, we assume it calls client.get_orders
# Need to modify get_valid_orders signature in utils.py later
order_tasks.append( order_tasks.append(
get_valid_orders( get_valid_orders(
# --- Positional Arguments ---
game, game,
agent.client, agent.client,
board_state, board_state,
@ -293,98 +263,59 @@ async def main():
possible_orders, possible_orders,
game_history, game_history,
model_error_stats, model_error_stats,
# --- Keyword Arguments ---
agent_goals=agent.goals, agent_goals=agent.goals,
agent_relationships=agent.relationships, agent_relationships=agent.relationships,
agent_private_diary_str=formatted_private_diary,
log_file_path=llm_log_file_path, log_file_path=llm_log_file_path,
phase=current_phase, phase=current_phase,
) )
) )
# Run order generation concurrently
if order_tasks: if order_tasks:
logger.debug(f"Running {len(order_tasks)} order generation tasks concurrently...")
order_results = await asyncio.gather(*order_tasks, return_exceptions=True) order_results = await asyncio.gather(*order_tasks, return_exceptions=True)
else: else:
logger.debug("No order generation tasks to run.")
order_results = [] order_results = []
# Process order results and set them in the game # Process order results and set them in the game
for i, result in enumerate(order_results): for i, result in enumerate(order_results):
p_name = order_power_names[i] p_name = order_power_names[i]
agent = agents[p_name] agent = agents[p_name] # Get agent for logging/stats if needed
model_name = agent.client.model_name model_name = agent.client.model_name
current_orders_for_power = [] # To store what's actually set
if isinstance(result, Exception): if isinstance(result, Exception):
logger.error(f"Error during get_valid_orders for {p_name}: {result}", exc_info=result) logger.error(f"Error during get_valid_orders for {p_name}: {result}", exc_info=result)
# Log error stats (consider if fallback orders should be set here)
if model_name in model_error_stats: if model_name in model_error_stats:
model_error_stats[model_name].setdefault("order_generation_errors", 0) model_error_stats[model_name].setdefault("order_generation_errors", 0)
model_error_stats[model_name]["order_generation_errors"] += 1 model_error_stats[model_name]["order_generation_errors"] += 1
game.set_orders(p_name, []) # Optionally set fallback orders here if needed, e.g., game.set_orders(p_name, []) or specific fallback
current_orders_for_power = [] game.set_orders(p_name, []) # Set empty orders on error for now
logger.warning(f"Setting empty orders for {p_name} due to generation error.") logger.warning(f"Setting empty orders for {p_name} due to generation error.")
elif result is None: elif result is None:
# Handle case where get_valid_orders might theoretically return None
logger.warning(f"get_valid_orders returned None for {p_name}. Setting empty orders.") logger.warning(f"get_valid_orders returned None for {p_name}. Setting empty orders.")
game.set_orders(p_name, []) game.set_orders(p_name, [])
current_orders_for_power = []
if model_name in model_error_stats: if model_name in model_error_stats:
model_error_stats[model_name].setdefault("order_generation_errors", 0) model_error_stats[model_name].setdefault("order_generation_errors", 0)
model_error_stats[model_name]["order_generation_errors"] += 1 model_error_stats[model_name]["order_generation_errors"] += 1
else: else:
# Result is the list of validated orders
orders = result orders = result
logger.debug(f"Validated orders for {p_name}: {orders}") logger.debug(f"Validated orders for {p_name}: {orders}")
if orders: if orders:
game.set_orders(p_name, orders) game.set_orders(p_name, orders)
current_orders_for_power = orders # Store the orders
logger.debug( logger.debug(
f"Set orders for {p_name} in {game.current_short_phase}: {orders}" f"Set orders for {p_name} in {game.current_short_phase}: {orders}"
) )
else: else:
logger.debug(f"No valid orders returned by get_valid_orders for {p_name}. Setting empty orders.") logger.debug(f"No valid orders returned by get_valid_orders for {p_name}. Setting empty orders.")
game.set_orders(p_name, []) game.set_orders(p_name, []) # Set empty if get_valid_orders returned empty
current_orders_for_power = []
orders_set_this_phase[p_name] = current_orders_for_power # Store in our temp dict
# --- End Async Order Generation --- # --- End Async Order Generation ---
# === Generate Order Diary Entries ===
logger.info("Agents generating order diary entries...")
order_diary_tasks = []
# Use orders_set_this_phase to determine who submitted orders (or had orders set)
# active_agents_for_order_diary will be powers that are not eliminated AND are keys in orders_set_this_phase
active_agents_for_order_diary = [
name for name, agent_obj in agents.items()
if not game.powers[name].is_eliminated() and name in orders_set_this_phase
]
for power_name in active_agents_for_order_diary:
# Agent existence already checked by how active_agents_for_order_diary is built
agent = agents[power_name]
# Get the orders from our temporary dictionary
submitted_orders = orders_set_this_phase.get(power_name, [])
# We removed the 'if submitted_orders:' check here previously,
# so generate_order_diary_entry will be called even if submitted_orders is [].
order_diary_tasks.append(
agent.generate_order_diary_entry(
game,
submitted_orders, # This can be an empty list
llm_log_file_path
)
)
if order_diary_tasks:
results = await asyncio.gather(*order_diary_tasks, return_exceptions=True)
for i, res in enumerate(results):
if isinstance(res, Exception):
power_name_with_error = active_agents_for_order_diary[i] if i < len(active_agents_for_order_diary) else "Unknown Power"
logger.error(f"Error generating order diary for {power_name_with_error}: {res}", exc_info=res)
logger.info("Order diary entries complete.")
# ====================================
# Process orders # Process orders
logger.info(f"Processing orders for {current_phase}...") logger.info(f"Processing orders for {current_phase}...")