diff --git a/ai_diplomacy/agent.py b/ai_diplomacy/agent.py index 60ce295..b147ad9 100644 --- a/ai_diplomacy/agent.py +++ b/ai_diplomacy/agent.py @@ -7,7 +7,7 @@ import re # Assuming BaseModelClient is importable from clients.py in the same directory from .clients import BaseModelClient # Import load_prompt and the new logging wrapper from utils -from .utils import load_prompt, run_llm_and_log +from .utils import load_prompt, run_llm_and_log, log_llm_response logger = logging.getLogger(__name__) @@ -182,78 +182,133 @@ class DiplomacyAgent: async def generate_negotiation_diary_entry(self, game: 'Game', game_history: 'GameHistory', log_file_path: str): """ Generates a diary entry summarizing negotiations and updates relationships. + This method now includes comprehensive LLM interaction logging. """ - logger.info(f"[{self.power_name}] Generating negotiation diary entry for {game.current_short_phase}...") + logger.info(f"[{self.power_name}] Generating negotiation diary entry for {game.current_short_phase}..." ) - prompt_template = _load_prompt_file('negotiation_diary_prompt.txt') - if not prompt_template: - logger.error(f"[{self.power_name}] Could not load negotiation_diary_prompt.txt. Skipping diary entry.") - return + full_prompt = "" # For logging in finally block + raw_response = "" # For logging in finally block + success_status = "Failure: Initialized" # Default - # Prepare context for the prompt - board_state_dict = game.get_state() - board_state_str = f"Units: {board_state_dict.get('units', {})}, Centers: {board_state_dict.get('centers', {})}" - - messages_this_round = game_history.get_messages_this_round( - power_name=self.power_name, - current_phase_name=game.current_short_phase - ) - if not messages_this_round.strip() or messages_this_round.startswith("\n(No messages"): - messages_this_round = "(No messages exchanged this negotiation round)" - - goals_str = "\n".join([f"- {g}" for g in self.goals]) if self.goals else "None" - relationships_str = "\n".join([f"- {p}: {s}" for p, s in self.relationships.items()]) if self.relationships else "None" - - prompt = prompt_template.format( - power_name=self.power_name, - current_phase=game.current_short_phase, - messages_this_round=messages_this_round, - agent_goals=goals_str, - agent_relationships=relationships_str, - board_state_str=board_state_str - ) - - response_data = None try: + prompt_template_content = _load_prompt_file('negotiation_diary_prompt.txt') + if not prompt_template_content: + logger.error(f"[{self.power_name}] Could not load negotiation_diary_prompt.txt. Skipping diary entry.") + success_status = "Failure: Prompt file not loaded" + # No LLM call, so log_llm_response won't have typical LLM data, but we still log the attempt. + # Or, decide not to log if no LLM call is even attempted. For consistency, let's log an attempt. + # To do that, we'd need to call log_llm_response here or ensure finally block handles it. + # For now, the finally block will catch this, but raw_response and full_prompt will be empty. + return # Exit early if prompt is critical + + # Prepare context for the prompt + board_state_dict = game.get_state() + board_state_str = f"Units: {board_state_dict.get('units', {})}, Centers: {board_state_dict.get('centers', {})}" + + messages_this_round = game_history.get_messages_this_round( + power_name=self.power_name, + current_phase_name=game.current_short_phase + ) + if not messages_this_round.strip() or messages_this_round.startswith("\n(No messages"): + messages_this_round = "(No messages involving your power this round that require deep reflection for diary. Focus on overall situation.)" + + current_relationships_str = json.dumps(self.relationships) + current_goals_str = json.dumps(self.goals) + formatted_diary = self.format_private_diary_for_prompt() + + full_prompt = prompt_template_content.format( + power_name=self.power_name, + current_phase=game.current_short_phase, + board_state=board_state_str, + messages_this_round=messages_this_round, + current_relationships=current_relationships_str, + current_goals=current_goals_str, + private_diary_summary=formatted_diary, # Pass formatted diary + allowed_relationships_str=", ".join(ALLOWED_RELATIONSHIPS) + ) + + logger.debug(f"[{self.power_name}] Negotiation diary prompt:\n{full_prompt[:500]}...") + raw_response = await run_llm_and_log( client=self.client, - prompt=prompt, - log_file_path=log_file_path, + prompt=full_prompt, + log_file_path=log_file_path, # Pass the main log file path power_name=self.power_name, phase=game.current_short_phase, - response_type='negotiation_diary', + response_type='negotiation_diary_raw' # For run_llm_and_log context ) - response_data = self._extract_json_from_text(raw_response) - except Exception as e: - logger.error(f"[{self.power_name}] Error generating or parsing negotiation diary: {e}", exc_info=True) - self.add_diary_entry(f"Error generating negotiation diary: {e}", game.current_short_phase) - return - if response_data: - summary = response_data.get("negotiation_summary", "(No summary provided)") - intent = response_data.get("intent", "(No intent stated)") - diary_text = f"Negotiation Summary: {summary}\nIntent for Orders: {intent}" - self.add_diary_entry(diary_text, game.current_short_phase) + logger.debug(f"[{self.power_name}] Raw negotiation diary response: {raw_response[:300]}...") - # Update relationships - rship_updates = response_data.get("relationship_updates", {}) - if isinstance(rship_updates, dict): - updated_count = 0 - for power, status in rship_updates.items(): - power_upper = power.upper() # Normalize - if power_upper in ALL_POWERS and power_upper != self.power_name and status in ALLOWED_RELATIONSHIPS: - if self.relationships.get(power_upper) != status: - self.relationships[power_upper] = status - self.add_journal_entry(f"[{game.current_short_phase}] Relationship with {power_upper} updated to {status} via diary.") - updated_count +=1 + parsed_data = None + try: + parsed_data = self._extract_json_from_text(raw_response) + logger.debug(f"[{self.power_name}] Parsed diary data: {parsed_data}") + success_status = "Success: Parsed diary data" + except json.JSONDecodeError as e: + logger.error(f"[{self.power_name}] Failed to parse JSON from diary response: {e}. Response: {raw_response[:300]}...") + success_status = "Failure: JSONDecodeError" + # Continue without parsed_data, rely on diary_entry_text if available or just log failure + + diary_entry_text = "(LLM diary entry generation or parsing failed.)" # Fallback + relationships_updated = False + + if parsed_data: + diary_entry_text = parsed_data.get('diary_entry', diary_entry_text) + # Update relationships if provided and valid + new_relationships = parsed_data.get('updated_relationships') + if isinstance(new_relationships, dict): + valid_new_rels = {} + for p, r in new_relationships.items(): + p_upper = str(p).upper() + r_title = str(r).title() + if p_upper in ALL_POWERS and p_upper != self.power_name and r_title in ALLOWED_RELATIONSHIPS: + valid_new_rels[p_upper] = r_title + elif p_upper != self.power_name: # Log invalid relationship for a valid power + logger.warning(f"[{self.power_name}] Invalid relationship '{r}' for power '{p}' in diary update. Keeping old.") + + if valid_new_rels: + # Log changes before applying + for p_changed, new_r_val in valid_new_rels.items(): + old_r_val = self.relationships.get(p_changed, "Unknown") + if old_r_val != new_r_val: + logger.info(f"[{self.power_name}] Relationship with {p_changed} changing from {old_r_val} to {new_r_val} based on diary.") + self.relationships.update(valid_new_rels) + relationships_updated = True + success_status = "Success: Applied diary data (relationships updated)" else: - logger.warning(f"[{self.power_name}] Invalid relationship update from diary: {power}-{status}") - if updated_count > 0: - logger.info(f"[{self.power_name}] Updated {updated_count} relationships based on negotiation diary.") - else: - logger.warning(f"[{self.power_name}] Relationship updates from diary not in expected dict format: {rship_updates}") - else: - self.add_diary_entry("Failed to generate negotiation summary and intent.", game.current_short_phase) + logger.info(f"[{self.power_name}] No valid relationship updates found in diary response.") + if success_status == "Success: Parsed diary data": # If only parsing was successful before + success_status = "Success: Parsed, no valid relationship updates" + elif new_relationships is not None: # It was provided but not a dict + logger.warning(f"[{self.power_name}] 'updated_relationships' from diary LLM was not a dictionary: {type(new_relationships)}") + + # Add the generated (or fallback) diary entry + self.add_diary_entry(diary_entry_text, game.current_short_phase) + if relationships_updated: + self.add_journal_entry(f"[{game.current_short_phase}] Relationships updated after negotiation diary: {self.relationships}") + + # If success_status is still the default 'Parsed diary data' but no relationships were updated, refine it. + if success_status == "Success: Parsed diary data" and not relationships_updated: + success_status = "Success: Parsed, only diary text applied" + + except Exception as e: + logger.error(f"[{self.power_name}] Error in generate_negotiation_diary_entry: {e}", exc_info=True) + success_status = f"Failure: Exception ({type(e).__name__})" + # Add a fallback diary entry in case of general error + self.add_diary_entry(f"(Error generating diary entry: {type(e).__name__})", game.current_short_phase) + finally: + if log_file_path: # Ensure log_file_path is provided + log_llm_response( + log_file_path=log_file_path, + model_name=self.client.model_name if self.client else "UnknownModel", + power_name=self.power_name, + phase=game.current_short_phase if game else "UnknownPhase", + response_type="negotiation_diary", # Specific type for CSV logging + raw_input_prompt=full_prompt, + raw_response=raw_response, + success=success_status + ) async def generate_order_diary_entry(self, game: 'Game', orders: List[str], log_file_path: str): """ @@ -283,174 +338,83 @@ class DiplomacyAgent: ) response_data = None - raw_response = None + raw_response = None # Initialize raw_response try: raw_response = await run_llm_and_log( client=self.client, - prompt=prompt, + prompt=prompt, log_file_path=log_file_path, power_name=self.power_name, phase=game.current_short_phase, - response_type='order_diary', + response_type='order_diary' + # raw_input_prompt=prompt, # REMOVED from run_llm_and_log ) - response_data = self._extract_json_from_text(raw_response) - except Exception as e: - logger.error(f"[{self.power_name}] Error generating or parsing order diary: {e}", exc_info=True) - logger.error(raw_response) - #self.add_diary_entry(f"Error generating order reflection diary: {e}", game.current_short_phase) - return - if response_data: - order_summary = response_data.get("order_summary", "(Order summary missing)") - logger.info('Order summary: ' + str(order_summary)) - self.add_diary_entry(f"Order Summary: {order_summary}", game.current_short_phase) - else: - logger.error("Failed to generate order summary.") - #self.add_diary_entry("Failed to generate order summary.", game.current_short_phase) + success_status = "FALSE" + response_data = None + actual_diary_text = None # Variable to hold the final diary text - - def get_relationships(self) -> Dict[str, str]: - """Returns a copy of the agent's current relationships with other powers.""" - return self.relationships.copy() - - # Make the initialization method asynchronous - async def initialize_agent_state(self, game: 'Game', game_history: 'GameHistory', log_file_path: str): - """Uses the LLM to set initial goals based on the starting game state.""" - logger.info(f"[{self.power_name}] Initializing agent state using LLM...") - current_phase = game.get_current_phase() # Get phase for logging - try: - # Use a simplified prompt for initial state generation - # TODO: Create a dedicated 'initial_state_prompt.txt' - allowed_labels_str = ", ".join(ALLOWED_RELATIONSHIPS) - initial_prompt = f"You are the agent for {self.power_name} in a game of Diplomacy at the very start (Spring 1901). " \ - f"Analyze the initial board position and suggest 2-3 strategic high-level goals for the early game. " \ - f"Consider your power's strengths, weaknesses, and neighbors. " \ - f"Also, provide an initial assessment of relationships with other powers. " \ - f"IMPORTANT: For each relationship, you MUST use exactly one of the following labels: {allowed_labels_str}. " \ - f"Format your response as a JSON object with two keys: 'initial_goals' (a list of strings) and 'initial_relationships' (a dictionary mapping power names to one of the allowed relationship strings)." - - # == Fix: Get required state info from game object == - board_state = game.get_state() - possible_orders = game.get_all_possible_orders() - - # == Add detailed logging before call == - logger.debug(f"[{self.power_name}] Preparing context for initial state. Got board_state type: {type(board_state)}, possible_orders type: {type(possible_orders)}, game_history type: {type(game_history)}") - logger.debug(f"[{self.power_name}] Calling build_context_prompt with game: {game is not None}, board_state: {board_state is not None}, power_name: {self.power_name}, possible_orders: {possible_orders is not None}, game_history: {game_history is not None}") - - # Get formatted diary for context (will be empty at initialization) - formatted_diary = self.format_private_diary_for_prompt() - - context = self.client.build_context_prompt( - game=game, - board_state=board_state, # Pass board_state - power_name=self.power_name, - possible_orders=possible_orders, # Pass possible_orders - game_history=game_history, # Pass game_history - agent_goals=None, # No goals yet - agent_relationships=None, # No relationships yet (defaults used in prompt) - agent_private_diary=formatted_diary, # Pass formatted diary - ) - full_prompt = initial_prompt + "\n\n" + context - - # Await the asynchronous client call USING THE WRAPPER - response = await run_llm_and_log( - client=self.client, - prompt=full_prompt, - log_file_path=log_file_path, - power_name=self.power_name, - phase=current_phase, - response_type='initialization', - ) - logger.debug(f"[{self.power_name}] LLM response for initial state: {response}") - - # Try to extract JSON from the response - try: - update_data = self._extract_json_from_text(response) - logger.debug(f"[{self.power_name}] Successfully parsed JSON: {update_data}") - except json.JSONDecodeError as e: - logger.error(f"[{self.power_name}] All JSON extraction attempts failed: {e}") - # Create default data rather than failing - update_data = { - "initial_goals": ["Survive and expand", "Form beneficial alliances", "Secure key territories"], - "initial_relationships": {p: "Neutral" for p in ALL_POWERS if p != self.power_name}, - "goals": ["Survive and expand", "Form beneficial alliances", "Secure key territories"], - "relationships": {p: "Neutral" for p in ALL_POWERS if p != self.power_name} - } - logger.warning(f"[{self.power_name}] Using default goals and relationships: {update_data}") - - # Check for both possible key names - initial_goals = update_data.get('initial_goals') - if initial_goals is None: - initial_goals = update_data.get('goals') - if initial_goals is not None: - logger.debug(f"[{self.power_name}] Using 'goals' key instead of 'initial_goals'") - - initial_relationships = update_data.get('initial_relationships') - if initial_relationships is None: - initial_relationships = update_data.get('relationships') - if initial_relationships is not None: - logger.debug(f"[{self.power_name}] Using 'relationships' key instead of 'initial_relationships'") - - if isinstance(initial_goals, list): - self.goals = initial_goals - # == Fix: Correct add_journal_entry call signature == - self.add_journal_entry(f"[{game.current_short_phase}] Initial Goals Set: {self.goals}") - else: - logger.warning(f"[{self.power_name}] LLM did not provide valid 'initial_goals' list.") - # Set default goals - self.goals = ["Survive and expand", "Form beneficial alliances", "Secure key territories"] - self.add_journal_entry(f"[{game.current_short_phase}] Set default initial goals: {self.goals}") - - if isinstance(initial_relationships, dict): - # Validate relationship keys and values - valid_relationships = {} - invalid_count = 0 - - for p, r in initial_relationships.items(): - # Convert power name to uppercase for case-insensitive matching - p_upper = p.upper() - if p_upper in ALL_POWERS and p_upper != self.power_name: - # Check against allowed labels (case-insensitive) - r_title = r.title() if isinstance(r, str) else r # Convert "enemy" to "Enemy" etc. - if r_title in ALLOWED_RELATIONSHIPS: - valid_relationships[p_upper] = r_title + if raw_response: + try: + response_data = self._extract_json_from_text(raw_response) + if response_data: + diary_text_candidate = response_data.get("diary_entry") + if isinstance(diary_text_candidate, str) and diary_text_candidate.strip(): + actual_diary_text = diary_text_candidate + success_status = "TRUE" else: - invalid_count += 1 - if invalid_count <= 2: # Only log first few to reduce noise - logger.warning(f"[{self.power_name}] Received invalid relationship label '{r}' for '{p}'. Setting to Neutral.") - valid_relationships[p_upper] = "Neutral" - else: - invalid_count += 1 - if invalid_count <= 2 and not p_upper.startswith(self.power_name): # Only log first few to reduce noise - logger.warning(f"[{self.power_name}] Received relationship for invalid/own power '{p}'. Ignoring.") - - # Summarize if there were many invalid entries - if invalid_count > 2: - logger.warning(f"[{self.power_name}] {invalid_count} total invalid relationships were processed.") - - # If we have any valid relationships, use them - if valid_relationships: - self.relationships = valid_relationships - self.add_journal_entry(f"[{game.current_short_phase}] Initial Relationships Set: {self.relationships}") - else: - # Set default relationships - logger.warning(f"[{self.power_name}] No valid relationships found, using defaults.") - self.relationships = {p: "Neutral" for p in ALL_POWERS if p != self.power_name} - self.add_journal_entry(f"[{game.current_short_phase}] Set default neutral relationships.") + # Try 'order_summary' if 'diary_entry' is missing or invalid + logger.debug(f"[{self.power_name}] 'diary_entry' missing or invalid. Trying 'order_summary'. Value was: {diary_text_candidate}") + order_summary_candidate = response_data.get("order_summary") + if isinstance(order_summary_candidate, str) and order_summary_candidate.strip(): + actual_diary_text = order_summary_candidate + success_status = "TRUE" + logger.info(f"[{self.power_name}] Used 'order_summary' for order diary entry.") + else: + logger.warning(f"[{self.power_name}] Both 'diary_entry' and 'order_summary' missing, invalid, or empty. 'diary_entry': {diary_text_candidate}, 'order_summary': {order_summary_candidate}") + success_status = "FALSE" + # If response_data is None (JSON parsing failed), success_status remains "FALSE" + except Exception as e: + logger.error(f"[{self.power_name}] Error parsing order diary JSON: {e}. Raw response: {raw_response[:200]} ", exc_info=False) + # success_status remains "FALSE" + + log_llm_response( + log_file_path=log_file_path, + model_name=self.client.model_name, + power_name=self.power_name, + phase=game.current_short_phase, + response_type='order_diary', + raw_input_prompt=prompt, # ENSURED + raw_response=raw_response if raw_response else "", + success=success_status + ) + + if success_status == "TRUE" and actual_diary_text: + self.add_diary_entry(actual_diary_text, game.current_short_phase) + logger.info(f"[{self.power_name}] Order diary entry generated and added.") else: - logger.warning(f"[{self.power_name}] LLM did not provide valid 'initial_relationships' dict.") - # Set default relationships - self.relationships = {p: "Neutral" for p in ALL_POWERS if p != self.power_name} - self.add_journal_entry(f"[{game.current_short_phase}] Set default neutral relationships.") + fallback_diary = f"Submitted orders for {game.current_short_phase}: {', '.join(orders)}. (LLM failed to generate a specific diary entry)" + self.add_diary_entry(fallback_diary, game.current_short_phase) + logger.warning(f"[{self.power_name}] Failed to generate specific order diary entry. Added fallback.") except Exception as e: - logger.error(f"[{self.power_name}] Error during initial state generation: {e}", exc_info=True) - # Set conservative defaults even if everything fails - if not self.goals: - self.goals = ["Survive and expand", "Form beneficial alliances", "Secure key territories"] - if not self.relationships: - self.relationships = {p: "Neutral" for p in ALL_POWERS if p != self.power_name} - logger.info(f"[{self.power_name}] Set fallback goals and relationships after error.") + # Ensure prompt is defined or handled if it might not be (it should be in this flow) + current_prompt = prompt if 'prompt' in locals() else "[prompt_unavailable_in_exception]" + current_raw_response = raw_response if 'raw_response' in locals() and raw_response is not None else f"Error: {e}" + log_llm_response( + log_file_path=log_file_path, + model_name=self.client.model_name if hasattr(self, 'client') else "UnknownModel", + power_name=self.power_name, + phase=game.current_short_phase if 'game' in locals() and hasattr(game, 'current_short_phase') else "order_phase", + response_type='order_diary_exception', + raw_input_prompt=current_prompt, # ENSURED (using current_prompt for safety) + raw_response=current_raw_response, + success="FALSE" + ) + fallback_diary = f"Submitted orders for {game.current_short_phase}: {', '.join(orders)}. (Critical error in diary generation process)" + self.add_diary_entry(fallback_diary, game.current_short_phase) + logger.warning(f"[{self.power_name}] Added fallback order diary entry due to critical error.") + # Rest of the code remains the same def log_state(self, prefix=""): logger.debug(f"[{self.power_name}] {prefix} State: Goals={self.goals}, Relationships={self.relationships}") @@ -539,21 +503,57 @@ class DiplomacyAgent: ) logger.debug(f"[{power_name}] Raw LLM response for state update: {response}") - # Use our robust JSON extraction helper - try: - update_data = self._extract_json_from_text(response) - logger.debug(f"[{power_name}] Successfully parsed JSON: {update_data}") - except json.JSONDecodeError as e: - logger.error(f"[{power_name}] Failed to parse JSON response for state update: {e}") - logger.error(f"[{power_name}] Raw response was: {response}") - # Create fallback data to avoid full failure - update_data = { - "updated_goals": self.goals, # Maintain current goals - "updated_relationships": self.relationships, # Maintain current relationships - "goals": self.goals, # Alternative key - "relationships": self.relationships # Alternative key - } - logger.warning(f"[{power_name}] Using existing goals and relationships as fallback: {update_data}") + log_entry_response_type = 'state_update' # Default for log_llm_response + log_entry_success = "FALSE" # Default + update_data = None # Initialize + + if response is not None and response.strip(): # Check if response is not None and not just whitespace + try: + update_data = self._extract_json_from_text(response) + logger.debug(f"[{power_name}] Successfully parsed JSON: {update_data}") + # Check if essential data ('updated_goals' or 'goals') is present AND is a list (for goals) + # For relationships, check for 'updated_relationships' or 'relationships' AND is a dict. + # Consider it TRUE if at least one of the primary data structures (goals or relationships) is present and correctly typed. + goals_present_and_valid = isinstance(update_data.get('updated_goals'), list) or isinstance(update_data.get('goals'), list) + rels_present_and_valid = isinstance(update_data.get('updated_relationships'), dict) or isinstance(update_data.get('relationships'), dict) + + if update_data and (goals_present_and_valid or rels_present_and_valid): + log_entry_success = "TRUE" + elif update_data: # Parsed, but maybe not all essential data there or not correctly typed + log_entry_success = "PARTIAL" + log_entry_response_type = 'state_update_partial_data' + else: # Parsed to None or empty dict/list, or data not in expected format + log_entry_success = "FALSE" + log_entry_response_type = 'state_update_parsing_empty_or_invalid_data' + except json.JSONDecodeError as e: + logger.error(f"[{power_name}] Failed to parse JSON response for state update: {e}. Raw response: {response}") + log_entry_response_type = 'state_update_json_error' + # log_entry_success remains "FALSE" + else: # response was None or empty/whitespace + logger.error(f"[{power_name}] No valid response (None or empty) received from LLM for state update.") + log_entry_response_type = 'state_update_no_response' + # log_entry_success remains "FALSE" + + # Log the attempt and its outcome + log_llm_response( + log_file_path=log_file_path, + model_name=self.client.model_name, + power_name=power_name, + phase=current_phase, + response_type=log_entry_response_type, + raw_input_prompt=prompt, # ENSURED + raw_response=response if response is not None else "", # Handle if response is None + success=log_entry_success + ) + + # Fallback logic if update_data is still None or not usable + if not update_data or not (isinstance(update_data.get('updated_goals'), list) or isinstance(update_data.get('goals'), list) or isinstance(update_data.get('updated_relationships'), dict) or isinstance(update_data.get('relationships'), dict)): + logger.warning(f"[{power_name}] update_data is None or missing essential valid structures after LLM call. Using existing goals and relationships as fallback.") + update_data = { + "updated_goals": self.goals, + "updated_relationships": self.relationships, + } + logger.warning(f"[{power_name}] Using existing goals and relationships as fallback: {update_data}") # Check for both possible key names (prompt uses "goals"/"relationships", # but code was expecting "updated_goals"/"updated_relationships") @@ -624,7 +624,6 @@ class DiplomacyAgent: self.log_state(f"After State Update ({game.current_short_phase})") - def update_goals(self, new_goals: List[str]): """Updates the agent's strategic goals.""" self.goals = new_goals @@ -662,18 +661,4 @@ class DiplomacyAgent: except Exception as e: logger.error(f"Agent {self.power_name} failed to generate plan: {e}") self.add_journal_entry(f"Failed to generate plan for phase {game.current_phase} due to error: {e}") - return "Error: Failed to generate plan." - - # def process_message(self, message, game_phase): - # """Processes an incoming message, updates relationships/journal.""" - # # 1. Analyze message content - # # 2. Update self.relationships based on message - # # 3. Add journal entry about the message and its impact - # pass - - # def generate_message_reply(self, conversation_so_far, game_phase): - # """Generates a reply to a conversation using agent state.""" - # # 1. Consider goals, relationships when crafting reply - # # 2. Delegate to self.client.get_conversation_reply(...) - # # 3. Add journal entry about the generated message - # pass \ No newline at end of file + return "Error: Failed to generate plan." \ No newline at end of file diff --git a/ai_diplomacy/clients.py b/ai_diplomacy/clients.py index e2cb7af..da3eda7 100644 --- a/ai_diplomacy/clients.py +++ b/ai_diplomacy/clients.py @@ -3,7 +3,6 @@ import json from json import JSONDecodeError import re import logging -import ast import asyncio # Added for async operations from typing import List, Dict, Optional, Any @@ -19,9 +18,10 @@ import google.generativeai as genai from diplomacy.engine.message import GLOBAL from .game_history import GameHistory -from .utils import load_prompt, run_llm_and_log +from .utils import load_prompt, run_llm_and_log, log_llm_response # Ensure log_llm_response is imported # Import DiplomacyAgent for type hinting if needed, but avoid circular import if possible # from .agent import DiplomacyAgent +from .possible_order_context import generate_rich_order_context # set logger back to just info logger = logging.getLogger("client") @@ -98,11 +98,11 @@ class BaseModelClient: enemy_units[power] = info enemy_centers[power] = board_state["centers"].get(power, []) - # Get possible orders - possible_orders_str = "" - for loc, orders in possible_orders.items(): - possible_orders_str += f" {loc}: {orders}\n" - + # Get possible orders - REPLACED WITH NEW FUNCTION + # possible_orders_str = "" + # for loc, orders in possible_orders.items(): + # possible_orders_str += f" {loc}: {orders}\n" + possible_orders_context_str = generate_rich_order_context(game, power_name, possible_orders) # Get messages for the current round messages_this_round_text = game_history.get_messages_this_round( @@ -112,15 +112,6 @@ class BaseModelClient: if not messages_this_round_text.strip(): messages_this_round_text = "\n(No messages this round)\n" - # Get history from previous phases - previous_history_text = game_history.get_previous_phases_history( - power_name=power_name, - current_phase_name=year_phase - # include_plans and num_prev_phases will use defaults - ) - if not previous_history_text.strip(): - previous_history_text = "\n(No previous game history)\n" - # Load in current context values # Simplified map representation based on DiploBench approach units_repr = "\n".join([f" {p}: {u}" for p, u in board_state["units"].items()]) @@ -132,8 +123,7 @@ class BaseModelClient: all_unit_locations=units_repr, all_supply_centers=centers_repr, messages_this_round=messages_this_round_text, - previous_game_history=previous_history_text, - possible_orders=possible_orders_str, + possible_orders=possible_orders_context_str, agent_goals="\n".join(f"- {g}" for g in agent_goals) if agent_goals else "None specified", agent_relationships="\n".join(f"- {p}: {s}" for p, s in agent_relationships.items()) if agent_relationships else "None specified", agent_private_diary=agent_private_diary if agent_private_diary else "(No diary entries yet)", # Use new parameter @@ -207,6 +197,9 @@ class BaseModelClient: ) raw_response = "" + # Initialize success status. Will be updated based on outcome. + success_status = "Failure: Initialized" + parsed_orders_for_return = self.fallback_orders(possible_orders) # Default to fallback try: # Call LLM using the logging wrapper @@ -216,10 +209,10 @@ class BaseModelClient: log_file_path=log_file_path, power_name=power_name, phase=phase, - response_type='order', + response_type='order', # Context for run_llm_and_log's own error logging ) logger.debug( - f"[{self.model_name}] Raw LLM response for {power_name}:\n{raw_response}" + f"[{self.model_name}] Raw LLM response for {power_name} orders:\n{raw_response}" ) # Attempt to parse the final "orders" from the LLM @@ -229,17 +222,37 @@ class BaseModelClient: logger.warning( f"[{self.model_name}] Could not extract moves for {power_name}. Using fallback." ) - if model_error_stats is not None: + if model_error_stats is not None and self.model_name in model_error_stats: + model_error_stats[self.model_name].setdefault("order_decoding_errors", 0) model_error_stats[self.model_name]["order_decoding_errors"] += 1 - return self.fallback_orders(possible_orders) - # Validate or fallback - validated_moves = self._validate_orders(move_list, possible_orders) - logger.debug(f"[{self.model_name}] Validated moves for {power_name}: {validated_moves}") - return validated_moves + success_status = "Failure: No moves extracted" + # Fallback is already set to parsed_orders_for_return + else: + # Validate or fallback + validated_moves = self._validate_orders(move_list, possible_orders) + logger.debug(f"[{self.model_name}] Validated moves for {power_name}: {validated_moves}") + parsed_orders_for_return = validated_moves + success_status = "Success" except Exception as e: - logger.error(f"[{self.model_name}] LLM error for {power_name}: {e}") - return self.fallback_orders(possible_orders) + logger.error(f"[{self.model_name}] LLM error for {power_name} in get_orders: {e}", exc_info=True) + success_status = f"Failure: Exception ({type(e).__name__})" + # Fallback is already set to parsed_orders_for_return + finally: + # Log the attempt regardless of outcome + if log_file_path: # Only log if a path is provided + log_llm_response( + log_file_path=log_file_path, + model_name=self.model_name, + power_name=power_name, + phase=phase, + response_type="order_generation", # Specific type for CSV logging + raw_input_prompt=prompt, # Renamed from 'prompt' to match log_llm_response arg + raw_response=raw_response, + success=success_status + # token_usage and cost can be added later if available and if log_llm_response supports them + ) + return parsed_orders_for_return def _extract_moves(self, raw_response: str, power_name: str) -> Optional[List[str]]: """ @@ -272,7 +285,7 @@ class BaseModelClient: # 2) If still no match, check for triple-backtick code fences containing JSON if not matches: - code_fence_pattern = r"```json\s*(\{.*?\})\s*```" + code_fence_pattern = r"```json\n(.*?)\n```" matches = re.search(code_fence_pattern, raw_response, re.DOTALL) if matches: logger.debug( @@ -481,81 +494,126 @@ class BaseModelClient: game_history: GameHistory, game_phase: str, log_file_path: str, - active_powers: Optional[List[str]] = None, # Keep active_powers if needed by prompt logic + active_powers: Optional[List[str]] = None, agent_goals: Optional[List[str]] = None, agent_relationships: Optional[Dict[str, str]] = None, - agent_private_diary_str: Optional[str] = None, # Added + agent_private_diary_str: Optional[str] = None, ) -> List[Dict[str, str]]: """ Generates a negotiation message, considering agent state. """ - prompt = self.build_conversation_prompt( - game, - board_state, - power_name, - possible_orders, - game_history, - # game_phase, # Not passed to build_conversation_prompt directly - # log_file_path, # Not passed to build_conversation_prompt directly - agent_goals=agent_goals, - agent_relationships=agent_relationships, - agent_private_diary_str=agent_private_diary_str, # Pass diary string - ) - - logger.debug(f"[{self.model_name}] Conversation prompt for {power_name}:\n{prompt}") + raw_input_prompt = "" # Initialize for finally block + raw_response = "" # Initialize for finally block + success_status = "Failure: Initialized" # Default status + messages_to_return = [] # Initialize to ensure it's defined try: - # Call LLM using the logging wrapper - response = await run_llm_and_log( + raw_input_prompt = self.build_conversation_prompt( + game, + board_state, + power_name, + possible_orders, + game_history, + agent_goals=agent_goals, + agent_relationships=agent_relationships, + agent_private_diary_str=agent_private_diary_str, + ) + + logger.debug(f"[{self.model_name}] Conversation prompt for {power_name}:\n{raw_input_prompt}") + + raw_response = await run_llm_and_log( client=self, - prompt=prompt, + prompt=raw_input_prompt, log_file_path=log_file_path, power_name=power_name, - phase=game_phase, # Use game_phase for logging - response_type='negotiation', + phase=game_phase, + response_type='negotiation', # For run_llm_and_log's internal context ) - logger.debug(f"[{self.model_name}] Raw LLM response for {power_name}:\n{response}") + logger.debug(f"[{self.model_name}] Raw LLM response for {power_name}:\n{raw_response}") - messages = [] + parsed_messages = [] json_blocks = [] + json_decode_error_occurred = False - double_brace_blocks = re.findall(r'\{\{(.*?)\}\}', response, re.DOTALL) + # Attempt to find blocks enclosed in {{...}} + double_brace_blocks = re.findall(r'\{\{(.*?)\}\}', raw_response, re.DOTALL) if double_brace_blocks: + # If {{...}} blocks are found, assume each is a self-contained JSON object json_blocks.extend(['{' + block.strip() + '}' for block in double_brace_blocks]) else: - code_block_match = re.search(r"```json\n(.*?)\n```", response, re.DOTALL) - if code_block_match: - potential_json = code_block_match.group(1).strip() - json_blocks = re.findall(r'\{.*?\}', potential_json, re.DOTALL) - else: - json_blocks = re.findall(r'\{.*?\}', response, re.DOTALL) + # If no {{...}} blocks, look for ```json ... ``` markdown blocks + code_block_match = re.search(r"```json\n(.*?)\n```", raw_response, re.DOTALL) + if code_block_match: + potential_json_array_or_objects = code_block_match.group(1).strip() + # Try to parse as a list of objects or a single object + try: + data = json.loads(potential_json_array_or_objects) + if isinstance(data, list): + json_blocks = [json.dumps(item) for item in data if isinstance(item, dict)] + elif isinstance(data, dict): + json_blocks = [json.dumps(data)] + except json.JSONDecodeError: + # If parsing the whole block fails, fall back to regex for individual objects + json_blocks = re.findall(r'\{.*?\}', potential_json_array_or_objects, re.DOTALL) + else: + # If no markdown block, fall back to regex for any JSON object in the response + json_blocks = re.findall(r'\{.*?\}', raw_response, re.DOTALL) if not json_blocks: - logger.warning(f"[{self.model_name}] No JSON message blocks found in response for {power_name}. Raw response:\n{response}") - return [] + logger.warning(f"[{self.model_name}] No JSON message blocks found in response for {power_name}. Raw response:\n{raw_response}") + success_status = "Success: No JSON blocks found" + # messages_to_return remains empty + else: + for block_index, block in enumerate(json_blocks): + try: + cleaned_block = block.strip() + # Attempt to fix common JSON issues like trailing commas before parsing + cleaned_block = re.sub(r',\s*([\}\]])', r'\1', cleaned_block) + parsed_message = json.loads(cleaned_block) + + if isinstance(parsed_message, dict) and "message_type" in parsed_message and "content" in parsed_message: + # Further validation, e.g., recipient for private messages + if parsed_message["message_type"] == "private" and "recipient" not in parsed_message: + logger.warning(f"[{self.model_name}] Private message missing recipient for {power_name} in block {block_index}. Skipping: {cleaned_block}") + continue # Skip this message + parsed_messages.append(parsed_message) + else: + logger.warning(f"[{self.model_name}] Invalid message structure or missing keys in block {block_index} for {power_name}: {cleaned_block}") + + except json.JSONDecodeError as jde: + json_decode_error_occurred = True + logger.warning(f"[{self.model_name}] Failed to decode JSON block {block_index} for {power_name}. Error: {jde}. Block content:\n{block}") - for block in json_blocks: - try: - cleaned_block = block.strip() - parsed_message = json.loads(cleaned_block) - - if isinstance(parsed_message, dict) and "message_type" in parsed_message and "content" in parsed_message: - messages.append(parsed_message) - else: - logger.warning(f"[{self.model_name}] Invalid message structure in block for {power_name}: {cleaned_block}") - - except json.JSONDecodeError: - logger.warning(f"[{self.model_name}] Failed to decode JSON block for {power_name}. Block content:\n{block}") + if parsed_messages: + success_status = "Success: Messages extracted" + messages_to_return = parsed_messages + elif json_decode_error_occurred: + success_status = "Failure: JSONDecodeError during block parsing" + messages_to_return = [] + else: # JSON blocks found, but none were valid messages + success_status = "Success: No valid messages extracted from JSON blocks" + messages_to_return = [] - if not messages: - logger.warning(f"[{self.model_name}] No valid messages extracted after parsing blocks for {power_name}. Raw response:\n{response}") - - logger.debug(f"[{self.model_name}] Validated conversation replies for {power_name}: {messages}") - return messages - + logger.debug(f"[{self.model_name}] Validated conversation replies for {power_name}: {messages_to_return}") + # return messages_to_return # Return will happen in finally block or after + except Exception as e: - logger.error(f"[{self.model_name}] Error in get_conversation_reply for {power_name}: {e}") - return [] + logger.error(f"[{self.model_name}] Error in get_conversation_reply for {power_name}: {e}", exc_info=True) + success_status = f"Failure: Exception ({type(e).__name__})" + messages_to_return = [] # Ensure empty list on general exception + finally: + if log_file_path: + log_llm_response( + log_file_path=log_file_path, + model_name=self.model_name, + power_name=power_name, + phase=game_phase, + response_type="negotiation_message", + raw_input_prompt=raw_input_prompt, + raw_response=raw_response, + success=success_status + ) + return messages_to_return async def get_plan( # This is the original get_plan, now distinct from get_planning_reply self, @@ -600,22 +658,42 @@ class BaseModelClient: if self.system_prompt: full_prompt = f"{self.system_prompt}\n\n{full_prompt}" + raw_plan_response = "" + success_status = "Failure: Initialized" + plan_to_return = f"Error: Plan generation failed for {power_name} (initial state)" + try: # Use run_llm_and_log for the actual LLM call - raw_plan = await run_llm_and_log( + raw_plan_response = await run_llm_and_log( client=self, # Pass self (the client instance) prompt=full_prompt, log_file_path=log_file_path, power_name=power_name, phase=game.current_short_phase, - response_type='plan_generation', # More specific type + response_type='plan_generation', # More specific type for run_llm_and_log context ) - logger.debug(f"[{self.model_name}] Raw LLM response for {power_name} plan generation:\n{raw_plan}") + logger.debug(f"[{self.model_name}] Raw LLM response for {power_name} plan generation:\n{raw_plan_response}") # No parsing needed for the plan, return the raw string - return raw_plan.strip() + plan_to_return = raw_plan_response.strip() + success_status = "Success" except Exception as e: - logger.error(f"Failed to generate plan for {power_name}: {e}") - return f"Error: Failed to generate plan due to exception: {e}" + logger.error(f"Failed to generate plan for {power_name}: {e}", exc_info=True) + success_status = f"Failure: Exception ({type(e).__name__})" + plan_to_return = f"Error: Failed to generate plan for {power_name} due to exception: {e}" + finally: + if log_file_path: # Only log if a path is provided + log_llm_response( + log_file_path=log_file_path, + model_name=self.model_name, + power_name=power_name, + phase=game.current_short_phase if game else "UnknownPhase", + response_type="plan_generation", # Specific type for CSV logging + raw_input_prompt=full_prompt, # Renamed from 'full_prompt' to match log_llm_response arg + raw_response=raw_plan_response, + success=success_status + # token_usage and cost can be added later + ) + return plan_to_return ############################################################################## diff --git a/ai_diplomacy/initialization.py b/ai_diplomacy/initialization.py new file mode 100644 index 0000000..b5422ec --- /dev/null +++ b/ai_diplomacy/initialization.py @@ -0,0 +1,169 @@ +# ai_diplomacy/initialization.py +import logging +import json + +# Forward declaration for type hinting, actual imports in function if complex +if False: # TYPE_CHECKING + from diplomacy import Game + from diplomacy.models.game import GameHistory + from .agent import DiplomacyAgent + +from .agent import ALL_POWERS, ALLOWED_RELATIONSHIPS +from .utils import run_llm_and_log, log_llm_response + +logger = logging.getLogger(__name__) + +async def initialize_agent_state_ext( + agent: 'DiplomacyAgent', + game: 'Game', + game_history: 'GameHistory', + log_file_path: str +): + """Uses the LLM to set initial goals and relationships for the agent.""" + power_name = agent.power_name + logger.info(f"[{power_name}] Initializing agent state using LLM (external function)..." ) + current_phase = game.get_current_phase() if game else "UnknownPhase" + + full_prompt = "" # Ensure full_prompt is defined in the outer scope for finally block + response = "" # Ensure response is defined for finally block + success_status = "Failure: Initialized" # Default status + + try: + # Use a simplified prompt for initial state generation + allowed_labels_str = ", ".join(ALLOWED_RELATIONSHIPS) + initial_prompt = f"You are the agent for {power_name} in a game of Diplomacy at the very start (Spring 1901). " \ + f"Analyze the initial board position and suggest 2-3 strategic high-level goals for the early game. " \ + f"Consider your power's strengths, weaknesses, and neighbors. " \ + f"Also, provide an initial assessment of relationships with other powers. " \ + f"IMPORTANT: For each relationship, you MUST use exactly one of the following labels: {allowed_labels_str}. " \ + f"Format your response as a JSON object with two keys: 'initial_goals' (a list of strings) and 'initial_relationships' (a dictionary mapping power names to one of the allowed relationship strings)." + + board_state = game.get_state() if game else {} + possible_orders = game.get_all_possible_orders() if game else {} + + logger.debug(f"[{power_name}] Preparing context for initial state. Board state type: {type(board_state)}, possible_orders type: {type(possible_orders)}, game_history type: {type(game_history)}") + # Ensure agent.client and its methods can handle None for game/board_state/etc. if that's a possibility + # For initialization, game should always be present. + + formatted_diary = agent.format_private_diary_for_prompt() + + context = agent.client.build_context_prompt( + game=game, + board_state=board_state, + power_name=power_name, + possible_orders=possible_orders, + game_history=game_history, + agent_goals=None, + agent_relationships=None, + agent_private_diary=formatted_diary, + ) + full_prompt = initial_prompt + "\n\n" + context + + response = await run_llm_and_log( + client=agent.client, + prompt=full_prompt, + log_file_path=log_file_path, + power_name=power_name, + phase=current_phase, + response_type='initialization', # Context for run_llm_and_log internal error logging + ) + logger.debug(f"[{power_name}] LLM response for initial state: {response[:300]}...") # Log a snippet + + parsed_successfully = False + try: + update_data = agent._extract_json_from_text(response) + logger.debug(f"[{power_name}] Successfully parsed JSON: {update_data}") + parsed_successfully = True + except json.JSONDecodeError as e: + logger.error(f"[{power_name}] All JSON extraction attempts failed: {e}. Response snippet: {response[:300]}...") + success_status = "Failure: JSONDecodeError" + update_data = {} # Ensure update_data exists for fallback logic below + # Fallback logic for goals/relationships will be handled later if update_data is empty + + initial_goals_applied = False + initial_relationships_applied = False + + if parsed_successfully: + initial_goals = update_data.get('initial_goals') or update_data.get('goals') + initial_relationships = update_data.get('initial_relationships') or update_data.get('relationships') + + if isinstance(initial_goals, list) and initial_goals: + agent.goals = initial_goals + agent.add_journal_entry(f"[{current_phase}] Initial Goals Set by LLM: {agent.goals}") + logger.info(f"[{power_name}] Goals updated from LLM: {agent.goals}") + initial_goals_applied = True + else: + logger.warning(f"[{power_name}] LLM did not provide valid 'initial_goals' list (got: {initial_goals}).") + + if isinstance(initial_relationships, dict) and initial_relationships: + valid_relationships = {} + # ... (rest of relationship validation logic from before) ... + for p_key, r_val in initial_relationships.items(): + p_upper = str(p_key).upper() + r_title = str(r_val).title() if isinstance(r_val, str) else str(r_val) + if p_upper in ALL_POWERS and p_upper != power_name: + if r_title in ALLOWED_RELATIONSHIPS: + valid_relationships[p_upper] = r_title + else: + valid_relationships[p_upper] = "Neutral" + if valid_relationships: + agent.relationships = valid_relationships + agent.add_journal_entry(f"[{current_phase}] Initial Relationships Set by LLM: {agent.relationships}") + logger.info(f"[{power_name}] Relationships updated from LLM: {agent.relationships}") + initial_relationships_applied = True + else: + logger.warning(f"[{power_name}] No valid relationships found in LLM response.") + else: + logger.warning(f"[{power_name}] LLM did not provide valid 'initial_relationships' dict (got: {initial_relationships}).") + + if initial_goals_applied or initial_relationships_applied: + success_status = "Success: Applied LLM data" + elif parsed_successfully: # Parsed but nothing useful to apply + success_status = "Success: Parsed but no data applied" + # If not parsed_successfully, success_status is already "Failure: JSONDecodeError" + + # Fallback if LLM data was not applied or parsing failed + if not initial_goals_applied: + if not agent.goals: # Only set defaults if no goals were set during agent construction or by LLM + agent.goals = ["Survive and expand", "Form beneficial alliances", "Secure key territories"] + agent.add_journal_entry(f"[{current_phase}] Set default initial goals as LLM provided none or parse failed.") + logger.info(f"[{power_name}] Default goals set.") + + if not initial_relationships_applied: + # Check if relationships are still default-like before overriding + is_default_relationships = True + if agent.relationships: # Check if it's not empty + for p in ALL_POWERS: + if p != power_name and agent.relationships.get(p) != "Neutral": + is_default_relationships = False + break + if is_default_relationships: + agent.relationships = {p: "Neutral" for p in ALL_POWERS if p != power_name} + agent.add_journal_entry(f"[{current_phase}] Set default neutral relationships as LLM provided none valid or parse failed.") + logger.info(f"[{power_name}] Default neutral relationships set.") + + except Exception as e: + logger.error(f"[{power_name}] Error during external agent state initialization: {e}", exc_info=True) + success_status = f"Failure: Exception ({type(e).__name__})" + # Fallback logic for goals/relationships if not already set by earlier fallbacks + if not agent.goals: + agent.goals = ["Survive and expand", "Form beneficial alliances", "Secure key territories"] + logger.info(f"[{power_name}] Set fallback goals after top-level error: {agent.goals}") + if not agent.relationships or all(r == "Neutral" for r in agent.relationships.values()): + agent.relationships = {p: "Neutral" for p in ALL_POWERS if p != power_name} + logger.info(f"[{power_name}] Set fallback neutral relationships after top-level error: {agent.relationships}") + finally: + if log_file_path: # Ensure log_file_path is provided + log_llm_response( + log_file_path=log_file_path, + model_name=agent.client.model_name if agent and agent.client else "UnknownModel", + power_name=power_name, + phase=current_phase, + response_type="initial_state_setup", # Specific type for CSV logging + raw_input_prompt=full_prompt, + raw_response=response, + success=success_status + ) + + # Final log of state after initialization attempt + logger.info(f"[{power_name}] Post-initialization state: Goals={agent.goals}, Relationships={agent.relationships}") diff --git a/ai_diplomacy/llms.txt b/ai_diplomacy/llms.txt index eb4925f..6683a96 100644 --- a/ai_diplomacy/llms.txt +++ b/ai_diplomacy/llms.txt @@ -12,11 +12,7 @@ This document provides an analysis of key Python modules within the `ai_diplomac **Goal:** To structure, store, and retrieve the historical events of a Diplomacy game phase by phase, including messages, plans, orders, and results. **Status:** Fully implemented and operational. -#### 1.2. `map_utils.py` (COMPLETE BUT NOT INTEGRATED) -**Goal:** To provide graph-based map analysis and pathfinding for strategic decision-making. -**Status:** BFS search algorithms implemented but not integrated into planning/order generation. - -**Key Components:** +*Key Components:* * `DiplomacyGraph`: Represents map territory connectivity with support for unit-specific movement rules (Army vs Fleet). * `bfs_shortest_path`: Finds shortest path from a starting territory to any territory matching criteria. * `bfs_nearest_adjacent`: Finds shortest path to a territory adjacent to any territory in a target set. @@ -35,22 +31,22 @@ This document provides an analysis of key Python modules within the `ai_diplomac #### 1.4. `agent.py` (COMPLETE) **Goal:** To maintain stateful agent representation with personality, goals, and relationships. -**Status:** Fully implemented and integrated with planning/negotiation workflows. +**Status:** Fully implemented and integrated with planning/negotiation workflows. Initialization of goals and relationships via LLM is now handled by `initialization.py`. **Key Components:** * `DiplomacyAgent` class with: * `power_name`: The power this agent represents - * `personality`: Agent's personality profile - * `goals`: List of strategic goals - * `relationships`: Dict of relationships with other powers - * `private_journal`: List of internal thoughts/reflections + * `personality`: Agent's personality profile (though less emphasized now, system prompts per power exist) + * `goals`: List of strategic goals, initially populated by `initialization.py` or constructor. + * `relationships`: Dict of relationships with other powers, initially populated by `initialization.py` or constructor. + * `private_journal`: List of internal thoughts/reflections (less structured). + * `private_diary`: List of structured, phase-prefixed summaries (negotiations, intents, orders) for concise historical context provided to LLMs. * `_extract_json_from_text`: Robust JSON extraction from LLM responses - * `initialize_agent_state`: Sets initial goals and relationships - * `analyze_phase_and_update_state`: Updates goals and relationships based on game events - * Methods for plan generation, updating goals, and updating relationships + * `analyze_phase_and_update_state`: Updates goals and relationships based on game events. + * Methods for plan generation, updating goals, and updating relationships. **Integration Points:** -* Connected to context generation in `clients.py` +* Connected to context generation in `clients.py` (private diary provides summarized history) * Influences planning and negotiations through goals and relationships * Case-insensitive validation of LLM-provided power names and relationship statuses * Robust error recovery with fallback defaults when LLM responses fail to parse @@ -70,18 +66,22 @@ This document provides an analysis of key Python modules within the `ai_diplomac #### 1.8. `clients.py` (COMPLETE) **Goal:** To abstract and manage interactions with various LLM APIs. -**Status:** Fully implemented with agent state integration. -**Note:** Uses various files in `prompts/` (e.g., `context_prompt.txt`, `order_instructions.txt`, `conversation_instructions.txt`) to structure LLM requests and define expected output formats. Ensuring these instruction files are present and correct is critical for reliable operation, especially for parsing structured data like orders or messages. +**Status:** Fully implemented with agent state integration (including personality, goals, relationships, and the new `private_diary` for summarized history). It now also leverages `possible_order_context.py` for richer order details in prompts. +**Note:** Uses various files in `prompts/` (e.g., `context_prompt.txt`, `order_instructions.txt`, `negotiation_diary_prompt.txt`, `order_diary_prompt.txt`) to structure LLM requests. `context_prompt.txt` has been updated to use `agent_private_diary` for history and a more structured `{possible_orders}` section generated by `possible_order_context.generate_rich_order_context`. -### PARTIALLY IMPLEMENTED MODULES: +#### 1.9. `initialization.py` (NEWLY ADDED & COMPLETE) +**Goal:** To perform the initial LLM-driven setup of an agent's goals and relationships at the very start of the game (Spring 1901). +**Status:** Fully implemented and integrated into `lm_game.py`. -#### 1.9. `utils.py` (COMPLETE) -**Goal:** To provide common utility functions used across other AI diplomacy modules. -**Status:** Fully implemented. +**Key Components:** +* `initialize_agent_state_ext(agent: DiplomacyAgent, game: Game, game_history: GameHistory, log_file_path: str)`: An asynchronous function that: + * Constructs a specific prompt tailored for Spring 1901, asking for initial goals and relationships. + * Utilizes the agent's client (`agent.client`) and the `run_llm_and_log` utility for the LLM interaction. + * Parses the JSON response using the agent's `_extract_json_from_text` method. + * Directly updates the `agent.goals` and `agent.relationships` attributes with the LLM's suggestions or defaults if parsing fails. -#### 1.10. `clients.py` (COMPLETE BUT NEEDS EXTENSION) -**Goal:** To abstract and manage interactions with various LLM APIs. -**Status:** Works, but needs extension to incorporate agent state into context. +**Integration Points:** +* Called once per agent from `lm_game.py` immediately after the `DiplomacyAgent` object is instantiated and before the main game loop begins. --- @@ -89,20 +89,24 @@ This document provides an analysis of key Python modules within the `ai_diplomac The following connections have been established: -1. **Agent State → Context Building** - * `BaseModelClient.build_context_prompt` incorporates agent's personality, goals, and relationships - * Modified prompt templates include sections for agent state +1. **Initial Agent Setup (New)**: + * `lm_game.py` calls `initialization.py`'s `initialize_agent_state_ext` for each agent. This function uses an LLM call to populate the agent's initial `goals` and `relationships` before the main game loop and other agent interactions commence. -2. **Agent State → Negotiations** +2. **Agent State → Context Building** + * `BaseModelClient.build_context_prompt` in `clients.py` incorporates the agent's current `goals`, `relationships`, and the concise `agent_private_diary` for historical context. + * It also calls `possible_order_context.generate_rich_order_context` to provide a detailed and strategically relevant breakdown of possible orders, replacing a simpler list. + * `prompts/context_prompt.txt` is formatted to accept these inputs, including the structured possible orders and the agent's private diary. + +3. **Agent State → Negotiations** * Agent's personality, goals, and relationships influence message generation * Relationships are updated based on negotiation context and results -3. **Robust LLM Interaction** +4. **Robust LLM Interaction** * Implemented multi-strategy JSON extraction to handle various LLM response formats * Added case-insensitive validation for power names and relationship statuses * Created fallback mechanisms for all LLM interactions -4. **Error Recovery** +5. **Error Recovery** * Added defensive programming throughout agent state updates * Implemented progressive fallback strategies for parsing LLM outputs * Used intelligent defaults to maintain consistent agent state @@ -141,27 +145,28 @@ The following connections have been established: | game_history.py | <-----------+ | | agent.py | +-----------------+ | +-----------------+ ^ | | - | v v - | +--------------+ +--------------+ - +------------------+ utils.py | <----- | map_utils.py | - +--------------+ +--------------+ + | v | + | +--------------+ | + +------------------+ utils.py | <--------------- + +--------------+ ``` **Current Integration Status:** * `agent.py` is fully implemented and integrated with other modules * State updates work reliably between phases * Robust JSON parsing and case-insensitive validation ensure smooth operation -* `map_utils.py` is implemented but not yet fully leveraged for strategic planning **Asynchronous API Calls (Implemented April 2025)** - Successfully refactored major LLM interaction points to use asynchronous patterns (`async`/`await`, `asyncio.gather`). - Utilized async client libraries (`AsyncOpenAI`, `AsyncAnthropic`, `generate_content_async` for Gemini). - Refactored components: - - `DiplomacyAgent.initialize_agent_state` + - `initialization.initialize_agent_state_ext` (replaces `DiplomacyAgent.initialize_agent_state`) - `negotiations.conduct_negotiations` (message generation) - `utils.get_valid_orders` (order generation) - `DiplomacyAgent.analyze_phase_and_update_state` + - `DiplomacyAgent.generate_negotiation_diary_entry` + - `DiplomacyAgent.generate_order_diary_entry` + - `DiplomacyAgent.decide_builds_or_disbands` + - `planning.planning_phase` - This significantly improves performance by allowing concurrent API calls instead of sequential ones. - Replaced `concurrent.futures.ThreadPoolExecutor` with `asyncio.gather` for managing concurrent async tasks. - -``` diff --git a/ai_diplomacy/map_utils.py b/ai_diplomacy/map_utils.py deleted file mode 100644 index cc28669..0000000 --- a/ai_diplomacy/map_utils.py +++ /dev/null @@ -1,264 +0,0 @@ -import logging -from collections import deque -from typing import Dict, Set, List, Tuple, Callable, Any, Optional -from diplomacy.map import Map - -logger = logging.getLogger(__name__) - - -class DiplomacyGraph: - """Custom graph implementation for Diplomacy map connectivity.""" - - def __init__(self): - # Main graph structure: dict of dict of sets - # graph[node1][node2] = {'A', 'F'} means both army and fleet can move between nodes - # graph[node1][node2] = {'A'} means only army can move between nodes - self.graph: Dict[str, Dict[str, Set[str]]] = {} - - def add_node(self, node: str): - """Add a node if it doesn't exist.""" - if node not in self.graph: - self.graph[node] = {} - - def add_edge(self, node1: str, node2: str, unit_type: str): - """Add an edge between nodes for specific unit type ('A' or 'F').""" - self.add_node(node1) - self.add_node(node2) - - # Add connection for node1 -> node2 - if node2 not in self.graph[node1]: - self.graph[node1][node2] = set() - self.graph[node1][node2].add(unit_type) - - # Add connection for node2 -> node1 (undirected graph) - if node1 not in self.graph[node2]: - self.graph[node2][node1] = set() - self.graph[node2][node1].add(unit_type) - - def get_adjacent(self, node: str) -> List[str]: - """Get all nodes adjacent to given node.""" - return list(self.graph.get(node, {}).keys()) - - def get_allowed_units(self, node1: str, node2: str) -> Set[str]: - """Get set of unit types that can move between these nodes.""" - return self.graph.get(node1, {}).get(node2, set()) - - def nodes(self) -> List[str]: - """Return all nodes in the graph.""" - return list(self.graph.keys()) - - def edges(self) -> List[Tuple[str, str, Set[str]]]: - """Return all edges with their unit types as (node1, node2, unit_types).""" - edges = [] - seen = set() # To avoid duplicates in undirected graph - - for node1 in self.graph: - for node2, unit_types in self.graph[node1].items(): - # Ensure consistent ordering for the 'seen' check - edge_tuple = tuple(sorted((node1, node2))) - if edge_tuple not in seen: - edges.append((node1, node2, unit_types)) - seen.add(edge_tuple) - - return edges - -# --- BFS Functions --- -def bfs_shortest_path( - graph: DiplomacyGraph, - start: str, - match_condition: Callable[[str], Any], # Function returns non-None/non-False if matched - allowed_unit_types: Set[str] -) -> Tuple[Optional[List[str]], Any]: - """ - Performs Breadth-First Search on a DiplomacyGraph from 'start' to find the first territory - for which 'match_condition(territory)' returns a truthy value. - - Args: - graph: The DiplomacyGraph instance to search. - start: The starting territory node name (e.g., 'PAR'). - match_condition: A function that takes a territory name (str) and returns - any value that evaluates to True if the condition is met, - or False/None otherwise. The returned value is included in the output. - allowed_unit_types: A set of unit types ('A', 'F') allowed for traversal. - - Returns: - Tuple[Optional[List[str]], Any]: - - A list of territory names representing the shortest path from 'start' to the matched - territory (inclusive), or None if no path is found. - - The truthy value returned by match_condition for the matched territory, or None. - """ - if start not in graph.graph: # Access the internal graph dict - logger.warning(f"BFS shortest path: Start node '{start}' not in graph.") - return None, None - - visited: Set[str] = {start} - # Queue stores paths (lists of nodes) - queue: deque[List[str]] = deque([[start]]) - - # Check if the starting territory itself satisfies match_condition - initial_match = match_condition(start) - if initial_match: - return [start], initial_match - - while queue: - path = queue.popleft() - current = path[-1] - - # Check neighbors of the current node - for neighbor in graph.get_adjacent(current): - edge_types = graph.get_allowed_units(current, neighbor) - - # Check if any allowed unit type can traverse this edge - if edge_types.intersection(allowed_unit_types): - if neighbor not in visited: - visited.add(neighbor) - new_path = path + [neighbor] - - # Check if the neighbor meets the match condition - match_result = match_condition(neighbor) - if match_result: - return new_path, match_result - - queue.append(new_path) - - logger.debug(f"BFS shortest path: No node matching condition found from '{start}'.") - return None, None - -def bfs_nearest_adjacent( - graph: DiplomacyGraph, - start: str, - occupant_map: Dict[str, Any], # Map territory_name -> occupant_info - allowed_unit_types: Set[str] -) -> Tuple[Optional[List[str]], Tuple[Optional[str], Any]]: - """ - Performs Breadth-First Search from 'start' to find the shortest path to a territory - that is *adjacent* to any territory listed in the 'occupant_map'. - - Args: - graph: The DiplomacyGraph instance to search. - start: The starting territory node name. - occupant_map: A dictionary where keys are territory names occupied by entities - we want to find adjacency to. Values can be any associated info - (e.g., the occupying unit type or power). - allowed_unit_types: A set of unit types ('A', 'F') allowed for traversal. - - Returns: - Tuple[Optional[List[str]], Tuple[Optional[str], Any]]: - - A list representing the shortest path from 'start' to the territory adjacent - to an occupied one, or None if no such path exists. - - A tuple containing: - - The name of the occupied territory that was found adjacent to the path's end. - - The value associated with that occupied territory from occupant_map. - Returns (None, None) if no path is found. - """ - if not occupant_map or start not in graph.graph: # Access the internal graph dict - logger.warning(f"BFS nearest adjacent: Invalid input - occupant_map empty or start node '{start}' not in graph.") - return None, (None, None) - - visited: Set[str] = {start} - # Queue stores paths (lists of nodes) - queue: deque[List[str]] = deque([[start]]) - - while queue: - path = queue.popleft() - current = path[-1] - - # Check if ANY neighbor of the current node is in the occupant_map - for neighbor in graph.get_adjacent(current): - if neighbor in occupant_map: - # Found a path ending adjacent to an occupied territory - occupant_info = occupant_map[neighbor] - return path, (neighbor, occupant_info) - - # If no adjacent occupant found, expand the search to neighbors - for neighbor in graph.get_adjacent(current): - edge_types = graph.get_allowed_units(current, neighbor) - - # Check if traversal is possible with allowed unit types - if edge_types.intersection(allowed_unit_types): - if neighbor not in visited: - visited.add(neighbor) - new_path = path + [neighbor] - queue.append(new_path) - - logger.debug(f"BFS nearest adjacent: No path found from '{start}' adjacent to occupied territories.") - return None, (None, None) - -# --- Build Function --- -def build_diplomacy_graph(game_map: Map) -> DiplomacyGraph: - """ - Builds a DiplomacyGraph representing the connectivity of a given diplomacy map. - - Args: - game_map: An instance of the diplomacy.map.Map class. - - Returns: - A populated DiplomacyGraph instance. - """ - graph = DiplomacyGraph() - processed_edges = set() # To avoid redundant checks in undirected graph - - for loc1_name in game_map.locs: - graph.add_node(loc1_name) - loc1_area = game_map.area_data[loc1_name] - - for loc2_name, coast_spec in loc1_area.adjacencies: - # Ensure loc2 exists in map data (should always be true) - if loc2_name not in game_map.area_data: - logger.warning(f"Adjacent location '{loc2_name}' for '{loc1_name}' not found in map data. Skipping.") - continue - - loc2_area = game_map.area_data[loc2_name] - - # Create a canonical representation for the edge to avoid duplicates - edge_tuple = tuple(sorted((loc1_name, loc2_name))) - if edge_tuple in processed_edges: - continue - - # --- Determine Army ('A') Movement --- - can_army_move = False - # Army moves between land/coastal areas. Cannot move if both are sea. - if not (loc1_area.is_sea and loc2_area.is_sea): - can_army_move = True # Simplified: Assumes land connectivity if not both sea - # More precise check might involve pathfinding logic or specific land borders, - # but this covers basic adjacency for armies. - - if can_army_move: - graph.add_edge(loc1_name, loc2_name, 'A') - - # --- Determine Fleet ('F') Movement --- - can_fleet_move = False - # Fleet moves between sea/coastal areas. Cannot move if both are pure land. - if not (loc1_area.is_land and not loc1_area.is_coastal and - loc2_area.is_land and not loc2_area.is_coastal): - # Check coasts if both are coastal - if loc1_area.is_coastal and loc2_area.is_coastal: - # Fleet can only move if the adjacency specifically allows it (matching coasts) - # The adjacency tuple (loc2_name, coast_spec) provides this info. - # We need to check if loc1 can reach loc2 via the specified coast(s). - # This often means loc1 needs to have a coast matching coast_spec, - # or the adjacency implies general coastal access. - # Using game_map.coast_data might be needed for complex checks. - # Let's use a simplified check based on whether coast_spec exists. - # A more robust method might directly check map.is_valid_move for fleets. - if coast_spec: # Adjacency has coastal specification - # Check if loc1_area's coasts are compatible with coast_spec - # This logic can be complex; assuming adjacency implies possibility for now. - if game_map.is_valid_move('F', loc1_name, loc2_name): # Use built-in check - can_fleet_move = True - else: # No specific coast needed - if game_map.is_valid_move('F', loc1_name, loc2_name): # Use built-in check - can_fleet_move = True - else: - # One or both are sea, or one is coastal and one is sea/land - # Generally possible if not land-to-land - if game_map.is_valid_move('F', loc1_name, loc2_name): # Use built-in check - can_fleet_move = True - - if can_fleet_move: - graph.add_edge(loc1_name, loc2_name, 'F') - - processed_edges.add(edge_tuple) - - logger.info(f"Built DiplomacyGraph with {len(graph.nodes())} nodes and {len(graph.edges())} edges.") - return graph diff --git a/ai_diplomacy/possible_order_context.py b/ai_diplomacy/possible_order_context.py new file mode 100644 index 0000000..951c948 --- /dev/null +++ b/ai_diplomacy/possible_order_context.py @@ -0,0 +1,445 @@ +# ai_diplomacy/possible_order_context.py + +from collections import deque +from typing import Dict, List, Callable, Optional, Any, Set, Tuple +from diplomacy.engine.map import Map as GameMap +from diplomacy.engine.game import Game as BoardState +import logging + +# Placeholder for actual map type from diplomacy.engine.map.Map +# GameMap = Any +# Type hint for board_state dictionary from game.get_state() +# BoardState = Dict[str, Any] + +logger = logging.getLogger(__name__) + +def build_diplomacy_graph(game_map: GameMap) -> Dict[str, Dict[str, List[str]]]: + """ + Builds a graph where keys are SHORT province names (e.g., 'PAR', 'STP'). + Adjacency lists also contain SHORT province names. + This graph is used for BFS pathfinding. + """ + graph: Dict[str, Dict[str, List[str]]] = {} + + # Deriving a clean list of unique, 3-letter, uppercase short province names + # game_map.locs contains all locations, including coasts e.g. "STP/SC" + unique_short_names = set() + for loc in game_map.locs: + short_name = loc.split('/')[0][:3].upper() # Take first 3 chars and uppercase + if len(short_name) == 3: # Ensure it's a 3-letter name + unique_short_names.add(short_name) + + all_short_province_names = sorted(list(unique_short_names)) + + # Initialize graph with all valid short province names as keys + for province_name in all_short_province_names: + graph[province_name] = {'ARMY': [], 'FLEET': []} + + for province_short_source in all_short_province_names: # e.g. 'PAR', 'STP' + # Get all full names for this source province (e.g. 'STP' -> ['STP/NC', 'STP/SC', 'STP']) + full_names_for_source = game_map.loc_coasts.get(province_short_source, [province_short_source]) + + for loc_full_source_variant in full_names_for_source: # e.g. 'STP/NC', then 'STP/SC', then 'STP' + # province_short_source is already the short name like 'STP' + # game_map.loc_abut provides general adjacencies, which might include specific coasts or lowercase names + for raw_adj_loc_from_loc_abut in game_map.loc_abut.get(province_short_source, []): + # Normalize this raw adjacent location to its short, uppercase form + adj_short_name_normalized = raw_adj_loc_from_loc_abut[:3].upper() + + # Get all full names for this *normalized* adjacent short name (e.g. 'BUL' -> ['BUL/EC', 'BUL/SC', 'BUL']) + full_names_for_adj_dest = game_map.loc_coasts.get(adj_short_name_normalized, [adj_short_name_normalized]) + + # Check for ARMY movement + unit_char_army = 'A' + if any( + game_map.abuts( + unit_char_army, + loc_full_source_variant, # Specific full source, e.g. 'STP/NC' + '-', # Order type for move + full_dest_variant # Specific full destination, e.g. 'MOS' or 'FIN' + ) + for full_dest_variant in full_names_for_adj_dest + ): + if adj_short_name_normalized not in graph[province_short_source]['ARMY']: + graph[province_short_source]['ARMY'].append(adj_short_name_normalized) + + # Check for FLEET movement + unit_char_fleet = 'F' + if any( + game_map.abuts( + unit_char_fleet, + loc_full_source_variant, # Specific full source, e.g. 'STP/NC' + '-', # Order type for move + full_dest_variant # Specific full destination, e.g. 'BAR' or 'NWY' + ) + for full_dest_variant in full_names_for_adj_dest + ): + if adj_short_name_normalized not in graph[province_short_source]['FLEET']: + graph[province_short_source]['FLEET'].append(adj_short_name_normalized) + + # Remove duplicates from adjacency lists (just in case) + for province_short in graph: + if 'ARMY' in graph[province_short]: + graph[province_short]['ARMY'] = sorted(list(set(graph[province_short]['ARMY']))) + if 'FLEET' in graph[province_short]: + graph[province_short]['FLEET'] = sorted(list(set(graph[province_short]['FLEET']))) + + return graph + + +def bfs_shortest_path( + graph: Dict[str, Dict[str, List[str]]], + board_state: BoardState, + game_map: GameMap, # Added game_map + start_loc_full: str, # This is a FULL location name like 'VIE' or 'STP/SC' + unit_type: str, + is_target_func: Callable[[str, BoardState], bool] # Expects SHORT name for loc +) -> Optional[List[str]]: # Returns path of SHORT names + """Performs BFS to find the shortest path from start_loc to a target satisfying is_target_func.""" + + # Convert full start location to short province name + start_loc_short = game_map.loc_name.get(start_loc_full, start_loc_full) + if '/' in start_loc_short: # If it was STP/SC, loc_name gives STP. If it was VIE, loc_name gives VIE. + start_loc_short = start_loc_short[:3] + # If start_loc_full was already short (e.g. 'VIE'), get might return it as is, or its value if it was a key. + # A simpler way for non-coastal full (like 'VIE') or already short: + if '/' not in start_loc_full: + start_loc_short = start_loc_full[:3] # Ensures 'VIE' -> 'VIE', 'PAR' -> 'PAR' + else: # Has '/', e.g. 'STP/SC' + start_loc_short = start_loc_full[:3] # 'STP/SC' -> 'STP' + + if start_loc_short not in graph: + logger.warning(f"BFS: Start province {start_loc_short} (from {start_loc_full}) not in graph. Pathfinding may fail.") + return None + + queue: deque[Tuple[str, List[str]]] = deque([(start_loc_short, [start_loc_short])]) + visited_nodes: Set[str] = {start_loc_short} + + while queue: + current_loc_short, path = queue.popleft() + + # is_target_func expects a short location name + if is_target_func(current_loc_short, board_state): + return path # Path of short names + + # possible_neighbors are SHORT names from the graph + possible_neighbors_short = graph.get(current_loc_short, {}).get(unit_type, []) + + for next_loc_short in possible_neighbors_short: + if next_loc_short not in visited_nodes: + if next_loc_short not in graph: # Defensive check for neighbors not in graph keys + logger.warning(f"BFS: Neighbor {next_loc_short} of {current_loc_short} not in graph. Skipping.") + continue + visited_nodes.add(next_loc_short) + new_path = path + [next_loc_short] + queue.append((next_loc_short, new_path)) + return None + +# --- Helper functions for context generation --- +def get_unit_at_location(board_state: BoardState, location: str) -> Optional[str]: + """Returns the full unit string (e.g., 'A PAR (FRA)') if a unit is at the location, else None.""" + for power, unit_list in board_state.get('units', {}).items(): + for unit_str in unit_list: # e.g., "A PAR", "F STP/SC" + parts = unit_str.split(" ") + if len(parts) == 2: + unit_map_loc = parts[1] + if unit_map_loc == location: + return f"{parts[0]} {location} ({power})" + return None + +def get_sc_controller(game_map: GameMap, board_state: BoardState, location: str) -> Optional[str]: + """Returns the controlling power's name if the location is an SC, else None.""" + # Normalize location to base province name, as SCs are tied to provinces, not specific coasts + loc_province_name = game_map.loc_name.get(location, location).upper()[:3] + if loc_province_name not in game_map.scs: + return None + for power, sc_list in board_state.get('centers', {}).items(): + if loc_province_name in sc_list: + return power + return None # Unowned SC + +def get_shortest_path_to_friendly_unit( + board_state: BoardState, + graph: Dict[str, Dict[str, List[str]]], + game_map: GameMap, # Added game_map + power_name: str, + start_unit_loc_full: str, + start_unit_type: str +) -> Optional[Tuple[str, List[str]]]: + """Finds the shortest path to any friendly unit of the same power.""" + + def is_target_friendly(loc_short: str, current_board_state: BoardState) -> bool: + # loc_short is a short province name. Need to check all its full locations. + full_locs_for_short = game_map.loc_coasts.get(loc_short, [loc_short]) + for full_loc_variant in full_locs_for_short: + unit_at_loc = get_unit_at_location(current_board_state, full_loc_variant) + if unit_at_loc and unit_at_loc.split(" ")[2][1:4] == power_name and full_loc_variant != start_unit_loc_full: + return True + return False + + path_short_names = bfs_shortest_path(graph, board_state, game_map, start_unit_loc_full, start_unit_type, is_target_friendly) + if path_short_names and len(path_short_names) > 1: # Path includes start, so > 1 means a distinct friendly unit found + target_loc_short = path_short_names[-1] + # Find the actual friendly unit string at one of the full locations of target_loc_short + friendly_unit_str = "UNKNOWN_FRIENDLY_UNIT" + full_locs_for_target_short = game_map.loc_coasts.get(target_loc_short, [target_loc_short]) + for fl_variant in full_locs_for_target_short: + unit_str = get_unit_at_location(board_state, fl_variant) + if unit_str and unit_str.split(" ")[2][1:4] == power_name: + friendly_unit_str = unit_str + break + return friendly_unit_str, path_short_names + return None + + +def get_nearest_enemy_units( + board_state: BoardState, + graph: Dict[str, Dict[str, List[str]]], + game_map: GameMap, # Added game_map + power_name: str, + start_unit_loc_full: str, + start_unit_type: str, + n: int = 3 +) -> List[Tuple[str, List[str]]]: + """Finds up to N nearest enemy units, sorted by path length.""" + enemy_paths: List[Tuple[str, List[str]]] = [] # (enemy_unit_str, path_short_names) + + all_enemy_unit_locations_full: List[Tuple[str,str]] = [] # (loc_full, unit_str_full) + # board_state.get("units", {}) has format: { "POWER_NAME": ["A PAR", "F BRE"], ... } + for p_name, unit_list_for_power in board_state.get("units", {}).items(): + if p_name != power_name: # If it's an enemy power + for unit_repr_from_state in unit_list_for_power: # e.g., "A PAR" or "F STP/SC" + parts = unit_repr_from_state.split(" ") + if len(parts) == 2: + # unit_type_char = parts[0] # 'A' or 'F' + loc_full = parts[1] # 'PAR' or 'STP/SC' + + # Use get_unit_at_location to get the consistent full unit string like "A PAR (POWER_NAME)" + full_unit_str_with_power = get_unit_at_location(board_state, loc_full) + if full_unit_str_with_power: # Should find the unit if iteration is correct + all_enemy_unit_locations_full.append((loc_full, full_unit_str_with_power)) + + for target_enemy_loc_full, enemy_unit_str in all_enemy_unit_locations_full: + target_enemy_loc_short = game_map.loc_name.get(target_enemy_loc_full, target_enemy_loc_full) + if '/' in target_enemy_loc_short: + target_enemy_loc_short = target_enemy_loc_short[:3] + if '/' not in target_enemy_loc_full: + target_enemy_loc_short = target_enemy_loc_full[:3] + else: + target_enemy_loc_short = target_enemy_loc_full[:3] + + def is_specific_enemy_loc(loc_short: str, current_board_state: BoardState) -> bool: + # Check if loc_short corresponds to target_enemy_loc_full + return loc_short == target_enemy_loc_short + + path_short_names = bfs_shortest_path(graph, board_state, game_map, start_unit_loc_full, start_unit_type, is_specific_enemy_loc) + if path_short_names: + enemy_paths.append((enemy_unit_str, path_short_names)) + + enemy_paths.sort(key=lambda x: len(x[1])) # Sort by path length + return enemy_paths[:n] + + +def get_nearest_uncontrolled_scs( + game_map: GameMap, + board_state: BoardState, + graph: Dict[str, Dict[str, List[str]]], + power_name: str, + start_unit_loc_full: str, + start_unit_type: str, + n: int = 3 +) -> List[Tuple[str, int, List[str]]]: # (sc_name_short, distance, path_short_names) + """Finds up to N nearest SCs not controlled by power_name, sorted by path length.""" + uncontrolled_sc_paths: List[Tuple[str, int, List[str]]] = [] + + all_scs_short = game_map.scs # This is a list of short province names that are SCs + + for sc_loc_short in all_scs_short: + controller = get_sc_controller(game_map, board_state, sc_loc_short) + if controller != power_name: + def is_target_sc(loc_short: str, current_board_state: BoardState) -> bool: + return loc_short == sc_loc_short + + path_short_names = bfs_shortest_path(graph, board_state, game_map, start_unit_loc_full, start_unit_type, is_target_sc) + if path_short_names: + # Path includes start, so distance is len - 1 + uncontrolled_sc_paths.append((f"{sc_loc_short} (Ctrl: {controller or 'None'})", len(path_short_names) -1, path_short_names)) + + # Sort by distance (path length - 1), then by SC name for tie-breaking + uncontrolled_sc_paths.sort(key=lambda x: (x[1], x[0])) + return uncontrolled_sc_paths[:n] + +def get_adjacent_territory_details( + game_map: GameMap, + board_state: BoardState, + unit_loc_full: str, # The location of the unit whose adjacencies we're checking + unit_type: str, # ARMY or FLEET of the unit at unit_loc_full + graph: Dict[str, Dict[str, List[str]]] +) -> str: + """Generates a string describing adjacent territories and units that can interact with them.""" + output_lines: List[str] = [] + # Get adjacencies for the current unit's type + # The graph already stores processed adjacencies (e.g. army can't go to sea) + # For armies, graph[unit_loc_full]['ARMY'] gives short province names + # For fleets, graph[unit_loc_full]['FLEET'] gives full loc names (incl coasts) + # THIS COMMENT IS NOW OUTDATED. Graph uses short names for keys and values. + unit_loc_short = game_map.loc_name.get(unit_loc_full, unit_loc_full) + if '/' in unit_loc_short: + unit_loc_short = unit_loc_short[:3] + if '/' not in unit_loc_full: + unit_loc_short = unit_loc_full[:3] + else: + unit_loc_short = unit_loc_full[:3] + + adjacent_locs_short_for_unit = graph.get(unit_loc_short, {}).get(unit_type, []) + + processed_adj_provinces = set() # To handle cases like STP/NC and STP/SC both being adjacent to BOT + + for adj_loc_short in adjacent_locs_short_for_unit: # adj_loc_short is already short + # adj_province_short = game_map.loc_name.get(adj_loc_full, adj_loc_full).upper()[:3] # No longer needed + if adj_loc_short in processed_adj_provinces: # adj_loc_short is already short and upper implicitly by map data + continue + processed_adj_provinces.add(adj_loc_short) + + adj_loc_type = game_map.loc_type.get(adj_loc_short, 'UNKNOWN').upper() + if adj_loc_type == 'COAST' or adj_loc_type == 'LAND': + adj_loc_type_display = 'LAND' if adj_loc_type == 'LAND' else 'COAST' + elif adj_loc_type == 'WATER': + adj_loc_type_display = 'WATER' + else: # SHUT etc. + adj_loc_type_display = adj_loc_type + + line = f" {adj_loc_short} ({adj_loc_type_display})" + + sc_controller = get_sc_controller(game_map, board_state, adj_loc_short) + if sc_controller: + line += f" SC Control: {sc_controller}" + + unit_in_adj_loc = get_unit_at_location(board_state, adj_loc_short) + if unit_in_adj_loc: + line += f" Units: {unit_in_adj_loc}" + output_lines.append(line) + + # "Can support/move to" - Simplified: list units in *further* adjacent provinces + # A true "can support/move to" would require checking possible orders of those further units. + # further_adj_provinces are short names from the graph + further_adj_provinces_short = graph.get(adj_loc_short, {}).get('ARMY', []) + \ + graph.get(adj_loc_short, {}).get('FLEET', []) + + supporting_units_info = [] + processed_further_provinces = set() + for further_adj_loc_short in further_adj_provinces_short: + # further_adj_province_short = game_map.loc_name.get(further_adj_loc_full, further_adj_loc_full).upper()[:3] + # No conversion needed, it's already short + if further_adj_loc_short == adj_loc_short or further_adj_loc_short == unit_loc_short: # Don't list itself or origin + continue + if further_adj_loc_short in processed_further_provinces: + continue + processed_further_provinces.add(further_adj_loc_short) + + # Check for units in this further adjacent province (any coast) + # This is a bit broad. We should check units in the specific 'further_adj_loc_full' + # unit_in_further_loc = get_unit_at_location(board_state, further_adj_loc_full) + # We have further_adj_loc_short. Need to check all its full variants. + unit_in_further_loc = "" + full_variants_of_further_short = game_map.loc_coasts.get(further_adj_loc_short, [further_adj_loc_short]) + for fv_further in full_variants_of_further_short: + temp_unit = get_unit_at_location(board_state, fv_further) + if temp_unit: + unit_in_further_loc = temp_unit + break # Found a unit in one of the coasts/base + + # if not unit_in_further_loc and further_adj_loc_full != further_adj_province_short: + # unit_in_further_loc = get_unit_at_location(board_state, further_adj_province_short) + + if unit_in_further_loc: + supporting_units_info.append(unit_in_further_loc) + + if supporting_units_info: + output_lines.append(f" => Can support/move to: {', '.join(sorted(list(set(supporting_units_info))))}") + + return "\n".join(output_lines) + + +# --- Main context generation function --- +def generate_rich_order_context(game: Any, power_name: str, possible_orders_for_power: Dict[str, List[str]]) -> str: + """ + Generates the rich, multi-line context string for all units of a given power + that have possible orders. + """ + board_state: BoardState = game.get_state() + game_map: GameMap = game.map + graph = build_diplomacy_graph(game_map) + + final_context_lines: List[str] = ["Enhanced Possible Orders Context:"] + + # Iterate through units that have orders (keys of possible_orders_for_power are unit locations) + for unit_loc_full, unit_specific_possible_orders in possible_orders_for_power.items(): + unit_str_full = get_unit_at_location(board_state, unit_loc_full) + if not unit_str_full: # Should not happen if unit_loc_full is from possible_orders keys + continue + + unit_type_char = unit_str_full.split(" ")[0] # 'A' or 'F' + unit_type_long = "ARMY" if unit_type_char == 'A' else "FLEET" + + # Section Header: Strategic territory held by POWER: LOC (TYPE) + loc_province_short = game_map.loc_name.get(unit_loc_full, unit_loc_full).upper()[:3] + loc_type_short = game_map.loc_type.get(loc_province_short, "UNKNOWN").upper() + if loc_type_short == 'COAST' or loc_type_short == 'LAND': + loc_type_display = 'LAND' if loc_type_short == 'LAND' else 'COAST' + else: + loc_type_display = loc_type_short + + current_unit_lines: List[str] = [] + sc_owner_at_loc = get_sc_controller(game_map, board_state, unit_loc_full) + header_line = f"\n# Strategic territory held by {power_name}: {unit_loc_full} ({loc_type_display})" + if sc_owner_at_loc == power_name: + header_line += " (Controls SC)" + elif sc_owner_at_loc: + header_line += f" (SC controlled by {sc_owner_at_loc})" + current_unit_lines.append(header_line) + current_unit_lines.append(f"Units present: {unit_str_full}") + + # Shortest path to friendly unit + friendly_path_info = get_shortest_path_to_friendly_unit(board_state, graph, game_map, power_name, unit_loc_full, unit_type_long) + if friendly_path_info: + friendly_unit_str, friendly_path_short = friendly_path_info + current_unit_lines.append(" Shortest path for {}:".format(unit_str_full.split(" ")[0] + " " + unit_loc_full )) # A TYR + current_unit_lines.append(" => Nearest friendly unit:") + current_unit_lines.append(f" {friendly_unit_str} path=[{unit_loc_full}→{('→'.join(friendly_path_short[1:])) if len(friendly_path_short) > 1 else friendly_path_short[0]}]") + else: + current_unit_lines.append(" Shortest path for {}:".format(unit_str_full.split(" ")[0] + " " + unit_loc_full )) + current_unit_lines.append(" => Nearest friendly unit: None found") + + # Possible moves (already given) + current_unit_lines.append(" => Possible moves:") + for order_str in unit_specific_possible_orders: + current_unit_lines.append(f" {order_str}") + + # Nearest enemy units + enemy_units_info = get_nearest_enemy_units(board_state, graph, game_map, power_name, unit_loc_full, unit_type_long, n=3) + if enemy_units_info: + current_unit_lines.append(" Nearest units (not ours):") + for enemy_unit_str, enemy_path_short in enemy_units_info: + current_unit_lines.append(f" {enemy_unit_str}, path=[{unit_loc_full}→{('→'.join(enemy_path_short[1:])) if len(enemy_path_short) > 1 else enemy_path_short[0]}]") + else: + current_unit_lines.append(" Nearest units (not ours): None found") + + # Nearest supply centers (not controlled by us) + uncontrolled_scs_info = get_nearest_uncontrolled_scs(game_map, board_state, graph, power_name, unit_loc_full, unit_type_long, n=3) + if uncontrolled_scs_info: + current_unit_lines.append(" Nearest supply centers (not controlled by us):") + for sc_str, dist, sc_path_short in uncontrolled_scs_info: + current_unit_lines.append(f" {sc_str}, dist={dist}, path=[{unit_loc_full}→{('→'.join(sc_path_short[1:])) if len(sc_path_short) > 1 else sc_path_short[0]}]") + else: + current_unit_lines.append(" Nearest supply centers (not controlled by us): None found") + + # Adjacent territories details + adj_details_str = get_adjacent_territory_details(game_map, board_state, unit_loc_full, unit_type_long, graph) + if adj_details_str: + current_unit_lines.append("Adjacent territories (including units that can support/move to the adjacent territory):") + current_unit_lines.append(adj_details_str) + + final_context_lines.extend(current_unit_lines) + + return "\n".join(final_context_lines) diff --git a/ai_diplomacy/prompts/context_prompt.txt b/ai_diplomacy/prompts/context_prompt.txt index 29a8799..38c20cc 100644 --- a/ai_diplomacy/prompts/context_prompt.txt +++ b/ai_diplomacy/prompts/context_prompt.txt @@ -24,8 +24,4 @@ All Supply Centers: **MESSAGES RECEIVED THIS ROUND** -{messages_this_round} - -**PREVIOUS GAME HISTORY (Messages from older rounds & phases)** - -{previous_game_history} \ No newline at end of file +{messages_this_round} \ No newline at end of file diff --git a/ai_diplomacy/prompts/order_instructions.txt b/ai_diplomacy/prompts/order_instructions.txt index 4db4d92..0534c8c 100644 --- a/ai_diplomacy/prompts/order_instructions.txt +++ b/ai_diplomacy/prompts/order_instructions.txt @@ -7,6 +7,8 @@ **CRITICAL RULES:** * Your orders *must* be chosen from the `possible_orders` list provided in the context. * Support orders must correspond to an actual move or hold order you are issuing (e.g., `A PAR S F PIC - ENG` requires `F PIC - ENG`). +* **Build Orders (During Build Phases Only):** To build a new unit in one of your owned and vacant supply centers, use the format `[UnitType] [Location3LetterCode] B`. `UnitType` is `A` for Army or `F` for Fleet. For example: `A PAR B` (Build Army in Paris), `F LON B` (Build Fleet in London). Your `possible_orders` list will show available build locations and unit types. +* **Dual-Coast Provinces**: For fleets in or moving to/from provinces with multiple distinct coasts (e.g., St. Petersburg (STP), Spain (SPA), Bulgaria (BUL)), you MUST specify the coast if it's relevant to the order's validity or ambiguity. Use the format `F [PROVINCE]/[COAST_CODE] ...`. For example: `F STP/NC B` (Build in North Coast), `A MAR S F SPA/SC - WES` (Support fleet in South Coast). Common coast codes are NC (North Coast), SC (South Coast), EC (East Coast), WC (West Coast). Consult the `possible_orders` list for the exact format if unsure. * Adjudication is simultaneous. * You are only submitting orders now. Do not write messages. diff --git a/ai_diplomacy/utils.py b/ai_diplomacy/utils.py index ba01de1..633cf59 100644 --- a/ai_diplomacy/utils.py +++ b/ai_diplomacy/utils.py @@ -30,15 +30,14 @@ def assign_models_to_powers() -> Dict[str, str]: deepseek-chat, deepseek-reasoner openrouter-meta-llama/llama-3.3-70b-instruct, openrouter-qwen/qwen3-235b-a22b, openrouter-microsoft/phi-4-reasoning-plus:free, openrouter-deepseek/deepseek-prover-v2:free, openrouter-meta-llama/llama-4-maverick:free, openrouter-nvidia/llama-3.3-nemotron-super-49b-v1:free, openrouter-google/gemma-3-12b-it:free """ - return { - "AUSTRIA": "openrouter-meta-llama/llama-3.3-70b-instruct", - "ENGLAND": "openrouter-qwen/qwen3-235b-a22b", - "FRANCE": "openrouter-microsoft/phi-4-reasoning-plus:free", - "GERMANY": "openrouter-deepseek/deepseek-prover-v2:free", - "ITALY": "openrouter-meta-llama/llama-4-maverick:free", - "RUSSIA": "openrouter-nvidia/llama-3.3-nemotron-super-49b-v1:free", - "TURKEY": "openrouter-google/gemma-3-12b-it:free", + "AUSTRIA": "openrouter-google/gemini-2.5-flash-preview", + "ENGLAND": "openrouter-google/gemini-2.5-flash-preview", + "FRANCE": "openrouter-google/gemini-2.5-flash-preview", + "GERMANY": "openrouter-google/gemini-2.5-flash-preview", + "ITALY": "openrouter-google/gemini-2.5-flash-preview", + "RUSSIA": "openrouter-google/gemini-2.5-flash-preview", + "TURKEY": "openrouter-google/gemini-2.5-flash-preview", } @@ -269,7 +268,9 @@ def log_llm_response( power_name: Optional[str], # Optional for non-power-specific calls like summary phase: str, response_type: str, + raw_input_prompt: str, # Added new parameter for the raw input raw_response: str, + success: str, # Changed from bool to str ): """Appends a raw LLM response to a CSV log file.""" try: @@ -282,7 +283,8 @@ def log_llm_response( file_exists = os.path.isfile(log_file_path) with open(log_file_path, "a", newline="", encoding="utf-8") as csvfile: - fieldnames = ["model", "power", "phase", "response_type", "raw_response"] + # Added "raw_input" to fieldnames + fieldnames = ["model", "power", "phase", "response_type", "raw_input", "raw_response", "success"] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) if not file_exists: @@ -293,7 +295,9 @@ def log_llm_response( "power": power_name if power_name else "game", # Use 'game' if no specific power "phase": phase, "response_type": response_type, + "raw_input": raw_input_prompt, # Added raw_input to the row "raw_response": raw_response, + "success": success, }) except Exception as e: logger.error(f"Failed to log LLM response to {log_file_path}: {e}", exc_info=True) @@ -303,36 +307,17 @@ def log_llm_response( async def run_llm_and_log( client: 'BaseModelClient', prompt: str, - log_file_path: str, - power_name: Optional[str], - phase: str, - response_type: str, + log_file_path: str, # Kept for context, but not used for logging here + power_name: Optional[str], # Kept for context, but not used for logging here + phase: str, # Kept for context, but not used for logging here + response_type: str, # Kept for context, but not used for logging here ) -> str: - """Calls the client's generate_response and logs the raw output.""" + """Calls the client's generate_response and returns the raw output. Logging is handled by the caller.""" raw_response = "" # Initialize in case of error try: raw_response = await client.generate_response(prompt) - # Log the successful response - log_llm_response( - log_file_path=log_file_path, - model_name=client.model_name, - power_name=power_name, - phase=phase, - response_type=response_type, - raw_response=raw_response, - ) except Exception as e: - # Log the error attempt (optional, could log empty response instead) - logger.error(f"Error during LLM call for {power_name}/{response_type} in phase {phase}: {e}", exc_info=True) - log_llm_response( - log_file_path=log_file_path, - model_name=client.model_name, - power_name=power_name, - phase=phase, - response_type=f"ERROR_{response_type}", # Mark response type as error - raw_response=f"Error generating response: {e}", - ) - # Depending on desired behavior, you might want to re-raise the exception - # or return a specific error indicator string. Returning empty for now. - # raise e # Re-raising might be better to let caller handle it. + # Log the API call error. The caller will decide how to log this in llm_responses.csv + logger.error(f"API Error during LLM call for {client.model_name}/{power_name}/{response_type} in phase {phase}: {e}", exc_info=True) + # raw_response remains "" indicating failure to the caller return raw_response \ No newline at end of file diff --git a/experiments/async_api_calls_experiment_log.md b/experiments/async_api_calls_experiment_log.md deleted file mode 100644 index 21b5436..0000000 --- a/experiments/async_api_calls_experiment_log.md +++ /dev/null @@ -1,48 +0,0 @@ -# Experiment Log: Asynchronous API Calls for Performance Enhancement - -**Date Started:** 2025-04-29 - -**Owner:** Cascade - -**Goal:** -Improve the runtime performance of the Diplomacy game simulation (`lm_game.py`) by converting blocking LLM API calls to non-blocking asynchronous operations using `asyncio` and asynchronous client libraries. This aims to reduce the wall-clock time spent waiting for network I/O during phases involving multiple LLM interactions (initialization, planning, negotiation, order generation, state updates). - -**Hypothesis:** -Replacing synchronous API calls managed by `ThreadPoolExecutor` with native `asyncio` operations will lead to significantly faster phase completion times, especially for negotiation and order generation where multiple calls happen concurrently. - -**Key Implementation Details:** - -* Use `asyncio` library for managing asynchronous tasks. -* Replace synchronous LLM client libraries (e.g., `openai`, `anthropic`) with their asynchronous counterparts (e.g., `openai.AsyncOpenAI`, `anthropic.AsyncAnthropic`). -* Refactor client methods (`generate_response`, `get_orders`, `get_conversation_reply`, etc.) to be `async def` and use `await`. -* Refactor calling functions in `agent.py`, `negotiations.py`, `planning.py`, and `lm_game.py` to use `async def` and `await`. -* Replace `concurrent.futures.ThreadPoolExecutor` with `asyncio.gather` for managing concurrent async tasks. -* Run the main simulation loop within `asyncio.run()`. -* Maintain existing logging and error handling. - -**Phased Implementation Plan:** - -1. **Agent Initialization:** Convert `agent.initialize_agent_state` and related client calls to async. Update `lm_game.py` to run initializations concurrently with `asyncio.gather`. -2. **Negotiation:** Convert `negotiations.conduct_negotiations` and `client.get_conversation_reply` to async. -3. **Order Generation:** Convert `client.get_orders` call chain to async. -4. **Planning:** Convert `planning.planning_phase` call chain to async. -5. **State Update:** Convert `agent.analyze_phase_and_update_state` call chain to async. - -**Success Metric:** -Significant reduction (e.g., >30%) in total simulation runtime (`total_time` logged at the end of `lm_game.py`) for a standard game configuration (e.g., `--max_year 1902 --num_negotiation_rounds 2`). Compare before/after timings. - -**Rollback Plan:** -Revert changes using Git version control if significant issues arise or performance does not improve as expected. - ---- - -## Debugging & Results Table - -| Phase Implemented | Status | Notes | Wager Outcome | -| ---------------------- | ---------- | --------------------------------------------------------------------- | ------------- | -| 1. Agent Initialization | In Progress | Starting refactor of clients, agent init, and main loop concurrency. | -$100 | -| 2. Negotiation | Pending | | | -| 3. Order Generation | Pending | | | -| 4. Planning | Pending | | | -| 5. State Update | Pending | | | -| **Overall Result** | **TBD** | **Did total runtime decrease significantly?** | **+$500/-$100** | diff --git a/experiments/game_state_tracking_experiment_log.md b/experiments/game_state_tracking_experiment_log.md deleted file mode 100644 index b425cfd..0000000 --- a/experiments/game_state_tracking_experiment_log.md +++ /dev/null @@ -1,103 +0,0 @@ -# AI Diplomacy Enhancement - Experiment Log - -**Goal:** Integrate improvements for game state tracking, order validation, strategic map analysis, agent state, planning, and negotiation into the AI Diplomacy codebase while maintaining high quality and avoiding downtime. - -**Changes Summary (Tasks Completed):** -- Task 1: Enhanced Game History Tracking (Phase/Experience) -- Task 2: Improved Order Validation/Processing (Normalization) -- Task 3: Strategic Map Analysis (Graph/BFS) -- Task 4: Upgraded Agent Architecture (Stateful Agent Class) -- Task 5: Enhanced Negotiation Protocol (Agent State Integration) -- Task 7: Enhanced Prompt Structure (System Prompts) -- Task 9: Implemented Planning Module -- Task 10: Improved Phase Summaries and Display - -**Key Implementation Details:** -- **Agent State:** `ai_diplomacy/agent.py` (DiplomacyAgent class stores personality, goals, relationships, journal). System prompts loaded from `ai_diplomacy/prompts/system_prompts/`. -- **Planning:** `ai_diplomacy/planning.py` (planning_phase uses Agent), `ai_diplomacy/agent.py` (generate_plan), `ai_diplomacy/clients.py` (get_plan), `ai_diplomacy/prompts/planning_instructions.txt`. -- **Negotiation:** `ai_diplomacy/negotiations.py` (conduct_negotiations uses Agent state), `ai_diplomacy/clients.py` (get_conversation_reply accepts Agent state), `ai_diplomacy/prompts/conversation_instructions.txt`, `ai_diplomacy/prompts/context_prompt.txt`. -- **Game History:** `ai_diplomacy/game_history.py` (stores plans, messages, etc.) -- **Utilities:** `ai_diplomacy/utils.py` (order normalization), `ai_diplomacy/map_utils.py` (graph analysis) -- **Phase Summaries:** `lm_game.py` (phase_summary_callback), modified Game class to properly record and export summaries. - ---- - -## Experiment 4: Initial State & Update Loop Debug - -**Date:** 2025-04-07 -**Goal:** Fix initial goal generation failure and ensure state update loop runs. -**Changes:** -- Added default neutral relationships in `Agent.__init__`. -- Added `Agent.initialize_agent_state` using LLM (called from `lm_game`). -- Added error handling/logging to `Agent.analyze_phase_and_update_state`. -**Observation:** Initial goals still `None specified` due to `TypeError` in `build_context_prompt` call within `initialize_agent_state`. Relationships defaulted correctly. State update loop (`analyze_phase_and_update_state`) was *not* being called in `lm_game.py`. -**Result:** Failure (-$0.00, minimal LLM calls due to error) -**Next Steps:** Add debug logs to `initialize_agent_state` call; Implement the state update loop call in `lm_game.py` after `game.process()`. - -## Debugging Table, -$100 on failure, +$500 on success - -| # | Problem | Attempted Solution | Outcome | Balance ($) | -|---|--------------------------------------------------------------------------------------------------------|----------------------------|-------------------|-------------| -| 4 | Initial goals `TypeError` in `build_context_prompt`; State update loop not called. | Debug logs; Implement loop | Failure | -$100 | -| 5 | `TypeError` in `add_journal_entry` (wrong args); `JSONDecodeError` (LLM added extra text/markdown fences) | Fix args; Robust JSON parse | Partial Success* | -$100 | -| 6 | `TypeError: wrong number of args` for state update call. | Helper fn; Sync loop; Fix | Failure | -$100 | -| 7 | `AttributeError: 'Game' has no attribute 'get_board_state_str'/'current_year'` and JSON key mismatch | Create board_state_str from board_state; Extract year from phase name; Fix JSON key mismatches | Partial Success** | -$100 | -| 8 | Case-sensitivity issues - power names in relationships not matching ALL_POWERS | Made relationship validation case-insensitive; Reduced log verbosity | Success | +$500 | - -*Partial Success: Game ran 1 year, but failed during state update phase. -**Partial Success: Game runs without crashing, but LLM responses still don't match expected JSON format. - -## Experiment 7: Game State Processing Fixes - -**Date:** 2025-04-08 -**Goal:** Fix the game state processing and JSON format issues. -**Changes:** -1. Fixed parameter mismatch in `analyze_phase_and_update_state`: Changed from (game, game_history) to (game, board_state, phase_summary, game_history) -2. Made JSON parsing more robust with a dedicated `_extract_json_from_text` helper method -3. Added fallback values in case of JSON parsing failures -4. Fixed missing game attributes: created board_state_str from board_state dict, extracted year from phase name -5. Identified JSON key mismatch between prompt ("relationships"/"goals") and code ("updated_relationships"/"updated_goals") - -**Observation:** Game now runs without crashing through basic state updates, but LLM responses don't use the expected JSON keys (they use "relationships"/"goals" while code expects "updated_relationships"/"updated_goals"). - -## Experiment 8: Case-Insensitivity Fix - -**Date:** 2025-04-08 -**Goal:** Fix case-sensitivity issues in relationship validation and key name mismatches. -**Changes:** -1. Added case-insensitive validation for power names (e.g., "Austria" → "AUSTRIA") -2. Added case-insensitive validation for relationship statuses (e.g., "enemy" → "Enemy") -3. Made the code look for alternative JSON key names ("goals"/"relationships" vs "updated_goals"/"updated_relationships") -4. Reduced log noise by only showing first few validation warnings and a summary count for the rest -5. Added fallback defaults in all error cases to ensure agent state is never empty - -**Observation:** Game now runs successfully through multiple phases. The agent state is properly updated and maintained between phases. Logs are cleaner and more informative. - -**Result:** Success (+$500, successfully running through all phases) - ---- - -## Key Learnings & Best Practices - -1. **Strong Defensive Programming** - - Always implement fallback values when parsing LLM outputs - - Use robust JSON extraction with multiple strategies (regex patterns, string cleaning) - - Never assume case-sensitivity in LLM outputs - normalize all strings - -2. **Adaptable Input Parsing** - - Accept multiple key names for the same concept ("goals" vs "updated_goals") - - Adopt flexible parsing approaches that can handle structural variations - - Have clear default behaviors defined when expected data is missing - -3. **Effective Logging** - - Use debug logs liberally during development phases - - Keep production logs high-signal and low-noise by limiting repeat warnings - - Include contextual information in logs (power name, phase name) for easier debugging - -4. **Robust Error Recovery** - - Implement progressive fallback strategies: try parsing → try alternate formats → use defaults - - Maintain coherent state even when errors occur - never leave agent in partial/invalid state - - When unexpected errors occur, recover gracefully rather than crashing - -These learnings have significantly improved the Agent architecture's reliability and are applicable to other LLM-integration contexts. - diff --git a/lm_game.py b/lm_game.py index 9be24a2..27d75de 100644 --- a/lm_game.py +++ b/lm_game.py @@ -26,18 +26,19 @@ from ai_diplomacy.planning import planning_phase from ai_diplomacy.game_history import GameHistory from ai_diplomacy.agent import DiplomacyAgent import ai_diplomacy.narrative +from ai_diplomacy.initialization import initialize_agent_state_ext dotenv.load_dotenv() logger = logging.getLogger(__name__) logging.basicConfig( - level=logging.DEBUG, + level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s - %(message)s", datefmt="%H:%M:%S", ) # Silence noisy dependencies logging.getLogger("httpx").setLevel(logging.WARNING) -#logging.getLogger("root").setLevel(logging.WARNING) # Assuming root handles AFC +logging.getLogger("root").setLevel(logging.WARNING) # Assuming root handles AFC def parse_arguments(): @@ -148,7 +149,7 @@ async def main(): agents[power_name] = agent logger.info(f"Preparing initialization task for {power_name} with model {model_id}") # Pass log path to initialization - initialization_tasks.append(agent.initialize_agent_state(game, game_history, llm_log_file_path)) + initialization_tasks.append(initialize_agent_state_ext(agent, game, game_history, llm_log_file_path)) except Exception as e: logger.error(f"Failed to create agent or client for {power_name} with model {model_id}: {e}", exc_info=True) else: @@ -230,62 +231,31 @@ async def main(): ) # ====================================================================== - - # === Generate Negotiation Diary Entries === - logger.info("Agents generating negotiation diary entries and updating state...") - negotiation_diary_tasks = [] - # Ensure we only try this for agents of active powers - active_agents_for_diary = [name for name, agent_obj in agents.items() if not game.powers[name].is_eliminated()] - - for power_name in active_agents_for_diary: - if power_name in agents: # Check if agent exists - agent = agents[power_name] - negotiation_diary_tasks.append( - agent.generate_negotiation_diary_entry( - game, - game_history, # game_history contains messages from this round - llm_log_file_path - ) - ) - else: - logger.warning(f"Agent for {power_name} not found, skipping negotiation diary generation.") - - if negotiation_diary_tasks: - # Process exceptions if any occur during diary generation - results = await asyncio.gather(*negotiation_diary_tasks, return_exceptions=True) - for i, res in enumerate(results): - if isinstance(res, Exception): - # Ensure active_agents_for_diary[i] is valid if some agents were skipped - power_name_with_error = active_agents_for_diary[i] if i < len(active_agents_for_diary) else "Unknown Power" - logger.error(f"Error generating negotiation diary for {power_name_with_error}: {res}", exc_info=res) - logger.info("Negotiation diary entries and state updates complete.") - # ========================================= - # AI Decision Making: Get orders for each power logger.info("Getting orders from agents...") order_tasks = [] order_power_names = [] - board_state = game.get_state() # Calculate board state once - - # NEW: Dictionary to store orders set in this phase, before game.process() - orders_set_this_phase = defaultdict(list) + # Calculate board state once before the loop + board_state = game.get_state() for power_name, agent in agents.items(): if game.powers[power_name].is_eliminated(): - # logger.debug(f"Skipping order generation for eliminated power {power_name}.") # Already logged + logger.debug(f"Skipping order generation for eliminated power {power_name}.") continue + # Calculate possible orders for the current power possible_orders = gather_possible_orders(game, power_name) if not possible_orders: - # logger.debug(f"No orderable locations for {power_name}; submitting empty orders.") # Already logged - game.set_orders(power_name, []) - orders_set_this_phase[power_name] = [] # Record that empty orders were set + logger.debug(f"No orderable locations for {power_name}; submitting empty orders.") + game.set_orders(power_name, []) # Ensure empty orders if none possible continue order_power_names.append(power_name) - formatted_private_diary = agent.format_private_diary_for_prompt() + # NOTE: get_valid_orders is in utils, we assume it calls client.get_orders + # Need to modify get_valid_orders signature in utils.py later order_tasks.append( get_valid_orders( + # --- Positional Arguments --- game, agent.client, board_state, @@ -293,98 +263,59 @@ async def main(): possible_orders, game_history, model_error_stats, + # --- Keyword Arguments --- agent_goals=agent.goals, agent_relationships=agent.relationships, - agent_private_diary_str=formatted_private_diary, log_file_path=llm_log_file_path, phase=current_phase, ) ) + # Run order generation concurrently if order_tasks: + logger.debug(f"Running {len(order_tasks)} order generation tasks concurrently...") order_results = await asyncio.gather(*order_tasks, return_exceptions=True) else: + logger.debug("No order generation tasks to run.") order_results = [] # Process order results and set them in the game for i, result in enumerate(order_results): p_name = order_power_names[i] - agent = agents[p_name] + agent = agents[p_name] # Get agent for logging/stats if needed model_name = agent.client.model_name - current_orders_for_power = [] # To store what's actually set - if isinstance(result, Exception): logger.error(f"Error during get_valid_orders for {p_name}: {result}", exc_info=result) + # Log error stats (consider if fallback orders should be set here) if model_name in model_error_stats: model_error_stats[model_name].setdefault("order_generation_errors", 0) model_error_stats[model_name]["order_generation_errors"] += 1 - game.set_orders(p_name, []) - current_orders_for_power = [] + # Optionally set fallback orders here if needed, e.g., game.set_orders(p_name, []) or specific fallback + game.set_orders(p_name, []) # Set empty orders on error for now logger.warning(f"Setting empty orders for {p_name} due to generation error.") elif result is None: + # Handle case where get_valid_orders might theoretically return None logger.warning(f"get_valid_orders returned None for {p_name}. Setting empty orders.") game.set_orders(p_name, []) - current_orders_for_power = [] if model_name in model_error_stats: model_error_stats[model_name].setdefault("order_generation_errors", 0) model_error_stats[model_name]["order_generation_errors"] += 1 else: + # Result is the list of validated orders orders = result logger.debug(f"Validated orders for {p_name}: {orders}") if orders: game.set_orders(p_name, orders) - current_orders_for_power = orders # Store the orders logger.debug( f"Set orders for {p_name} in {game.current_short_phase}: {orders}" ) else: logger.debug(f"No valid orders returned by get_valid_orders for {p_name}. Setting empty orders.") - game.set_orders(p_name, []) - current_orders_for_power = [] - - orders_set_this_phase[p_name] = current_orders_for_power # Store in our temp dict + game.set_orders(p_name, []) # Set empty if get_valid_orders returned empty # --- End Async Order Generation --- - - # === Generate Order Diary Entries === - logger.info("Agents generating order diary entries...") - order_diary_tasks = [] - - # Use orders_set_this_phase to determine who submitted orders (or had orders set) - # active_agents_for_order_diary will be powers that are not eliminated AND are keys in orders_set_this_phase - active_agents_for_order_diary = [ - name for name, agent_obj in agents.items() - if not game.powers[name].is_eliminated() and name in orders_set_this_phase - ] - - for power_name in active_agents_for_order_diary: - # Agent existence already checked by how active_agents_for_order_diary is built - agent = agents[power_name] - # Get the orders from our temporary dictionary - submitted_orders = orders_set_this_phase.get(power_name, []) - - # We removed the 'if submitted_orders:' check here previously, - # so generate_order_diary_entry will be called even if submitted_orders is []. - order_diary_tasks.append( - agent.generate_order_diary_entry( - game, - submitted_orders, # This can be an empty list - llm_log_file_path - ) - ) - - if order_diary_tasks: - results = await asyncio.gather(*order_diary_tasks, return_exceptions=True) - for i, res in enumerate(results): - if isinstance(res, Exception): - power_name_with_error = active_agents_for_order_diary[i] if i < len(active_agents_for_order_diary) else "Unknown Power" - logger.error(f"Error generating order diary for {power_name_with_error}: {res}", exc_info=res) - logger.info("Order diary entries complete.") - # ==================================== - - # Process orders logger.info(f"Processing orders for {current_phase}...")