analyze moments, run big models well

This commit is contained in:
AlxAI 2025-05-20 20:04:19 -07:00
parent f36d5672ea
commit 9322ada62b
6 changed files with 722 additions and 177 deletions

1
.gitignore vendored
View file

@ -148,3 +148,4 @@ ai_diplomacy/alt_implementations
/experiments
game_moments.json
game_moments_report.md
/game_moments

View file

@ -4,6 +4,7 @@ from typing import List, Dict, Optional
import json
import re
import json_repair
import json5 # More forgiving JSON parser
# Assuming BaseModelClient is importable from clients.py in the same directory
from .clients import BaseModelClient, load_model_client
@ -99,44 +100,66 @@ class DiplomacyAgent:
def _extract_json_from_text(self, text: str) -> dict:
"""Extract and parse JSON from text, handling common LLM response formats."""
if not text or not text.strip():
logger.warning(f"[{self.power_name}] Empty text provided to JSON extractor")
return {}
# Store original text for debugging
original_text = text
# Preprocessing: Normalize common formatting issues
# This helps with the KeyError: '\n "negotiation_summary"' problem
text = re.sub(r'\n\s+"(\w+)"\s*:', r'"\1":', text) # Remove newlines before keys
# Also fix the specific pattern that's causing trouble
text = text.replace('\n "negotiation_summary"', '"negotiation_summary"')
text = text.replace('\n "relationship_updates"', '"relationship_updates"')
text = text.replace('\n "updated_relationships"', '"updated_relationships"')
# Fix specific patterns that cause trouble
problematic_patterns = [
'negotiation_summary', 'relationship_updates', 'updated_relationships',
'order_summary', 'goals', 'relationships', 'intent'
]
for pattern in problematic_patterns:
text = re.sub(fr'\n\s*"{pattern}"', f'"{pattern}"', text)
# Try different patterns to extract JSON
# 1. Try to find JSON wrapped in markdown code blocks
# Order matters - try most specific patterns first
patterns = [
# New: More robust pattern allowing optional whitespace and 'json'
r"\s*```(?:json)?\s*\n(.*?)\n\s*```\s*",
r"```json\n(.*?)\n```", # Markdown JSON block
r"```\n(.*?)\n```", # Generic markdown block
r"`(.*?)`", # Inline code block
# Special handling for ```{{ ... }}``` format that some models use
r"```\s*\{\{\s*(.*?)\s*\}\}\s*```",
# JSON in code blocks with or without language specifier
r"```(?:json)?\s*\n(.*?)\n\s*```",
# JSON after "PARSABLE OUTPUT:" or similar
r"PARSABLE OUTPUT:\s*(\{.*?\})",
r"JSON:\s*(\{.*?\})",
# Any JSON object
r"(\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\})",
# Simple JSON in backticks
r"`(\{.*?\})`",
]
for pattern in patterns:
# Try each pattern
for pattern_idx, pattern in enumerate(patterns):
matches = re.findall(pattern, text, re.DOTALL)
if matches:
# Try each match until one parses successfully
for match in matches:
for match_idx, match in enumerate(matches):
# Multiple attempts with different parsers
json_text = match.strip()
# Attempt 1: Standard JSON after basic cleaning
try:
# Additional preprocessing for common formatting issues
clean_match = re.sub(r'\n\s+"(\w+)"\s*:', r'"\1":', match) # Remove newlines before JSON keys
clean_match = re.sub(r',\s*}', '}', clean_match) # Remove trailing commas
return json.loads(clean_match) # First attempt with the cleaned match
except json.JSONDecodeError as e_initial_markdown_parse:
# If initial parsing of the markdown-extracted block fails, try surgical cleaning
cleaned = self._clean_json_text(json_text)
result = json.loads(cleaned)
logger.debug(f"[{self.power_name}] Successfully parsed JSON with pattern {pattern_idx}, match {match_idx}")
return result
except json.JSONDecodeError as e_initial:
logger.debug(f"[{self.power_name}] Standard JSON parse failed: {e_initial}")
# Attempt 1.5: Try surgical cleaning with original patterns if basic cleaning failed
try:
# Apply several different cleaning patterns to fix common LLM-generated JSON issues
cleaned_match_candidate = match
# Apply several different cleaning patterns from the old method
cleaned_match_candidate = json_text
# Pattern 1: Removes 'Sentence.' when followed by ',', '}', or ']'
cleaned_match_candidate = re.sub(r'\s*([A-Z][\w\s,]*?\.(?:\s+[A-Z][\w\s,]*?\.)*)\s*(?=[,\}\]])', '', cleaned_match_candidate)
# Pattern 2: Removes 'Sentence.' when it's at the very end, before the final '}' of the current match scope
# Pattern 2: Removes 'Sentence.' when it's at the very end, before the final '}' of the current scope
cleaned_match_candidate = re.sub(r'\s*([A-Z][\w\s,]*?\.(?:\s+[A-Z][\w\s,]*?\.)*)\s*(?=\s*\}\s*$)', '', cleaned_match_candidate)
# Pattern 3: Fix for newlines and spaces before JSON keys (common problem with LLMs)
@ -146,83 +169,110 @@ class DiplomacyAgent:
cleaned_match_candidate = re.sub(r',\s*}', '}', cleaned_match_candidate)
# Pattern 5: Handle specific known problematic patterns
cleaned_match_candidate = cleaned_match_candidate.replace('\n "negotiation_summary"', '"negotiation_summary"')
cleaned_match_candidate = cleaned_match_candidate.replace('\n "relationship_updates"', '"relationship_updates"')
cleaned_match_candidate = cleaned_match_candidate.replace('\n "updated_relationships"', '"updated_relationships"')
for pattern in problematic_patterns:
cleaned_match_candidate = cleaned_match_candidate.replace(f'\n "{pattern}"', f'"{pattern}"')
# Pattern 6: Fix quotes - replace single quotes with double quotes for keys
cleaned_match_candidate = re.sub(r"'(\w+)'\s*:", r'"\1":', cleaned_match_candidate)
if cleaned_match_candidate != match: # Log if actual cleaning happened
logger.debug(f"Surgically cleaned JSON candidate. Original snippet: '{match[:150]}...', Cleaned snippet: '{cleaned_match_candidate[:150]}...'")
return json.loads(cleaned_match_candidate) # Second attempt with cleaned string
else:
# If no surgical cleaning was applicable or changed the string, re-raise to fall through
# or let the original loop continue if there are more matches from findall.
# This 'continue' is for the inner 'for match in matches:' loop.
logger.debug(f"Surgical cleaning regex made no changes to: {match[:100]}... Original error: {e_initial_markdown_parse}")
continue # Try next match from re.findall(pattern, text, re.DOTALL)
except json.JSONDecodeError as e_cleaned:
# This error means cleaning happened, but the result was still not valid JSON.
logger.warning(f"Surgical cleaning applied but did not result in valid JSON. Cleaned error: {e_cleaned}. Original snippet: {match[:150]}... Initial error: {e_initial_markdown_parse}")
# Continue to the next match from re.findall or next pattern
continue
# 2. Try to find JSON between braces
try:
start = text.find('{')
end = text.rfind('}') + 1
if start != -1 and end > start:
return json.loads(text[start:end])
except json.JSONDecodeError:
pass
# 3. Aggressively clean the string and try again
# Remove common non-JSON text that LLMs might add
cleaned_text = re.sub(r'[^{}[\]"\',:.\d\w\s_-]', '', text)
try:
start = cleaned_text.find('{')
end = cleaned_text.rfind('}') + 1
if start != -1 and end > start:
return json.loads(cleaned_text[start:end])
except json.JSONDecodeError:
pass
# 4. Repair common JSON issues and try again
try:
# Replace single quotes with double quotes (common LLM error)
text_fixed = re.sub(r"'([^']*)':", r'"\1":', text)
text_fixed = re.sub(r': *\'([^\']*)\'', r': "\1"', text_fixed)
start = text_fixed.find('{')
end = text_fixed.rfind('}') + 1
if start != -1 and end > start:
return json.loads(text_fixed[start:end])
except json.JSONDecodeError:
pass
# 4.5. Try to extract JSON from ```json ``` blocks
try:
# Find all ```json ``` blocks
json_blocks = re.findall(r'```json\s*\n(.*?)\n\s*```', text, re.DOTALL)
if json_blocks:
# Try to parse the first block
for block in json_blocks:
# Only try parsing if cleaning actually changed something
if cleaned_match_candidate != json_text:
logger.debug(f"[{self.power_name}] Surgical cleaning applied. Attempting to parse modified JSON.")
return json.loads(cleaned_match_candidate)
except json.JSONDecodeError as e_surgical:
logger.debug(f"[{self.power_name}] Surgical cleaning didn't work: {e_surgical}")
# Attempt 2: json5 (more forgiving)
try:
return json.loads(block)
except json.JSONDecodeError:
pass
except Exception as e_json_extract:
logger.error(f"Failed to extract JSON from ```json ``` blocks: {e_json_extract}")
result = json5.loads(json_text)
logger.debug(f"[{self.power_name}] Successfully parsed with json5")
return result
except Exception as e:
logger.debug(f"[{self.power_name}] json5 parse failed: {e}")
# Attempt 3: json-repair
try:
result = json_repair.loads(json_text)
logger.debug(f"[{self.power_name}] Successfully parsed with json-repair")
return result
except Exception as e:
logger.debug(f"[{self.power_name}] json-repair failed: {e}")
# 5. Last resort: try json-repair on the original text
# Fallback: Try to find ANY JSON-like structure
try:
logger.debug(f"Attempting to repair JSON with json-repair for text: {text[:200]}...")
return json_repair.loads(text)
except Exception as e_repair: # Catching a broader exception as json_repair might raise different errors
logger.error(f"json-repair failed to parse JSON. Error: {e_repair}. Original text snippet: {text[:200]}...")
# If all attempts fail, including json-repair, raise the original-style error
raise json.JSONDecodeError("Could not extract valid JSON from LLM response after all attempts including json-repair", text, 0)
# Find the first { and last }
start = text.find('{')
end = text.rfind('}') + 1 # Include the closing brace
if start != -1 and end > start:
potential_json = text[start:end]
# Try all parsers on this extracted text
for parser_name, parser_func in [
("json", json.loads),
("json5", json5.loads),
("json_repair", json_repair.loads)
]:
try:
cleaned = self._clean_json_text(potential_json) if parser_name == "json" else potential_json
result = parser_func(cleaned)
logger.debug(f"[{self.power_name}] Fallback parse succeeded with {parser_name}")
return result
except Exception as e:
logger.debug(f"[{self.power_name}] Fallback {parser_name} failed: {e}")
# If standard parsers failed, try aggressive cleaning
try:
# Remove common non-JSON text that LLMs might add
cleaned_text = re.sub(r'[^{}[\]"\',:.\d\w\s_-]', '', potential_json)
# Replace single quotes with double quotes (common LLM error)
text_fixed = re.sub(r"'([^']*)':", r'"\1":', cleaned_text)
text_fixed = re.sub(r': *\'([^\']*)\'', r': "\1"', text_fixed)
result = json.loads(text_fixed)
logger.debug(f"[{self.power_name}] Aggressive cleaning worked")
return result
except json.JSONDecodeError:
pass
except Exception as e:
logger.debug(f"[{self.power_name}] Fallback extraction failed: {e}")
# Last resort: Try json-repair on the entire text
try:
result = json_repair.loads(text)
logger.warning(f"[{self.power_name}] Last resort json-repair succeeded")
return result
except Exception as e:
logger.error(f"[{self.power_name}] All JSON extraction attempts failed. Original text: {original_text[:500]}...")
return {}
def _clean_json_text(self, text: str) -> str:
"""Clean common JSON formatting issues from LLM responses."""
if not text:
return text
# Remove trailing commas
text = re.sub(r',\s*}', '}', text)
text = re.sub(r',\s*]', ']', text)
# Fix newlines before JSON keys
text = re.sub(r'\n\s+"(\w+)"\s*:', r'"\1":', text)
# Replace single quotes with double quotes for keys
text = re.sub(r"'(\w+)'\s*:", r'"\1":', text)
# Remove comments (if any)
text = re.sub(r'//.*$', '', text, flags=re.MULTILINE)
text = re.sub(r'/\*.*?\*/', '', text, flags=re.DOTALL)
# Fix unescaped quotes in values (basic attempt)
# This is risky but sometimes helps with simple cases
text = re.sub(r':\s*"([^"]*)"([^",}\]]+)"', r': "\1\2"', text)
# Remove any BOM or zero-width spaces
text = text.replace('\ufeff', '').replace('\u200b', '')
return text.strip()
def add_journal_entry(self, entry: str):
@ -321,7 +371,16 @@ class DiplomacyAgent:
consolidation_client = self.client # Fallback to agent's own client
logger.warning(f"[{self.power_name}] Using agent's own model for consolidation instead of Gemini Flash")
raw_response = await consolidation_client.generate_response(prompt)
# Use the enhanced wrapper with retry logic
from .utils import run_llm_and_log
raw_response = await run_llm_and_log(
client=consolidation_client,
prompt=prompt,
log_file_path=log_file_path,
power_name=self.power_name,
phase=game.current_short_phase,
response_type='diary_consolidation',
)
if raw_response and raw_response.strip():
consolidated_entry = raw_response.strip()
@ -346,9 +405,9 @@ class DiplomacyAgent:
# Create the new consolidated summary
consolidated_summary = f"[CONSOLIDATED {year}] {consolidated_entry}"
# Sort consolidated entries by year (descending) to keep most recent consolidated years at top
# Sort consolidated entries by year (ascending) to keep historical order
consolidated_entries.append(consolidated_summary)
consolidated_entries.sort(key=lambda x: x[14:18], reverse=True) # Extract year from "[CONSOLIDATED YYYY]"
consolidated_entries.sort(key=lambda x: x[14:18], reverse=False) # Extract year from "[CONSOLIDATED YYYY]"
# Rebuild diary with consolidated entries at the top
self.private_diary = consolidated_entries + regular_entries
@ -485,7 +544,7 @@ class DiplomacyAgent:
log_file_path=log_file_path, # Pass the main log file path
power_name=self.power_name,
phase=game.current_short_phase,
response_type='negotiation_diary_raw' # For run_llm_and_log context
response_type='negotiation_diary_raw', # For run_llm_and_log context
)
logger.debug(f"[{self.power_name}] Raw negotiation diary response: {raw_response[:300]}...")
@ -651,8 +710,7 @@ class DiplomacyAgent:
log_file_path=log_file_path,
power_name=self.power_name,
phase=game.current_short_phase,
response_type='order_diary'
# raw_input_prompt=prompt, # REMOVED from run_llm_and_log
response_type='order_diary',
)
success_status = "FALSE"
@ -790,7 +848,7 @@ class DiplomacyAgent:
log_file_path=log_file_path,
power_name=self.power_name,
phase=game.current_short_phase,
response_type='phase_result_diary'
response_type='phase_result_diary',
)
if raw_response and raw_response.strip():
@ -899,6 +957,7 @@ class DiplomacyAgent:
logger.debug(f"[{power_name}] State update prompt:\n{prompt}")
# Use the client's raw generation capability - AWAIT the async call USING THE WRAPPER
response = await run_llm_and_log(
client=self.client,
prompt=prompt,
@ -917,6 +976,12 @@ class DiplomacyAgent:
try:
update_data = self._extract_json_from_text(response)
logger.debug(f"[{power_name}] Successfully parsed JSON: {update_data}")
# Ensure update_data is a dictionary
if not isinstance(update_data, dict):
logger.warning(f"[{power_name}] Extracted data is not a dictionary, type: {type(update_data)}")
update_data = {}
# Check if essential data ('updated_goals' or 'goals') is present AND is a list (for goals)
# For relationships, check for 'updated_relationships' or 'relationships' AND is a dict.
# Consider it TRUE if at least one of the primary data structures (goals or relationships) is present and correctly typed.
@ -935,6 +1000,11 @@ class DiplomacyAgent:
logger.error(f"[{power_name}] Failed to parse JSON response for state update: {e}. Raw response: {response}")
log_entry_response_type = 'state_update_json_error'
# log_entry_success remains "FALSE"
except Exception as e:
logger.error(f"[{power_name}] Unexpected error parsing state update: {e}")
log_entry_response_type = 'state_update_unexpected_error'
update_data = {}
# log_entry_success remains "FALSE"
else: # response was None or empty/whitespace
logger.error(f"[{power_name}] No valid response (None or empty) received from LLM for state update.")
log_entry_response_type = 'state_update_no_response'

View file

@ -3,7 +3,7 @@ import json
from json import JSONDecodeError
import re
import logging
import asyncio # Added for async operations
import ast # For literal_eval in JSON fallback parsing
from typing import List, Dict, Optional, Any, Tuple
from dotenv import load_dotenv
@ -745,6 +745,7 @@ class GeminiClient(BaseModelClient):
response = await self.client.generate_content_async(
contents=full_prompt,
)
if not response or not response.text:
logger.warning(
f"[{self.model_name}] Empty Gemini generate_response. Returning empty."
@ -765,7 +766,8 @@ class DeepSeekClient(BaseModelClient):
super().__init__(model_name)
self.api_key = os.environ.get("DEEPSEEK_API_KEY")
self.client = AsyncDeepSeekOpenAI(
api_key=self.api_key, base_url="https://api.deepseek.com/"
api_key=self.api_key,
base_url="https://api.deepseek.com/"
)
async def generate_response(self, prompt: str) -> str:
@ -828,7 +830,7 @@ class OpenRouterClient(BaseModelClient):
logger.debug(f"[{self.model_name}] Initialized OpenRouter client")
async def generate_response(self, prompt: str) -> str:
"""Generate a response using OpenRouter."""
"""Generate a response using OpenRouter with robust error handling."""
try:
# Append the call to action to the user's prompt
prompt_with_cta = prompt + "\n\nPROVIDE YOUR RESPONSE BELOW:"
@ -856,8 +858,19 @@ class OpenRouterClient(BaseModelClient):
return content
except Exception as e:
logger.error(f"[{self.model_name}] Error in OpenRouter generate_response: {e}")
return ""
error_msg = str(e)
# Check if it's a specific OpenRouter error
if "429" in error_msg or "rate" in error_msg.lower():
logger.warning(f"[{self.model_name}] OpenRouter rate limit error: {e}")
# The retry logic in run_llm_and_log will handle this
raise e # Re-raise to trigger retry
elif "provider" in error_msg.lower() and "error" in error_msg.lower():
logger.error(f"[{self.model_name}] OpenRouter provider error: {e}")
# This might be a temporary issue with the upstream provider
raise e # Re-raise to trigger retry or fallback
else:
logger.error(f"[{self.model_name}] Error in OpenRouter generate_response: {e}")
return ""
##############################################################################
@ -868,6 +881,10 @@ class OpenRouterClient(BaseModelClient):
def load_model_client(model_id: str) -> BaseModelClient:
"""
Returns the appropriate LLM client for a given model_id string.
Args:
model_id: The model identifier
Example usage:
client = load_model_client("claude-3-5-sonnet-20241022")
"""

View file

@ -32,20 +32,18 @@ def assign_models_to_powers() -> Dict[str, str]:
"""
# POWER MODELS
"""
return {
"AUSTRIA": "openrouter-qwen/qwen3-235b-a22b",
"ENGLAND": "gemini-2.5-pro-preview-05-06",
"FRANCE": "o4-mini",
"GERMANY": "o3",
"ITALY": "claude-3-7-sonnet-20250219",
"RUSSIA": "openrouter-x-ai/grok-3-beta",
"TURKEY": "openrouter-google/gemini-2.5-flash-preview",
"AUSTRIA": "openrouter-meta-llama/llama-4-maverick",
"ENGLAND": "o4-mini",
"FRANCE": "o3",
"GERMANY": "claude-3-7-sonnet-20250219",
"ITALY": "openrouter-mistralai/mistral-medium-3",
"RUSSIA": "openrouter-google/gemini-2.5-flash-preview",
"TURKEY": "gemini-2.5-pro-preview-05-06",
}
"""
# TEST MODELS
"""
return {
"AUSTRIA": "openrouter-google/gemini-2.5-flash-preview",
"ENGLAND": "openrouter-google/gemini-2.5-flash-preview",
@ -55,7 +53,7 @@ def assign_models_to_powers() -> Dict[str, str]:
"RUSSIA": "openrouter-google/gemini-2.5-flash-preview",
"TURKEY": "openrouter-google/gemini-2.5-flash-preview",
}
"""
def gather_possible_orders(game: Game, power_name: str) -> Dict[str, List[str]]:
"""

View file

@ -3,13 +3,17 @@
Analyze Key Game Moments: Betrayals, Collaborations, and Playing Both Sides
This script analyzes Diplomacy game data to identify the most interesting strategic moments.
Enhanced with:
- More stringent rating criteria
- Integration of power diary entries for better context
- Analysis of well-executed strategies and strategic mistakes
"""
import json
import asyncio
import argparse
import logging
import ast
import csv
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
@ -33,7 +37,7 @@ logger = logging.getLogger(__name__)
class GameMoment:
"""Represents a key moment in the game"""
phase: str
category: str # BETRAYAL, COLLABORATION, PLAYING_BOTH_SIDES
category: str # BETRAYAL, COLLABORATION, PLAYING_BOTH_SIDES, BRILLIANT_STRATEGY, STRATEGIC_BLUNDER
powers_involved: List[str]
promise_agreement: str
actual_action: str
@ -41,6 +45,7 @@ class GameMoment:
interest_score: float
raw_messages: List[Dict]
raw_orders: Dict
diary_context: Dict[str, str] # New field for diary entries
class GameAnalyzer:
"""Analyzes Diplomacy game data for key strategic moments"""
@ -49,11 +54,13 @@ class GameAnalyzer:
self.results_folder = Path(results_folder)
self.game_data_path = self.results_folder / "lmvsgame.json"
self.overview_path = self.results_folder / "overview.jsonl"
self.csv_path = self.results_folder / "llm_responses.csv"
self.model_name = model_name
self.client = None
self.game_data = None
self.power_to_model = None
self.moments = []
self.diary_entries = {} # phase -> power -> diary content
async def initialize(self):
"""Initialize the analyzer with game data and model client"""
@ -72,18 +79,105 @@ class GameAnalyzer:
logger.warning("Could not find power-to-model mapping in overview.jsonl")
self.power_to_model = {}
# Load diary entries from CSV
self.diary_entries = self.parse_llm_responses_csv()
logger.info(f"Loaded diary entries for {len(self.diary_entries)} phases")
# Initialize model client
self.client = load_model_client(self.model_name)
logger.info(f"Initialized with model: {self.model_name}")
def parse_llm_responses_csv(self) -> Dict[str, Dict[str, str]]:
    """Parse the CSV file to extract diary entries by phase and power.

    Reads self.csv_path (llm_responses.csv), keeps only rows whose
    'response_type' is 'negotiation_diary', and builds a nested mapping
    {phase -> {power -> formatted diary text}}. Prefers pandas when it is
    importable; otherwise falls back to the stdlib csv module.

    Returns:
        Nested dict of diary text, or {} if the CSV cannot be read at all.
    """
    diary_entries = {}
    try:
        import pandas as pd
        # Use pandas for more robust CSV parsing
        # Assumes columns 'response_type', 'phase', 'power', 'raw_response'
        # exist in the CSV — TODO confirm against the logging writer.
        df = pd.read_csv(self.csv_path)
        # Filter for negotiation diary entries
        diary_df = df[df['response_type'] == 'negotiation_diary']
        for _, row in diary_df.iterrows():
            phase = row['phase']
            power = row['power']
            # str() guards against pandas reading missing cells as NaN.
            raw_response = str(row['raw_response']).strip()
            if phase not in diary_entries:
                diary_entries[phase] = {}
            try:
                # Try to parse as JSON first
                response = json.loads(raw_response)
                diary_content = f"Negotiation Summary: {response.get('negotiation_summary', 'N/A')}\n"
                diary_content += f"Intent: {response.get('intent', 'N/A')}\n"
                relationships = response.get('updated_relationships', {})
                # NOTE(review): both branches below are identical — the
                # isinstance check currently has no effect; confirm intent.
                if isinstance(relationships, dict):
                    diary_content += f"Relationships: {relationships}"
                else:
                    diary_content += f"Relationships: {relationships}"
                diary_entries[phase][power] = diary_content
            except (json.JSONDecodeError, TypeError):
                # If JSON parsing fails, use a simplified version or skip
                # ('nan'/'none' cover pandas NaN stringification above).
                if raw_response and raw_response.lower() not in ['null', 'nan', 'none']:
                    diary_entries[phase][power] = f"Raw diary: {raw_response[:300]}..."
        logger.info(f"Successfully parsed {len(diary_entries)} phases with diary entries")
        return diary_entries
    except ImportError:
        # Fallback to standard CSV if pandas not available
        logger.info("Pandas not available, using standard CSV parsing")
        import csv
        with open(self.csv_path, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                try:
                    if row.get('response_type') == 'negotiation_diary':
                        phase = row.get('phase', '')
                        power = row.get('power', '')
                        if phase and power:
                            if phase not in diary_entries:
                                diary_entries[phase] = {}
                            raw_response = row.get('raw_response', '').strip()
                            try:
                                # Try to parse as JSON
                                response = json.loads(raw_response)
                                diary_content = f"Negotiation Summary: {response.get('negotiation_summary', 'N/A')}\n"
                                diary_content += f"Intent: {response.get('intent', 'N/A')}\n"
                                diary_content += f"Relationships: {response.get('updated_relationships', 'N/A')}"
                                diary_entries[phase][power] = diary_content
                            except (json.JSONDecodeError, TypeError):
                                if raw_response and raw_response != "null":
                                    diary_entries[phase][power] = f"Raw diary: {raw_response[:300]}..."
                except Exception as e:
                    continue  # Skip problematic rows
        return diary_entries
    except Exception as e:
        # Any other failure (missing file, encoding error) degrades to {}.
        logger.error(f"Error parsing CSV file: {e}")
        return {}
def extract_turn_data(self, phase_data: Dict) -> Dict:
    """Extract the analysis-relevant fields from a single phase record.

    Fix: the previous text carried duplicate dict keys ("phase" and
    "statistical_summary" each appeared twice, the stale entry silently
    shadowed); the literal now lists each key exactly once.

    Args:
        phase_data: One phase dict from the game JSON. Expected keys are
            "name", "messages", "orders", "summary", "statistical_summary";
            each missing key falls back to an empty value.

    Returns:
        Flat dict with phase name, messages, orders, summaries, and the
        private diary entries recorded for that phase ({} when none).
    """
    phase_name = phase_data.get("name", "")
    # Diary entries are keyed phase -> power -> text; unknown phases get {}.
    phase_diaries = self.diary_entries.get(phase_name, {})
    return {
        "phase": phase_name,
        "messages": phase_data.get("messages", []),
        "orders": phase_data.get("orders", {}),
        "summary": phase_data.get("summary", ""),
        "statistical_summary": phase_data.get("statistical_summary", {}),
        "diaries": phase_diaries,
    }
def create_analysis_prompt(self, turn_data: Dict) -> str:
@ -110,14 +204,40 @@ class GameAnalyzer:
power_str = f"{power} ({power_model})" if power_model else power
formatted_orders.append(f"{power_str}: {power_orders}")
# Format diary entries
formatted_diaries = []
for power, diary in turn_data.get("diaries", {}).items():
power_model = self.power_to_model.get(power, '')
power_str = f"{power} ({power_model})" if power_model else power
formatted_diaries.append(f"{power_str} DIARY:\n{diary}")
prompt = f"""You are analyzing diplomatic negotiations and subsequent military orders from a Diplomacy game. Your task is to identify key strategic moments in the following categories:
1. BETRAYAL: When a power explicitly promises one action but takes a contradictory action
2. COLLABORATION: When powers successfully coordinate as agreed
3. PLAYING_BOTH_SIDES: When a power makes conflicting promises to different parties
4. BRILLIANT_STRATEGY: Exceptionally well-executed strategic maneuvers that gain significant advantage
5. STRATEGIC_BLUNDER: Major strategic mistakes that significantly weaken a power's position
IMPORTANT SCORING GUIDELINES:
- Scores 1-3: Minor or routine diplomatic events
- Scores 4-6: Significant but expected diplomatic maneuvers
- Scores 7-8: Notable strategic moments with clear impact
- Scores 9-10: EXCEPTIONAL moments that are truly dramatic or game-changing
Reserve high scores (8+) for:
- Major betrayals that fundamentally shift alliances
- Successful coordinated attacks on major powers
- Clever deceptions that fool multiple powers
- Brilliant strategic maneuvers that dramatically improve position
- Catastrophic strategic errors with lasting consequences
- Actions that dramatically alter the game's balance
For this turn ({turn_data.get('phase', '')}), analyze:
PRIVATE DIARY ENTRIES (Powers' internal thoughts):
{chr(10).join(formatted_diaries) if formatted_diaries else 'No diary entries available'}
MESSAGES:
{chr(10).join(formatted_messages) if formatted_messages else 'No messages this turn'}
@ -127,24 +247,34 @@ ORDERS:
TURN SUMMARY:
{turn_data.get('summary', 'No summary available')}
Identify ALL instances that fit the three categories. For each instance provide:
Identify ALL instances that fit the five categories. For each instance provide:
{{
"category": "BETRAYAL" or "COLLABORATION" or "PLAYING_BOTH_SIDES",
"category": "BETRAYAL" or "COLLABORATION" or "PLAYING_BOTH_SIDES" or "BRILLIANT_STRATEGY" or "STRATEGIC_BLUNDER",
"powers_involved": ["POWER1", "POWER2", ...],
"promise_agreement": "What was promised or agreed",
"promise_agreement": "What was promised/agreed/intended (or strategy attempted)",
"actual_action": "What actually happened",
"impact": "Strategic impact on the game",
"interest_score": 8.5 // 1-10 scale, how dramatic/interesting this moment is
"interest_score": 6.5 // 1-10 scale, be STRICT with high scores
}}
Use the diary entries to verify:
- Whether actions align with stated intentions
- Hidden motivations behind diplomatic moves
- Contradictions between public promises and private plans
- Strategic planning and its execution
Return your response as a JSON array of detected moments. If no relevant moments are found, return an empty array [].
Focus on:
- Comparing diary intentions vs actual orders
- Explicit promises vs actual orders
- Coordinated attacks or defenses
- DMZ violations
- Support promises kept or broken
- Conflicting negotiations with different powers
- Clever strategic positioning
- Missed strategic opportunities
- Tactical errors that cost supply centers
"""
return prompt
@ -182,7 +312,8 @@ Focus on:
impact=moment.get("impact", ""),
interest_score=float(moment.get("interest_score", 5)),
raw_messages=turn_data["messages"],
raw_orders=turn_data["orders"]
raw_orders=turn_data["orders"],
diary_context=turn_data["diaries"]
)
moments.append(game_moment)
logger.info(f"Detected {game_moment.category} in {game_moment.phase} "
@ -228,18 +359,156 @@ Focus on:
model = self.power_to_model.get(power, '')
return f"{power} ({model})" if model else power
def generate_report(self, output_path: str = "game_moments_report.md"):
def phase_sort_key(self, phase_name):
    """Return a chronologically sortable key for a Diplomacy phase label.

    Labels look like 'S1901M' / 'F1902R' / 'W1903A': one season letter
    (Spring < Fall < Winter), a four-digit year, then the phase-type
    suffix (M/A/R). Malformed or too-short labels sort first as (0, 0, "").
    """
    fallback = (0, 0, "")
    if not phase_name or len(phase_name) < 6:
        return fallback
    try:
        season_rank = {"S": 1, "F": 2, "W": 3}.get(phase_name[0], 0)
        year_digits = phase_name[1:5]
        year_value = int(year_digits) if year_digits.isdigit() else 0
        return (year_value, season_rank, phase_name[5:])
    except Exception:
        # Defensive: any unexpected label shape sorts to the front.
        return fallback
async def generate_narrative(self) -> str:
    """Generate a narrative story of the game using phase summaries and top moments.

    Builds one large prompt from (1) chronologically sorted phase
    summaries, (2) excerpts of the earliest and latest power diaries, and
    (3) the top-scored detected moments per category, then asks
    self.client for a 1-2 paragraph game chronicle.

    Returns:
        The LLM's narrative text, or a fixed error sentence on failure.
    """
    # Collect all phase summaries
    phase_summaries = []
    phases_with_summaries = []
    for phase in self.game_data.get("phases", []):
        if phase.get("summary"):
            phase_name = phase.get("name", "")
            summary = phase.get("summary", "")
            phases_with_summaries.append((phase_name, summary))
    # Sort phases chronologically (phase_sort_key handles S/F/W ordering).
    phases_with_summaries.sort(key=lambda p: self.phase_sort_key(p[0]))
    # Create summary strings
    for phase_name, summary in phases_with_summaries:
        phase_summaries.append(f"{phase_name}: {summary}")
    # Create the narrative prompt
    narrative_prompt = f"""You are a master war historian writing a dramatic chronicle of a Diplomacy game. Transform the comprehensive game record below into a single, gripping narrative of betrayal, alliance, and conquest.
THE COMPETING POWERS (always refer to them as "Power (Model)"):
{chr(10).join([f"- {power} ({model})" for power, model in sorted(self.power_to_model.items())])}
COMPLETE GAME RECORD (synthesize all of this into your narrative):
{chr(10).join(phase_summaries)}
IMPORTANT POWER DIARIES (internal thoughts of each power):
"""
    # Sort diary phases chronologically
    diary_phases = list(self.diary_entries.keys())
    diary_phases.sort(key=self.phase_sort_key)
    # Include power diaries for context (early phases)
    for phase in diary_phases[:3]:  # First few phases for early intentions
        narrative_prompt += f"Phase {phase}:\n"
        for power, diary in sorted(self.diary_entries[phase].items()):
            power_with_model = self.format_power_with_model(power)
            # Truncate each diary to keep the prompt within budget.
            diary_excerpt = diary[:150] + "..." if len(diary) > 150 else diary
            narrative_prompt += f"- {power_with_model}: {diary_excerpt}\n"
        narrative_prompt += "\n"
    # Also include some late-game diaries
    if len(diary_phases) > 3:
        for phase in diary_phases[-2:]:  # Last two phases for endgame context
            narrative_prompt += f"Phase {phase}:\n"
            for power, diary in sorted(self.diary_entries[phase].items()):
                power_with_model = self.format_power_with_model(power)
                diary_excerpt = diary[:150] + "..." if len(diary) > 150 else diary
                narrative_prompt += f"- {power_with_model}: {diary_excerpt}\n"
            narrative_prompt += "\n"
    narrative_prompt += """
KEY DRAMATIC MOMENTS (reference these highlights appropriately):
"""
    # Extract top moments from each category for narrative context
    key_moments = []
    for category in ["BETRAYAL", "COLLABORATION", "PLAYING_BOTH_SIDES", "BRILLIANT_STRATEGY", "STRATEGIC_BLUNDER"]:
        category_moments = [m for m in self.moments if m.category == category]
        category_moments.sort(key=lambda m: m.interest_score, reverse=True)
        key_moments.extend(category_moments[:5])  # Top 5 from each category
    # Sort by phase chronologically
    key_moments.sort(key=lambda m: self.phase_sort_key(m.phase))
    # Format dramatic moments with power names and models (simpler format)
    for moment in key_moments:
        powers_with_models = [f"{p} ({self.power_to_model.get(p, 'Unknown')})" for p in moment.powers_involved]
        narrative_prompt += f"{moment.phase} - {moment.category} (Score: {moment.interest_score}/10): {', '.join(powers_with_models)}\n"
    narrative_prompt += """
CRITICAL INSTRUCTIONS:
- Write EXACTLY 1-2 paragraphs that tell the COMPLETE story of the ENTIRE game
- This is NOT a summary of each phase - it's ONE flowing narrative of the whole game
- Always refer to powers as "PowerName (ModelName)" - e.g., "Germany (o3)", "France (o4-mini)"
- Start with how the game began and the initial alliances
- Cover the major turning points and dramatic moments
- End with how the game concluded and who won
- Use dramatic, evocative language but be concise
- Focus on the overall arc of the game, not individual phase details
Create a single, cohesive narrative that captures the essence of the entire game from start to finish. Think of it as the opening passage of a history book chapter about this conflict.
"""
    try:
        response = await self.client.generate_response(narrative_prompt)
        return response
    except Exception as e:
        # Degrade gracefully so report generation can continue.
        logger.error(f"Error generating narrative: {e}")
        return "Unable to generate narrative due to an error."
async def generate_report(self, output_path: Optional[str] = None):
"""Generate a markdown report of key moments.

Writes a markdown file containing an LLM-generated game narrative, summary
statistics, and per-category breakdowns of the analyzed moments, then
returns the path written to.

NOTE(review): this block appears to be an unresolved diff/merge blend —
it contains a stray unified-diff hunk header and several spots where both
the old and the new version of a section survive side by side. The
suspect lines are flagged inline below. Leading indentation also looks to
have been lost in extraction, so lines are shown flush-left. Confirm
against repository history before relying on this text.

Args:
    output_path: Destination .md file; when None, a timestamped file named
        after the results folder is created under game_moments/ next to
        this module.

Returns:
    The output path the report was written to.
"""
# Generate unique filename with datetime if no path specified
if output_path is None:
# Create in the game_moments directory
game_moments_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "game_moments")
os.makedirs(game_moments_dir, exist_ok=True)
# Use results folder name in the file name
results_name = os.path.basename(os.path.normpath(str(self.results_folder)))
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = os.path.join(game_moments_dir, f"{results_name}_report_{timestamp}.md")
# Generate the narrative first
narrative = await self.generate_narrative()
# Report header: title, provenance, the narrative, then summary statistics.
report_lines = [
"# Diplomacy Game Analysis: Key Moments",
f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
f"Game: {self.game_data_path}",
"",
"## Game Narrative",
"",
narrative,
"",
"---",
"",
"## Summary",
f"- Total moments analyzed: {len(self.moments)}",
f"- Betrayals: {len([m for m in self.moments if m.category == 'BETRAYAL'])}",
f"- Collaborations: {len([m for m in self.moments if m.category == 'COLLABORATION'])}",
f"- Playing Both Sides: {len([m for m in self.moments if m.category == 'PLAYING_BOTH_SIDES'])}",
f"- Brilliant Strategies: {len([m for m in self.moments if m.category == 'BRILLIANT_STRATEGY'])}",
f"- Strategic Blunders: {len([m for m in self.moments if m.category == 'STRATEGIC_BLUNDER'])}",
"",
"## Score Distribution",
f"- Scores 9-10: {len([m for m in self.moments if m.interest_score >= 9])}",
f"- Scores 7-8: {len([m for m in self.moments if 7 <= m.interest_score < 9])}",
f"- Scores 4-6: {len([m for m in self.moments if 4 <= m.interest_score < 7])}",
f"- Scores 1-3: {len([m for m in self.moments if m.interest_score < 4])}",
"",
"## Power Models",
""
# NOTE(review): stray unified-diff hunk header below — extraction residue,
# not Python. The closing `]` of the report_lines list also never appears,
# presumably dropped between diff hunks; restore it when repairing.
@ -249,113 +518,303 @@ Focus on:
# One bullet per power listing which model played it.
for power, model in sorted(self.power_to_model.items()):
report_lines.append(f"- **{power}**: {model}")
# Add category breakdowns with detailed information
report_lines.extend([
"",
# NOTE(review): both the old "Top 10" heading and its replacement heading
# survive inside this extend — looks like an unresolved old/new diff blend.
"## Top 10 Most Interesting Moments",
"## Key Strategic Moments by Category",
""
])
# Add top moments
# NOTE(review): this entire top-10 loop appears to be the pre-change
# version of the section; the detailed per-category sections below look
# like its replacement. Confirm which one is intended before repairing.
for i, moment in enumerate(self.moments[:10], 1):
powers_str = ', '.join([self.format_power_with_model(p) for p in moment.powers_involved])
report_lines.extend([
f"### {i}. {moment.category} - {moment.phase} (Score: {moment.interest_score}/10)",
f"**Powers Involved:** {powers_str}",
"",
f"**Promise/Agreement:** {moment.promise_agreement}",
"",
f"**Actual Action:** {moment.actual_action}",
"",
f"**Impact:** {moment.impact}",
"",
"---",
""
])
# Add category breakdowns
# BETRAYALS SECTION
report_lines.extend([
# NOTE(review): "## Category Breakdown" looks like the old heading for
# this area; the "### Betrayals" + description pair looks like the new one.
"## Category Breakdown",
"",
"### Betrayals",
"_When powers explicitly promised one action but took a contradictory action_",
""
])
betrayals = [m for m in self.moments if m.category == "BETRAYAL"]
# NOTE(review): old one-line-summary loop — superseded by the detailed
# enumerate loop just below (diff residue).
for moment in betrayals[:5]:
powers_str = ', '.join([self.format_power_with_model(p) for p in moment.powers_involved])
report_lines.append(
f"- **{moment.phase}** ({powers_str}): "
f"{moment.promise_agreement[:100]}... Score: {moment.interest_score}"
)
# Highest-interest betrayals first.
betrayals.sort(key=lambda m: m.interest_score, reverse=True)
# NOTE(review): old "### Collaborations" heading extend — the new version
# adds that heading (with description) further down; diff residue here.
report_lines.extend(["", "### Collaborations", ""])
# Detailed write-up of the top 5 betrayals.
for i, moment in enumerate(betrayals[:5], 1):
powers_str = ', '.join([self.format_power_with_model(p) for p in moment.powers_involved])
report_lines.extend([
f"#### {i}. {moment.phase} (Score: {moment.interest_score}/10)",
f"**Powers Involved:** {powers_str}",
"",
f"**Promise:** {moment.promise_agreement if moment.promise_agreement else 'N/A'}",
"",
f"**Actual Action:** {moment.actual_action if moment.actual_action else 'N/A'}",
"",
f"**Impact:** {moment.impact if moment.impact else 'N/A'}",
"",
"**Diary Context:**",
""
])
# Add relevant diary entries
for power in moment.powers_involved:
if power in moment.diary_context:
power_with_model = self.format_power_with_model(power)
report_lines.append(f"_{power_with_model} Diary:_ {moment.diary_context[power][:150]}...")
report_lines.append("")
report_lines.append("")
# COLLABORATIONS SECTION
report_lines.extend([
"### Collaborations",
"_When powers successfully coordinated as agreed_",
""
])
collaborations = [m for m in self.moments if m.category == "COLLABORATION"]
# NOTE(review): old one-line-summary loop — superseded by the detailed
# enumerate loop just below (diff residue).
for moment in collaborations[:5]:
powers_str = ', '.join([self.format_power_with_model(p) for p in moment.powers_involved])
report_lines.append(
f"- **{moment.phase}** ({powers_str}): "
f"{moment.promise_agreement[:100]}... Score: {moment.interest_score}"
)
# Highest-interest collaborations first.
collaborations.sort(key=lambda m: m.interest_score, reverse=True)
# NOTE(review): old "### Playing Both Sides" heading extend — the new
# version adds that heading further down; diff residue here.
report_lines.extend(["", "### Playing Both Sides", ""])
# Detailed write-up of the top 5 collaborations.
for i, moment in enumerate(collaborations[:5], 1):
powers_str = ', '.join([self.format_power_with_model(p) for p in moment.powers_involved])
report_lines.extend([
f"#### {i}. {moment.phase} (Score: {moment.interest_score}/10)",
f"**Powers Involved:** {powers_str}",
"",
f"**Agreement:** {moment.promise_agreement if moment.promise_agreement else 'N/A'}",
"",
f"**Action Taken:** {moment.actual_action if moment.actual_action else 'N/A'}",
"",
f"**Impact:** {moment.impact if moment.impact else 'N/A'}",
"",
"**Diary Context:**",
""
])
# Add relevant diary entries
for power in moment.powers_involved:
if power in moment.diary_context:
power_with_model = self.format_power_with_model(power)
report_lines.append(f"_{power_with_model} Diary:_ {moment.diary_context[power][:150]}...")
report_lines.append("")
report_lines.append("")
# PLAYING BOTH SIDES SECTION
report_lines.extend([
"### Playing Both Sides",
"_When a power made conflicting promises to different parties_",
""
])
playing_both = [m for m in self.moments if m.category == "PLAYING_BOTH_SIDES"]
# NOTE(review): old loop header — its body appears four lines below
# (the report_lines.append with the one-line summary); the sort +
# enumerate loop between them is the new version. Diff blend.
for moment in playing_both[:5]:
playing_both.sort(key=lambda m: m.interest_score, reverse=True)
for i, moment in enumerate(playing_both[:5], 1):
powers_str = ', '.join([self.format_power_with_model(p) for p in moment.powers_involved])
# NOTE(review): old one-line-summary append — diff residue.
report_lines.append(
f"- **{moment.phase}** ({powers_str}): "
f"{moment.promise_agreement[:100]}... Score: {moment.interest_score}"
)
report_lines.extend([
f"#### {i}. {moment.phase} (Score: {moment.interest_score}/10)",
f"**Powers Involved:** {powers_str}",
"",
f"**Conflicting Promises:** {moment.promise_agreement if moment.promise_agreement else 'N/A'}",
"",
f"**Actual Action:** {moment.actual_action if moment.actual_action else 'N/A'}",
"",
f"**Impact:** {moment.impact if moment.impact else 'N/A'}",
"",
"**Diary Context:**",
""
])
# Add relevant diary entries
for power in moment.powers_involved:
if power in moment.diary_context:
power_with_model = self.format_power_with_model(power)
report_lines.append(f"_{power_with_model} Diary:_ {moment.diary_context[power][:150]}...")
report_lines.append("")
report_lines.append("")
# BRILLIANT STRATEGIES SECTION
report_lines.extend([
"### Brilliant Strategies",
"_Exceptionally well-executed strategic maneuvers that gained significant advantage_",
""
])
brilliant = [m for m in self.moments if m.category == "BRILLIANT_STRATEGY"]
brilliant.sort(key=lambda m: m.interest_score, reverse=True)
# Detailed write-up of the top 5 brilliant strategies.
for i, moment in enumerate(brilliant[:5], 1):
powers_str = ', '.join([self.format_power_with_model(p) for p in moment.powers_involved])
report_lines.extend([
f"#### {i}. {moment.phase} (Score: {moment.interest_score}/10)",
f"**Powers Involved:** {powers_str}",
"",
f"**Strategy:** {moment.promise_agreement if moment.promise_agreement else 'N/A'}",
"",
f"**Execution:** {moment.actual_action if moment.actual_action else 'N/A'}",
"",
f"**Impact:** {moment.impact if moment.impact else 'N/A'}",
"",
"**Diary Context:**",
""
])
# Add relevant diary entries
for power in moment.powers_involved:
if power in moment.diary_context:
power_with_model = self.format_power_with_model(power)
report_lines.append(f"_{power_with_model} Diary:_ {moment.diary_context[power][:150]}...")
report_lines.append("")
report_lines.append("")
# STRATEGIC BLUNDERS SECTION
report_lines.extend([
"### Strategic Blunders",
"_Major strategic mistakes that significantly weakened a power's position_",
""
])
blunders = [m for m in self.moments if m.category == "STRATEGIC_BLUNDER"]
blunders.sort(key=lambda m: m.interest_score, reverse=True)
# Detailed write-up of the top 5 blunders.
for i, moment in enumerate(blunders[:5], 1):
powers_str = ', '.join([self.format_power_with_model(p) for p in moment.powers_involved])
report_lines.extend([
f"#### {i}. {moment.phase} (Score: {moment.interest_score}/10)",
f"**Powers Involved:** {powers_str}",
"",
f"**Mistaken Strategy:** {moment.promise_agreement if moment.promise_agreement else 'N/A'}",
"",
f"**What Happened:** {moment.actual_action if moment.actual_action else 'N/A'}",
"",
f"**Impact:** {moment.impact if moment.impact else 'N/A'}",
"",
"**Diary Context:**",
""
])
# Add relevant diary entries
for power in moment.powers_involved:
if power in moment.diary_context:
power_with_model = self.format_power_with_model(power)
report_lines.append(f"_{power_with_model} Diary:_ {moment.diary_context[power][:150]}...")
report_lines.append("")
report_lines.append("")
# Write report
with open(output_path, 'w') as f:
f.write('\n'.join(report_lines))
logger.info(f"Report generated: {output_path}")
return output_path
def save_json_results(self, output_path: Optional[str] = None):
    """Save all analyzed moments, plus run metadata, as JSON for further analysis.

    The payload has three top-level keys: ``metadata`` (timestamp, source
    folder, analysis model, moment counts and score distribution),
    ``power_models`` (which LLM played each power), and ``moments`` (one
    dict per moment, with bulky raw transcripts stripped and diary entries
    truncated).

    Note: this restores the intended post-change version of the method; the
    extracted text contained diff residue (a stale duplicate signature line
    and a stale ``json.dump(moments_data, ...)`` call) that made the block
    invalid and would have written the wrong structure. Both are removed here.

    Args:
        output_path: Destination .json file. When None, a timestamped file
            named after the results folder is created under ``game_moments/``
            next to this module.

    Returns:
        The path the JSON file was written to.
    """
    # Timestamp is computed unconditionally: it is embedded in the metadata
    # even when the caller supplies an explicit output path.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Generate unique filename with datetime if no path specified
    if output_path is None:
        # Create in the game_moments directory next to this module.
        game_moments_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "game_moments")
        os.makedirs(game_moments_dir, exist_ok=True)
        # Use results folder name in the file name.
        results_name = os.path.basename(os.path.normpath(str(self.results_folder)))
        output_path = os.path.join(game_moments_dir, f"{results_name}_data_{timestamp}.json")
    # Prepare the per-moment data.
    moments_data = []
    for moment in self.moments:
        moment_dict = asdict(moment)
        # Remove raw data for cleaner JSON.
        moment_dict.pop('raw_messages', None)
        moment_dict.pop('raw_orders', None)
        # Keep diary context but cap each entry at 200 characters.
        if 'diary_context' in moment_dict:
            for power, diary in moment_dict['diary_context'].items():
                moment_dict['diary_context'][power] = diary[:200] + "..." if len(diary) > 200 else diary
        moments_data.append(moment_dict)
    # Create the final data structure with metadata.
    full_data = {
        "metadata": {
            "timestamp": timestamp,
            "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "source_folder": str(self.results_folder),
            "analysis_model": self.model_name,
            "total_moments": len(self.moments),
            "moment_categories": {
                "betrayals": len([m for m in self.moments if m.category == "BETRAYAL"]),
                "collaborations": len([m for m in self.moments if m.category == "COLLABORATION"]),
                "playing_both_sides": len([m for m in self.moments if m.category == "PLAYING_BOTH_SIDES"]),
                "brilliant_strategies": len([m for m in self.moments if m.category == "BRILLIANT_STRATEGY"]),
                "strategic_blunders": len([m for m in self.moments if m.category == "STRATEGIC_BLUNDER"])
            },
            "score_distribution": {
                "scores_9_10": len([m for m in self.moments if m.interest_score >= 9]),
                "scores_7_8": len([m for m in self.moments if 7 <= m.interest_score < 9]),
                "scores_4_6": len([m for m in self.moments if 4 <= m.interest_score < 7]),
                "scores_1_3": len([m for m in self.moments if m.interest_score < 4])
            }
        },
        "power_models": self.power_to_model,
        "moments": moments_data
    }
    # Write the full payload (metadata + moments), not just the moment list.
    with open(output_path, 'w') as f:
        json.dump(full_data, f, indent=2)
    logger.info(f"JSON results saved: {output_path}")
    return output_path
async def main():
parser = argparse.ArgumentParser(description="Analyze Diplomacy game for key strategic moments")
parser.add_argument("results_folder", help="Path to the results folder containing lmvsgame.json and overview.jsonl")
parser.add_argument("--model", default="openrouter-google/gemini-2.5-flash-preview",
help="Model to use for analysis")
parser.add_argument("--report", default="game_moments_report.md",
help="Output path for markdown report")
parser.add_argument("--json", default="game_moments.json",
help="Output path for JSON results")
parser.add_argument("--report", default=None,
help="Output path for markdown report (auto-generates timestamped name if not specified)")
parser.add_argument("--json", default=None,
help="Output path for JSON results (auto-generates timestamped name if not specified)")
parser.add_argument("--max-phases", type=int, default=None,
help="Maximum number of phases to analyze (useful for testing)")
args = parser.parse_args()
# Ensure the game_moments directory exists
game_moments_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "game_moments")
os.makedirs(game_moments_dir, exist_ok=True)
# Extract game name from the results folder
results_folder_name = os.path.basename(os.path.normpath(args.results_folder))
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Create default report and JSON paths in the game_moments directory
if args.report is None:
args.report = os.path.join(game_moments_dir, f"{results_folder_name}_report_{timestamp}.md")
if args.json is None:
args.json = os.path.join(game_moments_dir, f"{results_folder_name}_data_{timestamp}.json")
analyzer = GameAnalyzer(args.results_folder, args.model)
try:
await analyzer.initialize()
await analyzer.analyze_game(max_phases=args.max_phases)
analyzer.generate_report(args.report)
analyzer.save_json_results(args.json)
report_path = await analyzer.generate_report(args.report)
json_path = analyzer.save_json_results(args.json)
# Print summary
print(f"\nAnalysis Complete!")
print(f"Found {len(analyzer.moments)} key moments")
print(f"Report saved to: {args.report}")
print(f"JSON data saved to: {args.json}")
print(f"Report saved to: {report_path}")
print(f"JSON data saved to: {json_path}")
# Show score distribution
print("\nScore Distribution:")
print(f" Scores 9-10: {len([m for m in analyzer.moments if m.interest_score >= 9])}")
print(f" Scores 7-8: {len([m for m in analyzer.moments if 7 <= m.interest_score < 9])}")
print(f" Scores 4-6: {len([m for m in analyzer.moments if 4 <= m.interest_score < 7])}")
print(f" Scores 1-3: {len([m for m in analyzer.moments if m.interest_score < 4])}")
# Show top 3 moments
print("\nTop 3 Most Interesting Moments:")

View file

@ -13,7 +13,7 @@ os.environ["GRPC_PYTHON_LOG_LEVEL"] = "40" # ERROR level only
os.environ["GRPC_VERBOSITY"] = "ERROR" # Additional gRPC verbosity control
os.environ["ABSL_MIN_LOG_LEVEL"] = "2" # Suppress abseil warnings
# Disable gRPC forking warnings
os.environ["GRPC_POLL_STRATEGY"] = "epoll1"
os.environ["GRPC_POLL_STRATEGY"] = "poll" # Use 'poll' for macOS compatibility
from diplomacy import Game
from diplomacy.engine.message import GLOBAL, Message